In [68]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import max
from pyspark.sql.functions import count
import pandas as pd

In [2]:
# Spark session for this notebook, running in local mode (master "local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('week5')
    .getOrCreate()
)

In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-02-23 12:53:43--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230223%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230223T072344Z&X-Amz-Expires=300&X-Amz-Signature=4e7eb04168b3089b68e16e5a7ba6f5904ac867eb480362bab594b10c9038d2bc&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-02-23 12:53:44--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564

In [5]:
!gunzip fhvhv_tripdata_2021-06.csv.gz

fhvhv_tripdata_2021-06.csv already exists -- do you wish to overwrite (y or n)? ^C


In [3]:
# Count lines in the raw CSV. The file has a header row (it is read with
# header=true below), so the number of trips is this count minus one.
!wc -l fhvhv_tripdata_2021-06.csv

 14961893 fhvhv_tripdata_2021-06.csv


In [4]:
# Load the raw June 2021 FHVHV trips; the first CSV line holds the column names.
df = spark.read.csv('fhvhv_tripdata_2021-06.csv', header=True)

In [5]:
# Inspect the schema Spark produced. No schema or inference option is set on the
# read, and the output shows every column came back as StringType.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [9]:
# Redistribute the rows into 12 partitions before writing parquet, so the write
# that follows is split across 12 partitions.
df = df.repartition(numPartitions=12)

In [10]:
# Persist the repartitioned trips as parquet. The default save mode raises an
# error when the output path already exists, which breaks Restart & Run All;
# overwrite makes the cell re-runnable.
df.write.mode('overwrite').parquet('fhvhv/2021/06/')

In [11]:
# Reload the trips from the parquet copy written above; later cells work on this.
df = spark.read.format('parquet').load('fhvhv/2021/06/')

In [13]:
# Check the schema of the parquet copy; it matches the CSV read (all columns
# are strings, including the datetime and location ID columns).
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [14]:
# Preview 20 rows of the trip data.
df.show(n=20)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-02 17:19:43|2021-06-02 17:47:47|          80|          65|      N|                B02764|
|              B02867|2021-06-01 20:34:48|2021-06-01 20:39:10|         171|         171|      N|                B02867|
|              B02877|2021-06-02 18:40:14|2021-06-02 18:57:25|         250|         259|      N|                B02877|
|              B02887|2021-06-02 10:02:03|2021-06-02 10:22:13|         223|         138|      N|                B02887|
|              B02510|2021-06-01 13:42:47|2021-06-01 13:55:46|          82|          95|      N|                  null|
|              B02884|2021-06-03 09:32:1

In [74]:
# Number of trips that started on 2021-06-15.
# pickup_datetime is a string column, so convert it to a date and compare with
# the target date. The previous version filtered on df.pickup_datetime, which
# refers to the original string column rather than the to_date result. It only
# gave the right count because ISO timestamps sort lexicographically.
df.filter(
    F.to_date(F.col('pickup_datetime')) == F.lit('2021-06-15').cast('date')
).count()

452470

In [69]:
# To find the longest trip, add a "Duration" column holding the trip length in
# seconds: dropoff epoch seconds minus pickup epoch seconds, with both
# timestamps parsed from "yyyy-MM-dd HH:mm:ss" strings.
timeDiff = (
    F.unix_timestamp(F.col('dropoff_datetime'), "yyyy-MM-dd HH:mm:ss")
    - F.unix_timestamp(F.col('pickup_datetime'), "yyyy-MM-dd HH:mm:ss")
)
df = df.withColumn("Duration", timeDiff)

In [70]:
# Longest trip duration in seconds. This uses F.max so the result does not
# depend on the imported `max`, which shadows Python's builtin.
df.agg(F.max('Duration')).show()

+-------------+
|max(Duration)|
+-------------+
|       240764|
+-------------+



In [71]:
# Longest trip in hours. The maximum is computed from the data rather than
# copied from the previous cell's output, where it would go stale if the input
# changed.
max_duration_seconds = df.agg(F.max('Duration')).first()[0]
print(max_duration_seconds / 3600.)

66.8788888888889


In [75]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2023-02-23 14:08:59--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230223%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230223T083900Z&X-Amz-Expires=300&X-Amz-Signature=025483f4da7e5c6db8791aa35583db035bd2e651e1624160c17a1bfbb5a07681&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-02-23 14:09:00--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62

In [78]:
# Zone lookup table (LocationID -> Zone); the first CSV line holds column names.
dfz = spark.read.csv("taxi_zone_lookup.csv", header=True)

In [82]:
# Pickup location IDs ranked by number of trips, most frequent first.
(
    df.groupBy('PULocationID')
      .count()
      .orderBy(F.desc('count'))
      .show()
)

+------------+------+
|PULocationID| count|
+------------+------+
|          61|231279|
|          79|221244|
|         132|188867|
|          37|187929|
|          76|186780|
|         231|164344|
|         138|161596|
|         234|158937|
|         249|154698|
|           7|152493|
|         148|151020|
|          68|147673|
|          42|146402|
|         255|143683|
|         181|143594|
|         225|141427|
|          48|139611|
|         246|139431|
|          17|138428|
|         170|137879|
+------------+------+
only showing top 20 rows



In [83]:
# Name of the busiest pickup zone. The location ID is derived from the data
# rather than hard-coded from the previous cell's output. Both CSVs are read
# without schema inference, so PULocationID and LocationID are strings and can
# be compared directly.
top_pu_location_id = (
    df.groupBy('PULocationID')
      .count()
      .orderBy(F.desc('count'))
      .first()['PULocationID']
)
dfz.select(dfz.Zone).where(dfz.LocationID == top_pu_location_id).show()

+-------------------+
|               Zone|
+-------------------+
|Crown Heights North|
+-------------------+

