# COVID19 all cases in Brazil

Here, publicly available data on all cases of flu-like illness in Brazil that went through the healthcare system is extracted from an Elasticsearch API.

> **Data source**: all the data has been taken from the Brazilian Ministry of Health https://opendatasus.saude.gov.br/organization/ministerio-da-saude
>
> The data requires translation from Portuguese to English.

This script is a modified version of the one I used on Azure: this version can be run on a "standard" local computer. The data is split into parquet files of 1,000,000 cases each, and it is also split by federative unit (state).

The aim is to spare memory resources and disk space.

In [None]:
import pandas as pd
from elasticsearch import Elasticsearch, exceptions, RequestsHttpConnection
import elasticsearch.helpers
from requests.auth import HTTPBasicAuth
from elasticsearch import ConnectionTimeout
from urllib.error import HTTPError
from whdh_py import DataLakeClient
import json
import multiprocessing as mp
import subprocess
import os

In [None]:
##### Input parameters #####
### Two-letter abbreviations designating the federative units of Brazil.
### Each one is appended to the Elasticsearch index name when downloading.
federative_units = [
    'ac', 'al', 'ap', 'am', 'ba', 'ce', 'df', 'es', 'go',
    'ma', 'mt', 'ms', 'mg', 'pa', 'pb', 'pr', 'pe', 'pi',
    'rj', 'rn', 'rs', 'ro', 'rr', 'sc', 'to', 'se', 'sp',
]

In [None]:
### API call
# Client shared by every request below: connects to the host over HTTPS with
# certificate verification and authenticates with HTTP basic credentials.
es = Elasticsearch(
    hosts=["https://elasticsearch-saps.saude.gov.br"],
    connection_class=RequestsHttpConnection,
    http_auth=('user-public-notificacoes', 'Za4qNXdyQNSa9YaA'),
    use_ssl=True,
    verify_certs=True,
)

In [None]:
#####  Function to download non severe cases  #####
# Number of hits requested per search page
_PAGE_SIZE = 10000
# Number of buffered cases that triggers the writing of one parquet part
_CASES_PER_PART = 1000000
# Selects every document having at least one of the three listed fields
_CASES_QUERY = {"query":{"bool":{"should":[{"exists": {"field":"resultadoTeste"}},
                                           {"exists": {"field":"testes"}},
                                           {"exists":{"field":"classificacaoFinal"}}]}}}


def _write_part(hits, fu, nber_part):
    """Flatten raw hits and save them as 'extracted_data_<fu>_part<nber_part>.parquet'.

    ``hits`` is emptied in place as soon as it has been copied into a
    DataFrame, so the buffer can be reused and its records released before
    the parquet file is written.
    """
    df_data = pd.DataFrame(hits)
    hits.clear()
    # Only '_source' is kept; errors='ignore' so that a response lacking one
    # of these metadata keys does not abort the export.
    df_data = df_data.drop(columns=['_index','_type','_id','_score'], errors='ignore')
    try:
        df_output = pd.json_normalize(df_data['_source'])
    except AttributeError:
        # Fallback: presumably '_source' holds string representations
        # instead of dicts -- TODO confirm.
        # SECURITY NOTE(review): eval() executes whatever text the server
        # returned; consider ast.literal_eval/json.loads if the payload allows.
        df_data['_source'] = df_data['_source'].apply(lambda x: eval(x))
        df_output = pd.json_normalize(df_data['_source'])
    del df_data
    local_path = 'extracted_data_'+fu+'_part'+str(nber_part)+'.parquet'
    df_output.to_parquet(local_path)


def _download_federative_unit(fu):
    """Download all matching cases of one federative unit; return False on timeout."""
    try:
        # Initialize the scroll
        page = es.search(index="desc-esus-notifica-estado-"+fu
                         ,http_auth=('user-public-notificacoes','Za4qNXdyQNSa9YaA')
                         ,scroll = '20m'
                         ,timeout='50m'
                         ,size = _PAGE_SIZE
                         ,body=_CASES_QUERY
                        )
        sid = page['_scroll_id']
        scroll_size = page['hits']['total']['value']
        dump_list = page['hits']['hits']
        nber_part = 1
        # Start scrolling: stop as soon as a page comes back empty
        while scroll_size > 0:
            page = es.scroll(scroll_id = sid, scroll = '20m',request_timeout=50,
                             http_auth=('user-public-notificacoes','Za4qNXdyQNSa9YaA'))
            # Update the scroll ID
            sid = page['_scroll_id']
            # Get the number of results that we returned in the last scroll
            scroll_size = len(page['hits']['hits'])
            if scroll_size > 0:
                dump_list.extend(page['hits']['hits'])
                # '>=' rather than '==': the buffer is still flushed if it
                # ever steps past the threshold instead of landing on it.
                if len(dump_list) >= _CASES_PER_PART:
                    _write_part(dump_list, fu, nber_part)
                    nber_part += 1
    except ConnectionTimeout:
        return False
    # Save the remaining cases. The buffer is empty when nothing matched or
    # when the total is an exact multiple of the part size; writing it then
    # would fail on the missing columns, so it is skipped.
    if dump_list:
        _write_part(dump_list, fu, nber_part)
    return True


def download_nonsevere_cases(federative_units):
    """Download the cases of each federative unit into local parquet files.

    For every abbreviation in ``federative_units`` the index
    'desc-esus-notifica-estado-<fu>' is scrolled page by page and the hits
    are written to 'extracted_data_<fu>_part<k>.parquet' (paths relative to
    the current working directory), one file per 1,000,000 buffered cases
    plus one file for the remainder.

    Parameters
    ----------
    federative_units : list of str
        Abbreviations appended to the index name.

    Returns
    -------
    None

    Notes
    -----
    A federative unit whose download hits a ``ConnectionTimeout`` is
    restarted from its first page, with no limit on the number of attempts;
    part files already written for it are overwritten by the new attempt.
    Any other exception propagates to the caller.
    """
    for fu in federative_units:
        # Retry the same federative unit until it completes without timeout
        while not _download_federative_unit(fu):
            pass

In [None]:
#####  Download non severe cases  #####
if __name__ == '__main__':
    # Input list: pool.map creates one task per element. The whole list of
    # federative units is passed as a single element, so a single worker
    # downloads the units one after the other.
    # NOTE(review): one task per unit would run downloads in parallel, but
    # every worker would then hold its own buffer of cases in memory.
    inputs = [federative_units]
    # No point in starting more workers than there are tasks
    ncores = min(mp.cpu_count(), len(inputs))
    # The context manager releases the worker processes even if map() raises
    with mp.Pool(ncores) as pool:
        # map blocks until every task has finished
        pool.map(download_nonsevere_cases, inputs)