In [0]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [0]:
spark = SparkSession.builder.appName('claim_count').getOrCreate()

In [0]:
 
  df = spark.read.csv("/FileStore/tables/ctp_data.csv",header=True, inferSchema= True)

In [0]:
df.show()

+--------------------+-------+--------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------------+----------------+-----------+-----+
|Reporting period end|Subject|Category|           Data Item|Class of business group|Class of business category|   Class of business|Reporting year basis|Accident/underwriting year|Development year|      Value|Notes|
+--------------------+-------+--------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------------+----------------+-----------+-----+
|          12/31/2020|  Gross| Premium|Gross earned prem...|        Direct business|       Short-tail property|Commercial motor ...|       Accident year|                      2020|               0| 2864000000| null|
|          12/31/2020|  Gross| Premium|Gross earned prem...|        Direct business|       Short-tail property|Commercial motor ...|    

In [0]:
from pyspark.sql.functions import col

li=["Gross claim payments by class of business" , "Gross case estimates by class of business"]
df1 = df.filter(col("Data Item").isin(li) & (col("Class of business") == "Compulsory third party (CTP) motor vehicle"))

In [0]:
df1.show(300)

+--------------------+-------+--------------------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------------+----------------+----------+--------------------+
|Reporting period end|Subject|            Category|           Data Item|Class of business group|Class of business category|   Class of business|Reporting year basis|Accident/underwriting year|Development year|     Value|               Notes|
+--------------------+-------+--------------------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------------+----------------+----------+--------------------+
|          12/31/2020|  Gross|Ultimate cost com...|Gross case estima...|        Direct business|                 Long-tail|Compulsory third ...|       Accident year|                      2020|               0| 975000000|                null|
|          12/31/2020|  Gross|Ul

In [0]:
print((df1.count(), len(df1.columns)))

(207, 12)


In [0]:
df2= df1.select(df1["Accident/underwriting year"],df1["Development year"], df1["Value"])

In [0]:
df2.show(500)

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|     Value|
+--------------------------+----------------+----------+
|                      2020|               0| 975000000|
|                      2019|               1|1328000000|
|                      2018|               2|1403000000|
|                      2017|               3|1179000000|
|                      2016|               4| 690000000|
|                      2015|               5| 314000000|
|                      2014|               6| 225000000|
|                      2013|               7| 117000000|
|                      2012|               8|  86000000|
|                      2011|               9|  64000000|
|                      2010|              10|  54000000|
|                      null|              11| 292000000|
|                      2020|               0| 102000000|
|                      2019|               1| 336000000|
|                      2018|   

In [0]:
from pyspark.sql.types import LongType
df3 = df2.withColumn("Value", df2["Value"].cast(LongType()))

In [0]:
df3.show(100)

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|     Value|
+--------------------------+----------------+----------+
|                      2020|               0| 975000000|
|                      2019|               1|1328000000|
|                      2018|               2|1403000000|
|                      2017|               3|1179000000|
|                      2016|               4| 690000000|
|                      2015|               5| 314000000|
|                      2014|               6| 225000000|
|                      2013|               7| 117000000|
|                      2012|               8|  86000000|
|                      2011|               9|  64000000|
|                      2010|              10|  54000000|
|                      null|              11| 292000000|
|                      2020|               0| 102000000|
|                      2019|               1| 336000000|
|                      2018|   

In [0]:
df3= df3.dropna()
df3.show(300)

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|     Value|
+--------------------------+----------------+----------+
|                      2020|               0| 975000000|
|                      2019|               1|1328000000|
|                      2018|               2|1403000000|
|                      2017|               3|1179000000|
|                      2016|               4| 690000000|
|                      2015|               5| 314000000|
|                      2014|               6| 225000000|
|                      2013|               7| 117000000|
|                      2012|               8|  86000000|
|                      2011|               9|  64000000|
|                      2010|              10|  54000000|
|                      2020|               0| 102000000|
|                      2019|               1| 336000000|
|                      2018|               2| 757000000|
|                      2017|   

In [0]:
print((df3.count(), len(df3.columns)))

