In [16]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
import pandas as pd

In [5]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-03-03 13:17:19--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230303T131720Z&X-Amz-Expires=300&X-Amz-Signature=6fed59d5177a8602eb5fcc72bbb77eecedab4bcd1d6c185373433778f1e56774&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-03 13:17:20--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

In [20]:
!wc -l fhvhv_tripdata_2021-06.csv.gz

651315 fhvhv_tripdata_2021-06.csv.gz


In [None]:
df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-06.csv.gz')

In [5]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [24]:
!zcat fhvhv_tripdata_2021-06.csv.gz | head -n 1001 > head.csv


gzip: stdout: Broken pipe


In [25]:
df_pandas = pd.read_csv('head.csv')

In [None]:
df_pandas.dtypes

In [36]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.IntegerType(), True)
])



In [None]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv.gz')

In [47]:
df = df.repartition(12)

In [48]:
df.write.parquet('fhvhv/2021/06/')

                                                                                

In [6]:
df = spark.read.parquet('fhvhv/2021/06/')

In [50]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: integer (nullable = true)



In [19]:
df_sel_count = df.filter(df["pickup_datetime"] >= "2021-06-15 00:00:00.0000") \
           .filter(df["pickup_datetime"] < "2021-06-16 00:00:00.0000") \
            .count()

                                                                                

In [20]:
df_sel_count

452470

In [84]:
df_sel_duration = df.withColumn('trip_duration', (F.unix_timestamp('dropoff_datetime')-F.unix_timestamp('pickup_datetime'))/3600) \
                    .select('trip_duration')

In [90]:
 df_sel_duration.select(F.max(df_sel_duration.trip_duration)).show()

[Stage 26:>                                                         (0 + 4) / 4]

+------------------+
|max(trip_duration)|
+------------------+
|  66.8788888888889|
+------------------+





In [8]:
df_lookup = spark.read \
    .option("header", "true") \
    .csv('lookup/taxi+_zone_lookup.csv')

In [10]:
df.registerTempTable('trips_data')



In [12]:
df_lookup.registerTempTable('zone_lookup')

In [15]:
spark.sql("""
SELECT
    z.Zone,
    count(1) cnt
FROM
    trips_data t join zone_lookup z on t.PULocationID = z.LocationID
GROUP BY 
    z.Zone
ORDER BY cnt desc
limit 1  
""").show()



+-------------------+------+
|               Zone|   cnt|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+



                                                                                