# Advanced Data Analysis Project: NYC Taxi Dataset

Useful References:
* [ACM DEBS 2015 Grand Challenge](http://www.debs2015.org/call-grand-challenge.html)
* [Spark web site](https://spark.apache.org/)
* [Spark MLlib main page](https://spark.apache.org/mllib/)
* [Spark MLlib guide](https://spark.apache.org/docs/latest/ml-guide.html)
* [Spark GraphX main page](https://spark.apache.org/graphx/)
* [Spark GraphFrames main page](https://graphframes.github.io/graphframes/docs/_site/index.html)
* [Spark GraphFrames User Guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html)

In [None]:
import pandas as pd
import numpy as np
import os
import sys
from pyspark.sql import SparkSession

In [None]:
use_sample = True

if 'DATABRICKS_RUNTIME_VERSION' in os.environ : 
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("Group project") \
        .getOrCreate()

    spark.conf.set("fs.azure.sas.data.novasbeadatrab.blob.core.windows.net",
      "https://novasbeadatrab.blob.core.windows.net/?sv=2019-12-12&ss=b&srt=co&sp=rl&se=2020-12-19T06:59:10Z&st=2020-11-22T22:59:10Z&spr=https&sig=jMOoA0U33yOje%2F5mRkcJYuIEbM6K3i02zGKk4p%2BGXkc%3D")
    if use_sample:
        FILENAME = "wasbs://data@novasbeadatrab.blob.core.windows.net/sample.csv"
    else:
        FILENAME = "wasbs://data@novasbeadatrab.blob.core.windows.net/sorted_data.csv"    
else:
    if use_sample:
        FILENAME = "data/sample.csv"
    else:
        FILENAME = "data/sorted_data.csv"

-----
## Simple statistics

The first program prints simple statistics.

In [None]:
import matplotlib.pyplot as plt
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Group project") \
    .getOrCreate()

mySchema = StructType([
    StructField("medallion", StringType()),
    StructField("hack_license", StringType()),
    StructField("pickup_datetime", TimestampType()),
    StructField("dropoff_datetime", TimestampType()),
    StructField("trip_time_in_secs", IntegerType()),
    StructField("trip_distance", DoubleType()),
    StructField("pickup_longitude", DoubleType()),
    StructField("pickup_latitude", DoubleType()),
    StructField("dropoff_longitude", DoubleType()),
    StructField("dropoff_latitude", DoubleType()),
    StructField("payment_type", StringType()),
    StructField("fare_amount", DoubleType()),
    StructField("surcharge", DoubleType()),
    StructField("mta_tax", DoubleType()),
    StructField("tip_amount", DoubleType()),
    StructField("tolls_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
])

dataset = spark.read.load(FILENAME, format="csv", 
                         sep=",", schema=mySchema, header="false")
dataset.createOrReplaceTempView("data")

statistics = spark.sql( """SELECT COUNT( DISTINCT medallion) AS num_medallion, 
                                  COUNT( DISTINCT hack_license) AS num_license,
                                  MIN( pickup_datetime) AS min_pickup,
                                  MAX( dropoff_datetime) AS max_dropoff,
                                  MAX( trip_time_in_secs) AS max_trip_time,
                                  MAX( trip_distance) AS max_trip_distance,
                                  MAX( total_amount) AS max_total_amount
                                  FROM data""")
statistics.show()


-----
## Plotting information

### First plot

This first plot helps show that the data contains several invalid values.

Let's plot the pickups.

In [None]:
import matplotlib.pyplot as plt
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Group project") \
    .getOrCreate()

mySchema = StructType([
    StructField("medallion", StringType()),
    StructField("hack_license", StringType()),
    StructField("pickup_datetime", TimestampType()),
    StructField("dropoff_datetime", TimestampType()),
    StructField("trip_time_in_secs", IntegerType()),
    StructField("trip_distance", DoubleType()),
    StructField("pickup_longitude", DoubleType()),
    StructField("pickup_latitude", DoubleType()),
    StructField("dropoff_longitude", DoubleType()),
    StructField("dropoff_latitude", DoubleType()),
    StructField("payment_type", StringType()),
    StructField("fare_amount", DoubleType()),
    StructField("surcharge", DoubleType()),
    StructField("mta_tax", DoubleType()),
    StructField("tip_amount", DoubleType()),
    StructField("tolls_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
])

dataset = spark.read.load(FILENAME, format="csv", 
                         sep=",", schema=mySchema, header="false")
dataset.createOrReplaceTempView("data")

# Plotting all points is probably too much for many computers, so let's plot only a few thousand
data = spark.sql( "SELECT * FROM data LIMIT 10000")
pickups = data.collect()

print('Plotting pickups')
plt.scatter([row.pickup_longitude for row in pickups],
            [row.pickup_latitude for row in pickups],
            s=1)
plt.show()


### Plotting heatmaps

This section shows how to **plot heatmaps** and how to **create a grid** over the coordinates. Nearby coordinates need to be grouped together; otherwise (almost) every coordinate would be unique.

The example creates a grid with cells of 500 m per side (the code for cells of 150 m per side is commented out).
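
As a rough illustration of the grouping idea, the sketch below snaps a coordinate to the center of its grid cell in plain Python. The helper name `cell_center` and the toy origin and step values are made up for this example; the notebook's own Spark UDFs apply the same idea to longitude and latitude.

```python
import math

def cell_center(value, origin, step):
    """Map a coordinate to the center of the grid cell that contains it.

    origin is the grid edge the cells are counted from and step is the
    cell width, both in the same unit as value (hypothetical helper).
    """
    index = math.floor((value - origin) / step)
    return origin + index * step + step / 2

# Two nearby coordinates fall into the same cell and share one center
same_cell = cell_center(0.51, 0.0, 0.5) == cell_center(0.99, 0.0, 0.5)
```

Because every point in a cell is replaced by the same center, a later `GROUP BY` on the rounded coordinates counts points per cell instead of per exact coordinate.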

\[From ACM DEBS 2015 Grand Challenge page\]

**Question 1**: Is the earth flat or how to map coordinates to cells?

**Answer**: For the challenge we allow a simplified flat earth assumption for mapping coordinates to cells in the queries. You can assume that a distance of 500 meter south corresponds to a change of 0.004491556 degrees in the coordinate system. For moving 500 meter east you can assume a change of 0.005986 degrees in the coordinate system.
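
The conversion factors quoted above can be scaled to other cell sizes. The sketch below assumes the flat-earth approximation scales linearly with distance; the function name `grid_steps` is hypothetical.

```python
# Degrees per 500 m, as quoted from the DEBS 2015 Grand Challenge page
LAT_DEG_PER_500M = 0.004491556
LON_DEG_PER_500M = 0.005986

def grid_steps(cell_size_m):
    """Return (latitude_step, longitude_step) in degrees for a square cell.

    Assumes the flat-earth approximation scales linearly with distance.
    """
    scale = cell_size_m / 500.0
    return LAT_DEG_PER_500M * scale, LON_DEG_PER_500M * scale

lat_step_150, lon_step_150 = grid_steps(150)
```

For 150 m cells this gives approximately 0.0013474668 and 0.0017958 degrees, which matches the commented-out values in the next cell.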

In [None]:
import matplotlib.pyplot as plt
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Group project") \
    .getOrCreate()

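# Squares of 500 meters
latitudeStep = 0.004491556
longitudeStep = 0.005986
northLatitude = 41.474937 - 0.5 * latitudeStep
southLatitude = northLatitude - 300 * latitudeStep
# Note: "eastLongitude" holds the smaller (more westerly) longitude and
# "westLongitude" the larger one; the names are kept because the code
# below uses them consistently.
eastLongitude = -74.913585 - 0.5 * longitudeStep
westLongitude = eastLongitude + 300 * longitudeStep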

# Squares of 150 meters
#latitudeStep = 0.0013474668
#longitudeStep = 0.0017958
#northLatitude = 40.95
#southLatitude = northLatitude - 300 * latitudeStep
#eastLongitude = -74.2
#westLongitude = eastLongitude + 300 * longitudeStep

# function to round longitude to a point in the middle of the square
def longiRound( val):
    return ((val - eastLongitude) // longitudeStep) * longitudeStep + eastLongitude + longitudeStep / 2
spark.udf.register("longround", longiRound, DoubleType())

# function to round latitude to a point in the middle of the square
def latRound( l):
    return northLatitude - ((northLatitude - l) // latitudeStep) * latitudeStep - latitudeStep / 2
spark.udf.register("latround", latRound, DoubleType())

mySchema = StructType([
    StructField("medallion", StringType()),
    StructField("hack_license", StringType()),
    StructField("pickup_datetime", TimestampType()),
    StructField("dropoff_datetime", TimestampType()),
    StructField("trip_time_in_secs", IntegerType()),
    StructField("trip_distance", DoubleType()),
    StructField("pickup_longitude", DoubleType()),
    StructField("pickup_latitude", DoubleType()),
    StructField("dropoff_longitude", DoubleType()),
    StructField("dropoff_latitude", DoubleType()),
    StructField("payment_type", StringType()),
    StructField("fare_amount", DoubleType()),
    StructField("surcharge", DoubleType()),
    StructField("mta_tax", DoubleType()),
    StructField("tip_amount", DoubleType()),
    StructField("tolls_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
])

dataset = spark.read.load(FILENAME, format="csv", 
                         sep=",", schema=mySchema, header="false")

# Let's filter data outside of the box and build a grid
# Points in each square are mapped to the center of the square.
dataset.createOrReplaceTempView("data")
filteredDataDF = spark.sql( """SELECT medallion, hack_license, pickup_datetime,
                                    dropoff_datetime, trip_time_in_secs, trip_distance,
                                    longround(pickup_longitude) AS pickup_longitude, 
                                    latround(pickup_latitude) AS pickup_latitude,
                                    longround(dropoff_longitude) AS dropoff_longitude, 
                                    latround(dropoff_latitude) AS dropoff_latitude, 
                                    payment_type, fare_amount, mta_tax, 
                                    tip_amount, tolls_amount, total_amount
                                  FROM data
                                  WHERE pickup_longitude >= """ + str(eastLongitude) + """ AND
                                  pickup_longitude <=  """ + str(westLongitude) + """ AND
                                  dropoff_longitude >=  """ + str(eastLongitude) + """ AND
                                  dropoff_longitude <=  """ + str(westLongitude) + """ AND
                                  pickup_latitude <= """ + str(northLatitude) + """ AND
                                  pickup_latitude >= """ + str(southLatitude) + """ AND
                                  dropoff_latitude <=  """ + str(northLatitude) + """ AND
                                  dropoff_latitude >=  """ + str(southLatitude))
filteredDataDF.createOrReplaceTempView("data")

# Frequency for pickups
pickupsDF = spark.sql( """SELECT pickup_longitude, pickup_latitude, count(*) AS cnt
                                  FROM data
                                  GROUP BY pickup_longitude, pickup_latitude""")
pickups = pickupsDF.collect()

print('Plotting pickups')
p = plt.scatter([row.pickup_longitude for row in pickups],
            [row.pickup_latitude for row in pickups],
            c=[row.cnt for row in pickups],s=1,cmap="rainbow")
plt.colorbar(p)
plt.show()

# Frequency for dropoffs
dropoffsDF = spark.sql( """SELECT dropoff_longitude, dropoff_latitude, count(*) AS cnt
                                  FROM data
                                  GROUP BY dropoff_longitude, dropoff_latitude""")
dropoffs = dropoffsDF.collect()

print('Plotting dropoffs')
p = plt.scatter([row.dropoff_longitude for row in dropoffs],
            [row.dropoff_latitude for row in dropoffs],
            c=[row.cnt for row in dropoffs],s=1,cmap="rainbow")
plt.colorbar(p)
plt.show()



## Exercise 0: another simple statistic

This example computes, for each license, the number of trips performed.

We have the code using Spark and Pandas, printing the time taken by the computation.
**Draw some conclusions** by comparing the time taken by Spark and Pandas, and also the time taken with the small and the large dataset.

### Code: Spark

In [None]:
import matplotlib.pyplot as plt
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Group project") \
    .getOrCreate()

start_time = time.time()
mySchema = StructType([
    StructField("medallion", StringType()),
    StructField("hack_license", StringType()),
    StructField("pickup_datetime", TimestampType()),
    StructField("dropoff_datetime", TimestampType()),
    StructField("trip_time_in_secs", IntegerType()),
    StructField("trip_distance", DoubleType()),
    StructField("pickup_longitude", DoubleType()),
    StructField("pickup_latitude", DoubleType()),
    StructField("dropoff_longitude", DoubleType()),
    StructField("dropoff_latitude", DoubleType()),
    StructField("payment_type", StringType()),
    StructField("fare_amount", DoubleType()),
    StructField("surcharge", DoubleType()),
    StructField("mta_tax", DoubleType()),
    StructField("tip_amount", DoubleType()),
    StructField("tolls_amount", DoubleType()),
    StructField("total_amount", DoubleType()),
])

dataset = spark.read.load(FILENAME, format="csv", 
                         sep=",", schema=mySchema, header="false")
dataset.createOrReplaceTempView("data")
statistics = spark.sql( """SELECT hack_license, COUNT(*) AS cnt FROM data GROUP BY hack_license""")
statistics.show()

end_time = time.time()

print( "Runtime = " + str(end_time - start_time))

### Results (Spark)

**On our computer:**

The time to process the small dataset was: **10.244155168533325** seconds.

The time to process the large dataset was: **830.9792747497559** seconds.

**In Azure Databricks:**

The time to process the small dataset was: **8.084648132324219** seconds.

The time to process the large dataset was: **525.8180425167084** seconds.

### Code: Pandas library

In [None]:
import pandas as pd
import time

start_time = time.time()
mySchema = ["medallion", "hack_license", "pickup_datetime",
            "dropoff_datetime", "trip_time_in_secs", "trip_distance",
            "pickup_longitude", "pickup_latitude", "dropoff_longitude",
            "dropoff_latitude", "payment_type", "fare_amount", 
            "surcharge", "mta_tax", "tip_amount",
            "tolls_amount", "total_amount"]

dataset = pd.read_csv(FILENAME,names=mySchema)
result = dataset.groupby("hack_license").count()
print(result)

end_time = time.time()

print( "Runtime = " + str(end_time - start_time))

### Results (Pandas)

This will not work in Databricks.

**In our computer:**

The time to process the small dataset was : **44.3519287109375** seconds. 

The time to process the large dataset was : **(could not run)**

#### Results discussion

Sample vs. All data

The sample dataset takes far less time to process than the full dataset, both locally and on Azure Databricks. This is expected given the difference in size: the small dataset covers 20 days of taxi trips (roughly 2 million events; 130 MB), while the large dataset covers the whole of 2013 (roughly 173 million events; 33 GB). In Pandas, it was not possible to process the large dataset locally at all.

Pandas vs. Spark (incomplete)

When running the small dataset locally, Pandas takes considerably longer than Spark (44.35 s vs. 10.24 s).
The large dataset could not be processed with Pandas.
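
The text does not say why the large dataset failed in Pandas. If the cause was memory (an assumption, since `pd.read_csv` loads the whole file at once), a streaming count that reads one row at a time is one possible workaround. The sketch below uses only the standard library; the function name and the tiny in-memory sample are made up for illustration.

```python
import csv
import io
from collections import Counter

HACK_LICENSE_COLUMN = 1  # hack_license is the second field in the schema

def count_trips_per_license(lines):
    """Count trips per hack_license while holding only one row in memory."""
    counts = Counter()
    for row in csv.reader(lines):
        if len(row) > HACK_LICENSE_COLUMN:
            counts[row[HACK_LICENSE_COLUMN]] += 1
    return counts

# Tiny in-memory stand-in for the CSV file (made-up, truncated rows)
sample = io.StringIO(
    "med1,licA,2013-01-01 00:00:00\n"
    "med2,licB,2013-01-01 00:01:00\n"
    "med3,licA,2013-01-01 00:02:00\n"
)
trip_counts = count_trips_per_license(sample)
```

With a real file, the same function could be called on an open file handle, for example `with open(FILENAME, newline="") as f: count_trips_per_license(f)`. This was not timed as part of the exercise.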

Spark locally vs. Spark Azure

Spark runs faster in Azure Databricks than locally for both datasets: 8.085 s vs. 10.244 s for the sample and 525.818 s vs. 830.979 s for the large dataset. A likely explanation is that Azure Databricks is a cloud-based data engineering platform built to process large datasets: files are stored across different servers, which allows processing to run simultaneously on multiple machines.
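
To put the comparisons in relative terms, the sketch below computes speedup ratios from the runtimes reported in the results sections. The dictionary keys and the `speedup` helper are names chosen for this example only.

```python
# Runtimes in seconds, copied from the results reported above
runtimes = {
    ("spark_local", "small"): 10.244155168533325,
    ("spark_local", "large"): 830.9792747497559,
    ("spark_databricks", "small"): 8.084648132324219,
    ("spark_databricks", "large"): 525.8180425167084,
    ("pandas_local", "small"): 44.3519287109375,
}

def speedup(slower, faster):
    """Ratio of two runtimes; values above 1 mean `faster` took less time."""
    return runtimes[slower] / runtimes[faster]

databricks_small = speedup(("spark_local", "small"), ("spark_databricks", "small"))
databricks_large = speedup(("spark_local", "large"), ("spark_databricks", "large"))
spark_vs_pandas = speedup(("pandas_local", "small"), ("spark_local", "small"))
```

This gives roughly 1.3x (Databricks vs. local Spark, small dataset), 1.6x (Databricks vs. local Spark, large dataset) and 4.3x (local Spark vs. Pandas, small dataset). Each figure rests on a single reported measurement, so the ratios are indicative only.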

# Data Cleaning

Our first step for this project was to clean our dataset.

#### 1. New York City Limits
First, we only want to consider coordinates inside the New York City area. We also want to eliminate coordinates that might have been misrecorded (such as latitudes equal to 0).
For this purpose, we used a latitude of 41 as the northern limit of NYC and 40.4 as the southern limit. For the longitude, we used -73.6 as the eastern limit and -74.5 as the western limit.

#### 2. Trip Distance
Secondly, we want to eliminate any trips whose distance was potentially recorded incorrectly, and to get rid of outliers. For this, we only considered trips below 25 miles.

#### 3. Trip Time
Then we eliminated every trip with a recorded time of 0 seconds, as these are likely wrong entries and should not be considered. We also eliminated trips longer than 1 hour, as these are outliers when looking at the data distribution.
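
The three rules can be read as a single per-trip predicate. The sketch below restates them in plain Python for a trip stored as a dictionary; the function names `in_nyc` and `is_valid_trip` are hypothetical, and the thresholds are the ones listed above.

```python
# Thresholds taken from the three cleaning rules above
LAT_RANGE = (40.4, 41.0)
LON_RANGE = (-74.5, -73.6)
MAX_DISTANCE_MILES = 25
MAX_TRIP_SECONDS = 3600

def in_nyc(longitude, latitude):
    """True when the point lies inside the NYC bounding box (inclusive)."""
    return (LON_RANGE[0] <= longitude <= LON_RANGE[1]
            and LAT_RANGE[0] <= latitude <= LAT_RANGE[1])

def is_valid_trip(trip):
    """Apply the location, distance and time rules to one trip record."""
    return (in_nyc(trip["pickup_longitude"], trip["pickup_latitude"])
            and in_nyc(trip["dropoff_longitude"], trip["dropoff_latitude"])
            and trip["trip_time_in_secs"] != 0
            and trip["trip_time_in_secs"] < MAX_TRIP_SECONDS
            and trip["trip_distance"] < MAX_DISTANCE_MILES)
```

A trip with a pickup at latitude 0, for example, fails the first rule and is dropped regardless of its distance or duration.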

In [None]:
clean = spark.sql("SELECT * FROM data \
                    WHERE pickup_longitude BETWEEN -74.5 AND -73.6 AND pickup_latitude BETWEEN 40.4 AND 41\
                    AND dropoff_longitude BETWEEN -74.5 AND -73.6 AND dropoff_latitude BETWEEN 40.4 AND 41\
                    AND trip_time_in_secs != 0\
                    AND trip_distance < 25\
                    AND trip_time_in_secs < 3600")
clean.createOrReplaceTempView("clean_data")

## Exercise 1

Let's start by trying to help the city identify which new express bus routes it should introduce. To this end, you should find the most frequent routes whose distance is above a given threshold (defined by you).

For establishing these routes, we suggest that you use a grid of 500m of side.

In [None]:
#Squares of 500 meters
latitudeStep = 0.004491556
longitudeStep = 0.005986
northLatitude = 41.474937 - 0.5 * latitudeStep
southLatitude = northLatitude - 300 * latitudeStep
eastLongitude = -74.913585 - 0.5 * longitudeStep
westLongitude = eastLongitude + 300 * longitudeStep

def longiRound( val):
    return ((val - westLongitude) // longitudeStep) * longitudeStep + \
                westLongitude + longitudeStep / 2

spark.udf.register("longround", longiRound, DoubleType())

def latRound( l):
    return northLatitude - ((northLatitude - l) // latitudeStep) * \
        latitudeStep - latitudeStep / 2
spark.udf.register("latround", latRound, DoubleType())

top_routes = spark.sql("""SELECT latround(pickup_latitude) as p1_lat, longround(pickup_longitude) as p1_long,\
                    latround(dropoff_latitude) as p2_lat, longround(dropoff_longitude) as p2_long,\
                    COUNT(*) as Metric\
                    FROM clean_data WHERE trip_distance > 2 \
                    GROUP BY p1_lat, p1_long, p2_lat, p2_long \
                    ORDER BY metric DESC""" )
top_routes.createOrReplaceTempView("top_routes")
top_routes.show()

#### Discussion



For this exercise, we assumed that a new bus route is only worthwhile for distances that would take roughly 30 to 40 minutes or more to walk; that is, we assume that people may prefer to walk shorter distances. Following this logic, we only considered taxi trips above 2 miles.

For a bus with more stops, shorter distances would be considered, but given that this is an express bus with a single origin and a single destination, we consider slightly longer distances. In the end, all of our top 8 routes were above 3 miles in distance.

We used a grid of 500 m per side to merge points that are not significantly far from each other.
`GROUP BY` gives us the frequency of trips (our Metric) for each combination of origin and destination. We ordered the results by descending Metric to show the most relevant routes first.
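
The counting logic can be sketched without Spark as follows. The cell labels and distances are made-up stand-ins for the rounded grid coordinates, and `top_routes` is a hypothetical helper mirroring the distance filter, the grouping and the descending sort.

```python
from collections import Counter

def top_routes(trips, min_distance_miles=2.0, limit=3):
    """Count trips per (pickup_cell, dropoff_cell) above a distance threshold.

    trips is an iterable of (pickup_cell, dropoff_cell, trip_distance).
    """
    counts = Counter(
        (pickup_cell, dropoff_cell)
        for pickup_cell, dropoff_cell, distance in trips
        if distance > min_distance_miles
    )
    return counts.most_common(limit)

# Made-up trips; the short "midtown -> uptown" trips are below the threshold
trips = [
    ("airport", "midtown", 9.0),
    ("airport", "midtown", 8.5),
    ("midtown", "airport", 9.2),
    ("midtown", "uptown", 1.5),
    ("midtown", "uptown", 1.2),
    ("midtown", "uptown", 1.8),
]
routes = top_routes(trips)
```

Note that the most frequent pair overall (midtown to uptown) never appears in the result, because every one of its trips is shorter than the threshold.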

The most common taxi route was from the terminal area of LaGuardia Airport (40.77425426399999, -73.872021) to the area around the intersection of East 49th Street with Park Avenue, in Midtown Manhattan (40.756288039999994, -73.973783) (https://goo.gl/maps/X19MU7sMDsWogVDRA). This should be our new bus route.

If we want to build more bus routes, looking strictly at the pickup and drop-off points to create single-destination bus routes, the following 9 are the next to be considered (from most to least important, as shown in the dataframe):
- https://goo.gl/maps/HWhoiyRwTWTo6rgCA (from Midtown Manhattan to LaGuardia Airport)
- https://goo.gl/maps/iQPYpaJQSsMq29Gx7 (from LaGuardia Airport to Midtown Manhattan)
- https://goo.gl/maps/Yxh1VPD3RXGTbJsu8 (from LaGuardia Airport to Midtown Manhattan)
- https://goo.gl/maps/Hp2TE6N9cT8GWvWb8 (from LaGuardia Airport to Midtown Manhattan)
- https://goo.gl/maps/AWdeaNnvzS3W7JkGA (from LaGuardia Airport to Midtown Manhattan)
- https://goo.gl/maps/P8JaoC3sNDNjwVS29 (from Midtown Manhattan to LaGuardia Airport)
- https://goo.gl/maps/V7dmBGK6osHGpKeYA (from Midtown Manhattan to the Upper East Side)
- https://goo.gl/maps/ocVaU7VyPE9tciha6 (from Midtown Manhattan to LaGuardia Airport)
- https://goo.gl/maps/qX5G7P3BZYx2fhSB9 (from LaGuardia Airport to Midtown Manhattan)

However, if we wanted to combine the 10 routes above, we could plan the following:
- For the 6 routes that go from the airport to Midtown Manhattan, one bus from LaGuardia Airport that stops in those 6 different areas of Midtown Manhattan.
- For the 3 routes that go from Midtown Manhattan to LaGuardia Airport, one bus that passes through those 3 Midtown Manhattan areas and then goes to the airport.
- A third express bus, like the one above, from Midtown Manhattan to the Upper East Side.

## Exercise 2

The second question intends to help taxi drivers decide which area of the city they should go to next. To this end, we could have a website or mobile app where drivers could check the best area at a given moment.
To support such an application efficiently, it would be necessary to have a pre-computed index with the value for each area and period of time (e.g. combining the day of the week and a period of one hour).

You should write the program that creates such an index. The output tuples should look something like: longitude, latitude, day_of_week, hour, value.

Define your own metric for the value of an area. Parameters that may be included in such metric include: the number of pickups in the area, the amount collected in the trip, the average time a taxi is idle in the area, etc.

Besides presenting the code, explain the rationale of your solution.

**Note:** SQL functions date(col), dayofweek(col) and hour(col) return, respectively, the date, day of week and hour of a datetime in column col.
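
When reading the index, it helps to know that Spark SQL's `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday). The plain-Python sketch below reproduces that numbering for a single timestamp; `time_slot` is a hypothetical helper, not part of the assignment.

```python
from datetime import datetime

def time_slot(pickup):
    """Return (day_of_week, hour) with 1 = Sunday ... 7 = Saturday.

    Python's isoweekday() uses 1 = Monday ... 7 = Sunday, so it is
    shifted here to match Spark SQL's dayofweek numbering.
    """
    day_of_week = pickup.isoweekday() % 7 + 1
    return day_of_week, pickup.hour

# 6 January 2013 was a Sunday
sunday_afternoon = time_slot(datetime(2013, 1, 6, 14, 30))
```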

In [None]:
print('Hello, taxi driver! Thank you for visiting our app. Our algorithm will help you decide where you should go next.')

print('\nGetting our data...')
#gives 3 extra fields: date, day of the week, and hour
data_time = spark.sql("SELECT *, date(pickup_datetime) as Date, dayofweek(pickup_datetime) as Week_Day, \
                        hour(pickup_datetime) as Hour from clean_data")
data_time.createOrReplaceTempView("datatime")

#transforming longitudes and latitudes
code_df=spark.sql("""SELECT *, longround(pickup_longitude) as p1_long, \
                        latround(pickup_latitude) as p1_lat FROM datatime""")
code_df.createOrReplaceTempView("code_df")

#Getting a dataframe with the relevant values for this problem
values_df=spark.sql("""SELECT * FROM(SELECT Week_Day, Hour, p1_long, p1_lat, \
                    count(*) as n_trips, sum(total_amount) AS payment,\
                    sum(trip_time_in_secs)\
                    FROM code_df\
                    GROUP BY Week_Day, Hour, p1_long, p1_lat)\
                    WHERE n_trips > 12""") #firstly we only want to consider areas where there is more than one client every 5 minutes
values_df.createOrReplaceTempView("values")

#Data with the number of trips, and average revenue per trip, per each pick_up location
some_metrics=spark.sql("""SELECT Week_Day, Hour, Week_Day ||'_'|| Hour as code, p1_long, p1_lat, n_trips,\
                    payment, payment/n_trips as avgpay\
                    FROM values""")
some_metrics.createOrReplaceTempView("some_metrics")

#Getting, for each hour and each day of the week, the 75th percentile of the number of trips.
demand_new=spark.sql("SELECT *, Week_Day1 ||'_'|| Hour1 as code1 FROM\
                        (SELECT Week_Day as Week_Day1, Hour as Hour1, percentile(n_trips, 0.75) as percentile_75 FROM some_metrics\
                        GROUP BY Week_Day, Hour\
                        ORDER BY Week_Day, Hour)")
demand_new.createOrReplaceTempView("demand_new")

#Joining the two dataframes, to have a column with the 75th percentile of n_trips
join_demand=spark.sql("SELECT * FROM some_metrics JOIN demand_new ON some_metrics.code=demand_new.code1").drop('code').drop('Week_Day1','Hour1')
join_demand.createOrReplaceTempView("join_demand")

#Eliminating, for each hour for each day of the week, the pick up locations where the number of trips is below the percentile 75
selected_demand=spark.sql("SELECT *, Week_Day ||'_'|| Hour as code FROM\
                            (SELECT Week_Day, Hour, p1_long, p1_lat, n_trips, avgpay FROM join_demand\
                            WHERE n_trips > percentile_75)")
selected_demand.createOrReplaceTempView("selected_demand")

print('Here are several good locations you can go to. Rank is given by the average revenue per trip. Please wait a moment.')
#Ranking, for each hour for each day of the week, the selected locations, by their average revenue per trip
locations_ranked = spark.sql(""" SELECT * FROM \
                                (SELECT Week_Day, Hour, MAX(avgpay) as metric, p1_lat,  p1_long,\
                                row_number() over (partition by Week_Day, Hour ORDER BY MAX(avgpay) desc) as max_rank_metric\
                                FROM selected_demand
                                WHERE n_trips >= 5 
                                GROUP BY Week_Day, Hour, p1_lat,  p1_long
                                ORDER BY Week_Day, Hour, max_rank_metric ASC)""")

locations_ranked.show(30)
locations_ranked.createOrReplaceTempView("locations_ranked")

print('If you prefer, here is THE BEST location for each hour of the day, for each day of the week.')
the_best_location = spark.sql(""" SELECT * FROM \
                                (SELECT Week_Day, Hour, MAX(avgpay) as metric, p1_lat,  p1_long,\
                                row_number() over (partition by Week_Day, Hour ORDER BY MAX(avgpay) desc) as max_rank_metric\
                                FROM selected_demand
                                WHERE n_trips >= 5 
                                GROUP BY Week_Day, Hour, p1_lat,  p1_long
                                ORDER BY Week_Day, Hour)\
                                WHERE max_rank_metric = 1""")

the_best_location.show(30)
the_best_location.createOrReplaceTempView("the_best_location")

print('Our algorithm is done. We hope it was helpful!')

#### Discussion


**Algorithm rationale:**

Firstly, because we wanted our algorithm to recommend areas where the driver would not have to wait more than 5 minutes, we started by eliminating the areas where the number of trips per hour was 12 or fewer; that is, we only considered areas with more than 1 client every 5 minutes.

Secondly, our algorithm eliminates pickup areas where the number of trips is not above the 75th percentile of the number of trips for the same hour and day of the week.

Lastly, our algorithm ranks the selected areas according to the average revenue per trip. The user of the app can see the entire ranking of the selected areas for each hour and each day of the week, and also a table with the single best area for each hour and each day of the week.
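
The three steps can be sketched in plain Python for a single (day of week, hour) slot. This is an illustration rather than the notebook's code: the names `percentile` and `rank_areas` and the sample areas are invented, and the percentile is assumed to use linear interpolation between sorted values.

```python
def percentile(values, p):
    """Percentile with linear interpolation between sorted values (assumed)."""
    ordered = sorted(values)
    position = (len(ordered) - 1) * p
    low = int(position)
    high = min(low + 1, len(ordered) - 1)
    return ordered[low] + (ordered[high] - ordered[low]) * (position - low)

def rank_areas(areas, min_trips=12, p=0.75):
    """Rank the areas of one (day of week, hour) slot by average pay per trip."""
    # Step 1: keep areas with more than min_trips trips
    busy = [a for a in areas if a["n_trips"] > min_trips]
    if not busy:
        return []
    # Step 2: keep areas whose trip count is above the 75th percentile
    cutoff = percentile([a["n_trips"] for a in busy], p)
    selected = [a for a in busy if a["n_trips"] > cutoff]
    # Step 3: order by average revenue per trip, highest first
    return sorted(selected, key=lambda a: a["payment"] / a["n_trips"], reverse=True)

# Made-up areas for one slot
areas = [
    {"cell": "A", "n_trips": 10, "payment": 900.0},
    {"cell": "B", "n_trips": 20, "payment": 1000.0},
    {"cell": "C", "n_trips": 40, "payment": 800.0},
    {"cell": "D", "n_trips": 60, "payment": 600.0},
    {"cell": "E", "n_trips": 80, "payment": 2400.0},
    {"cell": "F", "n_trips": 100, "payment": 1500.0},
    {"cell": "G", "n_trips": 120, "payment": 3600.0},
]
ranked_cells = [a["cell"] for a in rank_areas(areas)]
```

In this sample, area A is removed in step 1 even though its average pay per trip is the highest, which shows how the demand filters take priority over revenue.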


**Code explanation:**

The first two statements, "data_time" and "code_df", transform the pickup_datetime column into a day-of-week column and an hour column that we can use, and map the pickup coordinates to grid cells.

The third statement, "values_df", returns the number of trips and the sum of all payments for those trips, for each area, for every hour and every day of the week. It only keeps the pickup areas where the average waiting time is below 5 minutes (by selecting areas where, for every hour, the number of trips is above 12).

This concludes the first part of the algorithm.

The next statement, "some_metrics", adds a column with the average payment per trip for every pickup area, hour and day of the week, and a code (made of the day of the week and the hour).

The "demand_new" statement creates a dataframe with the 75th percentile of the number of trips for each hour and day of the week, and also a code (made of the day of the week and the hour).

After this, we join the two dataframes described above with the "join_demand" statement, and then eliminate the areas where the number of trips is not above the 75th percentile, with the "selected_demand" statement.

This concludes the second part of our algorithm.

The "locations_ranked" statement returns all of the selected pickup spots, ranked by their average revenue per trip. Lastly, the "the_best_location" statement returns the single best location in NYC for each hour and day of the week.
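
The ranking step relies on a window function that numbers rows within each (day of week, hour) partition. A minimal plain-Python equivalent, with hypothetical function names and made-up rows, could look like this:

```python
from collections import defaultdict

def rank_within_slots(rows):
    """Number the areas of each (week_day, hour) slot by descending avgpay.

    rows is an iterable of (week_day, hour, cell, avgpay) tuples.
    Returns {(week_day, hour): [(rank, cell, avgpay), ...]}.
    """
    slots = defaultdict(list)
    for week_day, hour, cell, avgpay in rows:
        slots[(week_day, hour)].append((cell, avgpay))
    ranked = {}
    for slot, cells in slots.items():
        ordered = sorted(cells, key=lambda c: c[1], reverse=True)
        ranked[slot] = [(i + 1, cell, pay) for i, (cell, pay) in enumerate(ordered)]
    return ranked

def best_per_slot(ranked):
    """Keep only the rank-1 entry of every slot."""
    return {slot: entries[0] for slot, entries in ranked.items()}

# Made-up rows: two areas on day 1 at 8h, one area on day 2 at 8h
rows = [(1, 8, "A", 12.0), (1, 8, "B", 20.0), (2, 8, "A", 9.0)]
ranked = rank_within_slots(rows)
best = best_per_slot(ranked)
```

Numbering restarts at 1 in every slot, which is why filtering on rank 1 yields exactly one location per hour and day of the week.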

This concludes the third part of our algorithm.

## Exercise 3

The third question intends to define the location of taxi ranks (the places where taxis wait to pick up clients) in a way that tries to minimize the distance a client needs to travel to reach the taxi rank.

Consider that you want to establish, at least, 100 taxi ranks - present the code that defines the number and locations of the ranks.

**Note:** This dataset is for NYC taxis. So, pickups outside of the city are infrequent and not representative of the demand in such areas. As such, you should focus on pickups in a square that includes NYC (it is ok if the square includes parts outside of the city). Use, for example, the following square:
```
northLatitude = 40.86
southLatitude = 40.68
eastLongitude = -74.03
westLongitude = -73.92
```

**Suggestion:** Plot your results as a heatmap, with the color being a measurement of the value of the taxi rank; use the visual feedback to enhance your solution.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

#create DF with pickups limited to NYC
pickupsDF = spark.sql( """SELECT x, y, cnt
                        FROM(SELECT pickup_longitude as x , pickup_latitude as y, count(*) AS cnt
                                  FROM clean_data
                                  WHERE pickup_latitude<40.87 AND  pickup_latitude>40.68 AND
                                  pickup_longitude<-73.92 AND pickup_longitude > -74.03
                                  GROUP BY x, y)
                                  """)

# Assemble the pickup coordinates (x = longitude, y = latitude) into a feature vector
assembler = VectorAssembler(inputCols=['x','y'],outputCol="features")
dataset = assembler.transform(pickupsDF)

# Fit k-means with 100 clusters; each cluster center is a candidate taxi rank
kmeans = KMeans().setK(100).setSeed(1)
model = kmeans.fit(dataset)
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)

In [None]:
ce = pd.DataFrame(centers)

centersdf = spark.createDataFrame(ce,schema=["lon", "lata"])

centroids = centersdf.select("*").withColumn("ida", monotonically_increasing_id())
w = Window.orderBy('ida')
centroids2=centroids.withColumn('index',row_number().over(w)-1)
centroids2=centroids2.drop('ida')
centroids2.createOrReplaceTempView("centroidsdf")

#apply the model to our dataset to find the cluster each point belongs to
predictions = model.transform(dataset)
predictions.createOrReplaceTempView("predi")

predictions_coord = spark.sql("""SELECT * FROM predi JOIN centroidsdf ON predi.prediction=centroidsdf.index""")
predictions_coord=predictions_coord.drop('index')
predictions_coord.createOrReplaceTempView("pred_coord")
predictions_coord.show()

# Total number of pickups in each cluster (cnt holds the pickups per coordinate)
cluster_count = spark.sql("""SELECT prediction, lon, lata, SUM(cnt) AS counter
                            FROM pred_coord GROUP BY lon, lata, prediction""")

cluster_count.createOrReplaceTempView("clust_count")

# Rank clusters by pickups in ascending order: Taxi_Rank 1 is the least busy cluster
rank_center = spark.sql("""SELECT *, ROW_NUMBER() OVER (ORDER BY counter) AS Taxi_Rank
                            FROM clust_count""")

rank_center.show()

rank = rank_center.collect()

In [None]:
# Plot the center of each of the 100 clusters, colored by its rank
plt.style.use('seaborn-whitegrid')
plt.figure(figsize=(10,6))
plot_cluster = plt.scatter([row.lon for row in rank],
                           [row.lata for row in rank],
                           c=[row.Taxi_Rank for row in rank], s=3, cmap="rainbow")
plt.xlabel("longitude")
plt.ylabel("latitude")
plt.colorbar(plot_cluster, label="Taxi_Rank")
plt.show()

#### Discussion

We start by restricting the analysis to a bounding box around NYC, so that pickups outside that area are ignored.
We then use k-means to create 100 clusters, where the center of each cluster is the proposed location of a taxi rank. K-means assigns each point to the cluster whose center is closest to it, so every pickup point is nearer to its own taxi rank than to any other. By placing the 100 ranks at the cluster centers, we aim to keep the distance clients have to walk as short as possible.
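
The assignment rule described above can be illustrated with a minimal pure-Python sketch. The coordinates and the helper `nearest_centroid` are hypothetical and only serve as an illustration; like the k-means model in the notebook, the sketch uses plain Euclidean distance on longitude and latitude, which is only an approximation of the real walking distance.

```python
import math

def nearest_centroid(point, centroids):
    """Return the index of the centroid closest to `point` (Euclidean distance)."""
    distances = [math.dist(point, c) for c in centroids]
    return distances.index(min(distances))

# Hypothetical (longitude, latitude) values, chosen only for illustration.
toy_centroids = [(-73.99, 40.75), (-73.95, 40.78)]
toy_pickups = [(-73.985, 40.752), (-73.955, 40.779), (-73.98, 40.76)]

# Each pickup is assigned to the taxi rank (centroid) it is closest to.
toy_assignments = [nearest_centroid(p, toy_centroids) for p in toy_pickups]
print(toy_assignments)
```

The first and third pickups end up with the first centroid and the second pickup with the second one, which is the property that keeps each client close to their assigned rank.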

The "centroids2" DataFrame gives us the coordinates of each centroid (numbered from 0 to 99).

In "predictions_coord" we have the cluster each pickup point belongs to, as well as the coordinates of the center of that cluster.

"rank" orders the taxi ranks by the number of pickups in the corresponding cluster. Rank #1 is the one with the lowest number of pickups. We plot the center of each cluster with a color for its rank: a warmer color means that the rank number is closer to 100, so the corresponding cluster has a higher number of pickups. We can clearly see that the clusters with the most pickups are located in the center.


## Exercise 4

Renova is a Portuguese company that sells paper-derived products, such as toilet paper, paper towels, etc.
The company plans to enter the market in NYC with an aggressive marketing strategy: it wants to reach as many people as possible by identifying communities where activity is tightly connected, and placing one billboard in each community, at its busiest location.

In [None]:
%pip install graphframes

In [None]:
from graphframes import GraphFrame

In [None]:
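# Grid used to merge nearby coordinates into a single location

# Cells of roughly 150 meters
latitudeStep = 0.0013474668
longitudeStep = 0.0017958
northLatitude = 40.95
southLatitude = northLatitude - 300 * latitudeStep
# Note: -74.2 is the western edge of the grid and "westLongitude" the eastern one;
# the names are kept as they are because the rounding only uses it as the grid origin.
eastLongitude = -74.2
westLongitude = eastLongitude + 300 * longitudeStep


def longiRound(val):
    """Snap a longitude to the center of its grid cell."""
    return ((val - westLongitude) // longitudeStep) * longitudeStep + \
                westLongitude + longitudeStep / 2

spark.udf.register("longround", longiRound, DoubleType())

def latRound(l):
    """Snap a latitude to the center of its grid cell."""
    return northLatitude - ((northLatitude - l) // latitudeStep) * \
        latitudeStep - latitudeStep / 2

spark.udf.register("latround", latRound, DoubleType())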

#only consider trips with a distance lower than 1 mile

filter_distance = spark.sql("""SELECT * FROM clean_data WHERE trip_distance < 1""")
filter_distance.createOrReplaceTempView("filtered_dist")

# Snap pickups and drop-offs to the grid, count trips per (pickup cell, drop-off cell)
# pair, and keep only pairs that lie inside the NYC bounding box
filter_NYC = spark.sql("""
    SELECT * FROM (
        SELECT longround(pickup_longitude) AS p1_long, latround(pickup_latitude) AS p1_lat,
               longround(dropoff_longitude) AS p2_long, latround(dropoff_latitude) AS p2_lat,
               COUNT(*) AS ntripsnyc
        FROM filtered_dist
        GROUP BY p1_long, p1_lat, p2_long, p2_lat
    )
    WHERE p1_lat < 40.87 AND p2_lat < 40.87 AND p1_lat > 40.68 AND p2_lat > 40.68
      AND p1_long < -73.92 AND p2_long < -73.92 AND p1_long > -74.03 AND p2_long > -74.03
""")
filter_NYC.createOrReplaceTempView("filtered_NYC")

# Exclude pickup/drop-off pairs with only a residual number of trips (3 or fewer)
filter_ntrips = spark.sql("""SELECT * FROM filtered_NYC WHERE ntripsnyc>3""")
filter_ntrips.createOrReplaceTempView("relevant_locations")

#define source (pickup coords) and destination (drop off coords)
preprocDF = spark.sql( """SELECT concat('(',p1_long,',',p1_lat,')') AS src, concat('(',p2_long,',',p2_lat,')') AS dst,
p1_long, p1_lat,
p2_long, p2_lat FROM relevant_locations""")
preprocDF.createOrReplaceTempView("part1")

# Define the edges: one 'trip' relationship from each source to each destination
edges = spark.sql( """SELECT src, dst, 'trip' AS relationship FROM part1""")

#create vertex with coords of relevant points
vertex = spark.sql( """SELECT src as id, p1_long as longitude, p1_lat as latitude FROM part1
UNION
SELECT dst as id, p2_long as longitude,
p2_lat as latitude  FROM part1""")

#define graphframe
g = GraphFrame(vertex, edges)

#find communities from network algorithm 
result = g.labelPropagation(maxIter=5)

result.createOrReplaceTempView("communities")

# Only keep a community if it contains more than 50 points
count_filter = spark.sql("""SELECT * FROM
                            (SELECT label as label1, count(*) as freq FROM communities GROUP BY label1)
                            WHERE freq >50 """)


count_filter.createOrReplaceTempView("relevant_communities")

points_communities = spark.sql("""SELECT * FROM communities JOIN relevant_communities
                                ON communities.label = relevant_communities.label1 """)


points_communities = points_communities.drop('label1')
points_communities = points_communities.drop('freq')

communities_final = points_communities.collect()

points_communities.createOrReplaceTempView("communitiesdf")

#shows the community each point belongs to and its id
points_communities.show()

# How many communities exist (no trailing semicolon, which some Spark versions reject)
count_clusters = spark.sql("""SELECT COUNT(DISTINCT label) AS n_communities FROM communitiesdf""")

count_clusters.show()



In [None]:
#create dfs with common codes for drop offs, pick ups and communities
code_communities= spark.sql( """SELECT longitude, latitude, label, longitude ||'_'|| latitude as code 
FROM communitiesdf""")

code_communities.createOrReplaceTempView("c_comm")
code_pickup = spark.sql( """SELECT p1_long, p1_lat, p1_long ||'_'|| p1_lat as codep,
count(*) as count_p FROM relevant_locations group by p1_lat, p1_long""")
code_dropoff = spark.sql( """SELECT p2_long, p2_lat, p2_long ||'_'|| p2_lat as coded,
count(*) as count_d FROM relevant_locations group by p2_lat, p2_long, coded""")

code_pickup.createOrReplaceTempView("c_pickup")
code_dropoff.createOrReplaceTempView("c_dropoff")

pickup_dropoff= spark.sql("""SELECT * FROM c_pickup JOIN c_dropoff ON c_pickup.codep=c_dropoff.coded""")

pickup_dropoff.createOrReplaceTempView("pick_drop")

# Count the number of times someone has passed by each location (whether in a pickup or a drop-off)

count_all = spark.sql("""SELECT coded, count_p + count_d as count2 FROM pick_drop """)
count_all.createOrReplaceTempView("pick_drop_count")

pickdrop_community = spark.sql("""SELECT * FROM pick_drop_count JOIN c_comm
ON pick_drop_count.coded=c_comm.code""")

pickdrop_community.createOrReplaceTempView("pdc")


#select the location with highest frequency for each community
top_locations= spark.sql("""SELECT * FROM
(SELECT *, row_number() over (partition by label order by count2 desc) as coord_rank FROM pdc
WHERE count2>5) WHERE coord_rank <2""")

top_locations = top_locations.drop('coded')
top_locations = top_locations.drop('count2')


top_locations.show()

top_loc = top_locations.collect()

# Plot every point colored by its community, then mark the chosen billboard
# locations on top so they are not hidden by the community points
p = plt.scatter([row.longitude for row in communities_final],
            [row.latitude for row in communities_final],
            c=[row.label for row in communities_final],s=1,cmap="rainbow")
plt.colorbar(p)
plt.scatter([row.longitude for row in top_loc],
            [row.latitude for row in top_loc],
            marker="+", c="black", s=80)
plt.show()

#### Discussion

We consider the best locations for Renova's billboards to be the most popular ones.
To find them, we look for a set of locations that 1) reaches the highest number of different people and 2) guarantees a high number of views.

We start by dividing the city into communities. Each community groups locations visited by people with similar travel patterns. By allocating one billboard per community, we aim to reach as much of NYC's public as possible.


To do this, we first use the grid with cells of 150 m per side to merge points that are too close to each other to count as two different locations. We use this short distance because we want to find patterns inside the same community.
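
As an illustration of the grid step, the sketch below snaps coordinates to the center of their cell with the same constants and formulas as the notebook's UDFs, but without Spark. The function names `snap_longitude` and `snap_latitude` and the three sample pickups are hypothetical.

```python
# Same constants as in the notebook's grid cell (cells of roughly 150 m).
latitude_step = 0.0013474668
longitude_step = 0.0017958
north_latitude = 40.95
origin_longitude = -74.2 + 300 * longitude_step  # named westLongitude in the notebook

def snap_longitude(value):
    """Return the longitude of the center of the cell that contains `value`."""
    cell = (value - origin_longitude) // longitude_step
    return cell * longitude_step + origin_longitude + longitude_step / 2

def snap_latitude(value):
    """Return the latitude of the center of the cell that contains `value`."""
    cell = (north_latitude - value) // latitude_step
    return north_latitude - cell * latitude_step - latitude_step / 2

# Hypothetical pickups: the first two are a few dozen meters apart,
# the third one is several hundred meters away.
a = (snap_longitude(-73.9850), snap_latitude(40.7580))
b = (snap_longitude(-73.9855), snap_latitude(40.7584))
c = (snap_longitude(-73.9800), snap_latitude(40.7600))
print(a == b, a == c)
```

The first two pickups fall in the same cell and are therefore treated as one location, while the third one is mapped to a different cell.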

We filter our data to only include coordinates inside the NYC area. We also exclude routes (pairs of pickup and drop-off cells) with 3 trips or fewer, which we consider too few for the locations to be relevant. After testing several values, we found that this threshold detected communities best.

We found the labelPropagation algorithm from GraphFrames to be the most appropriate one for finding communities, as it relies on network relations: it groups together points that are connected by similar movements. That is, people who pass by one location in a community will likely pass by another.
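
To give an intuition for how label propagation groups connected points, here is a simplified sketch in plain Python. It is not the GraphFrames implementation: it treats edges as undirected, updates all vertices at the same time, and breaks ties by picking the smallest label, all of which are assumptions made for this illustration. The toy graph is hypothetical.

```python
from collections import Counter

def label_propagation(edges, max_iter=5):
    """Simplified synchronous label propagation on an undirected graph."""
    neighbours = {}
    for src, dst in edges:
        neighbours.setdefault(src, set()).add(dst)
        neighbours.setdefault(dst, set()).add(src)

    # Every vertex starts in its own community.
    labels = {vertex: vertex for vertex in neighbours}
    for _ in range(max_iter):
        updated = {}
        for vertex, adjacent in neighbours.items():
            # Each vertex adopts the label that is most common among its neighbours.
            votes = Counter(labels[n] for n in adjacent)
            best = max(votes.values())
            # Tie-break (assumption of this sketch): the smallest label wins.
            updated[vertex] = min(label for label, count in votes.items() if count == best)
        labels = updated
    return labels

# Hypothetical toy graph: two triangles joined by the single edge (2, 3).
toy_edges = [(0, 1), (1, 2), (0, 2), (3, 4), (4, 5), (3, 5), (2, 3)]
toy_labels = label_propagation(toy_edges)
print(toy_labels)
```

On this toy graph, each triangle ends up with its own label even though one edge links them, because vertices inside a triangle outvote the single connection to the other side.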

To be considered a community, a cluster must include more than 50 points; otherwise it is too small to represent a pattern. This threshold was also found through trial and error.
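
The size filter can be sketched without Spark as follows. The helper `keep_large_communities` and the toy labels are hypothetical; the comparison is strict, as in the notebook's `freq >50` condition.

```python
from collections import Counter

def keep_large_communities(vertex_labels, min_size):
    """Keep only the vertices whose community has more than `min_size` members."""
    sizes = Counter(vertex_labels.values())
    return {v: label for v, label in vertex_labels.items() if sizes[label] > min_size}

# Hypothetical vertices and community labels; the threshold of 2 stands in for 50.
toy_vertex_labels = {"a": 1, "b": 1, "c": 1, "d": 2}
toy_kept = keep_large_communities(toy_vertex_labels, min_size=2)
print(toy_kept)
```

Here the community with label 1 has three members and is kept, while the single-vertex community with label 2 is dropped.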

In total, 6 communities were found, meaning the company should put up 6 billboards.


Finally, for each community, we find the busiest point by adding up how often each pair of longitude and latitude appears as a pickup and as a drop-off. The top location in each community is where its billboard goes.
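
The last step, picking the busiest location per community, corresponds to the `row_number() over (partition by label order by count2 desc)` query. A minimal pure-Python equivalent, with hypothetical rows and counts, could look like this:

```python
# Hypothetical (community label, location code, visit count) rows.
toy_rows = [
    (1, "-73.98_40.75", 40),
    (1, "-73.97_40.76", 55),
    (2, "-73.95_40.78", 12),
    (2, "-73.96_40.77", 9),
]

busiest = {}
for label, code, visits in toy_rows:
    # Keep the location with the largest count seen so far for this community.
    if label not in busiest or visits > busiest[label][1]:
        busiest[label] = (code, visits)
print(busiest)
```

Each community keeps exactly one location, the one with the highest count, which is where its billboard would be placed.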