# Primeira Parte - Extract Transformation and Load (ETL)


## Contexto

#### Projeto Final - BIG DATA ANALYTICS - EDIT 2023.

__Cenário:__  
Fui contratado como consultor em 'Big Data Analytics' pelo Ministério de Saúde dos Estados Unidos (EUA) para analisar os mais recentes dados da COVID-19. Os meus dois principais objetivos são: um procedimento de ingestão, transformação e carregamento dos dados ('Extract Transformation and Load - ETL'); e o outro é a análise dos dados ('Exploratory Data Analysis - EDA'). Neste notebook será abordado o primeiro objetivo (ETL).  

__ETL__:  
Como consultor em 'Analytics', a minha primeira tarefa é criar um 'pipeline' para carregar os dados que contêm a informação de cada doente. É o meu objetivo criar um procedimento para receber os ficheiros 'CSV' e carregá-los diretamente no Snowflake (SF). Para atingir o objetivo, o HHS (departamento de saúde e serviços humanos dos EUA), pede que o meu código seja reutilizável, porque o processo que será posteriormente utilizado para ingerir mais dados.  
Deverá ser criada uma definição para cada um dos processos. 


## Importação de bibliotecas

In [0]:
import pandas as pd
import numpy as np
import time

## Criação da classe 'DatabricksSnowflakeConnection' com 3 funções (conexão com o SF, ecrita de tabela no SF e leitura de tabela/query no SF).


In [0]:
# Foi definida uma nova classe chamada 'DatabricksSnowflakeConnection'.
# Para essa classe foram definidos diversos atributos correspondentes às credenciais de conexão do meu utilizador SF e à localização do ficheiro no qual será realizada a conexão.
# Os atributos definidos na classe são: 'host', 'user', 'password', 'dw'(datawarehouse), 'db' (database), 'schema', 'table'.
# Se a conexão for bem-sucedida, uma mensagem a indicar o sucesso é impressa. Se ocorrer uma exceção (erro), uma mensagem de falha é impressa.
# Os atributos defininos na função '__init_()' são utilizados nas definições das funções de leitura de tabelas ('read_table') e escrita de tabelas/queries ('write_table'). 
# As funções de leitura e escrita representam os métodos que podem ser aplicados aos objetos definidos a partir da classe 'DatabricksSnowflakeConnection'.
# Na eventualidade de haver algum erro na execução de uma das funções de leitura e/ou escrita, o erro especifico será exibido.
# No caso da função de escrita for bem sucedida é criado um dicionário com as seguintes informações: 'tempo que tardou em criar a tabela em segundos'; 'schema'; 'nome da tabela'; 'número de colunas'; 'nome das colunas' e 'número de linhas'.

class DatabricksSnowflakeConnection:
    def __init__(self, host, user, password, dw, db, schema, table):
        self.host = host
        self.user = user
        self.password = password
        self.dw = dw
        self.db = db
        self.schema = schema
        self.table = table
        options1 = {
            "host": self.host,
            "user": self.user,
            "password": self.password,
            "sfWarehouse": self.dw,
            "database": self.db,
            "schema": self.schema,
        }
        try:
            connection = (
                spark.read.format("snowflake")
                .options(**options1)
                .option("dbtable", self.table)
                .load()
            )
            return print(
                "Connection between Databricks and Snowflake executed successfully"
            )
        except:
            print("Connection between Databricks and Snowflake failed")

    def read_table(self, db_read, schema_read, table_read, query=None, type=True):
        options2 = {
            "host": self.host,
            "user": self.user,
            "password": self.password,
            "sfWarehouse": self.dw,
            "database": db_read,
            "schema": schema_read,
        }
        try:
            if type == True:
                df = (
                    spark.read.format("snowflake")
                    .options(**options2)
                    .option("dbtable", table_read)
                    .load()
                )
                return df
            else:
                df = (
                    spark.read.format("snowflake")
                    .options(**options2)
                    .option("query", query)
                    .load()
                )
                return df
        except Exception as e:
            print(f"Error description: {e}")

    def write_table(self, df, db_write, schema_write, table_write):
        options3 = {
            "host": self.host,
            "user": self.user,
            "password": self.password,
            "sfWarehouse": self.dw,
            "database": db_write,
            "schema": schema_write,
        }
        try:
            start_time = time.time()
            df.write.format("snowflake").options(**options3).option("dbtable", table_write).save()
            end_time = time.time()
            time_total = end_time - start_time
            time_total_rounded = round(time_total, 2)
            dict_info_tabela = {
                "Tempo total transcorrido (segundos)": time_total_rounded,
                "Schema": self.schema,
                "Tabela": table_write,
                "Numero de columnas": len(df.columns),
                "nome das colunas": df.columns,
                "Numero de linhas": df.count(),
            }
            return dict_info_tabela
        except Exception as e:
            print(f"Error description: {e}")

