Here we are going to:
1.) investigate and analyse the datasets and then 
2.) transform the datasets, group them to convert them into a tabular format and then merge them together - so that we have a homogeneous grouping of risks which we can run our GLMs on. 
3.) We're going to run a frequency GLM and a severity GLM on the combined tabular dataset to produce a loss cost model (a model which predicts how much a particular grouping would cost given the claims experience). 4.) Lastly, we'll evaulate the model's performance.

Here we are going to apply GLM to model the loss cost of claims for the NFIP. I'm going to ignore adjusting the claims inflation, removing large losses and extreme events and just focus on applying the GLM on to a tabular form of the data.

Analaysis Questions:
* How many nulls are in the datasets?
* what are the differences between the datasets?
* How have the claims developed over time? - loss ratio/claims frequency and severity
* How are the claims distributed?

In [3]:
from pyspark.sql.functions import count, avg, sum
import pandas as pd


In [4]:
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import StrMethodFormatter

In [5]:
%sql

/* Query the created temp table in a SQL cell */

select * from `claims_csv`

_c0,agriculturestructureindicator,asofdate,basefloodelevation,basementenclosurecrawlspacetype,reportedcity,condominiumindicator,policycount,countycode,crsdiscount,dateofloss,elevatedbuildingindicator,elevationcertificateindicator,elevationdifference,censustract,floodzone,houseworship,latitude,locationofcontents,longitude,lowestadjacentgrade,lowestfloorelevation,numberoffloorsintheinsuredbuilding,nonprofitindicator,obstructiontype,occupancytype,originalconstructiondate,originalnbdate,amountpaidonbuildingclaim,amountpaidoncontentsclaim,amountpaidonincreasedcostofcomplianceclaim,postfirmconstructionindicator,ratemethod,smallbusinessindicatorbuilding,state,totalbuildinginsurancecoverage,totalcontentsinsurancecoverage,yearofloss,reportedzipcode,primaryresidence
93633,,2019-05-31T00:00:00.000+0000,10.0,0,FAR ROCKAWAY,N,1,36081,0.0,2012-10-29T00:00:00.000+0000,N,3.0,0,36081101002,AE,,40.6,,-73.7,8.2,9.7,2,,,1,1986-05-01T00:00:00.000+0000,2012-08-01T00:00:00.000+0000,110068.9,,0.0,Y,1,,NY,250000,0,2012,11691,Y
93634,,2019-05-31T00:00:00.000+0000,,0,ISLAND PARK,N,1,36059,0.0,2012-10-29T00:00:00.000+0000,N,,999,36059416201,X,,40.6,Lowest floor above ground level and higher floors (No basement/enclosure/crawlspace/subgrade crawlspace),-73.6,,,2,,,1,1976-09-10T00:00:00.000+0000,2012-09-13T00:00:00.000+0000,112066.77,35246.02,0.0,N,Q,,NY,250000,100000,2012,11558,Y
93635,,2019-05-31T00:00:00.000+0000,,2,POINT LOOKOUT,N,1,36059,0.0,2012-10-29T00:00:00.000+0000,N,,999,36059416900,AE,,40.6,Basement/Enclosure/Crawlspace/Subgrade Crawlspace and above,-73.6,,,2,,,1,1947-01-01T00:00:00.000+0000,2012-09-10T00:00:00.000+0000,21052.48,0.0,0.0,N,1,,NY,250000,10500,2012,11569,Y
93636,,2019-05-31T00:00:00.000+0000,,2,FAR ROCKAWAY,N,1,36081,0.0,2012-10-28T00:00:00.000+0000,Y,3.0,999,36081095400,AE,,40.6,,-73.8,,,3,,54.0,1,1960-04-01T00:00:00.000+0000,2012-08-31T00:00:00.000+0000,12463.61,,0.0,N,1,,NY,50000,0,2012,11692,Y
93637,,2019-05-31T00:00:00.000+0000,,0,LAWRENCE,N,1,36081,0.15,2012-10-29T00:00:00.000+0000,N,,999,36059411600,AE,,40.6,,-73.7,,,2,,,1,1950-05-05T00:00:00.000+0000,2012-09-10T00:00:00.000+0000,55389.61,,0.0,N,1,,NY,250000,0,2012,11559,Y
93638,,2019-05-31T00:00:00.000+0000,,1,SHOREHAM,N,1,36103,0.0,2012-11-08T00:00:00.000+0000,N,,999,36103158405,X,,40.9,Basement/Enclosure/Crawlspace/Subgrade Crawlspace and above,-72.9,,,2,,,1,1968-08-01T00:00:00.000+0000,2012-09-05T00:00:00.000+0000,0.0,0.0,,N,7,,NY,250000,100000,2012,11786,Y
93639,,2019-05-31T00:00:00.000+0000,,2,SHIRLEY,N,1,36103,0.0,2012-10-29T00:00:00.000+0000,N,,999,36103159505,AE,,40.7,,-72.9,,,2,,,1,1972-08-30T00:00:00.000+0000,2012-09-09T00:00:00.000+0000,0.0,,,N,1,,NY,157300,0,2012,11967,Y
93640,,2019-05-31T00:00:00.000+0000,8.0,4,FAR ROCKAWAY,N,1,36081,0.0,2012-10-29T00:00:00.000+0000,N,3.0,0,36081097203,AE,,40.6,,-73.8,8.7,8.2,2,,,1,1952-08-20T00:00:00.000+0000,2012-09-09T00:00:00.000+0000,72172.7,,0.0,N,1,,NY,250000,0,2012,11691,Y
93641,,2019-05-31T00:00:00.000+0000,,3,ISLAND PARK,N,1,36059,0.0,2012-10-29T00:00:00.000+0000,Y,,999,36059416202,X,,40.6,Basement/Enclosure/Crawlspace/Subgrade Crawlspace and above,-73.7,,,2,,50.0,1,1930-09-10T00:00:00.000+0000,2012-09-11T00:00:00.000+0000,68446.05,52310.97,0.0,N,Q,,NY,250000,100000,2012,11558,Y
93642,,2019-05-31T00:00:00.000+0000,10.0,0,FAR ROCKAWAY,N,1,36081,0.0,2012-10-30T00:00:00.000+0000,N,3.0,2,36081099802,AE,,40.6,,-73.8,8.8,11.7,2,,,2,2008-05-14T00:00:00.000+0000,2012-09-04T00:00:00.000+0000,48949.84,,0.0,Y,1,,NY,250000,0,2012,11691,Y


