In [1]:
# Update environment (if needed)
# !sudo apt update

# Download and install Java
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Apache Spark with Hadoop
!wget -nc -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Unzip the folder
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Install findspark library that will locate Spark on the system
!pip install -q findspark

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 78, <> line 3.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin: 


In [2]:
# Setting the environment variables, to enable running PySpark in Colab environment.

import csv
import os
import shutil
from itertools import islice

import requests

# Point PySpark at the Java 8 runtime and at the unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.0-bin-hadoop3",
})

In [3]:
# Locate Spark in the system so that `pyspark` can be imported in this kernel.

import findspark

# Hand findspark the SPARK_HOME set above explicitly (None falls back to its own search).
findspark.init(spark_home=os.environ.get("SPARK_HOME"))

In [5]:
# Import SparkSession and SparkSQL

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# from pyspark.sql import functions as F
from pyspark.sql.types import *

# Start a SparkSession (or attach to one that is already running) and keep a
# handle on its SparkContext, which the RDD cells below use as `sc`.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
sc = spark.sparkContext

# Display the running Spark version as this cell's output.
spark.version

'3.5.0'

In [6]:
# Mount Google Drive so its files are reachable under /content/drive.
from google.colab import drive

# Interactive step: Colab prompts for authorization before mounting.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


BDP Assignment 4: Austin Municipal Court caseload analysis with Spark RDDs

In [9]:
def get_gcs_data(bucket_name, folder_name, file_name, path_gdrive,
                 chunk_size=1024 * 1024, timeout=60):
    """Download one object from a public Google Cloud Storage bucket to a local folder.

    The object at
    ``https://storage.googleapis.com/<bucket_name>/<folder_name>/<file_name>``
    is written to ``<path_gdrive>/<file_name>``.

    Args:
        bucket_name: GCS bucket name.
        folder_name: Object prefix ("folder") inside the bucket.
        file_name: Object name; also used as the local file name.
        path_gdrive: Existing local directory to write the file into.
        chunk_size: Bytes per chunk written while streaming the response.
        timeout: Seconds to wait on connect / between received bytes
            (passed to ``requests.get``); not a limit on total download time.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status, so an
            error page is never saved in place of the data file.
    """
    url = f'https://storage.googleapis.com/{bucket_name}/{folder_name}/{file_name}'
    # Stream to disk rather than holding the whole (possibly large) body in memory;
    # both `with` blocks guarantee the connection and the file are closed.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(os.path.join(path_gdrive, file_name), 'wb') as out_file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                out_file.write(chunk)

In [11]:
# Copy the source archive(s) from the public GCS bucket into your own Google Drive folder.

bucket_name = 'msca-bdp-data-open'
folder_name = 'austin'
file_name = ['Municipal_Court_Caseload_Information.zip']  # list of object names to fetch
path_gdrive = '/content/drive/My Drive/Colab Notebooks/austin'

# Create the destination folder if it is missing; no-op when it already exists.
os.makedirs(path_gdrive, exist_ok=True)

for object_name in file_name:
    get_gcs_data(
        bucket_name=bucket_name,
        folder_name=folder_name,
        file_name=object_name,
        path_gdrive=path_gdrive,
    )
    print(f'Downloaded: {object_name}')

Downloaded: Municipal_Court_Caseload_Information.zip


In [20]:
#Unzip the file (using Linux commands)
!unzip '/content/drive/My Drive/Colab Notebooks/austin/Municipal_Court_Caseload_Information.zip'

Archive:  /content/drive/My Drive/Colab Notebooks/austin/Municipal_Court_Caseload_Information.zip
replace Municipal_Court_Caseload_Information.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: Municipal_Court_Caseload_Information.csv  


In [24]:
#Verify the record count and filesize of your resulting file
!ls -l 'Municipal_Court_Caseload_Information.csv'

-rw-r--r-- 1 root root 877080187 Apr 16  2015 Municipal_Court_Caseload_Information.csv


In [30]:
# Read the CSV into an RDD of raw text lines (the header line is still included).
Municipal_Court_Caseload_Information_raw = sc.textFile(
    name="Municipal_Court_Caseload_Information.csv",
)
# Total number of lines, header included.
Municipal_Court_Caseload_Information_raw.count()


8485777

In [56]:
# Peek at the header plus the first few data lines.
Municipal_Court_Caseload_Information_raw.take(num=5)

