# Exploring Data with DataFrames

### Introduction

One of the main benefits of working with PySpark is the ability to explore large datasets. In this lesson, we'll do just that with Houston flood insurance claims data.

In [43]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Then, let's read in our CSV data.

In [49]:
# .csv() already selects the CSV reader, so .format("csv") is not needed
claims_df = spark.read.option("header", "true").csv("./houston_claims.csv")

There are a lot of columns in this dataset, so let's set `vertical=True` to print each record as a list of column names and values, and `n=1` to show only the first row.

In [55]:
claims_df.show(n=1, vertical=True)

-RECORD 0-----------------------------------------
 _c0                       | 0                    
 reportedCity              | HOUSTON              
 dateOfLoss                | 2017-08-27T00:00:... 
 elevatedBuildingIndicator | False                
 floodZone                 | X                    
 latitude                  | 29.7                 
 longitude                 | -95.5                
 lowestFloodElevation      | null                 
 amountPaidOnBuildingClaim | 195857.43            
 amountPaidOnContentsClaim | 0.0                  
 yearofLoss                | 2017-01-01T00:00:... 
 reportedZipcode           | 77096                
 id                        | 5e398d6774cbd479f... 
only showing top 1 row



### Setting DataTypes

Next, let's take a look at the schema with `dtypes`. Spark read every column in as a string, so a number of columns do not have the correct type.

In [77]:
claims_df.dtypes

[('_c0', 'string'),
 ('reportedCity', 'string'),
 ('dateOfLoss', 'string'),
 ('elevatedBuildingIndicator', 'string'),
 ('floodZone', 'string'),
 ('latitude', 'string'),
 ('longitude', 'string'),
 ('lowestFloodElevation', 'string'),
 ('amountPaidOnBuildingClaim', 'string'),
 ('amountPaidOnContentsClaim', 'string'),
 ('yearofLoss', 'string'),
 ('reportedZipcode', 'string'),
 ('id', 'string')]
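
One reason the string types matter is that text is ordered character by character, so sorting or taking a maximum on a numeric column stored as text can give surprising answers. Here is a minimal plain-Python sketch of the effect, using made-up amounts rather than rows from the claims file:

```python
# Made-up claim amounts (assumed values), first as text and then as numbers.
amounts_as_text = ["9000.0", "195857.43", "25000.0"]
amounts_as_numbers = [float(a) for a in amounts_as_text]

# Text compares character by character, so "9..." beats "1..." and "2...".
largest_text = max(amounts_as_text)
# Numbers compare by magnitude.
largest_number = max(amounts_as_numbers)

print(largest_text)    # 9000.0
print(largest_number)  # 195857.43
```

Once the columns hold real numeric types, comparisons and aggregates behave the way we expect.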

For example, let's change `latitude` and `longitude` into floats, the `amountPaid` columns into integers, `yearOfLoss` into an integer, and `dateOfLoss` into a date.

In [66]:
from pyspark.sql.types import BooleanType, DateType, IntegerType, FloatType

In [67]:
from pyspark.sql.functions import col

In [73]:
# yearofLoss holds a timestamp string (e.g. 2017-01-01T00:00:...), which cannot be
# cast straight to an integer, so keep only its first four characters (the year).
updated_claims_df = claims_df.withColumn("yearOfLoss", col("yearofLoss").substr(1, 4).cast(IntegerType())) \
    .withColumn("latitude", col("latitude").cast(FloatType())) \
    .withColumn("longitude", col("longitude").cast(FloatType())) \
    .withColumn("amountPaidOnBuildingClaim", col("amountPaidOnBuildingClaim").cast(IntegerType())) \
    .withColumn("amountPaidOnContentsClaim", col("amountPaidOnContentsClaim").cast(IntegerType())) \
    .withColumn("dateOfLoss", col("dateOfLoss").cast(DateType()))

We can confirm with `dtypes` that `latitude` and `longitude` are now floats, the `amountPaid` columns and `yearOfLoss` are integers, and `dateOfLoss` is a date.

In [80]:
updated_claims_df.dtypes

[('_c0', 'string'),
 ('reportedCity', 'string'),
 ('dateOfLoss', 'date'),
 ('elevatedBuildingIndicator', 'string'),
 ('floodZone', 'string'),
 ('latitude', 'float'),
 ('longitude', 'float'),
 ('lowestFloodElevation', 'string'),
 ('amountPaidOnBuildingClaim', 'int'),
 ('amountPaidOnContentsClaim', 'int'),
 ('yearOfLoss', 'int'),
 ('reportedZipcode', 'string'),
 ('id', 'string')]

Instead of casting each column by hand, we can also ask Spark to infer the column types when it reads the file by passing `inferSchema=True`.

In [97]:
df = spark.read.csv('houston_claims.csv', inferSchema=True, header=True)

In [98]:
df.dtypes

[('_c0', 'int'),
 ('reportedCity', 'string'),
 ('dateOfLoss', 'timestamp'),
 ('elevatedBuildingIndicator', 'boolean'),
 ('floodZone', 'string'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('lowestFloodElevation', 'double'),
 ('amountPaidOnBuildingClaim', 'double'),
 ('amountPaidOnContentsClaim', 'double'),
 ('yearofLoss', 'timestamp'),
 ('reportedZipcode', 'int'),
 ('id', 'string')]
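
To get a feel for what `inferSchema=True` is doing, here is a simplified plain-Python sketch of type inference: try the narrowest type first and fall back to a string. The `infer_type` helper and the sample values are hypothetical, made up for illustration. Spark's own inference logic is more involved (it also recognizes timestamps, as the output above shows).

```python
def infer_type(values):
    """Return the narrowest type name that fits every non-empty value."""
    def fits(parse, value):
        try:
            parse(value)
            return True
        except ValueError:
            return False

    # Treat empty strings and None as missing values and ignore them.
    present = [v for v in values if v not in ("", None)]
    if not present:
        return "string"
    if all(v.lower() in ("true", "false") for v in present):
        return "boolean"
    if all(fits(int, v) for v in present):
        return "int"
    if all(fits(float, v) for v in present):
        return "double"
    return "string"


# Assumed sample values, loosely modeled on the columns in this lesson.
sample_columns = {
    "reportedZipcode": ["77096", "77025"],
    "latitude": ["29.7", "29.8"],
    "elevatedBuildingIndicator": ["False", "True"],
    "floodZone": ["X", "AE"],
    "lowestFloodElevation": ["", "5.5"],
}

inferred = {name: infer_type(vals) for name, vals in sample_columns.items()}
print(inferred)
```

Because the types depend on the values in the file, inference requires Spark to make an extra pass over the data, which is a trade-off to keep in mind on large files.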

### Aggregate Methods

We can get summary statistics for a set of columns with the `describe` method:

In [84]:
updated_claims_df.describe(['latitude', 'longitude']).show()

+-------+-------------------+-------------------+
|summary|           latitude|          longitude|
+-------+-------------------+-------------------+
|  count|              19943|              19943|
|   mean| 29.779967891591927| -95.44820225664996|
| stddev|0.31282914829415176|0.48715072859743314|
|    min|               29.5|             -149.8|
|    max|               61.6|              -80.2|
+-------+-------------------+-------------------+



Above, we use the `describe` method to calculate the `count`, `mean`, `stddev`, `min` and `max` of the `latitude` and `longitude` columns.

> If we did not pass the list `['latitude', 'longitude']` to the `describe` method, Spark would compute these statistics for every numeric and string column.
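
As a rough check on what each row of the summary means, the same five statistics can be computed in plain Python with the standard `statistics` module. The latitude values below are made up for illustration and are not rows from the claims file.

```python
import statistics

# Assumed sample values, not taken from the dataset.
latitudes = [29.7, 29.8, 29.6, 30.1]

summary = {
    "count": len(latitudes),
    "mean": statistics.mean(latitudes),
    # stdev is the sample standard deviation (it divides by n - 1).
    "stddev": statistics.stdev(latitudes),
    "min": min(latitudes),
    "max": max(latitudes),
}

for name, value in summary.items():
    print(name, value)
```

`statistics.stdev` divides by n - 1, which is also what Spark's `stddev` does.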

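We can also compute individual aggregates by passing functions such as `avg` and `stddev` to `select`, using `alias` to rename a result column.

In [92]: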
from pyspark.sql.functions import countDistinct, avg, stddev

In [96]:
updated_claims_df.select([avg('latitude').alias("avg latitude"),
                          stddev('latitude')]).show()

+------------------+---------------------+
|      avg latitude|stddev_samp(latitude)|
+------------------+---------------------+
|29.779967891591927|  0.31282914829415176|
+------------------+---------------------+
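
The column header `stddev_samp(latitude)` shows that `stddev` computes the sample standard deviation. For reference, with $n$ values $x_1, \dots, x_n$ and mean $\bar{x}$, the standard textbook definition is:

```latex
$$
s = \sqrt{\frac{1}{n - 1} \sum_{i=1}^{n} \left(x_i - \bar{x}\right)^2}
$$
```

The population version (`stddev_pop` in `pyspark.sql.functions`) divides by $n$ instead of $n - 1$. With a count of 19943 rows, the two values are nearly identical.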



### Resources

[PySpark Operations](https://hendra-herviawan.github.io/)

[Spark SQL String Functions](https://sparkbyexamples.com/spark/usage-of-spark-sql-string-functions/)

[PySpark From Pandas](https://databricks.com/session/data-wrangling-with-pyspark-for-data-scientists-who-know-pandas)