In [1]:
import snowflake.connector
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
import pandas as pd
import numpy as np
import copy

In [2]:
# Snowflake connection - DSC
# Key-pair authentication: load the encrypted PEM private key from disk and
# re-serialize it as unencrypted DER/PKCS8 bytes for the connector's
# `private_key` argument.
key_path = os.path.join('/etc/security/snowflake', 'rsa_plbiap01dy.p8')

# NOTE(review): the passphrase used to be hard-coded here. It is now taken from
# the SNOWFLAKE_KEY_PASSPHRASE environment variable; the literal fallback only
# keeps existing runs working and should be removed once the variable is set.
key_passphrase = os.environ.get('SNOWFLAKE_KEY_PASSPHRASE', 'snowflake')

with open(key_path, "rb") as key:
    p_key = serialization.load_pem_private_key(
        key.read(),
        password=key_passphrase.encode(),
        backend=default_backend())

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

# Snowflake queries
query_scoremart = """select * FROM DSC_PLDS_DB.APP_AUTOMATA_PRD.PREVAIL_HOME_ULM_QUOTE_AGG_NB_QUALITY where trans_dt between '2022-01-01' and '2024-12-31' and issue_ind = 'Y'"""

ctx = snowflake.connector.connect(
        user='plbiap01dy',
        account='hfsg_prod.us-east-1.privatelink',
        private_key=pkb,
        warehouse='DSC_PLBI_PRD_MFG_WHS',
        role='plbiap01dy_prd_pii_role'
)

# Get data from Snowflake
# Nested try/finally so that both the cursor and the connection are released
# even when creating the cursor, running the query or fetching the result fails.
try:
    cs = ctx.cursor()
    try:
        cs.execute(query_scoremart)
        df = cs.fetch_pandas_all()
    finally:
        cs.close()
finally:
    ctx.close()

df.shape

(122238, 94)

In [3]:
# calculate relativities
# Each row's expected loss ratio is divided by the mean expected loss ratio of
# all rows sharing its PRIMARYRATINGSTATE.

# overall expected loss ratio
all_state_mean_lr_dict = (
    df.groupby('PRIMARYRATINGSTATE')
      .agg(all_state_mean_lr_dict=('EXPECTED_LOSS_RATIO', 'mean'))
      .to_dict()
)
state_lr_lookup = all_state_mean_lr_dict['all_state_mean_lr_dict']
df["all_state_lr_mean"] = df["PRIMARYRATINGSTATE"].map(state_lr_lookup)
df['ULM_eLRR'] = df['EXPECTED_LOSS_RATIO'] / df['all_state_lr_mean']

# "OL" expected loss ratio, same calculation on the OL_ column
all_state_mean_ol_lr_dict = (
    df.groupby('PRIMARYRATINGSTATE')
      .agg(all_state_mean_ol_lr_dict=('OL_EXPECTED_LOSS_RATIO', 'mean'))
      .to_dict()
)
state_ol_lr_lookup = all_state_mean_ol_lr_dict['all_state_mean_ol_lr_dict']
df["all_state_ol_lr_mean"] = df["PRIMARYRATINGSTATE"].map(state_ol_lr_lookup)
df['ULM_OL_eLRR'] = df['OL_EXPECTED_LOSS_RATIO'] / df['all_state_ol_lr_mean']

In [4]:
def summarize_data(dataframe, by_vars, aggdict, writer, outsheet):
    """Aggregate ``dataframe`` by ``by_vars`` and write the table to one Excel sheet.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Data to summarize. It is only read; it is never modified.
    by_vars : list of str
        Column name(s) used as the pivot index, i.e. the grouping keys.
    aggdict : dict
        Mapping of column name to aggregation, passed to ``pivot_table`` as
        ``aggfunc``.
    writer : pandas.ExcelWriter
        Open writer the sheet is added to. This function does not save or
        close it; that is left to the caller.
    outsheet : str
        Name of the sheet to write.

    Returns
    -------
    None
    """
    # pivot_table builds a new frame and leaves its input untouched, so the
    # source frame is used directly instead of deep-copying it on every call.
    table = dataframe.pivot_table(index=by_vars, aggfunc=aggdict).reset_index()

    # reset_index() turned the grouping keys into ordinary columns, so the
    # positional index is not written to the sheet.
    table.to_excel(writer, sheet_name=outsheet, index=False)

In [5]:
# aggregations
# Column -> aggregation applied in every summary table. The commented-out
# entries are optional metrics that can be switched on by uncommenting them.
aggregation_dict = {}
aggregation_dict['QCN'] = 'count'
# aggregation_dict['EXPECTED_LOSS_RATIO'] = 'mean'
# aggregation_dict['OL_EXPECTED_LOSS_RATIO'] = 'mean'
aggregation_dict['ULM_eLRR'] = 'mean'
# aggregation_dict['ULM_OL_eLRR'] = 'mean'

