<a href="https://colab.research.google.com/github/sahilshah9111/SQLAnalysis_Spark/blob/main/GDTC_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**SECTION 1: ENVIRONMENT SETUP AND DATA LOADING**

In [None]:
# Install the headless OpenJDK 8 package; -qq keeps apt quiet and stdout is
# discarded so the cell produces no output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Quietly download the Spark 3.1.2 (Hadoop 2.7 build) tarball from the Apache archive.
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
# Unpack the downloaded Spark tarball into the current working directory.
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
import os

# Export the locations of the Java 8 runtime and the unpacked Spark distribution
# for the Spark setup performed in the following cells.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
    }
)

In [None]:
!pip install -q findspark
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Reuse the active session if there is one; otherwise start a new session
# registered under the application name "DE_Analysis".
session_builder = SparkSession.builder.appName("DE_Analysis")
spark = session_builder.getOrCreate()

In [None]:
# Load the insurance transactions: the first row is the header and column types
# are inferred from the data. Then print the schema and preview three rows.
csv_reader = spark.read.options(header=True, inferSchema=True)
ins = csv_reader.csv('insurance_data.csv')
ins.printSchema()
ins.show(n=3)

root
 |-- TXN_DATE_TIME: string (nullable = true)
 |-- TRANSACTION_ID: string (nullable = true)
 |-- CUSTOMER_ID: string (nullable = true)
 |-- POLICY_NUMBER: string (nullable = true)
 |-- POLICY_EFF_DT: string (nullable = true)
 |-- LOSS_DT: string (nullable = true)
 |-- REPORT_DT: string (nullable = true)
 |-- INSURANCE_TYPE: string (nullable = true)
 |-- PREMIUM_AMOUNT: double (nullable = true)
 |-- CLAIM_AMOUNT: integer (nullable = true)
 |-- CUSTOMER_NAME: string (nullable = true)
 |-- ADDRESS_LINE1: string (nullable = true)
 |-- ADDRESS_LINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTAL_CODE: integer (nullable = true)
 |-- SSN: string (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- TENURE: integer (nullable = true)
 |-- EMPLOYMENT_STATUS: string (nullable = true)
 |-- NO_OF_FAMILY_MEMBERS: integer (nullable = true)
 |-- RISK_SEGMENTATION: string (nullable = t

In [None]:
# Load the employee (agent) records: the first row is the header and column
# types are inferred. Then print the schema and preview five rows.
csv_reader = spark.read.options(header=True, inferSchema=True)
emp = csv_reader.csv('employee_data.csv')
emp.printSchema()
emp.show(n=5)

root
 |-- AGENT_ID: string (nullable = true)
 |-- AGENT_NAME: string (nullable = true)
 |-- DATE_OF_JOINING: string (nullable = true)
 |-- ADDRESS_LINE1: string (nullable = true)
 |-- ADDRESS_LINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTAL_CODE: integer (nullable = true)
 |-- EMP_ROUTING_NUMBER: integer (nullable = true)
 |-- EMP_ACCT_NUMBER: string (nullable = true)

+----------+--------------+---------------+--------------------+-------------+----------------+-----+-----------+------------------+------------------+
|  AGENT_ID|    AGENT_NAME|DATE_OF_JOINING|       ADDRESS_LINE1|ADDRESS_LINE2|            CITY|STATE|POSTAL_CODE|EMP_ROUTING_NUMBER|   EMP_ACCT_NUMBER|
+----------+--------------+---------------+--------------------+-------------+----------------+-----+-----------+------------------+------------------+
|AGENT00001|     Ray Johns|     1993-06-05|    1402 Maggies Way|         null|Waterbury Center|   VT|   

In [None]:
# The employee address columns share their names with address columns in the
# insurance and vendor data; prefix them with EMP_ so they stay distinguishable
# once the frames are joined.
emp_column_renames = {
    "ADDRESS_LINE1": "EMP_ADDRESS_LINE1",
    "ADDRESS_LINE2": "EMP_ADDRESS_LINE2",
    "CITY": "EMP_CITY",
    "STATE": "EMP_STATE",
    "POSTAL_CODE": "EMP_POSTAL_CODE",
}
for original_name, prefixed_name in emp_column_renames.items():
    emp = emp.withColumnRenamed(original_name, prefixed_name)
emp.printSchema()


root
 |-- AGENT_ID: string (nullable = true)
 |-- AGENT_NAME: string (nullable = true)
 |-- DATE_OF_JOINING: string (nullable = true)
 |-- EMP_ADDRESS_LINE1: string (nullable = true)
 |-- EMP_ADDRESS_LINE2: string (nullable = true)
 |-- EMP_CITY: string (nullable = true)
 |-- EMP_STATE: string (nullable = true)
 |-- EMP_POSTAL_CODE: integer (nullable = true)
 |-- EMP_ROUTING_NUMBER: integer (nullable = true)
 |-- EMP_ACCT_NUMBER: string (nullable = true)



In [None]:
# Load the vendor records: the first row is the header and column types are
# inferred. Then print the schema and preview five rows.
csv_reader = spark.read.options(header=True, inferSchema=True)
vend = csv_reader.csv('vendor_data.csv')
vend.printSchema()
vend.show(n=5)

root
 |-- VENDOR_ID: string (nullable = true)
 |-- VENDOR_NAME: string (nullable = true)
 |-- ADDRESS_LINE1: string (nullable = true)
 |-- ADDRESS_LINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTAL_CODE: integer (nullable = true)

+---------+--------------------+--------------------+-------------+------------+-----+-----------+
|VENDOR_ID|         VENDOR_NAME|       ADDRESS_LINE1|ADDRESS_LINE2|        CITY|STATE|POSTAL_CODE|
+---------+--------------------+--------------------+-------------+------------+-----+-----------+
|VNDR00001|King, Proctor and...|2027 North Shanno...|           #5|Fayetteville|   AR|      72703|
|VNDR00002|          Garcia Ltd|5701 East Shirley...|         null|  Montgomery|   AL|      36117|
|VNDR00003|          Cherry LLC|1217 Cottondale Road|         null|  Montgomery|   AL|      36109|
|VNDR00004|         Mays-Benson|227 West Montgome...|         #736|    Savannah|   GA|      31406|
|VNDR00005

In [None]:
# The vendor address columns share their names with address columns in the
# insurance and employee data; prefix them with VENDOR_ so they stay
# distinguishable once the frames are joined.
vendor_column_renames = {
    "ADDRESS_LINE1": "VENDOR_ADDRESS_LINE1",
    "ADDRESS_LINE2": "VENDOR_ADDRESS_LINE2",
    "CITY": "VENDOR_CITY",
    "STATE": "VENDOR_STATE",
    "POSTAL_CODE": "VENDOR_POSTAL_CODE",
}
for original_name, prefixed_name in vendor_column_renames.items():
    vend = vend.withColumnRenamed(original_name, prefixed_name)
vend.printSchema()

root
 |-- VENDOR_ID: string (nullable = true)
 |-- VENDOR_NAME: string (nullable = true)
 |-- VENDOR_ADDRESS_LINE1: string (nullable = true)
 |-- VENDOR_ADDRESS_LINE2: string (nullable = true)
 |-- VENDOR_CITY: string (nullable = true)
 |-- VENDOR_STATE: string (nullable = true)
 |-- VENDOR_POSTAL_CODE: integer (nullable = true)



In [None]:
from pyspark.sql.functions import *

**Task 1**

In [None]:
# Left-join so that every insurance transaction is kept even when it has no
# matching agent or vendor record.
ins_with_agents = ins.join(emp, on="AGENT_ID", how="leftouter")
df_join = ins_with_agents.join(vend, on="VENDOR_ID", how="leftouter")

In [None]:
# Preview the first five rows of the joined frame.
df_join.show(n=5)

+---------+----------+-------------------+--------------+-----------+-------------+-------------+----------+----------+--------------+--------------+------------+--------------------+--------------------+-------------+-----------+-----+-----------+-----------+--------------+---+------+-----------------+--------------------+-----------------+----------+------------+--------------+------------------+------------------------+------------+-----------------+-------------------+----------+-----------------------+--------------+--------------+------------------------+---------------+---------------+--------------------+-----------------+-----------+---------+---------------+------------------+------------------+--------------------+--------------------+--------------------+------------+------------+------------------+
|VENDOR_ID|  AGENT_ID|      TXN_DATE_TIME|TRANSACTION_ID|CUSTOMER_ID|POLICY_NUMBER|POLICY_EFF_DT|   LOSS_DT| REPORT_DT|INSURANCE_TYPE|PREMIUM_AMOUNT|CLAIM_AMOUNT|       CUSTOMER

In [None]:
# Duplicate check: count the fully-distinct rows of the joined frame and list
# them without truncating cell values.
distinctDF = df_join.distinct()
distinct_row_count = distinctDF.count()
print("Distinct count: " + str(distinct_row_count))
distinctDF.show(truncate=False)


Distinct count: 10000
+---------+----------+-------------------+--------------+-----------+-------------+-------------+----------+----------+--------------+--------------+------------+---------------------+---------------------------+-------------+-----------------+-----+-----------+-----------+--------------+---+------+-----------------+--------------------+-----------------+----------+------------+--------------+------------------+------------------------+------------+-----------------+-------------------+----------+-----------------------+--------------+-----------------+------------------------+----------------+---------------+-----------------------------+-----------------+-----------------+---------+---------------+------------------+------------------+-----------------------------+---------------------------+--------------------+-----------------+------------+------------------+
|VENDOR_ID|AGENT_ID  |TXN_DATE_TIME      |TRANSACTION_ID|CUSTOMER_ID|POLICY_NUMBER|POLICY_EFF_DT|LOSS

In [None]:
# Expose the joined frame to Spark SQL under the view name "df".
df_join.createOrReplaceTempView(name="df")

**Task 2**

In [None]:
# Total claim amount per insurance type, largest first; show the top three.
claims_by_type_query = """
SELECT INSURANCE_TYPE, 
SUM (CLAIM_AMOUNT) 
FROM df
GROUP BY INSURANCE_TYPE
ORDER BY 2 desc;
"""
claims_by_type = spark.sql(claims_by_type_query)
claims_by_type.show(3)

+--------------+-----------------+
|INSURANCE_TYPE|sum(CLAIM_AMOUNT)|
+--------------+-----------------+
|          Life|         91478000|
|      Property|         41579000|
|        Health|         18254000|
+--------------+-----------------+
only showing top 3 rows



**Task 3**

In [None]:
# Total claim amount per state, restricted to rows whose RISK_SEGMENTATION is
# 'H', largest first; show the top five.
risk_h_claims_by_state_query = """
SELECT STATE, 
SUM (CLAIM_AMOUNT) 
FROM df
WHERE RISK_SEGMENTATION == 'H'
GROUP BY STATE
ORDER BY 2 desc;
"""
risk_h_claims_by_state = spark.sql(risk_h_claims_by_state_query)
risk_h_claims_by_state.show(5)

+-----+-----------------+
|STATE|sum(CLAIM_AMOUNT)|
+-----+-----------------+
|   CA|          3396500|
|   AZ|          1879500|
|   AL|          1855500|
|   AR|          1794700|
|   CO|          1524900|
+-----+-----------------+
only showing top 5 rows



**Task 4**

In [None]:
from pyspark.sql import functions as f

# COLOCATION is 1 when the STATE column from the insurance data, the incident
# state and the agent's (EMP_) state are all equal, and 0 otherwise (including
# when any of the comparisons is null).
state_matches_incident = f.col("STATE") == f.col("INCIDENT_STATE")
incident_matches_agent = f.col("INCIDENT_STATE") == f.col("EMP_STATE")
colocation_flag = f.when(state_matches_incident & incident_matches_agent, 1).otherwise(0)

df_join = df_join.withColumn('COLOCATION', colocation_flag)



In [None]:
# The mean of the 0/1 COLOCATION flag is the fraction of rows flagged 1.
df_join.select(f.mean("COLOCATION")).show()

+---------------+
|avg(COLOCATION)|
+---------------+
|         0.0044|
+---------------+



**Task 5**

In [None]:
# Where POLICE_REPORT_AVAILABLE equals 1, overwrite AUTHORITY_CONTACTED with
# "Police"; every other row keeps its existing value.
has_police_report = f.col("POLICE_REPORT_AVAILABLE") == 1
authority_contacted = f.when(has_police_report, "Police").otherwise(
    f.col("AUTHORITY_CONTACTED")
)
df_join = df_join.withColumn("AUTHORITY_CONTACTED", authority_contacted)
df_join.show()

+---------+----------+-------------------+--------------+-----------+-------------+-------------+----------+----------+--------------+--------------+------------+--------------------+--------------------+-------------+-----------+-----+-----------+-----------+--------------+---+------+-----------------+--------------------+-----------------+----------+------------+--------------+------------------+------------------------+------------+-----------------+-------------------+----------+-----------------------+--------------+-----------------+------------------------+-----------------+---------------+--------------------+-----------------+-----------------+---------+---------------+------------------+------------------+--------------------+--------------------+--------------------+------------+------------+------------------+----------+
|VENDOR_ID|  AGENT_ID|      TXN_DATE_TIME|TRANSACTION_ID|CUSTOMER_ID|POLICY_NUMBER|POLICY_EFF_DT|   LOSS_DT| REPORT_DT|INSURANCE_TYPE|PREMIUM_AMOUNT|CLAIM_

**Task 6**

In [None]:
from pyspark.sql.types import DateType

# TXN_DATE_TIME was loaded as a string; build df1 with it cast to a date and
# print the schema to confirm the new type.
txn_date = f.col("TXN_DATE_TIME").cast(DateType())
df1 = df_join.withColumn("TXN_DATE_TIME", txn_date)
df1.printSchema()

root
 |-- VENDOR_ID: string (nullable = true)
 |-- AGENT_ID: string (nullable = true)
 |-- TXN_DATE_TIME: date (nullable = true)
 |-- TRANSACTION_ID: string (nullable = true)
 |-- CUSTOMER_ID: string (nullable = true)
 |-- POLICY_NUMBER: string (nullable = true)
 |-- POLICY_EFF_DT: string (nullable = true)
 |-- LOSS_DT: string (nullable = true)
 |-- REPORT_DT: string (nullable = true)
 |-- INSURANCE_TYPE: string (nullable = true)
 |-- PREMIUM_AMOUNT: double (nullable = true)
 |-- CLAIM_AMOUNT: integer (nullable = true)
 |-- CUSTOMER_NAME: string (nullable = true)
 |-- ADDRESS_LINE1: string (nullable = true)
 |-- ADDRESS_LINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTAL_CODE: integer (nullable = true)
 |-- SSN: string (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- TENURE: integer (nullable = true)
 |-- EMPLOYMENT_STATUS: string (nullable = true)
 |-- NO_OF_FAMILY_

In [None]:
# Treat the latest TXN_DATE_TIME in the data as the "current" date.
latest_txn_row = df_join.agg({'TXN_DATE_TIME': 'max'}).collect()[0]
curr_date = latest_txn_row[0]

'2021-06-30 00:00:00'

In [None]:
# Select the transactions from the 30 calendar days ending on, and including,
# the latest transaction date. The window is derived from `curr_date` (computed
# in the previous cell) rather than hard-coded literals, and the upper bound is
# inclusive so transactions on the latest date itself are not dropped.
window_end = f.to_date(f.lit(curr_date))
window_start = f.date_sub(window_end, 30)  # exclusive lower bound

txn_date = f.col("TXN_DATE_TIME")
last_30 = df1.filter((txn_date > window_start) & (txn_date <= window_end))
last_30.show(3)

+---------+----------+-------------+--------------+-----------+-------------+-------------+----------+----------+--------------+--------------+------------+-------------+--------------------+-------------+-----------+-----+-----------+-----------+--------------+---+------+-----------------+--------------------+-----------------+----------+------------+--------------+------------------+------------------------+------------+-----------------+-------------------+----------+-----------------------+--------------+-----------------+------------------------+----------------+---------------+------------------+-----------------+----------+---------+---------------+------------------+------------------+--------------------+--------------------+--------------------+-----------+------------+------------------+----------+
|VENDOR_ID|  AGENT_ID|TXN_DATE_TIME|TRANSACTION_ID|CUSTOMER_ID|POLICY_NUMBER|POLICY_EFF_DT|   LOSS_DT| REPORT_DT|INSURANCE_TYPE|PREMIUM_AMOUNT|CLAIM_AMOUNT|CUSTOMER_NAME|       AD

In [None]:
# Average claim amount per insurance type within the selected window.
last_30.groupBy("INSURANCE_TYPE").mean("CLAIM_AMOUNT").show()

+--------------+------------------+
|INSURANCE_TYPE| avg(CLAIM_AMOUNT)|
+--------------+------------------+
|         Motor| 5573.770491803279|
|          Life|52242.857142857145|
|        Travel|            2800.0|
|        Health|10904.411764705883|
|        Mobile|378.35820895522386|
|      Property|24627.906976744187|
+--------------+------------------+



**Task 7**

In [None]:
# Agents associated with more than two distinct insurance types, together with
# their total claim amount, ordered by that total (largest first).
multi_type_agents_query = """
SELECT AGENT_ID,COUNT(DISTINCT INSURANCE_TYPE),
SUM (CLAIM_AMOUNT) 
FROM df
GROUP BY AGENT_ID
HAVING COUNT(DISTINCT INSURANCE_TYPE)>2
ORDER BY 3 desc;
"""
multi_type_agents = spark.sql(multi_type_agents_query)
multi_type_agents.show()

+----------+------------------------------+-----------------+
|  AGENT_ID|count(DISTINCT INSURANCE_TYPE)|sum(CLAIM_AMOUNT)|
+----------+------------------------------+-----------------+
|AGENT00807|                             6|           528800|
|AGENT00679|                             4|           489000|
|AGENT00771|                             5|           422100|
|AGENT00125|                             4|           400400|
|AGENT00789|                             4|           392900|
|AGENT00525|                             6|           385900|
|AGENT00319|                             6|           375600|
|AGENT00388|                             6|           373800|
|AGENT00439|                             5|           370000|
|AGENT00482|                             4|           368000|
|AGENT00881|                             6|           366400|
|AGENT00763|                             5|           361000|
|AGENT00914|                             3|           360000|
|AGENT00

**Task 8**

In [None]:
# Total premium before any adjustment.
df_join.select(f.sum("PREMIUM_AMOUNT")).show()

+-------------------+
|sum(PREMIUM_AMOUNT)|
+-------------------+
|  885085.9499999995|
+-------------------+



In [None]:
# Rescale PREMIUM_AMOUNT by insurance type:
#   Mobile, Travel   -> x0.90
#   Health, Property -> x1.07
#   Life, Motor      -> x1.02
# Any other (or null) insurance type keeps its premium unchanged.
premium = f.col("PREMIUM_AMOUNT")
insurance_type = f.col("INSURANCE_TYPE")
adjusted_premium = (
    f.when(insurance_type.isin('Mobile', 'Travel'), premium * 0.9)
    .when(insurance_type.isin('Health', 'Property'), premium * 1.07)
    .when(insurance_type.isin('Life', 'Motor'), premium * 1.02)
    .otherwise(premium)
)
df2 = df_join.withColumn("PREMIUM_AMOUNT", adjusted_premium)

df2.show()

+---------+----------+-------------------+--------------+-----------+-------------+-------------+----------+----------+--------------+------------------+------------+--------------------+--------------------+-------------+-----------+-----+-----------+-----------+--------------+---+------+-----------------+--------------------+-----------------+----------+------------+--------------+------------------+------------------------+------------+-----------------+-------------------+----------+-----------------------+--------------+-----------------+------------------------+-----------------+---------------+--------------------+-----------------+-----------------+---------+---------------+------------------+------------------+--------------------+--------------------+--------------------+------------+------------+------------------+----------+
|VENDOR_ID|  AGENT_ID|      TXN_DATE_TIME|TRANSACTION_ID|CUSTOMER_ID|POLICY_NUMBER|POLICY_EFF_DT|   LOSS_DT| REPORT_DT|INSURANCE_TYPE|    PREMIUM_AMOUN

In [None]:
# Total premium after the per-type adjustment.
df2.select(f.sum("PREMIUM_AMOUNT")).show()

+-------------------+
|sum(PREMIUM_AMOUNT)|
+-------------------+
|  908805.0041999974|
+-------------------+



In [None]:
def _total_premium(frame):
    """Return the sum of PREMIUM_AMOUNT over all rows of ``frame``."""
    return frame.agg({'PREMIUM_AMOUNT': 'sum'}).collect()[0][0]


a = _total_premium(df_join)  # total before the adjustment
b = _total_premium(df2)      # total after the adjustment

# Relative change of the total premium, expressed in percent.
perc_change = ((b - a) / a) * 100
perc_change

2.679858854385603

**Task 9**

In [None]:
# ELIGIBLE_FOR_DISCOUNT is 1 when all three conditions hold (TENURE above 60,
# EMPLOYMENT_STATUS equal to 'N', at least 4 family members) and 0 otherwise.
long_tenure = f.col("TENURE") > 60
employment_status_n = f.col("EMPLOYMENT_STATUS") == 'N'
large_family = f.col("NO_OF_FAMILY_MEMBERS") >= 4
discount_flag = f.when(long_tenure & employment_status_n & large_family, 1).otherwise(0)

df_join = df_join.withColumn('ELIGIBLE_FOR_DISCOUNT', discount_flag)


In [None]:
# The mean of the 0/1 ELIGIBLE_FOR_DISCOUNT flag is the fraction of rows flagged 1.
df_join.select(f.mean("ELIGIBLE_FOR_DISCOUNT")).show()

+--------------------------+
|avg(ELIGIBLE_FOR_DISCOUNT)|
+--------------------------+
|                    0.0299|
+--------------------------+



**Task 11**

In [None]:
# Total claim amount per agent and the percent rank (scaled to 0-100) of that
# total among all agents, ranked in ascending order of the total.
agent_percentile_query = """
SELECT AGENT_ID, 
SUM (CLAIM_AMOUNT),
PERCENT_RANK () over (ORDER BY SUM(CLAIM_AMOUNT))*100 as Percentile
FROM df
GROUP BY AGENT_ID
"""
agent_percentiles = spark.sql(agent_percentile_query)
agent_percentiles.show(5)

+----------+-----------------+-------------------+
|  AGENT_ID|sum(CLAIM_AMOUNT)|         Percentile|
+----------+-----------------+-------------------+
|AGENT01154|              800|                0.0|
|AGENT01077|             4200|0.08340283569641367|
|AGENT00621|             5300|0.16680567139282734|
|AGENT00604|             5500|0.25020850708924103|
|AGENT01037|             5600| 0.3336113427856547|
+----------+-----------------+-------------------+
only showing top 5 rows



**Task 12**

In [None]:
# Keep only the rows with CLAIM_STATUS 'A', RISK_SEGMENTATION 'H' and
# INCIDENT_SEVERITY 'Major Loss', then preview five of them.
claim_status_a = f.col("CLAIM_STATUS") == 'A'
risk_segment_h = f.col("RISK_SEGMENTATION") == 'H'
severity_major_loss = f.col("INCIDENT_SEVERITY") == 'Major Loss'

df3 = df_join.filter(claim_status_a & risk_segment_h & severity_major_loss)
df3.show(n=5)

+---------+----------+-------------------+--------------+-----------+-------------+-------------+----------+----------+--------------+--------------+------------+-------------------+--------------------+-------------+----------+-----+-----------+-----------+--------------+---+------+-----------------+--------------------+-----------------+----------+------------+--------------+------------------+------------------------+------------+-----------------+-------------------+----------+-----------------------+--------------+-------------+------------------------+-----------------+---------------+--------------------+-----------------+---------------+---------+---------------+------------------+------------------+--------------------+--------------------+--------------------+------------+------------+------------------+----------+---------------------+
|VENDOR_ID|  AGENT_ID|      TXN_DATE_TIME|TRANSACTION_ID|CUSTOMER_ID|POLICY_NUMBER|POLICY_EFF_DT|   LOSS_DT| REPORT_DT|INSURANCE_TYPE|PREMIUM

In [None]:
# Expose the filtered frame to Spark SQL under the view name "df_3".
df3.createOrReplaceTempView(name="df_3")

In [None]:
# Total claim amount per agent over the filtered rows in the df_3 view.
agent_claim_totals_query = """
select AGENT_ID, SUM(CLAIM_AMOUNT) as Total_Claim_Amount
from df_3
GROUP BY AGENT_ID
"""
suspicious = spark.sql(agent_claim_totals_query)

In [None]:
# Preview five of the per-agent totals.
suspicious.show(n=5)

+----------+------------------+
|  AGENT_ID|Total_Claim_Amount|
+----------+------------------+
|AGENT00514|              5000|
|AGENT01019|              2000|
|AGENT00315|             39400|
|AGENT00739|             18000|
|AGENT01020|             78000|
+----------+------------------+
only showing top 5 rows



In [None]:
# IS_SUSP is 1 for agents whose total claim amount is at least 15000, else 0.
meets_threshold = f.col("Total_Claim_Amount") >= 15000
is_susp_flag = f.when(meets_threshold, 1).otherwise(0)
suspicious = suspicious.withColumn('IS_SUSP', is_susp_flag)

suspicious.show(n=5)

+----------+------------------+-------+
|  AGENT_ID|Total_Claim_Amount|IS_SUSP|
+----------+------------------+-------+
|AGENT00514|              5000|      0|
|AGENT01019|              2000|      0|
|AGENT00315|             39400|      1|
|AGENT00739|             18000|      1|
|AGENT01020|             78000|      1|
+----------+------------------+-------+
only showing top 5 rows



In [None]:
# The mean of the 0/1 IS_SUSP flag is the fraction of listed agents flagged 1.
suspicious.select(f.mean("IS_SUSP")).show()

+------------------+
|      avg(IS_SUSP)|
+------------------+
|0.4269340974212034|
+------------------+