(198, 3)


In [0]:
df3= df3.groupBy("Accident/underwriting year","Development year") \
    .sum("Value")

In [0]:
print((df3.count(), len(df3.columns)))

(99, 3)


In [0]:
df3.show(100)

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|sum(Value)|
+--------------------------+----------------+----------+
|                      2012|               0|1160000000|
|                      2007|               6|1695000000|
|                      2010|               7|1852000000|
|                      2015|               2|2202000000|
|                      2017|               3|2674000000|
|                      2008|               8|1763000000|
|                      2014|               4|2216000000|
|                      2005|              10|1632000000|
|                      2014|               0|1442000000|
|                      2004|               8|1583000000|
|                      2012|               8|2101000000|
|                      2013|               2|2136000000|
|                      2012|               4|2125000000|
|                      2007|              10|1685000000|
|                      2004|   

In [0]:
initial_df = df3.withColumn("Development year",
					df3["Accident/underwriting year"] + df3["Development year"])
initial_df.show()

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|sum(Value)|
+--------------------------+----------------+----------+
|                      2012|            2012|1160000000|
|                      2007|            2013|1695000000|
|                      2010|            2017|1852000000|
|                      2015|            2017|2202000000|
|                      2017|            2020|2674000000|
|                      2008|            2016|1763000000|
|                      2014|            2018|2216000000|
|                      2005|            2015|1632000000|
|                      2014|            2014|1442000000|
|                      2004|            2012|1583000000|
|                      2012|            2020|2101000000|
|                      2013|            2015|2136000000|
|                      2012|            2016|2125000000|
|                      2007|            2017|1685000000|
|                      2004|   

In [0]:
initial_df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("compression", "gzip").save("dbfs:/FileStore/df/initial_df.csv")

input_path = "dbfs:/FileStore/df/initial_df.csv"

in_data = spark.read.csv(input_path, header=True)

display(in_data)

Accident/underwriting year,Development year,sum(Value)
2012,2012,1160000000
2007,2013,1695000000
2010,2017,1852000000
2015,2017,2202000000
2017,2020,2674000000
2008,2016,1763000000
2014,2018,2216000000
2005,2015,1632000000
2014,2014,1442000000
2004,2012,1583000000


In [0]:
premium_df = df.filter((col("Data Item") == "Gross earned premium by class of business") & (col("Class of business") == "Compulsory third party (CTP) motor vehicle"))
premium_df = premium_df.select(premium_df["Accident/underwriting year"],premium_df["Development year"], premium_df["Value"])

premium_df.show()


+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|     Value|
+--------------------------+----------------+----------+
|                      2020|               0|3367000000|
|                      2019|               1|3578000000|
|                      2018|               2|3768000000|
|                      2017|               3|4134000000|
|                      2016|               4|3768000000|
|                      2015|               5|3554000000|
|                      2014|               6|3468000000|
|                      2013|               7|3210000000|
|                      2012|               8|2931000000|
|                      2011|               9|2792000000|
|                      2010|              10|2597000000|
|                      2009|              10|2287000000|
|                      2008|              10|2106000000|
|                      2007|              10|2288000000|
|                      2006|   

In [0]:
premium_df=premium_df.orderBy(col("Accident/underwriting year").asc())
premium_df.show()

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|     Value|
+--------------------------+----------------+----------+
|                      2002|              10|2127000000|
|                      2003|              10|2245000000|
|                      2004|              10|2420000000|
|                      2005|              10|2441000000|
|                      2006|              10|2359000000|
|                      2007|              10|2288000000|
|                      2008|              10|2106000000|
|                      2009|              10|2287000000|
|                      2010|              10|2597000000|
|                      2011|               9|2792000000|
|                      2012|               8|2931000000|
|                      2013|               7|3210000000|
|                      2014|               6|3468000000|
|                      2015|               5|3554000000|
|                      2016|   

In [0]:
premium_df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("compression", "gzip").save("dbfs:/FileStore/df/premium_df1.csv")

input_path = "dbfs:/FileStore/df/premium_df1.csv"

in_data = spark.read.csv(input_path, header=True)

