<a href="https://colab.research.google.com/github/supreet2/ml-100k-pig/blob/main/Spark_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Setup

In [None]:
!ls -l

In [None]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
!ls

In [None]:
# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
spark

#Get data

In [None]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD


--2024-02-10 05:13:53--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.140.205, 52.206.68.26
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [  <=>               ]   1.76G  3.02MB/s    in 10m 24s 

2024-02-10 05:24:17 (2.89 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1887728756]



In [None]:
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv

In [None]:
!ls -l

total 2068368
-rw-r--r--  1 root root     288545 Feb  9 11:53 crimes.csv
-rw-r--r--  1 root root     955222 Feb  9 11:53 crimes.json
-rw-r--r--  1 root root     288651 Feb  9 11:53 ijzp-q8t2.csv
-rw-r--r--  1 root root 1887728756 Feb  9 11:53 reported-crimes.csv
drwxr-xr-x  1 root root       4096 Feb  8 14:21 sample_data
drwxr-xr-x 13 1000 1000       4096 Feb 22  2021 spark-3.1.1-bin-hadoop3.2
-rw-r--r--  1 root root  228721937 Feb 22  2021 spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
from pyspark.sql.functions import to_timestamp,col,lit
rc = spark.read.csv('reported-crimes.csv',header=True,inferSchema=True)\
  .withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a'))\
  .filter(col('Date') <= lit('2018-11-11'))
rc.show(5)

+--------+-----------+-------------------+------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|             Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
| 5741943|   HN549294|2007-08-25 09:22:18|074XX N ROGERS AVE|0560|             ASSAULT|              SIMPLE|               OTHER| false|   false|2422|      24|  49|             1|     08A|     

In [None]:
rc.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [None]:
c = rc.count()
print(c)
d = rc.select("ID").distinct().count()
print(d)

6756906
6756906


#working with dataframes

In [None]:
rc.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location']

## creating a schema

In [None]:
from pyspark.sql.types import StructType, StructField, StringType,TimestampType,BooleanType,DoubleType,IntegerType

In [None]:
user_defined_schema = StructType([
    StructField('ID', StringType(), True),
    StructField('Case Number',StringType(), True),
    StructField('Date', TimestampType(), True),
    StructField('Block', StringType(), True),
    StructField('IUCR', StringType(), True),
    StructField('PrimaryType', StringType(), True),
    StructField('Description', StringType(), True),
    StructField('LocationDescription', StringType(), True),
    StructField('Arrest', BooleanType(), True),
    StructField('Domestic', BooleanType(), True),
    StructField('Beat', IntegerType(), True),
    StructField('District', IntegerType(), True),
    StructField('Ward', IntegerType(), True),
    StructField('CommunityArea', IntegerType(), True),
    StructField('FBICode', StringType(), True),
    StructField('XCoordinate', IntegerType(), True),
    StructField('YCoordinate', IntegerType(), True),
    StructField('Year', IntegerType(), True),
    StructField('UpdatedOn', StringType(), True),
    StructField('Latitude', DoubleType(), True),
    StructField('Longitude', DoubleType(), True),
    StructField('Location', StringType(), True)
])

In [None]:
rc = spark.read.csv('reported-crimes.csv',header=True,schema = user_defined_schema)
rc.show(5)

##infering the schema

In [None]:
 rc = spark.read.csv('reported-crimes.csv',header=True,inferSchema=True)\
  .withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a'))\
  .filter(col('Date') <= lit('2018-11-11'))
rc.show(5)

+--------+-----------+-------------------+------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|             Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
| 5741943|   HN549294|2007-08-25 09:22:18|074XX N ROGERS AVE|0560|             ASSAULT|              SIMPLE|               OTHER| false|   false|2422|      24|  49|             1|     08A|     

#Working with Columns


In [None]:
rc.select('IUCR').show(5)  #displaying the 5 rows of iucr column

+----+
|IUCR|
+----+
|0560|
|0820|
|1753|
|1753|
|1753|
+----+
only showing top 5 rows



In [None]:
rc.select(rc.IUCR).show(5)

+----+
|IUCR|
+----+
|0560|
|0820|
|1753|
|1753|
|1753|
+----+
only showing top 5 rows



In [None]:
rc.select(col('IUCR')).show(5)

+----+
|IUCR|
+----+
|0560|
|0820|
|1753|
|1753|
|1753|
+----+
only showing top 5 rows



In [None]:
# adding a column
rc.withColumn('One',lit(1)).show(5)

+--------+-----------+-------------------+------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|      ID|Case Number|               Date|             Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|One|
+--------+-----------+-------------------+------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
| 5741943|   HN549294|2007-08-25 09:22:18|074XX N ROGERS AVE|0560|             ASSAULT|              SIMPLE|               OTHER| false|   false|2422|      24|  49|             1|  

