# BDP Assignment 4: Data Exploration and Preparation Using Spark RDD

## Name: Zeel Patel

In [2]:
# Update environment (if needed)
# !sudo apt update

# Download and install Java
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Apache Spark with Hadoop
!wget -nc -q https://downloads.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz

# Unzip the folder
!tar xf spark-3.5.3-bin-hadoop3.tgz

# Install findspark library that will locate Spark on the system
!pip install -q findspark


In [3]:
# Set the environment variables needed to run PySpark in the Colab environment.

import csv
import os
import shutil
from itertools import islice

import requests

os.environ.update({
    # JDK installed by apt-get in the setup cell
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    # Folder created by extracting the Spark tarball in the setup cell
    "SPARK_HOME": "/content/spark-3.5.3-bin-hadoop3",
})

In [4]:
# Locate Spark on the system. findspark uses SPARK_HOME (set in the previous
# cell) and makes PySpark importable from this kernel. Passing None falls back
# to findspark's own search, exactly as calling init() with no argument does.
import findspark

findspark.init(spark_home=os.environ.get("SPARK_HOME"))

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# from pyspark.sql import functions as F
from pyspark.sql.types import *

# Start a SparkSession (or reuse the running one) and expose its SparkContext
# as `sc`, the entry point for the RDD API used throughout this notebook.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
sc = spark.sparkContext

# Display the running Spark version
spark.version

'3.5.3'

In [6]:
# Mount Google Drive under /content/drive so later cells can read and write the
# assignment folder. Colab prompts for authorization the first time this runs.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


### 1. Copy the file into your own directory in Google Drive

In [7]:
!wget -P /content/drive/MyDrive/Colab Notebooks/Big Data/Assignment_4/ https://storage.googleapis.com/msca-bdp-data-open/austin/Municipal_Court_Caseload_Information.zip


--2024-10-29 03:05:04--  http://notebooks/Big
Resolving notebooks (notebooks)... failed: Name or service not known.
wget: unable to resolve host address ‘notebooks’
--2024-10-29 03:05:04--  http://data/Assignment_4/
Resolving data (data)... failed: No address associated with hostname.
wget: unable to resolve host address ‘data’
--2024-10-29 03:05:04--  https://storage.googleapis.com/msca-bdp-data-open/austin/Municipal_Court_Caseload_Information.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.139.207, 173.194.212.207, 173.194.210.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.139.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 94212181 (90M) [application/x-zip-compressed]
Saving to: ‘/content/drive/MyDrive/Colab/Municipal_Court_Caseload_Information.zip’


2024-10-29 03:05:06 (45.4 MB/s) - ‘/content/drive/MyDrive/Colab/Municipal_Court_Caseload_Information.zip’ saved [94212181/94212181]

FINISHED --2024-1

In [8]:
!ls -l "/content/drive/My Drive/Colab Notebooks/Big Data/Assignment_4/"

total 92005
-rw------- 1 root root 94212181 Sep 26  2021 Municipal_Court_Caseload_Information.zip


### 2. Unzip the file (using Linux commands)

In [11]:
!unzip "/content/drive/MyDrive/Colab Notebooks/Big Data/Assignment_4/Municipal_Court_Caseload_Information.zip" -d "/content/drive/MyDrive/Colab Notebooks/Big Data/Assignment_4/"

Archive:  /content/drive/MyDrive/Colab Notebooks/Big Data/Assignment_4/Municipal_Court_Caseload_Information.zip
  inflating: /content/drive/MyDrive/Colab Notebooks/Big Data/Assignment_4/Municipal_Court_Caseload_Information.csv  


In [12]:
# Assignment folder on Drive that holds the unzipped CSV. The trailing "" adds
# a final separator, so the path ends with "/".
file_path = os.path.join("/content/drive/MyDrive", "Colab Notebooks", "Big Data", "Assignment_4", "")

### 3. Read the data into Spark RDD


In [24]:
# Read the CSV from the local filesystem as an RDD of raw text lines
# (one element per line, header included).
# os.path.join avoids the "//" that string concatenation produced, because
# file_path already ends with "/".
readme_raw = sc.textFile("file://" + os.path.join(file_path, "Municipal_Court_Caseload_Information.csv"))

In [25]:
# Sanity check: confirm sc.textFile produced an RDD (one element per text line).
type(readme_raw)

