In [97]:
from pyspark.sql import SparkSession
from pyspark.sql import types, functions as F
from datetime import datetime, timedelta
from collections import namedtuple

In [4]:
spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("fhvhv-analyze-rdds") \
                    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/08 12:29:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [91]:
trips_df = spark.read.parquet("../data/stage/*")

In [93]:
trips_df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [55]:
def filter_report_date(row):
    """ Gets RDD row and check the date outliers"""
    report_date = datetime(year=2021, month=6, day=15).date()
    pickup_date = row.pickup_datetime.date()
    
    return pickup_date == report_date
    

In [78]:
def transform(row):
    # Keys
    pickup_hour = row.pickup_datetime.replace(second=0, minute=0, microsecond=0)
    pickup_zone = row.PULocationID 
    # Metrics
    trip_duration = (row.dropoff_datetime - row.pickup_datetime).total_seconds()
    trip_count = 1
    
    return ((pickup_hour, pickup_zone), (trip_duration, trip_count))

In [79]:
def calculate_metrics(left_values, right_values):
    left_duration, left_count = left_values
    right_duration, right_count = right_values
    
    trips_duration_sum = left_duration + right_duration
    trips_count_sum = left_count + right_count
    
    return (trips_duration_sum, trips_count_sum)

In [89]:
def unwrap(row):
    TripsStatisticTuple = namedtuple(
        "TripsStatisticTuple", 
        ["pickup_hour", "pickup_zone", "all_trips_duration_hours", "total_trips"]
    )
    return TripsStatisticTuple(row[0][0], row[0][1], round(row[1][0] / 60 / 60, 2), row[1][1])

In [100]:
columns = ['pickup_datetime', 'dropoff_datetime', 'PULocationID']

In [101]:
report_schema = types.StructType([
    types.StructField('pickup_hour', types.TimestampType(), True),
    types.StructField('pickup_zone', types.IntegerType(), True), 
    types.StructField('all_trips_duration_hours', types.DoubleType(), True), 
    types.StructField('total_trips', types.IntegerType(), True)
])

In [102]:
statistic_df = trips_df \
    .select(columns) \
    .rdd \
    .filter(filter_report_date) \
    .map(transform) \
    .reduceByKey(calculate_metrics) \
    .map(unwrap) \
    .toDF(report_schema)

In [104]:
statistic_df.show(24)

+-------------------+-----------+------------------------+-----------+
|        pickup_hour|pickup_zone|all_trips_duration_hours|total_trips|
+-------------------+-----------+------------------------+-----------+
|2021-06-15 08:00:00|        145|                   56.89|        152|
|2021-06-15 15:00:00|        151|                    36.8|         82|
|2021-06-15 15:00:00|        231|                   98.56|        248|
|2021-06-15 12:00:00|          7|                    57.2|        180|
|2021-06-15 08:00:00|         21|                   32.27|        101|
|2021-06-15 01:00:00|         10|                    5.65|         26|
|2021-06-15 19:00:00|         21|                   16.95|         71|
|2021-06-15 19:00:00|        133|                   12.25|         47|
|2021-06-15 19:00:00|         25|                   46.81|        158|
|2021-06-15 22:00:00|        191|                   11.76|         51|
|2021-06-15 11:00:00|        155|                   23.23|         88|
|2021-

In [108]:
statistic_df.orderBy(F.desc("total_trips")).show(24)

+-------------------+-----------+------------------------+-----------+
|        pickup_hour|pickup_zone|all_trips_duration_hours|total_trips|
+-------------------+-----------+------------------------+-----------+
|2021-06-15 18:00:00|        161|                  224.58|        515|
|2021-06-15 17:00:00|        161|                  248.18|        493|
|2021-06-15 23:00:00|        181|                  215.39|        489|
|2021-06-15 18:00:00|        162|                  186.47|        469|
|2021-06-15 21:00:00|        132|                  257.79|        466|
|2021-06-15 14:00:00|        138|                  301.97|        462|
|2021-06-15 21:00:00|        161|                  141.18|        457|
|2021-06-15 19:00:00|        161|                  161.42|        446|
|2021-06-15 19:00:00|         61|                  125.81|        436|
|2021-06-15 00:00:00|        132|                   198.2|        432|
|2021-06-15 18:00:00|         61|                  138.45|        432|
|2021-

In [110]:
trips_df.createOrReplaceTempView("trips")

In [127]:
spark.sql("""

WITH transformed_trips as (

    SELECT 

        PULocationID AS pickup_zone,
        DATE_TRUNC('hour', pickup_datetime) AS pickup_hour,
        UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime) as duration_sec

    FROM trips
    WHERE DATE(pickup_datetime) = '2021-06-15'

)

SELECT 

    pickup_zone,
    pickup_hour,
    
    ROUND(
        SUM(duration_sec) / 60 / 60,
        2
    )             AS all_trips_duration_hour,
    COUNT(*)      AS total_trips

FROM transformed_trips
GROUP BY 1, 2
ORDER BY total_trips DESC

""").show(24)



+-----------+-------------------+-----------------------+-----------+
|pickup_zone|        pickup_hour|all_trips_duration_hour|total_trips|
+-----------+-------------------+-----------------------+-----------+
|        161|2021-06-15 18:00:00|                 224.58|        515|
|        161|2021-06-15 17:00:00|                 248.18|        493|
|        181|2021-06-15 23:00:00|                 215.39|        489|
|        162|2021-06-15 18:00:00|                 186.47|        469|
|        132|2021-06-15 21:00:00|                 257.79|        466|
|        138|2021-06-15 14:00:00|                 301.97|        462|
|        161|2021-06-15 21:00:00|                 141.18|        457|
|        161|2021-06-15 19:00:00|                 161.42|        446|
|         61|2021-06-15 19:00:00|                 125.81|        436|
|         61|2021-06-15 18:00:00|                 138.45|        432|
|        132|2021-06-15 00:00:00|                  198.2|        432|
|        231|2021-06

                                                                                