In [2]:
!pip install findspark
!pip install pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=36b45b8e17b8149fab04a9b7ef1b44668241233f84d2c304ca18cd5f97f32245
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [31]:
import findspark
findspark.init()


In [32]:
from pyspark import SparkContext

In [33]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("CrimeDataAnalysis") \
    .getOrCreate()

# Now you can use the 'spark' variable to read CSV files or perform other Spark operations


In [34]:
!wget https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-Present/ijzp-q8t2/

--2024-05-12 03:36:58--  https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-Present/ijzp-q8t2/
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.205, 52.206.140.199, 52.206.68.26
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘index.html.1’

index.html.1            [  <=>               ] 771.94K  2.16MB/s    in 0.3s    

2024-05-12 03:37:00 (2.16 MB/s) - ‘index.html.1’ saved [790469]



In [85]:
from pyspark.sql.functions import to_timestamp,col,lit
read_csv = spark.read.csv('sample_data/crime.csv', header=True)
read_csv.show(5)


+--------+--------------------+--------------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|   CASE#| DATE  OF OCCURRENCE|               BLOCK| IUCR| PRIMARY DESCRIPTION| SECONDARY DESCRIPTION| LOCATION DESCRIPTION|ARREST|DOMESTIC|BEAT|WARD|FBI CD|X COORDINATE|Y COORDINATE|    LATITUDE|    LONGITUDE|            LOCATION|
+--------+--------------------+--------------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|JG497095|11/08/2023 08:50:...| 025XX N KEDZIE BLVD| 0810|               THEFT|             OVER $500|               STREET|     N|       N|1414|  35|    06|     1154609|     1916759|41.927407329| -87.70729439|(41.927407329, -8...|
|JG496991|11/08/2023 03:14:...| 0000X W CHICAGO AVE| 0560|             A

**Schema Handling**

In [86]:
read_csv.printSchema()

root
 |-- CASE#: string (nullable = true)
 |-- DATE  OF OCCURRENCE: string (nullable = true)
 |-- BLOCK: string (nullable = true)
 |--  IUCR: string (nullable = true)
 |--  PRIMARY DESCRIPTION: string (nullable = true)
 |--  SECONDARY DESCRIPTION: string (nullable = true)
 |--  LOCATION DESCRIPTION: string (nullable = true)
 |-- ARREST: string (nullable = true)
 |-- DOMESTIC: string (nullable = true)
 |-- BEAT: string (nullable = true)
 |-- WARD: string (nullable = true)
 |-- FBI CD: string (nullable = true)
 |-- X COORDINATE: string (nullable = true)
 |-- Y COORDINATE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- LOCATION: string (nullable = true)



In [87]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType, DoubleType, IntegerType

In [88]:
read_csv.columns

['CASE#',
 'DATE  OF OCCURRENCE',
 'BLOCK',
 ' IUCR',
 ' PRIMARY DESCRIPTION',
 ' SECONDARY DESCRIPTION',
 ' LOCATION DESCRIPTION',
 'ARREST',
 'DOMESTIC',
 'BEAT',
 'WARD',
 'FBI CD',
 'X COORDINATE',
 'Y COORDINATE',
 'LATITUDE',
 'LONGITUDE',
 'LOCATION']

In [89]:
labels= [('CASE#',StringType()),
('BLOCK',StringType()),
(' IUCR',StringType()),
(' PRIMARY DESCRIPTION',StringType()),
(' SECONDARY DESCRIPTION',StringType()),
(' LOCATION DESCRIPTION',StringType()),
('ARREST',StringType()),
('DOMESTIC', BooleanType()),
('BEAT', StringType()),
('WARD',StringType()),
('FBI CD', StringType()),
('X COORDINATE',StringType()),
('Y COORDINATE',StringType()),
('LATITUDE', DoubleType()),
('LONGITUDE', DoubleType()),
('LOCATION', StringType())]

In [90]:
schema = StructType([StructField(x[0],x[1],True) for x in labels])


In [91]:
csv_df = spark.read.csv('sample_data/crime.csv', schema = schema)

In [92]:
csv_df.printSchema()

root
 |-- CASE#: string (nullable = true)
 |-- BLOCK: string (nullable = true)
 |--  IUCR: string (nullable = true)
 |--  PRIMARY DESCRIPTION: string (nullable = true)
 |--  SECONDARY DESCRIPTION: string (nullable = true)
 |--  LOCATION DESCRIPTION: string (nullable = true)
 |-- ARREST: string (nullable = true)
 |-- DOMESTIC: boolean (nullable = true)
 |-- BEAT: string (nullable = true)
 |-- WARD: string (nullable = true)
 |-- FBI CD: string (nullable = true)
 |-- X COORDINATE: string (nullable = true)
 |-- Y COORDINATE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)



