### Imports

In [1]:
import os
import sys
import json
from datetime import datetime
import pyspark.sql.functions as fun

In [2]:
sys.path.append(os.path.abspath('..'))  # adds the parent folder to sys.path
from src.utils import dataframe_utils
from src.utils.path_utils import find_project_root,Path

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/16 15:44:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/16 15:44:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Configurations and Parameters

In [3]:
# parameters
# Default run parameters; presumably overridable by a notebook runner (e.g. papermill) via this cell.
base_dir = '../'    # taken as the project root only when an explicit config_path is supplied
config_path = None  # None -> discover the project root and use configurations/config.json

In [4]:

# Resolve the project base directory and the config file, then load the JSON config.
# - If the `config_path` parameter is supplied, it is used and `base_dir` is the project root.
# - Otherwise the project root is discovered and configurations/config.json under it is used.
# The resolved location goes into CONFIG_PATH rather than back into the `config_path`
# parameter: overwriting the parameter made a second run of this cell take the first
# branch and silently switch BASE_DIR to Path(base_dir).
if config_path is not None:
    BASE_DIR = Path(base_dir)
    CONFIG_PATH = Path(config_path)  # normalise str/PathLike so both branches yield a Path
else:
    BASE_DIR = find_project_root()
    CONFIG_PATH = BASE_DIR / 'configurations' / 'config.json'

with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
    config = json.load(f)

print(f"Loaded config from {CONFIG_PATH}")

Loaded config from /Users/vaishnavi/Desktop/Research/DataEngineering/health-insurance-rate-data-etl/configurations/config.json


In [5]:
# Source file ingested upstream; its name without the extension names the silver/gold datasets.
downloaded_file_name = 'Rate_PUF.csv'
# os.path.splitext removes only the final extension, so a name with extra dots
# (e.g. 'Rate_PUF.2025.csv') keeps 'Rate_PUF.2025' instead of being cut at the first dot.
dataset_name = os.path.splitext(downloaded_file_name)[0]
silver_path = os.path.join(BASE_DIR, config["silver_parquet_path"], dataset_name)
gold_path = os.path.join(BASE_DIR, config["gold_parquet_path"], dataset_name)

### Gold layer transformation
#### Combining all columns (except `ImportDate`) into a single `full_text` column

In [6]:
# Load the silver-layer parquet dataset; the gold transformations below start from this frame.
# NOTE(review): `header` and `inferSchema` are CSV reader options — presumably ignored by
# read_data_spark for parquet input; confirm before dropping them.
silver_read_options = {
    'file_path': silver_path,
    'file_format': 'parquet',
    'header': True,
    'inferSchema': True,
}
gold_rates_df = dataframe_utils.read_data_spark(**silver_read_options)
gold_rates_df.printSchema()

                                                                                

root
 |-- BusinessYear: string (nullable = true)
 |-- IssuerId: string (nullable = true)
 |-- SourceName: string (nullable = true)
 |-- RateEffectiveDate: string (nullable = true)
 |-- RateExpirationDate: string (nullable = true)
 |-- PlanId: string (nullable = true)
 |-- RatingAreaId: string (nullable = true)
 |-- Tobacco: string (nullable = true)
 |-- IndividualRate: string (nullable = true)
 |-- IndividualTobaccoRate: string (nullable = true)
 |-- Couple: string (nullable = true)
 |-- PrimarySubscriberAndOneDependent: string (nullable = true)
 |-- PrimarySubscriberAndTwoDependents: string (nullable = true)
 |-- PrimarySubscriberAndThreeOrMoreDependents: string (nullable = true)
 |-- CoupleAndOneDependent: string (nullable = true)
 |-- CoupleAndTwoDependents: string (nullable = true)
 |-- CoupleAndThreeOrMoreDependents: string (nullable = true)
 |-- ImportDate: integer (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- Age: string (nullable = true)



In [None]:
# Columns kept out of the concatenated text:
# - ImportDate is left out of the text (as in the original cell).
# - full_text and row_id are derived by this cell. Excluding them keeps the cell idempotent:
#   otherwise re-running it would fold the previous run's full_text/row_id into the new full_text.
EXCLUDED_TEXT_COLUMNS = {"ImportDate", "full_text", "row_id"}
columns_to_use = [col for col in gold_rates_df.columns if col not in EXCLUDED_TEXT_COLUMNS]

# Build one "Col: value | Col: value | ..." string per row.
# Each value is cast to string and nulls are replaced with "". Spark's concat returns null
# if any input is null, so without the coalesce a single null would blank out that segment.
full_text_expr = fun.concat_ws(" | ", *[
    fun.concat(
        fun.lit(f"{col_name}: "),
        fun.coalesce(fun.col(col_name).cast("string"), fun.lit("")),
    )
    for col_name in columns_to_use
])

# withColumn replaces an existing column of the same name, so re-runs overwrite rather than duplicate.
gold_rates_df = gold_rates_df.withColumn("full_text", full_text_expr)
# monotonically_increasing_id() is unique and increasing, but NOT consecutive —
# do not treat row_id as a dense 0..N-1 index.
gold_rates_df = gold_rates_df.withColumn("row_id", fun.monotonically_increasing_id()).orderBy("row_id")

                                                                                

+------------+--------+----------+-----------------+------------------+--------------+--------------+--------------------+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+----------+---------+---+--------------------+------+
|BusinessYear|IssuerId|SourceName|RateEffectiveDate|RateExpirationDate|        PlanId|  RatingAreaId|             Tobacco|IndividualRate|IndividualTobaccoRate|Couple|PrimarySubscriberAndOneDependent|PrimarySubscriberAndTwoDependents|PrimarySubscriberAndThreeOrMoreDependents|CoupleAndOneDependent|CoupleAndTwoDependents|CoupleAndThreeOrMoreDependents|ImportDate|StateCode|Age|           full_text|row_id|
+------------+--------+----------+-----------------+------------------+--------------+--------------+--------------------+--------------+---------------------+------+------------------------

In [8]:
# Save the Gold dataset as parquet, split into ImportDate / StateCode / Age partitions.
# NOTE(review): mode='append' is presumably forwarded to Spark's writer; if so, every re-run
# of this cell appends another copy of the rows — confirm that duplicates are acceptable.
# NOTE(review): `header` is a CSV writer option and presumably has no effect for parquet.
gold_write_options = dict(
    file_path=gold_path,
    file_format='parquet',
    df=gold_rates_df,
    mode='append',
    partition_by=['ImportDate', 'StateCode', 'Age'],
    header=True,
)
dataframe_utils.write_data_spark(**gold_write_options)

print(f"Saved Gold parquet at {gold_path}")

25/06/16 15:46:37 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/16 15:46:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/16 15:46:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/16 15:46:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/16 15:46:39 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/16 15:46:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/06/16 15:46:41 WARN MemoryManager: Total allocation exceeds 95.00% 

Saved Gold parquet at /Users/vaishnavi/Desktop/Research/DataEngineering/health-insurance-rate-data-etl/data/gold/Rate_PUF
