# Queens Trips
### In this example, we'll take a look at how long the taxi trips that originated in the borough of Queens took

#### First, let's set up our environment and connections

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import percentile_approx, count

In [None]:
# JDBC connection settings for the Postgres database.
# Each value can be overridden through an environment variable so that real
# credentials never have to be written into the notebook; the fallbacks are
# the values this notebook originally hard-coded.
db_url = os.environ.get("DEMO_DB_URL", "jdbc:postgresql://demo_db:5432/demo_db")
properties = {
    "user": os.environ.get("DEMO_DB_USER", "user"),
    "password": os.environ.get("DEMO_DB_PASSWORD", "shhh_its_a_secret"),
    "driver": "org.postgresql.Driver",  # JDBC driver class name
}

In [None]:
# Get or create the Spark session against the master at spark-master:7077,
# registering the PostgreSQL JDBC jar through the `spark.jars` setting.
spark = (
    SparkSession.builder
    .config("spark.jars", "/opt/spark_jars/postgresql-42.7.1.jar")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

#### There are two tables in the database: taxi trips and locations

In [None]:
# Load the two source tables over JDBC as Spark DataFrames.
locations = spark.read.jdbc(url=db_url, table='locations', properties=properties)
taxi_trips = spark.read.jdbc(url=db_url, table='taxi_trips', properties=properties)

In [None]:
# Keep only the location rows whose borough is Queens.
queens_ids = locations.where(locations.borough == 'Queens')

In [None]:
# Inner-join the trips with the Queens locations on the pickup location id,
# then project back to the original trip columns only.
queens_trips = (
    taxi_trips
    .join(
        queens_ids.withColumnRenamed('id', 'pickup_location_id'),
        on='pickup_location_id',
        how='inner',
    )
    .select(*taxi_trips.columns)
)

#### How many rows do we have after that?

In [None]:
# Number of trips picked up in Queens; shown as the cell's output.
queens_trips.count()

#### Alright, that's a lot. We'd better keep operating inside Spark as much as we can.
#### Let's check those trip times

In [None]:
# Difference between drop-off and pickup, each cast to long, divided by 60.
# NOTE(review): assumes both *_ts columns are timestamps, so the long cast
# yields seconds and the result is minutes — confirm against the table schema.
trip_seconds = queens_trips.dropoff_ts.cast('long') - queens_trips.pickup_ts.cast('long')
queens_trips = queens_trips.withColumn('trip_duration_minutes', trip_seconds / 60)

In [None]:
# Print summary statistics for the trip durations.
(
    queens_trips
    .select('trip_duration_minutes')
    .summary()
    .show()
)

#### Looks like we have some outliers: trips that take negative time and at least one that took almost 24h. Let's take them out.

In [None]:
# Drop trips whose duration is zero or negative.
queens_trips = queens_trips.where(queens_trips.trip_duration_minutes > 0)

In [None]:
# Approximate 99th percentile of the trip duration, pulled back to the driver
# as a plain Python value (the aggregate yields a single row with one column).
perc_99 = (
    queens_trips
    .agg(percentile_approx('trip_duration_minutes', 0.99))
    .first()[0]
)

In [None]:
# Display the approximate 99th-percentile trip duration computed in the previous cell.
perc_99

In [None]:
# Keep only the trips strictly shorter than the 99th-percentile duration.
queens_trips = queens_trips.where(queens_trips.trip_duration_minutes < perc_99)

#### Let's check that summary again

In [None]:
# Print the summary statistics again, now on the filtered durations.
(
    queens_trips
    .select('trip_duration_minutes')
    .summary()
    .show()
)

#### Much nicer. Let's plot the distribution

In [None]:
# Collect the duration column into pandas and plot a 20-bin histogram.
(
    queens_trips
    .select('trip_duration_minutes')
    .toPandas()
    .hist(bins=20)
)

#### Most of the trips are below 20 minutes. Let's check that per destination.

In [None]:
# Snapshot the current column names before the join in the next cell, so the
# joined result can be projected back to these columns plus 'dropoff_borough'.
queens_trips_cols = queens_trips.columns

In [None]:
# Lookup from location id to borough, with both columns already named for the
# drop-off side. Projecting down to these two columns keeps every other
# `locations` column out of the join result.
dropoff_boroughs = locations.select(
    locations['id'].alias('dropoff_location_id'),
    locations['borough'].alias('dropoff_borough'),
)

# Start from the snapshotted trip columns so that re-running this cell does
# not fail on a duplicate `dropoff_borough` column left by a previous run.
queens_trips = (
    queens_trips
    .select(queens_trips_cols)
    .join(dropoff_boroughs, 'dropoff_location_id')
    .select(queens_trips_cols + ['dropoff_borough'])
)

In [None]:
# List the distinct drop-off boroughs present in the data.
(
    queens_trips
    .select('dropoff_borough')
    .distinct()
    .show()
)

In [None]:
# Count the trips per drop-off borough.
(
    queens_trips
    .groupBy('dropoff_borough')
    .agg(count('*').alias('n_trips'))
    .show()
)

In [None]:
# Bring just the two plotted columns into pandas, then draw one box per
# drop-off borough with the mean marked.
trips_by_borough_pdf = queens_trips.select(['dropoff_borough', 'trip_duration_minutes']).toPandas()
trips_by_borough_pdf.boxplot(by='dropoff_borough', showmeans=True)

#### It seems that EWR and Staten Island are the furthest people go from Queens
#### Also, a lot of outliers inside Queens? Are those drivers driving in circles, trying to take advantage of some poor tourists? Or maybe those are all traffic jams?

#### Anyway, that's good enough for now.
#### Never forget to close your spark sessions after you're done!

In [None]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()

#### That's it for this demo!