In [8]:
from pyspark.sql import SparkSession

# Build (or fetch) the notebook's Spark session under the app name
# "adta4240grp1", with spark.sql.debug.maxToStringFields set to "100".
spark = (
    SparkSession.builder
    .appName("adta4240grp1")
    .config("spark.sql.debug.maxToStringFields", "100")
    .getOrCreate()
)

24/04/18 22:23:02 INFO SparkEnv: Registering MapOutputTracker
24/04/18 22:23:02 INFO SparkEnv: Registering BlockManagerMaster
24/04/18 22:23:02 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/04/18 22:23:02 INFO SparkEnv: Registering OutputCommitCoordinator


In [9]:
# Load the combined 311 service-request CSV from GCS; the first row is the
# header and column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('gs://adta5240grp1_streaming/combined/311_NYC_SRs.csv')
)

                                                                                

In [12]:
# Inspect the inferred (column name, type) pairs of the freshly loaded frame.
df.dtypes

[('unique_key', 'int'),
 ('created_date', 'string'),
 ('closed_date', 'string'),
 ('agency', 'string'),
 ('agency_name', 'string'),
 ('complaint_type', 'string'),
 ('descriptor', 'string'),
 ('location_type', 'string'),
 ('incident_zip', 'int'),
 ('incident_address', 'string'),
 ('street_name', 'string'),
 ('cross_street_1', 'string'),
 ('cross_street_2', 'string'),
 ('intersection_street_1', 'string'),
 ('intersection_street_2', 'string'),
 ('address_type', 'string'),
 ('city', 'string'),
 ('landmark', 'string'),
 ('facility_type', 'string'),
 ('status', 'string'),
 ('due_date', 'string'),
 ('resolution_description', 'string'),
 ('resolution_action_updated_date', 'string'),
 ('community_board', 'string'),
 ('bbl', 'bigint'),
 ('borough', 'string'),
 ('x_coordinate_(state_plane)', 'int'),
 ('y_coordinate_(state_plane)', 'int'),
 ('open_data_channel_type', 'string'),
 ('park_facility_name', 'string'),
 ('park_borough', 'string'),
 ('vehicle_type', 'string'),
 ('latitude', 'double'),
 ('

In [13]:
# Columns that are not needed downstream; they are removed from df in one call.
columns_to_drop = [
    "location",
    "park_borough",
    "y_coordinate_(state_plane)",
    "park_facility_name",
    "x_coordinate_(state_plane)",
    "bbl",
    "community_board",
    "due_date",
    "address_type",
    "agency",
    "cross_street_1",
    "facility_type",
    "intersection_street_1",
    "intersection_street_2",
    "cross_street_2",
    "community_districts",
    "borough_boundaries",
    "city_council_districts",
    "police_precincts",
    "police_precinct",
    "taxi_company_borough",
    "taxi_pick_up_location",
    "bridge_highway_name",
    "bridge_highway_direction",
    "road_ramp",
    "bridge_highway_segment",
]
df = df.drop(*columns_to_drop)

In [14]:
# Re-check the schema to confirm which columns remain after the drop.
df.dtypes

[('unique_key', 'int'),
 ('created_date', 'string'),
 ('closed_date', 'string'),
 ('agency_name', 'string'),
 ('complaint_type', 'string'),
 ('descriptor', 'string'),
 ('location_type', 'string'),
 ('incident_zip', 'int'),
 ('incident_address', 'string'),
 ('street_name', 'string'),
 ('city', 'string'),
 ('landmark', 'string'),
 ('status', 'string'),
 ('resolution_description', 'string'),
 ('resolution_action_updated_date', 'string'),
 ('borough', 'string'),
 ('open_data_channel_type', 'string'),
 ('vehicle_type', 'string'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('zip_codes', 'int')]

In [16]:
from pyspark.sql.functions import monotonically_increasing_id

# Add a generated key column, offset by 1.
# NOTE(review): per the PySpark docs, monotonically_increasing_id() yields
# unique, increasing ids that are NOT guaranteed to be consecutive (gaps occur
# between partitions) — confirm that a gappy key is acceptable here.
df = df.withColumn("new_unique_key", monotonically_increasing_id() + 1)

In [17]:
# Replace the source 'unique_key' with the generated one: drop the old column,
# then give 'new_unique_key' the original name.
df = (
    df.drop("unique_key")
    .withColumnRenamed("new_unique_key", "unique_key")
)

In [18]:
# Preview rows; the regenerated unique_key currently sits in the last column.
df.show()

+--------------+--------------+--------------------+--------------------+----------------+--------------------+------------+--------------------+-----------------+----------------+-----------------+------+----------------------+------------------------------+---------+----------------------+------------+-----------+------------+---------+----------+
|  created_date|   closed_date|         agency_name|      complaint_type|      descriptor|       location_type|incident_zip|    incident_address|      street_name|            city|         landmark|status|resolution_description|resolution_action_updated_date|  borough|open_data_channel_type|vehicle_type|   latitude|   longitude|zip_codes|unique_key|
+--------------+--------------+--------------------+--------------------+----------------+--------------------+------------+--------------------+-----------------+----------------+-----------------+------+----------------------+------------------------------+---------+----------------------+----

In [19]:
# Move 'unique_key' to the front while keeping every other column in its
# existing relative order.
all_columns = df.columns
all_columns.pop(all_columns.index('unique_key'))  # ValueError if the key column is missing
new_column_order = ['unique_key', *all_columns]
df = df.select(*new_column_order)
df.show()

+----------+--------------+--------------+--------------------+--------------------+----------------+--------------------+------------+--------------------+-----------------+----------------+-----------------+------+----------------------+------------------------------+---------+----------------------+------------+-----------+------------+---------+
|unique_key|  created_date|   closed_date|         agency_name|      complaint_type|      descriptor|       location_type|incident_zip|    incident_address|      street_name|            city|         landmark|status|resolution_description|resolution_action_updated_date|  borough|open_data_channel_type|vehicle_type|   latitude|   longitude|zip_codes|
+----------+--------------+--------------+--------------------+--------------------+----------------+--------------------+------------+--------------------+-----------------+----------------+-----------------+------+----------------------+------------------------------+---------+----------------

In [25]:
# Schema check: created_date / closed_date show as `date` in the saved output.
# NOTE(review): this cell was executed as In [25], i.e. after the to_date
# conversion cell (In [24]) that appears later in the notebook; on a
# top-to-bottom run it would execute before that conversion.
df.dtypes

[('unique_key', 'bigint'),
 ('created_date', 'date'),
 ('closed_date', 'date'),
 ('agency_name', 'string'),
 ('complaint_type', 'string'),
 ('descriptor', 'string'),
 ('location_type', 'string'),
 ('incident_zip', 'int'),
 ('incident_address', 'string'),
 ('street_name', 'string'),
 ('city', 'string'),
 ('landmark', 'string'),
 ('status', 'string'),
 ('resolution_description', 'string'),
 ('resolution_action_updated_date', 'string'),
 ('borough', 'string'),
 ('open_data_channel_type', 'string'),
 ('vehicle_type', 'string'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('zip_codes', 'int')]

In [24]:
from pyspark.sql.functions import col, to_date, when, desc

# Select the legacy datetime parser BEFORE the date columns are converted.
# The notebook otherwise sets this policy only in a later cell, so on a
# top-to-bottom run the first action on the converted columns would execute
# under Spark's default parser policy.
# NOTE(review): presumably the raw strings carry more than "MM/dd/yyyy"
# (e.g. a trailing time part), which is why the legacy parser is required —
# confirm against the source data.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Parse the created/closed date strings into DateType columns.
df = (
    df.withColumn("created_date", to_date(col("created_date"), "MM/dd/yyyy"))
      .withColumn("closed_date", to_date(col("closed_date"), "MM/dd/yyyy"))
)

In [30]:
# Preview rows after the created_date / closed_date conversion.
df.show()

+----------+------------+-----------+--------------------+--------------------+----------------+--------------------+------------+--------------------+-----------------+----------------+-----------------+------+----------------------+------------------------------+---------+----------------------+------------+-----------+------------+---------+
|unique_key|created_date|closed_date|         agency_name|      complaint_type|      descriptor|       location_type|incident_zip|    incident_address|      street_name|            city|         landmark|status|resolution_description|resolution_action_updated_date|  borough|open_data_channel_type|vehicle_type|   latitude|   longitude|zip_codes|
+----------+------------+-----------+--------------------+--------------------+----------------+--------------------+------------+--------------------+-----------------+----------------+-----------------+------+----------------------+------------------------------+---------+----------------------+--------

                                                                                

In [29]:
# Use Spark's legacy datetime parser for date-pattern parsing.
# NOTE(review): this cell was executed as In [29], before the df.show() cell
# (In [30]) that precedes it in the notebook. Unless the policy has already
# been set earlier, a top-to-bottom run applies it too late for that show().
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [31]:
# Total number of rows in the frame (an action: this runs a Spark job).
df.count()

                                                                                

776316

In [38]:
# Final schema check before selecting the analysis columns.
df.dtypes

[('unique_key', 'bigint'),
 ('created_date', 'date'),
 ('closed_date', 'date'),
 ('agency_name', 'string'),
 ('complaint_type', 'string'),
 ('descriptor', 'string'),
 ('location_type', 'string'),
 ('incident_zip', 'int'),
 ('incident_address', 'string'),
 ('street_name', 'string'),
 ('city', 'string'),
 ('landmark', 'string'),
 ('status', 'string'),
 ('resolution_description', 'string'),
 ('resolution_action_updated_date', 'string'),
 ('borough', 'string'),
 ('open_data_channel_type', 'string'),
 ('vehicle_type', 'string'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('zip_codes', 'int')]

In [40]:
# Keep only the columns used by the complaint analysis and the export.
noise_df = df.select([
    'created_date',
    'closed_date',
    'complaint_type',
    'descriptor',
    'location_type',
    'incident_zip',
    'borough',
    'city',
    'status',
    'open_data_channel_type',
])

In [46]:
import os

# GOOGLE_APPLICATION_CREDENTIALS is intentionally NOT set here. That variable
# must name a service-account key file on the local filesystem; a gs:// URI
# cannot be resolved and would break any later default-credentials lookup in
# this kernel. Changing the Python process environment also does not reach the
# already-running Spark JVM that performs the write, so the export below uses
# whatever credentials the Spark session already has (presumably the cluster's
# service account — confirm for your environment).

# Define the GCS bucket path
output_path = 'gs://adta5240grp1_streaming/transformed/'

# Coalesce the DataFrame into 1 partition, then write it as CSV with a header
# row, overwriting any existing output at that path.
noise_df.coalesce(1).write.csv(output_path, mode='overwrite', header=True)

                                                                                

In [34]:
from pyspark.sql.functions import when, desc


def _count_by(column_name):
    """Count noise_df rows per distinct value of `column_name`, largest count first."""
    return noise_df.groupBy(column_name).count().orderBy(desc('count'))


# Number of noise complaints per value of each dimension of interest.
complaints_by_type = _count_by('complaint_type')
complaints_by_descriptor = _count_by('descriptor')
complaints_by_location_type = _count_by('location_type')
complaints_by_borough = _count_by('borough')
complaints_by_city = _count_by('city')
complaints_by_status = _count_by('status')

In [35]:
# Number of noise complaints per submission channel, most frequent first.
complaints_by_channel_type = (
    noise_df
    .groupBy('open_data_channel_type')
    .count()
    .orderBy(desc('count'))
)

In [36]:
# Print the five most frequent values for every breakdown, in a fixed order.
for summary in (
    complaints_by_type,
    complaints_by_descriptor,
    complaints_by_location_type,
    complaints_by_borough,
    complaints_by_city,
    complaints_by_status,
    complaints_by_channel_type,
):
    summary.show(5)

                                                                                

+--------------------+------+
|      complaint_type| count|
+--------------------+------+
| Noise - Residential|372382|
|Noise - Street/Si...|149177|
|  Noise - Commercial| 83541|
|               Noise| 59549|
|     Noise - Vehicle| 54788|
+--------------------+------+
only showing top 5 rows



                                                                                

+----------------+------+
|      descriptor| count|
+----------------+------+
|Loud Music/Party|394961|
|Banging/Pounding|138526|
|    Loud Talking| 70307|
|           Other| 47488|
| Car/Truck Music| 35001|
+----------------+------+
only showing top 5 rows



                                                                                

+--------------------+------+
|       location_type| count|
+--------------------+------+
|Residential Build...|372382|
|     Street/Sidewalk|203965|
|                null| 59549|
|       Above Address| 48731|
|    Store/Commercial| 43355|
+--------------------+------+
only showing top 5 rows



                                                                                

+-------------+------+
|      borough| count|
+-------------+------+
|    MANHATTAN|218319|
|     BROOKLYN|214418|
|       QUEENS|163610|
|        BRONX|162803|
|STATEN ISLAND| 17164|
+-------------+------+
only showing top 5 rows



                                                                                

+-------------+------+
|         city| count|
+-------------+------+
|     NEW YORK|218306|
|     BROOKLYN|214394|
|        BRONX|162776|
|      JAMAICA| 18094|
|STATEN ISLAND| 17162|
+-------------+------+
only showing top 5 rows



                                                                                

+-----------+------+
|     status| count|
+-----------+------+
|     Closed|776286|
|Unspecified|    23|
|    Started|     7|
+-----------+------+





+----------------------+------+
|open_data_channel_type| count|
+----------------------+------+
|                ONLINE|510460|
|                MOBILE|185107|
|                 PHONE| 80713|
|               UNKNOWN|    31|
|                 OTHER|     5|
+----------------------+------+



                                                                                