In [6]:
claims = spark.sql('''
          select * from `claims_csv`
          '''
            )

In [7]:
claims

How many nulls are in each column?

In [9]:
for col in claims.columns:
  print(col, "\t", "with null values: ", claims.filter(claims[col].isNull()).count())

In [10]:
claims_df = claims.toPandas()

In [11]:
claims_df.shape


In [12]:
claims_df.describe()

Unnamed: 0,_c0,basefloodelevation,basementenclosurecrawlspacetype,policycount,countycode,crsdiscount,elevationdifference,censustract,latitude,longitude,lowestadjacentgrade,lowestfloorelevation,numberoffloorsintheinsuredbuilding,occupancytype,amountpaidonbuildingclaim,amountpaidoncontentsclaim,amountpaidonincreasedcostofcomplianceclaim,totalbuildinginsurancecoverage,totalcontentsinsurancecoverage,yearofloss
count,169312.0,15655.0,169310.0,169312.0,169040.0,169312.0,169312.0,167612.0,167653.0,167653.0,11655.0,15197.0,169009.0,169236.0,164518.0,110773.0,89937.0,169312.0,169312.0,169312.0
mean,84655.5,59.734155,1.132609,1.105391,36070.534087,0.008301,888.857878,36042820000.0,40.996322,-74.015079,43.474809,166.892143,2.264578,1.3391,28638.8,6210.707788,381.98936,161045.3,27690.43659,2000.843998
std,48876.308726,195.76633,1.196702,3.929104,382.214499,0.029593,313.545474,1075460000.0,0.743909,1.256027,227.629544,1064.500471,0.816117,0.813504,68595.66,22111.813233,3251.298495,702413.6,47061.344294,13.214408
min,0.0,-999.0,0.0,1.0,1003.0,0.0,-962.0,36003.0,28.0,-98.0,-14.5,-999.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1974.0
25%,42327.75,7.0,0.0,1.0,36059.0,0.0,999.0,36059410000.0,40.6,-74.1,3.9,7.0,2.0,1.0,542.6825,0.0,0.0,33000.0,0.0,1989.0
50%,84655.5,8.0,1.0,1.0,36081.0,0.0,999.0,36081090000.0,40.7,-73.7,6.7,9.2,2.0,1.0,6551.13,334.5,0.0,156000.0,6200.0,2010.0
75%,126983.25,10.1,2.0,1.0,36103.0,0.0,999.0,36103120000.0,41.0,-73.5,9.0,12.0,3.0,1.0,34940.0,3718.65,0.0,250000.0,37100.0,2012.0
max,169311.0,6666.0,4.0,452.0,54107.0,0.3,9998.0,36123150000.0,45.0,-68.7,9990.0,9996.0,6.0,6.0,9467720.0,500000.0,45000.0,109500000.0,1000000.0,2019.0


