In [1]:
import datetime as dt
import numpy as np 
import pandas as pd
import time
import argparse
from pathlib import Path
from pyspark import SparkContext
from pyspark.sql.session import SparkSession

In [2]:
# Reuse an already-running SparkContext when this cell is executed again:
# only one context may be active per kernel, and a second bare
# SparkContext() call fails instead of returning the existing one.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [3]:
# Path of the ride-records CSV that is handed to get_rides_rdd() below.
input_file = './data/2020-02.csv'
# output_path = 'output/'

In [4]:
def get_date_time(date, time):
    """Combine a date string and a time string into a single ``datetime``.

    Example inputs (day first; the hour may be a single digit):
        28/02/2020,23:58:15
        29/02/2020,0:01:55

    Args:
        date: Calendar date formatted as ``%d/%m/%Y``.
        time: Clock time formatted as ``%H:%M:%S``.

    Returns:
        The ``datetime.datetime`` parsed from ``date`` and ``time``.

    Raises:
        ValueError: If the combined text does not match the expected format
            (raised by ``datetime.strptime``).
    """
    from datetime import datetime

    stamp = " ".join((date, time))
    return datetime.strptime(stamp, '%d/%m/%Y %H:%M:%S')

def get_age_bucket(age):
    """Return the label of the age bracket that ``age`` falls into.

    Brackets are half-open on the upper side, so every value lands in exactly
    one bracket. Closed ranges such as ``18 <= age <= 34`` would leave a gap
    between 34 and 35, sending a fractional age like 34.5 to the last bracket.

    Args:
        age: Rider age as a number.

    Returns:
        One of ``"00-18"`` (age below 18), ``"18-34"``, ``"35-44"``,
        ``"45-54"``, ``"55-64"`` or ``"65-UP"`` (age 65 and above).
    """
    # (exclusive upper bound, label), in ascending order of the bound.
    brackets = (
        (18, "00-18"),
        (35, "18-34"),
        (45, "35-44"),
        (55, "45-54"),
        (65, "55-64"),
    )
    for upper_bound, label in brackets:
        if age < upper_bound:
            return label
    return "65-UP"
    
def parse_ride_records(part_id, list_of_records):
    """Parse one partition of raw CSV lines into ride tuples.

    Column layout of the input, followed by a sample row:
        Genero_Usuario,Edad_Usuario,Bici,Ciclo_Estacion_Retiro,Fecha_Retiro,Hora_Retiro,Ciclo_Estacion_Arribo,Fecha_Arribo,Hora_Arribo
        M,44,4357,442,1/2/2020,0:00:38,116,1/2/2020,0:35:17

    Args:
        part_id: Index of the partition being processed. The first line of
            partition 0 is treated as the header and discarded.
        list_of_records: Iterator over the partition's text lines.

    Yields:
        ``(bike_id, gender, age_bucket, station_start, station_end,
        datetime_start, datetime_end)`` for every non-blank line.

    Raises:
        ValueError: If an age, bike id, date or time field cannot be parsed.
        IndexError: If a non-blank row has fewer than nine columns.
    """
    import csv

    if part_id == 0:
        # Skip the header line. The ``None`` default matters: a bare next()
        # on an empty partition raises StopIteration, which inside a generator
        # is converted into RuntimeError (PEP 479).
        next(list_of_records, None)

    for row in csv.reader(list_of_records):
        if not row:
            # csv.reader yields an empty list for a blank line.
            continue
        gender = row[0]
        age = int(row[1])
        bike_id = int(row[2])
        station_start = row[3]
        station_end = row[6]
        datetime_start = get_date_time(row[4], row[5])
        datetime_end = get_date_time(row[7], row[8])
        yield (bike_id, gender, get_age_bucket(age), station_start, station_end, datetime_start, datetime_end)

**Questions to Answer**
1. Top 5 stations for starting a ride.
2. Top 5 trips based on start station and end station.
3. Rider statistics based on average riding time by gender and age category.
4. Find the busiest bikes in CDMX for February 2020. How many times was each one used, and for how many seconds was it in use?

