##Validando a SparkSession

In [0]:
spark

##Conectando Azure ADLS Gen2 no Databricks

###Mostrando os pontos de montagem no cluster Databricks

In [0]:
display(dbutils.fs.mounts())

###Desmontando os pontos de montagem não utilizados

In [0]:
#dbutils.fs.unmount('/mnt/datalake6f2d8d16eba38233/bronze')
#dbutils.fs.unmount('/mnt/datalakefc6082cb60bef06c/bronze')

### Definindo uma função para montar um ADLS com um ponto de montagem com ADLS SAS 

In [0]:
storageAccountName = ""
storageAccountAccessKey = ""
sasToken= ""

def mount_adls(blobContainerName):
    try:
      dbutils.fs.mount(
        source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
        mount_point = f"/mnt/{storageAccountName}/{blobContainerName}",
        #extra_configs = {'fs.azure.account.key.' + storageAccountName + '.blob.core.windows.net': storageAccountAccessKey}
        extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
      )
      print("OK!")
    except Exception as e:
      print("Falha", e)

###Montando todos os containers

In [0]:
mount_adls('landing-zone')
mount_adls('bronze')
mount_adls('silver')
mount_adls('gold')

OK!
OK!
OK!
OK!


###Mostrando os pontos de montagem no cluster Databricks

In [0]:
display(dbutils.fs.mounts())

mountPoint,source,encryptionType
/mnt/datalake4d021b08c5cc1ca4/bronze,wasbs://bronze@datalake4d021b08c5cc1ca4.blob.core.windows.net,
/mnt/datalakec47b4098c7f22fbc/landing-zone,wasbs://landing-zone@datalakec47b4098c7f22fbc.blob.core.windows.net,
/databricks-datasets,databricks-datasets,
/databricks/mlflow-tracking,databricks/mlflow-tracking,sse-s3
/mnt/datalakef96a297a49066c08/silver,wasbs://silver@datalakef96a297a49066c08.blob.core.windows.net,
/databricks-results,databricks-results,sse-s3
/mnt/datalakef96a297a49066c08/landing-zone,wasbs://landing-zone@datalakef96a297a49066c08.blob.core.windows.net,
/mnt/datalakeb1f95aa108e33317/silver,wasbs://silver@datalakeb1f95aa108e33317.blob.core.windows.net,
/mnt/datalakeb1f95aa108e33317/bronze,wasbs://bronze@datalakeb1f95aa108e33317.blob.core.windows.net,
/mnt/datalake4d021b08c5cc1ca4/lading-zone,wasbs://lading-zone@datalake4d021b08c5cc1ca4.blob.core.windows.net,


### Mostrando todos os arquivos da camada bronze

In [0]:
display(dbutils.fs.ls(f"/mnt/{storageAccountName}/bronze"))

path,name,size,modificationTime
dbfs:/mnt/datalakec47b4098c7f22fbc/bronze/ecommerce/,ecommerce/,0,0


###Gerando um dataframe dos delta lake no container bronze do Azure Data Lake Storage

In [0]:
df_avaliacoes          = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/avaliacoes")
df_categorias          = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/categorias")
df_clientes            = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/clientes")
df_enderecos_cliente  = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/enderecos_cliente")
df_entregas            = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/entregas")
df_estoque             = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/estoque")
df_formas_pagamento    = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/formas_pagamento")
df_itens_pedidos       = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/itens_pedido")
df_pagamentos          = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/pagamentos")
df_pedidos             = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/pedidos")
df_produtos            = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/produtos")
df_transportadoras     = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/transportadoras")
df_vendedores          = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/ecommerce/vendedores")

### Adicionando metadados de data e hora de processamento e nome do arquivo de origem

In [0]:
from pyspark.sql.functions import current_timestamp, lit


df_avaliacoes          = df_avaliacoes.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("avaliacoes"))
df_categorias          = df_categorias.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("categorias"))
df_clientes            = df_clientes.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("clientes"))
df_enderecos_cliente   = df_enderecos_clientes.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("enderecos_cliente"))
df_entregas            = df_entregas.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("entregas"))
df_estoque             = df_estoque.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("estoque"))
df_formas_pagamento    = df_formas_pagamento.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("formas_pagamento"))
df_itens_pedidos       = df_itens_pedidos.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("itens_pedidos"))
df_pagamentos          = df_pagamentos.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("pagamentos"))
df_pedidos             = df_pedidos.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("pedidos"))
df_produtos            = df_produtos.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("produtos"))
df_transportadoras     = df_transportadoras.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("transportadoras"))
df_vendedores          = df_vendedores.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("vendedores"))


