In [60]:
import pyspark.sql
from pyspark.sql.functions import col, count, isnan, to_timestamp, mean, max, min, lit, when
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession

# Sources 
- https://mungingdata.com/python/writing-parquet-pandas-pyspark-koalas/
- https://sparkbyexamples.com/pyspark/pyspark-dataframe-shape/
- https://sparkbyexamples.com/pyspark/pyspark-drop-column-from-dataframe/
- https://www.programmingfunda.com/how-to-count-null-and-nan-values-in-each-column-in-pyspark-dataframe/
- https://stackoverflow.com/questions/59969378/how-do-i-calculate-the-percentage-of-none-or-nan-values-in-pyspark
- https://sparkbyexamples.com/spark/show-top-n-rows-in-spark-pyspark/?expand_article=1
- https://www.geeksforgeeks.org/pyspark-count-distinct-from-dataframe/

In [61]:
# create a Spark session, or reuse the one already running in this kernel
builder = SparkSession.builder
spark = builder.getOrCreate()

In [62]:
# reading the MTA subway hourly ridership CSV: first row holds column names, column types are inferred
subway_csv_path = "MTA_Subway_Hourly_Ridership__Beginning_February_2022.csv"
df = spark.read.options(header=True, inferSchema=True).csv(subway_csv_path)



                                                                                

In [63]:
# preview the first 5 rows of the raw data
df.show(5)

+--------------------+------------------+----------------+-------+------+--------------+---------+---------+---------+---------+--------------------+------------------------+------------------+--------+
|   transit_timestamp|station_complex_id| station_complex|borough|routes|payment_method|ridership|transfers| latitude|longitude|        Georeference|NYS Municipal Boundaries|New York Zip Codes|Counties|
+--------------------+------------------+----------------+-------+------+--------------+---------+---------+---------+---------+--------------------+------------------------+------------------+--------+
|03/03/2022 10:00:...|              N535|Carroll St (F,G)|     BK|   F,G|           all|        1|        0|  40.6803|-73.99505|POINT (-73.99505 ...|                     894|               814|    2090|
|02/06/2022 10:00:...|             N408A|   Nassau Av (G)|     BK|     G|           all|       78|        0|40.724636|-73.95128|POINT (-73.95128 ...|                     894|              

In [64]:
# number of distinct values in EVERY column (distinct() keeps a null as its own value);
# note: this launches one Spark job per column over the full data set
for column in df.columns:
    distinct_values = df.select(column).distinct().count()
    print(f"{column} has {distinct_values} distinct values")

                                                                                

transit_timestamp has 11951 distinct values


                                                                                

station_complex_id has 423 distinct values


                                                                                

station_complex has 423 distinct values


                                                                                

borough has 4 distinct values


                                                                                

routes has 92 distinct values


                                                                                

payment_method has 1 distinct values


                                                                                

ridership has 10803 distinct values


                                                                                

transfers has 1815 distinct values


                                                                                

latitude has 422 distinct values


                                                                                

longitude has 422 distinct values


                                                                                

Georeference has 423 distinct values


                                                                                

NYS Municipal Boundaries has 4 distinct values


                                                                                

New York Zip Codes has 120 distinct values




Counties has 5 distinct values


                                                                                

### Columns to drop 
- transfers: irrelevant
- station_complex: redundant; use station_complex_id instead, since both identify each station (both have 423 distinct values)
- routes: irrelevant; some rows list multiple routes (e.g. "F,G"), which would make creating dummy variables complicated
- payment_method: irrelevant (it has only 1 distinct value)
- Counties: irrelevant and redundant; borough can be used instead, since its values are more meaningful (e.g. M represents Manhattan)
- NYS Municipal Boundaries: irrelevant
- New York Zip Codes: irrelevant and incomplete, as some rows are null; station_complex_id can be used to represent location instead

In [65]:
# dropping the columns judged irrelevant or redundant in the "Columns to drop" list above
irrelevant_columns = [
    "routes",
    "payment_method",
    "transfers",
    "NYS Municipal Boundaries",
    "New York Zip Codes",
    "Counties",
    "station_complex",
]
df = df.drop(*irrelevant_columns)

In [66]:
# preview the first 5 rows after dropping columns
df.show(5)

+--------------------+------------------+-------+---------+---------+---------+--------------------+
|   transit_timestamp|station_complex_id|borough|ridership| latitude|longitude|        Georeference|
+--------------------+------------------+-------+---------+---------+---------+--------------------+
|03/03/2022 10:00:...|              N535|     BK|        1|  40.6803|-73.99505|POINT (-73.99505 ...|
|02/06/2022 10:00:...|             N408A|     BK|       78|40.724636|-73.95128|POINT (-73.95128 ...|
|02/06/2022 11:00:...|             N408A|     BK|       82|40.724636|-73.95128|POINT (-73.95128 ...|
|07/02/2022 11:00:...|             N408A|     BK|       87|40.724636|-73.95128|POINT (-73.95128 ...|
|07/03/2022 04:00:...|             N408A|     BK|       16|40.724636|-73.95128|POINT (-73.95128 ...|
+--------------------+------------------+-------+---------+---------+---------+--------------------+
only showing top 5 rows



In [67]:
# data frame shape: Spark DataFrames have no .shape, so rows and columns are counted separately
rows, columns = df.count(), len(df.columns)
print(f"There are {rows} rows and {columns} columns in this data frame")

There are 4956086 rows and 7 columns in this data frame


[Stage 419:>                                                        (0 + 8) / 8]                                                                                

In [68]:
# list every borough code that appears in the data
df.select(col("borough")).distinct().show()

[Stage 422:>                                                        (0 + 8) / 8]

+-------+
|borough|
+-------+
|      Q|
|     BX|
|      M|
|     BK|
+-------+





In [69]:
## keep only stations in Manhattan (borough code "M")
df = df.where(col("borough") == "M")

In [70]:
# shape after the Manhattan filter (row count triggers a Spark job)
rows, columns = df.count(), len(df.columns)
print(f"There are {rows} rows and {columns} columns in this data frame")

[Stage 425:>                                                        (0 + 8) / 8]

There are 1418979 rows and 7 columns in this data frame


                                                                                

In [71]:
# preview the first 5 Manhattan rows
df.show(5)

+--------------------+------------------+-------+---------+---------+----------+--------------------+
|   transit_timestamp|station_complex_id|borough|ridership| latitude| longitude|        Georeference|
+--------------------+------------------+-------+---------+---------+----------+--------------------+
|02/05/2023 10:00:...|              R252|      M|       56|  40.7906| -73.94748|POINT (-73.94748 ...|
|09/17/2022 11:00:...|              R252|      M|      333|  40.7906| -73.94748|POINT (-73.94748 ...|
|05/14/2022 10:00:...|              R170|      M|      239|40.799446|-73.968376|POINT (-73.968376...|
|05/03/2022 10:00:...|              H007|      M|      470|40.730953| -73.98163|POINT (-73.98163 ...|
|01/28/2023 06:00:...|              H007|      M|     1450|40.730953| -73.98163|POINT (-73.98163 ...|
+--------------------+------------------+-------+---------+---------+----------+--------------------+
only showing top 5 rows



In [72]:
# number of missing values in each column.
# Spark treats NaN as a real value distinct from null, so isNull() alone would miss NaNs;
# floating-point columns (e.g. latitude/longitude) are therefore also checked with isnan().
float_columns = {name for name, dtype in df.dtypes if dtype in ("double", "float")}
missing_counts = []
for column_name in df.columns:
    is_missing = col(column_name).isNull()
    if column_name in float_columns:
        is_missing = is_missing | isnan(col(column_name))
    # when() yields null for non-missing rows, so count() only counts the missing ones
    missing_counts.append(count(when(is_missing, column_name)).alias(column_name))
df.select(missing_counts).show()




+-----------------+------------------+-------+---------+--------+---------+------------+
|transit_timestamp|station_complex_id|borough|ridership|latitude|longitude|Georeference|
+-----------------+------------------+-------+---------+--------+---------+------------+
|                0|                 0|      0|        0|       0|        0|           0|
+-----------------+------------------+-------+---------+--------+---------+------------+



                                                                                

- There are no null values in any column of this data frame

# Analysing Columns
## Sources
- https://sparkbyexamples.com/spark/pyspark-to_timestamp-convert-string-to-timestamp-type/
- https://stackoverflow.com/questions/62602720/string-to-date-migration-from-spark-2-0-to-3-0-gives-fail-to-recognize-eee-mmm
- https://sparkbyexamples.com/pyspark/pyspark-max-different-methods-explained/?expand_article=1&expand_article=1

## Timestamp

In [73]:
# switch to Spark's legacy (pre-3.0) datetime parser, as intended; the key must be spelled
# "spark.sql.legacy.timeParserPolicy" — a misspelled key was accepted without error but had no effect
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# converting the transit_timestamp strings (12-hour clock with an AM/PM marker, per the pattern) to timestamps
df = df.withColumn("transit_timestamp", to_timestamp(col("transit_timestamp"), "MM/dd/yyyy hh:mm:ss a"))



In [45]:
[(field.name, field.dataType.simpleString()) for field in df.schema.fields]  # (column name, Spark SQL type) pairs, as df.dtypes returns

[('transit_timestamp', 'timestamp'),
 ('station_complex_id', 'string'),
 ('borough', 'string'),
 ('ridership', 'int'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('Georeference', 'string')]

In [48]:
# timestamp range
# max/min are pyspark.sql.functions here (imported at the top, shadowing the builtins).
# show() prints and returns None, so keep the DataFrame in timestamp_range and show it separately.
timestamp_range = df.select(
    max("transit_timestamp").alias("max transit timestamp"),
    min("transit_timestamp").alias("min transit timestamp"),
)
timestamp_range.show()



+---------------------+---------------------+
|max transit timestamp|min transit timestamp|
+---------------------+---------------------+
|  2023-06-14 00:00:00|  2022-02-01 00:00:00|
+---------------------+---------------------+



                                                                                

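- The latest timestamp (2023-06-14) lies beyond the permitted range (2022-02-01 to 2023-03-10, defined below), so some rows are outside the permitted timestamp range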

In [46]:
# permitted (inclusive) timestamp window, chosen so the subway data aligns with the taxi data's datetime range
start_timestamp, end_timestamp = "2022-02-01 00:00:00", "2023-03-10 23:59:59"

In [49]:
# rows whose transit_timestamp falls inside the permitted window (both ends inclusive)
correct_timestamp = df.filter(col("transit_timestamp").between(start_timestamp, end_timestamp))
correct_timestamp_rows = correct_timestamp.count()
print(f" There are {correct_timestamp_rows} within the permittted range")



 There are 1148138 within the permittted range


                                                                                

In [50]:
# denominator: every row currently in the (Manhattan-only) data frame
rows = df.count()
# share of those rows that fall inside the permitted timestamp window, as a percentage
correct_timestamp_percent = correct_timestamp_rows / rows * 100
print(f"The percentage of rows with the permitted timestamp range is {correct_timestamp_percent}%")

[Stage 313:>                                                        (0 + 8) / 8]

The percentage of rows with the permitted timestamp range is 80.91296629477955%


                                                                                

- Over 80% of the rows are within the permitted timestamp range

### Removing rows outside the permitted timestamp range so that the data aligns with the datetime range of the taxi data

In [51]:
# keep only rows inside the inclusive [start_timestamp, end_timestamp] window
df = df.where(col("transit_timestamp").between(start_timestamp, end_timestamp))

In [52]:
# shape after restricting to the permitted timestamp window
rows, columns = df.count(), len(df.columns)
print(f"There are {rows} rows and {columns} columns in this data frame")



There are 1148138 rows and 7 columns in this data frame




In [53]:
# timestamp range after filtering
# max/min are pyspark.sql.functions here (imported at the top, shadowing the builtins).
# show() prints and returns None, so keep the DataFrame in timestamp_range and show it separately.
timestamp_range = df.select(
    max("transit_timestamp").alias("max transit timestamp"),
    min("transit_timestamp").alias("min transit timestamp"),
)
timestamp_range.show()



+---------------------+---------------------+
|max transit timestamp|min transit timestamp|
+---------------------+---------------------+
|  2023-03-10 23:00:00|  2022-02-01 00:00:00|
+---------------------+---------------------+



                                                                                

In [54]:
# preview the first 5 rows of the timestamp-filtered data
df.show(5)

+-------------------+------------------+-------+---------+---------+----------+--------------------+
|  transit_timestamp|station_complex_id|borough|ridership| latitude| longitude|        Georeference|
+-------------------+------------------+-------+---------+---------+----------+--------------------+
|2023-02-05 22:00:00|              R252|      M|       56|  40.7906| -73.94748|POINT (-73.94748 ...|
|2022-09-17 11:00:00|              R252|      M|      333|  40.7906| -73.94748|POINT (-73.94748 ...|
|2022-05-14 22:00:00|              R170|      M|      239|40.799446|-73.968376|POINT (-73.968376...|
|2022-05-03 22:00:00|              H007|      M|      470|40.730953| -73.98163|POINT (-73.98163 ...|
|2023-01-28 18:00:00|              H007|      M|     1450|40.730953| -73.98163|POINT (-73.98163 ...|
+-------------------+------------------+-------+---------+---------+----------+--------------------+
only showing top 5 rows



## Ridership

In [55]:
# ridership range
# max/min are pyspark.sql.functions here (imported at the top, shadowing the builtins).
# show() prints and returns None, so keep the DataFrame in ridership_range and show it separately.
ridership_range = df.select(
    max("ridership").alias("max ridership"),
    min("ridership").alias("min ridership"),
)
ridership_range.show()



+-------------+-------------+
|max ridership|min ridership|
+-------------+-------------+
|        22687|            1|
+-------------+-------------+





- As shown above, the minimum ridership is within the permitted range. The maximum is harder to validate, as each station has multiple routes which operate multiple times within one hour. However, a maximum ridership of 22,687 is acceptable, as the NYC subway has millions of daily riders
- Source given below:
- https://new.mta.info/agency/new-york-city-transit/subway-bus-ridership-2021

In [56]:
# final shape of the cleaned data frame before saving
rows, columns = df.count(), len(df.columns)
print(f"There are {rows} rows and {columns} columns in this data frame")



There are 1148138 rows and 7 columns in this data frame


                                                                                

In [58]:
# saving as CSV: Spark writes a directory named cleaned_subway.csv holding a single part file (coalesce(1)).
# mode("overwrite") keeps the cell re-runnable; the default save mode raises an error if the path already exists.
df.coalesce(1).write.mode("overwrite").csv("cleaned_subway.csv", header=True)

                                                                                

In [59]:
# saving as Parquet: Spark writes a directory named cleaned_subway.parquet holding a single part file (coalesce(1)).
# mode("overwrite") keeps the cell re-runnable; the default save mode raises an error if the path already exists.
df.coalesce(1).write.mode("overwrite").parquet("cleaned_subway.parquet")

                                                                                