In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [None]:
# Load the NYPD shooting-incident CSV from Google Cloud Storage
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists, otherwise start one
spark = (
    SparkSession.builder
    .appName("CSVReader")
    .getOrCreate()
)

csv_file_path = "gs://nypd-shooting-dataset/NYPD_Shooting_Incident_Data_Historic_Corrected.csv"

# The first row supplies the column names; column types are inferred from the data
df = spark.read.csv(
    csv_file_path,
    header=True,
    inferSchema=True,
)
df.printSchema()
df.head()


In [4]:
# Substitute the literal 'UNKNOWN' for missing values.
# A string fill value is only applied to string-typed columns.

from pyspark.sql.functions import col

df = df.na.fill('UNKNOWN')
df.select(['PERP_AGE_GROUP', 'PERP_SEX', 'PERP_RACE']).show()


+--------------+--------+---------+
|PERP_AGE_GROUP|PERP_SEX|PERP_RACE|
+--------------+--------+---------+
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|         25-44|       M|    BLACK|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|         25-44|       M|    BLACK|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|         25-44|       M|    BLACK|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|       UNKNOWN|       U|  UNKNOWN|
|       UNKNOWN|       U|  UNKNOWN|
|       UNKNOWN|       M|  UNKNOWN|
|         25-44|       M|    BLACK|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|       UNKNOWN| UNKNOWN|  UNKNOWN|
|         25-44|       M|    BLACK|
+--------------+--------+---------+
only showing top 20 rows



In [5]:
# Shorten the OCCUR_TIME values from hh:mm:ss to hh:mm

from pyspark.sql.functions import col, substring

# Keep only the first five characters ("hh:mm") of each time value
df = df.withColumn('OCCUR_TIME', col('OCCUR_TIME').substr(1, 5))

# Preview the trimmed column
df.select(col('OCCUR_TIME')).show()


+----------+
|OCCUR_TIME|
+----------+
|     21:30|
|     17:40|
|     03:56|
|     18:30|
|     22:58|
|     21:36|
|     22:47|
|     19:41|
|     05:45|
|     01:10|
|     03:21|
|     01:27|
|     20:17|
|     21:58|
|     20:13|
|     01:27|
|     02:22|
|     21:07|
|     02:44|
|     21:17|
+----------+
only showing top 20 rows



In [6]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import year, month, dayofweek, to_date
# Create a Spark session (left commented out):
# spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()

In [7]:
# Parse OCCUR_DATE from its "MM/dd/yyyy" text form into a date column.
# The conversion is guarded on the column's current dtype so that re-running
# this cell is harmless: once the column is already a date, its values no
# longer match the "MM/dd/yyyy" pattern, and converting again would yield
# nulls or a parse error (depending on Spark settings) instead of dates.
if dict(df.dtypes)["OCCUR_DATE"] == "string":
    df = df.withColumn("OCCUR_DATE", to_date("OCCUR_DATE", "MM/dd/yyyy"))

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, date_format
# Derive calendar features from OCCUR_DATE: numeric year and month,
# plus the abbreviated weekday name (date_format pattern 'E').
df = (
    df
    .withColumn("Year", year("OCCUR_DATE"))
    .withColumn("Month", month("OCCUR_DATE"))
    .withColumn("DayOfWeek", date_format("OCCUR_DATE", 'E'))
)
df.select(['OCCUR_DATE', 'Year', 'Month', 'DayOfWeek']).show()


+----------+----+-----+---------+
|OCCUR_DATE|Year|Month|DayOfWeek|
+----------+----+-----+---------+
|2021-05-27|2021|    5|      Thu|
|2014-06-27|2014|    6|      Fri|
|2015-11-21|2015|   11|      Sat|
|2015-10-09|2015|   10|      Fri|
|2009-02-19|2009|    2|      Thu|
|2020-10-21|2020|   10|      Wed|
|2012-06-17|2012|    6|      Sun|
|2010-03-08|2010|    3|      Mon|
|2012-02-05|2012|    2|      Sun|
|2012-08-26|2012|    8|      Sun|
|2010-10-10|2010|   10|      Sun|
|2010-08-29|2010|    8|      Sun|
|2021-10-09|2021|   10|      Sat|
|2011-05-25|2011|    5|      Wed|
|2008-11-09|2008|   11|      Sun|
|2007-07-05|2007|    7|      Thu|
|2010-07-27|2010|    7|      Tue|
|2012-11-14|2012|   11|      Wed|
|2021-07-01|2021|    7|      Thu|
|2021-03-07|2021|    3|      Sun|
+----------+----+-----+---------+
only showing top 20 rows



In [14]:
from pyspark.sql import SparkSession

# Obtain the Spark session (getOrCreate returns the active one when present)
spark = (
    SparkSession.builder
    .appName("NYPD_data")
    .getOrCreate()
)

# df here is the DataFrame produced by the preprocessing cells above

# Destination in GCS for the preprocessed data
bucket_name = "nypd_processed"  # target GCS bucket
output_path = "gs://{}/nypd_processed".format(bucket_name)  # folder inside the bucket

# Persist df as Parquet, replacing any previous output at that path
df.write.parquet(output_path, mode="overwrite")


In [15]:
# Print the first 20 rows, truncating long cell values (the show() defaults)
df.show(n=20, truncate=True)

+------------+----------+----------+---------+-----------------+--------+-----------------+------------------+--------------------+-----------------------+--------------+--------+---------+-------------+-------+--------------+------------+-------------+------------------+------------------+--------------------+----+-----+---------+
|INCIDENT_KEY|OCCUR_DATE|OCCUR_TIME|     BORO|LOC_OF_OCCUR_DESC|PRECINCT|JURISDICTION_CODE|LOC_CLASSFCTN_DESC|       LOCATION_DESC|STATISTICAL_MURDER_FLAG|PERP_AGE_GROUP|PERP_SEX|PERP_RACE|VIC_AGE_GROUP|VIC_SEX|      VIC_RACE|  X_COORD_CD|   Y_COORD_CD|          Latitude|         Longitude|             Lon_Lat|Year|Month|DayOfWeek|
+------------+----------+----------+---------+-----------------+--------+-----------------+------------------+--------------------+-----------------------+--------------+--------+---------+-------------+-------+--------------+------------+-------------+------------------+------------------+--------------------+----+-----+---------