In [None]:
#drop column
rc = rc.drop(col('One'))

In [None]:
display(rc)

DataFrame[ID: int, Case Number: string, Date: timestamp, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: boolean, Domestic: boolean, Beat: int, District: int, Ward: int, Community Area: int, FBI Code: string, X Coordinate: int, Y Coordinate: int, Year: int, Updated On: string, Latitude: double, Longitude: double, Location: string]

#working with rows

In [None]:
## adding data from 12 th of nov to a new dataframe
## and then adding it to our original data frame
one_day  = spark.read.csv('reported-crimes.csv',header=True,inferSchema=True)\
  .withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a'))\
  .filter(col('Date') == lit('2018-11-12'))
one_day.count()

4

In [None]:
rc.union(one_day).orderBy('Date',ascending=False).show(5)

+--------+-----------+-------------------+-------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|              Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+-------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|11516594|   JB528186|2018-11-12 00:00:00|049XX S PRAIRIE AVE|2826|     OTHER OFFENSE|HARASSMENT BY ELE...|               OTHER| false| 

#Top ten reporeted crime

In [None]:
top_reported_crimes = rc.groupBy('Primary Type').count().orderBy('count',ascending=False)
top_reported_crimes.show(10)


+-------------------+-------+
|       Primary Type|  count|
+-------------------+-------+
|              THEFT|1418529|
|            BATTERY|1232293|
|    CRIMINAL DAMAGE| 771523|
|          NARCOTICS| 711778|
|      OTHER OFFENSE| 419046|
|            ASSAULT| 418522|
|           BURGLARY| 388042|
|MOTOR VEHICLE THEFT| 314136|
| DECEPTIVE PRACTICE| 267273|
|            ROBBERY| 255603|
+-------------------+-------+
only showing top 10 rows



##Find the percentage of reported crime that led to an arrest


In [None]:
display(rc)   ## see the columns

DataFrame[ID: int, Case Number: string, Date: timestamp, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: boolean, Domestic: boolean, Beat: int, District: int, Ward: int, Community Area: int, FBI Code: string, X Coordinate: int, Y Coordinate: int, Year: int, Updated On: string, Latitude: double, Longitude: double, Location: string]

###preparing the data

In [None]:
#percentage of reported crime leading to an arrest

df2 = rc.select('Primary Type','Arrest').filter(col('Arrest')==True)\
  .groupBy('Primary Type','Arrest').count()
df2.show(5)

+-----------------+------+------+
|     Primary Type|Arrest| count|
+-----------------+------+------+
|CRIMINAL TRESPASS|  true|141634|
|         GAMBLING|  true| 14318|
|   NON - CRIMINAL|  true|     6|
|     NON-CRIMINAL|  true|    10|
|HUMAN TRAFFICKING|  true|     9|
+-----------------+------+------+
only showing top 5 rows



In [None]:
df3 = df2.withColumnRenamed("count","arrest_count")
df3 = df3.withColumnRenamed("Primary Type", "Primary_Type")

In [None]:
df4 = top_reported_crimes.withColumnRenamed("count","report_count")

In [None]:
display(df3)

DataFrame[Primary_Type: string, Arrest: boolean, arrest_count: bigint]

###join the df3 and df4 to get count of arrest vs reported crimes

In [None]:
from pyspark.sql import functions as F
crime_count_df = df4.join(df3,F.col("Primary Type") == F.col("Primary_Type"))
crime_count_df.show(5)

