# Taxi Exercise

## Prerequisites

Install Java and Spark in the VM.

In [1]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download Spark 3.3.1
!wget -q https://apache.osuosl.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop2.tgz

In [2]:
# unzip it
!tar xf spark-3.3.1-bin-hadoop2.tgz

In [3]:
!pip install -q findspark

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop2"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"

Start Spark Session 

---

In [5]:
import findspark
findspark.init("spark-3.3.1-bin-hadoop2")  # path to SPARK_HOME

from pyspark.sql import SparkSession

# create the session
spark = SparkSession \
        .builder \
        .appName("Joins") \
        .master("local[*]") \
        .config("spark.ui.port", "4500") \
        .getOrCreate()

spark.version

'3.3.1'

In [6]:
spark

In [7]:
# Import sql functions
from pyspark.sql.functions import *

In [9]:
!mkdir -p /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2022/master/datasets/taxi_data.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2022/master/datasets/taxi_zones.csv -P /dataset

Load the datasets

In [10]:
taxiDF = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/dataset/taxi_data.csv")

taxiDF.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [11]:
taxiDF.show(2)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2018-01-24 23:02:56|  2018-01-24 23:10:58|              1|         2.02|         1|                 N|          48|         107|           2|        8.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.8|
|       2| 2018-01-24 23:57:13|  2018-01-25 00:2

In [12]:
taxiZonesDF = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/dataset/taxi_zones.csv")

taxiZonesDF.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [13]:
taxiZonesDF.show(3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



## Exercise

In this exercise we will work with two DFs. The first one, taxiDF, holds info about taxi rides in 2018. The second, taxiZonesDF, has info about the zones. Please load the DFs and print their schemas and two (or more) rows for more detailed info.

The aim of the exercise is to answer the questions listed below.

**Questions:**

 1. Which zones have the most pickups/dropoffs overall? Note that there are many PULocationIDs per zone.
 2. What are the peak hours for taxis?
 3. How are the trips distributed by length? Show stats like mean, max, min, etc.
    Then get the total trips shorter/longer than 30 miles (assuming the standard TLC yellow-taxi schema, where trip_distance is reported in miles). Why are people taking the cab: for long or short trips?
    You can also try the same with different distances. Which threshold would you expect to split the trips roughly evenly between long and short?
 4. What are the peak hours for long/short trips?
 5. What are the top 3 pickup/dropoff zones for long/short trips?
 6. How are people paying for the ride on long/short trips? Hint: the information about how good the payment is can be found in the RatecodeID column.
 7. How is the payment type (RatecodeID) evolving over time (in days)? Hint: use the column with the pickup time info.
    Get the same info, but with the average rate code and total trips per day.

### Question 1

In [14]:
pickupsByTaxiZoneDF = taxiDF.groupBy("PULocationID") \
    .agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("PULocationID") == col("LocationID")) \
    .drop("LocationID", "service_zone") \
    .orderBy(col("totalTrips").desc())

pickupsByTaxiZoneDF.show(3)

+------------+----------+---------+--------------------+
|PULocationID|totalTrips|  Borough|                Zone|
+------------+----------+---------+--------------------+
|         237|     15945|Manhattan|Upper East Side S...|
|         161|     15255|Manhattan|      Midtown Center|
|         236|     13767|Manhattan|Upper East Side N...|
+------------+----------+---------+--------------------+
only showing top 3 rows



In [15]:
# 1b - group by borough (city)
pickupsByBoroughDF = pickupsByTaxiZoneDF.groupBy(col("Borough")) \
    .agg(sum(col("totalTrips")).alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

pickupsByBoroughDF.show(2)

+---------+----------+
|  Borough|totalTrips|
+---------+----------+
|Manhattan|    304266|
|   Queens|     17712|
+---------+----------+
only showing top 2 rows



### Question 2

In [16]:
pickupsByHourDF = taxiDF \
    .withColumn("hour_of_day", hour(col("tpep_pickup_datetime"))) \
    .groupBy("hour_of_day") \
    .agg(count("*").alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

pickupsByHourDF.show(3)

+-----------+----------+
|hour_of_day|totalTrips|
+-----------+----------+
|         16|     22121|
|         17|     21598|
|         19|     20884|
+-----------+----------+
only showing top 3 rows



### Question 3

Get stats for taxiDF

In [17]:
tripDistanceDF = taxiDF.select(col("trip_distance").alias("distance"))
tripDistanceStatsDF = tripDistanceDF.select(
    count("*").alias("count"),
    lit(30).alias("threshold"),
    mean("distance").alias("mean"),
    stddev("distance").alias("stddev"),
    min("distance").alias("min"),
    max("distance").alias("max")
  )

tripDistanceStatsDF.show(3)

+------+---------+------------------+------------------+---+----+
| count|threshold|              mean|            stddev|min| max|
+------+---------+------------------+------------------+---+----+
|331893|       30|2.7179894423805155|3.4851522248851214|0.0|66.0|
+------+---------+------------------+------------------+---+----+
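
The question also asks which threshold would give roughly as many long trips as short ones. By definition that is the median distance. The sketch below shows the idea in plain Python on a hypothetical sample (the values are made up, not taken from taxi_data.csv). On the real DF, a comparable estimate could presumably be obtained with `taxiDF.approxQuantile("trip_distance", [0.5], 0.01)`.

```python
import statistics

# Hypothetical sample of trip distances, for illustration only;
# these values are NOT taken from taxi_data.csv.
sample_distances = [0.5, 0.9, 1.2, 1.8, 2.0, 2.7, 3.4, 5.1, 9.8, 31.0]


def split_counts(distances, threshold):
    """Return (short, long) counts with the notebook's rule: long means distance >= threshold."""
    # len() of a list is used instead of sum(), because the notebook's
    # wildcard import from pyspark.sql.functions shadows the builtin sum.
    long_count = len([d for d in distances if d >= threshold])
    return len(distances) - long_count, long_count


# The median leaves half of the sample on each side.
balanced_threshold = statistics.median(sample_distances)

print(split_counts(sample_distances, 30))                  # threshold far in the tail
print(split_counts(sample_distances, balanced_threshold))  # threshold at the median
```

With a mean of about 2.7 in the stats above, a balanced threshold for the real data is likely to sit far below 30.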



We will add an isLong column holding true/false for long/short rides.

In [18]:
tripsWithLengthDF = taxiDF.withColumn("isLong", col("trip_distance") >= 30)

tripsWithLengthDF.show(2)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|isLong|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------+
|       2| 2018-01-24 23:02:56|  2018-01-24 23:10:58|              1|         2.02|         1|                 N|          48|         107|           2|        8.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.8| false|
|       2| 2018-01-2

Since we want to know how many long and short trips there are, we group by isLong and call count().

In [19]:
tripsByLengthDF = tripsWithLengthDF.groupBy("isLong").count()

tripsByLengthDF.show()

+------+------+
|isLong| count|
+------+------+
|  true|    83|
| false|331810|
+------+------+
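
To put these counts in perspective, the following sketch does the arithmetic with the numbers printed above: the share of long trips, and how far the threshold of 30 sits from the mean, measured in standard deviations. It only reuses values from the notebook outputs. The variable names are ours.

```python
# Values copied from the notebook outputs above.
long_trips = 83
short_trips = 331810
mean_distance = 2.7179894423805155
stddev_distance = 3.4851522248851214
threshold = 30

total_trips = long_trips + short_trips
long_share = long_trips / total_trips

# Distance of the threshold from the mean, in standard deviations.
threshold_z = (threshold - mean_distance) / stddev_distance

print(f"total trips: {total_trips}")
print(f"long share: {long_share:.4%}")
print(f"threshold z-score: {threshold_z:.2f}")
```

The total matches the count in the stats table, and the tiny share of long trips suggests that people mostly take the cab for short rides.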



### Question 4

We will use the DF created before, with the isLong flag. Then we group by both the isLong and hour columns and count.

In [20]:
pickupsByHourByLengthDF = tripsWithLengthDF \
    .withColumn("hour_of_day", hour(col("tpep_pickup_datetime"))) \
    .groupBy("hour_of_day", "isLong") \
    .agg(count("*").alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

pickupsByHourByLengthDF.filter(col("isLong") == True).show()

+-----------+------+----------+
|hour_of_day|isLong|totalTrips|
+-----------+------+----------+
|         19|  true|        10|
|         17|  true|         9|
|          7|  true|         6|
|          6|  true|         5|
|         12|  true|         5|
|         14|  true|         5|
|         11|  true|         5|
|         21|  true|         5|
|          5|  true|         4|
|         18|  true|         4|
|         10|  true|         3|
|         23|  true|         3|
|         13|  true|         3|
|         20|  true|         3|
|          9|  true|         3|
|         15|  true|         2|
|          8|  true|         2|
|         16|  true|         2|
|          0|  true|         2|
|          1|  true|         1|
+-----------+------+----------+
only showing top 20 rows



In [21]:
pickupsByHourByLengthDF.filter(col("isLong") == False).show()

+-----------+------+----------+
|hour_of_day|isLong|totalTrips|
+-----------+------+----------+
|         16| false|     22119|
|         17| false|     21589|
|         19| false|     20874|
|         18| false|     20314|
|         20| false|     19525|
|          6| false|     18862|
|         15| false|     18662|
|         13| false|     17840|
|         12| false|     17478|
|          7| false|     16834|
|         14| false|     16155|
|         11| false|     16077|
|         10| false|     15998|
|          9| false|     15561|
|          5| false|     15441|
|          8| false|     15346|
|         21| false|     14647|
|          4| false|      8600|
|         22| false|      7049|
|         23| false|      3975|
+-----------+------+----------+
only showing top 20 rows
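
As a quick sanity check, the long and short counts for an hour should add up to the overall count from Question 2. The sketch below verifies this for the three busiest hours, using only numbers printed in the outputs above, and compares the peak hour overall with the peak hour for long trips (restricted to these three hours).

```python
import builtins  # the notebook's wildcard import shadows max with the Spark function

# Counts copied from the outputs of Questions 2 and 4 above.
total_by_hour = {16: 22121, 17: 21598, 19: 20884}
long_by_hour = {16: 2, 17: 9, 19: 10}
short_by_hour = {16: 22119, 17: 21589, 19: 20874}

# Every trip is either long or short, so both counts must add up to the hourly total.
consistent = all(
    long_by_hour[h] + short_by_hour[h] == total_by_hour[h] for h in total_by_hour
)

# Busiest hour among these three, overall and for long trips only.
peak_overall = builtins.max(total_by_hour, key=total_by_hour.get)
peak_long = builtins.max(long_by_hour, key=long_by_hour.get)

print(consistent, peak_overall, peak_long)
```

Short trips dominate the totals, so their peak hours coincide with the overall ones, while the few long trips peak at a different hour.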



### Question 5

We will again use the DF with the isLong info.

If we understand the question as asking for PU and DO popularity separately, we get two DFs, one for each.

In [22]:
PUPopularDF = tripsWithLengthDF.groupBy("PULocationID").agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("PULocationID") == col("LocationID")) \
    .withColumnRenamed("Zone", "Pickup_Zone") \
    .drop("LocationID", "Borough", "service_zone", "PULocationID") \
    .orderBy(col("totalTrips").desc())

PUPopularDF.show(10)

+----------+--------------------+
|totalTrips|         Pickup_Zone|
+----------+--------------------+
|     15945|Upper East Side S...|
|     15255|      Midtown Center|
|     13767|Upper East Side N...|
|     13715|        Midtown East|
|     11702|         Murray Hill|
|     11488|            Union Sq|
|     11455|Times Sq/Theatre ...|
|     10319|Penn Station/Madi...|
|     10091|        Clinton East|
|      9845|       Midtown North|
+----------+--------------------+
only showing top 10 rows



In [23]:
DOPopularDF = tripsWithLengthDF.groupBy("DOLocationID").agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("DOLocationID") == col("LocationID")) \
    .withColumnRenamed("Zone", "Dropoff_Zone") \
    .drop("LocationID", "Borough", "service_zone") \
    .drop("DOLocationID") \
    .orderBy(col("totalTrips").desc())

DOPopularDF.show(10)

+----------+--------------------+
|totalTrips|        Dropoff_Zone|
+----------+--------------------+
|     15099|      Midtown Center|
|     14261|Upper East Side N...|
|     13754|Upper East Side S...|
|     11239|         Murray Hill|
|     11090|        Midtown East|
|     10054|Times Sq/Theatre ...|
|      9929|            Union Sq|
|      8666| Lincoln Square East|
|      8594|       Midtown North|
|      8258|        Clinton East|
+----------+--------------------+
only showing top 10 rows



But if we understand the question as asking for the most popular PU/DO combinations (trips sharing the same pickup and dropoff zones, not just one of them), we have to group by both columns and do two joins, one for PU and one for DO.

In [24]:
PUandDOPopularDF = tripsWithLengthDF.groupBy("PULocationID", "DOLocationID").agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("PULocationID") == col("LocationID")) \
    .withColumnRenamed("Zone", "Pickup_Zone") \
    .drop("LocationID", "Borough", "service_zone") \
    .join(taxiZonesDF, col("DOLocationID") == col("LocationID")) \
    .withColumnRenamed("Zone", "Dropoff_Zone") \
    .drop("LocationID", "Borough", "service_zone") \
    .drop("PULocationID", "DOLocationID") \
    .orderBy(col("totalTrips").desc())

PUandDOPopularDF.show(10)

+----------+--------------------+--------------------+
|totalTrips|         Pickup_Zone|        Dropoff_Zone|
+----------+--------------------+--------------------+
|      5561|                  NV|                  NV|
|      2425|Upper East Side S...|Upper East Side N...|
|      1962|Upper East Side N...|Upper East Side S...|
|      1944|Upper East Side N...|Upper East Side N...|
|      1928|Upper East Side S...|Upper East Side S...|
|      1052|Upper East Side S...|      Midtown Center|
|      1012|Upper East Side S...|        Midtown East|
|       987|      Midtown Center|Upper East Side S...|
|       965|Upper West Side S...|Upper West Side N...|
|       882|      Midtown Center|      Midtown Center|
+----------+--------------------+--------------------+
only showing top 10 rows
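
The rankings above count all trips together, while the question asks for the top 3 zones separately for long and short trips. The sketch below shows the per-group top-N logic in plain Python on hypothetical rows (the pairs are made up for illustration). In Spark, the same idea would typically be a groupBy on "isLong" plus the location column, followed by a window rank partitioned by "isLong".

```python
from collections import Counter, defaultdict

# Hypothetical (is_long, pickup_zone) pairs, for illustration only;
# they are NOT taken from the dataset.
trips = [
    (False, "Midtown Center"), (False, "Midtown Center"), (False, "Union Sq"),
    (False, "Murray Hill"), (False, "Union Sq"), (False, "Midtown Center"),
    (True, "JFK Airport"), (True, "JFK Airport"), (True, "Newark Airport"),
]


def top_zones_by_length(rows, n=3):
    """Count trips per (is_long, zone) and keep the n busiest zones in each group."""
    counts = defaultdict(Counter)
    for is_long, zone in rows:
        counts[is_long][zone] += 1
    return {is_long: counter.most_common(n) for is_long, counter in counts.items()}


top = top_zones_by_length(trips)
print(top[False])  # top zones for short trips
print(top[True])   # top zones for long trips
```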



### Question 6

The RatecodeID column holds the info about how good the payment is.

In [25]:
taxiDF.select("RatecodeID").distinct().show()

+----------+
|RatecodeID|
+----------+
|         1|
|         6|
|         3|
|         5|
|         4|
|         2|
|        99|
+----------+



In [26]:
ratecodeDistributionDF = taxiDF \
    .groupBy(col("RatecodeID")).agg(count("*").alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

ratecodeDistributionDF.show()

+----------+----------+
|RatecodeID|totalTrips|
+----------+----------+
|         1|    324387|
|         2|      5878|
|         5|       895|
|         3|       530|
|         4|       193|
|        99|         7|
|         6|         3|
+----------+----------+
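
To read this distribution more easily, the sketch below turns the printed counts into shares and attaches a label to each code. The labels follow the NYC TLC yellow-taxi data dictionary. Whether that dictionary applies to this file is an assumption, and code 99 has no label there. Note that these codes describe the tariff applied; the schema above also has a separate payment_type column.

```python
import builtins  # the notebook's wildcard import shadows sum with the Spark function

# Labels as listed in the NYC TLC yellow-taxi data dictionary
# (assumption: this dataset follows that dictionary).
RATE_CODE_LABELS = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
}

# Counts copied from the output above.
trips_by_rate_code = {1: 324387, 2: 5878, 5: 895, 3: 530, 4: 193, 99: 7, 6: 3}

total_trips = builtins.sum(trips_by_rate_code.values())

# Print the codes from most to least frequent, with their share of all trips.
for code, trips in sorted(trips_by_rate_code.items(), key=lambda kv: kv[1], reverse=True):
    label = RATE_CODE_LABELS.get(code, "not in the dictionary")
    print(f"{code:>2}  {label:<22} {trips:>7}  {trips / total_trips:.3%}")
```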



### Question 7

This time we have to group by pickup day and rate code.

In [27]:
ratecodeEvolution = taxiDF \
    .groupBy(to_date(col("tpep_pickup_datetime")).alias("pickup_day"), col("RatecodeID")) \
    .agg(count("*").alias("totalTrips")) \
    .orderBy(col("pickup_day"))

ratecodeEvolution.show()

+----------+----------+----------+
|pickup_day|RatecodeID|totalTrips|
+----------+----------+----------+
|2018-01-24|         1|     10760|
|2018-01-24|         2|       174|
|2018-01-24|         5|        80|
|2018-01-24|         3|         4|
|2018-01-24|         4|         9|
|2018-01-24|         6|         1|
|2018-01-25|        99|         7|
|2018-01-25|         3|       526|
|2018-01-25|         5|       815|
|2018-01-25|         2|      5704|
|2018-01-25|         1|    313627|
|2018-01-25|         4|       184|
|2018-01-25|         6|         2|
+----------+----------+----------+



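Now we can get the average rate code per day. Note that avg("RatecodeID") runs over the already grouped rows, so it averages the distinct codes seen each day rather than weighting each code by its number of trips.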

In [28]:
ratecodeEvolutionAvg = ratecodeEvolution.groupBy("pickup_day") \
    .agg(sum("totalTrips").alias("totalTrips"), avg("RatecodeID").alias("avgRate"))

ratecodeEvolutionAvg.show()

+----------+----------+------------------+
|pickup_day|totalTrips|           avgRate|
+----------+----------+------------------+
|2018-01-25|    320865|17.142857142857142|
|2018-01-24|     11028|               3.5|
+----------+----------+------------------+
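
The avgRate values above are averages over the distinct rate codes of each day, which is why the single code 99 pushes the second day up to about 17. The sketch below reproduces both numbers from the per-day output and compares them with a trip-weighted average. It is plain Python on the printed counts. In Spark, the weighted version could be computed as the sum of RatecodeID times totalTrips divided by the sum of totalTrips, or by averaging RatecodeID directly on taxiDF.

```python
import builtins  # the notebook's wildcard import shadows sum with the Spark function

# (RatecodeID, totalTrips) rows copied from the per-day output above.
rows_by_day = {
    "2018-01-24": [(1, 10760), (2, 174), (5, 80), (3, 4), (4, 9), (6, 1)],
    "2018-01-25": [(99, 7), (3, 526), (5, 815), (2, 5704), (1, 313627), (4, 184), (6, 2)],
}


def unweighted_avg(rows):
    """Average of the distinct codes, which is what avg("RatecodeID") computes above."""
    return builtins.sum(code for code, _ in rows) / len(rows)


def weighted_avg(rows):
    """Average code per trip: each code is weighted by its number of trips."""
    total_trips = builtins.sum(trips for _, trips in rows)
    return builtins.sum(code * trips for code, trips in rows) / total_trips


for day, rows in rows_by_day.items():
    print(day, unweighted_avg(rows), f"{weighted_avg(rows):.4f}")
```

Since the rate code is a category rather than a quantity, even the weighted average is only a rough indicator; the per-code counts remain the more faithful view.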



And the same, but this time grouping by hour.

In [29]:
ratecodeEvolutionPerHour = taxiDF \
    .withColumn("hour_of_day", hour(col("tpep_pickup_datetime"))) \
    .groupBy(col("hour_of_day").alias("pickup_hour"), col("RatecodeID")) \
    .agg(count("*").alias("totalTrips")) \
    .orderBy(col("pickup_hour"))

ratecodeEvolutionPerHour.show()

+-----------+----------+----------+
|pickup_hour|RatecodeID|totalTrips|
+-----------+----------+----------+
|          0|         1|      2514|
|          0|         5|        20|
|          0|         2|         2|
|          0|         3|         1|
|          0|         4|         1|
|          1|         2|        15|
|          1|         5|        19|
|          1|         1|      1572|
|          1|         4|         1|
|          1|         3|         3|
|          2|         3|        11|
|          2|         5|        17|
|          2|         1|      1498|
|          2|         4|         1|
|          2|         2|        59|
|          3|         1|      2894|
|          3|         3|        27|
|          3|         2|       200|
|          3|         5|        12|
|          4|         3|        23|
+-----------+----------+----------+
only showing top 20 rows



Now we aggregate in the same way to obtain the average rate and total trips.

In [30]:
ratecodeEvolutionPerHourAvg = ratecodeEvolutionPerHour.groupBy("pickup_hour") \
    .agg(sum("totalTrips").alias("totalTrips"), avg("RatecodeID").alias("avgRate")).orderBy("pickup_hour")

ratecodeEvolutionPerHourAvg.show()

+-----------+----------+------------------+
|pickup_hour|totalTrips|           avgRate|
+-----------+----------+------------------+
|          0|      2538|               3.0|
|          1|      1610|               3.0|
|          2|      1586|               3.0|
|          3|      3133|              2.75|
|          4|      8600|               3.0|
|          5|     15445|               3.0|
|          6|     18867|               3.0|
|          7|     16840|               3.0|
|          8|     15348|               3.0|
|          9|     15564|              19.0|
|         10|     16001|              19.0|
|         11|     16082|17.142857142857142|
|         12|     17483|               3.0|
|         13|     17843|               3.5|
|         14|     16160|               3.0|
|         15|     18664|               3.0|
|         16|     22121|               3.0|
|         17|     21598|              19.0|
|         18|     20318|               3.0|
|         19|     20884|        