In [25]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bike").getOrCreate()
df = spark.read.option("header","true").csv("2017-fordgobike-tripdata.csv")

In [26]:
type(df)

pyspark.sql.dataframe.DataFrame

In [2]:
df.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: string (nullable = true)
 |-- start_station_longitude: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: string (nullable = true)
 |-- end_station_longitude: string (nullable = true)
 |-- bike_id: string (nullable = true)
 |-- user_type: string (nullable = true)
 |-- member_birth_year: string (nullable = true)
 |-- member_gender: string (nullable = true)
 |-- pyment: string (nullable = true)



In [2]:
# !pip install haversine

Collecting haversine
  Downloading haversine-2.7.0-py2.py3-none-any.whl (6.9 kB)
Installing collected packages: haversine
Successfully installed haversine-2.7.0


<ul>
<li>calculate distance of each trip using haversine library and add the result to the dataset</li>
<li>calculate the duration in seconds of each trip</li>
<li>by assuming each minute cost 0.35 cent calculate the fee for each trip</li>
</ul>

## 1-calculate distance of each trip using haversine library and add the result to the dataset

In [6]:
from haversine import haversine, Unit

haversine([45.7597, 4.8422], [48.8567, 2.3508])

392.2172595594006

In [35]:
import pyspark.pandas as ps

def getDistance(start_lat, start_long, end_lat, end_long):
    return round(
        haversine(
            [ float(start_lat), float(start_long) ],
            [ float(end_lat), float(end_long) ],
            unit=Unit.METERS
        ), 2
    )


In [36]:
from pyspark.sql.functions import udf

getDistanceUDF = udf(lambda a,b,c,d : getDistance(a,b,c,d) )
df = df.withColumn("Distance", getDistanceUDF(df["start_station_latitude"], df["start_station_longitude"], df["end_station_latitude"], df["end_station_longitude"]))

In [37]:
df.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: string (nullable = true)
 |-- start_station_longitude: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: string (nullable = true)
 |-- end_station_longitude: string (nullable = true)
 |-- bike_id: string (nullable = true)
 |-- user_type: string (nullable = true)
 |-- member_birth_year: string (nullable = true)
 |-- member_gender: string (nullable = true)
 |-- pyment: string (nullable = true)
 |-- Distance: string (nullable = true)



In [40]:
df.select("Distance").show()

+--------+
|Distance|
+--------+
|  942.93|
| 3069.73|
|     0.0|
| 1046.62|
|  636.34|
|     0.0|
|     0.0|
|     0.0|
|     0.0|
|     0.0|
|   771.3|
|   771.3|
| 1517.35|
| 1517.35|
|  1422.6|
| 1050.26|
| 1050.26|
| 2856.34|
| 2856.34|
| 2859.25|
+--------+
only showing top 20 rows



## 2-calculate the duration in seconds of each trip

In [73]:

from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType

df = df.withColumn("starttime", unix_timestamp(col("start_time"), "mm:ss.S").cast(TimestampType()))
df = df.withColumn("endtime", unix_timestamp(col("end_time"), "mm:ss.S").cast(TimestampType()))

df = df.withColumn("duration", expr("if(\
(unix_timestamp(endtime) - unix_timestamp(starttime)) >= 0, \
(unix_timestamp(endtime) - unix_timestamp(starttime)), \
(unix_timestamp(endtime) + 3600 - unix_timestamp(starttime)))").cast("integer"))
df.select(["starttime","endtime","duration"]).show()


+-------------------+-------------------+--------+
|          starttime|            endtime|duration|
+-------------------+-------------------+--------+
|1970-01-01 00:57:40|1970-01-01 00:12:51|     911|
|1970-01-01 00:56:35|1970-01-01 00:49:56|    3201|
|1970-01-01 00:45:49|1970-01-01 00:28:37|    2568|
|1970-01-01 00:31:11|1970-01-01 00:47:24|     973|
|1970-01-01 00:23:14|1970-01-01 00:29:58|     404|
|1970-01-01 00:51:01|1970-01-01 00:24:48|    2027|
|1970-01-01 00:49:29|1970-01-01 00:04:36|     907|
|1970-01-01 00:46:38|1970-01-01 00:58:52|     734|
|1970-01-01 00:37:08|1970-01-01 00:46:19|     551|
|1970-01-01 00:35:39|1970-01-01 00:46:18|     639|
|1970-01-01 00:46:33|1970-01-01 00:41:25|    3292|
|1970-01-01 00:48:12|1970-01-01 00:41:10|    3178|
|1970-01-01 00:52:56|1970-01-01 00:29:19|    2183|
|1970-01-01 00:52:56|1970-01-01 00:29:07|    2171|
|1970-01-01 00:35:24|1970-01-01 00:20:21|    2697|
|1970-01-01 00:53:39|1970-01-01 00:19:23|    1544|
|1970-01-01 00:54:41|1970-01-01

## 3-by assuming each minute cost 0.35 cent calculate the fee for each trip

In [70]:

df = df.withColumn("duration", expr("if(\
(unix_timestamp(endtime) - unix_timestamp(starttime)) >= 0, \
(unix_timestamp(endtime) - unix_timestamp(starttime)), \
(unix_timestamp(endtime) + 3600 - unix_timestamp(starttime)))").cast("timestamp"))

df = df.withColumn("duration_second", minute(col("duration")))
df = df.withColumn("fee_of_trip", bround(col("duration_second") * 0.35, 2))
df.select(["duration_second","fee_of_trip"]).show()

+---------------+-----------+
|duration_second|fee_of_trip|
+---------------+-----------+
|             15|       5.25|
|             53|      18.55|
|             42|       14.7|
|             16|        5.6|
|              6|        2.1|
|             33|      11.55|
|             15|       5.25|
|             12|        4.2|
|              9|       3.15|
|             10|        3.5|
|             54|       18.9|
|             52|       18.2|
|             36|       12.6|
|             36|       12.6|
|             44|       15.4|
|             25|       8.75|
|             24|        8.4|
|             23|       8.05|
|             25|       8.75|
|             20|        7.0|
+---------------+-----------+
only showing top 20 rows

