# Data ETL
**ETL (Extract, Transform, Load)** is the process of gathering raw data from various sources, cleaning and consolidating it, and then loading it into a single centralized destination:

- **Extract** data from various sources (databases, files, APIs, etc.)
- **Transform** the data by cleaning, formatting, or combining it as needed
- **Load** the processed data into a destination, such as a database, data warehouse, or analytics tool

ETL is a foundational step in data engineering and analytics pipelines, ensuring data is ready for analysis or further processing.

## Initialize

In [1]:
import pandas as pd
import numpy as np
# import polars as pl
import gdown
import os
import rich

In [2]:
# ## One-time data download (uncomment and run once to fetch the raw CSVs from Google Drive)
# model_file_id = "15DamSAHtEUsLn2qwnwmJFgD7Djk1MizJ"
# data_folder = "c:/teaching/fall2025/data_science_bootcamp_lecture_1"
# gdown.download(
#     f"https://drive.google.com/uc?id={model_file_id}",
#     f'{data_folder}/.local/auto_policies_2017.csv', 
#     quiet=False)
# inference_file_id = "1ZppcSp8WMinV3iUdNUDapfDg3bMEqMat"
# gdown.download(
#     f"https://drive.google.com/uc?id={inference_file_id}",
#     f'{data_folder}/.local/auto_potential_customers_2018.csv', 
#     quiet=False)

## Extract Data

In [3]:
# Root folder of the course material; the CSV inputs sit in its `.local` subfolder.
data_folder = "c:/teaching/fall2025/data_science_bootcamp_lecture_1"

# Read the synthetic modeling data set.
# Alternative input (disabled): f'{data_folder}/.local/auto_policies_2017.csv'
model_data_path = f'{data_folder}/.local/synthetic_model_data.csv'
model_data = pd.read_csv(model_data_path)

# Report (rows, columns), then preview the first five records.
print(model_data.shape)
model_data.head()

(50000, 15)


Unnamed: 0,pol_number,pol_eff_dt,agecat,date_of_birth,credit_score,traffic_index,veh_age,veh_value,numclaims,claimcst0,annual_premium,gender,area,veh_body,claim_office
0,53703320.0,2017-07-19,-5.985015,1967-08-09,566.644937,72.104017,2.55748,2.289678,1,7228.00554,716.53,M,A,SEDAN,A
1,55656910.0,2017-06-29,-4.35375,1966-04-15,691.515821,107.602505,2.792034,1.826403,0,0.0,716.53,F,C,SEDAN,Missing
2,56278460.0,2017-06-30,-3.997989,1972-08-14,614.973556,80.482879,2.452349,2.899493,0,0.0,716.53,M,A,STNWG,Missing
3,54910150.0,2017-07-03,-5.121608,1971-09-13,614.002052,85.560472,2.462938,2.702998,0,0.0,716.53,M,A,STNWG,Missing
4,53715610.0,2017-07-09,-5.724952,1966-04-03,682.43484,67.920249,2.644569,2.287953,0,0.0,716.53,M,A,SEDAN,Missing


In [4]:
# Read the synthetic inference data set from the same `.local` subfolder.
# Alternative input (disabled): f'{data_folder}/.local/auto_potential_customers_2018.csv'
inference_data_path = f'{data_folder}/.local/synthetic_inference_data.csv'
inference_data = pd.read_csv(inference_data_path)

# Report (rows, columns), then preview the first five records.
print(inference_data.shape)
inference_data.head()

(5000, 10)


Unnamed: 0,quote_number,agecat,date_of_birth,credit_score,traffic_index,veh_age,veh_value,gender,area,veh_body
0,54908970.0,-5.336328,1969-09-03,610.77872,88.917854,2.9005,1.518183,M,C,UTE
1,54020800.0,-4.865684,1968-06-01,647.422732,112.836114,2.772218,1.256718,F,C,SEDAN
2,55218730.0,-5.558392,1970-08-12,628.717026,100.190509,2.508692,2.023435,F,C,SEDAN
3,53524360.0,-7.677538,1978-04-11,587.018013,118.913016,1.858794,4.418014,F,C,STNWG
4,55229190.0,-5.302583,1968-05-13,620.66522,69.924154,2.721533,1.688835,M,A,HBACK


