In [2]:
import json
import yaml
import pyspark.sql.functions as f
from src.utils import dataframe_utils
from src.utils import request_utils
from urllib.parse import urlparse
import os
from datetime import datetime

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/09 11:49:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [16]:
# Load the API configuration (endpoints, HTTP methods, response formats) from YAML.
# NOTE: the file handle must not be named `f` -- that would rebind the
# `pyspark.sql.functions as f` alias from the imports cell and break every later `f.col(...)`.
with open("api_config.yaml") as config_file:
    config_data = yaml.safe_load(config_file)

# Pretty-print the parsed config for inspection (display only).
json_data = json.dumps(config_data, indent=2)
print(json_data)

{
  "apis": {
    "health_insurance_rate_puf": {
      "description": "Public Use File API for Health Rate Data",
      "version": 1,
      "endpoint": "https://data.healthcare.gov/api/1/metastore/schemas/dataset/items/672d5f6a-b8a7-4ebe-87f6-67db641e192d",
      "method": "GET",
      "params": [],
      "response": {
        "format": "JSON"
      }
    }
  }
}


In [17]:
# Fetch the dataset's metadata record from the configured endpoint and cache it locally as JSON.
rate_api_url = config_data['apis']['health_insurance_rate_puf']['endpoint']

meta_data = request_utils.make_api_call(rate_api_url)
# Fail fast on a bad response instead of caching an error payload (or crashing inside .json()).
if meta_data.status_code != 200:
    raise Exception(f"❌ Failed to fetch metadata. Status: {meta_data.status_code}")

# Ensure the cache directory exists so the cell also works on a fresh checkout.
os.makedirs("meta_data_files", exist_ok=True)
with open("meta_data_files/health_insurance_rate_puf.json", 'w') as meta_file:
    json.dump(meta_data.json(), meta_file, indent=2)





In [18]:
# Extract the data file's download URL (and declared format) from the cached metadata.
with open("meta_data_files/health_insurance_rate_puf.json", 'r') as meta_file:
    meta_data = json.load(meta_file)
# Only the first distribution entry is used -- assumes it is the rate CSV (TODO confirm if the
# metadata can list several distributions).
first_distribution = meta_data['distribution'][0]
data_format = first_distribution['format']  # not named `format`: avoid shadowing the built-in
data_url = first_distribution['downloadURL']
print(data_url)

# Step 1: download the raw file
data_response = request_utils.make_api_call(data_url)

# Step 2: derive the local filename from the URL path (e.g. 'Rate_PUF.csv')
parsed_url = urlparse(data_url)
filename = os.path.basename(parsed_url.path)

# Step 3: save the untouched response bytes as the Bronze (raw) file.
# The handle is `raw_file`, not `f`, so the `pyspark.sql.functions as f` alias stays intact.
if data_response.status_code == 200:
    os.makedirs("data", exist_ok=True)
    with open(f"data/bronze_{filename}", "wb") as raw_file:
        raw_file.write(data_response.content)
    print("✅ File downloaded successfully.")
else:
    raise Exception(f"❌ Failed to download file. Status: {data_response.status_code}")








https://data.healthcare.gov/datafile/py2025/Rate_PUF.csv
✅ File downloaded successfully.


## Silver layer: handle null or missing values

In [None]:
# Silver layer: read the Bronze CSV keeping every column as a string, replace nulls with "",
# and stamp every row with today's import date.
filename = "Rate_PUF.csv"
file_format = 'csv'  # not named `format`: avoid shadowing the built-in

# inferSchema=False: every column is cast to string below anyway, and inferring types first
# costs an extra pass over the file and is lossy on the round trip (e.g. a rate of 1410.40
# would come back as "1410.4", and numeric-looking ids would lose leading zeros).
read_file = dataframe_utils.read_data_spark(
    file_path=f"data/bronze_{filename}",
    file_format=file_format,
    header=True,
    inferSchema=False,
)
# Explicit cast keeps the all-string contract even if the reader ever returns typed columns.
rates_df = read_file.select([f.col(c).cast("string") for c in read_file.columns])

# Fill all null values with empty strings
rates_df = rates_df.fillna("")

# Today's date as YYYYMMDD, written to every row (later used as a partition column)
today_str = datetime.today().strftime("%Y%m%d")
rates_df = rates_df.withColumn("ImportDate", f.lit(today_str))
rates_df.show(5)

                                                                                

