## **Setup - Reference: Big Data Scripts**

In [1]:
# Update environment (if needed)
# !sudo apt update

# Download and install Java
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Apache Spark 3.3.0 with Hadoop 3
!wget -nc -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

# Unzip the folder
!tar xf  spark-3.3.1-bin-hadoop3.tgz

# Install findspark library that will locate Spark on the system
!pip install -q findspark
!pip install pyspark

# Setting the environment variables, to enable running PySpark in Colab environment.
import os
import shutil
from itertools import islice
import requests

import pandas as pd

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 76, <> line 2.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin: 
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 41.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-

In [2]:
# Point Python at the Spark distribution under SPARK_HOME; this must run before
# the first `pyspark` import below.
import findspark
findspark.init()

from pyspark.sql import SparkSession
# NOTE(review): wildcard imports pull many names into the notebook namespace.
# pyspark.sql.functions defines e.g. sum, max, min, round and filter, which
# shadow the Python builtins of the same name in every later cell.
from pyspark.sql.functions import *
# from pyspark.sql import functions as F
from pyspark.sql.types import *

# Create spark_session (getOrCreate reuses an already-running session if present)
spark = SparkSession.builder.getOrCreate()
# SparkContext: the RDD entry point used by sc.textFile(...) later in the notebook
sc = spark.sparkContext

# Last expression: display the running Spark version
spark.version

'3.3.1'

In [3]:
# Mount Google Drive under /content/drive so the "My Drive" dataset folder is
# readable from this runtime. The call prompts for authorization.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


## **Code**

In [4]:
# Check the content of BDP folder in GDrive
!mkdir "/content/drive/My Drive/Colab Datasets/BDP/Assignment4/"
!ls "/content/drive/My Drive/Colab Datasets/BDP/Assignment4/"

mkdir: cannot create directory ‘/content/drive/My Drive/Colab Datasets/BDP/Assignment4/’: File exists
Municipal_Court_Caseload_Information.csv
Municipal_Court_Caseload_Information.zip


In [5]:
# Download the zipped caseload dataset into the Drive folder.
# The response is streamed to disk in 1 MiB chunks instead of being buffered in
# memory. HTTP errors raise instead of silently writing an error page as the
# ".zip". Both the response and the output file are closed by `with`.
url = 'https://storage.googleapis.com/msca-bdp-data-open/austin/Municipal_Court_Caseload_Information.zip'
zip_path = '/content/drive/My Drive/Colab Datasets/BDP/Assignment4/Municipal_Court_Caseload_Information.zip'
with requests.get(url, stream=True, timeout=60) as r:
    r.raise_for_status()
    with open(zip_path, 'wb') as zip_file:
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            zip_file.write(chunk)
print('Downloaded File')

Downloaded File


In [6]:
!unzip '/content/drive/My Drive/Colab Datasets/BDP/Assignment4/Municipal_Court_Caseload_Information.zip' -d '/content/drive/My Drive/Colab Datasets/BDP/Assignment4/'

Archive:  /content/drive/My Drive/Colab Datasets/BDP/Assignment4/Municipal_Court_Caseload_Information.zip
replace /content/drive/My Drive/Colab Datasets/BDP/Assignment4/Municipal_Court_Caseload_Information.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: yea
  inflating: /content/drive/My Drive/Colab Datasets/BDP/Assignment4/Municipal_Court_Caseload_Information.csv  


In [7]:
# Read the extracted CSV as an RDD of raw text lines; the header row is one of
# those lines, so it is included in the count shown below.
caseload_path = "file:///content/drive/My Drive/Colab Datasets/BDP/Assignment4/Municipal_Court_Caseload_Information.csv"
caseload_file = sc.textFile(caseload_path)
caseload_file.count()

8485777

In [8]:
# Peek at the first five raw lines: the CSV header followed by four records
caseload_file.take(num=5)

['Offense Case Type,Offense Date,Offense Time,Offense Charge Description,Offense Street Name,Offense Cross Street Check , Offense Cross Street,School Zone,Construction Zone,Case Closed',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILED TO MAINTAIN FINANCIAL RESPONSIBILITY,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,22.40.00,FAILURE TO SIGNAL INTENT TO CHANGE LANES,8000 BLOCK RESEARCH,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,20.00.00,SPEEDING-STATE HIGHWAYS,1000 BLOCK NORTH U S 183,N, ,N,N,Y',
 'TR,04/29/2010 07:00:00 AM +0000,20.00.00,NO SEAT BELT-DRIVER/PASSENGER,1000 BLOCK NORTH U S 183,N, ,N,N,Y']

In [12]:
%%time 
offense_case_type = caseload_file.map(lambda s: s.split(",")[0])
header = offense_case_type.first()
offense_case_type = offense_case_type.filter(lambda line: line != header)
offense_case_type = offense_case_type.map(lambda x: (x, 1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x : x[1], ascending= False).collect()
display(pd.DataFrame(offense_case_type, columns = ['Offense Case Type', 'Count']))

Unnamed: 0,Offense Case Type,Count
0,TR,4313221
1,PK,3388981
2,CM,319078
3,CO,240308
4,RL,224188


CPU times: user 319 ms, sys: 39 ms, total: 358 ms
Wall time: 34.1 s


In [13]:
%%time
offense_charge_description = caseload_file.map(lambda s: s.split(",")[3])
header = offense_charge_description.first()
offense_charge_description = offense_charge_description.filter(lambda line: line != header)
offense_charge_description = offense_charge_description.map(lambda x: (x, 1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x : x[1], ascending = False)
display(pd.DataFrame(offense_charge_description.take(1), columns = ['Offense Charge Description', 'Count']))

Unnamed: 0,Offense Charge Description,Count
0,PAY STATION RECEIPT NOT DISPLAYED,892013


CPU times: user 320 ms, sys: 47.7 ms, total: 368 ms
Wall time: 35.2 s