In [13]:
claims.select('amountpaidoncontentsclaim','amountpaidonbuildingclaim', 'totalcontentsinsurancecoverage',  'totalbuildinginsurancecoverage').describe().show()

In [14]:
claims.select("lowestadjacentgrade",	"lowestfloorelevation",	"numberoffloorsintheinsuredbuilding",	"occupancytype", "countycode"	,"elevationdifference"	, "censustract", "basefloodelevation"	).show()

In [15]:
%sql

/* Query the created temp table in a SQL cell */

select date_format(originalnbdate, "y"), yearofloss from `claims_csv`

"date_format(originalnbdate, y)",yearofloss
2012,2012
2012,2012
2012,2012
2012,2012
2012,2012
2012,2012
2012,2012
2012,2012
2012,2012
2012,2012


How does the loss ratio develop over time?

In [17]:
%sql

/* Query the created temp table in a SQL cell */

select (SUM(totalbuildinginsurancecoverage) - SUM(amountpaidoncontentsclaim))/SUM(totalbuildinginsurancecoverage) as loss_ratio_building, 
       (SUM(totalcontentsinsurancecoverage) - SUM(amountpaidoncontentsclaim))/SUM(totalcontentsinsurancecoverage) as loss_ratio_content, 
       (SUM(totalcontentsinsurancecoverage + totalbuildinginsurancecoverage) - SUM(amountpaidoncontentsclaim + amountpaidoncontentsclaim))/SUM(totalcontentsinsurancecoverage + totalbuildinginsurancecoverage) as loss_ratio_total, 
       yearofloss
       from `claims_csv`
       group by yearofloss
       order by yearofloss desc

loss_ratio_building,loss_ratio_content,loss_ratio_total,yearofloss
0.9995118364096438,0.9980935540506648,0.9992227062625272,2019
0.9870662181677752,0.9505310972595422,0.9794938310777508,2018
0.9921799823245452,0.9659565544349022,0.9872814943893816,2017
0.9954552252528348,0.9833926723273384,0.9928634457297386,2016
0.988425330398741,0.9600039080963578,0.9820463558955884,2015
0.9954770668595936,0.9840951826967148,0.9929569900051964,2014
0.9913585401459852,0.9590637024651496,0.98572951297717,2013
0.9694862344542242,0.8057127397767581,0.9472561489726068,2012
0.983553829553242,0.8983150712239317,0.9716869193375878,2011
0.9970442216535648,0.9874300593003692,0.9952138824069012,2010


How are the claims distributed?

In [19]:
%sql

/* Query the created temp table in a SQL cell */

select amountpaidoncontentsclaim from `claims_csv`


amountpaidoncontentsclaim
""
35246.02
0.0
""
""
0.0
""
""
52310.97
""


In [20]:
%sql

/* Query the created temp table in a SQL cell */

select amountpaidonbuildingclaim  from `claims_csv`

amountpaidonbuildingclaim
110068.9
112066.77
21052.48
12463.61
55389.61
0.0
0.0
72172.7
68446.05
48949.84


In [21]:
%sql
select totalbuildinginsurancecoverage  from `claims_csv`  


totalbuildinginsurancecoverage
250000
250000
250000
50000
250000
250000
157300
250000
250000
250000


In [22]:
%sql
select totalcontentsinsurancecoverage from `claims_csv` 

totalcontentsinsurancecoverage
0
100000
10500
0
0
100000
0
0
100000
0


In [23]:
%sql
select amountpaidoncontentsclaim/totalcontentsinsurancecoverage * 100 from `claims_csv` 

((amountpaidoncontentsclaim / CAST(totalcontentsinsurancecoverage AS DOUBLE)) * CAST(100 AS DOUBLE))
""
35.246019999999994
0.0
""
""
0.0
""
""
52.31097
""


In [24]:
%sql
select amountpaidonbuildingclaim/totalbuildinginsurancecoverage * 100 from `claims_csv` 

((amountpaidonbuildingclaim / CAST(totalbuildinginsurancecoverage AS DOUBLE)) * CAST(100 AS DOUBLE))
44.02756
44.826708
8.420992
24.92722
22.155844
0.0
0.0
28.86908
27.37842
19.579936


In [25]:
%sql

/* Query the created temp table in a SQL cell */

select log10(amountpaidonbuildingclaim)  from `claims_csv` where amountpaidonbuildingclaim != 0

