In [0]:
#landing-zone
import psycopg2
import pandas as pd
from sqlalchemy import create_engine, text
from azure.storage.blob import BlobServiceClient
import os
from azure.core.exceptions import AzureError

db_host = "-"
db_port = "5432"
db_name = "Banco"
db_user = "-"
db_password = "-"

azure_connection_string = "-"

conn = psycopg2.connect(host=db_host, port=db_port, dbname=db_name, user=db_user, password=db_password)

blob_service_client = BlobServiceClient.from_connection_string(azure_connection_string)

db_url = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
engine = create_engine(db_url)

def fetch_table_data(table_name):
    query = text(f"SELECT * FROM {table_name}")
    with engine.connect() as connection:
        return pd.read_sql(query, con=connection)

clientes_df = fetch_table_data('cliente')
imovel_df = fetch_table_data('imovel')
cobertura_df = fetch_table_data('cobertura')
apolice_seguro_df = fetch_table_data('apolice_seguro')
corretor_df = fetch_table_data('corretor')
seguradora_df = fetch_table_data('seguradora')

def save_to_azure_blob(df, file_name):
    try:
        file_name = file_name.lower().replace(" ", "_")  

        local_temp_path = f"/tmp/{file_name}"
        os.makedirs(os.path.dirname(local_temp_path), exist_ok=True)  
        df.to_csv(local_temp_path, index=False)

        container_name = "landing-zone"  

        container_client = blob_service_client.get_container_client(container_name)

        blob_client = container_client.get_blob_client(file_name)
        with open(local_temp_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)

        print(f"Arquivo {file_name} carregado no Azure Blob Storage.")
    
    except AzureError as e:
        print(f"Ocorreu um erro ao tentar carregar o arquivo no Azure Blob Storage: {e}")
    except Exception as e:
        print(f"Ocorreu um erro inesperado: {e}")

save_to_azure_blob(clientes_df, "clientes.csv")
save_to_azure_blob(imovel_df, "imovel.csv")
save_to_azure_blob(cobertura_df, "cobertura.csv")
save_to_azure_blob(apolice_seguro_df, "apoliceseguro.csv")
save_to_azure_blob(corretor_df, "corretor.csv")
save_to_azure_blob(seguradora_df, "seguradora.csv")


Arquivo clientes.csv carregado no Azure Blob Storage.
Arquivo imovel.csv carregado no Azure Blob Storage.
Arquivo cobertura.csv carregado no Azure Blob Storage.
Arquivo apoliceseguro.csv carregado no Azure Blob Storage.
Arquivo corretor.csv carregado no Azure Blob Storage.
Arquivo seguradora.csv carregado no Azure Blob Storage.


In [0]:
storageAccountName = "datalakea04fb344bd6a3620"
sasToken = "-"

def mount_adls(blobContainerName):
    try:
      dbutils.fs.mount(
        source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
        mount_point = f"/mnt/{storageAccountName}/{blobContainerName}",
        extra_configs = {'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
      )
      print("OK!")
    except Exception as e:
      print("Falha", e)


mount_adls('landing-zone')
mount_adls('bronze')
mount_adls('silver')
mount_adls('gold')

OK!
OK!
OK!
OK!


In [0]:
from pyspark.sql import SparkSession

tables = ["cliente", "imovel", "cobertura", "apolice_seguro", "corretor", "seguradora"]

for table in tables:
    remote_table = (spark.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", f"public.{table}")  
        .options(**db_properties)  
        .load())

    remote_table.write.option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/{table}.csv")
    print(f"Tabela {table} salva na landing-zone.")


Tabela cliente salva na landing-zone.
Tabela imovel salva na landing-zone.
Tabela cobertura salva na landing-zone.
Tabela apolice_seguro salva na landing-zone.
Tabela corretor salva na landing-zone.
Tabela seguradora salva na landing-zone.


In [0]:
from pyspark.sql import functions as F

for table in tables:
    path = f"/mnt/{storageAccountName}/landing-zone/{table}/"
    
    landing_table = spark.read.option("header", "true").csv(path).withColumn("path", F.input_file_name())
    
    landing_table = landing_table.filter(~F.col("path").like("%_committed%"))
    
    landing_table.show()


+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        cliente_id|        nome_cliente|    endereco_cliente|    telefone_cliente|       email_cliente|                path|
+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 1|       Steven Conrad|069 Courtney Corn...|    001-339-461-4007|austin66@example.com|dbfs:/mnt/datalak...|
|                 2|        Brandi Mason|7934 Mcdowell For...|                NULL|                NULL|dbfs:/mnt/datalak...|
| West Matthewhaven|           NE 99893"|001-617-824-3623x...| klawson@example.org|                NULL|dbfs:/mnt/datalak...|
|                 3|       Spencer Jones|2688 Phillips Tra...|                NULL|                NULL|dbfs:/mnt/datalak...|
|        East Helen|           AS 20445"|  (971)396-3288x1951|michele99@example...|                NULL|dbfs:/mnt/data