display(in_data)

Accident/underwriting year,Development year,Value
2002,10,2127000000
2003,10,2245000000
2004,10,2420000000
2005,10,2441000000
2006,10,2359000000
2007,10,2288000000
2008,10,2106000000
2009,10,2287000000
2010,10,2597000000
2011,9,2792000000


In [0]:
df4 =  df.filter((col("Data Item") == "Number of claims reported by class of business") & (col("Class of business") == "Compulsory third party (CTP) motor vehicle"))
df4= df4.select(df4["Accident/underwriting year"],df4["Development year"], df4["Value"])
df4.show(100)


+--------------------------+----------------+-----+
|Accident/underwriting year|Development year|Value|
+--------------------------+----------------+-----+
|                      2020|               0|17000|
|                      2019|               1|25000|
|                      2018|               2|27000|
|                      2017|               3|27000|
|                      2016|               4|26000|
|                      2015|               5|24000|
|                      2014|               6|22000|
|                      2013|               7|22000|
|                      2012|               8|23000|
|                      2011|               9|20000|
|                      2010|              10|19000|
|                      2019|               0|21000|
|                      2018|               1|27000|
|                      2017|               2|29000|
|                      2016|               3|27000|
|                      2015|               4|25000|
|           

In [0]:
print((df4.count(), len(df4.columns)))

(99, 3)


In [0]:
df6=df3.orderBy(col("Accident/underwriting year").asc(),col("Development year").asc())

In [0]:
df7= df4.orderBy(col("Accident/underwriting year").asc(),col("Development year").asc())

In [0]:
df6.show(100)
df7.show(100)

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|sum(Value)|
+--------------------------+----------------+----------+
|                      2002|              10|1564000000|
|                      2003|               9|1408000000|
|                      2003|              10|1399000000|
|                      2004|               8|1583000000|
|                      2004|               9|1578000000|
|                      2004|              10|1591000000|
|                      2005|               7|1622000000|
|                      2005|               8|1611000000|
|                      2005|               9|1611000000|
|                      2005|              10|1632000000|
|                      2006|               6|1695000000|
|                      2006|               7|1692000000|
|                      2006|               8|1688000000|
|                      2006|               9|1688000000|
|                      2006|   

In [0]:
from pyspark.sql import functions as F


In [0]:
df6= df6.withColumn('row_id',F.monotonically_increasing_id())
df6.show()

+--------------------------+----------------+----------+------+
|Accident/underwriting year|Development year|sum(Value)|row_id|
+--------------------------+----------------+----------+------+
|                      2002|              10|1564000000|     0|
|                      2003|               9|1408000000|     1|
|                      2003|              10|1399000000|     2|
|                      2004|               8|1583000000|     3|
|                      2004|               9|1578000000|     4|
|                      2004|              10|1591000000|     5|
|                      2005|               7|1622000000|     6|
|                      2005|               8|1611000000|     7|
|                      2005|               9|1611000000|     8|
|                      2005|              10|1632000000|     9|
|                      2006|               6|1695000000|    10|
|                      2006|               7|1692000000|    11|
|                      2006|            

In [0]:
df7=df7.withColumn('row_id',F.monotonically_increasing_id())
df7.show()

+----+---+-----+------+
|  AY| DY|Value|row_id|
+----+---+-----+------+
|2002| 10|23000|     0|
|2003|  9|21000|     1|
|2003| 10|21000|     2|
|2004|  8|20000|     3|
|2004|  9|20000|     4|
|2004| 10|20000|     5|
|2005|  7|19000|     6|
|2005|  8|19000|     7|
|2005|  9|19000|     8|
|2005| 10|19000|     9|
|2006|  6|18000|    10|
|2006|  7|18000|    11|
|2006|  8|18000|    12|
|2006|  9|18000|    13|
|2006| 10|18000|    14|
|2007|  5|18000|    15|
|2007|  6|18000|    16|
|2007|  7|18000|    17|
|2007|  8|18000|    18|
|2007|  9|18000|    19|
+----+---+-----+------+
only showing top 20 rows



