### Declaração das bibliotecas/pacotes

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

from delta import *

import logging

# Set the "py4j" logger to DEBUG level.
py4j_logger = logging.getLogger("py4j")
py4j_logger.setLevel(logging.DEBUG)

from datetime import datetime
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp


### Leitura dos dados do arquivo .env

In [3]:
# Load the variables from the .env file.
# The previous expression, os.path.dirname(".."), evaluates to "", so only
# "./.env" was ever considered despite the ".." it mentions. Try the current
# working directory first and then its parent; if neither file exists, keep the
# original "./.env" path so load_dotenv behaves exactly as before.
dotenv_candidates = (".env", os.path.join(os.pardir, ".env"))
dotenv_path = next(
    (candidate for candidate in dotenv_candidates if os.path.isfile(candidate)),
    ".env",
)
load_dotenv(dotenv_path)

account_name = os.getenv("ADLS_ACCOUNT_NAME")
landing_container = os.getenv("ADLS_FILE_SYSTEM_NAME")
bronze_container = os.getenv("ADLS_BRONZE_CONTAINER")
client_id = os.getenv("ADLS_SP_CLIENT_ID")
client_secret = os.getenv("ADLS_SP_CLIENT_SECRET")
tenant_id = os.getenv("ADLS_SP_TENANT_ID")

# Every variable below is required; collect the names of the ones that are
# unset or empty so the error says exactly what to fix in the .env file.
required_env = {
    "ADLS_ACCOUNT_NAME": account_name,
    "ADLS_FILE_SYSTEM_NAME": landing_container,
    "ADLS_BRONZE_CONTAINER": bronze_container,
    "ADLS_SP_CLIENT_ID": client_id,
    "ADLS_SP_CLIENT_SECRET": client_secret,
    "ADLS_SP_TENANT_ID": tenant_id,
}
missing_env = [name for name, value in required_env.items() if not value]
if missing_env:
    raise ValueError(
        "Variáveis de ambiente não carregadas corretamente. Verifique o .env. "
        f"Ausentes: {', '.join(missing_env)}"
    )



#### Validação do carregamento correto das variáveis

In [4]:
# Echo the bronze container name to confirm the .env values were loaded.
print(bronze_container)

bronze


### Criação do Spark Session com os pacotes para o Delta Lake e Azure ADLS, usando credencial por Service Principal

In [5]:
# Account-scoped ABFS OAuth 2.0 (Service Principal) settings, declared once so
# the session builder and the Hadoop configuration receive identical values.
oauth_settings = {
    f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net": "OAuth",
    f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{account_name}.dfs.core.windows.net": client_id,
    f"fs.azure.account.oauth2.client.secret.{account_name}.dfs.core.windows.net": client_secret,
    f"fs.azure.account.oauth2.client.endpoint.{account_name}.dfs.core.windows.net": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Base builder: application name, Delta Lake + hadoop-azure packages, the Delta
# SQL extension/catalog and the ABFS file system implementation.
session_builder = (
    SparkSession.builder
    .appName("LandingToBronze")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-azure:3.3.4")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.azurebfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem")
)

# Add the OAuth settings to the builder, then create (or reuse) the session.
for conf_key, conf_value in oauth_settings.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# Mirror the same settings onto the Hadoop configuration as well
# (noted by the original author as essential for `FileSystem.get()`).
hadoop_conf = spark._jsc.hadoopConfiguration()
for conf_key, conf_value in oauth_settings.items():
    hadoop_conf.set(conf_key, conf_value)


25/06/18 16:53:40 WARN Utils: Your hostname, NOTEDELL3420 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/06/18 16:53:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/mnt/c/codigos/projeto-elt-satc-airflow/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jlsilva01/.ivy2/cache
The jars for the packages stored in: /home/jlsilva01/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
org.apache.hadoop#hadoop-azure added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9a048c75-8893-4ce3-9e85-790ef09f5d1b;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.hadoop#hadoop-azure;3.3.4 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found com.microsoft.azure#azure-storage;7.0.1 in central
	found com.fasterxml.jackson.core#jackson-core;2.12.7 in central
	found org.slf4j#slf4j-api;1.7.36 in central
	found com.microsoft.azure#azure-keyvault-core;1.0.0 in ce

### Validação da sessão do Spark

In [6]:
# Display the SparkSession object as this cell's output.
spark

### Definição dos caminhos ABFSS a partir das variáveis de ambiente

In [11]:

# Build the ABFSS base paths for the landing and bronze containers.
# Slashes around the directory name are dropped and the base paths never end in
# "/", so f"{landing_path}/<file>" stays well-formed even when
# ADLS_DIRECTORY_NAME is unset (previously that yielded "...windows.net//<file>").
subdir = os.getenv("ADLS_DIRECTORY_NAME", "").strip("/")

landing_path = f"abfss://{landing_container}@{account_name}.dfs.core.windows.net/{subdir}".rstrip("/")
bronze_path = f"abfss://{bronze_container}@{account_name}.dfs.core.windows.net/{subdir}".rstrip("/")



### Checagem do correto carregamento das variáveis de ambiente definidas no bloco anterior

In [13]:
# Compose the full path of the landing CSV file and print it for a visual check.
path = "/".join((landing_path, "carro.csv"))
print(path)


abfss://landing-zone@datalakefd251f50081bbe5a.dfs.core.windows.net/sistema_seguro/carro.csv


### Leitura de um arquivo CSV do container landing-zone no ADLS Gen2 da Azure

In [16]:
# Read the CSV under the landing path, treating the first row as the header,
# and print the first rows of the resulting DataFrame.
csv_reader = spark.read.format("csv").option("header", "true")
df = csv_reader.load(f"{landing_path}/carro.csv")
df.show()

+-------+---------+-----------+----------+----+--------+
|  placa|   modelo|     chassi|     marca| ano|     cor|
+-------+---------+-----------+----------+----+--------+
|ALD3834|     CLIO|34574215969|   RENAULT|2011|  BRANCO|
|CCR8096|    CRETA|88547875547|   HYUNDAI|2020|  BRANCO|
|DLA3438|    PUNTO|98823483434|      FIAT|2013|   PRETO|
|EEE1056|ECO SPORT|56753453455|      FORD|2020|    AZUL|
|FFR1234|    PALIO|32383478747|      FIAT|2009| AMARELO|
|GQY6753|      S10|72004160549|        GM|2015|   PRETO|
|IAC8974|   TIGUAN|77130757746|VOLKSWAGEN|2022|    AZUL|
|JIE0952|   PASSAT|87493270405|VOLKSWAGEN|2016|   CINZA|
|JNU7898|     2020|87628347687|      FORD|2020|   VERDE|
|LVX7086|  SANDERO|00025131958|   RENAULT|1999|VERMELHO|
|LWJ9156|     ONIX|40991078801|        GM|2015|    AZUL|
|MZT1826|      GOL|41150439528|VOLKSWAGEN|1998| AMARELO|
|NAP5760|  COMPASS|40364369549|      JEEP|2017|   PRETO|
|NEM5116|     2008|69469771523|   PEUGEOT|2018|   PRETO|
|NFT2212|     KWID|12344343433|

### Leitura de um arquivo tipo Delta Lake do container bronze no ADLS Gen2 da Azure

In [19]:
# Read the Delta table stored under the bronze path and print its first rows.
delta_reader = spark.read.format("delta")
df = delta_reader.load(f"{bronze_path}/carro")
df.show()

25/06/18 16:58:58 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 13:>                                                         (0 + 1) / 1]

+-------+---------+-----------+----------+----+--------+--------------------+
|  placa|   modelo|     chassi|     marca| ano|     cor|_ingestion_timestamp|
+-------+---------+-----------+----------+----+--------+--------------------+
|ALD3834|     CLIO|34574215969|   RENAULT|2011|  BRANCO|2025-06-18 16:00:...|
|CCR8096|    CRETA|88547875547|   HYUNDAI|2020|  BRANCO|2025-06-18 16:00:...|
|DLA3438|    PUNTO|98823483434|      FIAT|2013|   PRETO|2025-06-18 16:00:...|
|EEE1056|ECO SPORT|56753453455|      FORD|2020|    AZUL|2025-06-18 16:00:...|
|FFR1234|    PALIO|32383478747|      FIAT|2009| AMARELO|2025-06-18 16:00:...|
|GQY6753|      S10|72004160549|        GM|2015|   PRETO|2025-06-18 16:00:...|
|IAC8974|   TIGUAN|77130757746|VOLKSWAGEN|2022|    AZUL|2025-06-18 16:00:...|
|JIE0952|   PASSAT|87493270405|VOLKSWAGEN|2016|   CINZA|2025-06-18 16:00:...|
|JNU7898|     2020|87628347687|      FORD|2020|   VERDE|2025-06-18 16:00:...|
|LVX7086|  SANDERO|00025131958|   RENAULT|1999|VERMELHO|2025-06-

                                                                                