# VPR Data Landing

# 1. Import dependencies & declare constants

In [1]:
import configparser
import logging
import os
import posixpath
import sys

import yaml

# Add the src directory to sys.path so the project modules imported in the next cell resolve.
# Guarded so that re-running this cell does not append a duplicate entry each time.
_SRC_DIR = os.path.abspath(os.path.join('..', 'src'))
if _SRC_DIR not in sys.path:
    sys.path.append(_SRC_DIR)

In [2]:
from spark_session import create_spark_session
from schemas import *
from functions import *

In [4]:
def load_aws_credentials(profile_name="default"):
    """Read an AWS access key pair from the project-local ``../.aws/credentials`` file.

    The path is relative to the current working directory (local development setup).

    Args:
        profile_name: Section of the credentials file to read. Defaults to ``"default"``.

    Returns:
        tuple[str, str]: ``(aws_access_key_id, aws_secret_access_key)``.

    Raises:
        SystemExit: With exit code 1 (after logging the reason) when the file is
            missing or cannot be parsed, the profile does not exist, or either
            key is absent or empty.
    """
    credentials_path = os.path.join('..', '.aws', 'credentials')

    # Credentials are opaque strings: disable interpolation so a '%' inside a
    # secret is not interpreted as configparser interpolation syntax.
    credentials = configparser.ConfigParser(interpolation=None)

    try:
        # read() does NOT raise for a missing file; it silently skips it and
        # returns the list of files it actually parsed.
        loaded_files = credentials.read(credentials_path)
    except Exception as e:
        logging.error(f"Error loading .aws file: {e}")
        sys.exit(1)

    if not loaded_files:
        logging.error(f"Error loading .aws file: '{credentials_path}' not found or not readable.")
        sys.exit(1)

    logging.info("Successfully loaded credentials variables from .aws file.")

    # `in` also accepts the parser's DEFAULT section, like plain indexing does.
    if profile_name not in credentials:
        logging.error(f"AWS profile '{profile_name}' not found in '{credentials_path}'.")
        sys.exit(1)

    profile = credentials[profile_name]
    # .get() yields None for a missing option instead of raising KeyError,
    # so the check below handles both "missing" and "empty".
    aws_access_key_id = profile.get("aws_access_key_id")
    aws_secret_access_key = profile.get("aws_secret_access_key")

    if not aws_access_key_id or not aws_secret_access_key:
        logging.error("AWS credentials not found.")
        sys.exit(1)

    return aws_access_key_id, aws_secret_access_key

# Read the "default" profile once; the key pair is handed to create_spark_session further down.
aws_access_key_id, aws_secret_access_key = load_aws_credentials()

