# DEVELHOPE GROUP PROJECT

### FORD GOBIKE TRIPS
The dataset contains trip information for every trip taken on the system, including the start and end station, the trip duration,and user information such as age and gender.


In [1]:
import findspark
findspark.init()

In [2]:
findspark.find()

'c:\\Users\\zakria\\miniconda3\\lib\\site-packages\\pyspark'

In [3]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,when,count,isnan,to_timestamp,unix_timestamp

In [5]:
spark = SparkSession.builder.master("local[*]").appName("Pyspark_Project").getOrCreate()
spark

In [6]:
df=spark.read.options(header='True').csv("2017-fordgobike-tripdata.csv")

In [7]:
df.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: string (nullable = true)
 |-- start_station_longitude: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: string (nullable = true)
 |-- end_station_longitude: string (nullable = true)
 |-- bike_id: string (nullable = true)
 |-- user_type: string (nullable = true)
 |-- member_birth_year: string (nullable = true)
 |-- member_gender: string (nullable = true)
 |-- pyment: string (nullable = true)



In [8]:
df.schema.names

['start_time',
 'end_time',
 'start_station_id',
 'start_station_name',
 'start_station_latitude',
 'start_station_longitude',
 'end_station_id',
 'end_station_name',
 'end_station_latitude',
 'end_station_longitude',
 'bike_id',
 'user_type',
 'member_birth_year',
 'member_gender',
 'pyment']

In [9]:
df=df.dropna()

### Calculating the distance covered for each ride from start to end station using haversine library

In [10]:
from haversine import haversine, Unit

haversine([45.7597, 4.8422], [48.8567, 2.3508])

392.2172595594006

In [11]:
import pyspark.pandas as ps

def getDistance(start_lat, start_long, end_lat, end_long):
    return round(
        haversine(
            [ float(start_lat), float(start_long) ],
            [ float(end_lat), float(end_long) ],
            unit=Unit.METERS
        ), 2
    )



In [12]:
from pyspark.sql.functions import udf

getDistanceUDF = udf(lambda a,b,c,d : getDistance(a,b,c,d) )
df = df.withColumn("Distance", getDistanceUDF(df["start_station_latitude"], df["start_station_longitude"], df["end_station_latitude"], df["end_station_longitude"]))

In [13]:
df.select('Distance').show()

+--------+
|Distance|
+--------+
|  942.93|
| 3069.73|
|  636.34|
|     0.0|
| 1517.35|
| 1517.35|
| 1050.26|
| 1050.26|
| 2856.34|
| 2859.25|
|  884.97|
| 1808.37|
|  623.17|
|  623.17|
| 1790.15|
|  1860.3|
|  344.66|
|  912.03|
|  751.84|
| 1961.31|
+--------+
only showing top 20 rows



### Calculating the duration in seconds for each trip using start and end times of each ride 

In [14]:
from pyspark.sql.functions import col, to_timestamp, abs, expr,year

df = df.withColumn("start_time", to_timestamp(col("start_time"), "mm:ss.SSSS").cast("timestamp"))
df = df.withColumn("end_time", to_timestamp(col("end_time"), "mm:ss.SSSS").cast("timestamp"))
df = df.withColumn("trip_duration_seconds", expr("abs(unix_timestamp(end_time) - unix_timestamp(start_time))"))

In [17]:
df.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: string (nullable = true)
 |-- start_station_longitude: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: string (nullable = true)
 |-- end_station_longitude: string (nullable = true)
 |-- bike_id: string (nullable = true)
 |-- user_type: string (nullable = true)
 |-- member_birth_year: string (nullable = true)
 |-- member_gender: string (nullable = true)
 |-- pyment: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- trip_duration_seconds: long (nullable = true)



In [18]:
df.select('trip_duration_seconds').show(truncate=False)

+---------------------+
|trip_duration_seconds|
+---------------------+
|2689                 |
|399                  |
|404                  |
|2693                 |
|1417                 |
|1429                 |
|2056                 |
|2126                 |
|2068                 |
|2383                 |
|3214                 |
|3178                 |
|871                  |
|734                  |
|782                  |
|475                  |
|152                  |
|250                  |
|243                  |
|833                  |
+---------------------+
only showing top 20 rows



### Calcualting the fee in cents for each ride using trip duration and supposing 0.35 cents per minute of distance traveled.

In [19]:
df = df.withColumn("fee", expr("trip_duration_seconds * 0.35 / 60"))

In [20]:
df.select('fee').show()

+---------+
|      fee|
+---------+
|15.685833|
| 2.327500|
| 2.356667|
|15.709167|
| 8.265833|
| 8.335833|
|11.993333|
|12.401667|
|12.063333|
|13.900833|
|18.748333|
|18.538333|
| 5.080833|
| 4.281667|
| 4.561667|
| 2.770833|
| 0.886667|
| 1.458333|
| 1.417500|
| 4.859167|
+---------+
only showing top 20 rows

