
### Check the contents of the current working directory
List the contents of the current working directory to verify the starting state.

In [1]:
!ls

sample_data


## Set Up the Environment
Download and install the JDK and Spark.

In [2]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark


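Configure the Spark environment

In [3]:
import os
# Point Spark at the JDK and the Spark distribution unpacked above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

# findspark adds the pyspark package under SPARK_HOME to sys.path
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
# Reuse the running session if one exists, otherwise start a new one
spark = SparkSession.builder.getOrCreate()
spark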

## Data Processing
### Using Chicago's Reported Crime Data
Download and preprocess the data

In [4]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
!ls -l

--2020-08-13 05:12:52--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.140.205, 52.206.68.26
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [            <=>     ]   1.57G  2.87MB/s    in 8m 42s  

2020-08-13 05:21:34 (3.09 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1691104875]

total 1872076
-rw-r--r--  1 root root 1691104875 Aug 12 11:12 'rows.csv?accessType=DOWNLOAD'
drwxr-xr-x  1 root root       4096 Jul 30 16:30  sample_data
drwxrwxr-x 13 1000 1000       4096 Jun  1  2018  spark-2.3.1-bin-hadoop2.7
-rw-r--r--  1 root root  225883783 Jun  1  2018  spark-2.3.1-bin-hadoop2.7.tgz


Rename the file to reported-crimes.csv

In [5]:
!mv 'rows.csv?accessType=DOWNLOAD' reported-crimes.csv
!ls -l

total 1872076
-rw-r--r--  1 root root 1691104875 Aug 12 11:12 reported-crimes.csv
drwxr-xr-x  1 root root       4096 Jul 30 16:30 sample_data
drwxrwxr-x 13 1000 1000       4096 Jun  1  2018 spark-2.3.1-bin-hadoop2.7
-rw-r--r--  1 root root  225883783 Jun  1  2018 spark-2.3.1-bin-hadoop2.7.tgz


Keep only the records dated before 12-Nov-2018

In [6]:
from pyspark.sql.functions import to_timestamp, col, lit
# Parse the Date strings into timestamps, then keep rows before the cutoff
rc = (
    spark.read.csv("reported-crimes.csv", header=True)
    .withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))
    .filter(col("Date") < lit("2018-11-12"))
)
rc.show(5)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11| 
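
The pattern `MM/dd/yyyy hh:mm:ss a` describes a 12-hour clock followed by an AM/PM marker. The sketch below mirrors the parse-then-filter step in plain Python, using `datetime.strptime` as a stand-in for `to_timestamp`. It is not Spark code, and the sample strings are hypothetical values written in the same layout as the CSV's Date column.

```python
from datetime import datetime

# strptime equivalent of the Spark pattern "MM/dd/yyyy hh:mm:ss a"
# (12-hour clock followed by an AM/PM marker).
PY_FORMAT = "%m/%d/%Y %I:%M:%S %p"
CUTOFF = datetime(2018, 11, 12)  # midnight at the start of 12-Nov-2018

# Hypothetical raw values, laid out like the CSV's Date column.
raw_dates = [
    "01/01/2001 11:00:00 AM",
    "11/11/2018 11:50:00 PM",
    "11/12/2018 12:00:00 AM",
]

parsed = [datetime.strptime(s, PY_FORMAT) for s in raw_dates]

# Same test as col("Date") < lit("2018-11-12"): strictly before the cutoff.
kept = [d for d in parsed if d < CUTOFF]

for d in kept:
    print(d)
```

The third value parses to exactly midnight on 12-Nov-2018, so the strict `<` comparison excludes it.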

## Data Exploration
Check the schema of the dataset. The CSV was read without `inferSchema`, so every column other than the parsed Date is a string.

In [7]:
rc.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



In [8]:
#print(rc.describe())

In [9]:
#print(rc.summary())

## Working with Columns


**Display the first 5 rows of the IUCR column**

In [10]:
rc.select('IUCR').show(5)

+----+
|IUCR|
+----+
|1153|
|0281|
|0620|
|0810|
|0281|
+----+
only showing top 5 rows



In [11]:
rc.select(rc.IUCR).show(5)

+----+
|IUCR|
+----+
|1153|
|0281|
|0620|
|0810|
|0281|
+----+
only showing top 5 rows



In [12]:
rc.select(col("IUCR")).show(5)

+----+
|IUCR|
+----+
|1153|
|0281|
|0620|
|0810|
|0281|
+----+
only showing top 5 rows



**Display the first 4 rows of the columns Case Number, Date and Arrest**

In [13]:
rc.select(col("Case Number"), col("Date"), col("Arrest")).show(4)

+-----------+-------------------+------+
|Case Number|               Date|Arrest|
+-----------+-------------------+------+
|   JA366925|2001-01-01 11:00:00| false|
|   JB147188|2017-10-08 03:00:00| false|
|   JB147595|2017-03-28 14:00:00| false|
|   JB147230|2017-09-09 20:17:00| false|
+-----------+-------------------+------+
only showing top 4 rows



**Add a column named One whose entries are all 1**

In [14]:
from pyspark.sql.functions import lit
rc.withColumn("One", lit(1)).show(5) 

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|One|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            4

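**Remove the column IUCR**

In [15]:
# drop() returns a new DataFrame and leaves rc unchanged, so the result
# must be shown or assigned (e.g. rc.drop("IUCR").show(5)). As written,
# rc.show(5) still displays IUCR, as the output below confirms.
rc.drop(col("IUCR"))
rc.show(5)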

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11| 

## Working with Rows

**Add the reported crimes for an additional day, 12-Nov-2018, to our dataset**

In [16]:
one_day = (
    spark.read.csv("reported-crimes.csv", header=True)
    .withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))
    .filter(col("Date") == lit("2018-11-12"))
)
one_day.count()

3
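
A count of 3 for a whole day looks low. The likely reason is that an equality test against `2018-11-12` only matches timestamps at exactly 00:00:00, which is consistent with the top row of the union output in the next cell. The plain-Python sketch below illustrates the difference between matching an instant and matching a calendar day. The sample timestamps are hypothetical, and in Spark the day-level comparison would typically go through `to_date`.

```python
from datetime import date, datetime

target_day = date(2018, 11, 12)
midnight = datetime(2018, 11, 12)  # 2018-11-12 00:00:00

# Hypothetical timestamps, all on the same calendar day.
events = [
    datetime(2018, 11, 12, 0, 0, 0),
    datetime(2018, 11, 12, 9, 30, 0),
    datetime(2018, 11, 12, 23, 59, 0),
]

# Equality against the midnight instant keeps only the first event.
exact_matches = [e for e in events if e == midnight]

# Comparing the date part keeps every event on that day.
same_day = [e for e in events if e.date() == target_day]

print(len(exact_matches), len(same_day))
```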

In [17]:
rc.union(one_day).orderBy(col('Date'),ascending=False).show(5)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|11505149|   JB513151|2018-11-12 00:00:00|  003XX S WHIPPLE ST|0810|             THEFT|           OVER $500|              STREET| fal

**What are the 10 most frequently reported crime types (Primary Type), in descending order of occurrence?**

In [18]:
rc.groupBy(col('Primary Type')).count().orderBy('count', ascending=False).show(10)

+-------------------+-------+
|       Primary Type|  count|
+-------------------+-------+
|              THEFT|1418573|
|            BATTERY|1232361|
|    CRIMINAL DAMAGE| 771570|
|          NARCOTICS| 711735|
|      OTHER OFFENSE| 418890|
|            ASSAULT| 418557|
|           BURGLARY| 388054|
|MOTOR VEHICLE THEFT| 314158|
| DECEPTIVE PRACTICE| 265860|
|            ROBBERY| 255627|
+-------------------+-------+
only showing top 10 rows



### Challenge 1

**What percentage of reported crimes resulted in an arrest?**

In [19]:
rc.filter(col('Arrest')==lit('true')).count() / rc.select(col('Arrest')).count() 

0.27753668841480433
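
The value above is a fraction rather than a percentage. A small sketch of the conversion, reusing the number printed by the cell (the variable names are only illustrative):

```python
# Fraction returned by the arrest calculation in this notebook.
arrest_fraction = 0.27753668841480433

# Scale to a percentage and round to two decimal places for display.
arrest_percentage = round(arrest_fraction * 100, 2)

print(f"{arrest_percentage}% of reported crimes resulted in an arrest")
```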

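**What are the top 3 locations for reported crimes?**

In [20]:
# Note: the filter tests the Location column (not Location Description) against
# the string 'null'. Rows where Location is an actual NULL are also excluded,
# because a comparison with NULL never evaluates to true.
rc.filter(col('Location') != lit('null')).groupBy(col('Location Description')).count().orderBy('count', ascending=False).show(3)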

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|1757173|
|           RESIDENCE|1127010|
|           APARTMENT| 692010|
+--------------------+-------+
only showing top 3 rows



## Built-In Functions

**What are the min and max of the Date column?**

In [21]:
# Note: this import shadows Python's built-in min and max in the notebook
from pyspark.sql.functions import min, max
rc.select(min(col('Date')), max(col('Date'))).show()

+-------------------+-------------------+
|          min(Date)|          max(Date)|
+-------------------+-------------------+
|2001-01-01 00:00:00|2018-11-11 23:50:00|
+-------------------+-------------------+



**What are the dates 3 days before the max date and 3 days after the min date?**

In [22]:
from pyspark.sql.functions import date_add, date_sub
rc.select(date_sub(max(col("Date")), 3), date_add(min(col('Date')),3)).show(1)

+----------------------+----------------------+
|date_sub(max(Date), 3)|date_add(min(Date), 3)|
+----------------------+----------------------+
|            2018-11-08|            2001-01-04|
+----------------------+----------------------+
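
The result can be cross-checked with plain date arithmetic. The sketch below uses Python's `timedelta` on the min and max timestamps reported earlier. Like `date_sub` and `date_add` in the output above, it yields dates with the time of day dropped.

```python
from datetime import datetime, timedelta

# Min and max of the Date column, as reported by the previous cell.
min_ts = datetime(2001, 1, 1, 0, 0, 0)
max_ts = datetime(2018, 11, 11, 23, 50, 0)

# Shift by three days, then keep only the calendar date.
three_days_before_max = (max_ts - timedelta(days=3)).date()
three_days_after_min = (min_ts + timedelta(days=3)).date()

print(three_days_before_max, three_days_after_min)  # 2018-11-08 2001-01-04
```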



## Joins

In [23]:
!wget -O police-station.csv https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
!ls -l

--2020-08-13 05:31:22--  https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.140.205, 52.206.68.26
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘police-station.csv’

police-station.csv      [ <=>                ]   5.57K  --.-KB/s    in 0s

2020-08-13 05:31:22 (555 MB/s) - ‘police-station.csv’ saved [5699]

total 1872088
-rw-r--r--  1 root root       5699 Aug 19  2019 police-station.csv
-rw-r--r--  1 root root 1691104875 Aug 12 11:12 reported-crimes.csv
drwxr-xr-x  1 root root       4096 Jul 30 16:30 sample_data
drwxrwxr-x 13 1000 1000       4096 Jun  1  2018 spark-2.3.1-bin-hadoop2.7
-rw-r--r--  1 root root  225883783 Jun  1  2018 spark-2.3.1-bin-hadoo

Police Station Data Analysis

In [33]:
ps = spark.read.csv('police-station.csv', header=True)
ps.show(5)

+------------+--------------+--------------------+-------+-----+-----+--------------------+------------+------------+------------+------------+------------+-----------+------------+--------------------+
|    DISTRICT| DISTRICT NAME|             ADDRESS|   CITY|STATE|  ZIP|             WEBSITE|       PHONE|         FAX|         TTY|X COORDINATE|Y COORDINATE|   LATITUDE|   LONGITUDE|            LOCATION|
+------------+--------------+--------------------+-------+-----+-----+--------------------+------------+------------+------------+------------+------------+-----------+------------+--------------------+
|Headquarters|  Headquarters| 3510 S Michigan Ave|Chicago|   IL|60653|http://home.chica...|        null|        null|        null| 1177731.401| 1881697.404|41.83070169|-87.62339535|(41.8307016873, -...|
|           1|       Central|     1718 S State St|Chicago|   IL|60616|http://home.chica...|312-745-4290|312-745-3694|312-745-3693| 1176569.052| 1891771.704|41.85837259|-87.62735617|(41.858

In [25]:
rc.cache()
rc.count()

6753835

In [34]:
ps.select(col('District')).distinct().show()

+------------+
|    District|
+------------+
|           7|
|          15|
|          11|
|           3|
|           8|
|          22|
|          16|
|           5|
|          18|
|          17|
|           6|
|          19|
|          25|
|Headquarters|
|          24|
|           9|
|           1|
|          20|
|          10|
|           4|
+------------+
only showing top 20 rows



In [35]:
from pyspark.sql.functions import lpad
ps = ps.withColumn('District_F', lpad(col('District'), 3, '0'))
ps.show()

+--------------------+-----------------+--------------------+-------+-----+-----+--------------------+------------+------------+------------+------------+------------+-----------+------------+--------------------+----------+
|            DISTRICT|    DISTRICT NAME|             ADDRESS|   CITY|STATE|  ZIP|             WEBSITE|       PHONE|         FAX|         TTY|X COORDINATE|Y COORDINATE|   LATITUDE|   LONGITUDE|            LOCATION|District_F|
+--------------------+-----------------+--------------------+-------+-----+-----+--------------------+------------+------------+------------+------------+------------+-----------+------------+--------------------+----------+
|        Headquarters|     Headquarters| 3510 S Michigan Ave|Chicago|   IL|60653|http://home.chica...|        null|        null|        null| 1177731.401| 1881697.404|41.83070169|-87.62339535|(41.8307016873, -...|       Hea|
|                   1|          Central|     1718 S State St|Chicago|   IL|60616|http://home.chica..
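
The `District_F` column shows two effects of `lpad`: short values are padded on the left with zeros, and values longer than the target length are cut down to it, which is why `Headquarters` becomes `Hea`. The function below is a plain-Python imitation of that behaviour, written for illustration only. It is not Spark's implementation.

```python
def lpad(value: str, length: int, pad: str) -> str:
    """Imitate Spark's lpad: left-pad to `length`, truncating longer values."""
    if len(value) >= length:
        # Longer (or equal) inputs are shortened to the target length.
        return value[:length]
    # Repeat the pad string and trim it to exactly the missing width.
    fill = (pad * length)[: length - len(value)]
    return fill + value


for district in ["1", "15", "Headquarters"]:
    print(district, "->", lpad(district, 3, "0"))
```

Padding to three characters puts the station districts in the same format as the crime data's `District` column (for example `004`), so the two columns can be compared directly.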

In [38]:
rc.join(ps, rc.District == ps.District_F, "left_outer").show()

+--------+-----------+-------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+--------+--------------+--------------------+-------+-----+-----+--------------------+------------+------------+------------+------------+------------+-----------+------------+--------------------+----------+
|      ID|Case Number|               Date|               Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|DISTRICT| DISTRICT NAME|             ADDRESS|   CITY|STATE|  ZIP|             WEBSITE|       PHONE|         FAX|         TTY|X COORDINATE|Y COORDINATE|   LATITUDE|   LONGITUDE|            LOCATION|District_F|
+--------+-----------+-------------------+
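
A left outer join keeps every row from the left side (here, the crimes) and fills the station columns with nulls when no district matches. The sketch below shows the idea in plain Python with dictionaries. All rows and station names are made up, and the dictionary lookup assumes each `District_F` value appears only once on the station side.

```python
# Hypothetical crime rows (left side of the join).
crimes = [
    {"Case Number": "A1", "District": "004"},
    {"Case Number": "A2", "District": "011"},
    {"Case Number": "A3", "District": "999"},  # no matching station
]

# Hypothetical station rows (right side), keyed by the padded district.
stations = [
    {"District_F": "004", "DISTRICT NAME": "Station A"},
    {"District_F": "011", "DISTRICT NAME": "Station B"},
]

stations_by_key = {s["District_F"]: s for s in stations}

joined = []
for crime in crimes:
    match = stations_by_key.get(crime["District"])
    row = dict(crime)
    # Unmatched left rows are kept, with None standing in for null.
    row["DISTRICT NAME"] = match["DISTRICT NAME"] if match else None
    joined.append(row)

for row in joined:
    print(row)
```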

## Exercise
**Find the most frequently reported non-criminal activity**

In [39]:
rc.select(col('Primary Type')).distinct().count()

36

In [44]:
# Sort after distinct(): row order is not guaranteed to survive deduplication
rc.select(col('Primary Type')).distinct().orderBy(col('Primary Type'), ascending=True).show(36)

+--------------------+
|        Primary Type|
+--------------------+
|               ARSON|
|             ASSAULT|
|             BATTERY|
|            BURGLARY|
|CONCEALED CARRY L...|
| CRIM SEXUAL ASSAULT|
|     CRIMINAL DAMAGE|
|CRIMINAL SEXUAL A...|
|   CRIMINAL TRESPASS|
|  DECEPTIVE PRACTICE|
|   DOMESTIC VIOLENCE|
|            GAMBLING|
|            HOMICIDE|
|   HUMAN TRAFFICKING|
|INTERFERENCE WITH...|
|        INTIMIDATION|
|          KIDNAPPING|
|LIQUOR LAW VIOLATION|
| MOTOR VEHICLE THEFT|
|           NARCOTICS|
|      NON - CRIMINAL|
|        NON-CRIMINAL|
|NON-CRIMINAL (SUB...|
|           OBSCENITY|
|OFFENSE INVOLVING...|
|OTHER NARCOTIC VI...|
|       OTHER OFFENSE|
|        PROSTITUTION|
|    PUBLIC INDECENCY|
|PUBLIC PEACE VIOL...|
|           RITUALISM|
|             ROBBERY|
|         SEX OFFENSE|
|            STALKING|
|               THEFT|
|   WEAPONS VIOLATION|
+--------------------+



In [54]:
rc.filter(col("Primary Type").like("NON%")).groupBy(col('Description')).count().orderBy('count',ascending=False).show(1)

+-------------+-----+
|  Description|count|
+-------------+-----+
|LOST PASSPORT|  107|
+-------------+-----+
only showing top 1 row
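
The pattern `"NON%"` is a prefix match: `%` stands for any sequence of characters, so every Primary Type beginning with `NON` is included. A plain-Python sketch of the same test using `str.startswith`, applied to a few of the type names listed above:

```python
# A handful of Primary Type values taken from the listing above.
primary_types = ["NARCOTICS", "NON - CRIMINAL", "NON-CRIMINAL", "OBSCENITY"]

# Equivalent of like("NON%"): case-sensitive match on the leading characters.
non_criminal = [t for t in primary_types if t.startswith("NON")]

print(non_criminal)  # ['NON - CRIMINAL', 'NON-CRIMINAL']
```

This is why a single filter covers the differently spelled non-criminal categories.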



**Find the day of the week with the most reported crimes**

In [57]:
from pyspark.sql.functions import date_format
rc.groupBy(date_format(col('Date'), 'E')).count().orderBy('count',ascending=False).show(1)

+--------------------+-------+
|date_format(Date, E)|  count|
+--------------------+-------+
|                 Fri|1016946|
+--------------------+-------+
only showing top 1 row
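
The cell above first maps each timestamp to an abbreviated day name (`E` in the Spark pattern) and then counts rows per name. The plain-Python sketch below follows the same two steps with `strftime("%a")` and `collections.Counter`. The sample timestamps are hypothetical, and the day abbreviations assume an English locale.

```python
from collections import Counter
from datetime import datetime

# Hypothetical timestamps standing in for the Date column.
sample_dates = [
    datetime(2018, 11, 2, 8, 15),    # Friday
    datetime(2018, 11, 9, 22, 40),   # Friday
    datetime(2018, 11, 10, 1, 5),    # Saturday
    datetime(2018, 11, 11, 23, 50),  # Sunday
]

# Step 1: abbreviated day name per timestamp. Step 2: count per name.
day_counts = Counter(d.strftime("%a") for d in sample_dates)

# Equivalent of orderBy('count', ascending=False).show(1).
top_day, top_count = day_counts.most_common(1)[0]
print(top_day, top_count)
```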

