
##Controle de versões
##Objetivo:
Esse notebook tem a função de realizar ingestão de files

-------------

###**Histórico de versões:**
| Data | Desenvolvido por | Modificações |
|---|---|---|
|09/04/2025|João Guilherme Brandi|Criação do notebook |

## Informações do notebook
- Este notebook tem como objetivo realizar ingestões no Lake de arquivos disponibilizados
-------------

1. **Tipo do arquivo**: O parâmetro tem objetivo de informar o tipo de arquivo que será processado.
     - Opções: 
          - ``csv``
          - ``xlsx``

2. **source_container**: O parâmetro tem objetivo de informar em qual container o arquivo está disponibilizado.
     - Padrão: `transient`

3. **source_directory**: O parâmetro tem objetivo de informar em qual a path que o arquivo está disponibilizado.

4. **Arquivo de Origem**: O parâmetro tem objetivo de informar o nome do arquivo a ser processado.

5. **Delimitador**: O parâmetro tem objetivo de informar qual o delimitador utilizado no arquivo do tipo ``csv`.
     - Padrão: `;`

6. **SkipFooter**: O parâmetro tem objetivo de informar quantas colunas serão puladas olhando de cima para baixo.
     - Padrão: `N/A`

7. **SkipRows**: O parâmetro tem objetivo de informar quantas colunas serão puladas olhando de baixo para cima.
     - Padrão: `N/A`

8. **Planilha**: O parâmetro tem objetivo de informar qual shet do arquivo do tipo ``xlsx`` será processado.
     - Padrão: `N/A`

9. **Coluna**: O parâmetro tem objetivo de informar quais (`colunas`) do arquivos serão lidas.
     - Padrão: `full`

10. **Table name**: O parâmetro tem objetivo de informar qual o nome da tabela a ser criada.
     - Padrão: `N/A` -> Caso o falor for igual a `N/A` o processo irá considerar o valor do parâmetro ``Arquivo de Origem``
     - Objetivo: Esse parâmetro tem objetivo de customizar o nome da tabela a ser criada.

11. **Target Container**: O parâmetro tem objetivo de informar qual container será gravada a tabela a ser criada.
     - Padrão: `bronze`

12. **Write Table Model**: O parâmetro tem objetivo de informar qual será o tipo de carga que será utilizada na alimentação/criação da tabela.
     - `overwrite`
     - `append`

13. **Target directory**: O parâmetro tem objetivo de informar qual o diretório a tabela será salva.
     - Padrão: `bronze`

14. **DataLake Name**: O parâmetro tem objetivo de informar qual o storage a tabela será salva.
     - Padrão: `{Nome do Storage}`

-------------

###**Regras de tratamento**
- O processo realizar uma serie de tratamento no nome das colunas e no nome da tabela a ser criada, sendo elas:
  - Substituição do ` `  por `_`.
  - Remoção de caracteres especiais.
  - Reescrita para minusculo.
  - Caso o valor da coluna seja `nulo` o processo colocará ``padrao_1``, ``padrao_2``...
  - Caso o valor da coluna tenha somente número, o processo colocará o `n_` antes do número.

- Para leitura de arquivo do tipo `csv` o processo utiliza o ``encoding = UTF-8``

## Pip Install

In [0]:
%pip install azure-storage-blob
%pip install openpyxl
%pip install fsspec adlfs

## Import Libraries

In [0]:
from pyspark.sql.functions import *
from datetime import datetime
from azure.storage.blob import BlobServiceClient
from io import BytesIO
from pyspark.sql.window import Window

import pandas as pd
import os
import re
import logging
import unicodedata

## Logs

In [0]:
# Configuração do logger
logger = logging.getLogger('Ingestion xlsx')
logger.setLevel(logging.WARNING)

## Parâmetro

In [0]:
## Definir container de Tipo do arquivo
dbutils.widgets.dropdown("file_type","csv",["csv", "xlsx"],"1.Tipo do arquivo")
file_type = dbutils.widgets.get("file_type")

## Definir container de origem
dbutils.widgets.text("source_container","N/A","2.Container de Origem")
source_container = dbutils.widgets.get("source_container")

## Definir diretório de origem
dbutils.widgets.text("source_directory","N/A","3.Diretório de Origem")
source_directory = dbutils.widgets.get("source_directory")

## Definir arquivo de origem
dbutils.widgets.text("source_file","N/A","4.Arquivo de Origem")
source_file = dbutils.widgets.get("source_file")

## Definir coluna
dbutils.widgets.text("delimitador","N/A","5.Delimitador")
delimitador = dbutils.widgets.get("delimitador")

In [0]:
## Definir skipfooter
dbutils.widgets.text("skipfooter","N/A","6.Rodapé a Ignorar")
skipfooter = dbutils.widgets.get("skipfooter")

## Definir skiprows
dbutils.widgets.text("skiprows","N/A","7.Linhas a Ignorar")
skiprows = dbutils.widgets.get("skiprows")

## Definir planilha
dbutils.widgets.text("planilha","N/A","8.Planilha")
planilha = dbutils.widgets.get("planilha")

## Definir coluna
dbutils.widgets.text("coluna","full","9.Coluna")
coluna = dbutils.widgets.get("coluna")
if coluna == 'full':
    coluna = None

In [0]:
## Definir table_name
dbutils.widgets.text("table_name","N/A","10.Table name")
table_name = dbutils.widgets.get("table_name")
if table_name == 'N/A':
    table_name = source_file.replace(".csv", "").replace(".txt", "").replace(".xlsx", "")
else:
    table_name = dbutils.widgets.get("table_name")

## Definir target container
dbutils.widgets.text("target_container","N/A","11.Target Container")
target_container = dbutils.widgets.get("target_container")

## Definir write table model
dbutils.widgets.text("write_table_model","N/A","12.Write Table Model")
write_table_model = dbutils.widgets.get("write_table_model")

## Definir write table model
dbutils.widgets.text("target_directory","N/A","13.Target directory")
target_directory = dbutils.widgets.get("target_directory")

## Definir write table model
dbutils.widgets.text("data_lake_name","N/A","14.DataLake Name")
data_lake_name = dbutils.widgets.get("data_lake_name")

# Client - Connection

In [0]:
# Obter Key datalake
secret_value = dbutils.secrets.get(scope="scope-databricks", key="secret-key-datalake")

In [0]:
if file_type == 'xlsx':
    ## Criando a connextion_string
    connection_string = f"DefaultEndpointsProtocol=https;AccountName={data_lake_name};AccountKey={secret_value};EndpointSuffix=core.windows.net"
    
    # Create a BlobServiceClient
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    
    # Get the blob client
    blob_client = blob_service_client.get_blob_client(container=source_container, blob=f"{source_directory}/{source_file}")

    ## Criando caminho para o path
    logger.info('Criando caminho para o path:', source_file)
    path = f"abfss://{source_container}@{data_lake_name}.dfs.core.windows.net/{source_directory}/{source_file}"
    #path = "/mnt/transient/" + source_directory + "/" + source_file

elif file_type == 'csv':
    # Configurando a chave de acesso no Spark
    spark.conf.set(f"fs.azure.account.key.{data_lake_name}.dfs.core.windows.net", secret_value)

    ## Criando caminho para o path
    logger.info('Criando caminho para o path:', source_file)
    path = f"abfss://{source_container}@{data_lake_name}.dfs.core.windows.net/{source_directory}/{source_file}"
    #path = "/mnt/transient/" + source_directory + "/" + source_file

# DEFs

In [0]:
## Remove os acentos
def remove_accents(input_str):
    logger.info("Iniciando remoção de acentos.")
    nfkd_form = unicodedata.normalize('NFD', input_str)
    without_accents = re.sub(r'[\u0300-\u036f]', '', nfkd_form)
    logger.info("Remoção de acentos concluída.")
    return without_accents


## Formata nome das colunas
def clean_column_names(df: DataFrame) -> DataFrame:
    logger.info("Iniciando formatação dos nomes das colunas.")
    new_column_names = {}
    for column in df.columns:
        new_column = remove_accents(re.sub(r"[. ()#\+\-\\/]", "_", column.lower().replace("º", "o").replace("ª", "a").replace("%", "perc")))
        new_column = re.sub(r'[^a-zA-Z0-9_]', '', new_column)
        new_column = new_column.replace(' ', '_')
        new_column_names[column] = new_column
    
    for old_col, new_col in new_column_names.items():
        df = df.withColumnRenamed(old_col, new_col)
    
    logger.info("Formatação dos nomes das colunas concluída.")
    return df

## Formatar dados
def format_data(df):
    logger.info("Iniciando formatação dos dados.")
    df = clean_column_names(df)
    logger.info("Formatação dos dados concluída.")
    return df

## Remover caracteres
def remove_caracteres(name):
    logger.info("Iniciando remoção de caracteres.")
    ## Removendo acentos
    file = remove_accents(name)
    ## Removendo caracteres especiais
    file = remove_accents(re.sub(r"[. ()#\+\-\\/]", "_", file.lower().replace("º", "o").replace("ª", "a").replace("%", "perc")))
    file = re.sub(r'[^a-zA-Z0-9_]', '', file)
    file = file.replace(' ', '_')
    logger.info("Remoção de caracteres concluída.")
    return file

def rename_duplicate_columns(df_file):
    # Obter as colunas do DataFrame
    columns = df_file.columns
    column_counts = {}

    # Contar a ocorrência de cada coluna
    for col in columns:
        column_counts[col] = column_counts.get(col, 0) + 1

    # Lista para armazenar os novos nomes das colunas
    new_columns = []

    # Renomear as colunas duplicadas com um sufixo sequencial
    for col in columns:
        if column_counts[col] > 1:
            # Incrementa o contador para colunas duplicadas
            column_counts[col] -= 1
            new_col_name = f"{col}_{column_counts[col]}"
            new_columns.append(new_col_name)
        else:
            # Se não for duplicado, mantém o nome original
            new_columns.append(col)

    # Renomear as colunas no DataFrame
    df_file = df_file.toDF(*new_columns)
    
    return df_file

def renomeia_colunas_com_numeros(df_file):
    """
    Função que renomeia colunas que possuem apenas números, adicionando o prefixo 'n_'.
    """
    # Obter os nomes das colunas do DataFrame
    colunas = df_file.columns
    
    # Lista para armazenar os novos nomes das colunas
    novas_colunas = []
    
    # Loop pelas colunas para verificar se o nome é numérico
    for coluna in colunas:
        # Verificar se o nome da coluna é numérico
        if coluna.isdigit():
            novas_colunas.append(f"n_{coluna}")  # Adiciona 'n_' no início
        else:
            novas_colunas.append(coluna)  # Mantém o nome da coluna como está
    
    # Renomeia as colunas do DataFrame
    for old_name, new_name in zip(colunas, novas_colunas):
        df_file = df_file.withColumnRenamed(old_name, new_name)
    
    return df_file

In [0]:
# Função para gerar nomes de colunas de forma alfabética
def generate_column_names(n):
    # Gera os nomes das colunas baseados no padrão A, B, ..., Z, AA, AB, ...
    result = []
    for i in range(n):
        column_name = ""
        num = i
        while num >= 0:
            column_name = chr(num % 26 + 65) + column_name
            num = num // 26 - 1
        result.append(column_name)
    return result

def csv_ajuste_file(df_csv, sr, sf, cl):

    # Removendo linhas - skiprows
    if sr != 'N/A':
        windowSpec = Window.orderBy(lit(1))
        df_with_index = df_csv.withColumn("row_num", row_number().over(windowSpec))
        df_csv = df_with_index.filter(col("row_num") > sr).drop("row_num")

    if sf != 'N/A':
        df_csv = df_csv.limit(df_csv.count() - int(sf))

    if coluna != 'full':
        # Gerar os novos nomes das colunas com a função customizada
        new_columns = generate_column_names(len(df_csv.columns))
        column_mapping = dict(zip(df_csv.columns, new_columns))
        df_csv = df_csv.select([df_csv[col].alias(column_mapping[col]) for col in df_csv.columns])

        # Selecionando as colunas
        colunas_selecionadas_lista = cl.split(',')
        colunas_final = [coluna for coluna in df_csv.columns if coluna in colunas_selecionadas_lista]
        df_csv = df_csv.select(*colunas_final)

    # Verifica se o DataFrame tem dados antes de acessar a primeira linha
    if df_csv.count() > 0:

        # Remover a primeira linha para não incluí-la no DataFrame final
        windowSpec = Window.orderBy(lit(1))
        df_with_index = df_csv.withColumn("row_num", row_number().over(windowSpec))
        df_without_header = df_with_index.filter(col("row_num") > 1).drop("row_num")\

        # Capturar a primeira linha do DataFrame
        first_row = df_csv.first()
        new_columns = []

        for i, value in enumerate(first_row):
            if value is None:
                # Substituir valores nulos por 'padrão1', 'padrão_1', 'padrão_2', etc.
                new_columns.append(f"padrão_{i+1}" if i > 0 else "padrão1")
            else:
                new_columns.append(str(value))

        df_csv = df_without_header.toDF(*new_columns)

    return df_csv

In [0]:
def read_file_xlsx(path, pl, cl, sr, sf):
    logger.info("Iniciando leitura do arquivo.")

    ## Tratamento Path
    #path = path.replace('dbfs:', '')

    ## Tratamento planilha
    if pl == 'N/A':
        pl = 0

    ## Tratamento coluna
    if cl == 'full':
        cl = None

    ## Tratamento primeira linha
    if sr == 'N/A':
        sr = 0
    else:
        sr = int(sr)

    ## Tratamento ultima linha
    if sf == 'N/A':
        sf = 0
    else:
        sf = int(sf)

    ## Leitura do arquivo
    logger.info("Iniciando leitura do arquivo Excel.")
    ex = pd.read_excel(path, dtype=str, na_values=None, sheet_name=pl, index_col=None, header=None, usecols=cl, skiprows=sr, skipfooter=sf)
    logger.info("Leitura do arquivo Excel concluída.")
    ex.columns = ex.iloc[0]
    ex = ex.iloc[1:].reset_index(drop=True)

    ## Converter DF Spark
    logger.info("Convertendo DataFrame para Spark.")
    df = spark.createDataFrame(ex)
    logger.info("Conversão para DataFrame Spark concluída.")

    logger.info("Leitura do arquivo concluída.")
    return df

In [0]:
def read_file_csv(path, dl):
    logger.info(f"Lendo arquivo: {path}")

    ## Leitura do arquivo
    df = (
                spark.read.format("csv")
                .option("delimiter", dl)
                .option("header", "false")
                .option("inferSchema", "false") \
                .option("encoding", "UTF-8")
                .load(path)
            )
    
    logger.info("Arquivo lido com sucesso.")
    return df

# Extract / Transform

In [0]:
if file_type == 'csv':
    ## Realizando leitura do arquivo
    logger.info('Realizando tratamento no nome das colunas da tabela: %s', table)
    df = read_file_csv(path, delimitador)
    
elif file_type == 'xlsx':
    # Download the blob content to a BytesIO object
    blob_data = blob_client.download_blob().readall()
    excel_data = BytesIO(blob_data)
    
    ## Realizando leitura do arquivo
    logger.info('Realizando tratamento no nome das colunas da tabela: %s', table)
    df = read_file_xlsx(excel_data, planilha, coluna, skiprows, skipfooter)

In [0]:
if file_type == 'csv':
    df = csv_ajuste_file(df, skiprows, skipfooter, coluna)

In [0]:
## Realizando tratamento na tabela
logger.info('Realizando tratamento no nome das colunas da tabela: %s', table)
df = format_data(df)

## Tratando nome da tabela
logger.info('Tratando nome da tabela: %s', table)
table_name = remove_caracteres(table_name)

## Tratando colunas duplicadas
logger.info('Tratando colunas duplicadas: %s', table)
df = rename_duplicate_columns(df)

## Tratando colunas númericas
df = renomeia_colunas_com_numeros(df)

## Tratando valores nulas
df = df.dropna(how='all')

## Save

In [0]:
## Criando Path delta
logger.info("Criando path para salvar o Delta file.")
#deltaFile = f'/mnt/{target_directory}/{source_directory}/{table_name}'
delta_file = f"abfss://{target_container}@{data_lake_name}.dfs.core.windows.net/{target_directory}/{table_name}/"

## Definir nome da tabela delta
delta_table_name = f"{environment}.{target_container}.{table_name}"

In [0]:
## Save table
logger.info('Realizando o save da tabela:', table)
if df.count() > 0:
    (
        df.write
            .mode(write_table_model) \
            .format("delta") \
            .option("mergeSchema", "true") \
            .saveAsTable(delta_table_name, path = deltaFile)
    )
else:
    logger.info('Realizando o save na tabela: %s', table)