In [0]:
df7= df7.withColumnRenamed("Accident/underwriting year", "AY")
df7= df7.withColumnRenamed("Development year", "DY")
df7= df7.withColumnRenamed("row_id", "id")
df7.show()


+----+---+-----+---+
|  AY| DY|Value| id|
+----+---+-----+---+
|2002| 10|23000|  0|
|2003|  9|21000|  1|
|2003| 10|21000|  2|
|2004|  8|20000|  3|
|2004|  9|20000|  4|
|2004| 10|20000|  5|
|2005|  7|19000|  6|
|2005|  8|19000|  7|
|2005|  9|19000|  8|
|2005| 10|19000|  9|
|2006|  6|18000| 10|
|2006|  7|18000| 11|
|2006|  8|18000| 12|
|2006|  9|18000| 13|
|2006| 10|18000| 14|
|2007|  5|18000| 15|
|2007|  6|18000| 16|
|2007|  7|18000| 17|
|2007|  8|18000| 18|
|2007|  9|18000| 19|
+----+---+-----+---+
only showing top 20 rows



In [0]:
df8= df6.join(df7,df6["row_id"] ==  df7["id"],"left")
df8.show()
    

+--------------------------+----------------+----------+------+----+---+-----+---+
|Accident/underwriting year|Development year|sum(Value)|row_id|  AY| DY|Value| id|
+--------------------------+----------------+----------+------+----+---+-----+---+
|                      2002|              10|1564000000|     0|2002| 10|23000|  0|
|                      2003|               9|1408000000|     1|2003|  9|21000|  1|
|                      2003|              10|1399000000|     2|2003| 10|21000|  2|
|                      2004|               8|1583000000|     3|2004|  8|20000|  3|
|                      2004|               9|1578000000|     4|2004|  9|20000|  4|
|                      2004|              10|1591000000|     5|2004| 10|20000|  5|
|                      2005|               7|1622000000|     6|2005|  7|19000|  6|
|                      2005|               8|1611000000|     7|2005|  8|19000|  7|
|                      2005|               9|1611000000|     8|2005|  9|19000|  8|
|   

In [0]:
df9 = df8.withColumn('sum(Value)', df8["sum(Value)"]/df8["Value"])
df9.show()


+--------------------------+----------------+-----------------+------+----+---+-----+---+
|Accident/underwriting year|Development year|       sum(Value)|row_id|  AY| DY|Value| id|
+--------------------------+----------------+-----------------+------+----+---+-----+---+
|                      2002|              10|          68000.0|     0|2002| 10|23000|  0|
|                      2003|               9|67047.61904761905|     1|2003|  9|21000|  1|
|                      2003|              10|66619.04761904762|     2|2003| 10|21000|  2|
|                      2004|               8|          79150.0|     3|2004|  8|20000|  3|
|                      2004|               9|          78900.0|     4|2004|  9|20000|  4|
|                      2004|              10|          79550.0|     5|2004| 10|20000|  5|
|                      2005|               7|85368.42105263157|     6|2005|  7|19000|  6|
|                      2005|               8|84789.47368421052|     7|2005|  8|19000|  7|
|         

In [0]:
df9.show(110)

+--------------------------+----------------+------------------+------+----+---+-----+---+
|Accident/underwriting year|Development year|        sum(Value)|row_id|  AY| DY|Value| id|
+--------------------------+----------------+------------------+------+----+---+-----+---+
|                      2002|              10|           68000.0|     0|2002| 10|23000|  0|
|                      2003|               9| 67047.61904761905|     1|2003|  9|21000|  1|
|                      2003|              10| 66619.04761904762|     2|2003| 10|21000|  2|
|                      2004|               8|           79150.0|     3|2004|  8|20000|  3|
|                      2004|               9|           78900.0|     4|2004|  9|20000|  4|
|                      2004|              10|           79550.0|     5|2004| 10|20000|  5|
|                      2005|               7| 85368.42105263157|     6|2005|  7|19000|  6|
|                      2005|               8| 84789.47368421052|     7|2005|  8|19000|  7|