## Criação do objeto da classe 'DatabricksSnowflakeConnection' e conexão com SF

In [0]:
# Criação do objeto 'francisco' permite estabelecer e verificar a ligação entre o Databricks (DB) e o SF.
# Na eventualidade de ainda não terem sido criados novos 'databases', 'schemas' e/ou 'tabelas' no SF, a definição da função de conexão será estabelecida por intermédio da leitura de uma das tabelas de amostragem previamente forneceidas pelo SF. 
# (novamente) Se a conexão for bem-sucedida, uma mensagem indicando o sucesso é impressa. Se ocorrer uma exceção (erro), uma mensagem de falha é impressa.
# Mensagem de sucesso expectável: "Connection between Databricks and Snowflake executed successfully".

francisco = DatabricksSnowflakeConnection(
    host="txdygja-lz90113.snowflakecomputing.com",
    user="RAMALHOSAFRANCISCO",
    password="Edit2023_",
    dw="COMPUTE_WH",
    db="SNOWFLAKE_SAMPLE_DATA",
    schema="TPCH_SF10",
    table="CUSTOMER",
)

Connection between Databricks and Snowflake executed successfully


## Leitura e carregamento dos Dataframes para o ambiente 'notebook' do Databricks.

In [0]:
# A seguinte função realiza o carregamento dos DataFrames relevantes para o estudo num único dicionário. Nesta função, o parâmetro especificado será a variável que contém os nomes atribuídos aos ficheiros que desejamos carregar.
# A função retornará um dicionário que conterá os DataFrames cujos nomes serão eventualmente especificados no tuplo 'names', que neste caso, serve como argumento para a função em questão.
# As opções aplicadas são para ficheiros CSV. Para outros tipos de ficheiros, a função não resultará.
# Os ficheiros CSV que pretendemos fazer o ETL, deverão de ser previamente carregados no 'Workspace' do Databricks antes de se chamar a função.

def load_dfs(names):
    loaded_dfs = {}
    for i in names:
        file_type = "csv"
        file_location = f"/FileStore/tables/{i}.{file_type}"
        infer_schema = "true"
        first_row_is_header = "true"
        delimiter = ","
        try:
            loaded_dfs[f"df_{i}"] = (
                spark.read.format(file_type)
                .option("inferSchema", infer_schema)
                .option("header", first_row_is_header)
                .option("sep", delimiter)
                .load(file_location)
            )
        except Exception as e:
            print(f"Error description: {e}")
    return loaded_dfs

## Carregamento dos Dataframes para o ambiente Snowflake.

In [0]:
# 'names' é o tuplo que contém os nomes dos Dataframes que pretendemos carregar para o Snowflake.

names = (
    "allergies",
    "careplans",
    "conditions",
    "devices",
    "encounters",
    "observations",
    "organizations",
    "patients",
    "payer_transitions",
    "procedures",
    "providers",
    "supplies",
)

# Para aceder diretamente aos 'DataFrames' do dicionário retornado pela função 'load_dfs(names)', foi aplicado um ciclo 'for'. O ciclo 'for' itera sobre cada um dos pares 'key-value' (nome do DataFrame - DataFrame) do dicionário, obtidos através da função 'items()'. Dentro do loop, a função 'global()' é utilizada para criar variáveis globais, que serão, neste caso, os nomes dos DataFrames ('key'), e atribui os DataFrames ('value') correspondentes a essas variáveis.

for key, df in load_dfs(names).items():
    globals()[key] = df
    table_info = francisco.write_table(
        df=globals()[key],
        db_write="EDIT2023",
        schema_write="PROJETO_FINAL",
        table_write=key,
    )

print(table_info)

# O código carrega uma série de DataFrames no ambiente Databricks '(globals()[key] = df)' e, em seguida, utiliza o objeto 'francisco' para carregar cada DataFrame correspondente no Snowflake usando o método 'create_table' (criado na class 'DatabricksSnowflakeConnection').
# Cada DataFrame é associado a uma tabela no Snowflake com o mesmo nome.
# No caso do(s) DF(s) já terem sido previamente carregados para o SF, o seguinte erro irá aparecer: "(...) Table [nome do primeiro elemento/tabela da lista 'names'] already exists! (...)"

{'Tempo total transcorrido (segundos)': 10.17, 'Schema': 'TPCH_SF10', 'Tabela': 'df_supplies', 'Numero de columnas': 6, 'nome das colunas': ['DATE', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'QUANTITY'], 'Numero de linhas': 143110}
