In [0]:
#!pip install chainladder

In [0]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import col, exp, abs, when, lit
from pyspark.sql.types import *
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, CountVectorizer, StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from math import exp
import matplotlib.pyplot as plt
import chainladder as cl
from random import randint

In [0]:
# Build (or reuse) the notebook's SparkSession: local master, app name "Colab",
# Spark UI port set to 4050.

spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
spark


In [0]:
# Data Pre-Processing 

# Cast every column to integer
def schema_integer(df):
  """Return ``df`` with every column cast to ``IntegerType``.

  Columns are re-cast one at a time, in their existing order, and keep
  their names.
  """
  for name in df.columns:
    as_integer=col(name).cast(IntegerType())
    df=df.withColumn(name,as_integer)
  return df

# Replacing Confidential Values
def replace_confidential_values(df):
  """Replace every ``'*'`` cell with a random integer and cast the frame to int.

  Args:
    df: DataFrame whose first column is the origin year and whose remaining
      columns hold the triangle values, with ``'*'`` marking masked cells.

  Returns:
    A DataFrame in which each ``'*'`` has been replaced by
    ``randint(1, upper)`` (``upper`` = largest numeric value found in the
    value columns, at least 1), every column is cast to integer, rows are
    sorted by the first column, and that column is renamed ``'Origin/Dev'``.
  """
  columns=df.columns
  key=columns[0]
  df=df.withColumn(key,col(key).cast(IntegerType()))

  # One Spark action for the whole function; everything below inspects these rows.
  rows=df.collect()

  # The upper bound is computed numerically in Python. Aggregating with Spark's
  # max() on string-typed columns compares text, so "9.00" would beat "45.00".
  upper=0
  for row in rows:
    for value in row[1:]:
      if value is None or value=='*':
        continue
      try:
        upper=max(upper,int(float(value)))
      except (TypeError,ValueError):
        continue  # not a number: ignore it for the bound
  upper=max(upper,1)  # randint(1, upper) needs upper >= 1

  for row in rows:
    masked=[columns[j] for j in range(1,len(columns)) if row[j]=='*']
    if not masked:
      continue
    year=row[0]
    # Rebuild only this origin row: overwrite the masked columns, then swap
    # the rebuilt row in for the original one.
    fixed=df.where(df[key]==year)
    for name in masked:
      fixed=fixed.withColumn(name,lit(randint(1,upper)))
    df=df.filter(df[key]!=year)
    df=df.union(fixed)

  df=schema_integer(df)
  df=df.orderBy(df[key].asc())
  df=df.withColumnRenamed(key,'Origin/Dev')

  return df

# Remodelling the development triangle for the chain-ladder functions
def remodel_devtriangle(df):
  """Zero-fill the leading rows of the triangle that have a value in the last column.

  Nulls in column ``'0'`` are first replaced by 0 for every row. Then, walking
  the rows in the order ``collect()`` returns them, each row whose first value
  column is 0 and whose last column is not null has all its remaining nulls
  replaced by 0. The walk stops at the first row that does not meet both
  conditions.

  Args:
    df: DataFrame with an ``'Origin/Dev'`` key column followed by the value
      columns, one of which is named ``'0'``.

  Returns:
    The adjusted DataFrame ordered by ``'Origin/Dev'`` ascending. An empty
    input is returned (ordered) without error.
  """
  last=len(df.columns)-1
  df=df.na.fill(0,subset=['0'])

  # Collect once: the fills below only touch rows that were already examined,
  # so the snapshot stays valid for the rows still to be checked.
  for row in df.collect():
    if row[1]==0 and row[last] is not None:
      year=row[0]
      filled=df.where(df['Origin/Dev']==year).fillna(0)
      df=df.filter(df['Origin/Dev']!=year)
      df=df.union(filled)
    else:
      break
  df=df.orderBy(df['Origin/Dev'].asc())

  return df



In [0]:
# Deterministic Methods

