# PySpark Tutorial - Data Transformation

Create the `SparkSession` that every PySpark program requires. Most programs store it in a variable named `spark`.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, length, lit, concat, to_date
from pyspark.sql.types import IntegerType, DateType, StringType

spark = SparkSession.builder.appName("PySparkTutorial").getOrCreate()

## Read the Source Data
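The following code creates one DataFrame for the policies and one for the claims by reading CSV files. By default Spark reads every CSV column as a string, so the numeric columns are cast to integers.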

In [2]:
policyDF = spark.read.option("header",True).csv("./data/policy.csv") \
                .withColumn("sum_insured", col("sum_insured").cast(IntegerType())) \
                .withColumn("vehicle_age", col("vehicle_age").cast(IntegerType()))
claimsDF = spark.read.option("header",True).csv("./data/claims.csv") \
                .withColumn("cost", col("cost").cast(IntegerType()))

Inspecting the schemas shows that the date columns are `string` types rather than Spark `date` types.

In [3]:
policyDF.printSchema()
claimsDF.printSchema()

root
 |-- policy: string (nullable = true)
 |-- make: string (nullable = true)
 |-- vehicle_age: integer (nullable = true)
 |-- sum_insured: integer (nullable = true)
 |-- inception_date: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)

root
 |-- policy: string (nullable = true)
 |-- incident_date: string (nullable = true)
 |-- cost: integer (nullable = true)



## Creating new Columns
The `withColumn()` method can create a new column in a number of ways.

`when()` and `otherwise()` build columns using logic similar to a SQL `case` expression.

Columns can be referenced as `dataframe.column_name` or `col("column_name")`; in this case both give the same result.

In [4]:
# These two methods of creating policyDF produce identical results.
policyDF = policyDF.withColumn("status", when(policyDF.start_date==policyDF.inception_date, "New Business") \
                                        .otherwise("Renewal"))
policyDF = policyDF.withColumn("status", when(col("start_date")==col("inception_date"), "New Business") \
                                        .otherwise("Renewal"))

The `select()` method can both subset the columns and set their order in the resulting DataFrame.

In [5]:
policyDF.select(["policy", "start_date", "status", "make", "sum_insured"]).show(5)

+-------+----------+------------+------+-----------+
| policy|start_date|      status|  make|sum_insured|
+-------+----------+------------+------+-----------+
|CAR0001|  20180101|New Business|TOYOTA|      15000|
|CAR0001|  20190101|     Renewal|TOYOTA|      13500|
|CAR0001|  20200101|     Renewal|TOYOTA|      12000|
|CAR0002|  20200210|New Business|SUBARU|      14000|
|CAR0003|  20180315|New Business|  FORD|      10000|
+-------+----------+------------+------+-----------+
only showing top 5 rows



## Transformation using a Python Function
A Python function can provide abilities that are not available in SQL.

The following function iterates over all columns in a DataFrame using `df.columns`. It appears to create a new DataFrame on every pass, but Spark uses lazy evaluation, so each call only adds to a chain of instructions that can be optimised later.

In [6]:
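def fix_dates(df):
    """Find all string columns named *_date and convert them to the Spark date type."""
    dtypes = dict(df.dtypes)  # column name -> type name, computed once
    # The loop variable is `name`, not `col`, so the imported col() is not shadowed.
    for name in df.columns:
        if name.endswith("_date") and dtypes[name] == "string":
            print("NOTE: Fixing date column '{}'.".format(name))
            df = df.withColumn(name, to_date(df[name], "yyyyMMdd"))
    return df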

policyDF = fix_dates(policyDF)
claimsDF = fix_dates(claimsDF)

NOTE: Fixing date column 'inception_date'.
NOTE: Fixing date column 'start_date'.
NOTE: Fixing date column 'end_date'.
NOTE: Fixing date column 'incident_date'.


Viewing the updated DataFrames shows that the date columns now use the Spark `date` type.

In [7]:
policyDF.printSchema()
policyDF.show(5)

root
 |-- policy: string (nullable = true)
 |-- make: string (nullable = true)
 |-- vehicle_age: integer (nullable = true)
 |-- sum_insured: integer (nullable = true)
 |-- inception_date: date (nullable = true)
 |-- start_date: date (nullable = true)
 |-- end_date: date (nullable = true)
 |-- status: string (nullable = false)

+-------+------+-----------+-----------+--------------+----------+----------+------------+
| policy|  make|vehicle_age|sum_insured|inception_date|start_date|  end_date|      status|
+-------+------+-----------+-----------+--------------+----------+----------+------------+
|CAR0001|TOYOTA|          1|      15000|    2018-01-01|2018-01-01|2018-12-31|New Business|
|CAR0001|TOYOTA|          2|      13500|    2018-01-01|2019-01-01|2019-12-31|     Renewal|
|CAR0001|TOYOTA|          3|      12000|    2018-01-01|2020-01-01|2020-12-31|     Renewal|
|CAR0002|SUBARU|          2|      14000|    2020-02-10|2020-02-10|2021-02-09|New Business|
|CAR0003|  FORD|          6|      

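## Joining DataFrames

The following inner join matches each claim to the policy period in which the incident occurred, so only policy periods with a claim appear in the result.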

In [8]:
combinedDF = policyDF.select(["policy", "make", "vehicle_age", "sum_insured", "start_date", "end_date"]) \
                     .join(claimsDF, (policyDF.policy==claimsDF.policy) &
                                     (policyDF.start_date<=claimsDF.incident_date) &
                                     (policyDF.end_date>=claimsDF.incident_date))
combinedDF.show(5)

+-------+------+-----------+-----------+----------+----------+-------+-------------+-----+
| policy|  make|vehicle_age|sum_insured|start_date|  end_date| policy|incident_date| cost|
+-------+------+-----------+-----------+----------+----------+-------+-------------+-----+
|CAR0001|TOYOTA|          3|      12000|2020-01-01|2020-12-31|CAR0001|   2020-06-05| 5000|
|CAR0004| MAZDA|          5|      10000|2020-04-02|2021-04-01|CAR0004|   2020-06-10| 3000|
|CAR0007|   BMW|          4|      24000|2020-07-13|2021-07-12|CAR0007|   2020-09-10|24000|
|CAR0009| TESLA|          1|      72000|2019-09-17|2020-09-16|CAR0009|   2020-02-10|15000|
+-------+------+-----------+-----------+----------+----------+-------+-------------+-----+



The duplicate `policy` column can be removed with either another `select()` (which may be a very long list) or a `drop()`.

In [9]:
combinedDF = policyDF.select(["policy", "make", "vehicle_age", "sum_insured", "start_date", "end_date"]) \
                     .join(claimsDF, (policyDF.policy==claimsDF.policy) &
                                     (policyDF.start_date<=claimsDF.incident_date) &
                                     (policyDF.end_date>=claimsDF.incident_date)) \
                     .drop(claimsDF.policy)
combinedDF.show(5)

+-------+------+-----------+-----------+----------+----------+-------------+-----+
| policy|  make|vehicle_age|sum_insured|start_date|  end_date|incident_date| cost|
+-------+------+-----------+-----------+----------+----------+-------------+-----+
|CAR0001|TOYOTA|          3|      12000|2020-01-01|2020-12-31|   2020-06-05| 5000|
|CAR0004| MAZDA|          5|      10000|2020-04-02|2021-04-01|   2020-06-10| 3000|
|CAR0007|   BMW|          4|      24000|2020-07-13|2021-07-12|   2020-09-10|24000|
|CAR0009| TESLA|          1|      72000|2019-09-17|2020-09-16|   2020-02-10|15000|
+-------+------+-----------+-----------+----------+----------+-------------+-----+



Note that multiple `select()` calls can appear in the DataFrame logic as required. The optimiser should remove unneeded columns anyway, but forcing an early `select()` will not hurt.

In [10]:
combinedDF = policyDF.select(["policy", "make", "vehicle_age", "sum_insured", "start_date", "end_date"]) \
                     .join(claimsDF, (policyDF.policy==claimsDF.policy) &
                                     (policyDF.start_date<=claimsDF.incident_date) &
                                     (policyDF.end_date>=claimsDF.incident_date)) \
                     .drop(claimsDF.policy) \
                     .select(["policy", "make", "vehicle_age", "sum_insured", "incident_date", "cost"])
combinedDF.show(5)

+-------+------+-----------+-----------+-------------+-----+
| policy|  make|vehicle_age|sum_insured|incident_date| cost|
+-------+------+-----------+-----------+-------------+-----+
|CAR0001|TOYOTA|          3|      12000|   2020-06-05| 5000|
|CAR0004| MAZDA|          5|      10000|   2020-06-10| 3000|
|CAR0007|   BMW|          4|      24000|   2020-09-10|24000|
|CAR0009| TESLA|          1|      72000|   2020-02-10|15000|
+-------+------+-----------+-----------+-------------+-----+



## Summarisation

The SQL concepts of `group by` and `order by` are supported in PySpark through the `groupBy()` and `orderBy()` methods.

The following example shows summarisation with `groupBy`:

In [11]:
summary = policyDF.groupBy("status").sum("sum_insured") \
                  .withColumnRenamed("sum(sum_insured)", "total_insured")
summary.show()

+------------+-------------+
|      status|total_insured|
+------------+-------------+
|     Renewal|       107500|
|New Business|       199000|
+------------+-------------+



Calling `stop()` on the Spark session when you are finished frees its resources. Note that after doing this you will be unable to re-run any of the code above without first re-creating the `spark` variable.

In [12]:
spark.stop()