In [0]:
# Dicionário com o nome da tabela como chave e o DataFrame como valor
dfs = {
    "avaliacoes": df_avaliacoes,
    "categorias": df_categorias,
    "clientes": df_clientes,
    "enderecos_cliente": df_enderecos_cliente,
    "entregas": df_entregas,
    "estoque": df_estoque,
    "formas_pagamento": df_formas_pagamento,
    "itens_pedidos": df_itens_pedidos,
    "pagamentos": df_pagamentos,
    "pedidos": df_pedidos,
    "produtos": df_produtos,
    "transportadoras": df_transportadoras,
    "vendedores": df_vendedores,
}

# Loop para imprimir as colunas de cada tabela em maiúsculas
for nome, df in dfs.items():
    colunas = df.columns
    colunas_maiusculas = [coluna.upper() for coluna in colunas]

    print(f"\nTabela: {nome}")
    print("Colunas em maiúsculas:")
    for coluna in colunas_maiusculas:
        print(coluna)


Tabela: avaliacoes
Colunas em maiúsculas:
ID
CLIENTE_ID
PRODUTO_ID
NOTA
COMENTARIO
DATA_AVALIACAO
DATA_HORA_BRONZE
NOME_ARQUIVO
DATA_HORA_SILVER

Tabela: categorias
Colunas em maiúsculas:
ID
NOME
DESCRICAO
DATA_HORA_BRONZE
NOME_ARQUIVO
DATA_HORA_SILVER

Tabela: clientes
Colunas em maiúsculas:
ID
NOME
EMAIL
TELEFONE
DATA_CADASTRO
DATA_HORA_BRONZE
NOME_ARQUIVO
DATA_HORA_SILVER

Tabela: enderecos_cliente
Colunas em maiúsculas:
ID
CLIENTE_ID
LOGRADOURO
NUMERO
COMPLEMENTO
BAIRRO
CIDADE
ESTADO
CEP
DATA_HORA_BRONZE
NOME_ARQUIVO
DATA_HORA_SILVER

Tabela: entregas
Colunas em maiúsculas:
ID
PEDIDO_ID
TRANSPORTADORA_ID
DATA_ENVIO
DATA_ENTREGA
STATUS
DATA_HORA_BRONZE
NOME_ARQUIVO
DATA_HORA_SILVER

Tabela: estoque
Colunas em maiúsculas:
ID
PRODUTO_ID
QUANTIDADE
DATA_HORA_BRONZE
NOME_ARQUIVO
DATA_HORA_SILVER

Tabela: formas_pagamento
Colunas em maiúsculas:
ID
DESCRICAO
DATA_HORA_BRONZE
NOME_ARQUIVO
DATA_HORA_SILVER

Tabela: itens_pedidos
Colunas em maiúsculas:
ID
PEDIDO_ID
PRODUTO_ID
QUANTIDADE
PRE

### Fazer as transformacoes de forma massiva

In [0]:
from pyspark.sql.functions import lit, current_timestamp

def renomear_colunas(diretorio):
    # Carrega o DataFrame a partir do Delta
    df = spark.read.format('delta').load(diretorio)
    tabela = diretorio.split('/')[-2]

    # Lista para armazenar os novos nomes das colunas
    novos_nomes = {}

    for coluna in df.columns:
        novo_nome = coluna.upper()

        # Se terminar com _ID, renomeia para CODIGO_<prefixo>
        if novo_nome.endswith("_ID"):
            prefixo = novo_nome[:-3]  # remove o _ID
            novo_nome = f"CODIGO_{prefixo}"
        # Caso contrário, aplica substituições específicas (opcional)
        else:
            novo_nome = novo_nome.replace("ID", "CODIGO")  # se ainda quiser tratar outros usos de 'ID'

        novos_nomes[coluna] = novo_nome

    # Aplica as renomeações
    for antigo, novo in novos_nomes.items():
        df = df.withColumnRenamed(antigo, novo)

    # Remove colunas desnecessárias (se existirem)
    for col_drop in ["DATA_HORA_BRONZE", "NOME_ARQUIVO"]:
        if col_drop in df.columns:
            df = df.drop(col_drop)

    # Adiciona colunas fixas
    df = df.withColumn("NOME_ARQUIVO_BRONZE", lit(tabela))
    df = df.withColumn("DATA_ARQUIVO_SILVER", current_timestamp())

    # Salva o DataFrame modificado
    df.write.format('delta').mode("overwrite").save(f"/mnt/{storageAccountName}/silver/ecommerce/{tabela}")