# Cumulative sum in PySpark
def cum_sum(df):
  """Turn the value columns into running totals, left to right.

  The value columns are addressed by name: ``'0'``, ``'1'``, ... when a
  column called ``'0'`` exists, otherwise ``'1'``, ``'2'``, ...; the number
  of value columns is taken as ``len(df.columns)-1``. Each column is
  replaced by itself plus the (already accumulated) column before it.
  """
  value_count=len(df.columns)-1
  first=0 if '0' in df.columns else 1
  for previous in range(first,first+value_count-1):
    target=str(previous+1)
    running_total=df[target]+df[str(previous)]
    df=df.withColumn(target,running_total)
  return df

def latest_payments(df):
  """Return the right-most non-null value of every row.

  Args:
    df: DataFrame holding one triangle row per origin period.

  Returns:
    A list with one entry per row, in ``collect()`` order: the last value
    in that row that is not null. Rows that are entirely null contribute
    nothing.
  """
  latest=[]
  # A single collect(); scanning from the right yields exactly one value per
  # row even when a row has leading or interior nulls.
  for row in df.collect():
    for value in reversed(row):
      if value is not None:
        latest.append(value)
        break
  return latest

# Age-to-age factors (the f vector) computed from the PySpark DataFrame
def age_to_age_ratio_pyspark(df):
  """Compute column-to-column ratios of the triangle.

  After dropping ``'Origin/Dev'``, factor ``k`` is
  ``sum(column k+1) / sum(column k)`` taken over the first ``r-k-1`` rows
  (``r`` = row count), using only rows where both cells are non-null.
  Cell values are converted with ``int`` before summing.

  Args:
    df: DataFrame with an ``'Origin/Dev'`` column followed by the value
      columns in development order.

  Returns:
    A list with one ratio per adjacent column pair, followed by a final 1.

  Raises:
    ZeroDivisionError: if the earlier column of a pair sums to 0 (which
      includes the case where no row qualifies for that pair).
  """
  df=df.drop('Origin/Dev')
  n=len(df.columns)
  rows=df.collect()  # one Spark action instead of several per cell
  r=len(rows)

  f=[]
  for k in range(0,n-1):
    later=0
    earlier=0
    # max(..., 0) keeps a negative bound from turning into a from-the-end slice.
    for row in rows[:max(r-k-1,0)]:
      if row[k+1] is not None and row[k] is not None:
        later+=int(row[k+1])
        earlier+=int(row[k])
    f.append(later/earlier)
  f.append(1)
  return f

def full_triangle_pyspark(df,f):
  """Project the null cells of each incomplete row using the factors ``f``.

  For every row whose last column is null, each null cell at column
  position ``j`` is set to ``f[j-2]`` times the column immediately to its
  left (so later cells build on earlier projected ones). The completed row
  is cast to integers with ``schema_integer`` and appended; the original
  incomplete rows are then removed with ``dropna()``. Finally every 0 is
  replaced by null.

  Args:
    df: DataFrame whose first column is ``'Origin/Dev'`` followed by the
      value columns in development order.
    f: sequence of factors; ``f[k]`` is applied to value column ``k`` to
      obtain value column ``k+1`` (value columns counted from 0).

  Returns:
    The DataFrame of complete rows. Rows are not re-sorted: rows that were
    already complete come first, projected rows follow in ``collect()`` order.

  Raises:
    ValueError: if a row to be projected has a null in its first value
      column, because there is no earlier value to project from.
  """
  names=df.columns
  last=len(names)-1

  # Collect once and drive the loop from the actual rows, so origin periods
  # need not be consecutive integers starting at the first row.
  for row in df.collect():
    origin=row[0]
    if origin is None or row[last] is not None:
      # Only rows with a null final column are completed and appended;
      # any other row containing a null is removed by dropna() below.
      continue
    projected=df.where(df['Origin/Dev']==origin)
    for j,value in enumerate(row):
      if value is None:
        if j<2:
          raise ValueError('cannot project origin '+str(origin)+': first value column is null')
        # Address columns by their real names so both '0'-based and
        # '1'-based column layouts work.
        projected=projected.withColumn(names[j],f[j-2]*projected[names[j-1]])
    projected=schema_integer(projected)
    df=df.union(projected)

  df=df.dropna()
  df=df.replace(0,None)
  return df