In [0]:
df10= df9.select("Accident/underwriting year","Development year","sum(Value)","Value")
df10.show(100)

+--------------------------+----------------+------------------+-----+
|Accident/underwriting year|Development year|        sum(Value)|Value|
+--------------------------+----------------+------------------+-----+
|                      2002|              10|           68000.0|23000|
|                      2003|               9| 67047.61904761905|21000|
|                      2003|              10| 66619.04761904762|21000|
|                      2004|               8|           79150.0|20000|
|                      2004|               9|           78900.0|20000|
|                      2004|              10|           79550.0|20000|
|                      2005|               7| 85368.42105263157|19000|
|                      2005|               8| 84789.47368421052|19000|
|                      2005|               9| 84789.47368421052|19000|
|                      2005|              10| 85894.73684210527|19000|
|                      2006|               6| 94166.66666666667|18000|
|     

In [0]:
bootstrapped_df =  df10.sample(withReplacement = True, fraction =100000.0)
bootstrapped_df.show()

+--------------------------+----------------+----------+-----+
|Accident/underwriting year|Development year|sum(Value)|Value|
+--------------------------+----------------+----------+-----+
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0|23000|
|                      2002|              10|   68000.0

In [0]:
df11 = bootstrapped_df.withColumn('claim_cost', bootstrapped_df["sum(Value)"]*bootstrapped_df["Value"])
df11.show()

+--------------------------+----------------+----------+-----+----------+
|Accident/underwriting year|Development year|sum(Value)|Value|claim_cost|
+--------------------------+----------------+----------+-----+----------+
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|              10|   68000.0|23000|   1.564E9|
|                      2002|          

In [0]:
from pyspark.sql.types import LongType
df11 = df11.withColumn("claim_cost", df11["claim_cost"].cast(LongType()))
df11.show()

+--------------------------+----------------+----------+-----+----------+
|Accident/underwriting year|Development year|sum(Value)|Value|claim_cost|
+--------------------------+----------------+----------+-----+----------+
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|              10|   68000.0|23000|1564000000|
|                      2002|          

In [0]:
claim_cost_df= df11.groupBy("Accident/underwriting year","Development year").sum("claim_cost")
claim_cost_df.show(100)
    

    

+--------------------------+----------------+---------------+
|Accident/underwriting year|Development year|sum(claim_cost)|
+--------------------------+----------------+---------------+
|                      2002|              10|156784744000000|
|                      2003|               9|140593024000000|
|                      2003|              10|139748908000000|
|                      2004|               8|157821934000000|
|                      2004|               9|156913164000000|
|                      2004|              10|158411097000000|
|                      2005|               7|162240550000000|
|                      2005|               8|161396423899816|
|                      2005|               9|160544204900345|
|                      2005|              10|163147776000000|
|                      2006|               6|169586445000000|
|                      2006|               7|168687324000000|
|                      2006|               8|168013392000000|
|       

In [0]:
claim_cost_df = claim_cost_df.withColumn("Development year",
					claim_cost_df["Accident/underwriting year"] + claim_cost_df["Development year"])
claim_cost_df.show()



+--------------------------+----------------+---------------+
|Accident/underwriting year|Development year|sum(claim_cost)|
+--------------------------+----------------+---------------+
|                      2002|            2012|156784744000000|
|                      2003|            2012|140593024000000|
|                      2003|            2013|139748908000000|
|                      2004|            2012|157821934000000|
|                      2004|            2013|156913164000000|
|                      2004|            2014|158411097000000|
|                      2005|            2012|162240550000000|
|                      2005|            2013|161396423899816|
|                      2005|            2014|160544204900345|
|                      2005|            2015|163147776000000|
|                      2006|            2012|169586445000000|
|                      2006|            2013|168687324000000|
|                      2006|            2014|168013392000000|
|       

In [0]:
claim_cost_df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("compression", "gzip").save("dbfs:/FileStore/df/ctp_claim_cost_df.csv")

input_path = "dbfs:/FileStore/df/ctp_claim_cost_df.csv"

in_data = spark.read.csv(input_path, header=True)