LOG10(amountpaidonbuildingclaim)
5.041664626271039
5.049476854833603
4.323303263443607
4.095643850991368
4.7434283072762335
4.858372952674347
4.835348390195327
4.689751276583882
4.499139884319948
4.562892965892898


In [26]:
%sql

/* Query the created temp table in a SQL cell */

select log10(amountpaidoncontentsclaim)  from `claims_csv` where amountpaidoncontentsclaim != 0

LOG10(amountpaidoncontentsclaim)
4.547110083323859
4.718592773205446
3.591327331383886
3.066612874965157
3.562086971972978
5.592126993949545
3.658579902444917
4.9146069862443005
4.4980471398066735
4.340197746458496


In [27]:
%sql

/* Query the created temp table in a SQL cell */

select log10(amountpaidoncontentsclaim + amountpaidonbuildingclaim)  from `claims_csv` where amountpaidoncontentsclaim != 0 and amountpaidonbuildingclaim != 0 

LOG10((amountpaidoncontentsclaim + amountpaidonbuildingclaim))
5.16824045482148
5.081912387114187
4.606948909394316
4.328025498919923
4.33824686605051
5.949855866907868
4.661579674670928
5.329979424090736
5.0050921426106845
5.180646372289837


Interestingly the dataset seems to reveal impact of Hurrican Sandy hitting New York in 2012 and the frequency & severity of that event

In [29]:
%sql
select count(*), cast(yearofloss as string) from `claims_csv` group by cast(yearofloss as string) order by cast(yearofloss as string)

count(1),yearofloss
1,1974
2,1975
12,1976
2358,1977
4267,1978
10305,1979
7324,1980
2548,1981
2209,1982
2830,1983


In [30]:
%sql
select sum(amountpaidoncontentsclaim + amountpaidonbuildingclaim), cast(yearofloss as string) from `claims_csv` group by cast(yearofloss as string) order by cast(yearofloss as string)

sum((amountpaidoncontentsclaim + amountpaidonbuildingclaim)),yearofloss
,1974
3174.28,1975
72003.1,1976
3768556.960000002,1977
9025862.330000008,1978
21473386.060000043,1979
11946320.759999985,1980
4489439.099999998,1981
3075367.530000001,1982
4284077.35,1983


Almost all of the claims cost in the last 50 years have been generated by one major event - Hurricane Sandy.

Normally, you would try and model the extreme events like Hurricane's using a catastrophe model and you would bake that into your prices. However, I will just leave it in and model it as if it were not an extreme event.

In [32]:
%sql
select sum(amountpaidoncontentsclaim + amountpaidonbuildingclaim) from `claims_csv`

sum((amountpaidoncontentsclaim + amountpaidonbuildingclaim))
3311941612.490001


In [33]:
%sql

/* Query the created temp table in a SQL cell */

select * from `policies_parquet`

