In [110]:
import pandas as pd
import json
from pyspark.sql import SparkSession, functions, types
#Create Spark Session and context
spark = SparkSession\
    .builder\
    .appName("example code")\
    .config("spark.driver.extraClassPath","/home/jim/spark-2.4.0-bin-hadoop2.7/jars/mysql-connector-java-5.1.49.jar")\
    .getOrCreate()
spark.sparkContext.setLogLevel('WARN')
sc = spark.sparkContext
#Recover stored Pandas Dataframe Objects
%store -r df1
%store -r fake_df

### Convert Pandas Dataframes to Spark Dataframes

In [48]:
spark_df1 = spark.createDataFrame(df1)
spark_df2 = spark.createDataFrame(fake_df)
print('First Dataframe:')
spark_df1.select(spark_df1.columns[:9]).show(10)
print('\nSecond Dataframe:')
spark_df2.select(spark_df2.columns[:5]).show(10,truncate=True)

First Dataframe:
+-----------+---------+-----------------------+---+--------+--------+---------+-----------------+-----------------+
|Customer ID|     City|Customer Lifetime Value|Age|Response|Coverage|Education|Effective To Date|Employment_Status|
+-----------+---------+-----------------------+---+--------+--------+---------+-----------------+-----------------+
|    DA29911|   Bhopal|     1339.3965560000001| 32|      No|Extended|Grade XII|       07-12-2020|       Unemployed|
|    PL40163|  Kolkata|            6388.288678| 34|      No|   Basic|  Diploma|       05-03-2019|       Unemployed|
|    CS14065|  Chennai|            6077.675806| 49|     Yes| Premium|  Diploma|       06-05-2020|          Retired|
|    KJ49509|  Kolkata|     3615.7581549999995| 40|     Yes| Premium|  Grade X|       01-06-2018|       Unemployed|
|    CF57721|   Bhopal|            5015.968678| 47|     Yes| Premium| Graduate|       30-10-2020|         On leave|
|    BU35480|     Pune|            2928.508467| 46|    

### Now that we have the two SPark Dataframes, let us export their schema to a JSON file so that we can call upon it later if needed

In [49]:
#Dump schemas of both the Dataframes to respective JSON files
with open("schema_1.json", "w") as f:
    json.dump(spark_df1.schema.jsonValue(), f)
    
with open("schema_2.json", "w") as f:
    json.dump(spark_df2.schema.jsonValue(), f)
    
#Print the JSON file contents
with open("schema_1.json") as f:
    print('Schema 1:\n',types.StructType.fromJson(json.load(f)).simpleString())
    
with open("schema_2.json") as f:
    print('\nSchema 2:\n',types.StructType.fromJson(json.load(f)).simpleString())

Schema 1:
 struct<Customer ID:string,City:string,Customer Lifetime Value:double,Age:bigint,Response:string,Coverage:string,Education:string,Effective To Date:string,Employment_Status:string,Gender:string,Income:bigint,Location_Code:string,Marital Status:string,Monthly Premium Auto:bigint,Total Written Premium:bigint,Losses:bigint,Loss Ratio:double,Growth Rate:double,Commissions:bigint,Months Since Last Claim:bigint,Months Since Policy Inception:bigint,Number of Open Complaints:bigint,Number of Policies:bigint,Number of previous policies:bigint,Policy_Type:string,Policy_Rating:string,Renew_Offer_Type:string,Sales_Channel:string,Total Claim Amount:double,Feedback:string>

Schema 2:
 struct<Customer ID:string,Name:string,Address:string,Phone_no:string,Job:string,Company:string,Credit Card Provider:string,Email:string,SSN:string>


## Join the Dataframes
#### The Dataframes have a common column "Customer ID" which is also the primary key for both schemas
#### Since Spark SQL supports native SQL syntax, we can also write join operations after creating temporary tables on DataFrame’s and using spark.sql()

In [50]:
#Create Temp tables in SPark.sql
spark_df1.withColumnRenamed("Customer ID","Customer_ID").createOrReplaceTempView("DF1")
spark_df2.withColumnRenamed("Customer ID","Customer__ID").createOrReplaceTempView("DF2")

#SQL JOIN
joined_df = spark.sql("SELECT * FROM DF1 INNER JOIN DF2 ON DF1.Customer_ID = DF2.Customer__ID")
joined_df.select(joined_df.columns[:6]).show(10)