In [5]:
def get_rides_rdd(sc, input_file):
    """Build the cached RDD of parsed rides from a CSV file.

    Args:
        sc: SparkContext used to read the file.
        input_file: Path of the CSV file to load.

    Returns:
        An RDD, marked for caching, whose elements are the tuples yielded by
        ``parse_ride_records``.
    """
    print("Reading input file:", input_file)

    raw_lines = sc.textFile(input_file, use_unicode=True)
    # Parsing runs per partition so that the header can be dropped by index.
    rides_rdd = raw_lines.mapPartitionsWithIndex(parse_ride_records).cache()

    print("Number of partitions: ", rides_rdd.getNumPartitions())
    return rides_rdd

def get_top_start_stations(num, rides_rdd):
    """Return the ``num`` start stations with the most rides.

    Rides lasting longer than two hours are left out of the count.

    Args:
        num: How many stations to return.
        rides_rdd: RDD of ride tuples where index 3 is the start station and
            indices 5 and 6 are the start and end datetimes (the layout
            yielded by ``parse_ride_records``).

    Returns:
        A list of ``(trip_count, start_station)`` tuples, largest first.
    """
    max_ride_secs = 60 * 60 * 2

    rides_per_station = (
        rides_rdd
        .map(lambda ride: (ride[3], ride[6] - ride[5]))
        .filter(lambda pair: pair[1].total_seconds() <= max_ride_secs)
        .mapValues(lambda _duration: 1)
        .reduceByKey(lambda left, right: left + right)
    )
    # Put the count first so that ranking compares counts before station ids.
    ranked = rides_per_station.map(lambda pair: (pair[1], pair[0]))
    return ranked.top(num, key=lambda item: item)

def get_top_routes(num, rides_rdd):
    """Return the ``num`` most travelled (start station, end station) routes.

    Rides lasting longer than two hours are left out.

    Args:
        num: How many routes to return.
        rides_rdd: RDD of ride tuples where indices 3 and 4 are the start and
            end stations and indices 5 and 6 are the start and end datetimes
            (the layout yielded by ``parse_ride_records``).

    Returns:
        A list of ``(trip_count, ((start, end), average_seconds))`` tuples,
        largest first.
    """
    max_ride_secs = 60 * 60 * 2

    def merge(left, right):
        # Element-wise sum of two (total_seconds, trip_count) pairs.
        return (left[0] + right[0], left[1] + right[1])

    def to_ranked(entry):
        route, (total_secs, trips) = entry
        return (trips, (route, total_secs / trips))

    route_durations = rides_rdd.map(
        lambda ride: ((ride[3], ride[4]), ride[6] - ride[5])
    )
    short_rides = route_durations.filter(
        lambda pair: pair[1].total_seconds() <= max_ride_secs
    )
    totals = short_rides.mapValues(
        lambda duration: (duration.total_seconds(), 1)
    ).reduceByKey(merge)
    return totals.map(to_ranked).top(num, key=lambda item: item)

def get_stats_by_gender(rides_rdd):
    """Compute the average ride duration and ride count per gender.

    Rides lasting longer than two hours are left out.

    Args:
        rides_rdd: RDD of ride tuples where index 1 is the gender and
            indices 5 and 6 are the start and end datetimes (the layout
            yielded by ``parse_ride_records``).

    Returns:
        A list of ``(gender, (average_seconds, ride_count))`` tuples.
    """
    max_ride_secs = 60 * 60 * 2

    def merge(left, right):
        # Element-wise sum of two (total_seconds, ride_count) pairs.
        return (left[0] + right[0], left[1] + right[1])

    def summarise(total):
        total_secs, rides = total
        return (total_secs / rides, rides)

    durations = rides_rdd.map(lambda ride: (ride[1], ride[6] - ride[5]))
    short_rides = durations.filter(
        lambda pair: pair[1].total_seconds() <= max_ride_secs
    )
    totals = short_rides.mapValues(
        lambda duration: (duration.total_seconds(), 1)
    ).reduceByKey(merge)
    return totals.mapValues(summarise).collect()

def get_stats_by_age(rides_rdd):
    """Compute the average ride duration and ride count per age bucket.

    Rides lasting longer than two hours are left out.

    Args:
        rides_rdd: RDD of ride tuples where index 2 is the age bucket and
            indices 5 and 6 are the start and end datetimes (the layout
            yielded by ``parse_ride_records``).

    Returns:
        A list of ``(age_bucket, (average_seconds, ride_count))`` tuples.
    """
    max_ride_secs = 60 * 60 * 2

    def merge(left, right):
        # Element-wise sum of two (total_seconds, ride_count) pairs.
        return (left[0] + right[0], left[1] + right[1])

    def summarise(total):
        total_secs, rides = total
        return (total_secs / rides, rides)

    durations = rides_rdd.map(lambda ride: (ride[2], ride[6] - ride[5]))
    short_rides = durations.filter(
        lambda pair: pair[1].total_seconds() <= max_ride_secs
    )
    totals = short_rides.mapValues(
        lambda duration: (duration.total_seconds(), 1)
    ).reduceByKey(merge)
    return totals.mapValues(summarise).collect()

def get_busy_bees(num, rides_rdd, by_count=False):
    """Return the ``num`` most used bikes.

    Rides lasting longer than two hours are left out.

    Args:
        num: How many bikes to return.
        rides_rdd: RDD of ride tuples where index 0 is the bike id and
            indices 5 and 6 are the start and end datetimes (the layout
            yielded by ``parse_ride_records``).
        by_count: When true, rank by number of rides; otherwise rank by
            total seconds ridden.

    Returns:
        A list of ``((ride_count, total_seconds), bike_id)`` tuples, largest
        first according to the chosen ranking.
    """
    max_ride_secs = 60 * 60 * 2

    usage_per_bike = (
        rides_rdd
        .map(lambda ride: (ride[0], ride[6] - ride[5]))
        .filter(lambda pair: pair[1].total_seconds() <= max_ride_secs)
        .mapValues(lambda duration: (1, duration.total_seconds()))
        .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
        .map(lambda pair: (pair[1], pair[0]))
    )

    # Position inside the (ride_count, total_seconds) pair used for ranking.
    rank_field = 0 if by_count else 1
    return usage_per_bike.top(num, key=lambda item: item[0][rank_field])

In [6]:
# Build the shared rides RDD that every query cell below reads from.
# get_rides_rdd() marks it with .cache(); presumably the parsing cost is paid
# by the first query that runs an action on it — TODO confirm.
print("Getting RDD of all rides...", input_file)
rides_rdd = get_rides_rdd(sc, input_file)

Getting RDD of all rides... ./data/2020-02.csv
Reading input file: ./data/2020-02.csv
Number of partitions:  2


In [7]:
# Question 1: time the query, then list the five busiest start stations.
start = time.time()
top_stations = get_top_start_stations(5, rides_rdd)
print("Execution Time(secs): ", time.time() - start)
print("Top Starting Stations:")
# Each result is a (trip_count, station_id) pair.
for trips, station in top_stations:
    print("Start Stations: {:03d}, Trips: {:03d}".format(int(station), trips))

Execution Time(secs):  16.89979600906372
Top Starting Stations:
Start Stations: 001, Trips: 6298
Start Stations: 027, Trips: 6201
Start Stations: 271, Trips: 5262
Start Stations: 064, Trips: 4825
Start Stations: 041, Trips: 4621


In [8]:
# Question 2: time the query, then list the five most travelled routes.
start = time.time()
top_routes = get_top_routes(5, rides_rdd)
print("Execution Time(secs): ", time.time() - start)
print("Top Bike Routes:")
# Each result is (trip_count, ((start_station, end_station), average_seconds)).
for trips, (route, avg_secs) in top_routes:
    origin, destination = route
    print("From: {:03d}, To: {:03d}, Total Trips: {:03d}, Avg Duration(mins): {:.2f}".format(
        int(origin), int(destination), trips, avg_secs / 60))

