In [111]:
from typing import List

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, udf

In [2]:
spark = SparkSession.builder.appName("CitiBike").getOrCreate()

Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.hadoop#hadoop-azure added as a dependency
io.delta#delta-core_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1c68ae2c-ca1c-49b1-ad7f-9c0676463eec;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hadoop#hadoop-azure;3.2.1 in central
	found org.apache.httpcomponents#httpclient;4.5.6 in central
	found org.apache.httpcomponents#httpcore;4.4.10 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.11 in central
	found com.microsoft.azure#azure-storage;7.0.0 in central
	found com.fasterxml.jackson.core#jackson-core;2.9.8 in central
	found org.slf4j#slf4j-api;1.7.25 in central
	found com.microsoft.azure#azure-keyvault-core;1.0.0 in central
	found com.google.guava#guava;27.0-jre in central
	found com.google.guava#failureaccess;1.0 in central
	found com.google.guava#listenablefuture;9999.0-empty-to-avoid-conflict-with-guava in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.checkerframework#checker-qual;2.5.2 in central
	found com.google.errorprone#error_prone_annotations;2.2.0 in central
	found com.google.j2objc#j2objc-annotations;1.1 in central
	found org.codehaus.mojo#animal-sniffer-annota

In [3]:
df = spark.read.format("csv").option("header", True).load("./data/citibike.csv")

In [4]:
df.printSchema()

root
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: string (nullable = true)
 |-- start station longitude: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: string (nullable = true)
 |-- end station longitude: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: string (nullable = true)
 |-- gender: string (nullable = true)



In [5]:
def sanitize_columns(columns: List[str]) -> List[str]:
    return [column.replace(" ", "_") for column in columns]

In [6]:
renamed_columns = sanitize_columns(df.columns)

In [7]:
df = df.toDF(*renamed_columns)

In [8]:
df.printSchema()

root
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: string (nullable = true)
 |-- start_station_longitude: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: string (nullable = true)
 |-- end_station_longitude: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- gender: string (nullable = true)



In [15]:
df.head()

Row(tripduration='364', starttime='2017-07-01 00:00:00', stoptime='2017-07-01 00:06:05', start_station_id='539', start_station_name='Metropolitan Ave & Bedford Ave', start_station_latitude='40.71534825', start_station_longitude='-73.96024116', end_station_id='3107', end_station_name='Bedford Ave & Nassau Ave', end_station_latitude='40.72311651', end_station_longitude='-73.95212324', bikeid='14744', usertype='Subscriber', birth_year='1986', gender='1')

In [16]:
METERS_PER_FOOT = 0.3048
FEET_PER_MILE = 5280
EARTH_RADIUS_IN_METERS = 6371e3
METERS_PER_MILE = METERS_PER_FOOT * FEET_PER_MILE

In [23]:
!pip install haversine

Collecting haversine
  Downloading haversine-2.5.1-py2.py3-none-any.whl (6.1 kB)
Installing collected packages: haversine
Successfully installed haversine-2.5.1


In [116]:
from haversine import haversine, Unit

from math import radians, cos, sin, asin, sqrt

In [136]:
@udf(returnType=DoubleType())
def calculate_distance(start_latitude:float, start_longitude:float, end_latitude:float, end_longitude:float):
    
    start_latitude_rad, start_longitude_rad, end_latitude_rad, end_longitude_rad = map(radians, [start_latitude, start_longitude, end_latitude, end_longitude])

    d_long = end_longitude_rad - start_longitude_rad
    d_lat = end_latitude_rad - start_latitude_rad
    
    distance = sin(d_lat/2)**2 + cos(start_latitude_rad) * cos(end_latitude_rad) * sin(d_long/2)**2
    
    c = 2*asin(sqrt(distance))
    
    return (c * EARTH_RADIUS_IN_METERS) / METERS_PER_MILE

In [137]:
@udf(returnType=DoubleType())
def calculate(start_latitude:float, start_longitude:float, end_latitude:float, end_longitude:float):
    start = (start_latitude, start_longitude)
    end = (end_latitude, end_longitude)
    
    distance = haversine(start, end, unit=Unit.MILES)
    return distance

In [141]:
def compute_distance(_spark: SparkSession, dataframe: DataFrame) -> DataFrame:
    df = dataframe.withColumn("distance", calculate(col("start_station_latitude").cast(DoubleType()), col("start_station_longitude").cast(DoubleType()),
                                                             col("end_station_latitude").cast(DoubleType()), col("end_station_longitude").cast(DoubleType())))
    return df

In [139]:
dataset_with_distances = compute_distance(spark, df)

In [140]:
dataset_with_distances.toPandas()

Unnamed: 0,tripduration,starttime,stoptime,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bikeid,usertype,birth_year,gender,distance
0,364,2017-07-01 00:00:00,2017-07-01 00:06:05,539,Metropolitan Ave & Bedford Ave,40.71534825,-73.96024116,3107,Bedford Ave & Nassau Ave,40.72311651,-73.95212324,14744,Subscriber,1986,1,0.684692
1,2142,2017-07-01 00:00:03,2017-07-01 00:35:46,293,Lafayette St & E 8 St,40.73020660529954,-73.99102628231049,3425,2 Ave & E 104 St,40.7892105,-73.94370784,19587,Subscriber,1981,1,4.769980
2,328,2017-07-01 00:00:08,2017-07-01 00:05:37,3242,Schermerhorn St & Court St,40.69102925677968,-73.99183362722397,3397,Court St & Nelson St,40.6763947,-73.99869893,27937,Subscriber,1984,2,1.073226
3,2530,2017-07-01 00:00:11,2017-07-01 00:42:22,2002,Wythe Ave & Metropolitan Ave,40.716887,-73.963198,398,Atlantic Ave & Furman St,40.69165183,-73.9999786,26066,Subscriber,1985,1,2.598375
4,2534,2017-07-01 00:00:15,2017-07-01 00:42:29,2002,Wythe Ave & Metropolitan Ave,40.716887,-73.963198,398,Atlantic Ave & Furman St,40.69165183,-73.9999786,29408,Subscriber,1982,2,2.598375
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3595,419,2017-07-01 08:53:41,2017-07-01 09:00:41,522,E 51 St & Lexington Ave,40.75714758,-73.97207836,485,W 37 St & 5 Ave,40.75038009,-73.98338988,27623,Subscriber,1993,2,0.754422
3596,852,2017-07-01 08:53:45,2017-07-01 09:07:58,3167,Amsterdam Ave & W 73 St,40.77966809007312,-73.98093044757842,3362,Madison Ave & E 82 St,40.7781314,-73.96069399,27415,Subscriber,1990,1,1.064080
3597,364,2017-07-01 08:53:47,2017-07-01 08:59:52,3321,Clinton St & Union St,40.6831164,-73.99785267,3402,Court St & State St,40.6902375,-73.99203074,20702,Subscriber,1982,1,0.578900
3598,231,2017-07-01 08:53:48,2017-07-01 08:57:40,3463,E 16 St & Irving Pl,40.735367055605394,-73.98797392845154,253,W 13 St & 5 Ave,40.73543934,-73.99453948,15787,Subscriber,1988,1,0.343770
