<a href="https://colab.research.google.com/github/zhizhuoli1/Big-Data/blob/main/Spark_RDD_and_Hadoop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark RDD and Hadoop - Zhizhuo Li

## 1. Set Up the environment

In [1]:
# Update environment (if needed)
# !sudo apt update

# Download and install Java
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Apache Spark 3.0.3 with Hadoop 3.2
!wget -nc -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz

# Unzip the folder
!tar xf  spark-3.0.3-bin-hadoop3.2.tgz

# Install findspark library that will locate Spark on the system
!pip install -q findspark

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 76, <> line 2.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin: 


In [2]:
# Setting the environment variables, to enable running PySpark in Colab environment.

import os
import shutil
from itertools import islice
import requests

# Point PySpark at the JDK and the unpacked Spark distribution in one step
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.3-bin-hadoop3.2",
})

In [3]:
# Locate Spark in the system.
# findspark.init() is called with no arguments here; it presumably picks up the
# SPARK_HOME environment variable set in the previous cell — TODO confirm.

import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
#from pyspark.sql import functions as F
from pyspark.sql.types import *

# Create (or reuse) the Spark session and keep its SparkContext for the RDD API.
# The original cell repeated this block twice; getOrCreate() would just hand back
# the same session, so a single copy is sufficient.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Last expression of the cell: display the Spark version in use
spark.version

'3.0.3'

## 2. Copy the file into your own directory in Google Drive

In [5]:
# Load the Drive helper and mount Google Drive at /content/drive,
# the root under which every data path in the cells below is located.
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
#define self helper function to import the file
def get_gcs_data (bucket_name, folder_name, file_name, path_gdrive):
    """Download one public Google Cloud Storage object into a local folder.

    The object is fetched over HTTPS from
    ``https://storage.googleapis.com/<bucket_name>/<folder_name>/<file_name>``
    and written to ``<path_gdrive>/<file_name>``.

    Args:
        bucket_name: Name of the GCS bucket.
        folder_name: Folder (key prefix) inside the bucket.
        file_name: Name of the object; also used as the local file name.
        path_gdrive: Existing local directory to write the file into.

    Returns:
        None.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeout.
        OSError: If the destination file cannot be written.
    """
    url = 'https://storage.googleapis.com/' + bucket_name + '/' + folder_name + '/' + file_name
    # A timeout keeps a stalled connection from hanging the notebook forever
    r = requests.get(url, timeout=60)
    # Fail loudly instead of saving an HTTP error page under the data file's name
    r.raise_for_status()
    # os.path.join avoids a doubled slash when path_gdrive already ends with '/';
    # the context manager guarantees the file is flushed and closed
    with open(os.path.join(path_gdrive, file_name), 'wb') as out_file:
        out_file.write(r.content)

In [7]:
# Source location in Google Cloud Storage and destination folder on Google Drive
bucket_name = 'msca-bdp-data-open'
folder_name = 'austin'
file_name = ['Municipal_Court_Caseload_Information.zip']
path_gdrive = '/content/drive/My Drive/BigData/'

# Make sure the destination folder exists before anything is written into it
os.makedirs(path_gdrive, exist_ok=True)

# Fetch every listed archive in turn and report each one once it has been saved
for file in file_name:
    get_gcs_data(bucket_name, folder_name, file, path_gdrive)
    print(f'Downloaded: {file}')

Downloaded: Municipal_Court_Caseload_Information.zip


In [8]:
# List all files in the Drive folder to confirm the archive was downloaded
!ls -l '/content/drive/My Drive/BigData/'

