# Setting up a Schema in Spark

### Introduction

In this lesson, we'll practice using Spark to change the datatypes of our schema.  We'll do so using the database of flood insurance claims.  Let's get started.

### Getting Set Up

Let's get started by connecting to our Spark cluster with a spark session.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("houstonClaims") \
    .getOrCreate()

Then, let's read in the our csv data.

### Setting DataTypes

Then let's load our data from the csv file.

In [8]:
import pandas as pd
df = pd.read_csv('./houston_claims.csv', index_col = 0).astype(str)

And then load the data into a spark dataframe.

In [9]:
claims_df = spark.createDataFrame(df)

Next use the `dtypes` method to see the format of our data.

In [11]:
claims_df.dtypes

# [('reportedCity', 'string'),
#  ('dateOfLoss', 'string'),
#  ('elevatedBuildingIndicator', 'string'),
#  ('floodZone', 'string'),
#  ('latitude', 'string'),
#  ('longitude', 'string'),
#  ('lowestFloodElevation', 'string'),
#  ('amountPaidOnBuildingClaim', 'string'),
#  ('amountPaidOnContentsClaim', 'string'),
#  ('yearofLoss', 'string'),
#  ('reportedZipcode', 'string'),
#  ('id', 'string')]

[('reportedCity', 'string'),
 ('dateOfLoss', 'string'),
 ('elevatedBuildingIndicator', 'string'),
 ('floodZone', 'string'),
 ('latitude', 'string'),
 ('longitude', 'string'),
 ('lowestFloodElevation', 'string'),
 ('amountPaidOnBuildingClaim', 'string'),
 ('amountPaidOnContentsClaim', 'string'),
 ('yearofLoss', 'string'),
 ('reportedZipcode', 'string'),
 ('id', 'string')]

So we can see that multiple columns are not in the correct format.

Begin by making the following changes. 
* Change `latitude` and `longitude` into floats.

In [27]:
from pyspark.sql.types import DateType, BooleanType, DateType, IntegerType, FloatType

In [28]:
from pyspark.sql.functions import col

In [30]:
updated_claims_df = claims_df \
    .withColumn("latitude",col("latitude").cast(FloatType())) \
    .withColumn("longitude",col("longitude").cast(FloatType()))


If we check the datatypes of `latitude` `longitude` and `id`, we should see the following:

In [33]:
updated_claims_df.select(['latitude', 'longitude'])
# DataFrame[latitude: float, longitude: float, id: int]

DataFrame[latitude: float, longitude: float]

And we should see the following values.

In [34]:
updated_claims_df.select(['latitude', 'longitude']).show(2)

+--------+---------+
|latitude|longitude|
+--------+---------+
|    29.7|    -95.5|
|    29.5|    -95.1|
+--------+---------+
only showing top 2 rows



Of course, we still have some additional columns that we should update.

In [35]:
c.printSchema()

root
 |-- reportedCity: string (nullable = true)
 |-- dateOfLoss: string (nullable = true)
 |-- elevatedBuildingIndicator: string (nullable = true)
 |-- floodZone: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- lowestFloodElevation: string (nullable = true)
 |-- amountPaidOnBuildingClaim: string (nullable = true)
 |-- amountPaidOnContentsClaim: string (nullable = true)
 |-- yearofLoss: string (nullable = true)
 |-- reportedZipcode: string (nullable = true)
 |-- id: string (nullable = true)



So now let's make the following changes to the `amountPaid` and `yearOfLoss` columns:
* Change the `amountPaidOnBuildingClaim` and columns `amountPaidOnContentsClaim` into floats, and rename the columns to `amount_paid_bldg`, and `amount_paid_contents` respectively.   
* And change the `yearOfLoss` column into an integer, renaming the column to `year`.

In [43]:
updated_renamed_claims_df = updated_claims_df.withColumnRenamed('yearOfLoss', 'year') \
.withColumn("year", col("year").cast(IntegerType())) \
.withColumnRenamed("amountPaidOnBuildingClaim", "amount_paid_bldg") \
.withColumn("amount_paid_bldg",col("amount_paid_bldg").cast(IntegerType())) \
.withColumnRenamed("amountPaidOnContentsClaim", "amount_paid_contents") \
.withColumn("amount_paid_contents",col("amount_paid_contents").cast(IntegerType()))

And then let's take a look at this new dataframe.

In [44]:
updated_renamed_claims_df.dtypes

[('reportedCity', 'string'),
 ('dateOfLoss', 'string'),
 ('elevatedBuildingIndicator', 'string'),
 ('floodZone', 'string'),
 ('latitude', 'float'),
 ('longitude', 'float'),
 ('lowestFloodElevation', 'string'),
 ('amount_paid_bldg', 'int'),
 ('amount_paid_contents', 'int'),
 ('year', 'int'),
 ('reportedZipcode', 'string'),
 ('id', 'string')]

Ok, so the remaining column to change is the `dateOfLoss` column.  Let's tackle this in the next section.

### Changing the date column

Finally change the `to_date` column.

In [45]:
updated_renamed_claims_df.select('dateOfLoss').show(2)

+--------------------+
|          dateOfLoss|
+--------------------+
|2017-08-27T00:00:...|
|2008-09-12T00:00:...|
+--------------------+
only showing top 2 rows



In [46]:
from pyspark.sql.types import TimestampType

In [57]:
coerced_claims_df = updated_renamed_claims_df.withColumnRenamed('dateOfLoss', 'date'). \
withColumn("date", col("date").cast(TimestampType()))

In [58]:
coerced_claims_df.select('date').show(3)

+-------------------+
|               date|
+-------------------+
|2017-08-26 20:00:00|
|2008-09-11 20:00:00|
|2004-06-28 20:00:00|
+-------------------+
only showing top 3 rows



### Using SQL

In [59]:
coerced_claims_df.createOrReplaceTempView("claims")

In [62]:
spark.sql("SELECT year(date) as year_of_claim FROM claims LIMIT 2").show()

+-------------+
|year_of_claim|
+-------------+
|         2017|
|         2008|
+-------------+



### Resources

[Pyspark Operations](https://hendra-herviawan.github.io/)

[Spark SQL string Functions](https://sparkbyexamples.com/spark/usage-of-spark-sql-string-functions/)

[Pyspark From Pandas](https://databricks.com/session/data-wrangling-with-pyspark-for-data-scientists-who-know-pandas)

[Spark Tricks Blog](https://georgheiler.com/2019/02/22/dynamically-select-columns-by-type/)