In [6]:
# insurance score deciles
ins_score = df['INS_SCR_CD']

# A score of exactly 0, or of 998 and above, is labelled 'NH/NS'.
df.loc[(ins_score == 0) | (ins_score >= 998), 'cv_decile'] = 'NH/NS'

# (exclusive lower bound, inclusive upper bound, label);
# the lowest scores get the highest decile number.
decile_bands = [
    (0, 481, 'Decile 10'),
    (481, 598, 'Decile 09'),
    (598, 673, 'Decile 08'),
    (673, 717, 'Decile 07'),
    (717, 746, 'Decile 06'),
    (746, 768, 'Decile 05'),
    (768, 788, 'Decile 04'),
    (788, 809, 'Decile 03'),
    (809, 833, 'Decile 02'),
]
for band_low, band_high, band_label in decile_bands:
    in_band = ins_score.between(band_low, band_high, inclusive='right')
    df.loc[in_band, 'cv_decile'] = band_label

# The top band is open at both ends so that 998+ keeps its 'NH/NS' label.
df.loc[(ins_score > 833) & (ins_score < 998), 'cv_decile'] = 'Decile 01'

df['cv_decile'].value_counts()

cv_decile
Decile 03    15356
Decile 02    14946
Decile 01    14865
Decile 04    14179
Decile 05    13701
Decile 06    13207
Decile 07    12277
Decile 08    10802
Decile 09     8075
Decile 10     3917
NH/NS          913
Name: count, dtype: int64

In [7]:
# insurance score quintiles
# Each quintile is the union of two adjacent deciles; 'NH/NS' is carried over unchanged.
quintile_members = {
    'Quintile 1': ["Decile 01", "Decile 02"],
    'Quintile 2': ["Decile 03", "Decile 04"],
    'Quintile 3': ["Decile 05", "Decile 06"],
    'Quintile 4': ["Decile 07", "Decile 08"],
    'Quintile 5': ["Decile 09", "Decile 10"],
    'NH/NS': ["NH/NS"],
}
for quintile_label, decile_labels in quintile_members.items():
    df.loc[df['cv_decile'].isin(decile_labels), 'cv_quintile'] = quintile_label

df['cv_quintile'].value_counts()

cv_quintile
Quintile 1    29811
Quintile 2    29535
Quintile 3    26908
Quintile 4    23079
Quintile 5    11992
NH/NS           913
Name: count, dtype: int64

In [8]:
# coverage A
cov_a_source = 'CV_HIGI_HOMEARD_DWELLING_COV_LMT'

# Every row starts as 'no_coverage'; rows that match a band below are relabelled.
df['cov_a'] = 'no_coverage'

# NOTE(review): the string 'None' is turned into 0 before the numeric
# conversion, so those rows land in '<$300k' rather than staying
# 'no_coverage' - confirm that this is intended.
df[cov_a_source] = pd.to_numeric(df[cov_a_source].replace('None', 0))
dwelling_limit = df[cov_a_source]

df.loc[dwelling_limit < 300000, 'cov_a'] = '<$300k'

# (inclusive lower bound, exclusive upper bound, label)
for limit_low, limit_high, limit_label in [
    (300000, 450000, '$300k-$449k'),
    (450000, 650000, '$450k-$649k'),
]:
    in_band = dwelling_limit.between(limit_low, limit_high, inclusive='left')
    df.loc[in_band, 'cov_a'] = limit_label

df.loc[dwelling_limit >= 650000, 'cov_a'] = '$650k+'

df['cov_a'].value_counts()

cov_a
$300k-$449k    51620
<$300k         41849
$450k-$649k    22208
$650k+          6561
Name: count, dtype: int64

In [9]:
# Total Bath Count
bath_source = 'CV_HIGV_ATM_SA_NBR_BATH_CALC_2'

# The string 'None' becomes 0 before the numeric conversion. A count of 0
# matches none of the buckets below, so such rows get no 'ttl_bath_cnt' label.
df[bath_source] = pd.to_numeric(df[bath_source].replace('None', 0))
bath_count = df[bath_source]

# Exact counts 1-3 are labelled with the number itself; 4 and above share a bucket.
for exact_count in (1, 2, 3):
    df.loc[bath_count == exact_count, 'ttl_bath_cnt'] = str(exact_count)
df.loc[bath_count >= 4, 'ttl_bath_cnt'] = '4+'

df['ttl_bath_cnt'].value_counts()

ttl_bath_cnt
2     64087
3     30669
1     19649
4+     7833
Name: count, dtype: int64

