# ETL

In [0]:
from pyspark.sql.types import *
import  pyspark.sql.functions as F
from datetime import datetime


class Pipeline:
    """Helper for reading and writing Spark DataFrames in Snowflake.

    Relies on a ``spark`` SparkSession being available as a global (as in a
    Databricks notebook) and on the Spark ``"snowflake"`` data source.

    On construction it tries to load
    ``SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER`` as a connectivity probe and
    records the outcome in ``is_connected``.
    """

    # Sample table used only to probe connectivity in __init__.
    _PROBE_DATABASE = "SNOWFLAKE_SAMPLE_DATA"
    _PROBE_SCHEMA = "TPCH_SF1"
    _PROBE_TABLE = "CUSTOMER"

    def __init__(self, host, user, password, warehouse):
        """Store connection settings and check whether Snowflake is reachable.

        Args:
            host: Snowflake host name.
            user: Snowflake user name.
            password: Password for ``user``. It is kept on the instance
                because every read/write passes it to the connector.
            warehouse: Snowflake warehouse passed as ``sfWarehouse``.

        Sets ``self.is_connected`` to True if loading the probe table
        succeeds, otherwise False. A failure is printed, not raised.
        ``self.session`` holds the probe DataFrame on success and None on
        failure.
        """
        self.host = host
        self.user = user
        self.password = password
        self.warehouse = warehouse
        # Define the attribute up front so it exists even if the probe fails.
        self.session = None

        try:
            # NOTE(review): assumes .load() contacts Snowflake (to resolve the
            # table schema), so a failure here means "not connected" — confirm.
            self.session = (
                self._reader(self._PROBE_DATABASE, self._PROBE_SCHEMA)
                .option("dbtable", self._PROBE_TABLE)
                .load()
            )
            self.is_connected = True
        except Exception as e:
            self.is_connected = False
            print(e)

        if self.is_connected:
            print("Conexão ao Snowflake estabelecida com sucesso!")
        else:
            print("Não foi possível conectar ao Snowflake.")

    def _connection_options(self):
        """Return the connector options shared by every read and write."""
        return {
            "host": self.host,
            "user": self.user,
            "password": self.password,
            "sfWarehouse": self.warehouse,
        }

    def _reader(self, database, schema):
        """Return a Snowflake DataFrameReader scoped to ``database``/``schema``."""
        return (
            spark.read.format("snowflake")
            .options(**self._connection_options())
            .option("database", database)
            .option("schema", schema)
        )

    def read_df(self, database, schema, table, query=None):
        """Read a Snowflake table, or the result of a SQL query, into a DataFrame.

        Args:
            database: Snowflake database name.
            schema: Snowflake schema name.
            table: Table to read. It is ignored when ``query`` is given.
            query: Optional SQL text. When not None, it is sent through the
                connector's ``query`` option instead of reading ``table``.

        Returns:
            The loaded DataFrame, or None if loading failed. On failure the
            error is printed, not raised.
        """
        try:
            reader = self._reader(database, schema)
            if query is None:
                reader = reader.option("dbtable", table)
            else:
                reader = reader.option("query", query)
            return reader.load()
        except Exception as e:
            # Keep the best-effort "return None" contract, but surface the
            # actual cause instead of only a generic message.
            print("error")
            print(e)
            return None

    def write_df(self, database, schema, dbtable, df, mode=None):
        """Write ``df`` to a Snowflake table and return a summary of the load.

        The target database and schema must already exist in Snowflake.

        Args:
            database: Target Snowflake database.
            schema: Target Snowflake schema.
            dbtable: Target table name.
            df: Spark DataFrame to write.
            mode: Optional Spark save mode (e.g. "append", "overwrite").
                None leaves the writer's default mode untouched.

        Returns:
            A dict with the elapsed write time in whole seconds, the schema,
            the table, the column count, the column names, and the row count.

        Raises:
            Any error raised by the connector during the write. Errors are
            not caught here.
        """
        import time

        writer = (
            df.write.format("snowflake")
            .options(**self._connection_options())
            .option("database", database)
            .option("schema", schema)
            .option("dbtable", dbtable)
        )
        if mode is not None:
            writer = writer.mode(mode)

        # Use a monotonic clock so wall-clock adjustments can't skew the
        # measurement. Report whole seconds (int), as before.
        start = time.perf_counter()
        writer.save()
        elapsed_seconds = int(time.perf_counter() - start)

        columns = df.columns
        return {
            "Tempo total transcorrido:": elapsed_seconds,
            "Schema:": schema,
            "Tabela:": dbtable,
            "Numero de colunas:": len(columns),
            "Nome das colunas:": columns,
            # Runs a separate Spark action after the write has finished.
            "Numero de linhas:": df.count(),
        }