agriculturestructureindicator,basefloodelevation,basementenclosurecrawlspacetype,cancellationdateoffloodpolicy,censustract,condominiumindicator,construction,countycode,crsdiscount,deductibleamountinbuildingcoverage,deductibleamountincontentscoverage,elevatedbuildingindicator,elevationcertificateindicator,elevationdifference,federalpolicyfee,floodzone,hfiaasurcharge,houseofworshipindicator,latitude,locationofcontents,longitude,lowestadjacentgrade,lowestfloorelevation,nonprofitindicator,numberoffloorsininsuredbuilding,obstructiontype,occupancytype,originalconstructiondate,originalnbdate,policycost,policycount,policyeffectivedate,policyterminationdate,policytermindicator,postfirmconstructionindicator,primaryresidenceindicator,propertystate,reportedzipcode,ratemethod,regularemergencyprogramindicator,reportedcity,smallbusinessindicatorbuilding,totalbuildinginsurancecoverage,totalcontentsinsurancecoverage,totalinsurancepremiumofthepolicy
,,2,,36017970400.0,N,N,36017,0.0,G,G,N,1,999,50,A04,25,,42.5,Basement/Enclosure/Crawlspace/Subgrade Crawlspace and above,-75.5,,,,3,,1,1901-06-15,2010-06-04,1371,1,2016-06-04,2017-06-04,1,N,Y,NY,13815,1,R,NORWICH,,87100,5600,1127
,,0,,36059411100.0,N,N,36059,0.0,F,F,N,,999,50,X,250,N,40.6,Lowest floor above ground level and higher floors (No basement/enclosure/crawlspace/subgrade crawlspace),-73.8,,,N,2,,1,1960-10-15,2011-10-17,700,1,2016-10-17,2017-06-06,1,N,N,NY,11096,,R,INWOOD,N,250000,100000,348
,,1,,36059415402.0,N,N,36059,0.0,F,F,N,1,999,25,X,250,N,40.7,Basement/Enclosure/Crawlspace/Subgrade Crawlspace and above,-73.5,,,N,2,,1,1964-01-01,2006-07-20,724,1,2016-07-20,2017-07-20,1,N,N,NY,11710,7,R,BELLMORE,N,250000,100000,390
,,0,,36059415401.0,N,N,36059,0.0,2,2,N,1,999,45,A04,250,,40.7,Lowest floor only above ground level (No basement/enclosure/crawlspace/subgrade crawlspace),-73.5,,,,1,,1,1955-08-15,1994-11-23,1082,1,2016-02-10,2017-02-10,1,N,N,NY,11710,1,R,BELLMORE,,33600,10800,684
,,2,,36119007200.0,N,N,36119,0.1,2,,N,,999,50,A08,250,N,40.9,,-73.7,,,N,3,,1,1957-09-01,2014-08-29,5489,1,2016-08-29,2017-08-29,1,N,N,NY,10543,1,R,MAMARONECK,N,250000,0,4512
,,0,,36081091601.0,N,N,36081,0.0,5,,N,,999,50,AE,250,,40.6,,-73.9,,,,1,,1,1950-01-01,2014-07-18,2774,1,2016-07-18,2017-07-18,1,N,N,NY,11697,1,R,BREEZY POINT,N,167500,0,2151
N,6.0,0,,36103123600.0,L,N,36103,0.0,A,0,N,,0,400,AE,250,N,40.7,,-73.4,5.6,6.3,N,2,,3,1976-01-01,2015-09-02,6359,8,2016-09-02,2017-09-02,1,N,N,NY,11701,B,R,AMITYVILLE,N,1373900,0,4964
N,,2,,36103169902.0,N,N,36103,0.0,F,F,N,,999,25,X,250,N,40.9,Basement/Enclosure/Crawlspace/Subgrade Crawlspace and above,-72.6,,,N,2,,1,2000-03-02,2014-05-24,724,1,2016-05-24,2017-05-24,1,Y,N,NY,11970,7,R,SOUTH JAMESPORT,N,250000,100000,390
N,,1,,36061004700.0,N,N,36061,0.0,F,F,N,,999,25,X,250,N,40.7,Basement/Enclosure/Crawlspace/Subgrade Crawlspace and above,-74.0,,,N,3,,6,1974-12-30,2014-05-26,3375,1,2016-05-26,2017-05-26,1,N,N,NY,10012,7,R,NEW YORK,N,500000,100000,2696
,10.0,4,,36047032800.0,N,N,36047,0.0,F,0,N,3,-3,50,AE,250,N,40.6,,-74.0,7.7,6.6,N,3,80,1,1988-07-30,2012-09-23,1698,1,2016-09-23,2017-09-23,1,Y,N,NY,11224,2,R,BROOKLYN,N,250000,0,1216


In [34]:
policies = spark.sql('''
          select * from `policies_parquet`
          '''
            )

How many nulls are in each column?

In [36]:
for pol in policies.columns:
  print(pol, "\t", "with null values: ", policies.filter(policies[pol].isNull()).count())

In [37]:
for pol in policies.columns:
  pol_null_count = policies.filter(policies[pol].isNull()).count()
  if pol_null_count < 1000:
    print(pol, "\t", "with null values: ", pol_null_count)

In [38]:
for col in claims.columns:
  col_null_count = claims.filter(claims[col].isNull()).count()
  if col_null_count < 1000:
    print(col, "\t", "with null values: ", col_null_count)

In [39]:
policies_df = policies.toPandas()

In [40]:
policies_df.shape

In [41]:
policies_df.describe()

Unnamed: 0,agriculturestructureindicator,basefloodelevation,basementenclosurecrawlspacetype,cancellationdateoffloodpolicy,censustract,condominiumindicator,construction,countycode,crsdiscount,deductibleamountinbuildingcoverage,...,primaryresidenceindicator,propertystate,reportedzipcode,ratemethod,regularemergencyprogramindicator,reportedcity,smallbusinessindicatorbuilding,totalbuildinginsurancecoverage,totalcontentsinsurancecoverage,totalinsurancepremiumofthepolicy
count,315023,270647.0,1790807,159215,1787524,1790837,1790834,1790021,1790837.0,1753065,...,1790835,1790837,1790837,1727594,1790837,1790837,603250,1790837,1790837,1790837
unique,2,3097.0,5,3701,4598,5,2,64,6.0,13,...,2,1,1930,17,2,3553,2,4731,1855,13510
top,N,10.0,0,2015-12-18,36103147004,N,N,36059,0.0,1,...,Y,NY,11561,1,R,BROOKLYN,N,250000,100000,390
freq,314866,39999.0,652372,304,22499,1742595,1788604,508212,1643929.0,615925,...,1322061,1790837,87968,862515,1790834,101542,596332,1108070,717506,155765


