In [65]:
from urllib import request
from io import StringIO
import csv
import os
import pandas as pd
from urllib.request import urlretrieve


In [67]:
# Read Turnstile data from mta.info

datetime=200425

try:
    os.mkdir("output")
    os.mkdir("sources")
except:
    pass

url='http://web.mta.info/developers/data/nyct/turnstile/turnstile_'+str(datetime)+'.txt'

desfile="sources/turnstile_"+str(datetime)+".txt"
print("Read from url:%s \r\ndesfile:%s" % (url,desfile))

urlretrieve(url, desfile)

Read from url:http://web.mta.info/developers/data/nyct/turnstile/turnstile_200425.txt 
desfile:sources/turnstile_200425.txt


('sources/turnstile_200425.txt', <http.client.HTTPMessage at 0x7f9995e20898>)

In [68]:
from pyspark import SparkContext 
from pyspark.sql import SQLContext
import pyspark.sql.functions as F


In [69]:
# Clean data in turnstile.txt. Calculate the ridership for each station.

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('turnstile_200425.txt')

df  = df.withColumnRenamed("EXITS                                                               ", "EXITS")

df = df.withColumn('result', sum(df[col] for col in ['ENTRIES','EXITS']).cast('int'))
df = df.groupBy('STATION','DIVISION').sum('result').withColumnRenamed("sum(result)", "RIDESHIP (ENTRIES+EXITS)")

df = df.withColumn("STATION_low", F.lower(df.STATION))
df.show()

df.toPandas().to_csv('output/rideship.csv',mode='a', header=False)


+---------------+--------+------------------------+---------------+
|        STATION|DIVISION|RIDESHIP (ENTRIES+EXITS)|    STATION_low|
+---------------+--------+------------------------+---------------+
|    ATLANTIC AV|     BMT|               227760079|    atlantic av|
|   81 ST-MUSEUM|     IND|              3332713649|   81 st-museum|
|BEDFORD PK BLVD|     IRT|               200125765|bedford pk blvd|
|    SARATOGA AV|     IRT|              5388787665|    saratoga av|
|CONEY IS-STILLW|     BMT|             42117196805|coney is-stillw|
|  EAST BROADWAY|     IND|              8487077797|  east broadway|
| EXCHANGE PLACE|     PTH|               251225554| exchange place|
|          96 ST|     IND|              2519334106|          96 st|
|         168 ST|     IRT|              1090784537|         168 st|
|      BERGEN ST|     IRT|              1253457672|      bergen st|
| CATHEDRAL PKWY|     IRT|              1498803682| cathedral pkwy|
|          65 ST|     IND|              16502455

In [70]:
# Clean data in Station.csv. As each station may have several entrances, So calculate the average Latitude and Longitude for these 
# stations with several entrances.

df2 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('Stations.csv')
df2 = df2.groupBy('Division','Stop Name').mean('GTFS Latitude','GTFS Longitude').withColumnRenamed("avg(GTFS Latitude)", "GTFS Latitude").withColumnRenamed("avg(GTFS Longitude)", "GTFS Longitude").withColumnRenamed("Stop Name", "STATION")

df2 = df2.withColumn("STATION_low", F.lower(df2.STATION))
df2.show()

df2.toPandas().to_csv('output/stationLocation.csv')


+--------+--------------------+------------------+------------------+--------------------+
|Division|             STATION|     GTFS Latitude|    GTFS Longitude|         STATION_low|
+--------+--------------------+------------------+------------------+--------------------+
|     IRT|         New Lots Av|         40.666235|        -73.884079|         new lots av|
|     IRT|               28 St|        40.7451425|-73.98881449999999|               28 st|
|     IRT|Eastern Pkwy - Br...|         40.671987|        -73.964375|eastern pkwy - br...|
|     BMT|  Lexington Av/59 St|          40.76266|        -73.967258|  lexington av/59 st|
|     BMT|               18 Av|        40.6143125|-73.99607499999999|               18 av|
|     IND|                7 Av|40.714566500000004|-73.98097100000001|                7 av|
|     IND|           Ditmas Av|         40.636119|        -73.978172|           ditmas av|
|     IRT|         Winthrop St|         40.656652|          -73.9502|         winthrop st|

In [71]:
ridership_df = df
ridership_df.dtypes

[('STATION', 'string'),
 ('DIVISION', 'string'),
 ('RIDESHIP (ENTRIES+EXITS)', 'bigint'),
 ('STATION_low', 'string')]

In [72]:
location_df = df2
location_df.dtypes

[('Division', 'string'),
 ('STATION', 'string'),
 ('GTFS Latitude', 'double'),
 ('GTFS Longitude', 'double'),
 ('STATION_low', 'string')]

In [73]:
# Find the station names that are same in two lists.

first_left_join = df2.join(df, (df.DIVISION == df2.Division) & (df.STATION_low == df2.STATION_low),how='left')

first_left_join.show()
first_left_join.toPandas().to_csv('output/left_join.csv')


+--------+--------------------+------------------+------------------+--------------------+---------------+--------+------------------------+---------------+
|Division|             STATION|     GTFS Latitude|    GTFS Longitude|         STATION_low|        STATION|DIVISION|RIDESHIP (ENTRIES+EXITS)|    STATION_low|
+--------+--------------------+------------------+------------------+--------------------+---------------+--------+------------------------+---------------+
|     IRT|         New Lots Av|         40.666235|        -73.884079|         new lots av|    NEW LOTS AV|     IRT|              2814687477|    new lots av|
|     IRT|               28 St|        40.7451425|-73.98881449999999|               28 st|          28 ST|     IRT|            101973516545|          28 st|
|     IRT|Eastern Pkwy - Br...|         40.671987|        -73.964375|eastern pkwy - br...|           null|    null|                    null|           null|
|     BMT|  Lexington Av/59 St|          40.76266|        