## Transform Data
### Visualize data

In [5]:
# Destination folder (relative to the notebook's working directory) for files written by this notebook.
output_folder = "../.local/analysis_pipeline/data_etl"
# Create the folder and any missing parents; an existing folder is left untouched.
os.makedirs(name=output_folder, exist_ok=True)

In [6]:
from ydata_profiling import ProfileReport

# Build the profiling report object for the modeling data.
profile_title = "Model Data Profiling Report"
model_data_profile = ProfileReport(model_data, title=profile_title)

In [7]:
# Export the profiling report as an HTML file inside the output folder.
# Alternative (disabled): model_data_profile.to_notebook_iframe()
model_profile_path = f"{output_folder}/model_data_profile_09162025.html"
model_data_profile.to_file(model_profile_path)

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 15/15 [00:00<00:00, 27.16it/s]


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

### Clean data

In [8]:
## Show column information (dtypes and non-null counts) to check for abnormalities
# DataFrame.info() writes its summary straight to stdout and returns None, so it
# is called on its own: wrapping it in rich.print() only adds a stray "None".
model_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 15 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   pol_number      50000 non-null  float64
 1   pol_eff_dt      50000 non-null  object 
 2   agecat          50000 non-null  float64
 3   date_of_birth   50000 non-null  object 
 4   credit_score    50000 non-null  float64
 5   traffic_index   50000 non-null  float64
 6   veh_age         50000 non-null  float64
 7   veh_value       50000 non-null  float64
 8   numclaims       50000 non-null  int64  
 9   claimcst0       50000 non-null  float64
 10  annual_premium  50000 non-null  float64
 11  gender          50000 non-null  object 
 12  area            50000 non-null  object 
 13  veh_body        50000 non-null  object 
 14  claim_office    50000 non-null  object 
dtypes: float64(8), int64(1), object(6)
memory usage: 5.7+ MB


In [9]:
## Format data into correct types
# Convert 'pol_eff_dt' and 'date_of_birth' to datetime, coercing invalid values to NaT.
# No explicit `format` is passed: the loaded CSV stores ISO dates (e.g. 2017-07-19),
# so the previous format='%m/%d/%Y' matched nothing and, combined with
# errors='coerce', silently turned every value into NaT.
for date_col in ('pol_eff_dt', 'date_of_birth'):
    model_data[date_col] = pd.to_datetime(model_data[date_col], errors='coerce')
    # Surface how many values ended up as NaT so a parsing problem cannot go unnoticed.
    print(f"{date_col}: {model_data[date_col].isna().sum()} missing or unparseable value(s)")

In [10]:
## Validate 'date_of_birth'
# Summarize the birth dates on rows whose 'agecat' is missing.
agecat_missing = model_data['agecat'].isnull()
dob_when_agecat_missing = model_data.loc[agecat_missing, ['date_of_birth']]
rich.print(dob_when_agecat_missing.describe())

# Summarize the birth dates inside a single 'agecat' to find its cutoff dates;
# 'agecat' = 1 is used here as the worked example.
agecat_is_one = model_data['agecat'].isin([1])
dob_when_agecat_one = model_data.loc[agecat_is_one, ['date_of_birth']]
rich.print(dob_when_agecat_one.describe())

In [12]:
## Derive 'agecat2' from 'date_of_birth'
# Bin edges: 1 Jan of each cutoff year, closed by the largest representable timestamp.
cutoff_years = (1900, 1950, 1960, 1970, 1980, 1990)
bins = [pd.Timestamp(f'{year}-01-01') for year in cutoff_years] + [pd.Timestamp.max]
# One label per interval, in the same order as the bins (earliest birth dates -> 1).
labels = list(range(1, 7))

