# ECE4150 Final Project: Potential NYC Taxi Profit Analysis
How to run PySpark on Google Colab:

https://medium.com/@chiayinchen/%E4%BD%BF%E7%94%A8-google-colaboratory-%E8%B7%91-pyspark-625a07c75000

In [1]:
# Environment bootstrap for a fresh Colab VM: install a headless Java 8 runtime
# and the PySpark package before anything from pyspark is imported.
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
# Reuse the active session if there is one, otherwise start one on the "local"
# master (Spark runs inside this VM rather than on a cluster).
# `spark` is the notebook-level session that load_data() reads through.
spark = SparkSession.builder.master("local").getOrCreate()
# Handle to the SparkContext; getOrCreate() returns the existing one if present.
sc = SparkContext.getOrCreate()

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libxtst6
Suggested packages:
  libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic fonts-ipafont-mincho
  fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jre-headless
0 upgraded, 2 newly installed, 0 to remove and 45 not upgraded.
Need to get 30.8 MB of archives.
After this operation, 104 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libxtst6 amd64 2:1.2.3-1build4 [13.4 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 openjdk-8-jre-headless amd64 8u402-ga-2ubuntu1~22.04 [30.8 MB]
Fetched 30.8 MB in 4s (7,733 kB/s)
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 121752 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1b

In [2]:
# Mount Google Drive at /content/drive; load_data() reads its input CSVs from
# a folder under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql import functions as F

In [19]:
def load_data():
    """Read the trip records and the zone lookup table from the mounted Drive.

    Uses the notebook-level ``spark`` session. Both inputs are read as CSV
    with a header row and ``inferSchema=True``. The number of trip records is
    printed as a side effect.

    Returns:
        tuple: ``(trips, lookup)`` Spark DataFrames.
    """
    # Loads the data for this question. Do not change this function.
    # Base folder on the mounted Drive that holds every input file
    input_path = "/content/drive/MyDrive/ECE4150/Final project"

    # Load Trip Data
    trip_path = '/yellow_tripdata09-08-2021.csv'
    # Alternative input kept for reference (presumably a smaller sample - confirm):
    # trip_path = '/yellow_tripdata_2019-01_short.csv'
    trips = spark.read.csv(input_path + trip_path, header=True, inferSchema=True)
    # count() is an action, so this line triggers a pass over the trip file
    print("Trip Count: ",trips.count()) # Prints # of trips (# of records, as each record is one trip)

    # Load Lookup Data
    # NOTE(review): '/taxi*' is a glob pattern - every file in the folder whose
    # name starts with "taxi" is loaded into `lookup`; confirm only the zone
    # lookup CSV matches.
    lookup_path = '/taxi*'
    lookup = spark.read.csv(input_path + lookup_path, header=True, inferSchema=True)

    return trips, lookup

In [5]:
def long_trips(trips):
    """Discard short trips.

    Args:
        trips: Spark DataFrame with a ``trip_distance`` column.

    Returns:
        DataFrame with the same schema as ``trips`` that keeps only the rows
        whose ``trip_distance`` is greater than or equal to 2.
    """
    at_least_two = col("trip_distance") >= 2
    return trips.where(at_least_two)

In [6]:
def topDrop_trips(trips, lookup, top_n=100):
    """Rank drop-off locations by the total number of passengers delivered.

    The result is used to see which borough owns most of the popular
    drop-off locations.

    Args:
        trips: Spark DataFrame with ``DOLocationID`` and ``passenger_count``.
        lookup: Spark DataFrame with ``LocationID`` and ``Borough``.
        top_n: How many drop-off locations to keep (default 100, the value
            that used to be hard-coded).

    Returns:
        DataFrame with schema ``DOLocationID, pcount, Borough`` holding at
        most ``top_n`` rows, sorted by ``pcount`` descending. Ties on
        ``pcount`` are broken by ascending ``DOLocationID`` so repeated runs
        return the same rows in the same order.
    """
    joined = trips.join(lookup, F.col("DOLocationID") == F.col("LocationID"))

    per_location = joined.groupBy("DOLocationID").agg(
        # F.sum is named explicitly so this does not depend on the star
        # import shadowing Python's builtin sum().
        F.sum("passenger_count").alias("pcount"),
        # Rows in a group were all matched on the same LocationID, so the
        # first Borough value seen is taken as the group's borough.
        F.first("Borough").alias("Borough"),
    )

    return per_location.orderBy(F.desc("pcount"), "DOLocationID").limit(top_n)

In [7]:
def manhattan_trips(trips, lookup, borough="Manhattan", top_n=30):
    """Find the most popular drop-off locations inside one borough.

    topDrop_trips shows that Manhattan holds most of the popular drop-off
    locations, so the defaults select the top 30 drop-off locations there.

    Args:
        trips: Spark DataFrame with ``DOLocationID`` and ``passenger_count``.
        lookup: Spark DataFrame with ``LocationID`` and ``Borough``.
        borough: Value of ``Borough`` to keep (default ``"Manhattan"``).
        top_n: How many drop-off locations to keep (default 30).

    Returns:
        DataFrame with schema ``DOLocationID, pcount`` holding at most
        ``top_n`` rows, sorted by ``pcount`` descending. Ties on ``pcount``
        are broken by ascending ``DOLocationID`` so the selection is
        reproducible.
    """
    joined = trips.join(lookup, F.col("DOLocationID") == F.col("LocationID"))

    # The column is spelled "Borough" exactly as in the other functions; the
    # lowercase spelling only resolves while Spark's column matching is
    # case-insensitive (the default).
    in_borough = joined.where(F.col("Borough") == borough)

    per_location = in_borough.groupBy("DOLocationID").agg(
        F.sum("passenger_count").alias("pcount")
    )

    return per_location.orderBy(F.desc("pcount"), "DOLocationID").limit(top_n)

In [27]:
def weighted_profit(trips, mtrips):
    """Compute the weighted profit of every pickup location.

    For each ``PULocationID``::

        weighted_profit = (trips ending at a popular drop-off / all trips)
                          * average total_amount of all trips

    Args:
        trips: Spark DataFrame with ``PULocationID``, ``DOLocationID`` and
            ``total_amount``.
        mtrips: Spark DataFrame whose ``DOLocationID`` column lists the
            popular drop-off locations (as returned by manhattan_trips).

    Returns:
        DataFrame with schema ``PULocationID, weighted_profit``. Pickup
        locations with no trip ending at a popular drop-off are absent,
        because the final join is an inner join.
    """
    # Per pickup location: average fare and total number of trips.
    pickup_stats = trips.groupBy("PULocationID").agg(
        F.avg("total_amount").alias("avg_total_amount"),
        F.count("*").alias("trip_count"),
    )

    # The key is renamed before joining: mtrips is derived from trips, so both
    # sides would otherwise expose a column called DOLocationID.
    popular_drops = mtrips.withColumnRenamed(
        "DOLocationID", "DOLocationID_mtrips"
    ).select("DOLocationID_mtrips")

    # A left-semi join keeps each matching trip exactly once, even if mtrips
    # ever contained a repeated location id.
    popular_trips = trips.join(
        popular_drops,
        trips["DOLocationID"] == popular_drops["DOLocationID_mtrips"],
        "left_semi",
    )

    # Count popular-destination trips directly per pickup location; there is
    # no need to split by drop-off location first and re-sum afterwards.
    popular_counts = (
        popular_trips.groupBy("PULocationID")
        .agg(F.count("*").alias("popular_trip_count"))
        .withColumnRenamed("PULocationID", "PULocationID_popular")
    )

    joined = pickup_stats.join(
        popular_counts,
        pickup_stats["PULocationID"] == popular_counts["PULocationID_popular"],
    )

    profit = (
        F.col("popular_trip_count") / F.col("trip_count")
    ) * F.col("avg_total_amount")

    return joined.select("PULocationID", profit.alias("weighted_profit"))

In [28]:
def final_output(wp, lookup):
    """Attach zone names to the weighted profits and rank them.

    Args:
        wp: Spark DataFrame with ``PULocationID`` and ``weighted_profit``.
        lookup: Spark DataFrame with ``LocationID``, ``Zone`` and ``Borough``.

    Returns:
        DataFrame with schema ``LocationID, Zone, Borough, weighted_profit``,
        sorted by ``weighted_profit`` descending.
    """
    same_zone = wp["PULocationID"] == lookup["LocationID"]
    labelled = wp.join(lookup, same_zone)
    wanted = ["LocationID", "Zone", "Borough", "weighted_profit"]
    return labelled.select(*wanted).orderBy(col("weighted_profit").desc())

In [29]:
def main(bucket1, bucket2):
    """Run the whole analysis and write both result tables as CSV.

    Args:
        bucket1: Output path for the top drop-off location ranking.
        bucket2: Output path for the final weighted-profit table.

    Both paths are written with ``mode="overwrite"``; each table is also
    printed with ``show()``.
    """
    all_trips, zones = load_data()
    trips = long_trips(all_trips)

    # Stage 1: most popular drop-off locations and the borough of each
    top_drop = topDrop_trips(trips, zones)
    top_drop.write.mode("overwrite").csv(bucket1)
    top_drop.show()

    # Stage 2: restrict to Manhattan drop-offs, then score every pickup zone
    popular = manhattan_trips(trips, zones)
    final = final_output(weighted_profit(trips, popular), zones)

    # Display first, then persist as CSV
    final.show()
    final.write.mode("overwrite").csv(bucket2)

In [30]:
# Output directories on the Colab VM: main() writes the top drop-off ranking
# to path1 and the final weighted-profit table to path2 (both overwritten).
path1 = "/content/output"
path2 = "/content/output-result"
main(path1, path2)

Trip Count:  7667792
+------------+------+---------+
|DOLocationID|pcount|  Borough|
+------------+------+---------+
|         236|112777|Manhattan|
|         161| 89818|Manhattan|
|         230| 86085|Manhattan|
|         239| 79939|Manhattan|
|         138| 78368|   Queens|
|         162| 77466|Manhattan|
|         238| 76882|Manhattan|
|          48| 75840|Manhattan|
|         170| 73647|Manhattan|
|         142| 72981|Manhattan|
|         263| 72822|Manhattan|
|         231| 72616|Manhattan|
|          79| 72243|Manhattan|
|         237| 69195|Manhattan|
|         141| 68975|Manhattan|
|         140| 64530|Manhattan|
|          68| 60815|Manhattan|
|         234| 58053|Manhattan|
|         186| 56822|Manhattan|
|         132| 56689|   Queens|
+------------+------+---------+
only showing top 20 rows

+----------+--------------------+-------------+------------------+
|LocationID|                Zone|      Borough|   weighted_profit|
+----------+--------------------+-------------+----