total 92327
drwx------ 2 root root     4096 Oct 23 01:27  age_date_income
drwx------ 2 root root     4096 Oct 22 23:25  airbnb
drwx------ 2 root root     4096 Oct 22 23:18  airlines
drwx------ 2 root root     4096 Oct 27 17:05 'Assignment 4'
drwx------ 2 root root     4096 Oct 31 18:19 'Assignment 6'
drwx------ 2 root root     4096 Nov  8 17:04 'Assignment 8'
-rw------- 1 root root    61898 Oct 23 01:28  BDP_Class_04.1_Colab_Spark_RDD_Struct_v5.ipynb
-rw------- 1 root root    36292 Oct 23 01:41  BDP_Class_04.2_Colab_Spark_RDD_Text_v3.ipynb
-rw------- 1 root root   194307 Nov  6 01:57  BDP_Class_05.1_Colab_Spark_DF_v2.ipynb
drwx------ 2 root root     4096 Oct 22 23:25  books
drwx------ 2 root root     4096 Oct 23 01:05  insurance
drwx------ 2 root root     4096 Oct 30 01:11  Insurance
-rw------- 1 root root      204 Oct 29 23:18  Insurance.csv
-rw------- 1 root root 94212181 Nov 12 21:01  Municipal_Court_Caseload_Information.zip


In [10]:
#unzip and inflate the zip file
!unzip '/content/drive/My Drive/BigData/Municipal_Court_Caseload_Information.zip' -d '/content/drive/My Drive/BigData/'

Archive:  /content/drive/My Drive/BigData/Municipal_Court_Caseload_Information.zip
  inflating: /content/drive/My Drive/BigData/Municipal_Court_Caseload_Information.csv  


## 3. Read the data into Spark RDD

In [11]:
# Read the extracted CSV into a Spark RDD (one element per line of text).
# The path is the folder the archive was unzipped into; the previous
# '.../BigData/Assignment 4/...' path is never written by this notebook and so
# only worked if a copy had been placed there by hand.
mcci_raw = sc.textFile('/content/drive/My Drive/BigData/Municipal_Court_Caseload_Information.csv')

# Make sure the header record is handled correctly: the first line of the file
# is the first element of partition 0, so skip one element there and pass every
# other partition through untouched.
mcci = mcci_raw.mapPartitionsWithIndex(
    lambda idx, rows: islice(rows, 1, None) if idx == 0 else rows
)

# Number of data records (header excluded)
mcci.count()

8485776

In [13]:
# Preview the first 10 records of the RDD as raw comma-separated strings
mcci.take(10)

['TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILURE TO SIGNAL INTENT TO CHANGE LANES,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,20.00.00,SPEEDING-STATE HIGHWAYS,1000 BLOCK NORTH U S 183,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,20.00.00,NO SEAT BELT-DRIVER/PASSENGER,1000 BLOCK NORTH U S 183,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,23.45.00,SPEEDING - STATE HIGHWAYS - Less than 10% over limit,1000 BLOCK RESEARCH,N, ,N,N,Y',
 'PK,05/05/2010 07:00:00 AM +0000,15.28.00,PAY STATION RECEIPT NOT DISPLAYED,700 10TH ST W,N, ,N,N,Y',
 'PK,05/05/2010 07:00:00 AM +0000,15.23.00,EXPIRED PAY STATION RECEIPT,800 RIO GRANDE ST,N, ,N,N,Y',
 'PK,05/05/2010 07:00:00 AM +0000,15.22.00,EXPIRED PAY STATION RECEIPT,800 RIO GRANDE ST,N, ,N,N,Y',
 'PK,05/05/2010 07:00:00 AM +0000,15.18.00,EXPIRED PAY STATION RECEIPT,900 RIO GRANDE ST,N, ,N,N,Y',
 'PK,05/05/2010 0

## 4. Calculate frequency of offenses by Offense Case Type

In [14]:
# Keep only the first comma-separated field of each record: the Offense Case Type.
# NOTE(review): a plain split(",") does not understand quoted fields — confirm the
# file never quotes values that contain commas.
OffenseCaseType = mcci.map(lambda record: record.split(",")[0])

In [15]:
# Peek at the first 5 extracted case-type values
OffenseCaseType.take(5)

['TR', 'TR', 'TR', 'TR', 'TR']

In [16]:
# Turn every case type into a (case_type, 1) pair so the ones can be summed per key
OCT = OffenseCaseType.map(lambda case_type: (case_type, 1))

In [17]:
# Peek at the first 5 (case type, 1) pairs
OCT.take(5)

[('TR', 1), ('TR', 1), ('TR', 1), ('TR', 1), ('TR', 1)]

In [18]:
# Sum the 1s per key to get the frequency of each Offense Case Type
frequency_type = OCT.reduceByKey(lambda x,y:x+y)
# take(10) returns at most 10 (type, count) pairs; it is used instead of collect()
# so the output stays bounded even if the number of categories were huge
frequency_type.take(10)

[('TR', 4313221),
 ('PK', 3388981),
 ('CM', 319078),
 ('CO', 240308),
 ('RL', 224188)]

## 5. Identify the most frequent offenses by Offense Charge Description (Show Offense Charge Description and offense frequency count in descending order)

In [19]:
# Keep only the fourth comma-separated field of each record: the Offense Charge Description.
# NOTE(review): a plain split(",") does not understand quoted fields — if any of the
# first four columns can contain a quoted comma, index 3 would point at the wrong
# piece of text; confirm against the raw file.
OffenseDescrip = mcci.map(lambda record: record.split(",")[3])

In [20]:
# Peek at the first 5 extracted charge descriptions
OffenseDescrip.take(5)

['FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY',
 'FAILURE TO SIGNAL INTENT TO CHANGE LANES',
 'SPEEDING-STATE HIGHWAYS',
 'NO SEAT BELT-DRIVER/PASSENGER',
 'SPEEDING - STATE HIGHWAYS - Less than 10% over limit']

In [21]:
# Turn every charge description into a (description, 1) pair so the ones can be summed per key
OD = OffenseDescrip.map(lambda description: (description, 1))

In [22]:
# Peek at the first 5 (description, 1) pairs
OD.take(5)

[('FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY', 1),
 ('FAILURE TO SIGNAL INTENT TO CHANGE LANES', 1),
 ('SPEEDING-STATE HIGHWAYS', 1),
 ('NO SEAT BELT-DRIVER/PASSENGER', 1),
 ('SPEEDING - STATE HIGHWAYS - Less than 10% over limit', 1)]

In [23]:
# Sum the 1s per key to get the frequency of each Offense Charge Description
frequency_description = OD.reduceByKey(lambda x,y:x+y)
# Show 5 of the resulting (description, count) pairs
frequency_description.take(5)

[('BICYCLE - RAN RED LIGHT', 2397),
 ('RAN RED LIGHT', 157783),
 ('CMV - UNSAFE CONDITION-396 3 A 1', 509),
 ('PARKING - SIDEWALK AREA', 9918),
 ('CAMPING IN A PUBLIC AREA', 12144)]

In [24]:
# Flip each (description, count) pair into (count, description) so that the count becomes the key
frequency_description2 = frequency_description.map(lambda pair: (pair[1], pair[0]))
# Show 5 of the flipped pairs
frequency_description2.take(5)

[(2397, 'BICYCLE - RAN RED LIGHT'),
 (157783, 'RAN RED LIGHT'),
 (509, 'CMV - UNSAFE CONDITION-396 3 A 1'),
 (9918, 'PARKING - SIDEWALK AREA'),
 (12144, 'CAMPING IN A PUBLIC AREA')]

In [25]:
# Sort the (count, description) pairs by their key — the count — in descending order
frequency_description_sort = frequency_description2.sortByKey(ascending=False)
# The first 5 elements after sorting are the 5 largest counts
frequency_description_sort.take(5)

[(892013, 'PAY STATION RECEIPT NOT DISPLAYED'),
 (732605, 'EXPIRED PAY STATION RECEIPT'),
 (486576, 'SPEEDING-STATE HIGHWAYS'),
 (372339, 'NO DRIVERS LICENSE'),
 (345162, 'SPEEDING - POSTED CITY STREET')]

The most frequent offense by Offense Charge Description is 'PAY STATION RECEIPT NOT DISPLAYED', with a frequency of 892,013.

## 6. Release the memory

In [26]:
# Ask Spark to drop any cached copies of the two column RDDs.
# NOTE(review): no cache()/persist() call on either RDD is visible in this notebook,
# so there is presumably nothing cached to release here — confirm whether this
# step is needed.
OffenseCaseType.unpersist()
OffenseDescrip.unpersist()

PythonRDD[31] at RDD at PythonRDD.scala:53