Execution Time(secs):  5.171604156494141
Top Bike Routes:
From: 033, To: 033, Total Trips: 375, Avg Duration(mins): 30.23
From: 018, To: 001, Total Trips: 319, Avg Duration(mins): 5.58
From: 211, To: 217, Total Trips: 303, Avg Duration(mins): 3.54
From: 449, To: 449, Total Trips: 301, Avg Duration(mins): 15.34
From: 208, To: 206, Total Trips: 297, Avg Duration(mins): 8.53


In [9]:
# Question 3 (gender): time the query, then print one line per gender.
start = time.time()
gender_stats = get_stats_by_gender(rides_rdd)
print("Execution Time: ", time.time() - start)
print("Ride Profile by Gender: ")
# Each result is (gender, (average_seconds, ride_count)).
for gender, (avg_secs, rides) in gender_stats:
    print("Gender: {}, Count: {}, Avg Ride (Mins): {:.2f}".format(gender, rides, avg_secs / 60))

Execution Time:  3.573302745819092
Ride Profile by Gender: 
Gender: M, Count: 509782, Avg Ride (Mins): 13.62
Gender: F, Count: 174808, Avg Ride (Mins): 14.30


In [10]:
# Question 3 (age): time the query, then print one line per age bucket.
start = time.time()
age_stats = get_stats_by_age(rides_rdd)
print("Execution Time: ", time.time() - start)
# The heading names the age breakdown; it previously repeated the gender
# heading from the cell above.
print("Ride Profile by Age: ")
# Each result is (age_bucket, (average_seconds, ride_count)).
for bucket, (avg_secs, rides) in age_stats:
    print("Age: {}, Count: {:06d}, Avg Ride (Mins): {:.2f}".format(bucket, rides, avg_secs / 60))

Execution Time:  3.59033465385437
Ride Profile by Gender: 
Age: 35-44, Count: 167715, Avg Ride (Mins): 13.58
Age: 18-34, Count: 392040, Avg Ride (Mins): 14.03
Age: 45-54, Count: 077990, Avg Ride (Mins): 13.30
Age: 55-64, Count: 035831, Avg Ride (Mins): 13.30
Age: 00-18, Count: 001332, Avg Ride (Mins): 12.78
Age: 65-UP, Count: 009682, Avg Ride (Mins): 13.69


In [11]:
# Question 4: rank the bikes twice, first by total time ridden and then by
# number of rides. Both rankings share one code path so that the timing
# label and the zero-padded ID format stay the same in each table.
for by_count, heading in (
    (False, "Busiest Bikes By Total Minutes Ridden: "),
    (True, "Busiest Bikes By Total Rides: "),
):
    if by_count:
        print()  # blank line between the two rankings

    start = time.time()
    most_used_bikes = get_busy_bees(3, rides_rdd, by_count)
    print("Execution Time(secs): ", time.time() - start)
    print(heading)
    # Each result is ((ride_count, total_seconds), bike_id).
    for (rides, total_secs), bike_id in most_used_bikes:
        print("ID: {:05d}, Count: {}, Minutes: {:.2f}".format(bike_id, rides, total_secs / 60))


Execution Time(secs):  3.7268128395080566
Busiests Bikes By Total Minutes Ridden: 
ID: 15100, Count: 125, Minutes: 2916.57
ID: 15059, Count: 125, Minutes: 2808.73
ID: 10810, Count: 208, Minutes: 2785.77

Time Elapsed(secs):  3.939460277557373
Busiests Bikes By Total Rides: 
ID: 10771, Count: 217, Minutes: 1523.15
ID: 10810, Count: 208, Minutes: 2785.77
ID: 07854, Count: 192, Minutes: 2633.07


In [12]:
# Final cell: prints a marker once execution reaches the end of the notebook.
print("Done") 

Done