+------------+---------+--------+----------+----------+-----------------+------------------+--------------+-------------+-------+-------------+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+
|BusinessYear|StateCode|IssuerId|SourceName|ImportDate|RateEffectiveDate|RateExpirationDate|        PlanId| RatingAreaId|Tobacco|          Age|IndividualRate|IndividualTobaccoRate|Couple|PrimarySubscriberAndOneDependent|PrimarySubscriberAndTwoDependents|PrimarySubscriberAndThreeOrMoreDependents|CoupleAndOneDependent|CoupleAndTwoDependents|CoupleAndThreeOrMoreDependents|
+------------+---------+--------+----------+----------+-----------------+------------------+--------------+-------------+-------+-------------+--------------+---------------------+------+--------------------------------+---------------------------------+

In [None]:
# Persist the cleaned rows as the Silver dataset, partitioned by import date, state and age.
# NOTE(review): with mode='append', re-running this cell on the same day presumably writes
# duplicate rows into the same partitions -- confirm against dataframe_utils.write_data_spark.
silver_path = f"data/silver_{filename}"
silver_partition_cols = ['ImportDate', 'StateCode', 'Age']
dataframe_utils.write_data_spark(
    file_path=silver_path,
    file_format='csv',
    df=rates_df,
    mode='append',
    partition_by=silver_partition_cols,
    header=True,
)

                                                                                

## Gold Layer transformations

In [4]:
# Gold input: read the partitioned Silver output back with data columns as strings.
# Partition-directory columns are still type-inferred by Spark (ImportDate shows up as integer).
filename = "Rate_PUF.csv"
file_format = 'csv'  # not named `format`: avoid shadowing the built-in
spark_rates_df = dataframe_utils.read_data_spark(
    file_path=f"data/silver_{filename}",
    file_format=file_format,
    header=True,
    inferSchema=False,
)
spark_rates_df.printSchema()

                                                                                

