In [None]:
#import pandas & set display setting

import pandas as pd


In [None]:
pd.options.display.max_rows = 20

In [None]:
#to be compared for periodic updates

lastEtlDate = pd.to_datetime('20200430', format='%Y%m%d')
lastEtlDate = lastEtlDate.date()
lastEtlDate

## Extract IHP Data

In [None]:
#extract IHP Programs file

ihp_cols = ['disasterNumber','state','county','city','zipCode','totalValidRegistrations','ihpReferrals','ihpEligible',
           'ihpAmount','haReferrals','haEligible','haAmount','onaReferrals','onaEligible','onaAmount','lastRefresh','id']

ihpProgram_src1 = pd.read_csv('https://www.fema.gov/api/open/v1/RegistrationIntakeIndividualsHouseholdPrograms.csv',usecols=ihp_cols)
ihpProgram_src1.sort_values('disasterNumber')
ihpProgram_src1

In [None]:
#for comparison with lastEtlDate to handle next ETL run (not implemented here further)

ihpProgram_src1['lastRefresh'] = pd.to_datetime(ihpProgram_src1['lastRefresh'].str.slice(0, 10, 1))

In [None]:
#copy of source for relief fact (for debugging to avoid re-reading file)
ihpProgram_src = ihpProgram_src1

In [None]:
ihpProgram_src.info()

In [None]:
#since there are so few,
#drop na records that would not key to other dimensional data

na_keys = ['disasterNumber','county','city','zipCode']
ihpProgram_src.dropna(subset=na_keys,inplace=True)

In [None]:
ihpProgram_src.info()

## Extract Disaster Data

In [None]:
#listed columns for vertical partition of data

disaster_cols = ['disasterNumber','declarationType','incidentType','designatedArea','declarationDate']

In [None]:
#extract declared disasters file (OpenFEMA, 'Disaster', 2020)

disasterDeclarations_src = pd.read_csv('https://www.fema.gov/api/open/v2/DisasterDeclarationsSummaries.csv',
                                       usecols=disaster_cols)
disasterDeclarations_src.sort_values('disasterNumber')
disasterDeclarations_src.info()

# Transforming IHP & Disaster Data

In [None]:
#remove duplicates
disasterDeclarations_src.drop_duplicates(subset='disasterNumber',inplace=True)
disasterDeclarations_src.info()

In [None]:
disasterDeclarations_src['incidentType'] = disasterDeclarations_src['incidentType'].str.replace("'","''")
disasterDeclarations_src['designatedArea'] = disasterDeclarations_src['designatedArea'].str.replace("'","''")

In [None]:
#add declaration date for date dimension data

ihpProgram_src['calendarDate'] = ihpProgram_src.merge(disasterDeclarations_src,on='disasterNumber',how='inner')['declarationDate']
ihpProgram_src.info()

In [None]:
#drop na rows (na date rows are from recent data that has yet to be loaded into the disaster CSV file)

ihpProgram_src.dropna(inplace=True)
ihpProgram_src.info()

In [None]:
#conforming/formatting string column data

ihpProgram_src['county'] = ihpProgram_src['county'].str.replace('(','').str.replace(')','').str.replace("'","''")
ihpProgram_src['city'] = ihpProgram_src['city'].str.title().str.replace("\\",'').str.replace("]",'').str.replace("'","''")
ihpProgram_src['calendarDate'] = ihpProgram_src['calendarDate'].str.slice(0, 10, 1)


In [None]:
int_measures = ['totalValidRegistrations','ihpReferrals','ihpEligible','haReferrals','haEligible','onaReferrals','onaEligible']

In [None]:
ihpProgram_src[int_measures] = ihpProgram_src[int_measures].astype(int)

In [None]:
ihpProgram_src.info()

# Loading

In [None]:
#open connection with db

import pyodbc
conn = pyodbc.connect('Driver={SQL Server};'
                     'Server=MARTINPC-01;'
                     'Database=cs689TermProj;'
                     'Trusted_Connection=yes;')

cursor = conn.cursor()

## Load Disaster Dimension

In [None]:
disaster_dimension = disasterDeclarations_src.loc[:,disaster_cols[:-1]]

In [None]:
disaster_dimension.info()

In [None]:
#populate disaster dimension

for i, row in disaster_dimension.iterrows():
    disaster_sql = "INSERT INTO cs689TermProj.dbo.Disaster_Dimension(disasterId,\
    femaDisasterNumber,incidentType,designatedArea) VALUES \
    (NEXT VALUE FOR disaster_PK,'" + str(row.disasterNumber) + "','" + str(row.incidentType) + "', '" + str(row.designatedArea) + "');"
    
    #print(disaster_sql)
    cursor.execute(disaster_sql)

## Load Location Dimension

In [None]:
location_cols = ['county','state','city','zipCode']

In [None]:
location_dimension = ihpProgram_src.loc[:,location_cols].drop_duplicates(keep='first')

In [None]:
location_dimension.sort_values(['state','county','city'])
location_dimension['locationIndex'] = location_dimension.index
location_dimension.info()

In [None]:
#index created as synthetic key to improve speed in connecting with fact table

ihpProgram_src['locationIndex'] = ihpProgram_src.merge(location_dimension, on=location_cols,how='left',indicator=True)['locationIndex']