In [42]:
claims_df = claims.toPandas()
claims_df.describe()

Unnamed: 0,_c0,basefloodelevation,basementenclosurecrawlspacetype,policycount,countycode,crsdiscount,elevationdifference,censustract,latitude,longitude,lowestadjacentgrade,lowestfloorelevation,numberoffloorsintheinsuredbuilding,occupancytype,amountpaidonbuildingclaim,amountpaidoncontentsclaim,amountpaidonincreasedcostofcomplianceclaim,totalbuildinginsurancecoverage,totalcontentsinsurancecoverage,yearofloss
count,169312.0,15655.0,169310.0,169312.0,169040.0,169312.0,169312.0,167612.0,167653.0,167653.0,11655.0,15197.0,169009.0,169236.0,164518.0,110773.0,89937.0,169312.0,169312.0,169312.0
mean,84655.5,59.734155,1.132609,1.105391,36070.534087,0.008301,888.857878,36042820000.0,40.996322,-74.015079,43.474809,166.892143,2.264578,1.3391,28638.8,6210.707788,381.98936,161045.3,27690.43659,2000.843998
std,48876.308726,195.76633,1.196702,3.929104,382.214499,0.029593,313.545474,1075460000.0,0.743909,1.256027,227.629544,1064.500471,0.816117,0.813504,68595.66,22111.813233,3251.298495,702413.6,47061.344294,13.214408
min,0.0,-999.0,0.0,1.0,1003.0,0.0,-962.0,36003.0,28.0,-98.0,-14.5,-999.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1974.0
25%,42327.75,7.0,0.0,1.0,36059.0,0.0,999.0,36059410000.0,40.6,-74.1,3.9,7.0,2.0,1.0,542.6825,0.0,0.0,33000.0,0.0,1989.0
50%,84655.5,8.0,1.0,1.0,36081.0,0.0,999.0,36081090000.0,40.7,-73.7,6.7,9.2,2.0,1.0,6551.13,334.5,0.0,156000.0,6200.0,2010.0
75%,126983.25,10.1,2.0,1.0,36103.0,0.0,999.0,36103120000.0,41.0,-73.5,9.0,12.0,3.0,1.0,34940.0,3718.65,0.0,250000.0,37100.0,2012.0
max,169311.0,6666.0,4.0,452.0,54107.0,0.3,9998.0,36123150000.0,45.0,-68.7,9990.0,9996.0,6.0,6.0,9467720.0,500000.0,45000.0,109500000.0,1000000.0,2019.0


Firstly, we're going to select variables which are present in both datasets so we can merge the datasets once we've grouped them. We are also going to remove all the null values from the columns we want to select and lastly, we are going to convert the dataset into a tabular format to merge the two.

In [44]:
policies.groupBy('agriculturestructureindicator', 'condominiumindicator', 'construction', 'primaryresidenceindicator', 'ratemethod', 'regularemergencyprogramindicator').agg(sum("policycount"), count("*")).show()

In [45]:
claims.groupBy('agriculturestructureindicator', 'condominiumindicator', 'primaryresidence', 'ratemethod').agg(sum("policycount"), count("*"), sum("amountpaidonbuildingclaim"), sum("amountpaidoncontentsclaim") ).show() 

In [46]:
claims.groupBy("agriculturestructureindicator").count().orderBy("agriculturestructureindicator").show()

In [47]:
claims.columns


In [48]:
policies.columns

In [49]:
var_list = ['censustract','agriculturestructureindicator', 'condominiumindicator', 'primaryresidence', 'ratemethod', 'basementenclosurecrawlspacetype', 'floodzone', 'reportedzipcode', 'smallbusinessindicatorbuilding','obstructiontype', 'occupancytype']
for var in var_list:
  claims.groupBy(var).count().orderBy(var).show()

In [50]:
claims = claims.fillna({'condominiumindicator':'unknown', 'primaryresidence':'unknown', 'ratemethod': 'unknown', 'basementenclosurecrawlspacetype':'0', 'floodzone': 'unknown', 'occupancytype':'1', 'amountpaidonbuildingclaim': '0', 'amountpaidoncontentsclaim': '0'})

