In [1]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lower, trim, regexp_replace, split, udf, explode, array_contains
from pyspark.sql.types import FloatType, IntegerType, DateType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

import utils.attributes_processing_bronze_table
import utils.attributes_processing_silver_table


## Set up PySpark session

In [2]:
# Initialize a local SparkSession that uses all available cores
spark = (
    SparkSession.builder
    .appName("features_attributes_preprocessing")
    .master("local[*]")
    .getOrCreate()
)

# only surface ERROR-level messages from Spark in the notebook output
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/21 05:00:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Set up config

In [3]:
# set up config
# NOTE(review): snapshot_date_str is not referenced by the cells shown here — confirm it is still needed.
snapshot_date_str = "2023-01-01"

# backfill range: every first-of-month from start_date_str's month through end_date_str (inclusive)
start_date_str, end_date_str = "2023-01-01", "2024-12-01"

In [4]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    """Return the first day of each month from start_date_str's month up to end_date_str.

    Both arguments are "YYYY-MM-DD" strings. The day-of-month of start_date_str is
    ignored (its month's first day is the first candidate), and a month is included
    while its first day is on or before end_date_str. The result is an ascending list
    of "YYYY-MM-DD" strings; it is empty when the first candidate is already past
    end_date_str.
    """
    fmt = "%Y-%m-%d"
    first = datetime.strptime(start_date_str, fmt)
    last = datetime.strptime(end_date_str, fmt)

    months = []
    cursor = datetime(first.year, first.month, 1)
    while cursor <= last:
        months.append(cursor.strftime(fmt))
        # divmod carries December (12) over into January of the following year
        year_carry, month_index = divmod(cursor.month, 12)
        cursor = datetime(cursor.year + year_carry, month_index + 1, 1)

    return months

dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)
# An empty range would make every backfill loop a silent no-op and leave later
# inspection cells failing with confusing errors, so fail fast here instead.
assert dates_str_lst, f"no month-start dates between {start_date_str} and {end_date_str}"
dates_str_lst

['2023-01-01',
 '2023-02-01',
 '2023-03-01',
 '2023-04-01',
 '2023-05-01',
 '2023-06-01',
 '2023-07-01',
 '2023-08-01',
 '2023-09-01',
 '2023-10-01',
 '2023-11-01',
 '2023-12-01',
 '2024-01-01',
 '2024-02-01',
 '2024-03-01',
 '2024-04-01',
 '2024-05-01',
 '2024-06-01',
 '2024-07-01',
 '2024-08-01',
 '2024-09-01',
 '2024-10-01',
 '2024-11-01',
 '2024-12-01']

## Build Bronze Table

In [5]:
# Define the bronze-layer output directory and create it if it does not exist yet
bronze_features_attributes_directory = "datamart/bronze/features_attributes/"
# exist_ok=True replaces the racy os.path.exists() check + makedirs() pair
os.makedirs(bronze_features_attributes_directory, exist_ok=True)

In [6]:
# run bronze backfill: build one bronze snapshot for every month in dates_str_lst
for date_str in dates_str_lst:
    utils.attributes_processing_bronze_table.process_bronze_table(
        date_str,
        bronze_features_attributes_directory,
        spark,
    )

2023-01-01row count: 530
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2023_01_01.csv
2023-02-01row count: 501
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2023_02_01.csv
2023-03-01row count: 506
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2023_03_01.csv
2023-04-01row count: 510
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2023_04_01.csv
2023-05-01row count: 521
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2023_05_01.csv
2023-06-01row count: 517
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2023_06_01.csv
2023-07-01row count: 471
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2023_07_01.csv
2023-08-01row count: 481
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2023_08_01.csv
2023-09-01row count: 454
saved to: datamart/bronze/features_attributes/bronze_features_attribute

In [7]:
# inspect output: rebuild the most recent month explicitly (instead of relying on the
# date_str left over from the backfill loop) and view it as pandas.
# Note: this re-writes that month's bronze file.
utils.attributes_processing_bronze_table.process_bronze_table(dates_str_lst[-1], bronze_features_attributes_directory, spark).toPandas()

2024-12-01row count: 515
saved to: datamart/bronze/features_attributes/bronze_features_attributes_2024_12_01.csv


Unnamed: 0,Customer_ID,Name,Age,SSN,Occupation,snapshot_date
0,CUS_0x103e,Tim Kellyf,40,155-72-8070,Scientist,2024-12-01
1,CUS_0x1195,Alexk,31,822-48-3629,Manager,2024-12-01
2,CUS_0x1197,Nayako,28,799-23-8283,_______,2024-12-01
3,CUS_0x11e2,Valetkevitchr,34,809-04-1419,Musician,2024-12-01
4,CUS_0x11ec,William Schombergh,34,417-74-2163,Journalist,2024-12-01
...,...,...,...,...,...,...
510,CUS_0xe6c,Doris Frankelh,26,172-24-1577,Entrepreneur,2024-12-01
511,CUS_0xe99,Moone,48,164-90-3178,Mechanic,2024-12-01
512,CUS_0xf55,Tarmo Virkip,39,025-54-8593,Entrepreneur,2024-12-01
513,CUS_0xfd1,Frewy,32,389-55-6408,Architect,2024-12-01


