# HW3 - Q3 [35 pts]

## Important Notices

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> add any cells to this Jupyter Notebook, because that will crash the autograder.
</div>

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> import any additional libraries into this notebook.
</div>

All instructions, code comments, etc. in this notebook **are part of the assignment instructions**. That is, if there are instructions about completing a task in this notebook, that task is not optional.

<div class="alert alert-block alert-info">
    You <strong>must</strong> implement the following functions in this notebook to receive credit.
</div>

`user()`

`bucket()`

`long_trips()`

`manhattan_trips()`

`weighted_profit()`

`final_output()`

Each method will be auto-graded using different sets of parameters or data, to ensure that values are not hard-coded.  You may assume we will only use your code to work with data from the NYC-TLC dataset during auto-grading.

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove or modify the following utility functions:
</div>

`load_data()`

`main()`

<div class="alert alert-block alert-info">
    Do <strong>not</strong> change the below cell. Run it to initialize your PySpark instance. If you don't get any output, make sure your Notebook's Kernel is set to "PySpark" in the top right corner.
</div>

In [1]:
sc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1616987294136_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<SparkContext master=yarn appName=livy-session-0>

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> modify the below cell. It contains the function for loading data and all imports, and the function for running your code.
</div>

In [2]:
#### DO NOT CHANGE ANYTHING IN THIS CELL ####

from pyspark.sql.functions import col
from pyspark.sql import *

def load_data(size='small'):
    # Loads the data for this question. Do not change this function.
    # This function should only be called with the parameter 'small' or 'large'
    
    if size != 'small' and size != 'large':
        print("Invalid size parameter provided. Use only 'small' or 'large'.")
        return
    
    input_bucket = "s3://cse6242-spring2021"
    
    # Load Trip Data
    trip_path = '/'+size+'/yellow_tripdata*'
    trips = spark.read.csv(input_bucket + trip_path, header=True, inferSchema=True)
    print("Trip Count: ",trips.count()) # Prints # of trips (# of records, as each record is one trip)
    
    # Load Lookup Data
    lookup_path = '/'+size+'/taxi*'
    lookup = spark.read.csv(input_bucket + lookup_path, header=True, inferSchema=True)
    
    return trips, lookup

def main(size, bucket):
    # Runs your functions implemented above.
    
    print(user())
    trips, lookup = load_data(size=size)
    trips = long_trips(trips)
    mtrips = manhattan_trips(trips, lookup)
    wp = weighted_profit(trips, mtrips)
    final = final_output(wp,lookup)
    
    # Outputs the results for you to visually see
    final.show()
    
    # Writes out as a CSV to your bucket.
    final.write.csv(bucket)


# Implement the below functions for this assignment:
<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> change any function inputs or outputs, and ensure that the DataFrames your code returns align with the schema definitions commented in each function.
</div>

## 3a. [1 pt] Update the `user()` function
This function should return your GT username, e.g., gburdell3.

In [3]:
def user():
    # Returns a string consisting of your GT username.
    return 'eperalta6'


## 3b. [2 pts] Update the `long_trips()` function
This function filters trips to keep only trips greater than or equal to 2 miles.

In [4]:
def long_trips(trips):
    # Returns a Dataframe with Schema the same as :trips:
    long_trips = trips.where("trip_distance >= 2")
    return long_trips


## 3c. [6 pts] Update the `manhattan_trips()` function

This function determines the top 20 drop-off locations (`DOLocationID`) in Manhattan, ranked by total `passenger_count` (`pcount`).

Example output formatting:

```
+--------------+--------+
| DOLocationID | pcount |
+--------------+--------+
|             5|      15|
|            16|      12| 
+--------------+--------+
```

In [5]:
def manhattan_trips(trips, lookup):
    manhattan = lookup.filter("Borough == 'Manhattan'") # Return all stops in Manhattan in the lookup table
    selection = trips.select("DOLocationID", "passenger_count") # Select the two columns of interest from trips

    man_join = selection.join(manhattan, selection.DOLocationID == manhattan.LocationID, "inner") # Join the two sub-tables on the LocationID
    man_join = man_join.select('DOLocationID', 'passenger_count') # Reduce joined table to the two columns needed
    
    man_join = man_join.withColumn('passenger_count', col('passenger_count').cast('double'))

    man_join = man_join.groupBy('DOLocationID').sum('passenger_count') # Group dataframe by DOLocationID and sum the passenger counts for each location
    man_join = man_join.withColumnRenamed('sum(passenger_count)', 'pcount') # Rename passenger count column
    man_join = man_join.orderBy(col('pcount').desc()) # Order sub-table by pcount, descending 
    man_join = man_join.limit(20) # Reduce to 20 items
    
    return man_join


## 3d. [6 pts] Update the `weighted_profit()` function
This function should determine the average `total_amount`, the total count of trips, and the total count of trips ending in the top 20 destinations and return the `weighted_profit` as discussed in the homework document.

Example output formatting:
```
+--------------+-------------------+
| PULocationID |  weighted_profit  |
+--------------+-------------------+
|            18| 33.784444421924436| 
|            12| 21.124577637149223| 
+--------------+-------------------+
```

In [9]:
def weighted_profit(trips, mtrips): 
    # Returns a Dataframe with Schema: PULocationID, weighted_profit
    # Note: Use decimal datatype for weighted profit (NOTE: DON'T USE FLOAT)
    # Our grader will only be checking the first 8 characters for each value in the dataframe
    from pyspark.sql.functions import avg, count  # import only what is used; avoids shadowing built-in sum/max/min

    # The average total amount per trip and the total count of all trips that start at each pickup location
    pu_df = trips.groupBy('PULocationID').agg(avg('total_amount'), count(col('PULocationID')))
    pu_df = pu_df.withColumnRenamed('avg(total_amount)','avg_total_amount') # Rename columns
    pu_df = pu_df.withColumnRenamed('count(PULocationID)','PULocID_count') # Rename columns 
    
    # total count of trips ending in the top 20 destinations
    pop_df = trips.join(mtrips, trips.DOLocationID == mtrips.DOLocationID).groupBy('PULocationID').count()
   
    # Join popular pickup counts with pickup data frame
    pu_df = pu_df.join(pop_df, ['PULocationID'])
    pu_df = pu_df.withColumnRenamed('count','popular_PU_count')
    
    # Get all trip counts, grouped by PULocationID
    all_pu = trips.groupBy('PULocationID').count()
    all_pu = all_pu.withColumnRenamed('count', 'total_number_of_trips')
    
    # Add total counts to PickUps Dataframe
    pu_df = pu_df.join(all_pu, ['PULocationID'])
    
    # weighted_profit = (popular trips / all trips) * average total_amount
    pu_df = pu_df.withColumn('weighted_profit', (col('popular_PU_count')/col('total_number_of_trips'))*col('avg_total_amount'))

    pu_df = pu_df.select('PULocationID','weighted_profit')
    
    return pu_df


## 3e. [5 pts] Update the `final_output()` function
This function takes the results of `weighted_profit`, links them to the `borough` and `zone`, and returns the top 20 locations with the highest `weighted_profit`.

Example output formatting:
```
+------------+---------+-------------------+
|    Zone    | Borough |  weighted_profit  |
+------------+---------+-------------------+
| JFK Airport|   Queens|  16.95897820117925|
|     Jamaica|   Queens| 14.879835188762488|
+------------+---------+-------------------+
```

In [10]:
def final_output(wp, lookup): 
    # Returns a Dataframe with Schema: Zone, Borough, weighted_profit
    # Note: Use decimal datatype for weighted profit (NOTE: DON'T USE FLOAT)
    # Our grader will only be checking the first 8 characters for each value in the dataframe
    final_df = (
        lookup.join(wp, lookup.LocationID == wp.PULocationID)  # attach Zone and Borough to each pickup location
        .select('Zone', 'Borough', 'weighted_profit')
        .orderBy(col('weighted_profit').desc())
        .limit(20)
    )
    return final_df


<div class="alert alert-block alert-info">
    Test your code on the small dataset first, as the large dataset will take significantly longer to run.
</div>

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> use the same bucket URL for multiple runs of the `main()` function, as this will cause errors. Make sure to change the name of your output location every time (e.g., s3://cse6242-gburdell3/output-small2).
</div>
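
One way to avoid reusing an output location is to append a timestamp to the path before each run. The sketch below is only an illustration: `unique_output_path` is a hypothetical helper that is not part of the assignment template, it uses only the Python standard library, and the bucket name is the example one from the warning above.

```python
from datetime import datetime, timezone

def unique_output_path(base, now=None):
    """Return `base` with a UTC timestamp suffix so each run writes to a new location."""
    # `now` can be passed in explicitly, which makes the helper easy to test.
    now = now or datetime.now(timezone.utc)
    return f"{base.rstrip('/')}-{now.strftime('%Y%m%dT%H%M%S')}"

# Hypothetical usage with the example bucket name from the warning above.
bucket = unique_output_path("s3://cse6242-gburdell3/output-small")
print(bucket)
```

Two runs started within the same second would still collide, so this reduces the chance of reusing a path rather than ruling it out.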

Update the below cell with the path to your bucket, then run the below cell to run your code to store the results in S3.

When you have confirmed the results of the small dataset, run it again using the large dataset. Your output file will appear in your S3 bucket, in a folder called YOUROUTPUT.csv, as a CSV file with a name something like `part-0000-4d992f7a-0ad3-48f8-8c72-0022984e4b50-c000.csv`. Download this file and rename it to `q3_output.csv` for submission. Do not make any other changes to the file.

In [13]:
# # update your bucket path
# bucket = 's3://cse6242-eperalta6/output-small'
# main('small', bucket)

bucket = 's3://cse6242-eperalta6/output-large'
main('large', bucket)


eperalta6
Trip Count:  187203269
+--------------------+-------------+------------------+
|                Zone|      Borough|   weighted_profit|
+--------------------+-------------+------------------+
|        Baisley Park|       Queens| 29.36045577913085|
|Flushing Meadows-...|       Queens|27.304845733617668|
|       South Jamaica|       Queens| 26.29491623987347|
|     Randalls Island|    Manhattan|24.150989940227525|
|        Astoria Park|       Queens| 21.70641711214752|
|Briarwood/Jamaica...|       Queens|19.945064631789336|
|Springfield Garde...|       Queens|19.468309288781906|
|             Jamaica|       Queens|19.283943000137903|
|              Corona|       Queens| 18.22876924815598|
|   LaGuardia Airport|       Queens|18.181338808373006|
|         Jamaica Bay|       Queens| 17.10052944675789|
|             Maspeth|       Queens|17.005450640079545|
|Eltingville/Annad...|Staten Island| 16.83776475694445|
|         JFK Airport|       Queens|16.777725348249632|
|        Batter

#### Testing

<div class="alert alert-block alert-info">
    You may use the below cell for any additional testing you need to do. However, any code implemented below will not be run or used when grading.
</div>

## Load datasets

In [5]:
#Load data set 
trips, lookup = load_data(size='small')


Trip Count:  7667792

In [85]:
trips = long_trips(trips)


In [59]:
lookup.show()


+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

## For question 3c, I first need to find the 20 most popular drop-off locations in the Manhattan borough, i.e., the destinations with the greatest passenger count.

Approach:
* Order by passenger_count, select DOLocationID and passenger_count
* Join the manhattan subtable with my selected sub-table of DOLocationID and passenger count on LocationID
* Limit 20

Functions used: 
* orderBy - function to sort on one or more columns. By default, it orders by ascending.
    * This may not be the best approach because it is ordering by LocID as well. The column to focus on for ordering is passenger_count. Thus, rework approach.
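
The reworked idea (aggregate first, then sort on the summed passenger count, then keep the top rows) can be checked on paper with a plain-Python sketch. This is not the Spark implementation; `top_dropoffs` is a hypothetical helper and the trip rows are made-up toy data, using location IDs that appear in the outputs in this notebook.

```python
from collections import defaultdict

def top_dropoffs(trips, manhattan_ids, n=20):
    """trips: iterable of (DOLocationID, passenger_count) pairs."""
    pcount = defaultdict(int)
    for do_id, passengers in trips:
        if do_id in manhattan_ids:        # same effect as an inner join with the Manhattan rows
            pcount[do_id] += passengers   # groupBy('DOLocationID') + sum('passenger_count')
    # orderBy(pcount descending) + limit(n)
    return sorted(pcount.items(), key=lambda kv: kv[1], reverse=True)[:n]

# Toy data: location 7 (Astoria, Queens) is not in Manhattan and is filtered out.
toy_trips = [(236, 2), (161, 1), (236, 3), (7, 4), (161, 1), (13, 1)]
manhattan_ids = {236, 161, 13}
print(top_dropoffs(toy_trips, manhattan_ids, n=2))  # [(236, 5), (161, 2)]
```

Sorting only after the sum means the ordering depends on `pcount` alone, which addresses the concern about ordering by the location ID as well.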

In [89]:
mtrips = manhattan_trips(trips, lookup)
mtrips.show()


+------------+------+
|DOLocationID|pcount|
+------------+------+
|         236|147086|
|         161|117642|
|         230|112155|
|         239|104164|
|         162|101209|
|         238|100189|
|          48| 98784|
|         170| 96109|
|         142| 95053|
|         231| 94621|
|         263| 94548|
|          79| 94421|
|         237| 90335|
|         141| 90061|
|         140| 84212|
|          68| 79559|
|         234| 75786|
|         186| 73863|
|         163| 73145|
|          13| 70693|
+------------+------+

## 3d. [6 pts] Update the `weighted_profit()` function

### Analyze all pickup locations, regardless of borough.
* For each **pickup** location determine
    * the average total amount per trip
    * the total count of all trips that start at that location
    * the count of all trips that start at that location and end at one of the most popular drop-off locations.
* Using the above values,
    * determine the proportion of trips that end in one of the popular drop off locations (# trips that end in drop off location divided by total # of trips) and
    * multiply that proportion by the average total amount to get a weighted profit value based on the probability of passengers going to one of the popular destinations.
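
The steps above can be sketched in plain Python to verify the arithmetic by hand. This is an illustration under assumptions, not the Spark implementation: `weighted_profit_by_pickup` is a hypothetical helper and the trips are made-up toy data.

```python
from collections import defaultdict

def weighted_profit_by_pickup(trips, popular_dropoffs):
    """trips: iterable of (PULocationID, DOLocationID, total_amount) tuples."""
    total = defaultdict(int)      # all trips per pickup location
    popular = defaultdict(int)    # trips per pickup location ending at a popular drop-off
    amount = defaultdict(float)   # summed total_amount per pickup location
    for pu, do, amt in trips:
        total[pu] += 1
        amount[pu] += amt
        if do in popular_dropoffs:
            popular[pu] += 1
    # (popular trips / all trips) * (average total_amount)
    return {pu: (popular[pu] / total[pu]) * (amount[pu] / total[pu]) for pu in total}

toy_trips = [
    (132, 236, 50.0), (132, 7, 30.0), (132, 161, 40.0), (132, 10, 40.0),
    (10, 7, 20.0),
]
print(weighted_profit_by_pickup(toy_trips, popular_dropoffs={236, 161}))
# {132: 20.0, 10: 0.0}
```

Here pickup 132 has 2 of 4 trips ending at a popular drop-off and an average amount of 40.0, giving 0.5 * 40.0 = 20.0. Note that this sketch keeps pickup locations with no popular trips (value 0.0), whereas an inner join on the popular-trip counts drops them.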

## Calculate weighted profit:
$$ \text{weighted profit} = \frac{\text{number of trips from a pickup location ending in the top 20 drop-off locations}}{\text{number of all trips from that pickup location}} \times \text{average total amount} $$

In [152]:
wp = weighted_profit(trips, mtrips)
wp.orderBy(col('weighted_profit').desc()).show()


+------------+------------------+
|PULocationID|   weighted_profit|
+------------+------------------+
|           6|31.784444444444446|
|          99|29.342499999999998|
|         187|24.357777777777777|
|          93|24.330137731666948|
|          10| 23.62469433349846|
|         194| 23.12457751514923|
|         215|22.174114939432236|
|         156|21.786556927297667|
|         206| 21.49189349112426|
|           5|          20.29625|
|           8|19.965360000000004|
|         138| 17.92315394960492|
|         171|17.585450424382717|
|         132|16.958978201177352|
|         130|14.879835188762476|
|         105|          14.79625|
|          58|14.613195266272191|
|         219|14.337175219423653|
|         157|13.940437160690953|
|          28|13.327861288088645|
+------------+------------------+
only showing top 20 rows

## `final_output(wp, lookup)`
This function:    
* takes the results of weighted_profit,
* links it to the borough and zone through the lookup data frame, and
* returns the top 20 locations with the highest weighted_profit.
* Returns a PySpark DataFrame with the schema (Zone, Borough, weighted_profit)
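
As a rough, Spark-free sketch of the same join, sort, and limit steps: `top_zones` is a hypothetical helper, the zone names and profit values are copied from the small-dataset outputs in this notebook, and location 999 is a made-up ID that has no match in the lookup.

```python
def top_zones(wp, lookup, n=20):
    """wp: {PULocationID: weighted_profit}; lookup: {LocationID: (Zone, Borough)}."""
    rows = [
        (lookup[loc][0], lookup[loc][1], profit)
        for loc, profit in wp.items()
        if loc in lookup                  # inner-join behaviour: unmatched IDs are dropped
    ]
    # orderBy(weighted_profit descending) + limit(n)
    return sorted(rows, key=lambda row: row[2], reverse=True)[:n]

toy_lookup = {
    5: ("Arden Heights", "Staten Island"),
    8: ("Astoria Park", "Queens"),
    10: ("Baisley Park", "Queens"),
}
toy_wp = {5: 20.29625, 10: 23.62469433349846, 8: 19.965360000000004, 999: 50.0}
for row in top_zones(toy_wp, toy_lookup, n=2):
    print(row)
```

With `n=2` this keeps Baisley Park and Arden Heights, in that order; the unmatched ID 999 never appears even though its value is the largest.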

In [162]:
final_df = lookup.join(wp, lookup.LocationID == wp.PULocationID).select('Zone', 'Borough', 'weighted_profit').orderBy(col('weighted_profit').desc()).limit(20)
final_df.show()


+--------------------+-------------+------------------+
|                Zone|      Borough|   weighted_profit|
+--------------------+-------------+------------------+
|Arrochar/Fort Wad...|Staten Island|31.784444444444446|
|     Freshkills Park|Staten Island|29.342499999999998|
|       Port Richmond|Staten Island|24.357777777777777|
|Flushing Meadows-...|       Queens| 24.33013773166695|
|        Baisley Park|       Queens|23.624694333498464|
|     Randalls Island|    Manhattan| 23.12457751514923|
|       South Jamaica|       Queens|22.174114939432226|
|     Mariners Harbor|Staten Island|21.786556927297667|
|Saint George/New ...|Staten Island| 21.49189349112426|
|       Arden Heights|Staten Island|          20.29625|
|        Astoria Park|       Queens|19.965360000000004|
|   LaGuardia Airport|       Queens| 17.92315394960937|
|  Murray Hill-Queens|       Queens| 17.58545042438271|
|         JFK Airport|       Queens|16.958978201178994|
|             Jamaica|       Queens|14.879835188

In [172]:
final_df.withColumn('weighted_profit', col('weighted_profit').cast('decimal(32,14)')).show()


[('Zone', 'string'), ('Borough', 'string'), ('weighted_profit', 'double')]
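
The function comments say the grader only checks the first 8 characters of each value. A small sketch of what such a check might look like, assuming it is a plain string-prefix comparison (the actual grader logic is not shown in this notebook, and `same_first_chars` is a hypothetical helper). The two values below both appear in the test outputs above and differ only in their trailing digits.

```python
def same_first_chars(a, b, n=8):
    """Compare the first n characters of the string forms of two values."""
    return str(a)[:n] == str(b)[:n]

# Values as printed in the outputs above, kept as strings so no digits are lost.
value_a = "16.958978201177352"
value_b = "16.958978201178994"

print(value_a == value_b)                  # False: the trailing digits differ
print(same_first_chars(value_a, value_b))  # True: both start with "16.95897"
```

Under that assumption, small differences far to the right of the decimal point would not affect the comparison.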