def ultimate_payment(df):
  """Return the value in the last column of every row.

  Args:
    df: DataFrame to read (typically the completed triangle).

  Returns:
    A list with one entry per row, in ``collect()`` order, taken from the
    column at position ``len(df.columns)-1``.
  """
  last=len(df.columns)-1
  # One collect() for the whole frame rather than one per row.
  return [row[last] for row in df.collect()]


In [0]:
# Stochastic Method

# Poisson Regression
 
def poisson_regression_data(data):
  """Add 0/1 indicator columns for origin and dev and assemble a ``features`` vector.

  Args:
    data: DataFrame with at least the columns ``origin`` and ``dev``.

  Returns:
    ``data`` extended with one indicator column per distinct origin label
    and per distinct dev label (named after the label), plus a ``features``
    vector column built by ``VectorAssembler``. The vector holds the
    indicators of every origin whose last collected row has a dev label
    different from the last sorted dev label, followed by all dev
    indicators except the first sorted one.
  """
  # Distinct labels, fetched with one collect() each.
  # NOTE(review): labels are sorted as strings, so '10' sorts before '2';
  # confirm this ordering is what the dv[0] / dv[-1] choices below expect.
  ov=sorted(str(row[0]) for row in data.select('origin').distinct().collect())
  dv=sorted(str(row[0]) for row in data.select('dev').distinct().collect())

  newCols=list()
  for i in ov:
    org=data.select('origin','dev').where(data['origin']==str(i)).collect()
    d=org[-1][1]
    # Compare as text: dv holds str() labels, so a raw non-string value
    # would never compare equal to any of them.
    if str(d)!=dv[-1]:
      newCols.append(i)

  newCols.extend(dv)
  newCols.remove(dv[0])

  for i in ov:
    data=data.withColumn(str(i),when(data.origin==str(i),1).otherwise(0))

  for i in dv:
    data=data.withColumn(str(i),when(data.dev==str(i),1).otherwise(0))

  assembler=VectorAssembler(inputCols=newCols,outputCol='features')
  pipeline=Pipeline(stages=[assembler])

  poisson_data=pipeline.fit(data)
  poisson_data=poisson_data.transform(data)

  return poisson_data

def poisson_model(df):
  """Fit a Poisson GLM with log link on ``df`` and score the same rows.

  The label column is ``inc_paid`` and the feature vector column is
  ``features``.

  Returns:
    A tuple ``(fitted_model, predictions)`` where ``predictions`` contains
    only the columns ``origin``, ``dev``, ``inc_paid`` and ``prediction``.
  """
  estimator=GeneralizedLinearRegression(
    labelCol='inc_paid',
    featuresCol='features',
    family='poisson',
    link='log',
  )
  fitted=estimator.fit(df)
  scored=fitted.transform(df).select('origin','dev','inc_paid','prediction')
  return fitted,scored


In [0]:
# Model Results

# Evaluation of the model

def model_evaluation(df):
  """Print R-squared, MSE and MAE of ``prediction`` against ``inc_paid``.

  Each metric is computed with its own ``RegressionEvaluator`` and printed
  on a separate line, in that order.
  """
  reported_metrics=(('R-Squared','r2'),('MSE','mse'),('MAE','mae'))
  for heading,metric_name in reported_metrics:
    evaluator=RegressionEvaluator(labelCol='inc_paid',predictionCol='prediction',metricName=metric_name)
    score=evaluator.evaluate(df)
    print(heading+':'+str(score))

# Plotting the results 