['Offense Case Type,Offense Date,Offense Time,Offense Charge Description,Offense Street Name,Offense Cross Street Check , Offense Cross Street,School Zone,Construction Zone,Case Closed',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILURE TO SIGNAL INTENT TO CHANGE LANES,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,20.00.00,SPEEDING-STATE HIGHWAYS,1000 BLOCK NORTH U S 183,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,20.00.00,NO SEAT BELT-DRIVER/PASSENGER,1000 BLOCK NORTH U S 183,N, ,N,N,Y']

In [61]:
# Drop the CSV header record. This relies on textFile placing the file's first
# line at the start of partition 0; all other partitions pass through unchanged.
def _skip_header(partition_index, lines):
    """Return the partition's lines, minus the first line when it is partition 0."""
    if partition_index == 0:
        return islice(lines, 1, None)
    return lines


Municipal_Court_Caseload_Information = Municipal_Court_Caseload_Information_raw.mapPartitionsWithIndex(_skip_header)
# Data records only (raw line count minus one).
Municipal_Court_Caseload_Information.count()

8485776

Calculate the frequency of offenses by Offense Case Type


In [96]:
# Offense Case Type is the text before the first comma of each record.
Municipal_Court_Caseload_Information_Offense_Case_Type = Municipal_Court_Caseload_Information.map(
    lambda line: line.partition(",")[0]
)

In [98]:
# Spot-check the extracted case types for the first 10 records.
Municipal_Court_Caseload_Information_Offense_Case_Type.take(num=10)

['TR', 'TR', 'TR', 'TR', 'TR', 'PK', 'PK', 'PK', 'PK', 'PK']

In [102]:
# Pair every case type with a 1 so that occurrences can be summed per key.
Municipal_Court_Caseload_Information_Offense_Case_Type_1 = Municipal_Court_Caseload_Information_Offense_Case_Type.map(
    lambda case_type: (case_type, 1)
)
Municipal_Court_Caseload_Information_Offense_Case_Type_1.take(num=10)

[('TR', 1),
 ('TR', 1),
 ('TR', 1),
 ('TR', 1),
 ('TR', 1),
 ('PK', 1),
 ('PK', 1),
 ('PK', 1),
 ('PK', 1),
 ('PK', 1)]

In [101]:
# Result: number of offenses per Offense Case Type (sum of the 1s for each key).
Municipal_Court_Caseload_Information_Offense_Case_Type_2 = Municipal_Court_Caseload_Information_Offense_Case_Type_1.reduceByKey(
    lambda running_total, increment: running_total + increment
)
Municipal_Court_Caseload_Information_Offense_Case_Type_2.take(num=10)

[('TR', 4313221),
 ('PK', 3388981),
 ('CM', 319078),
 ('CO', 240308),
 ('RL', 224188)]

Identify the most frequent offenses by Offense Charge Description (show each Offense Charge Description with its offense frequency count, in descending order of count)


In [103]:
# Offense Charge Description is the 4th field (index 3). Parse each line with the
# csv module instead of str.split(",") so a double-quoted description that contains
# a comma stays one field (and loses its enclosing quotes) rather than being cut at
# the comma. Each line is parsed on its own, so a stray unmatched quote cannot pull
# the following lines into the same record.
def _parse_csv_line(line):
    """Return the list of fields in one CSV-formatted text line."""
    return next(csv.reader([line]))


Municipal_Court_Caseload_Information_Offense_Charge_Description = Municipal_Court_Caseload_Information.map(
    lambda line: _parse_csv_line(line)[3]
)

In [104]:
# Spot-check the extracted descriptions for the first 10 records.
Municipal_Court_Caseload_Information_Offense_Charge_Description.take(num=10)

['FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY',
 'FAILURE TO SIGNAL INTENT TO CHANGE LANES',
 'SPEEDING-STATE HIGHWAYS',
 'NO SEAT BELT-DRIVER/PASSENGER',
 'SPEEDING - STATE HIGHWAYS - Less than 10% over limit',
 'PAY STATION RECEIPT NOT DISPLAYED',
 'EXPIRED PAY STATION RECEIPT',
 'EXPIRED PAY STATION RECEIPT',
 'EXPIRED PAY STATION RECEIPT',
 'PAY STATION RECEIPT NOT DISPLAYED']

In [105]:
# Emit (description, 1) so that occurrences can be summed per description.
Municipal_Court_Caseload_Information_Offense_Charge_Description_1 = Municipal_Court_Caseload_Information_Offense_Charge_Description.map(
    lambda description: (description, 1)
)
Municipal_Court_Caseload_Information_Offense_Charge_Description_1.take(num=10)

[('FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY', 1),
 ('FAILURE TO SIGNAL INTENT TO CHANGE LANES', 1),
 ('SPEEDING-STATE HIGHWAYS', 1),
 ('NO SEAT BELT-DRIVER/PASSENGER', 1),
 ('SPEEDING - STATE HIGHWAYS - Less than 10% over limit', 1),
 ('PAY STATION RECEIPT NOT DISPLAYED', 1),
 ('EXPIRED PAY STATION RECEIPT', 1),
 ('EXPIRED PAY STATION RECEIPT', 1),
 ('EXPIRED PAY STATION RECEIPT', 1),
 ('PAY STATION RECEIPT NOT DISPLAYED', 1)]

In [106]:
# Total occurrences per description (unordered at this point).
Municipal_Court_Caseload_Information_Offense_Charge_Description_2 = Municipal_Court_Caseload_Information_Offense_Charge_Description_1.reduceByKey(
    lambda running_total, increment: running_total + increment
)
Municipal_Court_Caseload_Information_Offense_Charge_Description_2.take(num=10)

[('BICYCLE - RAN RED LIGHT', 2397),
 ('RAN RED LIGHT', 157783),
 ('CMV - UNSAFE CONDITION-396 3 A 1', 509),
 ('PARKING - SIDEWALK AREA', 9918),
 ('CAMPING IN A PUBLIC AREA', 12144),
 ('SEATBELT - PASSENGER', 3042),
 ('ALCOHOL - SELLING/POSSESSING IN PROHIBITED AREA', 3266),
 ('PARKING - DOUBLE PARKED', 2820),
 ('MOTOR VEHICLE INSPECTION VIOLATION', 43),
 ('CROSSING PROPERTY TO TURN RIGHT OR LEFT', 6866)]

In [107]:
# Flip each pair to (count, description) so the count becomes the key sortByKey orders by.
Municipal_Court_Caseload_Information_Offense_Charge_Description_3 = Municipal_Court_Caseload_Information_Offense_Charge_Description_2.map(
    lambda pair: pair[::-1]
)
Municipal_Court_Caseload_Information_Offense_Charge_Description_3.take(num=10)

[(2397, 'BICYCLE - RAN RED LIGHT'),
 (157783, 'RAN RED LIGHT'),
 (509, 'CMV - UNSAFE CONDITION-396 3 A 1'),
 (9918, 'PARKING - SIDEWALK AREA'),
 (12144, 'CAMPING IN A PUBLIC AREA'),
 (3042, 'SEATBELT - PASSENGER'),
 (3266, 'ALCOHOL - SELLING/POSSESSING IN PROHIBITED AREA'),
 (2820, 'PARKING - DOUBLE PARKED'),
 (43, 'MOTOR VEHICLE INSPECTION VIOLATION'),
 (6866, 'CROSSING PROPERTY TO TURN RIGHT OR LEFT')]

In [108]:
# Order the (count, description) pairs by count, largest first.
Municipal_Court_Caseload_Information_Offense_Charge_Description_4 = Municipal_Court_Caseload_Information_Offense_Charge_Description_3.sortByKey(
    ascending=False,
)
Municipal_Court_Caseload_Information_Offense_Charge_Description_4.take(num=10)

[(892013, 'PAY STATION RECEIPT NOT DISPLAYED'),
 (732605, 'EXPIRED PAY STATION RECEIPT'),
 (486576, 'SPEEDING-STATE HIGHWAYS'),
 (372339, 'NO DRIVERS LICENSE'),
 (345162, 'SPEEDING - POSTED CITY STREET'),
 (337672, 'FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY'),
 (310816, 'PARKING - EXPIRED METER'),
 (287570, 'SPEEDING - STATE HIGHWAY'),
 (260662, 'FAIL TO MAINTAIN FINANCIAL RESP'),
 (238168, 'TOW AWAY ZONE NO PARKING AREA')]

In [110]:
# Result: most frequent offenses by Offense Charge Description, shown as
# (description, count) pairs in descending order of count (top 10 displayed).
Municipal_Court_Caseload_Information_Offense_Charge_Description_5 = Municipal_Court_Caseload_Information_Offense_Charge_Description_4.map(
    lambda pair: pair[::-1]
)
Municipal_Court_Caseload_Information_Offense_Charge_Description_5.take(num=10)

[('PAY STATION RECEIPT NOT DISPLAYED', 892013),
 ('EXPIRED PAY STATION RECEIPT', 732605),
 ('SPEEDING-STATE HIGHWAYS', 486576),
 ('NO DRIVERS LICENSE', 372339),
 ('SPEEDING - POSTED CITY STREET', 345162),
 ('FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY', 337672),
 ('PARKING - EXPIRED METER', 310816),
 ('SPEEDING - STATE HIGHWAY', 287570),
 ('FAIL TO MAINTAIN FINANCIAL RESP', 260662),
 ('TOW AWAY ZONE NO PARKING AREA', 238168)]