In [51]:
var_list = ['censustract', 'agriculturestructureindicator', 'condominiumindicator', 'primaryresidenceindicator', 'ratemethod', 'basementenclosurecrawlspacetype', 'floodzone', 'reportedzipcode', 'smallbusinessindicatorbuilding','obstructiontype', 'occupancytype']
for var in var_list:
  policies.groupBy(var).count().orderBy(var).show()

In [52]:
policies = policies.fillna({'condominiumindicator':'unknown', 'primaryresidenceindicator':'unknown', 'ratemethod': 'unknown', 'basementenclosurecrawlspacetype':'0', 'floodzone': 'unknown', 'occupancytype':'1'})

In [53]:
var_list = ['censustract', 'agriculturestructureindicator', 'condominiumindicator', 'primaryresidenceindicator', 'ratemethod', 'basementenclosurecrawlspacetype', 'floodzone', 'reportedzipcode', 'smallbusinessindicatorbuilding','obstructiontype', 'occupancytype']
for var in var_list:
  policies.groupBy(var).count().orderBy(var).show()

In [54]:
claims_agg = claims.groupBy('condominiumindicator', 'primaryresidence', 'ratemethod', 'basementenclosurecrawlspacetype', 'floodzone', 'occupancytype').agg(sum("policycount"), count("*"), sum("amountpaidonbuildingclaim"), sum("amountpaidoncontentsclaim") )
policy_agg = policies.groupBy('condominiumindicator', 'primaryresidenceindicator', 'ratemethod',  'basementenclosurecrawlspacetype', 'floodzone', 'occupancytype').agg(sum("policycount"), count("*"))

In [55]:
policy_agg.describe().show()

In [56]:
policy_agg.show()

In [57]:
claims_agg.show()

In [58]:
policy_agg.columns

In [59]:
policy_agg = policy_agg.withColumnRenamed("primaryresidenceindicator", "primaryresidence")
policy_agg = policy_agg.withColumnRenamed("sum(policycount)", "policycount")
policy_agg = policy_agg.withColumnRenamed("count(1)", "policyindex")
policy_agg.columns

In [60]:
claims_agg.columns

In [61]:
claims_agg = claims_agg.withColumnRenamed("sum(amountpaidonbuildingclaim)", "amountpaidonbuildingclaim")
claims_agg = claims_agg.withColumnRenamed("sum(amountpaidoncontentsclaim)", "amountpaidoncontentsclaim")
claims_agg = claims_agg.withColumnRenamed("sum(policycount)", "claimscount")
claims_agg = claims_agg.withColumnRenamed("count(1)", "claimsindex")
claims_agg.columns

In [62]:
for col in claims_agg.columns:
  col_null_count = claims_agg.filter(claims_agg[col].isNull()).count()
  print(col)
  print(col_null_count)

In [63]:
for pol in policy_agg.columns:
  pol_null_count = policy_agg.filter(policy_agg[pol].isNull()).count()
  print(pol)
  print(pol_null_count)

In [64]:
new_df = policy_agg.join(claims_agg, on=['condominiumindicator', 'primaryresidence', 'ratemethod', 'basementenclosurecrawlspacetype', 'floodzone', 'occupancytype' ], how='inner')


In [65]:
new_df.show()

In [66]:
print((new_df.count(), len(new_df.columns)))


In [67]:
new_df.columns

To build out the loss cost model we will combine two models - severity and frequency model

loss amount/ premium = [loss amount/claims count] * [claims count/premium]

loss cost = severity * frequency

In [70]:
from collections import defaultdict

data_types = defaultdict(list)
for entry in new_df.schema.fields:
    data_types[str(entry.dataType)].append(entry.name)

strings = data_types["StringType"]
print(strings)

In [71]:
new_df = new_df.withColumn("claims_ratio", (new_df["amountpaidoncontentsclaim"] + new_df["amountpaidonbuildingclaim"])/new_df["claimscount"])
new_df = new_df.withColumn("inverse_claims_count", new_df["claimscount"]/new_df["policycount"])
new_df = new_df.withColumn("claims_amt", new_df["amountpaidoncontentsclaim"] + new_df["amountpaidonbuildingclaim"])

In [72]:
new_df.select('claims_ratio', 'inverse_claims_count', 'claims_amt', 'claimscount').show()

In [73]:
response_vars = [ 'claimscount', 'claims_amt' ]

variable_list_emblem = ['condominiumindicator',
 'primaryresidence',
 'ratemethod',
 'basementenclosurecrawlspacetype',
 'floodzone',
 'occupancytype']

In [74]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

strings = [var for var in variable_list_emblem if var in data_types["StringType"]]
print(strings)

In [75]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