In [None]:
for i, row in location_dimension.iterrows():
    location_sql = "INSERT INTO cs689TermProj.dbo.Location_Dimension(locationId,locState,locCounty,locCity,locZipCode,locationIndex) VALUES (NEXT VALUE FOR location_PK,'" + str(row.state) + "','" + str(row.county) + "','" + str(row.city) + "','" + str(row.zipCode) + "','" + str(row.locationIndex) + "');"
    
    #print(location_sql)
    cursor.execute(location_sql)

## Creation & Loading of the DateTime Dimension

In [None]:
#create date dimension data w/ pandas datetime (Pandas, 'Time Series / Date functionality', n.d.)
#used federal government FYE 9/30 per FEMA website

dateTime_dimension = pd.DataFrame({'calendarDate': pd.date_range('1953-01-01','2022-12-31')},)
dateTime_dimension['dayOfWeek'] = dateTime_dimension.calendarDate.dt.weekday_name
dateTime_dimension['calendarDayOfMonth'] = dateTime_dimension.calendarDate.dt.day
dateTime_dimension['calendarMonthOfYear'] = dateTime_dimension.calendarDate.dt.month
dateTime_dimension['calendarWeekOfYear'] = dateTime_dimension.calendarDate.dt.weekofyear
dateTime_dimension['calendarYear'] = dateTime_dimension.calendarDate.dt.year
dateTime_dimension['fiscalQuarter'] = pd.PeriodIndex(dateTime_dimension['calendarDate'], freq='Q-SEP').quarter
dateTime_dimension['fiscalYear'] = pd.PeriodIndex(dateTime_dimension['calendarDate'], freq='Q-SEP').year

In [None]:
dateTime_dimension.info()

In [None]:
cursor.execute('commit')

In [None]:
#populate dateTime dimension

for i, row in dateTime_dimension.iterrows():
    dateTime_sql = "INSERT INTO cs689TermProj.dbo.DateTime_Dimension(dateTimeId, calendarDate, \
    calendarDayOfWeek, calendarDayOfMonth, calendarMonthOfYear, calendarWeekOfYear, calendarYear, fiscalQuarter, fiscalYear)\
    VALUES (NEXT VALUE FOR dateTime_PK,'" + str(row.calendarDate) + "','" + str(row.dayOfWeek) + "',\
    '" + str(row.calendarDayOfMonth) + "','" + str(row.calendarMonthOfYear) + "','" + str(row.calendarWeekOfYear) + "',\
    '" + str(row.calendarYear) + "','" + str(row.fiscalQuarter) + "','" + str(row.fiscalYear) + "');"
    
    #print(dateTime_sql)
    cursor.execute(dateTime_sql)

## Load Relief Fact Data

In [None]:
#get synthetic keys from DBMS

location_keys = pd.read_sql_query(
    'SELECT locationId, locationIndex FROM dbo.Location_Dimension',conn)
disaster_keys = pd.read_sql_query(
    'SELECT disasterId,femaDisasterNumber AS disasterNumber FROM dbo.Disaster_Dimension',conn)
dateTime_keys = pd.read_sql_query(
    'SELECT dateTimeId,calendarDate FROM dbo.DateTime_Dimension',conn)

In [None]:
#add synthetic key columns

ihpProgram_src['disasterId'] = ihpProgram_src.merge(disaster_keys,on='disasterNumber',how='left',indicator=True)['disasterId']
ihpProgram_src['locationId'] = ihpProgram_src.merge(location_keys,on='locationIndex',how='left',indicator=True)['locationId']
ihpProgram_src['dateTimeId'] = ihpProgram_src.merge(dateTime_keys, on='calendarDate',how='left',indicator=True)['dateTimeId']

In [None]:
relief_cols = ['disasterId','locationId','dateTimeId','totalValidRegistrations',
               'ihpReferrals','ihpEligible','ihpAmount','haReferrals','haEligible',
               'haAmount','onaReferrals','onaEligible','onaAmount']

In [None]:
relief_fact = ihpProgram_src.loc[:,relief_cols]

In [None]:
relief_fact.info()

In [None]:
relief_fact.dropna(inplace=True)

In [None]:
relief_fact.info()

In [None]:
for i,row in relief_fact.iterrows():
    relief_sql = "INSERT INTO cs689TermProj.dbo.Relief_Fact(disasterId,locationId,dateTimeId,\
    totalValidRegistrations,ihpReferrals,ihpEligible,ihpAmount,haReferrals,haEligible,haAmount,\
    onaReferrals,onaEligible,onaAmount) VALUES ('" + str(row.disasterId) + "',\
    '" + str(row.locationId) + "','" + str(row.dateTimeId) + "','" + str(row.totalValidRegistrations.astype(int)) + "',\
    '" + str(row.ihpReferrals.astype(int)) + "','" + str(row.ihpEligible.astype(int)) + "','" + str(row.ihpAmount) + "',\
    '" + str(row.haReferrals.astype(int)) + "','" + str(row.haEligible.astype(int)) + "','" + str(row.haAmount) + "',\
    '" + str(row.onaReferrals.astype(int)) + "','" + str(row.onaEligible.astype(int)) + "','" + str(row.onaAmount) + "');"
    
    print(relief_sql)
    cursor.execute(relief_sql)

In [None]:
cursor.execute('commit')