In [1]:
# Ingestão de Ficheiros na Landing Zone

### Leitura dos ficheiros
### Ingestão no formato parquet na camada landing zone do HDFS

## Importação das bibliotecas

In [1]:
import pyspark
from delta import *
from pyspark.sql import SparkSession
import psycopg2
import pandas as pd
import logging
from configs import config_env_test
from functions import functions as func


# Describe a Spark session named "LocalDelta" that runs against the "local" master.
builder = (
    SparkSession.builder
    .master("local")
    .appName("LocalDelta")
)

# Pass the builder through the delta helper (presumably provided by `from delta import *`)
# before creating the session, or reusing one that is already active.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Last expression of the cell: display the running Spark version.
spark.version

'3.4.1'

## Conexão com Postgres

## *** O ideal é utilizar o driver JDBC (jar) do Postgres para conectar ***

In [2]:
# Open the PostgreSQL connection with psycopg2, unpacking the settings stored in
# config_env_test.credential_postgres_adventureworks as keyword arguments.
# NOTE(review): nothing in this cell closes the connection — confirm it is closed later.
conn_postgres = psycopg2.connect(**config_env_test.credential_postgres_adventureworks)


## Ingestão no Data Lake - Camada Landing Zone - Formato Parquet

In [3]:
# Copy every configured adventureworks table from PostgreSQL into the landing zone
# as Parquet: read the table with pandas, convert it to a Spark DataFrame and
# overwrite the table's target directory. A failing table is logged and skipped so
# the remaining tables are still ingested; failures are summarised at the end.

# Configure the logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Log the start of ingestions
logging.info("Starting ingestions from adventureworks to HDFS...")

# The landing-zone prefix is identical for every table, so resolve it once.
# A missing 'landing_zone' entry now fails immediately instead of once per table.
hdfs_path = config_env_test.hdfs_path['landing_zone']

# Names of the tables whose ingestion raised; reported in the final summary.
failed_tables = []

# Only the configured source table names (the dict values) are needed.
for table_postgres in config_env_test.tables_postgres_adventureworks.values():
    table_name_hdfs = func.convert_table_name(table_postgres)
    target = f"{hdfs_path}{table_name_hdfs}"

    try:
        # Announce the table before any work starts, so a slow or failing read
        # can be attributed to the right table in the log.
        logging.info(f"Processing table: {table_name_hdfs}")

        # The table name is interpolated directly into the SQL text; this is only
        # safe as long as the table list in the config is trusted.
        query = f"SELECT * FROM {table_postgres}"

        # Use pandas to read the query results directly into a DataFrame
        df = pd.read_sql_query(query, conn_postgres)

        # Hand the rows to Spark and replace any previous copy at the target path
        df_spark = spark.createDataFrame(df)
        df_spark.write.format("parquet").mode("overwrite").save(target)
        logging.info(f"Table {table_name_hdfs} successfully processed and saved to HDFS: {target}")

    except Exception as e:
        # Best effort: continue with the remaining tables, but keep the traceback
        # (logging.exception logs at ERROR level and appends it to the message).
        logging.exception(f"Error processing table {table_name_hdfs}: {str(e)}")
        failed_tables.append(table_name_hdfs)

# Report the outcome honestly: only claim completion when every table succeeded.
if failed_tables:
    logging.error(
        f"ingestions from adventureworks to HDFS finished with "
        f"{len(failed_tables)} failed table(s): {', '.join(failed_tables)}"
    )
else:
    logging.info("ingestions from adventureworks to HDFS completed!")
    

2024-01-16 18:05:11,073 - INFO - Starting ingestions from adventureworks to HDFS...
  df = pd.read_sql_query(query, conn_postgres)
2024-01-16 18:05:11,166 - INFO - Processing table: sales_countryregioncurrency
2024-01-16 18:05:14,787 - INFO - Table sales_countryregioncurrency successfully processed and saved to HDFS: hdfs://namenode:9000/landing_zone/sales_countryregioncurrency
  df = pd.read_sql_query(query, conn_postgres)
2024-01-16 18:05:14,825 - INFO - Processing table: sales_creditcard
2024-01-16 18:05:15,892 - INFO - Table sales_creditcard successfully processed and saved to HDFS: hdfs://namenode:9000/landing_zone/sales_creditcard
  df = pd.read_sql_query(query, conn_postgres)
2024-01-16 18:05:15,900 - INFO - Processing table: sales_currency
2024-01-16 18:05:16,112 - INFO - Table sales_currency successfully processed and saved to HDFS: hdfs://namenode:9000/landing_zone/sales_currency
  df = pd.read_sql_query(query, conn_postgres)
2024-01-16 18:05:16,160 - INFO - Processing table: