# <center> Exploring the 2017 Ford GoBike Trip Data with PySpark 


# $\color{red}{\text{Content Outline:}}$

 ## 1-calculate distance of each trip using haversine library and add the result to the dataset
 ## 2-calculate the duration in seconds of each trip
 ## 3-by assuming each minute cost 0.35 cent calculate the fee for each trip
 ## 4-calculate the total distance for each bike and list the top 10
 ## 5-calculate the number of trips for each start station list top 10 and find the ratio of using as male or female
 ## 6-make a comparison to find the percentage of usage for customer and subscriber
 ## 7-calculate the age of all users and show the relation between the distance and the age
 ## 8-calculate the total cost for all customers and all subscribers
 ## 9- what is the ratio of payment using cc or app wallet
 ## 10-what is the preferred way to pay for customers and subscriber

 ## 1-calculate distance of each trip using haversine library and add the result to the dataset

In [4]:
import urllib.request
from pyspark.sql import SparkSession
from pyspark.sql.functions import sqrt

# Download the CSV file from the remote URL
url = "https://raw.githubusercontent.com/srjlsd/Exploring-the-2017-Ford-GoBike-Trip-Data-with-PySpark/15ccf0249d3da47c41400ca1cd6c092c308b6287/2017-fordgobike-trip-data.csv"
local_path = "2017-fordgobike-trip-data.csv"
urllib.request.urlretrieve(url, local_path)

# Initialize the SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()

# Read the CSV file using PySpark
df = spark.read.csv(local_path, header=True, inferSchema=True)



In [5]:
#from pyspark.sql.functions import acos, cos, sin, radians, least
#from pyspark.sql.functions import acos, cos, sin, sqrt, lit
from pyspark.sql.functions import sin, cos, sqrt, asin, radians, least, acos, col, lit


def haversine(lat1, long1, lat2, long2):
    lat1, long1, lat2, long2 = radians(lat1), radians(long1), radians(lat2), radians(long2)
    dlat = lat2 - lat1
    dlon = long2 - long1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    m = 6367 * c * 1000
    return m


# Read the dataset into a Spark DataFrame
df = spark.read.csv(r'E:\Learning Pandas\Data_Manupilation\2017-fordgobike-tripdata.csv', header=True, inferSchema=True)

# Calculate the distance for each trip using the Haversine function
df = df.withColumn('distance_m', haversine('start_station_latitude', 'start_station_longitude', 'end_station_latitude', 'end_station_longitude'))

# Show the first 10 rows of the updated DataFrame
df.show(10)


+----------+--------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+------------------+
|start_time|end_time|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bike_id| user_type|member_birth_year|member_gender|     pyment|        distance_m|
+----------+--------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+-------+----------+-----------------+-------------+-----------+------------------+
|   57:39.7| 12:50.2|              74|Laguna St at Haye...|           37.77643482|            -122.426244|            43|San Francisco Pub...|          37.7787677|         -122.4159292|     96|  C

In [6]:
df.select('distance_m').show(10)

+------------------+
|        distance_m|
+------------------+
| 942.3373818591812|
|3067.7986569378118|
|               0.0|
|1045.9655184811572|
|  635.939861076868|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
+------------------+
only showing top 10 rows



In [7]:
from pyspark.sql.functions import percentile_approx

# Get the median trip distance
median_distance = df.selectExpr("percentile_approx(distance_m, 0.5)").collect()[0][0]

# Print the result
print("Median distance: {:.2f} meters".format(median_distance))

Median distance: 1398.85 meters


 ## 2-calculate the duration in seconds of each trip


In [5]:
from pyspark.sql.functions import expr, when, col

df = df.withColumn('start_time', expr("to_timestamp(start_time, 'mm:ss.S')"))
df = df.withColumn('end_time', expr("to_timestamp(end_time, 'mm:ss.S')"))

df = df.withColumn('duration_sec',
                   when(col('end_time') < col('start_time'),
                        (col('end_time').cast('long') + 3600 - col('start_time').cast('long')))
                   .otherwise(col('end_time').cast('long') - col('start_time').cast('long')))


In [6]:
df.printSchema() # _c0 is unneccery column