# Intervals are left-closed / right-open (right=False); dates outside every bin,
# as well as NaT, yield NaN. The categorical result is cast to float for consistency.
model_data['agecat2'] = pd.cut(
    model_data['date_of_birth'],
    bins=bins,
    labels=labels,
    right=False,
).astype(float)

### Consolidate Data

In [13]:
# Inspect how the rows are spread over the 'veh_body' categories.
veh_body_counts = model_data['veh_body'].value_counts()
rich.print(veh_body_counts)

# Collapse the 'MCARA', 'CONVT', 'BUS', and 'RDSTR' body types into a single 'Other' bucket.
grouped_bodies = ['MCARA', 'CONVT', 'BUS', 'RDSTR']
in_grouped_bodies = model_data['veh_body'].isin(grouped_bodies)
model_data.loc[in_grouped_bodies, 'veh_body'] = 'Other'

In [14]:
# Derive the calendar year and quarter of the policy effective date;
# both are kept for later consistency checks.
policy_effective = model_data['pol_eff_dt']
model_data['pol_year'] = policy_effective.dt.year
model_data['pol_quarter'] = policy_effective.dt.quarter

In [15]:
# Outlier control: cap 'veh_value' and 'traffic_index' at their 99th percentiles.
# Percentiles ignore NaN and are rounded to 3 decimals.
capped_columns = ('veh_value', 'traffic_index')
percentile_caps = {
    column: round(np.nanpercentile(model_data[column], 99), 3)
    for column in capped_columns
}
# Keep the caps under their own names as well.
veh_value_cap = percentile_caps['veh_value']
traffic_index_cap = percentile_caps['traffic_index']

# Report both caps first, then clip each column from above at its cap.
for column, cap in percentile_caps.items():
    print(f"{column} cap at 99th percentile: {cap}")
for column, cap in percentile_caps.items():
    model_data[column] = model_data[column].clip(upper=cap)

veh_value cap at 99th percentile: 3.815
traffic_index cap at 99th percentile: 125.635


In [16]:
# Add two constant columns in one step:
#   veh_cnt  = 1     (every policy is assumed to cover a single vehicle)
#   pol_year = 2017  (policy year)
# NOTE(review): this replaces any 'pol_year' already present on model_data
# (e.g. one derived from 'pol_eff_dt') with the constant 2017 - confirm that is intended.
model_data = model_data.assign(veh_cnt=1, pol_year=2017)

### Apply the above steps to the inference data

In [17]:
import sys

# Put the parent directory on the import path so the project's ETL helper can be imported.
parent_dir = os.path.abspath("..")
sys.path.append(parent_dir)
from analysis_tool_chest.data_etl import DataETL

# Output location for the inference-data artifacts; create it if it is missing.
output_folder = "../.local/analysis_pipeline/data_etl"
os.makedirs(output_folder, exist_ok=True)

# Wrap the inference data, write its profiling report, then run the transform,
# passing the caps that were computed on the model data.
etl = DataETL(inference_data)
etl.profile_analysis(
    output_folder,
    file_name="inference_data_profile_09162025.html",
    title="Inference Data Profiling Report",
)
inference_caps = {'veh_value': veh_value_cap, 'traffic_index': traffic_index_cap}
processed_inference_data = etl.transform(cap_dict=inference_caps)

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 10/10 [00:00<00:00, 223.92it/s]


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

veh_value capped at 3.815
traffic_index capped at 125.635


## Load Data to Disk (Parquet)

In [18]:
# Persist both processed data sets as Parquet files, without the DataFrame index.
output_folder = "../.local/analysis_pipeline/data_etl"
model_parquet_path = f"{output_folder}/model_data_09162025.parquet"
inference_parquet_path = f"{output_folder}/inference_data_09162025.parquet"

model_data.to_parquet(model_parquet_path, index=False)
processed_inference_data.to_parquet(inference_parquet_path, index=False)