+-----------+---------+-----------------------+---+--------+--------+
|Customer_ID|     City|Customer Lifetime Value|Age|Response|Coverage|
+-----------+---------+-----------------------+---+--------+--------+
|    CF15071|  Chennai|            5182.902158| 27|     Yes|   Basic|
|    CP45498|     Pune|            2487.436701| 43|      No|   Basic|
|    GH17700|  Kolkata|            6856.146035| 45|      No|   Basic|
|    GR55787|Bengaluru|      7805.536112000001| 37|      No|   Basic|
|    HO30596|   Bhopal|            2191.281335| 28|     Yes|   Basic|
|    IG45345|     Pune|     2651.6145890000003| 50|      No|   Basic|
|    JJ22950|  Chennai|      8931.096209000001| 30|     Yes|Extended|
|    NM95686|Bengaluru|      9604.305604000001| 44|      No|Extended|
|    NV68421|    Delhi|             9735.67953| 33|      No| Premium|
|    OZ20388|Bengaluru|             8176.13623| 47|      No|Extended|
+-----------+---------+-----------------------+---+--------+--------+
only showing top 10 

#### Checkpoint and cache() the dataframe so we spend less time performing computations

In [51]:
master = joined_df
joined_df.cache()

DataFrame[Customer_ID: string, City: string, Customer Lifetime Value: double, Age: bigint, Response: string, Coverage: string, Education: string, Effective To Date: string, Employment_Status: string, Gender: string, Income: bigint, Location_Code: string, Marital Status: string, Monthly Premium Auto: bigint, Total Written Premium: bigint, Losses: bigint, Loss Ratio: double, Growth Rate: double, Commissions: bigint, Months Since Last Claim: bigint, Months Since Policy Inception: bigint, Number of Open Complaints: bigint, Number of Policies: bigint, Number of previous policies: bigint, Policy_Type: string, Policy_Rating: string, Renew_Offer_Type: string, Sales_Channel: string, Total Claim Amount: double, Feedback: string, Customer__ID: string, Name: string, Address: string, Phone_no: string, Job: string, Company: string, Credit Card Provider: string, Email: string, SSN: string]

## Cleaning
### Now we will apply the standard cleaning procedures:
### 1.Dropping Rows With Empty Values


In [52]:
df_len = joined_df.count()
joined_df = joined_df.dropna(how='any')
print('{} rows containing one or more null values were dropped'.format(df_len-joined_df.count()))

0 rows containing one or more null values were dropped


### 2.Dropping Duplicate Rows i.e. duplicated entries/values
#### Since our primary key is the Customer ID, we need to be sure that there is only one data entry for each CUstomer ID. In an ideal dataset, this step would drop zero rows 

In [53]:
joined_df = joined_df.dropDuplicates(subset=["Customer_ID"])
print('{} rows containing one or more null values were dropped'.format(df_len-joined_df.count()))

0 rows containing one or more null values were dropped


### 3. Clean Individual columns one by one
#### Let us proceed from left to right. Customer lifetimevalue seems to have many decimals and they are varying. We can standardize them

In [54]:
#Round the number of decimals to 2
joined_df = joined_df.withColumn("Customer Lifetime Value", functions.round("Customer Lifetime Value", 2))

#SHow the result
joined_df.select("Customer Lifetime Value").show(5,truncate=False)

#Print min and max values
print('\nRange of this columns is {} - {}'.format(joined_df.select('Customer Lifetime Value').rdd.min()[0],joined_df.select('Customer Lifetime Value').rdd.max()[0]))

+-----------------------+
|Customer Lifetime Value|
+-----------------------+
|5182.9                 |
|2487.44                |
|6856.15                |
|7805.54                |
|2191.28                |
+-----------------------+
only showing top 5 rows


Range of this columns is 1000.92 - 9998.96


#### We will apply the same function to the columns "Loss Ratio","Growth Rate" and "Total Claim Amount"

In [55]:
#Round the number of decimals to 2
joined_df = joined_df.withColumn("Loss Ratio", functions.round("Loss Ratio", 3))
joined_df = joined_df.withColumn("Growth Rate", functions.round("Growth Rate", 3))
joined_df = joined_df.withColumn("Total Claim Amount", functions.round("Total Claim Amount", 3))

#Show results
joined_df.select("Loss Ratio","Growth Rate","Total Claim Amount").show(5,truncate=False)

#Print ranges
for i in ["Loss Ratio","Growth Rate","Total Claim Amount"]:
    print('Range of {} is {} - {}'.format(i,joined_df.select(i).rdd.min()[0],joined_df.select(i).rdd.max()[0]))