root
 |-- BusinessYear: string (nullable = true)
 |-- IssuerId: string (nullable = true)
 |-- SourceName: string (nullable = true)
 |-- RateEffectiveDate: string (nullable = true)
 |-- RateExpirationDate: string (nullable = true)
 |-- PlanId: string (nullable = true)
 |-- RatingAreaId: string (nullable = true)
 |-- Tobacco: string (nullable = true)
 |-- IndividualRate: string (nullable = true)
 |-- IndividualTobaccoRate: string (nullable = true)
 |-- Couple: string (nullable = true)
 |-- PrimarySubscriberAndOneDependent: string (nullable = true)
 |-- PrimarySubscriberAndTwoDependents: string (nullable = true)
 |-- PrimarySubscriberAndThreeOrMoreDependents: string (nullable = true)
 |-- CoupleAndOneDependent: string (nullable = true)
 |-- CoupleAndTwoDependents: string (nullable = true)
 |-- CoupleAndThreeOrMoreDependents: string (nullable = true)
 |-- ImportDate: integer (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- Age: string (nullable = true)



In [5]:
# Build one readable "Column: value | Column: value | ..." string per row (the same layout as the
# page_content seen in the vector-store results further down), plus a per-row id.
columns_to_use = [col for col in spark_rates_df.columns if col != "ImportDate"]

# f.concat returns null if any input is null, so each value is coalesced to "" first.
full_text_expr = f.concat_ws(" | ", *[
    f.concat(f.lit(f"{col_name}: "), f.coalesce(f.col(col_name).cast("string"), f.lit("")))
    for col_name in columns_to_use
])

spark_rates_df_gold = spark_rates_df.withColumn("full_text", full_text_expr)
# monotonically_increasing_id() is unique but not consecutive. No orderBy("row_id") here: the ids
# already follow partition order, so a global sort only adds a shuffle, and the Gold write
# re-partitions by ImportDate/StateCode/Age anyway.
spark_rates_df_gold = spark_rates_df_gold.withColumn("row_id", f.monotonically_increasing_id())
# full_text is very wide; preview only a few untruncated rows.
spark_rates_df_gold.show(5, truncate=False)



+------------+--------+----------+-----------------+------------------+--------------+--------------+-----------------------------+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+----------+---------+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|BusinessYear|IssuerId|SourceName|RateEffectiveDate|RateExpirationD

                                                                                

In [6]:
# Save the Gold dataset (Silver columns + full_text + row_id), partitioned the same way as Silver.
# NOTE(review): mode='append' presumably duplicates rows if this cell is re-run for the same
# ImportDate -- confirm against dataframe_utils.write_data_spark.
dataframe_utils.write_data_spark(
    file_path=f"data/gold_{filename}",
    file_format='csv',
    df=spark_rates_df_gold,
    mode='append',
    partition_by=['ImportDate', 'StateCode', 'Age'],
    header=True,
)

                                                                                

In [7]:
# Structured lookup on the Gold dataset: keep rows matching the requested age, state and tobacco
# preference. age=None, or an empty state/tobacco value, means "don't filter on that field".
query = {'age': 21, 'state': 'AK', 'tobacco': 'No Preference'}
filename = 'Rate_PUF.csv'
gold_df = dataframe_utils.read_data_spark(file_path=f"data/gold_{filename}", file_format="csv", header=True, inferSchema=True)
# DataFrame.show() prints the table itself and returns None -- don't wrap it in print().
gold_df.show(5)

# Rows with no Age are always dropped.
df = gold_df.filter(gold_df["Age"].isNotNull())
# `is not None` instead of truthiness so an age of 0 still applies a filter.
if query['age'] is not None:
    df = df.filter(df['Age'] == query['age'])
if query['state']:
    df = df.filter(df['StateCode'] == query['state'])
if query['tobacco']:
    df = df.filter(df['Tobacco'] == query['tobacco'])

print(df.limit(10).toPandas().to_string(index=False))

                                                                                

+------------+--------+----------+-----------------+------------------+--------------+--------------+--------------------+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+--------------------+------+----------+---------+-----------+
|BusinessYear|IssuerId|SourceName|RateEffectiveDate|RateExpirationDate|        PlanId|  RatingAreaId|             Tobacco|IndividualRate|IndividualTobaccoRate|Couple|PrimarySubscriberAndOneDependent|PrimarySubscriberAndTwoDependents|PrimarySubscriberAndThreeOrMoreDependents|CoupleAndOneDependent|CoupleAndTwoDependents|CoupleAndThreeOrMoreDependents|           full_text|row_id|ImportDate|StateCode|        Age|
+------------+--------+----------+-----------------+------------------+--------------+--------------+--------------------+--------------+---------------------+------+--------

In [None]:
# Load the persisted vector store and run two sample retrievals against it.
from src.main.vectorstore_builder import load_vectorstore
from src.main.constants import VECTOR_STORE_DIR
from langchain_huggingface.embeddings import HuggingFaceEmbeddings

# Query-embedding model; presumably the same model the store was built with -- confirm in
# vectorstore_builder.
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
embedding = HuggingFaceEmbeddings(model_name=embedding_model_name)
db = load_vectorstore(VECTOR_STORE_DIR, embedding=embedding)

# 1) Top-2 text search, restricted to documents whose metadata 'state' is 'NC'.
# NOTE: this rebinds `query` (a dict in the structured-query cell) to a plain string.
query = "What are the rates for a 35-year-old non-tobacco user in NC?"
nc_filter = {'state': 'NC'}
docs = db.similarity_search(query, k=2, filter=nc_filter)
print(docs)

# 2) Top-3 search from an explicitly computed query vector, with no metadata filter.
age_query_vector = embedding.embed_query("what are the rates for a 45 age non-tobacco?")
results = db.similarity_search_by_vector(embedding=age_query_vector, k=3)
print("****")
for doc in results:
    print(f"* {doc.page_content} [{doc.metadata}]")

[Document(id='57fc2fdf-342f-4cdf-96df-49d1b68a6680', metadata={'age': '62', 'state': 'NC', 'tobacco': 'Tobacco User/Non-Tobacco User'}, page_content='BusinessYear: 2025 | IssuerId: 17414 | SourceName: HIOS | RateEffectiveDate: 2025-01-01 | RateExpirationDate: 2025-12-31 | PlanId: 17414NC0010009 | RatingAreaId: Rating Area 1 | Tobacco: Tobacco User/Non-Tobacco User | IndividualRate: 1410.45 | IndividualTobaccoRate: 1692.54 | Couple:  | PrimarySubscriberAndOneDependent:  | PrimarySubscriberAndTwoDependents:  | PrimarySubscriberAndThreeOrMoreDependents:  | CoupleAndOneDependent:  | CoupleAndTwoDependents:  | CoupleAndThreeOrMoreDependents:  | StateCode: NC | Age: 62'), Document(id='1265caa7-a4a4-40ff-87b1-ddd5cae18c3d', metadata={'age': '62', 'state': 'NC', 'tobacco': 'Tobacco User/Non-Tobacco User'}, page_content='BusinessYear: 2025 | IssuerId: 61671 | SourceName: HIOS | RateEffectiveDate: 2025-01-01 | RateExpirationDate: 2025-12-31 | PlanId: 61671NC0100017 | RatingAreaId: Rating Area 9 

25/05/10 02:54:38 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3169815 ms exceeds timeout 120000 ms
25/05/10 02:54:38 WARN SparkContext: Killing executors is not supported by current scheduler.
25/05/10 02:54:44 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at 