# Exploratory Data Analysis of bike rentals database with Pyspark
# Krystian Warda

In [3]:
import findspark
findspark.init('/home/krystian/spark-3.1.2-bin-hadoop3.2')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cluster1').getOrCreate()

In [4]:
df = spark.read \
    .options(header=True, inferSchema=True) \
    .csv(f"/home/krystian/data/2017-fordgobike-tripdata.csv")

In [5]:
from pyspark.sql.functions import max, min, datediff, format_number, collect_list, struct, mean, when, size, collect_set, count, to_date, countDistinct, current_date, sumDistinct, col, year, date_format, month, dayofmonth
from pyspark.sql import Window
import pyspark.sql.functions as F

In [6]:
df.columns

['duration_sec',
 'start_time',
 'end_time',
 'start_station_id',
 'start_station_name',
 'start_station_latitude',
 'start_station_longitude',
 'end_station_id',
 'end_station_name',
 'end_station_latitude',
 'end_station_longitude',
 'bike_id',
 'user_type',
 'member_birth_year',
 'member_gender']

In [7]:
print((df.count(), len(df.columns)))

(519700, 15)


[datetime](https://docs.python.org/3/library/datetime.html#module-datetime)

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

# Distribution of the variable "member_gender"

In [8]:
df.groupBy('member_gender').agg(
    F.expr("count(member_gender)").alias("No of records"),
).show()

+-------------+-------------+
|member_gender|No of records|
+-------------+-------------+
|         null|            0|
|       Female|        98621|
|        Other|         6299|
|         Male|       348318|
+-------------+-------------+



# The minimum, maximum and average age of bicycle rentals

In [9]:
df = df.withColumn("Age", 2017 - df.member_birth_year).alias("Age")
df.groupBy('member_gender').agg(
    F.expr("avg(Age)").alias("AVG Age"),
    F.expr("min(Age)").alias("MIN Age"),
    F.expr("max(Age)").alias("MAX Age")
).withColumn('AVG Age', F.round(F.col('AVG Age'), 0).cast('integer')).show()

+-------------+-------+-------+-------+
|member_gender|AVG Age|MIN Age|MAX Age|
+-------------+-------+-------+-------+
|         null|   null|   null|   null|
|       Female|     35|     18|    117|
|        Other|     37|     18|    117|
|         Male|     37|     18|    131|
+-------------+-------+-------+-------+



# The number of unique bikes

In [10]:
df.select("bike_id").distinct().count()

3673

# Number of unique bike stations

In [11]:
df.dropDuplicates(['start_station_name','end_station_name'])\
.select('start_station_name').distinct().count()

272

# Which bike was rented the longest and which was the shortest during the analyzed period (and for how long)

In [12]:
aggData = df.groupBy('bike_id').agg(
    F.expr("count(bike_id)").alias("Total No of rents"),
    F.expr("avg(duration_sec)").alias("AVG Rent Time"),
    F.expr("min(duration_sec)").alias("MIN Rent Time"),
    F.expr("max(duration_sec)").alias("MAX Rent Time")
).orderBy("MAX Rent Time",ascending=False)
aggData.show(2)

+-------+-----------------+------------------+-------------+-------------+
|bike_id|Total No of rents|     AVG Rent Time|MIN Rent Time|MAX Rent Time|
+-------+-----------------+------------------+-------------+-------------+
|   2231|               95|2938.7684210526318|          126|        86369|
|    229|              237| 1189.029535864979|           92|        86355|
+-------+-----------------+------------------+-------------+-------------+
only showing top 2 rows



In [13]:
aggData.orderBy("Total No of rents",ascending=False).show(2)

+-------+-----------------+------------------+-------------+-------------+
|bike_id|Total No of rents|     AVG Rent Time|MIN Rent Time|MAX Rent Time|
+-------+-----------------+------------------+-------------+-------------+
|     68|              457| 903.3282275711159|           85|        22318|
|   2178|              426|1258.5821596244132|           76|        32086|
+-------+-----------------+------------------+-------------+-------------+
only showing top 2 rows



In [14]:
aggData.orderBy("MIN Rent Time",ascending=True).show(2)

+-------+-----------------+------------------+-------------+-------------+
|bike_id|Total No of rents|     AVG Rent Time|MIN Rent Time|MAX Rent Time|
+-------+-----------------+------------------+-------------+-------------+
|   2580|              274|1079.7554744525548|           61|        27059|
|   1238|              110|            1040.8|           61|        16288|
+-------+-----------------+------------------+-------------+-------------+
only showing top 2 rows



In [15]:
aggData.orderBy("AVG Rent Time",ascending=False).show(2)

+-------+-----------------+------------------+-------------+-------------+
|bike_id|Total No of rents|     AVG Rent Time|MIN Rent Time|MAX Rent Time|
+-------+-----------------+------------------+-------------+-------------+
|   3730|                8|         11461.125|          598|        79841|
|   3693|                7|11434.714285714286|          127|        74203|
+-------+-----------------+------------------+-------------+-------------+
only showing top 2 rows



# Average duration of a single loan

In [16]:
df.select(mean('duration_sec').alias("AVG rent time")).show()

+------------------+
|     AVG rent time|
+------------------+
|1099.0095208774294|
+------------------+



# Between which stations there was the greatest traffic

In [17]:
df.groupBy('start_station_name', 'end_station_name').agg(
    F.expr("count(bike_id)").alias("Total No of connections"))\
.orderBy("Total No of connections",ascending=False).show(1, False)

+--------------------------------------------------+-----------------------------+-----------------------+
|start_station_name                                |end_station_name             |Total No of connections|
+--------------------------------------------------+-----------------------------+-----------------------+
|San Francisco Ferry Building (Harry Bridges Plaza)|The Embarcadero at Sansome St|3344                   |
+--------------------------------------------------+-----------------------------+-----------------------+
only showing top 1 row



# At what time during the day were the most bicycles rented

In [18]:
df.withColumn("Ex Time of Rent", date_format('start_time', 'HH:mm'))\
.select("Ex Time of Rent")\
.groupBy('Ex Time of Rent').count()\
.orderBy("count",ascending=False).show(1) 


+---------------+-----+
|Ex Time of Rent|count|
+---------------+-----+
|          17:09| 1208|
+---------------+-----+
only showing top 1 row



# The average number of rentals for individual days of the week

In [19]:
df.withColumn("Rent day", date_format('start_time', 'EEEE')).select('Rent day')\
.groupBy('Rent day').count()\
.orderBy("count",ascending=False).show() 

+---------+-----+
| Rent day|count|
+---------+-----+
|  Tuesday|87865|
|Wednesday|87752|
| Thursday|85243|
|   Monday|81410|
|   Friday|81165|
| Saturday|50874|
|   Sunday|45391|
+---------+-----+



# The average number of rentals for individual months

In [20]:
df.withColumn("Rent month", date_format('start_time', 'MMMM'))\
.select('Rent month').groupBy('Rent month').count()\
.orderBy("count",ascending=False).show() 

+----------+------+
|Rent month| count|
+----------+------+
|   October|108937|
| September| 98558|
|  November| 95612|
|  December| 86539|
|    August| 83292|
|      July| 44073|
|      June|  2689|
+----------+------+



# RDD dataDaily containing data aggregated down to the level of the day. Each day of the year (element in RDD) is to contain the following information:

- 'date' : Date 
- 'avg_duration_sec' : Average duration of rentals for the day
- 'n_trips' : liczba wypożyczeń danego dnia
- 'n_bikes' : number of rentals on a given day
- 'n_subscriber' : number of rentals made by subscribers on a given day


In [33]:
dataDaily = df.withColumn("date", date_format('start_time', 'D'))\
.groupBy('date').agg(
    F.expr("avg(duration_sec)").alias("avg_duration_sec"),   
    F.expr("count(bike_id)").alias("n_trips"),
    countDistinct("bike_id").alias("n_bikes"),
    count(when(col("user_type") == "Subscriber", True)).alias("n_subscriber"))\
.withColumn('avg_duration_sec', F.round(F.col('avg_duration_sec'), 0).cast('integer'))\
.orderBy("date",ascending=True) 

dataDaily.show(5)

+----+----------------+-------+-------+------------+
|date|avg_duration_sec|n_trips|n_bikes|n_subscriber|
+----+----------------+-------+-------+------------+
| 179|            1131|    632|    299|         530|
| 180|            1077|   1019|    375|         885|
| 181|            1013|   1038|    392|         824|
| 182|            3204|    475|    255|         189|
| 183|            3082|    523|    257|         153|
+----+----------------+-------+-------+------------+
only showing top 5 rows



# Number of unique combinations of stations (x -> y == y -> x) for the day

In [34]:
df2 = df.withColumn("sl2", when(df['end_station_id'] < df['start_station_id'],  df['end_station_id'])\
.otherwise(df['start_station_id']))\
.withColumn("el2", when(df['end_station_id'] > df['start_station_id'],  df['end_station_id'])\
.otherwise(df['start_station_id']))\
.drop("start_station_id", "end_station_id")

df2_agg = df2.withColumn("date", date_format('start_time', 'D'))\
.groupBy('date').agg(collect_set(struct(col('sl2'), col('el2'))).alias("n_routes")) 

df2_agg.select("date", size("n_routes")).alias("n_routes")\
.orderBy("date",ascending=True).show(5)

+----+--------------+
|date|size(n_routes)|
+----+--------------+
| 179|           292|
| 180|           384|
| 181|           390|
| 182|           208|
| 183|           190|
+----+--------------+
only showing top 5 rows



# RDD key-value `bikeDaily` containing one element for each` "bike_id" `. The values ​​in the RDD are to be lists of the total daily use of a given bike in seconds (items of the list in chronological order).

In [32]:
bikeDaily = df.withColumn("date", date_format('start_time', 'D'))\
.groupBy('bike_id').pivot('date').sum("duration_sec")

bikeDaily.select('bike_id','259','260','261','262','263',).show(3)

+-------+-----+-----+----+----+----+
|bike_id|  259|  260| 261| 262| 263|
+-------+-----+-----+----+----+----+
|   2866| 1304| null|null| 596|1805|
|   2122|13834|  799|null|null|null|
|   2659| 3190|17041|1151| 550|2052|
+-------+-----+-----+----+----+----+
only showing top 3 rows

