In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType, LongType

In [112]:
from pyspark.sql import functions as F

In [27]:
from pyspark.sql import Row
from pyspark.sql.types import *

In [2]:
def create_spark_session():
    """
    creates sparks session 
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    return spark

In [12]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [13]:
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

In [38]:
spark.version

'3.0.1'

# Load Data From Local

In [19]:
df = spark.read.csv("cv19_data/202011-citibike-tripdata.csv",header=True)

In [23]:
df.limit(5).toPandas()

Unnamed: 0,tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender
0,521,2020-11-01 00:00:07.0150,2020-11-01 00:08:48.3010,3467,W Broadway & Spring St,40.72494672359416,-74.00165855884552,350,Clinton St & Grand St,40.71559509,-73.9870295,40405,Subscriber,1989,1
1,492,2020-11-01 00:00:10.8080,2020-11-01 00:08:23.3170,3557,40 Ave & 9 St,40.75742,-73.945133,3557,40 Ave & 9 St,40.75742,-73.945133,46504,Subscriber,1970,2
2,1979,2020-11-01 00:00:14.7040,2020-11-01 00:33:14.1640,3085,Roebling St & N 4 St,40.71469,-73.95739,3854,Morgan Ave & Maspeth Ave,40.716657,-73.93637,37452,Subscriber,1989,2
3,2382,2020-11-01 00:00:14.7070,2020-11-01 00:39:57.0030,3783,Cliff St & Fulton St,40.70838,-74.00495,3167,Amsterdam Ave & W 73 St,40.77966809007312,-73.98093044757842,40417,Subscriber,1981,1
4,166,2020-11-01 00:00:15.9690,2020-11-01 00:03:02.8980,422,W 59 St & 10 Ave,40.770513,-73.988038,3175,W 70 St & Amsterdam Ave,40.77748046,-73.98288594,35776,Subscriber,1990,1


# Fix Columns

In [33]:
df.printSchema()

root
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: string (nullable = true)
 |-- start station longitude: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: string (nullable = true)
 |-- end station longitude: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: string (nullable = true)
 |-- gender: string (nullable = true)



In [31]:
df.schema.names

['tripduration',
 'starttime',
 'stoptime',
 'start station id',
 'start station name',
 'start station latitude',
 'start station longitude',
 'end station id',
 'end station name',
 'end station latitude',
 'end station longitude',
 'bikeid',
 'usertype',
 'birth year',
 'gender']

In [35]:
from pyspark.sql.functions import to_timestamp

In [42]:
df["stoptime"].cast(DateType())

Column<b'CAST(stoptime AS DATE)'>

In [48]:
# turns string into timestamp
df.limit(5).select(df.stoptime.cast(TimestampType()).alias('datetime')).collect()

[Row(datetime=datetime.datetime(2020, 11, 1, 0, 8, 48, 301000)),
 Row(datetime=datetime.datetime(2020, 11, 1, 0, 8, 23, 317000)),
 Row(datetime=datetime.datetime(2020, 11, 1, 0, 33, 14, 164000)),
 Row(datetime=datetime.datetime(2020, 11, 1, 0, 39, 57, 3000)),
 Row(datetime=datetime.datetime(2020, 11, 1, 0, 3, 2, 898000))]

In [51]:
# turns string into timestamp
df.select(df.stoptime.cast(TimestampType()).collect())

TypeError: 'Column' object is not callable

In [56]:
# save column
df = df.withColumn("stoptime_ts",to_timestamp(col("stoptime")))

In [58]:
df.printSchema()

root
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: string (nullable = true)
 |-- start station longitude: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: string (nullable = true)
 |-- end station longitude: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- stoptime_ts: timestamp (nullable = true)



### Get Column With Date

In [70]:
# Add column to pyspark dataframe
# https://towardsdatascience.com/5-ways-to-add-a-new-column-in-a-pyspark-dataframe-4e75c2fd8c08

In [91]:
# correct code
df.select(date_format('stoptime_ts', 'MM/dd/yyy').alias('date')).collect()

In [88]:
from pyspark.sql.functions import lit

In [96]:
# create date column
df = df.withColumn("stoptime_date",date_format('stoptime_ts','MM/dd/yyy'))

### Get Column With Time

In [106]:
# create time column
df = df.withColumn('stoptime_time', date_format('stoptime_ts', 'HH:mm:ss'))

### Get Zip Code From Latitude and Longitude

In [206]:
df.select(F.countDistinct("end station name")).show()

+--------------------------------+
|count(DISTINCT end station name)|
+--------------------------------+
|                            1161|
+--------------------------------+



In [132]:
# get list of unique station names
station_names = [i for i in df.select('end station name').distinct().collect()]

In [207]:
len(station_names)

1161

In [144]:
# turn list of row objects into list of names
station_names_clean = [station_names[i]['end station name'] for i in range(len(station_names))]

In [208]:
len(station_names_clean)

1161

In [185]:
# Get the first row of matching station name latitude
df[df['end station name'] == station_names_clean[0]].first()['end station latitude']

'40.6686273'

In [186]:
# Get the first row of matching station name longitude
df[df['end station name'] == station_names_clean[0]].first()['end station longitude']

'-73.98700053'

In [199]:
# create empty list of stations
station_list = []

# create dictionary with name, latitude, and longitude
for i in station_names_clean:
    station_dict= {"station_name": i,
    "station_latitude": df[df['end station name'] == i].first()['end station latitude'],
    "station_longitude": df[df['end station name'] == i].first()['end station longitude']}
    station_list.append(station_dict)

In [201]:
station_table = pd.DataFrame(station_list)

In [205]:
station_table.to_csv("station_table.csv")

In [209]:
station_table

Unnamed: 0,station_name,station_latitude,station_longitude
0,10 St & 5 Ave,40.6686273,-73.98700053
1,College Ave & E 170 St,40.837576,-73.910489
2,45 Rd & 11 St,40.74708586,-73.94977234
3,N 11 St & Kent Ave,40.72248188638219,-73.95921930670738
4,E 141 St & Jackson Ave,40.806896,-73.911837
...,...,...,...
1156,Crescent St & 34 Ave,40.76108,-73.930562
1157,6 Ave & Canal St,40.72243797,-74.00566443
1158,Kosciuszko St & Tompkins Ave,40.69128258,-73.9452416
1159,37 Ave & 35 St,40.7531106,-73.9279917


### Convert Lat and Longitude to Address

In [116]:
from geopy.geocoders import Nominatim

In [117]:
geolocator = Nominatim(user_agent="my-application-john")
location = geolocator.reverse("40.71559509, -73.9870295")

In [214]:
location.raw['address']['postcode']

'10002'

In [240]:
# reset index to manipulate data by column name
station_table.reset_index(inplace=True)

In [248]:
# drop old index column
station_table.drop(columns="index", inplace=True)

In [252]:
# crate column named zip
station_table["zip"]=0

In [257]:
station_table.loc[0]['station_latitude']

'40.6686273'

In [258]:
station_table.loc[0]['station_longitude']

'-73.98700053'

In [260]:
# test conversion
lat = station_table.loc[1]['station_latitude']
long = station_table.loc[1]['station_longitude']
location = geolocator.reverse(f"{lat}, {long}")

In [267]:
location.raw

{'place_id': 32318567,
 'licence': 'Data © OpenStreetMap contributors, ODbL 1.0. https://osm.org/copyright',
 'osm_type': 'node',
 'osm_id': 2806671743,
 'lat': '40.8376939',
 'lon': '-73.9103456',
 'display_name': '1400, College Avenue, The Bronx, Bronx County, New York, 10456, United States',
 'address': {'house_number': '1400',
  'road': 'College Avenue',
  'suburb': 'The Bronx',
  'neighbourhood': 'Bronx County',
  'city': 'New York',
  'state': 'New York',
  'postcode': '10456',
  'country': 'United States',
  'country_code': 'us'},
 'boundingbox': ['40.8376439', '40.8377439', '-73.9103956', '-73.9102956']}

In [272]:
location.raw['address']['postcode']

'10456'

In [277]:
station_table

Unnamed: 0,station_name,station_latitude,station_longitude,zip
0,10 St & 5 Ave,40.6686273,-73.98700053,0
1,College Ave & E 170 St,40.837576,-73.910489,0
2,45 Rd & 11 St,40.74708586,-73.94977234,0
3,N 11 St & Kent Ave,40.72248188638219,-73.95921930670738,0
4,E 141 St & Jackson Ave,40.806896,-73.911837,0
...,...,...,...,...
1156,Crescent St & 34 Ave,40.76108,-73.930562,0
1157,6 Ave & Canal St,40.72243797,-74.00566443,0
1158,Kosciuszko St & Tompkins Ave,40.69128258,-73.9452416,0
1159,37 Ave & 35 St,40.7531106,-73.9279917,0


In [284]:
len(stations_list_with_zip)

274

In [286]:
# stations_list_with_zip = []
stations_list_with_address = []

In [285]:
range(len(station_table))[274:]

range(274, 1161)

In [292]:
len(stations_list_with_zip)

1161

In [294]:
len(stations_list_with_address)

4

In [291]:
# put everything together
for i in range(len(station_table))[1066:]:
    try:
        name = station_table.loc[i]['station_name']
        lat = station_table.loc[i]['station_latitude']
        long = station_table.loc[i]['station_longitude']
        location = geolocator.reverse(f"{lat}, {long}")
        zip_code = location.raw['address']['postcode']
        station_dictionary = {
            "station_name": name,
            "station_latitude": lat,
            "station_longitude": long,
            "station_zip": zip_code
        }
        stations_list_with_zip.append(station_dictionary)
    except:
        name = station_table.loc[i]['station_name']
        lat = station_table.loc[i]['station_latitude']
        long = station_table.loc[i]['station_longitude']
        location = geolocator.reverse(f"{lat}, {long}")
        address = location.raw['address']
        station_dictionary = {
            "station_name": name,
            "station_latitude": lat,
            "station_longitude": long,
            "station_address": address}
        stations_list_with_address.append(station_dictionary)
        
    
    
    

In [297]:
stations_table_w_zip = pd.DataFrame(stations_list_with_zip)

In [298]:
stations_table_w_zip.to_csv("stations_table_w_zip.csv")

In [324]:
type(stations_table_w_zip["station_zip"][0])

str

In [315]:
test_df['station_zip']=0

In [326]:
test_df['station_zip'][0]="10043"

0    10043
1        0
2        0
3        0
Name: station_zip, dtype: int64

In [330]:
test_df['station_zip'][1]="10019"

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df['station_zip'][1]="10019"


In [332]:
test_df['station_zip'][2]="10019"

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df['station_zip'][2]="10019"


In [333]:
test_df['station_zip'][3]="10003"

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df['station_zip'][3]="10003"


In [306]:
test_df = pd.DataFrame(stations_list_with_address)

In [337]:
test_df

Unnamed: 0,station_name,station_latitude,station_longitude,station_address,station_zip
0,South St & Gouverneur Ln,40.70355377,-74.00670227,"{'man_made': 'Pier 11', 'road': 'Pier 11', 'ne...",10043
1,W 55 St & 6 Ave,40.763189,-73.978434,"{'neighbourhood': 'Midtown', 'suburb': 'Manhat...",10019
2,W 52 St & 6 Ave,40.76132983124814,-73.97982001304626,"{'neighbourhood': 'Theater District', 'suburb'...",10019
3,Cooper Square & Astor Pl,40.72951496224949,-73.99075269699097,"{'neighbourhood': 'East Village', 'suburb': 'M...",10003


In [339]:
test_df.drop(columns=["station_address"], inplace=True)

In [340]:
test_df

Unnamed: 0,station_name,station_latitude,station_longitude,station_zip
0,South St & Gouverneur Ln,40.70355377,-74.00670227,10043
1,W 55 St & 6 Ave,40.763189,-73.978434,10019
2,W 52 St & 6 Ave,40.76132983124814,-73.97982001304626,10019
3,Cooper Square & Astor Pl,40.72951496224949,-73.99075269699097,10003


# Test

In [119]:
df.limit(5).toPandas()

Unnamed: 0,tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender,stoptime_ts,stoptime_date,stoptime_time
0,521,2020-11-01 00:00:07.0150,2020-11-01 00:08:48.3010,3467,W Broadway & Spring St,40.72494672359416,-74.00165855884552,350,Clinton St & Grand St,40.71559509,-73.9870295,40405,Subscriber,1989,1,2020-11-01 00:08:48.301,11/01/2020,00:08:48
1,492,2020-11-01 00:00:10.8080,2020-11-01 00:08:23.3170,3557,40 Ave & 9 St,40.75742,-73.945133,3557,40 Ave & 9 St,40.75742,-73.945133,46504,Subscriber,1970,2,2020-11-01 00:08:23.317,11/01/2020,00:08:23
2,1979,2020-11-01 00:00:14.7040,2020-11-01 00:33:14.1640,3085,Roebling St & N 4 St,40.71469,-73.95739,3854,Morgan Ave & Maspeth Ave,40.716657,-73.93637,37452,Subscriber,1989,2,2020-11-01 00:33:14.164,11/01/2020,00:33:14
3,2382,2020-11-01 00:00:14.7070,2020-11-01 00:39:57.0030,3783,Cliff St & Fulton St,40.70838,-74.00495,3167,Amsterdam Ave & W 73 St,40.77966809007312,-73.98093044757842,40417,Subscriber,1981,1,2020-11-01 00:39:57.003,11/01/2020,00:39:57
4,166,2020-11-01 00:00:15.9690,2020-11-01 00:03:02.8980,422,W 59 St & 10 Ave,40.770513,-73.988038,3175,W 70 St & Amsterdam Ave,40.77748046,-73.98288594,35776,Subscriber,1990,1,2020-11-01 00:03:02.898,11/01/2020,00:03:02


In [None]:
# long way
# https://stackoverflow.com/questions/48164206/pyspark-adding-a-column-from-a-list-of-values-using-a-udf