In [0]:
import json
import re
import sys
import traceback
from datetime import datetime, date, timedelta
from enum import Enum, unique
from types import TracebackType
from typing import List, Optional, Type, TypedDict

import pandas as pd
import pyspark.sql.functions as F
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import row_number, desc



class Framework:
    """Helpers to read, write and transform lakehouse tables on Databricks.

    The methods rely on the notebook globals ``spark`` and ``dbutils`` that the
    Databricks runtime provides.
    """

    @unique
    class LoadType(str, Enum):
        """Load strategies for ``write_delta_table`` (only APPEND_ALL and UPSERT are implemented)."""

        OVERWRITE_TABLE = "OVERWRITE_TABLE"
        OVERWRITE_PARTITION = "OVERWRITE_PARTITION"
        APPEND_ALL = "APPEND_ALL"
        APPEND_NEW = "APPEND_NEW"
        UPSERT = "UPSERT"

    @unique
    class RawFileFormat(str, Enum):
        """File formats read from the landing zone."""

        PARQUET = "PARQUET"
        DELTA = "DELTA"
        ORC = "ORC"
        CSV = "CSV"

    @unique
    class RunStatus(str, Enum):
        """Final status reported in a ``ReturnObject``."""

        SUCCEEDED = "SUCCEEDED"
        FAILED = "FAILED"

    @unique
    class SchemaEvolutionMode(str, Enum):
        """How writes react when the DataFrame schema differs from the target table."""

        FAIL_ON_SCHEMA_MISMATCH = "FAIL_ON_SCHEMA_MISMATCH"
        ADD_NEW_COLUMNS = "ADD_NEW_COLUMNS"
        IGNORE_NEW_COLUMNS = "IGNORE_NEW_COLUMNS"
        OVERWRITE_SCHEMA = "OVERWRITE_SCHEMA"

    class ReturnObject(TypedDict):
        """Execution summary built by ``_build_return_object``."""

        status: str
        target_object: str
        num_records_read: int
        num_records_loaded: int
        num_records_errored_out: int
        error_message: str
        error_details: str

    def check_workspace(environment) -> str:
        """Return the lakehouse root path for *environment* on the current workspace.

        This is a plain function (no ``self``/``cls``) on purpose: it is called
        while the class body executes to initialise the ``LAKEHOUSE_*_ROOT``
        attributes below. Known workspace ids map to an ABFSS root; any other
        (or an unset) workspace id falls back to ``dbfs:/mnt/lakehouse/<environment>``.
        """
        # NOTE(review): for the known workspaces the returned root ignores
        # `environment`, so all layers resolve to the same path. The values look
        # redacted here; confirm the real mapping is per layer.
        workspace_roots = {
            'xxxxxxxxx': 'abfss://xxxxxxxx.dfs.core.windows.net/xxxxxxxx',
            'yyyyyyyyy': 'abfss://xxxxxxxx.dfs.core.windows.net/xxxxxxxx',
            'zzzzzzzz': 'abfss://xxxxxxxx.dfs.core.windows.net/xxxxxxxx',
        }
        # Default None: outside Databricks the tag is unset and we fall back to DBFS.
        org_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId", None)
        return workspace_roots.get(org_id, f'dbfs:/mnt/lakehouse/{environment}')

    LAKEHOUSE_LANDING_ROOT = check_workspace('land')
    LAKEHOUSE_BRONZE_ROOT = check_workspace('bronze')
    LAKEHOUSE_SILVER_ROOT = check_workspace('silver')
    LAKEHOUSE_GOLD_ROOT = check_workspace('gold')

    # Directories holding the data-dictionary and decode CSV files used by the
    # trusted-layer helpers.
    _DATA_DICTIONARY_DIR = '/dbfs/mnt/bemoldigitalde/Trusted/DATA_DICTIONARY/'
    _DECODE_FILES_DIR = '/dbfs/mnt/bemoldigitalde/Trusted/Arquivos_decodificacao/'

    # Every landing file must match this pattern. The '.' before "csv" is
    # unescaped in the original pattern and therefore matches any character.
    _LANDING_FILE_PATTERN = re.compile(r'\d+X*_\d+X*_*\d*.csv$')

    @classmethod
    def _build_return_object(
        cls,
        status: RunStatus,
        target_object: str,
        num_records_read: int = 0,
        num_records_loaded: int = 0,
        error_message: str = "",
        error_details: str = "",
    ) -> ReturnObject:
        """Build the summary object describing the last execution or error.

        ``num_records_errored_out`` is derived as read minus loaded, and
        ``error_message`` is truncated to its first 8000 characters.
        """
        # NOTE(review): 8000 presumably matches a downstream size limit; confirm.
        return {
            "status": status,
            "target_object": target_object,
            "num_records_read": num_records_read,
            "num_records_loaded": num_records_loaded,
            "num_records_errored_out": num_records_read - num_records_loaded,
            "error_message": (error_message or "")[:8000],
            "error_details": error_details,
        }

    @classmethod
    def exit_with_object(cls, results: ReturnObject):
        """Exit the notebook, returning *results* serialized as JSON."""
        dbutils.notebook.exit(json.dumps(results))

    @classmethod
    def exit_with_last_exception(cls):
        """Exit the notebook with a FAILED object describing the exception being handled.

        Intended to be called from inside an ``except`` block. When no exception
        is active, the message falls back to "Unknown error".
        """
        exc_type, exc_value, _ = sys.exc_info()
        if exc_type is None:
            error_message = "Unknown error"
        else:
            error_message = f"{exc_type.__name__}: {exc_value}"
        results = cls._build_return_object(
            status=cls.RunStatus.FAILED,
            target_object=None,
            error_message=error_message,
            error_details=traceback.format_exc(),
        )
        cls.exit_with_object(results)

    @classmethod
    def read_landing_zone_dataframe(
        cls,
        file_format: RawFileFormat,
        location: str,
        delimiter: str = ';',
        quote: str = "",
    ) -> DataFrame:
        """Read landing-zone files at *location* into a Spark DataFrame.

        *file_format* is lowercased and used as the Spark source name. Header,
        escape, mergeSchema, delimiter and quote options are always set. For
        non-CSV formats every non-string column is cast to string. On any error
        the notebook exits via ``exit_with_last_exception``.
        """
        try:
            df = (
                spark.read
                .format(file_format.lower())
                .option("header", True)
                .option("escape", "\"")
                .option("mergeSchema", True)
                .option("delimiter", delimiter)
                .option("quote", quote)
                .load(location)
            )

            # Normalise every column to string so all sources land with the same types.
            if file_format != cls.RawFileFormat.CSV:
                non_string_columns = [name for name, dtype in df.dtypes if dtype != "string"]
                for column in non_string_columns:
                    df = df.withColumn(column, F.col(column).cast("string"))

            return df

        except Exception:
            cls.exit_with_last_exception()

    @staticmethod
    def _validate_location_params(*params) -> None:
        """Raise ValueError when any path component is None, empty or whitespace-only."""
        if any(param is None or not str(param).strip() for param in params):
            raise ValueError("Caminho não pode conter brancos ou nulo, verifique !")

    @classmethod
    def generate_bronze_table_location(
        cls,
        table_name: str,
    ) -> str:
        """Return the bronze path ``<LAKEHOUSE_BRONZE_ROOT>/<table_name>/``.

        Exits the notebook (via ``exit_with_last_exception``) when *table_name*
        is None or blank.
        """
        try:
            # The original also validated an undefined `schema_name`, which made every call fail.
            cls._validate_location_params(table_name)
            return f"{cls.LAKEHOUSE_BRONZE_ROOT}/{table_name}/"
        except Exception:
            cls.exit_with_last_exception()

    @classmethod
    def generate_silver_table_location(
        cls,
        schema_name: str,
        table_name: str,
    ) -> str:
        """Return the silver path ``<LAKEHOUSE_SILVER_ROOT>/<table_name>/``.

        *schema_name* is only validated, not used in the path. Exits the
        notebook when any argument is None or blank.
        """
        try:
            cls._validate_location_params(schema_name, table_name)
            return f"{cls.LAKEHOUSE_SILVER_ROOT}/{table_name}/"
        except Exception:
            cls.exit_with_last_exception()

    @classmethod
    def generate_gold_table_location(
        cls,
        schema_name: str,
        table_name: str,
    ) -> str:
        """Return the gold path ``<LAKEHOUSE_GOLD_ROOT>/<table_name>/``.

        *schema_name* is only validated, not used in the path. Exits the
        notebook when any argument is None or blank.
        """
        try:
            cls._validate_location_params(schema_name, table_name)
            return f"{cls.LAKEHOUSE_GOLD_ROOT}/{table_name}/"
        except Exception:
            cls.exit_with_last_exception()

    @classmethod
    def write_delta_table(
        cls,
        df: DataFrame,
        location: str,
        schema_name: str,
        table_name: str,
        load_type: LoadType,
        key_columns: Optional[List[str]] = None,
        partition_columns: Optional[List[str]] = None,
        schema_evolution_mode: SchemaEvolutionMode = SchemaEvolutionMode.ADD_NEW_COLUMNS,
    ) -> ReturnObject:
        """Write *df* as a Delta table at *location* and register ``schema_name.table_name``.

        If *location* is not a Delta table yet, any load type is downgraded to
        APPEND_ALL. Only APPEND_ALL and UPSERT are implemented; UPSERT requires
        *key_columns*. The database and an external table pointing at
        *location* are created if missing.

        Returns:
            None on success (the success ReturnObject is disabled; callers may
            rely on that), or a FAILED ReturnObject if any step raises.
        """
        key_columns = key_columns or []
        partition_columns = partition_columns or []
        num_records_read = 0
        num_records_loaded = 0

        try:
            # MERGE needs an existing table, so the first load is always an append.
            if load_type != cls.LoadType.APPEND_ALL and not DeltaTable.isDeltaTable(spark, location):
                load_type = cls.LoadType.APPEND_ALL

            # Optimised writes reduce the number of small files produced.
            spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)

            if load_type == cls.LoadType.APPEND_ALL:
                cls._write_table_using_append_all(
                    df=df,
                    location=location,
                    partition_columns=partition_columns,
                    schema_evolution_mode=schema_evolution_mode,
                )
            elif load_type == cls.LoadType.UPSERT:
                if not key_columns:
                    raise ValueError("Nenhuma coluna foi especificada para o upsert")
                cls._write_table_using_upsert(
                    df=df,
                    location=location,
                    key_columns=key_columns,
                    schema_evolution_mode=schema_evolution_mode,
                )
            else:
                raise NotImplementedError(f"load_type não suportado: {load_type}")

            # Register metadata so the table is queryable by name.
            spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema_name};")
            spark.sql(f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} USING DELTA LOCATION '{location}';")

            # Success intentionally returns None (the success ReturnObject was disabled).
            return None

        except Exception as e:
            return cls._build_return_object(
                status=cls.RunStatus.FAILED,
                target_object=f"{schema_name}.{table_name}",
                num_records_read=num_records_read,
                num_records_loaded=num_records_loaded,
                error_message=str(e),
                error_details=traceback.format_exc(),
            )

    @staticmethod
    def _drop_columns_not_in_table(df: DataFrame, location: str) -> DataFrame:
        """Drop columns of *df* absent from the Delta table at *location*.

        Returns *df* unchanged when *location* is not a Delta table.
        """
        if not DeltaTable.isDeltaTable(spark, location):
            return df
        # DeltaTable has no `.columns`; the schema is read from its DataFrame view.
        table_columns = set(DeltaTable.forPath(spark, location).toDF().columns)
        new_df_columns = [name for name in df.columns if name not in table_columns]
        return df.drop(*new_df_columns)

    @classmethod
    def _write_table_using_append_all(
        cls,
        df: DataFrame,
        location: str,
        partition_columns: Optional[List[str]] = None,
        schema_evolution_mode: SchemaEvolutionMode = SchemaEvolutionMode.ADD_NEW_COLUMNS,
    ) -> None:
        """Append *df* to the Delta location, applying the schema-evolution mode.

        ADD_NEW_COLUMNS sets ``mergeSchema``; OVERWRITE_SCHEMA sets
        ``overwriteSchema``; IGNORE_NEW_COLUMNS drops columns the table lacks;
        FAIL_ON_SCHEMA_MISMATCH adds no option.

        Raises:
            NotImplementedError: for an unknown schema-evolution mode.
        """
        partition_columns = partition_columns or []
        writer_options = {}

        # Resolve DataFrame changes before building the writer; the original
        # built the writer first, so the IGNORE_NEW_COLUMNS drop was lost.
        if schema_evolution_mode == cls.SchemaEvolutionMode.FAIL_ON_SCHEMA_MISMATCH:
            pass
        elif schema_evolution_mode == cls.SchemaEvolutionMode.ADD_NEW_COLUMNS:
            writer_options["mergeSchema"] = True
        elif schema_evolution_mode == cls.SchemaEvolutionMode.IGNORE_NEW_COLUMNS:
            df = cls._drop_columns_not_in_table(df, location)
        elif schema_evolution_mode == cls.SchemaEvolutionMode.OVERWRITE_SCHEMA:
            writer_options["overwriteSchema"] = True
        else:
            raise NotImplementedError(f"schema_evolution_mode não suportado: {schema_evolution_mode}")

        df_writer = df.write.format("delta").mode("append")
        if partition_columns:
            df_writer = df_writer.partitionBy(partition_columns)
        for option_name, option_value in writer_options.items():
            df_writer = df_writer.option(option_name, option_value)

        df_writer.save(location)

    @classmethod
    def _write_table_using_upsert(
        cls,
        df: DataFrame,
        location: str,
        key_columns: Optional[List[str]] = None,
        schema_evolution_mode: SchemaEvolutionMode = SchemaEvolutionMode.ADD_NEW_COLUMNS,
    ) -> None:
        """MERGE *df* into the Delta table at *location* on equality of *key_columns*.

        Matched rows are fully updated and unmatched rows inserted. With
        ADD_NEW_COLUMNS, Delta schema auto-merge is enabled for the merge and
        the previous setting is restored afterwards, even if the merge fails.

        Raises:
            ValueError: for OVERWRITE_SCHEMA, which MERGE cannot honour.
            NotImplementedError: for an unknown schema-evolution mode.
        """
        auto_merge_key = "spark.databricks.delta.schema.autoMerge.enabled"
        previous_auto_merge = None  # only set when this call changes the conf

        if schema_evolution_mode == cls.SchemaEvolutionMode.FAIL_ON_SCHEMA_MISMATCH:
            pass
        elif schema_evolution_mode == cls.SchemaEvolutionMode.ADD_NEW_COLUMNS:
            previous_auto_merge = spark.conf.get(auto_merge_key, "false")
            spark.conf.set(auto_merge_key, True)
        elif schema_evolution_mode == cls.SchemaEvolutionMode.IGNORE_NEW_COLUMNS:
            df = cls._drop_columns_not_in_table(df, location)
        elif schema_evolution_mode == cls.SchemaEvolutionMode.OVERWRITE_SCHEMA:
            raise ValueError("OVERWRITE_SCHEMA não é suportado no UPSERT load type")
        else:
            raise NotImplementedError(f"schema_evolution_mode não suportado: {schema_evolution_mode}")

        # Backticks allow key columns with special characters.
        merge_condition = " AND ".join(
            f"source.`{name}` = target.`{name}`" for name in (key_columns or [])
        )

        try:
            (
                DeltaTable.forPath(spark, location)
                .alias("target")
                .merge(df.alias("source"), merge_condition)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        finally:
            if previous_auto_merge is not None:
                spark.conf.set(auto_merge_key, previous_auto_merge)

    ####################################################
    ## Imported code: pre-existing Bemol functions    ##
    ####################################################

    @classmethod
    def folder_read_path(
        cls,
        source_table,
        source_path_landing,
        years: Optional[List[str]] = None,
    ) -> list:
        """List the file paths under ``<source_path_landing>/<year>`` for each year.

        When *years* is None or empty, every sub-folder of *source_path_landing*
        is used (``dbutils.fs.ls`` directory names end with '/', which is
        stripped). *source_table* is unused. Every file must match
        ``_LANDING_FILE_PATTERN``; otherwise, or on any other error, the notebook
        exits via ``exit_with_last_exception``.
        """
        try:
            if not years:
                years = [entry.name[:-1] for entry in dbutils.fs.ls(source_path_landing)]
            file_list = []
            for year in years:
                for entry in dbutils.fs.ls(source_path_landing + '/' + year):
                    # The original crashed with AttributeError on a non-matching
                    # name; keep failing, but with an explicit message.
                    if cls._LANDING_FILE_PATTERN.search(entry.path) is None:
                        raise ValueError(f"Arquivo fora do padrão esperado: {entry.path}")
                    file_list.append(entry.path)
            return file_list
        except Exception:
            cls.exit_with_last_exception()

    @staticmethod
    def transport_collected_precessed(
        file_list: List[str]):
        """Move each file to the same path with 'Coletado' replaced by 'Processado'.

        Best effort: the first failure is printed and the remaining files are
        not moved.
        """
        try:
            for file in file_list:
                dbutils.fs.mv(file, file.replace('Coletado', 'Processado'), True)
        except Exception as error:
            print(f'Erro ao tentar mover os arquivos: {error}')

    @staticmethod
    def read_bronze(
        schema_bronze,
        bronze_table,
        partition_key,
        orderBy_key,
        start_datetime,
        end_datetime,
        from_to_field,
    ) -> DataFrame:
        """Read a bronze table filtered to a time range and deduplicated by key.

        Keeps rows whose *from_to_field* lies in [start_datetime, end_datetime]
        (inclusive). For each *partition_key* value it keeps only the first row
        when ordered by *orderBy_key* descending. Prints the error and returns
        None on failure.
        """
        try:
            period_filter = (
                (col(from_to_field) >= start_datetime) & (col(from_to_field) <= end_datetime)
            )
            df = spark.sql(f'''SELECT * FROM {schema_bronze}.{bronze_table}''').where(period_filter)
            latest_first = Window.partitionBy(partition_key).orderBy(col(orderBy_key).desc())
            return (
                df.withColumn("row_number", row_number().over(latest_first))
                .filter('row_number = 1')
                .drop('row_number')
            )
        except Exception as error:
            print(f'Erro ao tentar selecionar tabela bronze: {error}')
            return None

    @classmethod
    def _data_dictionary(
        cls,
        target_table,
        location: str = ' ',
        delimiter: str = '|',
    ) -> pd.DataFrame:
        """Load ``<_DATA_DICTIONARY_DIR><target_table>.csv`` as a pandas DataFrame.

        *delimiter* is the CSV separator. *location* is currently unused. Prints
        the error and returns None on failure.
        """
        try:
            return pd.read_csv(cls._DATA_DICTIONARY_DIR + target_table + '.csv', sep=delimiter)
        except Exception as error:
            print(f'Erro ao tentar ler dicionario de dados: {error}')
            return None

    @staticmethod
    def decimal_br(decimal):
        """Convert a Brazilian-formatted number string ("1.234,56") to dot-decimal form ("1234.56").

        Returns None when *decimal* is None.
        """
        if decimal is None:
            return None
        return decimal.replace('.', '').replace(',', '.')

    @staticmethod
    def _column_map(frame: pd.DataFrame, key_column: str, value_column: str) -> dict:
        """Return a ``{key: value}`` dict built from two columns of *frame*."""
        return frame[[key_column, value_column]].set_index(key_column).to_dict()[value_column]

    @classmethod
    def decode_columns(
        cls,
        df,
        target_table,
    ) -> DataFrame:
        """Decode coded columns and cast column types using the table's data dictionary.

        1. For every dictionary row with an ``ARQUIVO_DECODIFICADOR``, build a
           ``<keys>_X`` column. It is a copy of the source column when the
           mapping has one key; otherwise it is the mapping's key columns
           concatenated with '_'. Codes in it are then replaced by descriptions
           from ``<decoder>-DECODE.csv``.
        2. Drop the original decoded columns and the ``TRUSTED_EXCLUIDO`` columns.
        3. Cast every column whose ``TRUSTED_TIPO`` is set and not 'string'.
        """
        dictionary = cls._data_dictionary(target_table)

        # `== True` is an element-wise pandas comparison, not an identity check.
        columns_to_drop = list(dictionary.loc[dictionary['TRUSTED_EXCLUIDO'] == True, 'TRANSIENT_COLUNA'])  # noqa: E712

        typed_rows = dictionary[(dictionary['TRUSTED_TIPO'] != 'string') & dictionary['TRUSTED_TIPO'].notnull()]
        new_column_types = cls._column_map(typed_rows, 'TRANSIENT_COLUNA', 'TRUSTED_TIPO')

        decoded_rows = dictionary[dictionary['ARQUIVO_DECODIFICADOR'].notnull()]
        description_tables = cls._column_map(decoded_rows, 'TRANSIENT_COLUNA', 'ARQUIVO_DECODIFICADOR')
        map_dictionaries = cls._column_map(decoded_rows, 'TRANSIENT_COLUNA', 'MAPEAMENTO_DECODIFICACAO')

        for source_column, decoder_name in description_tables.items():
            code_description = pd.read_csv(
                cls._DECODE_FILES_DIR + decoder_name + '-DECODE.csv',
                sep='|',
                dtype={'code': 'object', 'description': 'object'},
            )
            code_description = code_description.fillna('').set_index('code').to_dict()['description']

            # SECURITY(review): eval runs arbitrary code read from the data-dictionary
            # CSV; consider ast.literal_eval if the mappings are plain dict literals.
            mapped_columns = list(eval(map_dictionaries[source_column]).keys())
            decoded_column = '_'.join(mapped_columns) + '_X'

            if len(mapped_columns) == 1:
                df = df.withColumn(decoded_column, col(source_column))
            else:
                # Equivalent to concat(col(a), lit('_'), col(b), ...).
                parts = []
                for position, name in enumerate(mapped_columns):
                    if position:
                        parts.append(lit('_'))
                    parts.append(col(str(name)))
                df = df.withColumn(decoded_column, concat(*parts))

            # Nulls must become '' so the '' code in the decode file can match them.
            if '' in code_description:
                df = df.fillna('', subset=[decoded_column])

            df = df.replace(to_replace=code_description, subset=[decoded_column])

        # Drop the raw decoded columns, then the columns excluded from trusted.
        df = df.drop(*map_dictionaries)
        df = df.drop(*columns_to_drop)

        for column_name, column_type in new_column_types.items():
            if column_type == 'timestamp':
                # Parses yyyyMMdd strings; to_date makes the resulting column a date.
                df = df.withColumn(column_name, to_date(unix_timestamp(column_name, "yyyyMMdd").cast(column_type)))
            elif column_type.startswith('decimal'):
                if column_type.startswith('decimal_br'):
                    column_type = column_type.replace('_br', '')
                    decimal_br_udf = udf(cls.decimal_br)
                    df = df.withColumn(column_name, decimal_br_udf(column_name))
                # "decimal(p,s)" -> precision p, scale s
                size_spec = column_type[8:-1].split(',')
                precision = int(size_spec[0])
                scale = int(size_spec[1])
                df = df.withColumn(column_name, df[column_name].cast(DecimalType(precision, scale)))
            else:
                df = df.withColumn(column_name, df[column_name].cast(column_type))
        return df

    @classmethod
    def rename_collumns(
        cls,
        df,
        target_table: Optional[str] = None,
    ) -> DataFrame:
        """Rename columns to their ``TRUSTED_COLUNA`` names from the data dictionary.

        Decoded columns are looked up by their generated ``<keys>_X`` name. If
        the result has an ``ID_CLIENTE`` column, it is left-padded with zeros to
        10 characters.

        *target_table* defaults to a global ``target_table`` variable, which
        older callers relied on.

        Raises:
            ValueError: when no target table is available.
        """
        if target_table is None:
            target_table = globals().get("target_table")
        if not target_table:
            raise ValueError("target_table não informado para rename_collumns")

        dictionary = cls._data_dictionary(target_table)

        with_decode = dictionary[dictionary['MAPEAMENTO_DECODIFICACAO'].notnull()]
        without_decode = dictionary[
            dictionary['MAPEAMENTO_DECODIFICACAO'].isnull() & dictionary['TRUSTED_COLUNA'].notnull()
        ]

        new_column_names = cls._column_map(without_decode, 'TRANSIENT_COLUNA', 'TRUSTED_COLUNA')
        for _, row in with_decode.iterrows():
            # SECURITY(review): eval on data-dictionary text; see decode_columns.
            decoded_name = '_'.join(list(eval(row['MAPEAMENTO_DECODIFICACAO']).keys())) + '_X'
            new_column_names[decoded_name] = row['TRUSTED_COLUNA']

        for old_column_name, new_column_name in new_column_names.items():
            df = df.withColumnRenamed(old_column_name, new_column_name)

        if 'ID_CLIENTE' in df.columns:
            # The original assigned this to `F` and returned the unpadded df.
            df = df.withColumn('ID_CLIENTE', lpad(df['ID_CLIENTE'], 10, '0'))
        return df

    @classmethod
    def columns_to_keep(
        cls,
        df,
        target_table,
        extras_columns: Optional[List[str]] = None,
    ) -> list:
        """Return the trusted column names of *target_table* followed by *extras_columns*.

        *df* is unused.
        """
        extras = [] if extras_columns is None else list(extras_columns)
        dictionary = cls._data_dictionary(target_table)
        return list(dictionary.loc[dictionary['TRUSTED_COLUNA'].notna(), 'TRUSTED_COLUNA']) + extras