+--------------------+------------+--------------------+------+------------+
|        Primary Type|report_count|        Primary_Type|Arrest|arrest_count|
+--------------------+------------+--------------------+------+------------+
|OFFENSE INVOLVING...|       46869|OFFENSE INVOLVING...|  true|       10165|
|CRIMINAL SEXUAL A...|        1407|CRIMINAL SEXUAL A...|  true|         249|
|            STALKING|        3388|            STALKING|  true|         549|
|PUBLIC PEACE VIOL...|       47785|PUBLIC PEACE VIOL...|  true|       30727|
|NON-CRIMINAL (SUB...|           9|NON-CRIMINAL (SUB...|  true|           3|
+--------------------+------------+--------------------+------+------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import expr
arrest_count = crime_count_df.withColumn("ArrestPercentage",expr("arrest_count/report_count *100"))
arrest_count.show(25)

+--------------------+------------+--------------------+------+------------+------------------+
|        Primary Type|report_count|        Primary_Type|Arrest|arrest_count|  ArrestPercentage|
+--------------------+------------+--------------------+------+------------+------------------+
|OFFENSE INVOLVING...|       46869|OFFENSE INVOLVING...|  true|       10165|21.688109411337987|
|CRIMINAL SEXUAL A...|        1407|CRIMINAL SEXUAL A...|  true|         249| 17.69722814498934|
|            STALKING|        3388|            STALKING|  true|         549|16.204250295159387|
|PUBLIC PEACE VIOL...|       47785|PUBLIC PEACE VIOL...|  true|       30727| 64.30260542011091|
|NON-CRIMINAL (SUB...|           9|NON-CRIMINAL (SUB...|  true|           3| 33.33333333333333|
|           OBSCENITY|         586|           OBSCENITY|  true|         472| 80.54607508532423|
|               ARSON|       11157|               ARSON|  true|        1464| 13.12180693734875|
|   DOMESTIC VIOLENCE|           1|   DO

In [None]:
from pyspark.sql.functions import round

result = arrest_count.select("Primary Type",round(col("ArrestPercentage"),2)).orderBy("ArrestPercentage",ascending=False).show(25)

+--------------------+--------------------------+
|        Primary Type|round(ArrestPercentage, 2)|
+--------------------+--------------------------+
|   DOMESTIC VIOLENCE|                     100.0|
|        PROSTITUTION|                     99.61|
|           NARCOTICS|                     99.42|
|    PUBLIC INDECENCY|                     99.38|
|            GAMBLING|                     99.28|
|LIQUOR LAW VIOLATION|                     99.12|
|CONCEALED CARRY L...|                     95.42|
|INTERFERENCE WITH...|                     91.72|
|           OBSCENITY|                     80.55|
|   WEAPONS VIOLATION|                     79.59|
|   CRIMINAL TRESPASS|                     73.24|
|OTHER NARCOTIC VI...|                     70.97|
|PUBLIC PEACE VIOL...|                      64.3|
|            HOMICIDE|                     51.44|
|NON-CRIMINAL (SUB...|                     33.33|
|         SEX OFFENSE|                     30.66|
|             ASSAULT|                     23.14|


In [None]:
total_arrest_percentage = rc.filter(col("Arrest") == True).count() / rc.select("Arrest").count()

In [None]:
print(total_arrest_percentage)

0.2775376777477739


## Find top 3 locations for reported crimes

In [None]:

display(rc)

DataFrame[ID: int, Case Number: string, Date: timestamp, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: boolean, Domestic: boolean, Beat: int, District: int, Ward: int, Community Area: int, FBI Code: string, X Coordinate: int, Y Coordinate: int, Year: int, Updated On: string, Latitude: double, Longitude: double, Location: string]

In [None]:
rc.groupBy("Location Description").count().orderBy('count',ascending=False).show(3)

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|1770638|
|           RESIDENCE|1146346|
|           APARTMENT| 699273|
+--------------------+-------+
only showing top 3 rows



# Built in functions

In [None]:
from pyspark.sql import functions as f
print(dir(functions))



##String functions

In [None]:
help(f.lower)

Help on function lower in module pyspark.sql.functions:

lower(col)
    Converts a string expression to lower case.
    
    .. versionadded:: 1.5



In [None]:
from pyspark.sql.functions import lower,upper,substring
rc.select(lower(col("Primary Type")).alias("crime_type"),col("id")).show(5)

+--------------------+--------+
|          crime_type|      id|
+--------------------+--------+
|             assault| 5741943|
|               theft| 1930689|
|offense involving...|12416974|
|offense involving...|12536164|
|offense involving...|12536166|
+--------------------+--------+
only showing top 5 rows



In [None]:
rc.select(upper(col("Primary Type")).alias("crime_type"),substring(col("Primary Type"),1,7).alias("subst_ex")).show(5)

+--------------------+--------+
|          crime_type|subst_ex|
+--------------------+--------+
|             ASSAULT| ASSAULT|
|               THEFT|   THEFT|
|OFFENSE INVOLVING...| OFFENSE|
|OFFENSE INVOLVING...| OFFENSE|
|OFFENSE INVOLVING...| OFFENSE|
+--------------------+--------+
only showing top 5 rows



##numeric functions

In [None]:
from pyspark.sql.functions import min,max


In [None]:
rc.select(min(col("date")),max(col("date"))).show(1)

+-------------------+-------------------+
|          min(date)|          max(date)|
+-------------------+-------------------+
|2001-01-01 00:00:00|2018-11-11 00:00:00|
+-------------------+-------------------+



##Date functions

In [None]:
from pyspark.sql.functions import date_add,date_sub

In [None]:
rc.select(date_sub(min(col("date")),3),date_add(max(col("date")),3)).show(1)

+----------------------+----------------------+
|date_sub(min(date), 3)|date_add(max(date), 3)|
+----------------------+----------------------+
|            2000-12-29|            2018-11-14|
+----------------------+----------------------+