strings = [var for var in variable_list_emblem if var in data_types["StringType"]]
stage_string = [StringIndexer(inputCol= c, outputCol= c+"_string_encoded") for c in strings]
stage_one_hot = [OneHotEncoder(inputCol= c+"_string_encoded", outputCol= c+ "_one_hot") for c in strings]

ppl = Pipeline(stages= stage_string + stage_one_hot)
new_df = ppl.fit(new_df).transform(new_df)

In [76]:
new_df.columns

In [77]:
categoricals = [var for var in new_df.columns if var.endswith("_one_hot")]
vector_assembler = VectorAssembler(inputCols= categoricals, outputCol= "features")
new_df = vector_assembler.transform(new_df)

training_set, test_set = new_df.randomSplit([0.7, 0.3], seed = 2017)

In [78]:
training_set.show()

In [79]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import GeneralizedLinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [80]:
frequency_claims =  GeneralizedLinearRegression(family="poisson",
                                           link="log",
                                           maxIter=10,
                                           fitIntercept = True,
                                           labelCol = "claimscount",
                                           weightCol = "claims_amt",
                                           regParam=0.3)

frequency_claims.setPredictionCol("prediction_claims_frequency")

In [81]:
severity_claims =  GeneralizedLinearRegression(family="gamma",
                                           link="log",
                                           maxIter=10,
                                           labelCol = "claims_ratio",
                                           weightCol = "inverse_claims_count",
                                           fitIntercept = True,
                                           regParam=0.3)

severity_claims.setPredictionCol("prediction_severity_claims")

In [82]:
para_grid = ParamGridBuilder()\
           .addGrid(frequency_claims.regParam, [0.1, 0.3, 0.5, 0.7, 0.9])\
           .build()

evaluator = RegressionEvaluator(labelCol="claimscount",
                                predictionCol="prediction_claims_frequency",
                                metricName="rmse")
cross_val = CrossValidator(estimator = frequency_claims,
                           estimatorParamMaps= para_grid,
                           evaluator = evaluator)

model_frequency = cross_val.fit(training_set)

In [83]:
training_set_sev =  training_set.where((training_set["claims_amt"] > 0))

para_grid = ParamGridBuilder()\
           .addGrid(severity_claims.regParam, [0.1, 0.3, 0.5, 0.7, 0.9])\
           .build()

evaluator = RegressionEvaluator(labelCol="claims_ratio",
                                predictionCol="prediction_severity_claims",
                                metricName="rmse")
cross_val = CrossValidator(estimator = severity_claims,
                           estimatorParamMaps= para_grid,
                           evaluator = evaluator)

model_severity = cross_val.fit(training_set_sev)


Here we will look at how our models perform on the test set and combine the predictions to get our loss cost

In [85]:
df_predictions = model_frequency.transform(test_set)
df_predictions = model_severity.transform(df_predictions)


In [86]:
df_predictions = df_predictions.withColumn("target", df_predictions["claims_amt"] )
df_predictions = df_predictions.withColumn("prediction", df_predictions["prediction_severity_claims"]*df_predictions["prediction_claims_frequency"])


In [87]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def shut_down_device(r):
    if r < 0.1:
        return 0.0
    else:
        return r

udf_sd = udf(shut_down_device, DoubleType())

df_predictions = df_predictions.withColumn("truncated_count", udf_sd(df_predictions["prediction_claims_frequency"]))
df_predictions = df_predictions.withColumn("prediction",  df_predictions["prediction_severity_claims"]*df_predictions["truncated_count"])


In [88]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="target", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(df_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

from sklearn.metrics import r2_score
prediction_to_pandas = df_predictions.select(["prediction","target"]).toPandas()
r2 = r2_score(prediction_to_pandas.prediction, prediction_to_pandas.target)
print("R-square on test data = %g" % r2)

In [89]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

In [90]:
display(df_predictions.select('target', 'prediction'))

target,prediction
3900.96,126092104.9523314
79078.28,45763863.76910629
0.0,15431254.89728202
227377.39,54862540.26528091
137000.0,97317635.37908795
0.0,12307066.565927768
510860.44,62390063.680086285
1935562.04,35522211.10992401
0.0,21346027.51054956
21428.36,33368001.60905544


In [91]:
display(df_predictions.select('target'))

target
3900.96
79078.28
0.0
227377.39
137000.0
0.0
510860.44
1935562.04
0.0
21428.36


In [92]:
display(df_predictions.select('prediction'))

prediction
126092104.9523314
45763863.76910629
15431254.89728202
54862540.26528091
97317635.37908795
12307066.565927768
62390063.680086285
35522211.10992401
21346027.51054956
33368001.60905544
