# 02807 - Week 8 Exercises: Getting started with Spark


# Learning objectives:

* Getting hands-on experience manipulating DataFrames with built-in Spark functions
* Distinguishing actions and transformations
* Writing your own UDFs


# Readings:


* [*Learning Spark*, Chapters 1-3](https://pages.databricks.com/rs/094-YMS-629/images/LearningSpark2.0.pdf). A nicely structured and detailed introduction to Spark.
* [A Neanderthal’s Guide to Apache Spark in Python](https://towardsdatascience.com/a-neanderthals-guide-to-apache-spark-in-python-9ef1f156d427). A brief, fun and gentle introduction to Spark for complete beginners.


# Setup

You'll need to get pyspark and make some imports. The following cells will get you started.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=bf98ad098c64b8ae132f03214b29ffe3741f48fa5ba50e4d3840017d1d5997ac
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
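import urllib.request

# Explicit imports avoid shadowing Python built-ins such as sum, max and round
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc

# Start (or reuse) a Spark session
spark = SparkSession.builder.getOrCreate()

# Download the Titanic data next to the other sample datasets
urllib.request.urlretrieve('https://raw.githubusercontent.com/plotly/datasets/master/titanic.csv', '/content/sample_data/titanic.csv')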

!ls '/content/sample_data'

anscombe.json		      mnist_test.csv	     titanic.csv
california_housing_test.csv   mnist_train_small.csv
california_housing_train.csv  README.md


# Exercise 1: age bracketing for the Titanic Dataset

In this exercise you will use Spark to count the number of Titanic passengers in each age bracket: ages 0 to 9, 10 to 19, and so on.
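The bracketing rule can be checked in plain Python before translating it to Spark. This is only a minimal sketch, assuming each bracket is labelled by its lower bound (so ages 20 to 29 map to 20); the helper name `age_bracket` and the sample ages are made up for illustration and are not taken from the Titanic data.

```python
from collections import Counter


def age_bracket(age: float) -> int:
    """Return the lower bound of the 10-year bracket that contains `age`."""
    return int(age // 10) * 10


# Hypothetical sample ages, not taken from the dataset.
sample_ages = [22.0, 38.0, 26.0, 35.0, 0.42, 9.0, 10.0, 71.0]

# Count how many ages fall into each bracket.
bracket_counts = Counter(age_bracket(a) for a in sample_ages)

for bracket in sorted(bracket_counts):
    print(f"{bracket}-{bracket + 9}: {bracket_counts[bracket]}")
```

The same arithmetic (divide by 10, truncate, multiply by 10) carries over to a Spark column expression.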


## Loading the data

Load the Titanic data used in the lecture slides into a Spark dataframe (use schema inference).

In [None]:
# Your code goes here
df = spark.read.option('header', True).option('inferSchema', True).csv('/content/sample_data/titanic.csv')
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



## Cleaning the data

Remove the rows that do not have an age.


In [None]:
# Your code goes here
filtered_df = df.filter(col('Age').isNotNull())
filtered_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      

## Adding age brackets 

Create a new column whose value identifies the age bracket each passenger belongs to.

In [None]:
# Your code goes here
filtered_df = filtered_df.withColumn('bracket', (col('Age') / 10).cast('integer')*10)
filtered_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|bracket|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     20|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     30|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|     20|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|     30|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|     30|
|          7|   

## Age bracket counts

Create a Spark dataframe with the number of passengers in each bracket.

In [None]:
# Your code goes here
# Keep the aggregated DataFrame; show() returns None, so it must not be assigned
counts = filtered_df.groupBy('bracket').agg(count('PassengerId').alias('Number'))
counts.show()

+-------+------+
|bracket|Number|
+-------+------+
|     20|    47|
|     40|    11|
|     10|    22|
|     50|     8|
|     70|     2|
|     60|     2|
|     30|    23|
|      0|    11|
+-------+------+



# Exercise 2: understanding actions and transformations

For each of the following Spark operations, decide whether it is a transformation or an action. If it is a transformation, determine whether it is wide or narrow.

* `select()`
* `groupBy()`
* `filter()`
* `where()`
* `count()`
* `show()`
* `agg()`
* `write()`
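As a loose analogy in plain Python (this is not Spark code), a transformation behaves like a generator that only describes work, while an action is the call that finally consumes it. The sketch below records when values are actually read; the helper `traced` and the `log` list are hypothetical names introduced for the illustration.

```python
log = []


def traced(values):
    """Yield values one by one, recording the moment each one is read."""
    for v in values:
        log.append(v)
        yield v


# "Transformations": build a pipeline; nothing has been read yet.
pipeline = (v * 2 for v in traced([1, 2, 3]) if v != 2)
reads_before = len(log)

# "Action": consuming the pipeline forces the work to happen.
result = list(pipeline)
reads_after = len(log)

print(reads_before, reads_after, result)
```

Spark's planner does far more than a generator does, so the analogy only covers the "nothing runs until a result is requested" part.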

*Your answers go here*

select(): transformation, narrow

groupBy(): transformation, wide

filter(): transformation, narrow

where(): transformation, narrow

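count(): action when called on a DataFrame (`groupBy(...).count()` is instead a transformation that returns a new DataFrame)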

show(): action

agg(): transformation, wide

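write(): action, triggered when a save method such as `df.write.csv(path)` runs (`write` itself is a property that returns a `DataFrameWriter`)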
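One way to picture the narrow/wide distinction is a toy model in plain Python, with no Spark involved. In this sketch, which assumes a dataset split into three made-up partitions, a narrow operation handles each partition on its own, while a wide operation has to move rows between partitions (a shuffle) so that equal keys meet. The function names and the toy partitioner are hypothetical.

```python
from collections import defaultdict

# Toy "partitions" of (key, value) rows; the data is made up.
partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("c", 4)],
    [("b", 5), ("a", 6)],
]


def narrow_filter(parts, predicate):
    """Narrow: each output partition depends on exactly one input partition."""
    return [[row for row in part if predicate(row)] for part in parts]


def wide_group_sum(parts, num_partitions):
    """Wide: every row is routed to the partition chosen by its key,
    so rows with equal keys end up together and can be summed."""
    shuffled = [defaultdict(int) for _ in range(num_partitions)]
    for part in parts:
        for key, value in part:
            # Deterministic toy partitioner based on the key's characters.
            target = sum(map(ord, key)) % num_partitions
            shuffled[target][key] += value
    return [dict(p) for p in shuffled]


filtered = narrow_filter(partitions, lambda row: row[1] > 2)
grouped = wide_group_sum(partitions, num_partitions=2)
print(filtered)
print(grouped)
```

Note that the filter keeps the three input partitions as they are, whereas the grouped result is laid out by key and no longer by the original partitioning.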


# Exercise 3: exploratory data analysis for the Chicago crime dataset


The Chicago Crime dataset contains a summary of the reported crimes that occurred in the City of Chicago from 2001 to 2017.

It is a fairly large dataset. You'll work with a sample of it. Execute the following cells to load it into a dataframe.



In [None]:
# to get the full dataset, run: !wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
!wget https://ibm.box.com/shared/static/svflyugsr9zbqy5bmowgswqemfpm1x7f.csv

--2020-10-24 15:12:29--  https://ibm.box.com/shared/static/svflyugsr9zbqy5bmowgswqemfpm1x7f.csv
Resolving ibm.box.com (ibm.box.com)... 107.152.24.197
Connecting to ibm.box.com (ibm.box.com)|107.152.24.197|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: /public/static/svflyugsr9zbqy5bmowgswqemfpm1x7f.csv [following]
--2020-10-24 15:12:29--  https://ibm.box.com/public/static/svflyugsr9zbqy5bmowgswqemfpm1x7f.csv
Reusing existing connection to ibm.box.com:443.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://ibm.ent.box.com/public/static/svflyugsr9zbqy5bmowgswqemfpm1x7f.csv [following]
--2020-10-24 15:12:29--  https://ibm.ent.box.com/public/static/svflyugsr9zbqy5bmowgswqemfpm1x7f.csv
Resolving ibm.ent.box.com (ibm.ent.box.com)... 107.152.24.201
Connecting to ibm.ent.box.com (ibm.ent.box.com)|107.152.24.201|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://public.boxcloud.com/d/1/b

In [None]:
!ls -1
!mv svflyugsr9zbqy5bmowgswqemfpm1x7f.csv reported-crimes.csv
!ls -l

sample_data
svflyugsr9zbqy5bmowgswqemfpm1x7f.csv
total 128
-rw-r--r-- 1 root root 124158 Oct 24 15:12 reported-crimes.csv
drwxr-xr-x 1 root root   4096 Oct 24 14:51 sample_data


In [None]:
rc = spark.read.csv('reported-crimes.csv',header=True)
rc.show()
rc.printSchema()

+--------+-----------+--------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+---------------------+-------+------------+------------+----+--------------------+-----------+------------+--------------------+
|      ID|CASE_NUMBER|                DATE|               BLOCK|IUCR|PRIMARY_TYPE|         DESCRIPTION|LOCATION_DESCRIPTION|ARREST|DOMESTIC|BEAT|DISTRICT|WARD|COMMUNITY_AREA_NUMBER|FBICODE|X_COORDINATE|Y_COORDINATE|YEAR|           UPDATEDON|   LATITUDE|   LONGITUDE|            LOCATION|
+--------+-----------+--------------------+--------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+---------------------+-------+------------+------------+----+--------------------+-----------+------------+--------------------+
| 3512276|   HK587712|08/28/2004 05:50:...|  047XX S KEDZIE AVE| 890|       THEFT|       FROM BUILDING|  SMALL RETAIL STORE| FALSE|   FA

Let's do some EDA. Answer the following questions.

**What percentage of reported crimes resulted in an arrest?**

In [None]:
# Your code goes here
arrest_pct = 100 * rc.filter(col('ARREST') == True).count() / rc.count()
print('{:.2f}% of reported crimes resulted in an arrest.'.format(arrest_pct))

30.58% of reported crimes resulted in an arrest.
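Because the CSV was read with `header=True` but without schema inference, every column, including `ARREST`, holds strings. The plain-Python sketch below shows the same percentage computed from string flags and why the text has to be compared explicitly. The four-row extract is hypothetical; only the column names mirror the dataset.

```python
import csv
import io

# Hypothetical four-row extract; only the column names mirror the dataset.
raw = io.StringIO(
    "ID,ARREST\n"
    "1,TRUE\n"
    "2,FALSE\n"
    "3,FALSE\n"
    "4,TRUE\n"
)

rows = list(csv.DictReader(raw))

# Every field arrives as a string, so compare against the text explicitly;
# bool("FALSE") would be True because any non-empty string is truthy.
arrests = sum(1 for row in rows if row["ARREST"].upper() == "TRUE")
arrest_pct = 100 * arrests / len(rows)

print(f"{arrest_pct:.2f}% of reported crimes resulted in an arrest.")
```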


**What are the top 3 locations for reported crimes?**

In [None]:
# Your code goes here
rc.groupBy('LOCATION_DESCRIPTION').agg(count('ID').alias('count')).orderBy(desc("count")).show(3)

+--------------------+-----+
|LOCATION_DESCRIPTION|count|
+--------------------+-----+
|              STREET|  136|
|           RESIDENCE|   84|
|            SIDEWALK|   64|
+--------------------+-----+
only showing top 3 rows



**What is the most common primary type of crime in district 022?**


In [None]:
# Your code goes here
rc.filter(col('DISTRICT') == '22').groupBy('PRIMARY_TYPE').agg(count('ID').alias('count')).orderBy(desc("count")).show(1)

+------------+-----+
|PRIMARY_TYPE|count|
+------------+-----+
|       THEFT|    6|
+------------+-----+
only showing top 1 row

