## Importação de Bibliotecas

In [1]:
# Python versão 3.8.8
# Windows 10

# biblioteca para realizar requisição
import requests # versão 2.25.1

# biblioteca para interação com o sistema
import os
import shutil

# biblioeca para manipulação de dados
import pandas as pd # versão 1.3.2

# biblioteca para manipulação de banco de dados sqlite
import sqlite3 # versão 2.6.0
from sqlite3 import Error

import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='openpyxl')

## Variáveis Globais

In [2]:
path_atual = os.path.dirname(os.path.realpath('__file__')) + "\\" # diretório onde se encontra o script
nome_bd = "DW_Covid.sqlite" # nome do banco de dados

## Download de Dados

In [3]:
# Função que realiza o download de um arquivo através de uma url dada e o salva no diretório dado

def download_arquivo(url, path_arquivo, nome_arquivo = None, download_forcado = False):
    
    mensagem_download = "Realizando Download do Arquivo '{}'...".format(nome_arquivo)
    mensagem_sucesso = 'Arquivo {} salvo com sucesso.'.format(nome_arquivo).ljust(len(mensagem_download))

    if not os.path.exists(path_arquivo):
        os.makedirs(path_arquivo) 
        

    if nome_arquivo is None:
        nome_arquivo = url.split('/')[-1].replace(" ", "_")
        
    arquivo_existe = os.path.exists(path_atual + nome_arquivo)
    # Realiza o donwload se o arquivo ainda nao existe no diretório
    if not arquivo_existe or download_forcado:
        path_arquivo = os.path.join(path_arquivo, nome_arquivo)

        request_url = requests.get(url, stream=True)
        if request_url.ok:
            print(mensagem_download, end="\r", flush=True)

            tam_chunk = 1024 * 8

            with open(path_arquivo, 'wb') as arquivo:

                for chunk in request_url.iter_content(tam_chunk):
                    if chunk:
                        arquivo.write(chunk)
                        arquivo.flush()
                        os.fsync(arquivo.fileno())

            print(mensagem_sucesso) 

        else:
            print("Falha no Download: status code {}\n{}".format(request_url.status_code, request_url.text))

In [4]:
url = "https://data.brasil.io/dataset/covid19/caso_full.csv.gz"
download_arquivo(url, path_atual, "casos_covid.gz")

In [5]:
# download_arquivo(url, path_atual, "casos_covid.gz", download_forcado = True)

## Leitura de Dados

In [6]:
data_casos = pd.read_csv("casos_covid.gz", compression = "gzip")

In [7]:
# data_vacinacao = pd.read_csv("tabela_vacinacao.csv")

## Conexão Banco de Dados

In [7]:
def nova_conexao(path, nome_bd):

    conec_bd = None
    try:
        conec_bd = sqlite3.connect(path + nome_bd)

    except Error as erro:
        print(erro)
        return 0
    
    finally:
        if conec_bd:
            conec_bd.close()
    return 1

In [8]:
if (nova_conexao(path_atual,  nome_bd)):
    conec_bd = sqlite3.connect(path_atual + nome_bd)
    cursor = conec_bd.cursor()
    print("Conexão ao banco '{}' criada com sucesso.".format(nome_bd))
    conec_bd.commit()

Conexão ao banco 'DW_Covid.sqlite' criada com sucesso.


## Alimentação do Banco de Dados

In [9]:
def verifica_tabela_existe(cursor, nome_tabela):
    
    cursor.execute(''' SELECT name FROM sqlite_master WHERE type='table' AND name='{}' '''.format(nome_tabela))
    
    if cursor.fetchone():
        return 1
    else:
        return 0

In [10]:
def coverte_tipo_sqlite(string_tipo):  
        
    if string_tipo in ["int64", "bool"]:
            tipo_sql = "integer"
            
    elif string_tipo == "float64":
            tipo_sql = "real"           
    else:
        tipo_sql = "text"
    
    return tipo_sql
    

In [11]:
def insere_tabelas_banco(data, cursor, nome_tabela):

    tabela_existe = verifica_tabela_existe(cursor, nome_tabela)

    if not tabela_existe:


        # Transforma o nome da coluna seguido por seu tipo em apenas uma string para cada coluna
        # Logo após concatena essas strings utilizando a vírgula como separador
        lista_colunas = list(data.columns)
        
        novas_colunas = []
        for coluna in lista_colunas:
            coluna = coluna + " " + coverte_tipo_sqlite(str(data[coluna].dtype)) 
            novas_colunas.append(coluna)

        nomes_colunas = ", ".join(novas_colunas)         

        print("Criando tabela {} no banco {}...".format(nome_tabela, nome_bd), end="\r", flush=True)

        # Cria a tabela no banco de dados sqlite passando o nome da tabela, o nome das colunas e seus tipos
        comando = 'CREATE TABLE ' + nome_tabela + ' (' + nomes_colunas + ')'
        cursor.execute('' + comando + '')

        # Importa o dataframe com microdados referentes ao no para a tabela no banco de dados
        data.to_sql(nome_tabela, conec_bd, if_exists='append', index = False)
        conec_bd.commit()

        print(" "*70, end="\r", flush=True)
        print("Tabela {} criada com sucesso no banco {}.".format(nome_tabela, nome_bd), end="\n\n")

    # Se a tabela já existe, nada acontece
    else:
        print("Tabela {} já existe no banco de dados {}.".format(nome_tabela, nome_bd), end="\n\n")

In [12]:
insere_tabelas_banco(data_casos, cursor, "Casos_Covid")

Tabela Casos_Covid criada com sucesso no banco DW_Covid.sqlite.       



In [13]:
conec_bd.close()