def model_plot(df):
  """Show a 3-D scatter of actual versus predicted values.

  Both point clouds share the ``origin`` (y) and ``dev`` (z) coordinates;
  the x coordinate is ``inc_paid`` for the actual points (green) and
  ``prediction`` for the predicted points (red).

  Args:
    df: DataFrame with the columns ``inc_paid``, ``prediction``,
      ``origin`` and ``dev``.
  """
  # One collect() for all four columns keeps the coordinates of each point
  # taken from the same row.
  rows=df.select('inc_paid','prediction','origin','dev').collect()
  x1=[row[0] for row in rows]
  x2=[row[1] for row in rows]
  y=[row[2] for row in rows]
  z=[row[3] for row in rows]

  fig=plt.figure()
  ax=plt.axes(projection ='3d')
  # The colour must be passed by keyword: the fourth positional parameter of
  # the 3-D scatter is zdir, not the colour.
  ax.scatter3D(x1,y,z,c='green',label='Actual')
  ax.scatter3D(x2,y,z,c='red',label='Predicted')
  ax.set_title('Actual Vs Predicted')
  ax.set_xlabel('inc_paid / prediction')
  ax.set_ylabel('origin')
  ax.set_zlabel('dev')
  ax.legend()
  fig.set_size_inches(8,8)
  plt.show()


In [0]:
# Insurance Valuation

def case_scenario_generator(reserve,premiums,num_scenarios):
  """Generate random reserve/premium scenarios and their loss ratios.

  Prints the initial loss ratio ``reserve / sum(premiums)``. Each scenario
  draws two integers from ``randint(0,100)`` (premium multiplier first, then
  reserve multiplier) and scales the totals as ``k*base + base``.

  Args:
    reserve: base reserve amount.
    premiums: sequence of premium amounts; their sum is the base premium.
    num_scenarios: number of scenarios to generate.

  Returns:
    A tuple ``(loss_ratios, reserves, premium_totals)`` of three lists,
    each with ``num_scenarios`` entries.
  """
  premium_total=sum(premiums)
  print('The initial loss ratio is '+str(reserve/premium_total))

  ratios,reserves,premium_totals=[],[],[]
  for _ in range(num_scenarios):
    premium_multiplier=randint(0,100)
    reserve_multiplier=randint(0,100)
    scenario_reserve=(reserve_multiplier*reserve)+reserve
    scenario_premium=(premium_multiplier*premium_total)+premium_total
    ratios.append(scenario_reserve/scenario_premium)
    reserves.append(scenario_reserve)
    premium_totals.append(scenario_premium)

  return ratios,reserves,premium_totals

In [0]:
# Load the claims extract as CSV, taking the column names from the header row,
# and print the first rows.
insurance=(
    spark.read
    .format('csv')
    .load('dbfs:/FileStore/shared_uploads/srivatsramaswamy10@gmail.com/BigClaimsData.csv',header=True)
)
insurance.show()

+--------------------+-------+--------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------------+----------------+--------------+-----+
|Reporting period end|Subject|Category|           Data Item|Class of business group|Class of business category|   Class of business|Reporting year basis|Accident/underwriting year|Development year|         Value|Notes|
+--------------------+-------+--------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------------+----------------+--------------+-----+
|          31-12-2020|  Gross| Premium|Gross earned prem...|        Direct business|       Short-tail property|Commercial motor ...|       Accident year|                      2020|               0| 2864000000.00| null|
|          31-12-2020|  Gross| Premium|Gross earned prem...|        Direct business|       Short-tail property|Commercial mo

In [0]:
# Load the mortgage triangle as CSV with a header row and print the first rows.
mortgage_trial=(
    spark.read
    .format("csv")
    .option("header", "true")
    .load("dbfs:/FileStore/shared_uploads/srivatsramaswamy10@gmail.com/mortgage_trial.csv")
)
mortgage_trial.show()

+-----------------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|Underwriting year|   0|    1|    2|    3|    4|    5|    6|    7|    8|    9|   10|
+-----------------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|             2006|null| null| null| null| null| null|25.00|16.00|12.00| 9.00| 8.00|
|             2007|null| null| null| null| null|45.00|30.00|24.00|18.00|17.00|19.00|
|             2008|null| null| null| null|    *|31.00|31.00|27.00|23.00|20.00|21.00|
|             2009|null| null| null|    *|    *|    *|22.00|    *|20.00|17.00|    *|
|             2010|null| null|13.00|    *|    *|12.00|    *|11.00|10.00|    *| 7.00|
|             2011|null|18.00|    *|    *|    *|    *|18.00|16.00|    *|12.00| null|
|             2012|9.00|18.00|11.00|    *|40.00|28.00|25.00|    *|19.00| null| null|
|             2013|9.00| 9.00|15.00|    *|30.00|29.00|    *|24.00| null| null| null|
|             2014|7.00| 8.00|18.00|18.00|23.00|    *|26.00| null