In [93]:
# csv_df.show(5)

**Showing column**

In [94]:
read_csv.select(' IUCR').show(5)

+-----+
| IUCR|
+-----+
| 0810|
| 0560|
| 051A|
| 0820|
| 0810|
+-----+
only showing top 5 rows



In [95]:
read_csv.select(col(' IUCR')).show(5)

+-----+
| IUCR|
+-----+
| 0810|
| 0560|
| 051A|
| 0820|
| 0810|
+-----+
only showing top 5 rows



**Showing multiple cols**

In [96]:
read_csv.select(' IUCR', ' PRIMARY DESCRIPTION', 'DOMESTIC').show(5)

+-----+--------------------+--------+
| IUCR| PRIMARY DESCRIPTION|DOMESTIC|
+-----+--------------------+--------+
| 0810|               THEFT|       N|
| 0560|             ASSAULT|       N|
| 051A|             ASSAULT|       N|
| 0820|               THEFT|       N|
| 0810|               THEFT|       N|
+-----+--------------------+--------+
only showing top 5 rows



**Add new column named NEW COL with all entries 1s

In [97]:
from pyspark.sql.functions import lit

In [98]:
read_csv.withColumn('NEW COLUMN', lit(1)).show(5)

+--------+--------------------+--------------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+----------+
|   CASE#| DATE  OF OCCURRENCE|               BLOCK| IUCR| PRIMARY DESCRIPTION| SECONDARY DESCRIPTION| LOCATION DESCRIPTION|ARREST|DOMESTIC|BEAT|WARD|FBI CD|X COORDINATE|Y COORDINATE|    LATITUDE|    LONGITUDE|            LOCATION|NEW COLUMN|
+--------+--------------------+--------------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+----------+
|JG497095|11/08/2023 08:50:...| 025XX N KEDZIE BLVD| 0810|               THEFT|             OVER $500|               STREET|     N|       N|1414|  35|    06|     1154609|     1916759|41.927407329| -87.70729439|(41.927407329, -8...|         1|
|JG496991|11/08/2023 03:14:.

**Dropping column ` IUCR`**

In [99]:
read_csv.drop(' IUCR').show(5)

+--------+--------------------+--------------------+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|   CASE#| DATE  OF OCCURRENCE|               BLOCK| PRIMARY DESCRIPTION| SECONDARY DESCRIPTION| LOCATION DESCRIPTION|ARREST|DOMESTIC|BEAT|WARD|FBI CD|X COORDINATE|Y COORDINATE|    LATITUDE|    LONGITUDE|            LOCATION|
+--------+--------------------+--------------------+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|JG497095|11/08/2023 08:50:...| 025XX N KEDZIE BLVD|               THEFT|             OVER $500|               STREET|     N|       N|1414|  35|    06|     1154609|     1916759|41.927407329| -87.70729439|(41.927407329, -8...|
|JG496991|11/08/2023 03:14:...| 0000X W CHICAGO AVE|             ASSAULT|                SIMPLE|

#### WORKING WITH ROWS

In [104]:
read_csv.select('DATE  OF OCCURRENCE').show(5)

+--------------------+
| DATE  OF OCCURRENCE|
+--------------------+
|11/08/2023 08:50:...|
|11/08/2023 03:14:...|
|11/08/2023 10:55:...|
|03/07/2024 02:15:...|
|03/07/2024 04:53:...|
+--------------------+
only showing top 5 rows



Filtering on Date

In [113]:
one_day_crime_df = read_csv.filter(col('DATE  OF OCCURRENCE').contains('11/09/2023'))
one_day_crime_df.count()



696

In [114]:
one_day_crime_df.show(5)

+--------+--------------------+-------------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|   CASE#| DATE  OF OCCURRENCE|              BLOCK| IUCR| PRIMARY DESCRIPTION| SECONDARY DESCRIPTION| LOCATION DESCRIPTION|ARREST|DOMESTIC|BEAT|WARD|FBI CD|X COORDINATE|Y COORDINATE|    LATITUDE|    LONGITUDE|            LOCATION|
+--------+--------------------+-------------------+-----+--------------------+----------------------+---------------------+------+--------+----+----+------+------------+------------+------------+-------------+--------------------+
|JG497283|11/09/2023 12:00:...|  001XX N DAMEN AVE| 0810|               THEFT|             OVER $500| PARKING LOT / GAR...|     N|       N|1223|  27|    06|     1163055|     1900872|41.883638831|-87.676705638|(41.883638831, -8...|
|JG503042|11/09/2023 12:00:...|  077XX S PEORIA ST| 2826|       OTHER OFFENS