In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [28]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar -xzf spark-3.5.0-bin-hadoop3.tgz

!pip install -q findspark


import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
findspark.init()


In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType


In [29]:
spark = SparkSession.builder \
    .appName("NYC 311 Dataset") \
    .getOrCreate()


In [30]:
df = spark.read.csv("/content/drive/MyDrive/Csv folder/311-service-requests-from-2010-to-present.csv", header=True, inferSchema=True)
df.printSchema()


root
 |-- Unique Key: integer (nullable = true)
 |-- Created Date: timestamp (nullable = true)
 |-- Closed Date: timestamp (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Agency Name: string (nullable = true)
 |-- Complaint Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location Type: string (nullable = true)
 |-- Incident Zip: integer (nullable = true)
 |-- Incident Address: string (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Cross Street 1: string (nullable = true)
 |-- Cross Street 2: string (nullable = true)
 |-- Intersection Street 1: string (nullable = true)
 |-- Intersection Street 2: string (nullable = true)
 |-- Address Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Landmark: string (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Due Date: string (nullable = true)
 |-- Resolution Description: string (nullable = true)
 |-- Resolutio

In [46]:
for i, col in enumerate(df):
    print(f"{i}: {col}")


0: Column<'Unique Key'>
1: Column<'Created Date'>
2: Column<'Closed Date'>
3: Column<'Agency'>
4: Column<'Agency Name'>
5: Column<'Complaint Type'>
6: Column<'Descriptor'>
7: Column<'Location Type'>
8: Column<'Incident Zip'>
9: Column<'Incident Address'>
10: Column<'Street Name'>
11: Column<'Cross Street 1'>
12: Column<'Cross Street 2'>
13: Column<'Intersection Street 1'>
14: Column<'Intersection Street 2'>
15: Column<'Address Type'>
16: Column<'City'>
17: Column<'Landmark'>
18: Column<'Facility Type'>
19: Column<'Status'>
20: Column<'Due Date'>
21: Column<'Resolution Description'>
22: Column<'Resolution Action Updated Date'>
23: Column<'Community Board'>
24: Column<'BBL'>
25: Column<'Borough'>
26: Column<'X Coordinate (State Plane)'>
27: Column<'Y Coordinate (State Plane)'>
28: Column<'Open Data Channel Type'>
29: Column<'Park Facility Name'>
30: Column<'Park Borough'>
31: Column<'Vehicle Type'>
32: Column<'Taxi Company Borough'>
33: Column<'Taxi Pick Up Location'>
34: Column<'Bridge 

In [31]:
from pyspark.sql.functions import col

columns_to_check = ['Created Date', 'Complaint Type', 'Borough']
for column in columns_to_check:
    missing = df.filter(col(column).isNull() | (col(column) == "")).count()
    print(f"Missing values in '{column}': {missing}")

df_clean = df.dropna(subset=["Created Date", "Complaint Type", "Borough"])


Missing values in 'Created Date': 0
Missing values in 'Complaint Type': 0
Missing values in 'Borough': 0


In [32]:
from pyspark.sql.functions import to_timestamp

df_clean = df_clean.withColumn("Created Date", to_timestamp("Created Date", "MM/dd/yyyy hh:mm:ss a"))
df_clean.select("Created Date").show(5, truncate=False)


+-------------------+
|Created Date       |
+-------------------+
|2019-12-01 02:04:01|
|2019-12-01 01:59:41|
|2019-12-01 01:59:08|
|2019-12-01 01:58:23|
|2019-12-01 01:58:07|
+-------------------+
only showing top 5 rows



In [33]:
from pyspark.sql.functions import desc

complaint_counts = df.groupBy("Complaint Type").count().orderBy(desc("count"))

complaint_counts.show(10, truncate=False)


+--------------------------+-------+
|Complaint Type            |count  |
+--------------------------+-------+
|Illegal Parking           |2484920|
|Noise - Residential       |2087491|
|Blocked Driveway          |1650587|
|Noise - Street/Sidewalk   |1481688|
|For Hire Vehicle Complaint|901169 |
|Street Condition          |712470 |
|Noise - Vehicle           |678929 |
|Water System              |557700 |
|Abandoned Vehicle         |511689 |
|Street Light Condition    |497443 |
+--------------------------+-------+
only showing top 10 rows



In [34]:
borough_counts = df.groupBy("Borough").count().orderBy(desc("count"))

borough_counts.show()


+-------------+-------+
|      Borough|  count|
+-------------+-------+
|     BROOKLYN|6804536|
|       QUEENS|5730027|
|    MANHATTAN|4884708|
|        BRONX|3358118|
|STATEN ISLAND| 997615|
|  Unspecified| 184996|
+-------------+-------+



In [43]:
rdd_raw = spark.sparkContext.textFile("/content/drive/MyDrive/Csv folder/311-service-requests-from-2010-to-present.csv")

header = rdd_raw.first()

rdd = rdd_raw.filter(lambda row: row != header)

rdd_split = rdd.map(lambda row: row.split(","))


In [47]:
complaint_index = 5

complaint_counts = (
    rdd_split
    .map(lambda row: (row[complaint_index].strip().upper(), 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: -x[1])
)

complaint_counts.take(10)


[('ILLEGAL PARKING', 2484920),
 ('NOISE - RESIDENTIAL', 2087491),
 ('BLOCKED DRIVEWAY', 1650587),
 ('NOISE - STREET/SIDEWALK', 1481688),
 ('FOR HIRE VEHICLE COMPLAINT', 901169),
 ('STREET CONDITION', 712470),
 ('NOISE - VEHICLE', 678929),
 ('WATER SYSTEM', 557700),
 ('ABANDONED VEHICLE', 511689),
 ('STREET LIGHT CONDITION', 497443)]

In [48]:
borough_index = 25

borough_counts = (
    rdd_split
    .map(lambda row: (row[borough_index].strip().upper(), 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: -x[1])
)

borough_counts.collect()


[('BROOKLYN', 6303792),
 ('QUEENS', 5375393),
 ('MANHATTAN', 4494707),
 ('BRONX', 3080197),
 ('STATEN ISLAND', 894504),
 ('UNSPECIFIED', 169641),
 ('', 130486),
 ('05 MANHATTAN', 26290),
 ('01 MANHATTAN', 24152),
 ('01 BROOKLYN', 24125),
 ('07 QUEENS', 23933),
 ('03 MANHATTAN', 19769),
 ('07 MANHATTAN', 17634),
 ('12 QUEENS', 17512),
 ('05 QUEENS', 17455),
 ('04 MANHATTAN', 15491),
 ('14 BROOKLYN', 15263),
 ('12 BROOKLYN', 15215),
 ('02 MANHATTAN', 13218),
 ('02 STATEN ISLAND', 13121),
 ('15 BROOKLYN', 13102),
 ('09 QUEENS', 13064),
 ('06 MANHATTAN', 11167),
 ('08 MANHATTAN', 11134),
 ('01 QUEENS', 11033),
 ('08 BROOKLYN', 10967),
 ('02 QUEENS', 10955),
 ('06 BROOKLYN', 10954),
 ('07 BROOKLYN', 10947),
 ('03 BROOKLYN', 10934),
 ('01 STATEN ISLAND', 10922),
 ('11 BROOKLYN', 10911),
 ('06 QUEENS', 10909),
 ('18 BROOKLYN', 10889),
 ('02 BROOKLYN', 8942),
 ('17 BROOKLYN', 8759),
 ('2028570028', 8628),
 ('13 QUEENS', 6669),
 ('04 BROOKLYN', 6639),
 ('08 QUEENS', 6625),
 ('11 QUEENS', 6597),

In [50]:
df = spark.read.csv("/content/drive/MyDrive/Csv folder/311-service-requests-from-2010-to-present.csv", header=True, inferSchema=True)

df.createOrReplaceTempView("nyc311")


In [53]:
complaint_df = df.groupBy("Complaint Type").count().orderBy("count", ascending=False)

complaint_df.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#774L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#774L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=586]
      +- HashAggregate(keys=[Complaint Type#608], functions=[count(1)])
         +- Exchange hashpartitioning(Complaint Type#608, 200), ENSURE_REQUIREMENTS, [plan_id=583]
            +- HashAggregate(keys=[Complaint Type#608], functions=[partial_count(1)])
               +- FileScan csv [Complaint Type#608] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Csv folder/311-service-requests-from-2010-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Complaint Type:string>




In [51]:
spark.sql("""
    SELECT `Complaint Type`, COUNT(*) AS count
    FROM nyc311
    WHERE `Complaint Type` IS NOT NULL
    GROUP BY `Complaint Type`
    ORDER BY count DESC
    LIMIT 10
""").show()


+--------------------+-------+
|      Complaint Type|  count|
+--------------------+-------+
|     Illegal Parking|2484920|
| Noise - Residential|2087491|
|    Blocked Driveway|1650587|
|Noise - Street/Si...|1481688|
|For Hire Vehicle ...| 901169|
|    Street Condition| 712470|
|     Noise - Vehicle| 678929|
|        Water System| 557700|
|   Abandoned Vehicle| 511689|
|Street Light Cond...| 497443|
+--------------------+-------+



In [54]:
complaint_df = df.groupBy("Borough").count().orderBy("count", ascending=False)

complaint_df.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#827L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#827L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=606]
      +- HashAggregate(keys=[Borough#628], functions=[count(1)])
         +- Exchange hashpartitioning(Borough#628, 200), ENSURE_REQUIREMENTS, [plan_id=603]
            +- HashAggregate(keys=[Borough#628], functions=[partial_count(1)])
               +- FileScan csv [Borough#628] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Csv folder/311-service-requests-from-2010-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Borough:string>




In [52]:
spark.sql("""
    SELECT Borough, COUNT(*) AS count
    FROM nyc311
    WHERE Borough IS NOT NULL
    GROUP BY Borough
    ORDER BY count DESC
""").show()


+-------------+-------+
|      Borough|  count|
+-------------+-------+
|     BROOKLYN|6804536|
|       QUEENS|5730027|
|    MANHATTAN|4884708|
|        BRONX|3358118|
|STATEN ISLAND| 997615|
|  Unspecified| 184996|
+-------------+-------+