root
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: double (nullable = true)
 |-- start_station_longitude: double (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: double (nullable = true)
 |-- end_station_longitude: double (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- user_type: string (nullable = true)
 |-- member_birth_year: integer (nullable = true)
 |-- member_gender: string (nullable = true)
 |-- pyment: string (nullable = true)
 |-- distance_m: double (nullable = true)
 |-- duration_sec: long (nullable = true)



In [7]:
df.select('duration_sec').show(10)

+------------+
|duration_sec|
+------------+
|         911|
|        3201|
|        2568|
|         973|
|         403|
|        2027|
|         907|
|         734|
|         551|
|         639|
+------------+
only showing top 10 rows



In [8]:
from pyspark.sql.functions import avg

# Calculate the average trip duration in seconds
avg_duration = df.agg(avg("duration_sec")).collect()[0][0]

# Print the result
print("The average trip duration is {:.2f} seconds".format(avg_duration))


The average trip duration is 738.29 seconds


In [9]:
from pyspark.sql.functions import col

# Get summary statistics for trip duration column
trip_duration_stats = df.select("duration_sec").describe().toPandas()

# Print the summary statistics
print(trip_duration_stats)


  summary       duration_sec
0   count             519700
1    mean  738.2905522416779
2  stddev  542.5280638280457
3     min                  0
4     max               3600


In [10]:
from pyspark.sql.functions import col

# Get summary statistics for trip duration column
trip_duration_stats = df.select("duration_sec").describe().toPandas()

# Print the summary statistics
print(trip_duration_stats)


  summary       duration_sec
0   count             519700
1    mean  738.2905522416779
2  stddev  542.5280638280457
3     min                  0
4     max               3600


## 3-by assuming each minute cost 0.35 cent calculate the fee for each trip

In [12]:
from pyspark.sql.functions import round

df = df.withColumn('fee', round(df.duration_sec/60 * 0.35, 2))

In [13]:
df.select('fee').show(10)

+-----+
|  fee|
+-----+
| 5.31|
|18.67|
|14.98|
| 5.68|
| 2.35|
|11.82|
| 5.29|
| 4.28|
| 3.21|
| 3.73|
+-----+
only showing top 10 rows



In [14]:
from pyspark.sql.functions import avg

# Calculate the average fee
avg_fee = df.agg(avg("fee")).collect()[0][0]

# Print the result
print("The average fee is:", avg_fee)




The average fee is: 4.306755609005224


In [15]:
from pyspark.sql.functions import col

# Get summary statistics for trip fee column
fee_description = df.select("fee").describe().toPandas()

# Print the summary statistics
print(fee_description)

  summary                 fee
0   count              519700
1    mean   4.306755609005224
2  stddev  3.1647523390669736
3     min                 0.0
4     max                21.0


## 4-calculate the total distance for each bike and list the top 10

In [16]:
# Group the data by bike_id and calculate the total distance for each bike
grouped_data = df.groupBy("bike_id").agg({"distance_m": "sum"})

# Rename the column from sum(Distance_in_meter) to total_distance
grouped_data = grouped_data.withColumnRenamed("sum(distance_m)", "total_distance")

# Round the total_distance column to 2 decimal places
grouped_data = grouped_data.withColumn("total_distance", round("total_distance", 2))

# Sort the data by total_distance in descending order and select the top 10
top_10 = grouped_data.sort(grouped_data.total_distance.desc()).limit(10)

# Show the results
top_10.show()



+-------+--------------+
|bike_id|total_distance|
+-------+--------------+
|     68|     742901.74|
|   2178|     720728.46|
|    256|     671493.31|
|    235|     669740.32|
|   2049|     656414.81|
|    441|      656229.1|
|   2226|      647415.6|
|    796|      646460.7|
|    190|     639891.29|
|   2365|     639010.34|
+-------+--------------+



In [24]:
import pandas as pd

# Convert the PySpark dataframe to a pandas dataframe
pandas_df = top_10.toPandas()

# Define custom style function
def style_table(df):
    return df.style.set_properties(**{'background-color': 'grey',
                                      'color': 'black',
                                      'font-weight': 'bold',
                                      'text-align': 'center'})\
                    .set_table_styles([{'selector': 'th',
                                        'props': [('border', '1px solid black'),
                                                  ('padding', '6px')]}])

# Apply custom styles
styled_table = style_table(pandas_df)

# Display the table
display(styled_table)


Unnamed: 0,bike_id,total_distance
0,68,742901.74
1,2178,720728.46
2,256,671493.31
3,235,669740.32
4,2049,656414.81
5,441,656229.1
6,2226,647415.6
7,796,646460.7
8,190,639891.29
9,2365,639010.34


## 5-calculate the number of trips for each start station list top 10 and find the ratio of using as male or female

In [27]:
from pyspark.sql.functions import count, sum
from pyspark.sql.window import Window

# Increase the column width
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", "500")
spark.conf.set("spark.sql.repl.eagerEval.maxNumCols", "500")

# Group the data by start station and count the number of trips
grouped_data = df.groupBy("start_station_name").agg(count("start_station_name").alias("number_of_trips"))

# Sort the data by number_of_trips in descending order and select the top 10
top_10 = grouped_data.sort(grouped_data.number_of_trips.desc()).limit(10)

# Show the results
top_10.show(truncate=False)


+---------------------------------------------------------+---------------+
|start_station_name                                       |number_of_trips|
+---------------------------------------------------------+---------------+
|San Francisco Ferry Building (Harry Bridges Plaza)       |15187          |
|The Embarcadero at Sansome St                            |13664          |
|San Francisco Caltrain (Townsend St at 4th St)           |12546          |
|San Francisco Caltrain Station 2  (Townsend St at 4th St)|12055          |
|Market St at 10th St                                     |11960          |
|Montgomery St BART Station (Market St at 2nd St)         |11334          |
|Berry St at 4th St                                       |10956          |
|Powell St BART Station (Market St at 4th St)             |10142          |
|Howard St at Beale St                                    |9926           |
|Steuart St at Market St                                  |9347           |
+-----------

In [28]:
import pandas as pd

# Convert the PySpark dataframe to a pandas dataframe
pandas_df = top_10.toPandas()

# Define custom style function
def style_table(df):
    return df.style.set_properties(**{'background-color': 'grey',
                                      'color': 'black',
                                      'font-weight': 'bold',
                                      'text-align': 'center'})\
                    .set_table_styles([{'selector': 'th',
                                        'props': [('border', '1px solid black'),
                                                  ('padding', '6px')]}])

# Apply custom styles
styled_table = style_table(pandas_df)

# Display the table
display(styled_table)

Unnamed: 0,start_station_name,number_of_trips
0,San Francisco Ferry Building (Harry Bridges Plaza),15187
1,The Embarcadero at Sansome St,13664
2,San Francisco Caltrain (Townsend St at 4th St),12546
3,San Francisco Caltrain Station 2 (Townsend St at 4th St),12055
4,Market St at 10th St,11960
5,Montgomery St BART Station (Market St at 2nd St),11334
6,Berry St at 4th St,10956
7,Powell St BART Station (Market St at 4th St),10142
8,Howard St at Beale St,9926
9,Steuart St at Market St,9347


In [26]:
from pyspark.sql.functions import count, sum, col, round
from pyspark.sql.window import Window

# Group the data by member_gender and calculate the count for each gender
gender_count = df.groupBy("member_gender").agg(count("member_gender").alias("gender_count"))

# Calculate the total count of all genders
total_count = gender_count.agg(sum("gender_count")).collect()[0][0]

# Calculate the ratio of each gender and round to 2 decimal places
gender_ratio = gender_count.withColumn("ratio", round(col("gender_count") / total_count, 2))

# Show the results
gender_ratio.show()



+-------------+------------+-----+
|member_gender|gender_count|ratio|
+-------------+------------+-----+
|         null|           0|  0.0|
|       Female|       98621| 0.22|
|        Other|        6299| 0.01|
|         Male|      348318| 0.77|
+-------------+------------+-----+



In [29]:
pandas_df = gender_ratio.toPandas()
# Apply custom styles
styled_table = style_table(pandas_df)
# Display the table
display(styled_table)

Unnamed: 0,member_gender,gender_count,ratio
0,,0,0.0
1,Female,98621,0.22
2,Other,6299,0.01
3,Male,348318,0.77


 ## 6-make a comparison to find the percentage of usage for customer and subscriber


In [30]:
percent_user = df.groupBy('user_type').agg(count('*').alias('total'))

percent_user.withColumn('percantage',round(col('total')/ df.count()*100)).show()

+----------+------+----------+
| user_type| total|percantage|
+----------+------+----------+
|Subscriber|409230|      79.0|
|  Customer|110470|      21.0|
+----------+------+----------+



In [31]:
pandas_df = gender_ratio.toPandas()
# Apply custom styles
styled_table = style_table(pandas_df)
# Display the table
display(styled_table)

Unnamed: 0,user_type,total
0,Subscriber,409230
1,Customer,110470


 ## 7-calculate the age of all users and show the relation between the distance and the age

In [None]:
# calculate the age of each user

from pyspark.sql import functions as F

df1 = df.withColumn("age", 2017 - F.col("member_birth_year"))

In [None]:
df_age = df1.select(col('age'),('distance_m')).show(10)
df_age

In [None]:
#find the correlation between age and total distance

from pyspark.sql.functions import *
df1.stat.corr("age","distance_m")

In [None]:

# Show the relation between the distance and the age
df1.groupBy("age").agg({"distance_m": "mean"}).show()

 ## 8-calculate the total cost for all customers and all subscribers

In [33]:
df = df.withColumn('total_paid', 
                   round((df.distance_m/ 1000) * .35)) # assume 35 cent for every meter


df.groupBy("user_type").agg(sum('total_paid')
                            .alias('total_paid_sum')).show()

+----------+--------------+
| user_type|total_paid_sum|
+----------+--------------+
|Subscriber|      201784.0|
|  Customer|       58410.0|
+----------+--------------+



 ## 9- what is the ratio of payment using cc or app wallet

In [35]:
# what is the ratio of payment using cc or app wallet

df.groupBy('pyment').agg(count('*').alias('number of usage')).show()

+-----------+---------------+
|     pyment|number of usage|
+-----------+---------------+
| app wallet|         260061|
|credit card|         259639|
+-----------+---------------+



 ## 10-what is the preferred way to pay for customers and subscriber

In [36]:
df.groupBy('pyment').agg(count('*').alias('total_on_type'))\
.withColumn('perc', col('total_on_type') / df.count()).show()

+-----------+-------------+-------------------+
|     pyment|total_on_type|               perc|
+-----------+-------------+-------------------+
| app wallet|       260061| 0.5004060034635367|
|credit card|       259639|0.49959399653646336|
+-----------+-------------+-------------------+



In [38]:
#df.groupBy('user_type', 'pyment').agg(count('*').alias('row_count')).withColumnRenamed('pyment', 'payment').show()