display(in_data)

Accident/underwriting year,Development year,sum(claim_cost)
2002,2012,156784744000000
2003,2012,140593024000000
2003,2013,139748908000000
2004,2012,157821934000000
2004,2013,156913164000000
2004,2014,158411097000000
2005,2012,162240550000000
2005,2013,161396423899816
2005,2014,160544204900345
2005,2015,163147776000000


In [0]:
# claim count data

claim_count_data= df11.select("Accident/underwriting year","Development year","Value")
claim_count_data.show()


+--------------------------+----------------+-----+
|Accident/underwriting year|Development year|Value|
+--------------------------+----------------+-----+
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|                      2002|              10|23000|
|           

In [0]:
claim_count_data= claim_count_data.withColumn("Value", claim_count_data["Value"].cast(LongType()))

claim_count_data= claim_count_data.groupBy("Accident/underwriting year","Development year").sum("Value")
claim_count_data.show()

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|sum(Value)|
+--------------------------+----------------+----------+
|                      2002|              10|2305658000|
|                      2003|               9|2096913000|
|                      2003|              10|2097732000|
|                      2004|               8|1993960000|
|                      2004|               9|1988760000|
|                      2004|              10|1991340000|
|                      2005|               7|1900475000|
|                      2005|               8|1903496000|
|                      2005|               9|1893445000|
|                      2005|              10|1899392000|
|                      2006|               6|1800918000|
|                      2006|               7|1794546000|
|                      2006|               8|1791612000|
|                      2006|               9|1802232000|
|                      2006|   

In [0]:
claim_count_data.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("compression", "gzip").save("dbfs:/FileStore/df/ctp_claim_count_df.csv")

input_path = "dbfs:/FileStore/df/ctp_claim_count_df.csv"

in_data = spark.read.csv(input_path, header=True)

display(in_data)

Accident/underwriting year,Development year,sum(Value)
2002,10,2305658000
2003,9,2096913000
2003,10,2097732000
2004,8,1993960000
2004,9,1988760000
2004,10,1991340000
2005,7,1900475000
2005,8,1903496000
2005,9,1893445000
2005,10,1899392000


In [0]:
claim_count_data = claim_count_data.withColumn("Development year",
					claim_count_data["Accident/underwriting year"] + claim_count_data["Development year"])
claim_count_data.show()

+--------------------------+----------------+----------+
|Accident/underwriting year|Development year|sum(Value)|
+--------------------------+----------------+----------+
|                      2002|            2012|2305658000|
|                      2003|            2012|2096913000|
|                      2003|            2013|2097732000|
|                      2004|            2012|1993960000|
|                      2004|            2013|1988760000|
|                      2004|            2014|1991340000|
|                      2005|            2012|1900475000|
|                      2005|            2013|1903496000|
|                      2005|            2014|1893445000|
|                      2005|            2015|1899392000|
|                      2006|            2012|1800918000|
|                      2006|            2013|1794546000|
|                      2006|            2014|1791612000|
|                      2006|            2015|1802232000|
|                      2006|   

In [0]:
claim_count_data.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("compression", "gzip").save("dbfs:/FileStore/df/ctp_claim_count_df1.csv")

input_path = "dbfs:/FileStore/df/ctp_claim_count_df1.csv"

in_data = spark.read.csv(input_path, header=True)

display(in_data)

Accident/underwriting year,Development year,sum(Value)
2002,2012,2305658000
2003,2012,2096913000
2003,2013,2097732000
2004,2012,1993960000
2004,2013,1988760000
2004,2014,1991340000
2005,2012,1900475000
2005,2013,1903496000
2005,2014,1893445000
2005,2015,1899392000


In [0]:
print((claim_count_data.count(), len(claim_count_data.columns)))

(99, 3)


In [0]:
print((claim_cost_df.count(), len(claim_cost_df.columns)))

(99, 3)


In [0]:
df12=claim_cost_df.withColumn('row_id',F.monotonically_increasing_id())
df13=claim_count_data.withColumn('row_id',F.monotonically_increasing_id())



In [0]:
df12.show()
df13.show()

