<a href="https://colab.research.google.com/github/smduarte/spbd-2324/blob/main/proj/SPBD2324_Proj.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SPBD-2324 Project Assignment

#### version 1.0 - 15/11 (Final)

The project scenario involves a dataset of taxi rides collected in December 2022 in the New York City area.

Each completed taxi ride corresponds to an event in the dataset. A ride comprises several items of information, including the pick-up and drop-off zones within New York City, their respective timestamps, and information about the payment and the number of passengers reported by the driver. A full description of the available data is provided [here](https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf).

A lookup table that maps zone identifiers to zone names (and their boroughs) is available [here](https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv).

The project assignment comprises a set of queries. All of them must be solved using the Spark SQL DataFrames API. In addition, one query **of your choice** must be solved two more times: once using Spark Core (mandatory) and once using either the SQL flavor of Spark SQL or mrjob.

# Queries

## Q1 - Basic Statistics

For each day of the week, compute the total number of rides and the average ride duration, cost, and distance travelled.

## Q2 - Top-5 New York **boroughs**

Compute the top-5 most popular New York **boroughs** for pick-ups and drop-offs, both for the whole month and separately for each day of the week.

## Q3 - Compute a list of anomalous rides.

Anomalous rides are those that deviate significantly, in terms of either cost or distance travelled, from other rides that started and ended in the same zones.

## Q4 - Find which zones tend to generate shorter rides and which generate longer rides.

Consider a ride short if its distance is more than 30% below the average distance of the rides that originate in the same zone, and long if it is more than 30% above that average.

## Q5 - Find the most important city zones using the PageRank metric.

  Consider the graph where locations/zones (vertices) are connected by taxi rides (edges). Locations that have many incoming rides, i.e., those that are the drop-off location for many rides, tend to be important hubs (centers of activity) in the city. Use PageRank to find these hubs.
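To make the idea behind PageRank concrete, here is a plain-Python power-iteration sketch (not the GraphFrames implementation) on the same toy graph used in the GraphFrames helper example. Each vertex receives a `reset` share of rank from random jumps and the rest from the vertices pointing at it, so vertices with many well-ranked incoming edges score highest. `reset` plays the role of GraphFrames' `resetProbability`; this sketch normalises scores to sum to 1, whereas GraphFrames may scale them differently, so compare rankings rather than raw values.

```python
def pagerank(edges, reset=0.15, iters=50):
    """Power-iteration PageRank whose scores sum to 1.

    edges: list of (src, dst) pairs. For brevity, every vertex is assumed to
    have at least one outgoing edge (no dangling vertices).
    """
    nodes = sorted({v for edge in edges for v in edge})
    n = len(nodes)
    out_deg = {v: 0 for v in nodes}
    for src, _ in edges:
        out_deg[src] += 1

    rank = {v: 1.0 / n for v in nodes}
    for _ in range(iters):
        # Random-jump share for every vertex ...
        new_rank = {v: reset / n for v in nodes}
        # ... plus rank flowing along edges, split evenly over each source's out-edges
        for src, dst in edges:
            new_rank[dst] += (1 - reset) * rank[src] / out_deg[src]
        rank = new_rank
    return rank


# Same toy graph as the GraphFrames helper example: a->b, b->c, c->b
ranks = pagerank([('a', 'b'), ('b', 'c'), ('c', 'b')])
print(sorted(ranks, key=ranks.get, reverse=True))  # ['b', 'c', 'a']
```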
  
  To simplify the graph, do not consider rides that involve "Unknown" zones. Additionally, for each zone, only consider the rides that start in that zone and end in one of its top-5 destinations (this removes the edges corresponding to unpopular (src, dst) zone pairs).

  Use the [GraphFrames API](https://graphframes.github.io/graphframes/docs/_site/index.html) and see the GraphFrames example in the Helper Code section below for a simple PageRank computation.

# Deadline
 + 8th December - 23h59
 + For each day late, a penalty of 0.5/20 grade points applies.

# Helper Code

The cells below show how to download the dataset and start processing it using Spark Core, Spark SQL, and GraphFrames.

In [None]:
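#@title Download Dataset
# Download the gzip-compressed CSV dataset
!wget -q -O taxirides.csv.gz https://shorturl.at/mzHKY
# Small sample for quick experiments: the CSV header plus the first 999 rides
!gzip -cd taxirides.csv.gz | head -1000 > taxirides.csv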

In [None]:
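#@title Install PySpark
# Pinned to Spark 3.5 to match the GraphFrames package (graphframes:0.8.3-spark3.5-s_2.12) used below
!pip install --quiet "pyspark==3.5.*"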

In [None]:
#@title Spark Core Example
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('NYCtaxis').getOrCreate()
sc = spark.sparkContext

try:
  # textFile decompresses .gz input transparently; each RDD element is one line of the CSV
  rides = sc.textFile('taxirides.csv.gz')

  # Print the first 10 lines (the first one is the CSV header)
  for ride in rides.take(10):
    print(ride)
except Exception as e:
  print(e)
finally:
  # Stop the SparkContext whether or not an error occurred
  sc.stop()

In [None]:
#@title SparkSQL Example
from pyspark.sql import SparkSession
# Importing the functions module as F avoids shadowing Python built-ins (sum, max, round, ...)
from pyspark.sql import functions as F

spark = SparkSession.builder.master('local[*]').appName('NYCtaxis').getOrCreate()

try:
  # taxirides.csv is the 1000-line sample created above; use taxirides.csv.gz for the full data
  trips = spark.read.csv(path="taxirides.csv", header=True, inferSchema=True)
  trips.printSchema()
  trips.show(5)

except Exception as err:
  print(err)

In [None]:
#@title Q5 GraphFrames

!pip install --quiet graphframes

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from graphframes import *

import warnings
warnings.filterwarnings("ignore")

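try:

  # spark.jars.packages only takes effect when a new session is created, so stop any
  # session left over from the cells above (otherwise the GraphFrames jar is not loaded)
  active = SparkSession.getActiveSession()
  if active is not None:
    active.stop()

  spark = SparkSession.builder.master('local[*]') \
          .config('spark.jars.packages', 'graphframes:graphframes:0.8.3-spark3.5-s_2.12')\
          .appName('Graphframes example').getOrCreate()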


  # Create a Vertex DataFrame with unique ID column "id"
  v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ], ["id", "name", "age"])


  # Create an Edge DataFrame with "src" and "dst" columns
  e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ], ["src", "dst", "relationship"])

  # Create a GraphFrame
  g = GraphFrame(v, e)

  # Query: Get in-degree of each vertex.
  g.inDegrees.show()

  # Query: Count the number of "follow" connections in the graph.
  print(g.edges.filter("relationship = 'follow'").count())

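  # Run PageRank algorithm, and show results.
  # resetProbability is the probability of jumping to a random vertex at each step
  # (0.15 is the value most commonly used, e.g. in the GraphFrames user guide)
  results = g.pageRank(resetProbability=0.01, maxIter=20)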
  results.vertices.select("id", "pagerank").show()

except Exception as err:
  print(err)