In [10]:
# Roof Age
# NOTE(review): the string 'None' is converted to 0 and therefore falls into
# '5 years or less' - confirm that this is intended.
df['PROP_ROOF_AGE'] = pd.to_numeric(df['PROP_ROOF_AGE'].replace('None', 0))
roof_years = df['PROP_ROOF_AGE']

df.loc[roof_years <= 5, 'roof_age'] = '5 years or less'

# (inclusive lower bound, inclusive upper bound, label)
for age_low, age_high, age_label in [
    (6, 10, '6-10 years'),
    (11, 20, '11-20 years'),
]:
    df.loc[roof_years.between(age_low, age_high), 'roof_age'] = age_label

df.loc[roof_years >= 21, 'roof_age'] = '21+ years'

df['roof_age'].value_counts()

roof_age
6-10 years         58330
5 years or less    35870
11-20 years        19185
21+ years           8853
Name: count, dtype: int64

In [11]:
# Construction Material
construction_code = df['CF_HIGI_HOMEARD_RMP30_HOUSE_CODE_CONSTRUCTIN_ARD']

# Any code not listed below keeps the default 'e. other' label.
df['const_mat'] = 'e. other'
construction_labels = {
    'FRAME': 'a. frame',
    'BRKVEN': 'b. masonry/veneer',
    'MASON': 'c. masonry',
}
for source_code, material_label in construction_labels.items():
    df.loc[construction_code == source_code, 'const_mat'] = material_label

df['const_mat'].value_counts()

const_mat
a. frame             80408
b. masonry/veneer    33178
c. masonry            8454
e. other               198
Name: count, dtype: int64

In [12]:
# Dwelling Age
dwell_age_source = 'CV_HIGI_HOMEARD_RMP30_DWELLING_AGE_CAP'

# NOTE(review): the string 'None' is converted to 0 and therefore falls into
# the '0-10' group - confirm that this is intended.
df[dwell_age_source] = pd.to_numeric(df[dwell_age_source].replace('None', 0))
dwelling_age = df[dwell_age_source]

# (inclusive lower bound, inclusive upper bound, label)
dwelling_age_bands = [
    (0, 10, '0-10'),
    (11, 20, '11-20'),
    (21, 30, '21-30'),
    (31, 40, '31-40'),
    (41, 50, '41-50'),
    (51, 60, '51-60'),
    (61, 70, '61-70'),
]
for age_low, age_high, age_label in dwelling_age_bands:
    df.loc[dwelling_age.between(age_low, age_high), 'dwell_age_grp'] = age_label

# Everything above 70 shares one open-ended group.
df.loc[dwelling_age > 70, 'dwell_age_grp'] = '70+'

df['dwell_age_grp'].value_counts()

dwell_age_grp
70+      19711
21-30    19105
61-70    15606
11-20    15372
41-50    14411
51-60    13525
31-40    12951
0-10     11552
Name: count, dtype: int64

In [13]:
# Square Foot
sq_ft_source = 'CV_HIGI_HOMEARD_CMB_SQFTGRP2_ARD'

# NOTE(review): the string 'None' is converted to 0 and therefore falls into
# 'a. 0-1500' - confirm that this is intended.
df[sq_ft_source] = pd.to_numeric(df[sq_ft_source].replace('None', 0))
square_feet = df[sq_ft_source]

# (inclusive lower bound, inclusive upper bound, label)
for area_low, area_high, area_label in [
    (0, 1500, 'a. 0-1500'),
    (1501, 2000, 'b. 1501-2000'),
    (2001, 2500, 'c. 2001-2500'),
]:
    df.loc[square_feet.between(area_low, area_high), 'sq_ft'] = area_label

df.loc[square_feet >= 2501, 'sq_ft'] = 'e. 2501+'

df['sq_ft'].value_counts()

sq_ft
a. 0-1500       50112
b. 1501-2000    37379
c. 2001-2500    20047
e. 2501+        14700
Name: count, dtype: int64

In [14]:
# One sheet per rating variable: (grouping column, sheet name).
summary_sheets = [
    ('cv_decile', 'cv_decile'),
    ('cv_quintile', 'cv_quintile'),
    ('ROOF_COND_DERIV_SCR_NUM', 'roof_score'),
    ('cov_a', 'cov_a'),
    ('ttl_bath_cnt', 'ttl_bath_cnt'),
    ('roof_age', 'roof_age'),
    ('const_mat', 'const_mat'),
    ('dwell_age_grp', 'dwell_age_grp'),
    ('sq_ft', 'sq_ft'),
]

# The context manager closes the workbook even if one of the summaries raises,
# so the file handle is never left open.
with pd.ExcelWriter('./Q4_Home_Fatima.xlsx', engine='xlsxwriter') as writer:
    for by_var, sheet_name in summary_sheets:
        summarize_data(dataframe=df, by_vars=[by_var], aggdict=aggregation_dict,
                       writer=writer, outsheet=sheet_name)