def renomear_arquivos_delta(diretorio):
    arquivos = dbutils.fs.ls(diretorio)

    for arquivo in arquivos:
        nome_arquivo = arquivo.path
        renomear_colunas(nome_arquivo)

In [0]:
# Executa a funcao para atualizar todos os dataframes
diretorio = f'/mnt/{storageAccountName}/bronze/ecommerce'

renomear_arquivos_delta(diretorio)

In [0]:
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/avaliacoes').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/categorias').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/clientes').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/enderecos_cliente').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/entregas').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/estoque').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/formas_pagamento').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/itens_pedido').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/pagamentos').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/pedidos').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/produtos').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/transportadoras').limit(1).display()
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/ecommerce/vendedores').limit(1).display()

CODIGO,CODIGO_CLIENTE,CODIGO_PRODUTO,NOTA,COMENTARIO,DATA_AVALIACAO,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
0,9135,13790,5,Dolorem quae deserunt deleniti mollitia iste corrupti molestias similique nulla assumenda dignissimos at.,2022-06-22 13:46:56.0000000,avaliacoes,2025-06-12T22:45:32.263+0000


CODIGO,NOME,DESCRICAO,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,Eletrônicos,Produtos eletrônicos e gadgets,categorias,2025-06-12T22:45:38.436+0000


CODIGO,NOME,EMAIL,TELEFONE,DATA_CADASTRO,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,Brenda Alves,samuel32@example.net,+55 51 8196 0013,2023-05-28 15:43:33.0000000,clientes,2025-06-12T22:45:42.664+0000


CODIGO,CODIGO_CLIENTE,LOGRADOURO,NUMERO,COMPLEMENTO,BAIRRO,CCODIGOADE,ESTADO,CEP,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,4299,Alameda Azevedo,78,,da Mata,Gomes da Serra,RN,19007-721,enderecos_cliente,2025-06-12T22:45:47.352+0000


CODIGO,CODIGO_PEDIDO,CODIGO_TRANSPORTADORA,DATA_ENVIO,DATA_ENTREGA,STATUS,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,12582,53,2025-05-22 00:00:00.0000000,2025-06-03 00:00:00.0000000,Enviado,entregas,2025-06-12T22:45:52.674+0000


CODIGO,CODIGO_PRODUTO,QUANTCODIGOADE,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,1,259,estoque,2025-06-12T22:45:57.252+0000


CODIGO,DESCRICAO,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,Cartão de Crédito,formas_pagamento,2025-06-12T22:46:01.935+0000


CODIGO,CODIGO_PEDIDO,CODIGO_PRODUTO,QUANTCODIGOADE,PRECO_UNITARIO,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,1819,24342,3,285.62,itens_pedido,2025-06-12T22:46:06.790+0000


CODIGO,CODIGO_PEDIDO,CODIGO_FORMA_PAGAMENTO,VALOR_PAGO,DATA_PAGAMENTO,STATUS,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,13020,1,2683.94,2024-07-01 13:41:43.0000000,falha,pagamentos,2025-06-12T22:46:11.457+0000


CODIGO,CODIGO_CLIENTE,CODIGO_ENDERECO_ENTREGA,DATA_PEDCODIGOO,STATUS,TOTAL,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,4290,1210,2023-12-21 21:25:52.0000000,cancelado,1522.51,pedidos,2025-06-12T22:46:16.472+0000


CODIGO,CODIGO_VENDEDOR,CODIGO_CATEGORIA,NOME,DESCRICAO,PRECO,DATA_CADASTRO,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,655,8,Optio,Sint eos ad natus et odio nisi harum.,34.76,2022-11-23 21:12:31.0000000,produtos,2025-06-12T22:46:21.477+0000


CODIGO,NOME,TELEFONE,EMAIL,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,da Mata,+55 (081) 5646 9105,barbaramartins@example.org,transportadoras,2025-06-12T22:46:26.528+0000


CODIGO,NOME,EMAIL,TELEFONE,DATA_CADASTRO,NOME_ARQUIVO_BRONZE,DATA_ARQUIVO_SILVER
1,Sr. Daniel Alves,pintoana-livia@example.com,71 2133 9437,2024-07-06 12:45:55.0000000,vendedores,2025-06-12T22:46:31.063+0000
