In [1]:
import requests
import pandas as pd
import dotenv
import os
import datetime
import aiohttp
import asyncio
from tqdm.asyncio import tqdm
import psycopg2
import psycopg2.extensions
from psycopg2 import sql
import numpy as np

In [2]:
dotenv.load_dotenv()

True

### Definições para requisição http

Definição das variáveis para requisição http à api do github

In [3]:
# Configurações
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # Substitua pelo seu token
REPO_OWNER = "CSSEGISandData"
REPO_NAME = "COVID-19"
API_URL = f"https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/contents/"
GITHUB_API_GRAPHQL_URL = "https://api.github.com/graphql"


HEADERS = {
    "Authorization": f"token {GITHUB_TOKEN}"
}

### Testando Limites de Requisição

Abaixo, bloco para testar quantas requisições faltam e quanto tempo falta para zerar o limite de requisições

In [4]:
response = requests.get("https://api.github.com/rate_limit", headers=HEADERS)

# Verifica o status da resposta
if response.status_code == 200:
    rate_limit = response.json()
    core = rate_limit['rate']
    limit = core['limit']
    remaining = core['remaining']
    reset_timestamp = core['reset']
    reset_time = datetime.datetime.fromtimestamp(reset_timestamp).strftime('%Y-%m-%d %H:%M:%S')

    print(f"Limite de requisições por hora: {limit}")
    print(f"Requisições restantes: {remaining}")
    print(f"Limite será zerado em: {reset_time}")
else:
    print(f"Erro ao verificar rate limit: {response.status_code}")

Limite de requisições por hora: 5000
Requisições restantes: 5000
Limite será zerado em: 2025-02-24 21:48:52


In [5]:
query = """
{
  rateLimit {
    limit
    remaining
    resetAt
  }
}
"""

response = requests.post(GITHUB_API_GRAPHQL_URL, json={'query': query}, headers=HEADERS)

if response.status_code == 200:
    rate_limit = response.json()['data']['rateLimit']
    limit = rate_limit['limit']
    remaining = rate_limit['remaining']
    reset_timestamp = rate_limit['resetAt']
    reset_time = datetime.datetime.strptime(reset_timestamp, "%Y-%m-%dT%H:%M:%SZ")

    print(f"Limite de requisições por hora: {limit}")
    print(f"Requisições restantes: {remaining}")
    print(f"Limite será zerado em: {reset_time.strftime('%Y-%m-%d %H:%M:%S')}")
else:
    print(f"Erro ao verificar rate limit: {response.status_code}")

Limite de requisições por hora: 5000
Requisições restantes: 4996
Limite será zerado em: 2025-02-25 01:31:54


### Listagem dos arquivos presentes no repositório

Bloco para listar todos os arquivos .csv presentes no repositório, da raiz às subpastas.

Foi utilizada a api graphql para conseguir obter a lista de arquivos com menos requisições, assim, economizando o limite de requisições por hora da api do github

In [6]:
# Query GraphQL para listar arquivos e subpastas
query = """
query ($caminho: String!) {
  repository(owner: "CSSEGISandData", name: "COVID-19") {
    object(expression: $caminho) {
      ... on Tree {
        entries {
          name
          type
          oid
        }
      }
    }
  }
}
"""

# Função recursiva para buscar arquivos CSV com caminho completo e URL de download
def listar_arquivos_recursivo(caminho="master:csse_covid_19_data"):
    arquivos = []
    variables = {"caminho": caminho}
    response = requests.post(GITHUB_API_GRAPHQL_URL, json={'query': query, 'variables': variables}, headers=HEADERS)
    
    if response.status_code == 200:
        dados = response.json()
        if 'data' in dados and dados['data']['repository']['object']:
            entradas = dados['data']['repository']['object']['entries']
            
            for item in entradas:
                # Se for subpasta, chamar recursivamente
                if item['type'] == 'tree':
                    subcaminho = caminho + "/" + item['name']
                    arquivos.extend(listar_arquivos_recursivo(subcaminho))  # Recurso recursivo
                # Se for CSV, adicionar à lista
                elif item['type'] == 'blob' and item['name'].endswith('.csv'):
                    caminho_sem_arquivo = caminho.replace("master:", "")
                    url_download = f"https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/{caminho_sem_arquivo}/{item['name']}"
                    arquivos.append({
                        "caminho_completo": caminho_sem_arquivo,
                        "nome_arquivo": item['name'],
                        "url_download": url_download
                    })
        else:
            print("Nenhum dado encontrado para o caminho:", caminho)
    else:
        print("Erro na requisição:", response.status_code, response.text)
    
    return arquivos

# Executar a função e armazenar em DataFrame
lista_csvs = listar_arquivos_recursivo()
df_csvs = pd.DataFrame(lista_csvs)

print("Arquivos CSV encontrados:")
display(df_csvs)



Arquivos CSV encontrados:


Unnamed: 0,caminho_completo,nome_arquivo,url_download
0,csse_covid_19_data,UID_ISO_FIPS_LookUp_Table.csv,https://raw.githubusercontent.com/CSSEGISandDa...
1,csse_covid_19_data/csse_covid_19_daily_reports,01-01-2021.csv,https://raw.githubusercontent.com/CSSEGISandDa...
2,csse_covid_19_data/csse_covid_19_daily_reports,01-01-2022.csv,https://raw.githubusercontent.com/CSSEGISandDa...
3,csse_covid_19_data/csse_covid_19_daily_reports,01-01-2023.csv,https://raw.githubusercontent.com/CSSEGISandDa...
4,csse_covid_19_data/csse_covid_19_daily_reports,01-02-2021.csv,https://raw.githubusercontent.com/CSSEGISandDa...
...,...,...,...
2207,csse_covid_19_data/csse_covid_19_time_series,time_series_covid19_confirmed_US.csv,https://raw.githubusercontent.com/CSSEGISandDa...
2208,csse_covid_19_data/csse_covid_19_time_series,time_series_covid19_confirmed_global.csv,https://raw.githubusercontent.com/CSSEGISandDa...
2209,csse_covid_19_data/csse_covid_19_time_series,time_series_covid19_deaths_US.csv,https://raw.githubusercontent.com/CSSEGISandDa...
2210,csse_covid_19_data/csse_covid_19_time_series,time_series_covid19_deaths_global.csv,https://raw.githubusercontent.com/CSSEGISandDa...


In [None]:
# Função para baixar um arquivo CSV mantendo a estrutura de diretórios
async def baixar_arquivo(session, url_download, caminho_completo, nome_arquivo):
    # Cria o caminho completo do diretório de destino
    pasta_destino = os.path.join("files", caminho_completo)
    os.makedirs(pasta_destino, exist_ok=True)
    caminho_arquivo = os.path.join(pasta_destino, nome_arquivo)
    
    # Se o arquivo já existe, pula o download
    if os.path.exists(caminho_arquivo):
        print(f"{nome_arquivo} já existe, pulando download.")
        return

    # Faz o download do arquivo
    async with session.get(url_download) as response:
        if response.status == 200:
            conteudo = await response.read()
            with open(caminho_arquivo, 'wb') as f:
                f.write(conteudo)
        else:
            print(f"Falha ao baixar {nome_arquivo}: {response.status}")

# Função principal para baixar todos os CSVs com estrutura de pastas
async def baixar_csvs_assincrono(arquivos):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for arquivo in arquivos:
            url_download = arquivo['url_download']
            caminho_completo = arquivo['caminho_completo']
            nome_arquivo = arquivo['nome_arquivo']
            tasks.append(baixar_arquivo(session, url_download, caminho_completo, nome_arquivo))
        
        # Executa as tasks com tqdm para mostrar o progresso
        for _ in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Baixando arquivos"):
            await _
            
# Executar a função para baixar os CSVs
await baixar_csvs_assincrono(df_csvs.to_dict(orient='records'))

In [None]:
print(os.getcwd())

In [8]:
# Diretório onde os arquivos CSV estão localizados
diretorio_csv = os.path.join(os.getcwd(),'files','csse_covid_19_data','csse_covid_19_daily_reports')

def processar_csvs(diretorio_csv):
    """
    Processa os arquivos CSV, adicionando a coluna 'date' e concatenando os dados.
    """
    dfs = []
    
    # Usando tqdm para mostrar o progresso ao ler os arquivos CSV
    for arquivo in tqdm(os.listdir(diretorio_csv), desc="Processando arquivos CSV", unit="arquivo"):
        if arquivo.endswith('.csv'):
            # Remover a extensão .csv e extrair a data
            arquivo_sem_extensao = arquivo.replace('.csv', '')
            data = arquivo_sem_extensao.split('-')
            data = f"{data[2]}-{data[0]}-{data[1]}"  # Formata como 'yyyy-mm-dd'
            
            # Carregar o CSV em um DataFrame
            df = pd.read_csv(os.path.join(diretorio_csv, arquivo))
            
            # Adicionar a coluna 'date'
            df['date'] = data
            
            # Adicionar à lista
            dfs.append(df)
    
    # Concatenar todos os DataFrames em um único
    df_final = pd.concat(dfs, ignore_index=True)
    return df_final

df_final = processar_csvs(diretorio_csv)

print("Arquivos concatenados com sucesso, incluindo a coluna de data!")

Processando arquivos CSV: 100%|██████████| 1143/1143 [00:38<00:00, 29.40arquivo/s]


Arquivos concatenados com sucesso, incluindo a coluna de data!


In [9]:
print(os.path.join(os.getcwd(),'files','final','daily_reports_concat.csv'))

