1. Copy the data file into your own directory in Google Drive

In [44]:
def get_gcs_data(bucket_name, folder_name, file_name, path_gdrive, timeout=60):
    """Download one object from a public Google Cloud Storage bucket into a local folder.

    The object is fetched over HTTPS from
    ``https://storage.googleapis.com/<bucket_name>/<folder_name>/<file_name>``
    and written to ``<path_gdrive>/<file_name>``, overwriting any existing file.

    Args:
        bucket_name: Name of the public GCS bucket.
        folder_name: Folder (object prefix) inside the bucket.
        file_name: Object name; also used as the local file name.
        path_gdrive: Existing local directory to write into (e.g. a mounted Drive folder).
        timeout: Seconds passed to ``requests.get`` so a stalled connection cannot
            hang the notebook indefinitely.

    Returns:
        The path of the file that was written.

    Raises:
        requests.HTTPError: if the server answers with an error status, so an
            error page is never saved to disk as if it were the requested data.
    """
    # Local imports keep the helper usable even if this cell runs before the import cell.
    import os
    import requests

    url = f'https://storage.googleapis.com/{bucket_name}/{folder_name}/{file_name}'
    response = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of writing the error body under file_name.
    response.raise_for_status()

    dest_path = os.path.join(path_gdrive, file_name)
    # `with` guarantees the handle is flushed and closed even if write() fails.
    with open(dest_path, 'wb') as fh:
        fh.write(response.content)
    return dest_path

In [45]:
# Load the Drive helper and mount
from google.colab import drive

In [46]:
# Mount Google Drive at /content/drive; this will prompt for authorization.
# force_remount takes a bool. The former string 'TRUE' only worked because any
# non-empty string is truthy (even 'FALSE' would have forced a remount).
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [47]:
import os
import shutil
from itertools import islice
import requests

In [48]:
# Source location of the dataset in the public GCS bucket and its Drive destination.
bucket_name = 'msca-bdp-data-open'
folder_name = 'austin'
file_name = ['Municipal_Court_Caseload_Information.zip']  # list of objects to fetch
path_gdrive = '/content/drive/My Drive/Colab Datasets/BDP/austin'

# Create the destination folder if it is missing (no error if it already exists).
os.makedirs(path_gdrive, exist_ok=True)

# Fetch each listed object from the bucket into the Drive folder.
for archive_name in file_name:
    get_gcs_data(
        bucket_name=bucket_name,
        folder_name=folder_name,
        file_name=archive_name,
        path_gdrive=path_gdrive,
    )
    print(f'Downloaded: {archive_name}')

Downloaded: Municipal_Court_Caseload_Information.zip


In [49]:
# List the BDP folder in Drive to confirm the austin/ download folder now exists.
!ls "/content/drive/My Drive/Colab Datasets/BDP"

 airlines  'Copy of BDP_Class_04.1_Colab_Spark_RDD_Struct_v7.ipynb'   gutenberg
 austin    'Copy of BDP_Class_04.2_Colab_Spark_RDD_Text_v4.ipynb'     insurance


2. Unzip the file

In [50]:
import zipfile

# Extract the downloaded archive into the same Drive folder it was saved to.
# Reuse path_gdrive (set in the download cell) rather than repeating the literal
# path, so changing the destination only requires editing one place.
zip_path = os.path.join(path_gdrive, 'Municipal_Court_Caseload_Information.zip')
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(path_gdrive)

In [51]:
# Check the content of BDP folder in GDrive
!ls -l '/content/drive/My Drive/Colab Datasets/BDP/austin'

total 948529
-rw------- 1 root root 877080187 Oct 27 20:07 Municipal_Court_Caseload_Information.csv
-rw------- 1 root root  94212181 Oct 27 20:07 Municipal_Court_Caseload_Information.zip


3. Read the data into a Spark RDD

In [52]:
# Update environment (if needed)
# !sudo apt update

# Download and install Java
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Apache Spark 3.3.2 with Hadoop 3
!wget -nc -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

# Unzip the folder
!tar xf  spark-3.3.1-bin-hadoop3.tgz

# Install findspark library that will locate Spark on the system
!pip install -q findspark

In [53]:
# Environment variables that let PySpark find Java and Spark inside Colab.
import os
import shutil
from itertools import islice

import requests

# Point PySpark at the Java 8 JDK and at the unpacked Spark 3.3.1 distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
})

In [54]:
# Locate Spark in the system

import findspark

In [55]:
# Re-apply SPARK_HOME (same path as the environment-setup cell) right before the
# findspark.init() cell, so the value is set even if that earlier cell was skipped.
os.environ.update(SPARK_HOME="/content/spark-3.3.1-bin-hadoop3")

In [56]:
# Locate the Spark installation with findspark (SPARK_HOME was set in an earlier cell);
# the pyspark imports in the next cell rely on this having run.
findspark.init()

In [57]:
from pyspark.sql import SparkSession
# NOTE(review): these star imports can shadow Python builtins (e.g. sum/max/min);
# consider `from pyspark.sql import functions as F` / `types as T` instead.
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Start (or reuse) the Spark session, then keep its SparkContext for the RDD API
# used in the rest of the notebook.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
sc = spark.sparkContext

# Display the running Spark version as the cell output.
spark.version

'3.3.1'

Read the CSV file from Google Drive and count its lines.

In [58]:
%%time

readme_raw = sc.textFile("file://" + path_gdrive + "/Municipal_Court_Caseload_Information.csv")
readme_raw.count()

CPU times: user 180 ms, sys: 21.6 ms, total: 202 ms
Wall time: 22.9 s


8485777

In [59]:
# Peek at the first three raw lines; the first one is the CSV header.
readme_raw.take(3)

['Offense Case Type,Offense Date,Offense Time,Offense Charge Description,Offense Street Name,Offense Cross Street Check , Offense Cross Street,School Zone,Construction Zone,Case Closed',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILURE TO SIGNAL INTENT TO CHANGE LANES,8000 BLOCK RESEARCH,N, ,N,N,Y']

4. Ensure you process the header record correctly

In [60]:
readme_raw = sc.textFile(f"file://{path_gdrive}/Municipal_Court_Caseload_Information.csv")


def _skip_header(partition_index, lines):
    """Drop the first line of partition 0 (the CSV header); pass other partitions through."""
    if partition_index == 0:
        return islice(lines, 1, None)
    return lines


# Data records only; the count should be exactly one less than the raw line count.
readme = readme_raw.mapPartitionsWithIndex(_skip_header)
readme.count()

8485776

In [61]:
# Peek at the first five data records to confirm the header line is gone.
readme.take(5)

['TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILURE TO SIGNAL INTENT TO CHANGE LANES,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,20.00.00,SPEEDING-STATE HIGHWAYS,1000 BLOCK NORTH U S 183,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,20.00.00,NO SEAT BELT-DRIVER/PASSENGER,1000 BLOCK NORTH U S 183,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,23.45.00,SPEEDING - STATE HIGHWAYS - Less than 10% over limit,1000 BLOCK RESEARCH,N, ,N,N,Y']

5. Calculate the frequency of offenses by Offense Case Type

In [62]:
# Key each record by its first column (Offense Case Type) and tally occurrences per type.
Case_Type = readme.map(lambda line: line.split(",")[0])
Case_Type_Frequency = (
    Case_Type
    .map(lambda case_type: (case_type, 1))
    .reduceByKey(lambda left, right: left + right)
)
Case_Type_Frequency.take(12)

[('TR', 4313221),
 ('PK', 3388981),
 ('CM', 319078),
 ('CO', 240308),
 ('RL', 224188)]

6. Identify the most frequent offenses by Offense Charge Description

In [63]:
import csv

# Parse each record with the csv module rather than str.split(","). If a description
# is quoted because it contains a comma, split() would cut it at that comma and keep
# the leading quote, miscounting that offense. For unquoted lines both parse the same.
Offense_Description = (
    readme
    .mapPartitions(lambda lines: csv.reader(lines))
    .map(lambda row: row[3])  # 4th column: Offense Charge Description
)

Frequency = Offense_Description.map(lambda desc: (desc, 1)).reduceByKey(lambda a, b: a + b)
# Swap to (count, description) so sortByKey orders by frequency, highest first.
reverse_freq = Frequency.map(lambda kv: (kv[1], kv[0]))
sorted_freq = reverse_freq.sortByKey(ascending=False)
sorted_freq.take(12)

[(892013, 'PAY STATION RECEIPT NOT DISPLAYED'),
 (732605, 'EXPIRED PAY STATION RECEIPT'),
 (486576, 'SPEEDING-STATE HIGHWAYS'),
 (372339, 'NO DRIVERS LICENSE'),
 (345162, 'SPEEDING - POSTED CITY STREET'),
 (337672, 'FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY'),
 (310816, 'PARKING - EXPIRED METER'),
 (287570, 'SPEEDING - STATE HIGHWAY'),
 (260662, 'FAIL TO MAINTAIN FINANCIAL RESP'),
 (238168, 'TOW AWAY ZONE NO PARKING AREA')]

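Therefore, the most frequent offense by Offense Charge Description is 'PAY STATION RECEIPT NOT DISPLAYED', with a frequency of 892013. Note that the same offense is sometimes recorded under slightly different description strings (e.g. 'SPEEDING-STATE HIGHWAYS' vs. 'SPEEDING - STATE HIGHWAY', or 'FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY' vs. 'FAIL TO MAINTAIN FINANCIAL RESP'), so these counts are per distinct description text rather than per underlying offense.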

In [63]:
%%shell
# Export this notebook to HTML next to the .ipynb file.
# NOTE(review): confirm the notebook actually exists at this /content path; Drive
# files are mounted under /content/drive, not directly under /content.
jupyter nbconvert --to html /content/BDP_Assignment4_Richard_Yang.ipynb