### 4. Ensure you process the header record correctly

In [15]:
# Preview the first two lines: the CSV header row and the first data record.
readme_raw.take(2)

['Offense Case Type,Offense Date,Offense Time,Offense Charge Description,Offense Street Name,Offense Cross Street Check , Offense Cross Street,School Zone,Construction Zone,Case Closed',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY,8000 BLOCK RESEARCH,N, ,N,N,Y']

In [16]:
# The file's first line is the column header. Keep only lines that differ from
# it, so every remaining element is a data record.
header = readme_raw.first()


def _is_data_row(line):
    """Return True for any line that is not the CSV header line."""
    return line != header


readme_raw_processed = readme_raw.filter(_is_data_row)

In [17]:
# Extract column 0 (Offense Case Type) from each record.
# Parse with the csv module rather than str.split(",") so that quoted fields
# containing commas are handled correctly and surrounding quotes are stripped.
offense_case_type = readme_raw_processed.map(lambda line: next(csv.reader([line]))[0])

In [18]:
# Spot-check: the case type extracted from the first data record.
offense_case_type.first()

'TR'

In [19]:
# Pair each case type with 1 so the per-type totals can be summed with reduceByKey.
count = offense_case_type.map(lambda case_type: (case_type, 1))

### 5. Calculate frequency of offenses by Offense Case Type

In [20]:
# Total offenses per Offense Case Type.
count_total = count.reduceByKey(lambda x, y: x + y)

# reduceByKey's output order depends on partitioning, so sort explicitly.
# This displays the frequencies deterministically, from most to least common.
count_total.sortBy(lambda kv: kv[1], ascending=False).take(10)

[('TR', 4313221),
 ('PK', 3388981),
 ('CM', 319078),
 ('CO', 240308),
 ('RL', 224188)]

In [21]:
# Extract column 3 (Offense Charge Description) from each record.
# Use the csv module instead of str.split(","). A quoted description that
# contains a comma would otherwise be truncated at that comma and keep a
# stray leading quote, splitting one offense into a bogus separate key.
offense_charge_desc = readme_raw_processed.map(lambda line: next(csv.reader([line]))[3])

### 6. Identify the most frequent offenses by Offense Charge Description (Show Offense Charge Description and offense frequency count in descending order)

In [22]:
# Count occurrences of each charge description: emit (description, 1) pairs,
# then sum them per description. This preview is in arbitrary partition order;
# the next cell orders the results by frequency.
count_desc = offense_charge_desc.map(lambda description: (description, 1))
count_desc_total = count_desc.reduceByKey(lambda left, right: left + right)
count_desc_total.take(10)

[('BICYCLE - RAN RED LIGHT', 2397),
 ('RAN RED LIGHT', 157783),
 ('CMV - UNSAFE CONDITION-396 3 A 1', 509),
 ('PARKING - SIDEWALK AREA', 9918),
 ('CAMPING IN A PUBLIC AREA', 12144),
 ('SEATBELT - PASSENGER', 3042),
 ('ALCOHOL - SELLING/POSSESSING IN PROHIBITED AREA', 3266),
 ('PARKING - DOUBLE PARKED', 2820),
 ('MOTOR VEHICLE INSPECTION VIOLATION', 43),
 ('CROSSING PROPERTY TO TURN RIGHT OR LEFT', 6866)]

In [23]:
# Swap each pair to (frequency, description) so sortByKey orders by count,
# then sort descending (ascending=False) and show the ten most frequent offenses.
count_key = count_desc_total.map(lambda pair: (pair[1], pair[0]))
most_frequent_offense = count_key.sortByKey(False)
most_frequent_offense.take(10)

[(892013, 'PAY STATION RECEIPT NOT DISPLAYED'),
 (732605, 'EXPIRED PAY STATION RECEIPT'),
 (486576, 'SPEEDING-STATE HIGHWAYS'),
 (372339, 'NO DRIVERS LICENSE'),
 (345162, 'SPEEDING - POSTED CITY STREET'),
 (337672, 'FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY'),
 (310816, 'PARKING - EXPIRED METER'),
 (287570, 'SPEEDING - STATE HIGHWAY'),
 (260662, 'FAIL TO MAINTAIN FINANCIAL RESP'),
 (238168, 'TOW AWAY ZONE NO PARKING AREA')]