+--------------------------+----------------+---------------+------+
|Accident/underwriting year|Development year|sum(claim_cost)|row_id|
+--------------------------+----------------+---------------+------+
|                      2002|            2012|156784744000000|     0|
|                      2003|            2012|140593024000000|     1|
|                      2003|            2013|139748908000000|     2|
|                      2004|            2012|157821934000000|     3|
|                      2004|            2013|156913164000000|     4|
|                      2004|            2014|158411097000000|     5|
|                      2005|            2012|162240550000000|     6|
|                      2005|            2013|161396423899816|     7|
|                      2005|            2014|160544204900345|     8|
|                      2005|            2015|163147776000000|     9|
|                      2006|            2012|169586445000000|    10|
|                      2006|      

In [0]:
df13= df13.withColumnRenamed("Accident/underwriting year", "AY")
df13= df13.withColumnRenamed("Development year", "DY")
df13= df13.withColumnRenamed("row_id", "id")
df13.show()

+----+----+----------+---+
|  AY|  DY|sum(Value)| id|
+----+----+----------+---+
|2002|2012|2305658000|  0|
|2003|2012|2096913000|  1|
|2003|2013|2097732000|  2|
|2004|2012|1993960000|  3|
|2004|2013|1988760000|  4|
|2004|2014|1991340000|  5|
|2005|2012|1900475000|  6|
|2005|2013|1903496000|  7|
|2005|2014|1893445000|  8|
|2005|2015|1899392000|  9|
|2006|2012|1800918000| 10|
|2006|2013|1794546000| 11|
|2006|2014|1791612000| 12|
|2006|2015|1802232000| 13|
|2006|2016|1795698000| 14|
|2007|2012|1795122000| 15|
|2007|2013|1805526000| 16|
|2007|2014|1800504000| 17|
|2007|2015|1799910000| 18|
|2007|2016|1812312000| 19|
+----+----+----------+---+
only showing top 20 rows



In [0]:
df14= df12.join(df13,df12["row_id"] ==  df13["id"],"left")
df14.show()

+--------------------------+----------------+---------------+------+----+----+----------+---+
|Accident/underwriting year|Development year|sum(claim_cost)|row_id|  AY|  DY|sum(Value)| id|
+--------------------------+----------------+---------------+------+----+----+----------+---+
|                      2007|            2016|170760064000000|    19|2007|2016|1812312000| 19|
|                      2002|            2012|156784744000000|     0|2002|2012|2305658000|  0|
|                      2005|            2013|161396423899816|     7|2005|2013|1903496000|  7|
|                      2005|            2012|162240550000000|     6|2005|2012|1900475000|  6|
|                      2005|            2015|163147776000000|     9|2005|2015|1899392000|  9|
|                      2007|            2014|171948132000000|    17|2007|2014|1800504000| 17|
|                      2004|            2014|158411097000000|     5|2004|2014|1991340000|  5|
|                      2003|            2012|140593024000000

In [0]:
df14 = df14.withColumn('sum(claim_cost)', df14["sum(claim_cost)"]/df14["sum(Value)"])
df14.show()


+--------------------------+----------------+-----------------+------+----+----+----------+---+
|Accident/underwriting year|Development year|  sum(claim_cost)|row_id|  AY|  DY|sum(Value)| id|
+--------------------------+----------------+-----------------+------+----+----+----------+---+
|                      2007|            2016|94222.22222222222|    19|2007|2016|1812312000| 19|
|                      2002|            2012|          68000.0|     0|2002|2012|2305658000|  0|
|                      2005|            2013|84789.47363157895|     7|2005|2013|1903496000|  7|
|                      2005|            2012|85368.42105263157|     6|2005|2012|1900475000|  6|
|                      2005|            2015|85894.73684210527|     9|2005|2015|1899392000|  9|
|                      2007|            2014|          95500.0|    17|2007|2014|1800504000| 17|
|                      2004|            2014|          79550.0|     5|2004|2014|1991340000|  5|
|                      2003|            

In [0]:
avg_claim_cost_data= df14.select("Accident/underwriting year","Development year","sum(claim_cost)")
avg_claim_cost_data.show()