+----------+-----------+------------------+
|Loss Ratio|Growth Rate|Total Claim Amount|
+----------+-----------+------------------+
|0.734     |0.715      |39565.507         |
|0.987     |7.916      |90278.463         |
|0.977     |-7.184     |61447.61          |
|0.735     |-3.985     |43840.157         |
|0.134     |-0.794     |18663.599         |
+----------+-----------+------------------+
only showing top 5 rows

Range of Loss Ratio is 0.0 - 1.0
Range of Growth Rate is -9.998 - 9.995
Range of Total Claim Amount is 10023.563 - 99982.258


#### Next we shall clean the Address column
Notice how the newline character seems to divide each entry into the house and town. We will just keep the first part.

In [56]:
joined_df = joined_df.withColumn("Address",functions.split("Address", "\n").getItem(0))
joined_df.select("Address").show(10,truncate=False)

+--------------------------------+
|Address                         |
+--------------------------------+
|40710 Christina Forest Suite 049|
|689 Hughes Haven Apt. 941       |
|PSC 3151, Box 0733              |
|775 Dean Lights Suite 377       |
|Unit 3938 Box 4045              |
|Unit 8463 Box 7425              |
|1596 Howe Field Apt. 622        |
|997 Peterson Center Suite 651   |
|2402 Cooper Neck Suite 618      |
|78122 Sims Green                |
+--------------------------------+
only showing top 10 rows



#### Now the Job column is clean but sometimes there is too much information, separated by a comma. We can discard the second part

In [57]:
joined_df = joined_df.withColumn("Job",functions.split("Job", ",").getItem(0))
joined_df.select("Job").show(10,truncate=False)

+------------------------+
|Job                     |
+------------------------+
|Insurance broker        |
|Magazine features editor|
|Scientist               |
|Medical physicist       |
|Best boy                |
|Electronics engineer    |
|Editor                  |
|Ship broker             |
|Herbalist               |
|Horticulturist          |
+------------------------+
only showing top 10 rows



#### Similarly, we can do so for the "Company" column but here we must keep the second part. So if there is only one, part, the code must handle this

In [71]:
joined_df = joined_df.withColumn("Company",functions.reverse(functions.split("Company", ",")).getItem(0))
joined_df.select('Company').show(10,truncate=False)

+--------------------+
|Company             |
+--------------------+
|Hoffman LLC         |
| Ward and Rice      |
|Johnson Inc         |
| Dickerson and Bates|
| Walton and Garcia  |
|Armstrong and Sons  |
|Irwin and Sons      |
|Branch Group        |
| Webb and Holloway  |
| White and Curtis   |
+--------------------+
only showing top 10 rows



### 4.Drop unnecessary columns 
#### While there is a lot of useful data, some of the columns are not required when it comes to using the data for churn prediction. Some of the unnecessary columns here are Customer_ID, Name, Address(not CIty), Phone number, Email and SSN as these fields are unlikely to affect a person's decision to buy insurance.

In [78]:
cols_to_drop = ["Customer__ID","Name","Address","Phone_no","Email","SSN"]
final_df = joined_df.select([C for C in joined_df.columns if C not in cols_to_drop])
final_df.columns

['Customer_ID',
 'City',
 'Customer Lifetime Value',
 'Age',
 'Response',
 'Coverage',
 'Education',
 'Effective To Date',
 'Employment_Status',
 'Gender',
 'Income',
 'Location_Code',
 'Marital Status',
 'Monthly Premium Auto',
 'Total Written Premium',
 'Losses',
 'Loss Ratio',
 'Growth Rate',
 'Commissions',
 'Months Since Last Claim',
 'Months Since Policy Inception',
 'Number of Open Complaints',
 'Number of Policies',
 'Number of previous policies',
 'Policy_Type',
 'Policy_Rating',
 'Renew_Offer_Type',
 'Sales_Channel',
 'Total Claim Amount',
 'Feedback',
 'Job',
 'Company',
 'Credit Card Provider']

## Load into MySQL Database

In [111]:
final_df.write\
    .format("jdbc")\
    .option("url", "jdbc:mysql://localhost/Insurance")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("dbtable", "Insurance_data").option("user", "jsully")\
    .option("password", "whatisreal1").mode('append').save()

Py4JJavaError: An error occurred while calling o1841.save.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:193)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:197)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [113]:
final_df.coalesce(1).write.format('csv').option('header',True).mode('overwrite').save('Insurance.csv')