## Build Silver Table

In [8]:
# create the silver-layer output directory (no-op if it already exists)
silver_features_attributes_directory = "datamart/silver/features_attributes/"

# exist_ok=True replaces the racy os.path.exists() check + makedirs() pair
os.makedirs(silver_features_attributes_directory, exist_ok=True)

In [9]:
# run silver backfill: build one silver snapshot per month from the bronze snapshots
for date_str in dates_str_lst:
    utils.attributes_processing_silver_table.process_silver_table(
        date_str,
        bronze_features_attributes_directory,
        silver_features_attributes_directory,
        spark,
    )

loaded from: datamart/bronze/features_attributes/bronze_features_attributes_2023_01_01.csv row count: 530
saved to: datamart/silver/features_attributes/silver_features_attributes_2023_01_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_2023_02_01.csv row count: 501
saved to: datamart/silver/features_attributes/silver_features_attributes_2023_02_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_2023_03_01.csv row count: 506
saved to: datamart/silver/features_attributes/silver_features_attributes_2023_03_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_2023_04_01.csv row count: 510
saved to: datamart/silver/features_attributes/silver_features_attributes_2023_04_01.parquet
loaded from: datamart/bronze/features_attributes/bronze_features_attributes_2023_05_01.csv row count: 521
saved to: datamart/silver/features_attributes/silver_features_attributes_2023_05_01.parquet
loaded fro

In [10]:
# inspect output: rebuild the most recent month explicitly (instead of relying on the
# date_str left over from the backfill loop) and view it as pandas.
# Note: this re-writes that month's silver file.
utils.attributes_processing_silver_table.process_silver_table(dates_str_lst[-1], bronze_features_attributes_directory, silver_features_attributes_directory, spark).toPandas()

loaded from: datamart/bronze/features_attributes/bronze_features_attributes_2024_12_01.csv row count: 515
saved to: datamart/silver/features_attributes/silver_features_attributes_2024_12_01.parquet


Unnamed: 0,Customer_ID,Name,Age,SSN,Occupation,snapshot_date,Age_Group
0,CUS_0x103e,tim kellyf,40,155-72-8070,scientist,2024-12-01,31-45
1,CUS_0x1195,alexk,31,822-48-3629,manager,2024-12-01,31-45
2,CUS_0x11e2,valetkevitchr,34,809-04-1419,musician,2024-12-01,31-45
3,CUS_0x11ec,william schombergh,34,417-74-2163,journalist,2024-12-01,31-45
4,CUS_0x1234,carrickn,24,101-01-2307,musician,2024-12-01,18-30
...,...,...,...,...,...,...,...
462,CUS_0xe6c,doris frankelh,26,172-24-1577,entrepreneur,2024-12-01,18-30
463,CUS_0xe99,moone,48,164-90-3178,mechanic,2024-12-01,46-60
464,CUS_0xf55,tarmo virkip,39,025-54-8593,entrepreneur,2024-12-01,31-45
465,CUS_0xfd1,frewy,32,389-55-6408,architect,2024-12-01,31-45


## Build Gold Table

In [11]:
# create the gold-layer output directory (no-op if it already exists)
# NOTE(review): this points at the label store even though this notebook builds attribute
# features — confirm features are not meant to go to a separate gold feature-store directory.
gold_label_store_directory = "datamart/gold/label_store/"

# exist_ok=True replaces the racy os.path.exists() check + makedirs() pair
os.makedirs(gold_label_store_directory, exist_ok=True)

In [None]:
# run gold backfill
# The first cell imports only the bronze and silver utils submodules, so accessing
# utils.attributes_processing_gold_table would raise AttributeError (unless
# utils/__init__.py imports it). Import it here, next to its only users, so the
# bronze/silver stages above do not depend on the gold module being available.
import utils.attributes_processing_gold_table

# NOTE(review): the function name refers to labels although this notebook processes
# attribute features — confirm this is the intended gold step.
for date_str in dates_str_lst:
    utils.attributes_processing_gold_table.process_labels_gold_table(
        date_str,
        silver_features_attributes_directory,
        gold_label_store_directory,
        spark,
    )

In [None]:
# inspect the column dtypes of the most recent month's gold output, using the last
# backfill date explicitly rather than the date_str left over from the loop
utils.attributes_processing_gold_table.process_labels_gold_table(dates_str_lst[-1], silver_features_attributes_directory, gold_label_store_directory, spark).dtypes