In [1]:
# Show the Spark master URL (from the SPARK_MASTER_ADDRESS environment variable) that the session is built against.
!echo $SPARK_MASTER_ADDRESS

spark://notch082.ipoib.int.chpc.utah.edu:7077


In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, avg, max, min, sum, col, when, to_timestamp, isnan, coalesce
from pyspark.sql.functions import lit, unix_timestamp

In [3]:
# Open the Spark session against the master URL given in the environment.
# Fail fast with a clear message if it is unset, instead of handing None to the builder.
spark_master = os.getenv('SPARK_MASTER_ADDRESS')
if not spark_master:
    raise RuntimeError("SPARK_MASTER_ADDRESS is not set; cannot connect to the Spark master")
spark = SparkSession.builder.master(spark_master).appName("Spark-application").getOrCreate()

# Underlying Scala SparkContext (reached through a private PySpark attribute).
sc = spark._jsc.sc()
# getExecutorInfos() lists the driver as well as the executors, hence the -1.
# NOTE(review): executors may not have registered yet right after getOrCreate(),
# so this can under-count (it printed 0 on the cluster run) — confirm if it matters.
n_workers = len(sc.statusTracker().getExecutorInfos()) - 1
print(n_workers)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/29 21:01:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


0


### Read in the CSV data files

In [4]:
# Both files have a header row; Spark infers the column types
# (inferSchema costs one extra pass over each file).
CSV_READ_OPTIONS = {"header": True, "inferSchema": True}
rideData = spark.read.csv("bike_share_data/mergedRideData.csv", **CSV_READ_OPTIONS)
bikeShareLocations = spark.read.csv("bike_share_data/Capital_Bikeshare_Locations.csv", **CSV_READ_OPTIONS)

                                                                                

### Convert the time columns to timestamps

In [5]:
# Replace each start/end time column with its to_timestamp() parse (default
# parsing, no explicit format). NOTE(review): values that fail to parse
# presumably become null — confirm against the raw data.
for time_col in ("started_at", "ended_at"):
    rideData = rideData.withColumn(time_col, to_timestamp(time_col))

### Compute the length of each ride

In [6]:
# Per-ride duration in seconds (null if either timestamp is null).
# Subtracting the timestamp columns directly yields a Spark interval type rather
# than a number. How that type averages and exports through toPandas()/to_csv is
# version-dependent, and it is not a plain numeric model feature. Epoch-second
# arithmetic keeps ride_length, and the per-station averages built from it, numeric.
rideData = rideData.withColumn(
    "ride_length",
    unix_timestamp("ended_at") - unix_timestamp("started_at"),
)

### Create a binary column indicating whether the ride was taken by a member

In [7]:
# 1 when member_casual is exactly "member"; 0 for anything else, including null.
is_member_ride = when(col("member_casual") == "member", 1).otherwise(0)
rideData = rideData.withColumn("member_binary", is_member_ride)

### Group by start and end station and compute features

In [8]:
# For each side of a ride (start station / end station), compute three
# per-station aggregates from one grouping:
#   - mean ride_length           -> column "avg(ride_length)"
#   - number of rides            -> column "count"
#   - number of member rides     -> column "sum(member_binary)" (sum of the 0/1 flag)
by_start_station = rideData.groupBy("start_station_name")
start_station_ride_duration = by_start_station.agg({"ride_length": "avg"})
rides_starting_at = by_start_station.count()
rides_starting_at_member = by_start_station.sum("member_binary")

by_end_station = rideData.groupBy("end_station_name")
end_station_ride_duration = by_end_station.agg({"ride_length": "avg"})
rides_ending_at = by_end_station.count()
rides_ending_at_member = by_end_station.sum("member_binary")

### Create start and end feature data frames

In [9]:
def _station_features(key, duration_df, count_df, member_df, feature_names):
    """Join the three per-station aggregates for one side (start or end) of a ride.

    key           -- station-name column shared by all three frames
    duration_df   -- frame with an "avg(ride_length)" column
    count_df      -- frame with a "count" column
    member_df     -- frame with a "sum(member_binary)" column
    feature_names -- (duration, ride count, member ride count) output column names

    Returns one frame keyed by `key` holding the three renamed feature columns.
    """
    duration_name, count_name, member_name = feature_names
    durations = duration_df.withColumnRenamed("avg(ride_length)", duration_name)
    counts = count_df.withColumnRenamed("count", count_name)
    members = member_df.withColumnRenamed("sum(member_binary)", member_name)
    # Default (inner) joins on the station key: durations, then counts, then member counts.
    return durations.join(counts, key).join(members, key)


rides_starting_at_features = _station_features(
    "start_station_name",
    start_station_ride_duration,
    rides_starting_at,
    rides_starting_at_member,
    ("starting_at_avg_ride_duration", "count_rides_starting_at", "count_of_member_rides_starting_at"),
)

rides_ending_at_features = _station_features(
    "end_station_name",
    end_station_ride_duration,
    rides_ending_at,
    rides_ending_at_member,
    ("ending_at_avg_ride_duration", "count_rides_ending_at", "count_of_member_rides_ending_at"),
)

### Merge the start and end features into one data frame

In [10]:
# Full outer join so a station that only ever appears as a start (or only as an
# end) is kept; its other-side feature columns are null.
same_station = (
    rides_starting_at_features["start_station_name"]
    == rides_ending_at_features["end_station_name"]
)
all_features = (
    rides_starting_at_features
    .join(rides_ending_at_features, same_station, how="fullouter")
    # One station-name column: whichever side's key is non-null.
    .withColumn("station_name", coalesce(col("start_station_name"), col("end_station_name")))
    .drop("end_station_name", "start_station_name")
)

### Clean the bike share stations data set
Several columns provide no value for our model, so drop them:

In [11]:
# Station-table columns that carry no signal for the model. Each name appears
# once; DataFrame.drop() is a no-op for names absent from the schema.
cols_to_drop = ['STATION_STATUS', 'X', 'Y', 'STATION_TYPE', 'LAST_REPORTED', 'IS_INSTALLED', 'IS_RETURNING',
                'REGION_NAME', 'RENTAL_METHODS', 'REGION_ID', 'GIS_ID', 'GIS_LAST_MOD_DTTM']

bikeShareLocations = bikeShareLocations.drop(*cols_to_drop)

### Combine the bike share stations and features data set
The inner join drops every station in the features data set that no longer exists in DC (38 stations were removed in 2023/24).
Those stations, and the rides counted for them, are excluded from the final data set.

In [12]:
# Inner join: keep only stations present in both the station list and the ride
# features. NAME equals station_name on every joined row, so drop the duplicate.
name_matches = bikeShareLocations["NAME"] == all_features["station_name"]
cleaned_dataset = (
    bikeShareLocations
    .join(all_features, name_matches, how="inner")
    .drop("NAME")
)

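### Create a weight for each station
Each casual ride counts 1 and each member ride counts 1.25, summed over rides that start and rides that end at the station.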

In [13]:
# Station weight: every casual (non-member) ride counts 1 and every member ride
# counts MEMBER_RIDE_WEIGHT, summed over rides starting and ending at the station.
# After the full outer join, a station seen only as a start (or only as an end)
# has null counts on the other side. Treat those as 0 so its weight is not null.
MEMBER_RIDE_WEIGHT = 1.25


def _zero_if_null(name):
    """Return column `name` with nulls replaced by 0."""
    return coalesce(col(name), lit(0))


rides_ending = _zero_if_null("count_rides_ending_at")
rides_starting = _zero_if_null("count_rides_starting_at")
member_rides_ending = _zero_if_null("count_of_member_rides_ending_at")
member_rides_starting = _zero_if_null("count_of_member_rides_starting_at")

cleaned_dataset = cleaned_dataset.withColumn(
    "weight",
    (rides_ending - member_rides_ending)
    + (rides_starting - member_rides_starting)
    + member_rides_ending * MEMBER_RIDE_WEIGHT
    + member_rides_starting * MEMBER_RIDE_WEIGHT,
)

In [14]:
# Collect the per-station result (one row per matched station) to the driver and write it.
# index=False: the pandas RangeIndex is not a feature. Without it, the index is
# written as an unnamed leading column that reappears when the CSV is read back.
cleaned_dataset.toPandas().to_csv("bike_share_data/cleaned_data.csv", index=False)

                                                                                