dir = os.path.join(os.getcwd(),'files','final')
# Salvar o DataFrame final em um novo arquivo CSV
if not os.path.exists(dir):
    os.mkdir(dir)
    
caminho_csv_final = os.path.join(dir,'daily_reports_concat.csv')
    
# Abre o arquivo para escrita
with open(caminho_csv_final, mode='w', newline='') as file:
    # Escreve o cabeçalho uma única vez
    df_final.head(0).to_csv(file, index=False)
    
    # Salva em chunks com barra de progresso
    for i in tqdm(range(0, len(df_final), 10000), desc="Salvando CSV", unit="linhas"):
        df_final.iloc[i:i+10000].to_csv(file, header=False, index=False)

s:\git\otg\analista-bi\files\final\daily_reports_concat.csv


Salvando CSV: 100%|██████████| 429/429 [01:27<00:00,  4.88linhas/s]


In [31]:

print(df_final.columns)
print(df_final.head())

Index(['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update',
       'Lat', 'Long_', 'Confirmed', 'Deaths', 'Recovered', 'Active',
       'Combined_Key', 'Incident_Rate', 'Case_Fatality_Ratio', 'date',
       'Province/State', 'Country/Region', 'Last Update', 'Latitude',
       'Longitude', 'Incidence_Rate', 'Case-Fatality_Ratio'],
      dtype='object')
   FIPS Admin2 Province_State Country_Region          Last_Update       Lat  \
0   NaN    NaN            NaN    Afghanistan  2021-01-02 05:22:33  33.93911   
1   NaN    NaN            NaN        Albania  2021-01-02 05:22:33  41.15330   
2   NaN    NaN            NaN        Algeria  2021-01-02 05:22:33  28.03390   
3   NaN    NaN            NaN        Andorra  2021-01-02 05:22:33  42.50630   
4   NaN    NaN            NaN         Angola  2021-01-02 05:22:33 -11.20270   

       Long_  Confirmed  Deaths  Recovered  ...  Incident_Rate  \
0  67.709953    52513.0  2201.0    41727.0  ...     134.896578   
1  20.168300    58316.0

In [37]:
def tratar_dados(df):
    # Tratamento de NaN e NaT
    for col in tqdm(df.columns, desc='Tratando dados'):
        if df[col].dtype == 'object':
            df[col] = df[col].replace('NaT', np.nan)
        elif df[col].dtype == 'datetime64[ns]':
            df[col] = pd.to_datetime(df[col], errors='coerce')
    # Convertendo colunas de data para o formato adequado
    df['Last_Update'] = pd.to_datetime(df['Last_Update'], errors='coerce', format='%m/%d/%Y %H:%M')
    df['date'] = pd.to_datetime(df['date'], errors='coerce', format='%Y-%m-%d')
    df['Last Update'] = pd.to_datetime(df['Last Update'], errors='coerce', format='%m/%d/%Y %H:%M')
    df = df.replace({np.nan: None})
    return df

In [38]:
def inserir_dados(df, conn):
    with conn.cursor() as cur:
        # Criação da tabela caso ela não exista
        cur.execute("""
        CREATE TABLE IF NOT EXISTS public.covid_data (
            id serial4 NOT NULL PRIMARY KEY,
            fips float8,
            admin2 text,
            province_state text,
            country_region text,
            last_update timestamp,
            lat float8,
            long_ float8,
            confirmed float8,
            deaths float8,
            recovered float8,
            active float8,
            combined_key text,
            incident_rate float8,
            case_fatality_ratio float8,
            "date" date,
            "Province/State" text,
            "Country/Region" text,
            "Last Update" timestamp,
            latitude float8,
            longitude float8,
            incidence_rate float8,
            "Case-Fatality_Ratio" float8
        );
        """)
        # Inserção dos dados com barra de progresso
        for i, row in tqdm(df.iterrows(), total=df.shape[0], desc='Inserindo dados'):
            cur.execute(sql.SQL("""
            INSERT INTO public.covid_data (
                fips, admin2, province_state, country_region, last_update,
                lat, long_, confirmed, deaths, recovered, active, combined_key,
                incident_rate, case_fatality_ratio, "date", "Province/State",
                "Country/Region", "Last Update", latitude, longitude,
                incidence_rate, "Case-Fatality_Ratio"
            ) VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s
            );
            """), tuple(row))
        conn.commit()


In [39]:
dotenv.load_dotenv()

# Tratando os dados
df_tratado = tratar_dados(df_final)

# Conectando ao banco de dados
conn = psycopg2.connect(
    dbname=os.getenv('DB_NAME'), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), host=os.getenv('DB_HOST'), port=os.getenv('DB_PORT')
)

# Inserindo os dados
inserir_dados(df_tratado, conn)


Tratando dados: 100%|██████████| 22/22 [00:03<00:00,  7.24it/s]
Inserindo dados: 100%|██████████| 4287473/4287473 [24:25<00:00, 2925.58it/s] 