+--------------------------+----------------+-----------------+
|Accident/underwriting year|Development year|  sum(claim_cost)|
+--------------------------+----------------+-----------------+
|                      2007|            2016|94222.22222222222|
|                      2002|            2012|          68000.0|
|                      2005|            2013|84789.47363157895|
|                      2005|            2012|85368.42105263157|
|                      2005|            2015|85894.73684210527|
|                      2007|            2014|          95500.0|
|                      2004|            2014|          79550.0|
|                      2003|            2012|67047.61904761905|
|                      2006|            2012|94166.66666666667|
|                      2004|            2012|          79150.0|
|                      2006|            2014|93777.77777777778|
|                      2005|            2014|84789.47363157895|
|                      2006|            

In [0]:
avg_claim_cost_data= avg_claim_cost_data.orderBy(col("Accident/underwriting year").asc(),col("Development year").asc())
avg_claim_cost_data.show()

+--------------------------+----------------+-----------------+
|Accident/underwriting year|Development year|  sum(claim_cost)|
+--------------------------+----------------+-----------------+
|                      2002|            2012|          68000.0|
|                      2003|            2012|67047.61904761905|
|                      2003|            2013|66619.04761904762|
|                      2004|            2012|          79150.0|
|                      2004|            2013|          78900.0|
|                      2004|            2014|          79550.0|
|                      2005|            2012|85368.42105263157|
|                      2005|            2013|84789.47363157895|
|                      2005|            2014|84789.47363157895|
|                      2005|            2015|85894.73684210527|
|                      2006|            2012|94166.66666666667|
|                      2006|            2013|          94000.0|
|                      2006|            

In [0]:
avg_claim_cost_data.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("compression", "gzip").save("dbfs:/FileStore/df/ctp_avg_claim_cost_data2.csv")

input_path = "dbfs:/FileStore/df/ctp_avg_claim_cost_data2.csv"

in_data = spark.read.csv(input_path, header=True)

display(in_data)

Accident/underwriting year,Development year,sum(claim_cost)
2002,2012,68000.0
2003,2012,67047.61904761905
2003,2013,66619.04761904762
2004,2012,79150.0
2004,2013,78900.0
2004,2014,79550.0
2005,2012,85368.42105263157
2005,2013,84789.47363157895
2005,2014,84789.47363157895
2005,2015,85894.73684210527


In [0]:
print((avg_claim_cost_data.count(), len(avg_claim_cost_data.columns)))
print((df10.count(), len(df10.columns)))


(99, 3)
(99, 4)


In [0]:

df10= df10.select("Accident/underwriting year","Development year","sum(Value)")
df10.show()


+--------------------------+----------------+-----------------+
|Accident/underwriting year|Development year|       sum(Value)|
+--------------------------+----------------+-----------------+
|                      2002|              10|          68000.0|
|                      2003|               9|67047.61904761905|
|                      2003|              10|66619.04761904762|
|                      2004|               8|          79150.0|
|                      2004|               9|          78900.0|
|                      2004|              10|          79550.0|
|                      2005|               7|85368.42105263157|
|                      2005|               8|84789.47368421052|
|                      2005|               9|84789.47368421052|
|                      2005|              10|85894.73684210527|
|                      2006|               6|94166.66666666667|
|                      2006|               7|          94000.0|
|                      2006|            

In [0]:
df10.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").option("compression", "gzip").save("dbfs:/FileStore/df/new_avg_claim_cost_data1.csv")

input_path = "dbfs:/FileStore/df/new_avg_claim_cost_data1.csv"

in_data = spark.read.csv(input_path, header=True)

display(in_data)

Accident/underwriting year,Development year,sum(Value)
2002,10,2754.696132596685
2003,9,2177.855274629468
2003,10,2658.772874058127
2004,8,2285.4799015586545
2004,9,2281.609195402299
2004,10,2279.6052631578946
2005,7,2378.565921356977
2005,8,2361.5205585725366
2005,9,2352.578906851424
2005,10,2358.9147286821703
