# Data Wrangling

In this lesson, we will acquire and prepare the data we will use in the rest of this module.

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
from env import get_db_url

spark = SparkSession.builder.getOrCreate()

## Reading Data

Spark lets us read data in from a variety of data sources using what it calls a `DataFrameReader`. We can access the `read` property of our `spark` object and then set various options and read from a data source.

In [2]:
df = spark.read.csv("data/source.csv", sep=",", header=True, inferSchema=True)

In [8]:
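# alternatively, read from a SQL database: run the query with pandas, then
# convert the resulting pandas dataframe into a spark dataframe
url = get_db_url("311_data")
query = "SELECT * FROM source;"
pandas_df = pd.read_sql(query, url)
df = spark.createDataFrame(pandas_df)
df.show(3)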

+-----+---------+----------------+
|index|source_id| source_username|
+-----+---------+----------------+
|    0|   100137|Merlene Blodgett|
|    1|   103582|     Carmen Cura|
|    2|   106463| Richard Sanchez|
+-----+---------+----------------+
only showing top 3 rows



The CSV read above could also be written like so:

In [3]:
(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("data/source.csv")
)

DataFrame[source_id: string, source_username: string]

### Data Schemas

Spark includes a concept of a *data schema*, which is a way to specify the types of our data ahead of time. Doing so lets us be sure about the structure of our data, and can significantly increase the speed of loading data (inferring the schema can be a costly operation for large datasets).

The `pyspark.sql.types` module provides the column types we can use in a schema, including:

- `StringType`
- `DoubleType`
- `IntegerType`
- `LongType`
- `ShortType`
- `TimestampType`
- `FloatType`
- `DateType`

All of the above types will go inside of a `StructField`, which will be encapsulated in a `StructType`, and the resulting object will represent our data schema.

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

spark.read.csv("data/source.csv", header=True, schema=schema)

DataFrame[source_id: string, source_username: string]

Notice that instead of `inferSchema=True`, we pass the `schema` object in to the `read.csv` call.

### Writing Data

A Spark dataframe can be written out using its `.write` property. Several common output formats are:

- `csv`: for writing to one or more local CSV files
- `parquet`: [Parquet](https://parquet.apache.org/) is a very popular columnar storage format for Hadoop.
- `json`: for writing to one or more local JSON files
- `jdbc`: for writing to a SQL database table

In [5]:
# for demo purposes
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))

mpg.write.json("data/mpg_json", mode="overwrite")

# like much else in spark, there are multiple ways we could do this:
(
    mpg.write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("data/mpg_csv")
)

## Data Preparation

For the rest of this lesson, we'll take a look at the `case` data from the San Antonio 311 calls dataset.

In [6]:
df = spark.read.csv("data/case.csv", header=True, inferSchema=True)
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

We will now cover the data preparation steps we want to perform and how to do each of them with Spark.

### Rename Columns

We'll rename the `SLA_due_date` column to `case_due_date` so that it matches the other date columns.

In [7]:
df = df.withColumnRenamed("SLA_due_date", "case_due_date")

### Correct Data Types

Two columns, `case_closed` and `case_late`, store yes/no values. Currently Spark treats them as strings; let's turn them into booleans:

In [8]:
# demonstrating we only have yes/no in each field
df.groupBy("case_closed", "case_late").count().show()

+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+



In [9]:
df = df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



The `council_district` column looks like an integer, but it is really a unique identifier for each district. Since we won't be performing arithmetic with it, we will convert it to a string type.

In [10]:
df.groupBy("council_district").count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



In [11]:
df = df.withColumn("council_district", col("council_district").cast("string"))

Now we will handle the three columns that contain dates. We'll use Spark's `to_timestamp` function for this.

For `to_timestamp` to work properly, we need to provide the date format. The format syntax is a little different from the date functionality we've worked with in pandas because it uses [Java's `SimpleDateFormat`][1].

[1]: https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html
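
For readers coming from pandas, it can help to see the same format written with Python's `strptime` directives. The sketch below is plain Python rather than Spark, and the mapping in the comments covers only the pattern letters used in this lesson. Note that the case of `M` and `m` is reversed between the two styles.

```python
from datetime import datetime

# Java-style letter   strptime directive   meaning
# M                   %m                   month number
# d                   %d                   day of the month
# yy                  %y                   two-digit year
# H                   %H                   hour of the day, 0-23
# mm                  %M                   minute
spark_fmt = "M/d/yy H:mm"  # the pattern we pass to to_timestamp
python_fmt = "%m/%d/%y %H:%M"  # a rough strptime equivalent

parsed = datetime.strptime("9/26/20 0:42", python_fmt)
print(parsed)  # 2020-09-26 00:42:00
```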

In [12]:
print("--- Before handling dates")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

fmt = "M/d/yy H:mm"
df = (
    df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

print("--- After")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

--- Before handling dates
+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows

--- After
+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows

### Data Transformations

Now that we have everything stored as the correct data type, we will make a few transformations to the data.

We'll begin by normalizing the request address field. Using the `trim` and `lower` functions lets us strip any leading or trailing whitespace and convert everything to lowercase.

In [13]:
print("--- Before")
df.select("request_address").show(5)

df = df.withColumn("request_address", trim(lower(df.request_address)))

print("--- After")
df.select("request_address").show(5)

--- Before
+--------------------+
|     request_address|
+--------------------+
|2315  EL PASO ST,...|
|2215  GOLIAD RD, ...|
|102  PALFREY ST W...|
|114  LA GARDE ST,...|
|734  CLEARVIEW DR...|
+--------------------+
only showing top 5 rows

--- After
+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
|114  la garde st,...|
|734  clearview dr...|
+--------------------+
only showing top 5 rows



Here we will convert the number of days a case is late to a number of weeks.

In [14]:
# withColumn already names the new column, so no alias is needed in the expression
df = df.withColumn("num_weeks_late", expr("num_days_late / 7"))

df.select("num_days_late", "num_weeks_late").show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



Lastly, we can format the council district column a little differently. We'll add leading 0s to it:

In [15]:
# '%03d' means at least 3 digits, pad with 0s
#
# format_string needs an integer for '%d', so we cast council_district back to
# an integer inside the call; the final output will be a string.
df = df.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

df.select("council_district").show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows
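
The `%03d` pattern follows printf conventions, which Python's `%` operator shares, so the padding behavior can be tried out in plain Python without Spark. This is only an analogy for what `format_string` does, using a few made-up district numbers.

```python
# printf-style zero padding: at least 3 digits, padded on the left with 0s
districts = [0, 5, 10, 1234]
padded = ["%03d" % d for d in districts]
print(padded)  # ['000', '005', '010', '1234']

# '%d' expects an integer. In plain Python a string argument raises TypeError,
# which is the same kind of mismatch the cast to int avoids in the Spark code.
try:
    "%03d" % "5"
    string_accepted = True
except TypeError:
    string_accepted = False
print(string_accepted)  # False
```

Values that already have three or more digits are left unchanged, since the width is a minimum rather than a limit.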



### New Features

Let's now create some new features based on our existing data.

We will first extract the zipcode from the address:

In [16]:
df = df.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

df.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



Here we have defined the zipcode as the last sequence of digits at the end of the string.
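
To see what this pattern does and does not match, here is a small plain-Python sketch using the `re` module. The addresses are hypothetical, since the real values are truncated in the output above, and the helper `extract_zipcode` is made up for illustration. It returns an empty string when nothing matches, which is what `regexp_extract` does as well.

```python
import re

# Hypothetical addresses for illustration only.
addresses = [
    "2315  el paso st, san antonio, 78207",
    "100 main plaza, san antonio",  # no digits at the end
    "2315  el paso st, san antonio, 78207 ",  # trailing space
]


def extract_zipcode(address):
    """Return the run of digits at the very end of the string, or ''."""
    match = re.search(r"\d+$", address)
    return match.group(0) if match else ""


zipcodes = [extract_zipcode(a) for a in addresses]
print(zipcodes)  # ['78207', '', '']
```

The last address shows why trimming whitespace first matters: a single trailing space is enough to stop `\d+$` from matching.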

Next we will create several new, related columns:

- `case_age`: How old the case is; the difference in days between when the case was opened and the current day
- `days_to_closed`: The number of days between when the case was opened and when it was closed
- `case_lifetime`: The number of days between when the case was opened and when it was closed; if the case is still open, the number of days since the case was opened
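
Spark's `datediff` counts whole calendar days between two dates and ignores the time of day. The plain-Python sketch below mimics that rule and the `case_lifetime` logic for two made-up cases. The `today` value is a hypothetical stand-in for `current_timestamp()`, and using `None` for a missing close date is a simplification of the `case_closed` flag used in the Spark code.

```python
from datetime import datetime


def datediff(end, start):
    """Whole calendar days from start to end, ignoring the time of day."""
    return (end.date() - start.date()).days


today = datetime(2019, 12, 9)  # hypothetical stand-in for current_timestamp()


def case_lifetime(opened, closed):
    # closed is None while the case is still open
    if closed is None:
        return datediff(today, opened)
    return datediff(closed, opened)


closed_case = case_lifetime(
    datetime(2018, 1, 1, 0, 46), datetime(2018, 1, 3, 8, 11)
)
open_case = case_lifetime(datetime(2018, 1, 1, 0, 42), None)
print(closed_case, open_case)  # 2 707
```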

In [17]:
df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 12:29:00|     707|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-03 08:11:00|     707|             2|            2|
|       true|2018-01-01 00:48:00|2018-01-02 07:57:00|     707|             1|            1|
|       true|2018-01-01 01:29:00|2018-01-02 08:13:00|     707|             1|            1|
|       true|2018-01-01 01:34:00|2018-01-01 13:29:00|     707|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|ca

### Joining Department Data

We have access to another dataset, `dept.csv`, that contains more information about the various departments.

In [18]:
dept = spark.read.csv("data/dept.csv", header=True, inferSchema=True)
dept.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



It might be useful to include this data, so we can join it to our case dataframe using the `dept_division` column.

In [19]:
df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # keep only the standardized name (it has far fewer unique values than
    # dept_name) and the SLA flag; drop the other department columns
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 707                  
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1-------------------------

## Train Test Split

In [20]:
# split into train and test sets
train, test = df.randomSplit([0.8, 0.2])
# or, split into train, validate, and test sets
train, validate, test = df.randomSplit([0.6, 0.2, 0.2])

## Exercises

### Data Acquisition

These exercises should go in a notebook or script named `wrangle`. Add, commit, and push your changes.

These exercises use the `case.csv`, `dept.csv`, and `source.csv` files from the
San Antonio 311 call dataset.

1. Read the case, department, and source data into their own spark dataframes.

1. Let's see how writing to the local disk works in spark:

    - Write the code necessary to store the source data in both csv and json
      format, store these as `sources_csv` and `sources_json`
    - Inspect your folder structure. What do you notice?

1. Inspect the data in your dataframes. Are the data types appropriate? Write
   the code necessary to cast the values to the appropriate types.


---

1. How old is the latest (in terms of days past SLA) currently open issue? How
   long has the oldest (in terms of days since opened) currently open issue
   been open?
1. How many Stray Animal cases are there?
1. How many service requests that are assigned to the Field Operations
   department (`dept_division`) are not classified as "Officer Standby" request
   type (`service_request_type`)?

1. Convert the `council_district` column to a string column.
1. Extract the year from the `case_closed_date` column.
1. Convert `num_days_late` from days to hours in a new column, `num_hours_late`.

1. Join the case data with the source and department data.
1. Are there any cases that do not have a request source?

1. What are the top 10 service request types in terms of number of requests?
1. What are the top 10 service request types in terms of average days late?
1. Does number of days late depend on department?
1. How does the number of days late depend on department and request type?

---

You might have noticed that the latest date in the dataset is fairly far off from the present day. To account for this, replace any occurrences of the current time with the maximum date from the dataset.