In [0]:
# Replace the '*' (confidential) cells with random integers, cast the triangle to
# integers and rename the first column to 'Origin/Dev'.
# The replacement values are random, so the table differs between runs.
mortgage_trial=replace_confidential_values(mortgage_trial)
mortgage_trial.show()

+----------+----+----+----+----+----+----+----+----+----+----+----+
|Origin/Dev|   0|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|
+----------+----+----+----+----+----+----+----+----+----+----+----+
|      2006|null|null|null|null|null|null|  25|  16|  12|   9|   8|
|      2007|null|null|null|null|null|  45|  30|  24|  18|  17|  19|
|      2008|null|null|null|null|  28|  31|  31|  27|  23|  20|  21|
|      2009|null|null|null|  22|  17|  21|  22|  15|  20|  17|  38|
|      2010|null|null|  13|  15|  23|  12|  25|  11|  10|  19|   7|
|      2011|null|  18|  18|  38|  27|  14|  18|  16|   5|  12|null|
|      2012|   9|  18|  11|  14|  40|  28|  25|  25|  19|null|null|
|      2013|   9|   9|  15|   4|  30|  29|  32|  24|null|null|null|
|      2014|   7|   8|  18|  18|  23|  26|  26|null|null|null|null|
|      2015|   2|   7|  11|  15|  19|  19|null|null|null|null|null|
|      2016|   3|   8|   9|  11|  13|null|null|null|null|null|null|
|      2017|   3|   6|   9|  11|null|null|null|n

In [0]:
# Column-to-column ratios of the triangle, with a trailing 1.
# NOTE(review): cum_sum is not applied in the visible cells before this call, so
# the ratios are taken on the values as loaded — confirm whether that is intended.
f=age_to_age_ratio_pyspark(mortgage_trial)
print(f)

[1.7317073170731707, 1.2048192771084338, 1.2115384615384615, 1.4014598540145986, 0.8695652173913043, 1.0145631067961165, 0.7596153846153846, 0.7985074626865671, 1.0681818181818181, 1.1341463414634145, 1]


In [0]:
# Zero-fill column '0' and the leading rows that already have a value in the last
# column, then print the result.
mortgage_trial=remodel_devtriangle(mortgage_trial)
mortgage_trial.show()

+----------+---+----+----+----+----+----+----+----+----+----+----+
|Origin/Dev|  0|   1|   2|   3|   4|   5|   6|   7|   8|   9|  10|
+----------+---+----+----+----+----+----+----+----+----+----+----+
|      2006|  0|   0|   0|   0|   0|   0|  25|  16|  12|   9|   8|
|      2007|  0|   0|   0|   0|   0|  45|  30|  24|  18|  17|  19|
|      2008|  0|   0|   0|   0|  28|  31|  31|  27|  23|  20|  21|
|      2009|  0|   0|   0|  22|  17|  21|  22|  15|  20|  17|  38|
|      2010|  0|   0|  13|  15|  23|  12|  25|  11|  10|  19|   7|
|      2011|  0|  18|  18|  38|  27|  14|  18|  16|   5|  12|null|
|      2012|  9|  18|  11|  14|  40|  28|  25|  25|  19|null|null|
|      2013|  9|   9|  15|   4|  30|  29|  32|  24|null|null|null|
|      2014|  7|   8|  18|  18|  23|  26|  26|null|null|null|null|
|      2015|  2|   7|  11|  15|  19|  19|null|null|null|null|null|
|      2016|  3|   8|   9|  11|  13|null|null|null|null|null|null|
|      2017|  3|   6|   9|  11|null|null|null|null|null|null|n