<div style="text-align:center">
  
# **Trabalho Final Big Data Analytics**  
##**Matheus Nascimento**
  
</div>

**Projeto Final**  
**BIG DATA ANALYTICS.**  
Você foi contratado como consultor em Big Data Analytics pelo ministério de saúde dos Estados Unidos par analisar os mais recentes dados da COVID-19. Os seus 2 grandes objetivos são um procedimento de ingestão, transformação e carregamento dos dados (Extract Transformation and Load); e a outro é a análise dos dados.  


**Primeira Parte (ETL)**  
1. Como consultor em Analytics, a primeira tarefa é criar um pipeline para carregar aos dados que contêm a informação de cada doente. É o seu objetivo e criar um   procedimento para receber ficheiros CSV e carregá-los diretamente no snowflake. Para atingir dito objetivo o HHS(departamento de saúde dos Estados Unidos), pede que o seu   código seja reutilizável porque o processo que você vai criar será usado para ingerir mais dados posteriormente. Crie uma definição para cada um dos processos.  

In [0]:
from pyspark.sql.types import *
import  pyspark.sql.functions as F
from datetime import datetime
from pyspark.sql import SparkSession

In [0]:
class ETL:
    def __init__(self, snowflake_params):
        self.snowflake_params = snowflake_params
        self.spark = SparkSession.builder.appName("ETL").getOrCreate()
        self.snowflake_connection = None

    def establish_snowflake_connection(self):
        try:
            self.snowflake_connection = self.spark.read.format("snowflake") \
                .option("host", self.snowflake_params["host"]) \
                .option("user", self.snowflake_params["user"]) \
                .option("password", self.snowflake_params["password"]) \
                .option("sfWarehouse", self.snowflake_params["warehouse"]) \
                .option("database", self.snowflake_params["database"]) \
                .option("schema", self.snowflake_params["schema"]) \
                .option("dbtable", self.snowflake_params["dbtable"]) \
                .load()

            print("Connection to Snowflake established successfully.")
            return True
        except Exception as e:
            print(f"Error establishing Snowflake connection: {str(e)}")
            return False

    def read_data_from_snowflake(self, tables_or_queries):
        try:
            dfs = []
            for item in tables_or_queries:
                if " " in item:
                    df = self.spark.read.format("snowflake") \
                        .option("host", self.snowflake_params["host"]) \
                        .option("user", self.snowflake_params["user"]) \
                        .option("password", self.snowflake_params["password"]) \
                        .option("sfWarehouse", self.snowflake_params["warehouse"]) \
                        .option("database", self.snowflake_params["database"]) \
                        .option("schema", self.snowflake_params.get("schema", None)) \
                        .option("dbtable", f"({item})") \
                        .load()
                else:
                    df = self.spark.read.format("snowflake") \
                        .option("host", self.snowflake_params["host"]) \
                        .option("user", self.snowflake_params["user"]) \
                        .option("password", self.snowflake_params["password"]) \
                        .option("sfWarehouse", self.snowflake_params["warehouse"]) \
                        .option("database", self.snowflake_params["database"]) \
                        .option("schema", self.snowflake_params.get("schema", None)) \
                        .option("dbtable", item) \
                        .load()
                dfs.append(df)

            print("Data read successfully.")
            return dfs
        except Exception as e:
            print(f"Error reading data from Snowflake: {str(e)}")
            return None

    def write_data_to_snowflake(self, dfs, target_schema, target_table_names=None):
        try:
            if target_table_names is None:
                raise ValueError("Target table names must be specified for writing to Snowflake.")

            if len(target_table_names) != len(dfs):
                raise ValueError("The number of target table names must be the same as the number of DataFrames.")

            start_time = datetime.now()
            results = []

            for df, target_table in zip(dfs, target_table_names):
                df.write.format("snowflake") \
                    .option("host", self.snowflake_params["host"]) \
                    .option("user", self.snowflake_params["user"]) \
                    .option("password", self.snowflake_params["password"]) \
                    .option("sfWarehouse", self.snowflake_params["warehouse"]) \
                    .option("database", self.snowflake_params["database"]) \
                    .option("schema", target_schema) \
                    .option("dbtable", target_table) \
                    .mode("overwrite").save()

                results.append({
                    "Schema": target_schema,
                    "Table": target_table,
                    "Number of columns": len(df.columns),
                    "Column names": df.columns,
                    "Number of rows": df.count()
                })

            end_time = datetime.now()
            elapsed_time = (end_time - start_time).total_seconds()

            print("Data written successfully.")
            return {"Total elapsed time": elapsed_time, "Results": results}
        except Exception as e:
            print(f"Error writing data to Snowflake: {str(e)}")
            return {"Error": f"Failed to write data to Snowflake. {str(e)}"}

**Procedimento:**  


**--Use %run "Users/matheuslpdnascimento@gmail.com/matheus_nascimento_etl"**

**--Defina os parametros do snowflake em um dicionário como este:**    
    

snowflake_params = {  
    "host": "wsraake-pw92122.snowflakecomputing.com",  
    "user": "MATHEUSLDNASCIMENTO",  
    "password": "MNBmnb1!",  
    "warehouse": "COMPUTE_WH",  
    "database": "TRABALHO_FINAL_BIGDATA",  
    "dbtable": "PATIENTS"  
}


**--Crie uma instância da classe.**  
EXEMPLO: ETL_Processor = ETL(snowflake_params)


**--Estabeleça a conexão. Terá que usar uma tabela específica para isso (defina no snowflake_params).**  
Função: establish_snowflake_connection(self):  
EXEMPLO: ETL_Processor.establish_snowflake_connection()  


**--Use a função para ler as tabelas do snowflake. Pode usar uma tabela, uma lista de tabelas ou uma query.**  
Função: read_data_from_snowflake(self, tables_or_queries)    
EXEMPLO: tables_or_queries = ['PATIENTS', 'CONDITIONS', 'OBSERVATIONS']  
[PATIENTS, CONDITIONS, OBSERVATIONS] = ETL_Processor.read_data_from_snowflake(tables_or_queries)  


**--Use a função para levar um dataframe até o snowflake. Especifique o schema e os nomes da(s) novas tabelas.**    
Função: write_data_to_snowflake(self, dfs, target_schema, target_table_names=None)   
EXEMPLO: ETL_Processor.write_data_to_snowflake([PATIENTS, CONDITIONS, OBSERVATIONS], "COVID19", ['N_PATIENTS', 'N_CONDITIONS', '_N_OBSERVATIONS'])