In [5]:
# Load the YAML configuration file.
# The encoding is explicit so parsing does not depend on the platform's default locale encoding.
with open('../config/config.yml', 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

In [6]:
# --- Bucket layout segments (read from config.yml) ---
BUCKET_NAME = config["paths"]["BUCKET_NAME"]
RAW = config["paths"]["RAW"]
ORDERS = config["paths"]["ORDERS"]

BRONZE = config["paths"]["BRONZE"]
SILVER = config["paths"]["SILVER"]
GOLD = config["paths"]["GOLD"]

# --- Raw input dataset names ---
ADDRESS_DATA = config["raw_data"]["ADDRESS_DATA"]
CLIENTS_DATA = config["raw_data"]["CLIENTS_DATA"]
PRODUCTS_DATA = config["raw_data"]["PRODUCTS_DATA"]

# NOTE(review): this is the only hard-coded setting in this cell, and "platforom"
# looks like a misspelling of "platform". Left unchanged because the real database
# name cannot be verified from this notebook - confirm, then move it to config.yml.
DATABASE_NAME = "vpr-optimizer-platforom_db"

# --- Output table names ---
ADDRESS_TABLE = config["table_names"]["ADDRESS_TABLE"]
CLIENTS_TABLE = config["table_names"]["CLIENTS_TABLE"]
CLIENTS_ADDRESS_TABLE = config["table_names"]["CLIENTS_ADDRESS_TABLE"]
PRODUCTS_TABLE = config["table_names"]["PRODUCTS_TABLE"]
PACKAGE_TABLE = config["table_names"]["PACKAGE_TABLE"]


# Bucket locations must always be '/'-separated. os.path.join would insert
# backslashes on Windows, so posixpath.join is used (identical result on macOS/Linux).
def _raw_path(dataset):
    """Return the location of a raw input dataset inside the bucket."""
    return posixpath.join(BUCKET_NAME, RAW, dataset)


def _orders_path(layer, table):
    """Return the location of `table` in the given medallion `layer` of the ORDERS area."""
    return posixpath.join(BUCKET_NAME, ORDERS, layer, table)


RAW_ADDRESS_PATH = _raw_path(ADDRESS_DATA)
RAW_CLIENTS_PATH = _raw_path(CLIENTS_DATA)
# Backward-compatible alias for the original misspelled name, which other cells still reference.
RAW_CIENTS_PATH = RAW_CLIENTS_PATH
RAW_PRODUCTS_PATH = _raw_path(PRODUCTS_DATA)

BRONZE_ADDRESS_PATH = _orders_path(BRONZE, ADDRESS_TABLE)
BRONZE_CLIENTS_PATH = _orders_path(BRONZE, CLIENTS_TABLE)
BRONZE_PRODUCTS_PATH = _orders_path(BRONZE, PRODUCTS_TABLE)

SILVER_ADDRESS_PATH = _orders_path(SILVER, ADDRESS_TABLE)
SILVER_CLIENTS_PATH = _orders_path(SILVER, CLIENTS_TABLE)
SILVER_PRODUCTS_PATH = _orders_path(SILVER, PRODUCTS_TABLE)

GOLD_CLIENTS_ADDRESS_PATH = _orders_path(GOLD, CLIENTS_ADDRESS_TABLE)
GOLD_PRODUCTS_PATH = _orders_path(GOLD, PRODUCTS_TABLE)
GOLD_PACKAGE_PATH = _orders_path(GOLD, PACKAGE_TABLE)

# 2. Initialize Spark Session

In [7]:
# Create the Spark session that every read_file call below receives, passing the AWS key pair loaded earlier.
# create_spark_session is imported from the project's spark_session module; its configuration is not visible in this notebook.
spark = create_spark_session(aws_access_key_id, aws_secret_access_key)

24/09/21 23:28:22 WARN Utils: Your hostname, Miguels-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.16 instead (on interface en0)
24/09/21 23:28:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/miguelgranica/Documents/MBIT-DE/vpr-data_landing/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/miguelgranica/.ivy2/cache
The jars for the packages stored in: /Users/miguelgranica/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4937df64-2f21-4104-b8e2-58d80ddc1992;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 129ms :: artifacts dl 4ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	

# 3. Medallion Architecture

## 3.1 Bronze Layer

In [8]:
# Load the three raw JSON datasets, each paired with its explicit schema
# (address, clients, products - read in that order).
df_address_raw, df_clients_raw, df_products_raw = [
    read_file(spark, raw_path, "json", raw_schema)
    for raw_path, raw_schema in (
        (RAW_ADDRESS_PATH, addresses_schema),
        (RAW_CIENTS_PATH, clients_schema),
        (RAW_PRODUCTS_PATH, products_schema),
    )
]

24/09/21 23:28:54 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [9]:
# Bronze layer: persist each raw frame unchanged at its bronze location,
# using write_df's default output format.
for raw_df, bronze_path in (
    (df_address_raw, BRONZE_ADDRESS_PATH),
    (df_clients_raw, BRONZE_CLIENTS_PATH),
    (df_products_raw, BRONZE_PRODUCTS_PATH),
):
    write_df(raw_df, bronze_path)

                                                                                

## 3.2 Silver Layer

In [11]:
# Read the bronze tables back as parquet, reusing the same schemas as the raw load.
# NOTE(review): this assumes write_df's default output format is parquet - confirm in functions.py.
df_address_bronze, df_clients_bronze, df_products_bronze = [
    read_file(spark, bronze_path, "parquet", bronze_schema)
    for bronze_path, bronze_schema in (
        (BRONZE_ADDRESS_PATH, addresses_schema),
        (BRONZE_CLIENTS_PATH, clients_schema),
        (BRONZE_PRODUCTS_PATH, products_schema),
    )
]

In [12]:
# Silver layer: apply each bronze-to-silver transform and persist its result
# immediately, in address -> clients -> products order.
for to_silver, df_bronze, silver_path in (
    (transform_addresses_bronze_to_silver, df_address_bronze, SILVER_ADDRESS_PATH),
    (transform_clients_bronze_to_silver, df_clients_bronze, SILVER_CLIENTS_PATH),
    (transform_products_bronze_to_silver, df_products_bronze, SILVER_PRODUCTS_PATH),
):
    write_df(to_silver(df_bronze), silver_path)

                                                                                

## 3.3 Gold Layer

In [14]:
# Read the silver tables back as parquet, each with its dedicated silver schema.
df_address_silver, df_clients_silver, df_products_silver = [
    read_file(spark, silver_path, "parquet", silver_schema)
    for silver_path, silver_schema in (
        (SILVER_ADDRESS_PATH, silver_address_schema),
        (SILVER_CLIENTS_PATH, silver_clients_schema),
        (SILVER_PRODUCTS_PATH, silver_products_schema),
    )
]

In [16]:
# Gold layer: build each gold table from the silver frames and persist it right away.

# Clients joined with addresses; written with write_df's default format.
# NOTE(review): the two tables below are written as delta while this one is not -
# confirm whether the difference is intentional.
df_clients_address_gold = transform_clients_addresses_silver_to_gold(
    df_clients_silver,
    df_address_silver,
)
write_df(df_clients_address_gold, GOLD_CLIENTS_ADDRESS_PATH)

# Products and packages are both derived from the silver products frame.
df_products_gold = transform_products_silver_to_gold(df_products_silver)
write_df(df_products_gold, GOLD_PRODUCTS_PATH, file_type="delta")

df_packages_gold = transform_packages_silver_to_gold(df_products_silver)
write_df(df_packages_gold, GOLD_PACKAGE_PATH, file_type="delta")

                                                                                