
# ETL made simple with PySpark. A beginner's guide. Part 1.

Created by: Aida Martinez

In this workshop, we will create an ETL pipeline to extract crime data from different cities, transform it, and load it into a file. The datasets reflect reported incidents of crime that occurred in the cities of San Francisco, Austin, and Chicago during 2018. See the references at the end of this notebook for more information.

What will we be covering?
- Installing and setting up PySpark
- Data Extraction
  - Read from Data Sources
  - Apply Schemas
  - Handling corrupt records



# Installing and Setting Up Spark

In [0]:
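# Installing Spark: Java 8, Spark 2.3.4 prebuilt for Hadoop 2.7, and findspark
# Mirrors only keep recent releases; if this download fails, past releases are
# available at https://archive.apache.org/dist/spark/
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz
!pip install -q findspark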

In [0]:
# Setting up the environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.4-bin-hadoop2.7"

PySpark isn't on `sys.path` by default, so we use *findspark* to add it to `sys.path` at runtime. You need to import findspark and call its `init()` method before importing PySpark.

In [0]:
import findspark
findspark.init()
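What `findspark.init()` does is essentially path bookkeeping. The following plain-Python sketch approximates that idea under stated assumptions; it is not findspark's source code. The helper name `add_pyspark_to_path` is hypothetical, and it assumes the standard layout of a Spark download: a `python/` directory plus a bundled `py4j-*.zip` under `python/lib/`.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Hypothetical helper approximating what findspark.init() does.

    Prepends Spark's Python package directory and its bundled py4j zip to
    sys.path so that `import pyspark` can find them. Not findspark's real code.
    """
    python_dir = os.path.join(spark_home, "python")
    # Assumption: Spark bundles py4j as python/lib/py4j-<version>-src.zip
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*.zip")))
    if not py4j_zips:
        raise FileNotFoundError("no py4j zip found under " + python_dir)
    os.environ["SPARK_HOME"] = spark_home
    entries = [python_dir, py4j_zips[0]]
    # Insert at the front of sys.path, skipping entries already present
    sys.path[:0] = [p for p in entries if p not in sys.path]
    return entries


# In this notebook the call would be: add_pyspark_to_path(os.environ["SPARK_HOME"])
```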

For every Spark application, the first operation is to connect to the Spark master and get a
Spark session. This is an operation you will do every time. 

In [0]:
from pyspark.sql import SparkSession

# Getting a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

So far, we have connected the Spark application (the driver) to the master and obtained a Spark session. SparkSession has been the entry point for Spark applications since **Apache Spark 2.0**. It lets you interact with the underlying Spark functionality and program Spark with the DataFrame API, as well as the Dataset API in Scala and Java.

To download the data files to our local machine, we use the `wget` command.

In [0]:
# Get San Francisco Crime data
!wget -cq https://datasets-pyspark-workshop.s3.amazonaws.com/San_Francisco_Crime_2018.csv

In [0]:
# Get Chicago crime data
!wget -cq https://datasets-pyspark-workshop.s3.amazonaws.com/Chicago_Crimes_2018.csv

In [0]:
# Get Austin crime data
!wget -cq https://datasets-pyspark-workshop.s3.amazonaws.com/Austin_Crime_Reports_2018.csv

In [0]:
# Get San Francisco corrupted data
!wget -cq https://datasets-pyspark-workshop.s3.amazonaws.com/San_Francisco_Crime_2018_corrupt.csv

# DataFrames
A DataFrame is both a data structure and an API. Once created (instantiated), a DataFrame object has methods attached to it. Methods are operations one can perform on DataFrames, such as filtering, counting, aggregating, and many others.

DataFrames, as well as Datasets and RDDs (Resilient Distributed Datasets), are immutable: once a DataFrame is created, its state cannot be modified. Applying a transformation such as a filter returns a new DataFrame and leaves the original unchanged.

The `.` indicates you are *applying a method on the object*.


# Data Extraction

## Data Sources
Spark has six **core** data sources and hundreds of external data sources written by the community. Spark's core data sources are:
- [CSV](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv)
- [JSON](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json)
- [Parquet](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.parquet)
- [ORC](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.orc)
- [JDBC/ODBC](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.jdbc) connections
- [Plain-text](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.text) files

Some of the community-created data sources are:
- [Cassandra](https://docs.databricks.com/spark/latest/data-sources/cassandra.html)
- [MongoDB](https://docs.databricks.com/spark/latest/data-sources/mongodb.html)
- [AWS Redshift](https://docs.databricks.com/spark/latest/data-sources/aws/amazon-redshift.html)

## The Design Pattern

All connections to data sources work in much the same way, whether your data sits in S3, Cassandra, Redshift, a relational database, or another common data store. The general pattern is always:

```
spark.read.option(<option key>, <option value>).<connection_type>(<endpoint>)
```

The `connection_type` can be `csv`, `jdbc`, `parquet`, `json`, etc.

Let's start by reading the first file and saving it into a Spark DataFrame. Like any good master, the cluster manager doesn't do much itself; it relies on the workers to do the job.
Spark performs a distributed ingestion, which means that `n` workers ingest the file at the same time.
The read is split into tasks that run on the workers; each task reads one partition of the file into the memory of the node it runs on.

In [0]:
# Read the San Francisco DataFrame
sf_crimes = (spark.read                                      #spark.read
                  .option("header", "true")                  #.option(<option key>, <option value>)
                  .option("inferSchema", "true")             #.option(<option key>, <option value>)
                  .csv("San_Francisco_Crime_2018.csv")       #.<connection_type>(<endpoint>)
            )

In [0]:
sf_crimes.show(5)

+--------------------+-------------+-------------+-------------+--------------------+--------------------+-----------+-----------+---------------+----------+----------------+-----------------------+------------+-------------+------------------+--------------------+--------------------+--------------------+--------------------+--------+---------------+---------------------+-------------------+------------------+-------------------+--------------------+
|   Incident Datetime|Incident Date|Incident Time|Incident Year|Incident Day of Week|     Report Datetime|     Row ID|Incident ID|Incident Number|CAD Number|Report Type Code|Report Type Description|Filed Online|Incident Code| Incident Category|Incident Subcategory|Incident Description|          Resolution|        Intersection|     CNN|Police District|Analysis Neighborhood|Supervisor District|          Latitude|          Longitude|               point|
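+--------------------+-------------+-------------+-------------+--------------------+--------------------+-----------+-----------+---------------+----------+----------------+-----------------------+------------+-------------+------------------+--------------------+--------------------+--------------------+--------------------+--------+---------------+---------------------+-------------------+------------------+-------------------+--------------------+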

Also, you can use the following core structure for reading data: 
```
DataFrameReader.format(...).option("key", "value").schema(...).load()
```
The foundation for reading data in Spark is the [DataFrameReader](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader). We access this through the **SparkSession** via the read attribute:

`spark.read`

After we have a DataFrame reader, we specify several values:
- The format
- The schema
- The read mode
- A series of options

The *format*, *options*, and *schema* methods each return a DataFrameReader, so they can be chained, and they are all optional: if you omit the format, Spark falls back to its default source (Parquet, unless configured otherwise). Each data source has a specific set of options that determine how the data is read into Spark. At a minimum, you must supply the DataFrameReader with a path from which to read.

In [0]:
# Read the San Francisco DataFrame
sf_crimes = (spark.read                                      #spark.read
                  .format("csv")                             #The format
                  .option("mode", "FAILFAST")                #The read mode
                  .option("header", "true")                  #.option(<option key>, <option value>)
                  .option("inferSchema", "true")             #.option(<option key>, <option value>)
                  .load("San_Francisco_Crime_2018.csv")      #path to read
            )

In [0]:
sf_crimes.show(5)


In [0]:
# Read the San Francisco DataFrame
sf_crimes = (spark.read                                             #spark.read
                  .format("csv")                                    #The format
                  .option("mode", "FAILFAST")                       #The read mode
                  .option("header", "true")                         #.option(<option key>, <option value>)
                  .option("inferSchema", "true")                    #.option(<option key>, <option value>)
                  .option("path", "San_Francisco_Crime_2018.csv")   #The path to read from
                  .load()  
            )

In [0]:
sf_crimes.show(5)


**Exercise** - Reading the Austin Crime data

In [0]:
# Read the Austin Crime file and save it into a Dataframe
# TODO

**Test your solution!**

In [0]:
# Run this cell to test your solution
rows_austin_crimes = austin_crimes.count()
expected = 102644
assert str(expected) == str(rows_austin_crimes), "{} does not equal expected {}".format(rows_austin_crimes, expected)

print("Tests passed")


**Exercise**  - Reading the Chicago crimes data 

In [0]:
# Read the Chicago crimes file and save it into a DataFrame
# TODO

**Test your solution!**

In [0]:
# Run this cell to test your solution
rows_chicago_crimes = chicago_crimes.count()
expected = 266963
assert str(expected) == str(rows_chicago_crimes), "{} does not equal expected {}".format(rows_chicago_crimes, expected)

print("Tests passed")

### Adding More Options

When you import data into a cluster, you can add options based on the specific characteristics of the data. **`option`** is a method of `DataFrameReader`. Options are key/value pairs and must be specified before calling `.csv()`. For instance, options for reading CSV data include `header`, `delimiter`, and `inferSchema`.

Be aware that Spark doesn't treat the first line as a header by default, so you need to set the `header` option to `true` whenever your file has a header row.

**NOTE**: You can find all the available options for the core data sources in the API documentation:
- [CSV](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv)
- [JDBC](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.jdbc)
- [JSON](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json)

## Collecting Rows to the Driver
There are times when you'll want to collect some of your data to the driver in order to manipulate it on your local machine.
So far, we haven't defined this operation explicitly, but several methods perform it. [collect](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.collect) gets all data from the entire DataFrame (so use it only when the result fits in the driver's memory),
[take](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.take) returns the first N rows, and [show](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show) prints a number of rows in a nicely formatted table.

In [0]:
sf_crimes.take(5) # take works with an Integer count

By default, the `show` method prints the first 20 rows and truncates values longer than 20 characters.

In [0]:
sf_crimes.show() # this prints it out nicely

In [0]:
sf_crimes.show(5, False)

**Exercise** - collecting rows

Let's try out some of these methods with the Austin and Chicago DataFrames.

Remember that `show()` prints 20 rows unless you pass it the number of rows you want as its first parameter.



In [0]:
# Print the first 5 rows of the Austin Crime DataFrame
# Collect all the Austin Crime DataFrame
# Select the first 2 rows of the Austin Crime DataFrame
# TODO

**Other DataFrame methods:**


*   **count**: Returns the number of rows in this DataFrame.
*   **columns**: Returns all column names as a list. It is a property, so use it without parentheses: `df.columns`.
*   **dtypes**: Returns all column names and their data types as a list of `(name, type)` tuples. It is also a property: `df.dtypes`.
*   **describe**: Computes basic statistics for numeric and string columns. These include count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.
*  **limit**: Limits the result count to the number specified.



**Exercise** - using other methods

Use the methods above to get the column names, data types, and row count of each DataFrame.

In [0]:
# Get the column names for the sf_crimes DataFrame
# TODO

In [0]:
# Get the dtypes for the austin_crimes DataFrame
# TODO

In [0]:
# Get the count of records for the chicago_crimes DataFrame
# TODO

## Applying Schemas

Schemas are at the heart of data structures in Spark. A schema describes the structure of your data by naming columns and declaring the type of data in each column. Rigorously enforcing schemas leads to significant performance optimizations and more reliable code.

Why is open source Spark so fast?
* First and foremost, Spark processes data in memory rather than reading and writing intermediate results to disk.
* Second, using DataFrames allows Spark to optimize the execution of your queries because it knows what your data looks like.

**Schema Inference**

* Import data as a DataFrame and view its schema with the `printSchema()` DataFrame method.
* Store the schema as an object by calling `.schema` on a DataFrame. Schemas consist of a `StructType`, which is a collection of `StructField`s.  Each `StructField` gives a name and a type for a given field in the data.

You can see all the available Spark types here: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

To display the schema of the DataFrame, use the [printSchema](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.printSchema) method.
This tells you the field name, field type, and whether the column is nullable or not (default is true).

In [0]:
sf_crimes.printSchema()

In [0]:
sf_schema = sf_crimes.schema
print(sf_schema)

**Exercise** - Print a Schema

Take a look at the schema of the DataFrames: `austin_crimes` and `chicago_crimes`. See which fields are common between all DataFrames and which aren't.


In [0]:
# Check the Austin DataFrame schema
# TODO

In [0]:
# Assign the schema to a variable for the Austin DataFrame
# TODO

In [0]:
# Check the Chicago DataFrame schema
# TODO

In [0]:
# Assign the schema to a variable for the Chicago DataFrame
# TODO

Import the necessary types from the `pyspark.sql.types` module. Build a `StructType`, which takes a list of `StructField`s. Each `StructField` takes three arguments: the name of the field, the type of data in it, and a `Boolean` for whether this field can be `null`.

In [0]:
# Defining the San Francisco DataFrame Schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DecimalType, DoubleType, BooleanType
             
sf_schema = StructType([StructField("Incident Datetime",StringType(),True),
                StructField("Incident Date",StringType(),True),
                StructField("Incident Time",StringType(),True),
                StructField("Incident Year",IntegerType(),True),
                StructField("Incident Day of Week",StringType(),True),
                StructField("Report Datetime",StringType(),True),
                StructField("Row ID",LongType(),True),
                StructField("Incident ID",IntegerType(),True),
                StructField("Incident Number",IntegerType(),True),
                StructField("CAD Number",IntegerType(),True),
                StructField("Report Type Code",StringType(),True),
                StructField("Report Type Description",StringType(),True),
                StructField("Filed Online",BooleanType(),True),
                StructField("Incident Code",IntegerType(),True),
                StructField("Incident Category",StringType(),True),
                StructField("Incident Subcategory",StringType(),True),
                StructField("Incident Description",StringType(),True),
                StructField("Resolution",StringType(),True),
                StructField("Intersection",StringType(),True),
                StructField("CNN",DecimalType(8,0),True),
                StructField("Police District",StringType(),True),
                StructField("Analysis Neighborhood",StringType(),True),
                StructField("Supervisor District",IntegerType(),True),
                StructField("Latitude",DoubleType(),True),
                StructField("Longitude",DoubleType(),True),
                StructField("point",StringType(),True)])

In [0]:
print(sf_schema)

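Apply the schema using the `.schema` method. The resulting DataFrame contains only the columns defined in the schema. Note that for CSV files, Spark maps schema fields to the file's columns by position, not by header name, so the fields must be listed in the same order as the columns in the file.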
 A `LongType` is an 8-byte integer ranging up to 9,223,372,036,854,775,807 while `IntegerType` is a 4-byte integer ranging up to 2,147,483,647.
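Both limits follow from the byte sizes: a signed integer stored in n bytes ranges from -2^(8n-1) to 2^(8n-1) - 1. The plain-Python sketch below checks the numbers and uses a hypothetical helper, `narrowest_integer_type`, to show why a value just above 2,147,483,647 needs a `LongType` column.

```python
# A signed n-byte integer holds values from -2**(8n - 1) to 2**(8n - 1) - 1
INT_MAX = 2**(8 * 4 - 1) - 1   # IntegerType, 4 bytes
LONG_MAX = 2**(8 * 8 - 1) - 1  # LongType, 8 bytes

print(f"{INT_MAX:,}")   # 2,147,483,647
print(f"{LONG_MAX:,}")  # 9,223,372,036,854,775,807


def narrowest_integer_type(value):
    """Hypothetical helper: name the narrower Spark integer type that can hold `value`."""
    if -(INT_MAX + 1) <= value <= INT_MAX:
        return "IntegerType"
    if -(LONG_MAX + 1) <= value <= LONG_MAX:
        return "LongType"
    return None  # too large even for LongType


print(narrowest_integer_type(2_147_483_647))  # IntegerType
print(narrowest_integer_type(2_147_483_648))  # LongType
```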

In [0]:
# Reading the San Francisco DataFrame with a user defined schema
sf_crimes2 = (spark.read
                  .format("csv")
                  .schema(sf_schema)
                  .option("header", "true")            
                  .load("San_Francisco_Crime_2018.csv")
            )

In [0]:
sf_crimes2.show(5)

**Benefits of user defined schemas include**:

* Avoiding the extra scan of your data needed to infer the schema
* Providing alternative data types
* Parsing only the fields you need (JSON files)

**Exercise (Optional)** - Defining the Chicago crime data schema

In [0]:
# Define a schema "chicago_schema" for the Chicago crime DataFrame
# Read the Chicago data using the schema you just defined
# TODO

## Handling Corrupt Records

ETL pipelines need robust solutions to handle corrupt data, because the amount of corrupt data grows with the size of the data and the complexity of the data application. Corrupt data includes:

* Missing information
* Incomplete information
* Schema mismatch
* Differing formats or data types
* User errors when writing data producers

Since ETL pipelines are built to be automated, production-oriented solutions must ensure pipelines behave as expected. This means that data engineers must both expect and systematically handle corrupt records.

### Read modes
Reading data from an external source naturally entails encountering malformed data, especially when working with semi-structured data sources. Read modes specify what happens when Spark comes across malformed records.

![Spark read modes](https://images-workshop.s3.amazonaws.com/Spark_read_modes.png)

Source: *Spark: The Definitive Guide. Big Data Processing Made Simple*, by Matei Zaharia and Bill Chambers.

The default is *permissive*.
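To build intuition before looking at each mode in Spark, here is a toy model of the three modes in plain Python. It is an illustration under simplifying assumptions, not Spark's implementation: a row counts as malformed here if it has the wrong number of fields or a non-integer `count`, and permissive mode simply nulls every field, whereas Spark may keep values it could parse.

```python
import csv
import io

# Made-up data: one good row, one with text in the numeric column, one with a missing field
RAW = "district,count\nMission,12\nTenderloin,seven\nBayview\n"
FIELDS = ["district", "count"]


def parse(tokens):
    """Convert one row's tokens, raising ValueError if the row is malformed."""
    if len(tokens) != len(FIELDS):
        raise ValueError("wrong number of fields")
    return {"district": tokens[0], "count": int(tokens[1])}


def read(raw, mode="PERMISSIVE"):
    """Toy model of Spark's three read modes; not Spark's implementation."""
    mode = mode.upper()  # Spark also accepts the mode names in any case
    reader = csv.reader(io.StringIO(raw))
    next(reader)  # skip the header row
    rows = []
    for tokens in reader:
        try:
            rows.append(dict(parse(tokens), _corrupt_record=None))
        except ValueError:
            if mode == "FAILFAST":
                raise  # stop at the first malformed record
            if mode == "DROPMALFORMED":
                continue  # silently skip the malformed record
            # PERMISSIVE: keep the row, null its fields, keep the raw text (rebuilt from tokens)
            rows.append({"district": None, "count": None,
                         "_corrupt_record": ",".join(tokens)})
    return rows


print(len(read(RAW)), len(read(RAW, "dropMalformed")))  # 3 1
```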

### Permissive mode
When it encounters a corrupted record, sets the fields to `null` and places the raw record in a string column called `_corrupt_record`.
You will need to add this column to your schema if you are defining the schema manually.

Let's see what happens when we comment out one of the schema fields.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DecimalType, DoubleType, BooleanType
             
sf_schema = StructType([StructField("Incident Datetime",StringType(),True),
                StructField("Incident Date",StringType(),True),
                StructField("Incident Time",StringType(),True),
                StructField("Incident Year",IntegerType(),True),
                StructField("Incident Day of Week",StringType(),True),
                StructField("Report Datetime",StringType(),True),
                StructField("Row ID",LongType(),True),
                StructField("Incident ID",IntegerType(),True),
                StructField("Incident Number",IntegerType(),True),
                # StructField("CAD Number",IntegerType(),True),  # deliberately omitted so the schema no longer matches the file
                StructField("Report Type Code", StringType(),True),
                StructField("Report Type Description",StringType(),True),
                StructField("Filed Online",BooleanType(),True),
                StructField("Incident Code",IntegerType(),True),
                StructField("Incident Category",StringType(),True),
                StructField("Incident Subcategory",StringType(),True),
                StructField("Incident Description",StringType(),True),
                StructField("Resolution",StringType(),True),
                StructField("Intersection",StringType(),True),
                StructField("CNN",DecimalType(8,0),True),
                StructField("Police District",StringType(),True),
                StructField("Analysis Neighborhood",StringType(),True),
                StructField("Supervisor District",IntegerType(),True),
                StructField("Latitude",DoubleType(),True),
                StructField("Longitude",DoubleType(),True),
                StructField("point",StringType(),True),
                StructField("_corrupt_record", StringType(), True)
                ])

By commenting out `CAD Number`, our schema no longer matches the columns in the CSV file. When Spark meets a corrupted record in a CSV file, it puts the malformed string into the field configured by `columnNameOfCorruptRecord` (by default, `_corrupt_record`) and sets the other fields to null.
Let's see what happens when we read the file using this user-defined schema.

**NOTE:** to capture the corrupt records, you must add a string-type field named `_corrupt_record` to your user-defined schema.

In [0]:
# Read the San Francisco DataFrame using permissive mode
from pyspark.sql.functions import col

sf_crimes2 = (spark.read
                  .schema(sf_schema)
                  .format("csv")
                  .option("mode", "permissive")
                  .option("header", "true") 
                  .load("San_Francisco_Crime_2018.csv")
            )

sf_crimes2.show(5)

What if some records in a file have fewer or more fields than the schema? When Spark meets a record with fewer tokens than the number of fields in the schema, it sets the extra fields to null. When a record has more tokens than the schema has fields, it drops the extra tokens. In both cases, the `_corrupt_record` column contains the whole row as a string.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DecimalType, DoubleType, BooleanType
             
sf_schema = StructType([StructField("Incident Datetime",StringType(),True),
                StructField("Incident Date",StringType(),True),
                StructField("Incident Time",StringType(),True),
                StructField("Incident Year",IntegerType(),True),
                StructField("Incident Day of Week",StringType(),True),
                StructField("Report Datetime",StringType(),True),
                StructField("Row ID",LongType(),True),
                StructField("Incident ID",IntegerType(),True),
                StructField("Incident Number",IntegerType(),True),
                StructField("CAD Number",IntegerType(),True),
                StructField("Report Type Code", StringType(),True),
                StructField("Report Type Description",StringType(),True),
                StructField("Filed Online",BooleanType(),True),
                StructField("Incident Code",IntegerType(),True),
                StructField("Incident Category",StringType(),True),
                StructField("Incident Subcategory",StringType(),True),
                StructField("Incident Description",StringType(),True),
                StructField("Resolution",StringType(),True),
                StructField("Intersection",StringType(),True),
                StructField("CNN",DecimalType(8,0),True),
                StructField("Police District",StringType(),True),
                StructField("Analysis Neighborhood",StringType(),True),
                StructField("Supervisor District",IntegerType(),True),
                StructField("Latitude",DoubleType(),True),
                StructField("Longitude",DoubleType(),True),
                StructField("point",StringType(),True),
                StructField("_corrupt_record",StringType(), True)
                ])

In [0]:
# Read the San Francisco DataFrame using permissive mode with corrupted records
sf_crimes2 = (spark.read
                  .format("csv")
                  .schema(sf_schema)
                  .option("mode", "permissive")
                  .option("header", "true")            
                  .load("San_Francisco_Crime_2018_corrupt.csv")
            )

sf_crimes2.filter(col("_corrupt_record").isNotNull()).show(truncate=False)

Now let's look at the record with fewer columns. Spark assigned `null` to the missing columns (`Longitude` and `point`).

In [0]:
sf_crimes2.filter((col("Incident Datetime")=="2018/12/31 11:15:00 PM") & (col("Report Datetime")=="2019/01/01 11:26:00 PM")).show()

Below is the record with an extra column. The original record ends with an additional value, "Testing", which was dropped from the data read into the DataFrame but is still visible in the `_corrupt_record` column.

In [0]:
sf_crimes2.filter(col("Incident Datetime")=="2018/12/30 11:10:00 PM").first()["_corrupt_record"]
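The padding and truncation rule seen in the last two cells can be summarized as a small plain-Python function. This is a sketch of the rule as described earlier in this section (the helper `align_tokens` is hypothetical); it leaves out type conversion and the storage of the raw line in `_corrupt_record`.

```python
def align_tokens(tokens, n_fields):
    """Hypothetical helper: fit a row's tokens to a schema with n_fields columns.

    Missing trailing fields become None and extra tokens are dropped.
    Returns (aligned_tokens, is_corrupt).
    """
    if len(tokens) < n_fields:
        aligned = tokens + [None] * (n_fields - len(tokens))
    else:
        aligned = tokens[:n_fields]
    # Any mismatch in length marks the row as corrupt
    return aligned, len(tokens) != n_fields


print(align_tokens(["a", "b"], 3))                   # (['a', 'b', None], True)
print(align_tokens(["a", "b", "c", "Testing"], 3))  # (['a', 'b', 'c'], True)
```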

### Failfast mode
Fails immediately upon finding a corrupted record.

Let's see what happens when we load the file that contains malformed records.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DecimalType, DoubleType, BooleanType

sf_schema = StructType([StructField("Incident Datetime",StringType(),True),
                StructField("Incident Date",StringType(),True),
                StructField("Incident Time",StringType(),True),
                StructField("Incident Year",IntegerType(),True),
                StructField("Incident Day of Week",StringType(),True),
                StructField("Report Datetime",StringType(),True),
                StructField("Row ID",LongType(),True),
                StructField("Incident ID",IntegerType(),True),
                StructField("Incident Number",IntegerType(),True),
                StructField("CAD Number",IntegerType(),True),
                StructField("Report Type Code", StringType(),True),
                StructField("Report Type Description",StringType(),True),
                StructField("Filed Online",BooleanType(),True),
                StructField("Incident Code",IntegerType(),True),
                StructField("Incident Category",StringType(),True),
                StructField("Incident Subcategory",StringType(),True),
                StructField("Incident Description",StringType(),True),
                StructField("Resolution",StringType(),True),
                StructField("Intersection",StringType(),True),
                StructField("CNN",DecimalType(8,0),True),
                StructField("Police District",StringType(),True),
                StructField("Analysis Neighborhood",StringType(),True),
                StructField("Supervisor District",IntegerType(),True),
                StructField("Latitude",DoubleType(),True),
                StructField("Longitude",DoubleType(),True),
                StructField("point",StringType(),True)
                ])

In [0]:
# Read the San Francisco DataFrame using failfast mode 
sf_crimes3 = (spark.read
                  .schema(sf_schema)
                  .format("csv")
                  .option("mode", "failfast")
                  .option("header", "true")          
                  .load("San_Francisco_Crime_2018_corrupt.csv")
            )

sf_crimes3.show()

### DropMalformed mode
Drops the rows that contain malformed records.

Now, let's see how our two malformed records get dropped from the corrupt file. First, we read the data again in permissive mode and compare its row count with that of the original DataFrame, which tells us how many records we should end up with.

In [0]:
sf_crimes2 = (spark.read
                  .schema(sf_schema)
                  .format("csv")
                  .option("mode", "permissive")
                  .option("header", "true")            
                  .load("San_Francisco_Crime_2018_corrupt.csv")
            )

In [0]:
sf_crimes2.count()

In [0]:
sf_crimes.count()

**Exercise** - DropMalformed mode

Try reading the file using the `dropMalformed` mode, save the result in `sf_crimes2` (the variable the test below checks), and verify that the new DataFrame has two fewer records.

In [0]:
# Use DropMalformed mode to read the San Francisco crime file
# Filename: San_Francisco_Crime_2018_corrupt.csv
# TODO

**Test your solution!**

In [0]:
# Run this cell to test your solution
rows_sf_crimes2 = sf_crimes2.count()
expected = 154530
assert str(expected) == str(rows_sf_crimes2), "{} does not equal expected {}".format(rows_sf_crimes2, expected)

print("Tests passed")

# References

- Download Spark: http://apache.osuosl.org/spark/
- San Francisco Crime Dataset: https://data.sfgov.org/Public-Safety/Police-Department-Incident-Reports-2018-to-Present/wg3w-h783
- Austin Crime Dataset: https://data.austintexas.gov/Public-Safety/Crime-Reports-2018/vmn9-3bvu
- Chicago Crime Dataset: https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
- Population dataset: https://en.wikipedia.org/wiki/List_of_United_States_cities_by_population
- PySpark Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
- Spark: The Definitive Guide. Big Data Processing Made Simple. Matei Zaharia and Bill Chambers. O'Reilly Media, February 2018.
- Mastering Apache Spark: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
- SP820: ETL Part 1: Data Extraction (AWS Databricks): https://academy.databricks.com/course/SP820
- SP821: ETL Part 2: Transformations and Loads (AWS Databricks): https://academy.databricks.com/course/SP821
- SP822: ETL Part 3: Production (AWS Databricks): https://academy.databricks.com/course/SP822