In [47]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

In [48]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-02-28 07:43:55--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 20.248.137.48
Connecting to github.com (github.com)|20.248.137.48|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230228%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230228T074355Z&X-Amz-Expires=300&X-Amz-Signature=12902d63ac9483bb2ad3b18fddec672c4bbacdf7107c3300c8572e31bc662a7d&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-02-28 07:43:55--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad

In [49]:
df = pd.read_csv('fhvhv_tripdata_2021-06.csv.gz', compression='gzip')

KeyboardInterrupt: 

In [5]:
df.to_csv('fhvhv_tripdata_2021-06.csv', index=False)

In [6]:
!wc -l fhvhv_tripdata_2021-06.csv

14961893 fhvhv_tripdata_2021-06.csv


In [50]:
df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-06.csv')

In [51]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [27]:
!head -n 101 fhvhv_tripdata_2021-06.csv > head.csv

In [52]:
df_pandas = pd.read_csv('head.csv')

In [53]:
df_pandas.dtypes

dispatching_base_num      object
pickup_datetime           object
dropoff_datetime          object
PULocationID               int64
DOLocationID               int64
SR_Flag                   object
Affiliated_base_number    object
dtype: object

In [54]:
df_pandas = df_pandas.astype('str')

In [55]:
spark.createDataFrame(df_pandas)

DataFrame[dispatching_base_num: string, pickup_datetime: string, dropoff_datetime: string, PULocationID: string, DOLocationID: string, SR_Flag: string, Affiliated_base_number: string]

Integer - 4 bytes
Long - 8 bytes

In [56]:
from pyspark.sql import types

In [57]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.IntegerType(), True)
])

In [58]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv')

In [59]:
df = df.repartition(12)

In [60]:
df.write.parquet('fhvhv/2021/06/')

                                                                                

In [73]:
df = spark.read.parquet('fhvhv/2021/06/')

In [62]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: integer (nullable = true)



SELECT * FROM df WHERE hvfhs_license_num =  HV0003

In [63]:
from pyspark.sql import functions as F

In [64]:
from pyspark.sql.functions import date_format

In [65]:
# Filter the DataFrame to include only trips that started on June 15th
june_15_trips = df.filter(date_format('pickup_datetime', 'yyyy-MM-dd') == '2021-06-15')

# Count the number of trips
num_trips = june_15_trips.count()

# Print the result
print(f"There were {num_trips} taxi trips started on June 15th.")



There were 452470 taxi trips started on June 15th.


                                                                                

In [76]:
df.registerTempTable('fhvhv_june_trips')



In [90]:
df_result = spark.sql("""
SELECT 
    PULocationID, COUNT(*) as count
FROM fhvhv_june_trips
GROUP BY PULocationID
ORDER BY count DESC
"""
).show()



+------------+------+
|PULocationID| count|
+------------+------+
|          61|231279|
|          79|221244|
|         132|188867|
|          37|187929|
|          76|186780|
|         231|164344|
|         138|161596|
|         234|158937|
|         249|154698|
|           7|152493|
|         148|151020|
|          68|147673|
|          42|146402|
|         255|143683|
|         181|143594|
|         225|141427|
|          48|139611|
|         246|139431|
|          17|138428|
|         170|137879|
+------------+------+
only showing top 20 rows



                                                                                

In [72]:
df

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string, Affiliated_base_number: int]

In [38]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-03|  2021-01-03|         255|          34|
| 2021-01-05|  2021-01-05|         189|         107|
| 2021-01-01|  2021-01-01|         198|          37|
| 2021-01-04|  2021-01-04|         229|         113|
| 2021-01-02|  2021-01-02|         159|          20|
| 2021-01-03|  2021-01-03|          92|          73|
| 2021-01-04|  2021-01-04|         230|         142|
| 2021-01-03|  2021-01-03|         132|          72|
| 2021-01-01|  2021-01-01|         188|          61|
| 2021-01-04|  2021-01-04|          97|         189|
| 2021-01-01|  2021-01-01|         174|         235|
| 2021-01-05|  2021-01-05|          35|          76|
| 2021-01-06|  2021-01-06|          35|          39|
| 2021-01-02|  2021-01-02|         117|         117|
| 2021-01-03|  2021-01-03|          77|         256|
| 2021-01-02|  2021-01-02|          97|       

In [33]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2021-01-03 17:17:21|2021-01-03 17:26:18|         255|          34|   null|
|           HV0003|              B02882|2021-01-05 22:14:07|2021-01-05 22:32:28|         189|         107|   null|
|           HV0003|              B02866|2021-01-01 01:45:59|2021-01-01 01:51:19|         198|          37|   null|
|           HV0003|              B02864|2021-01-04 19:05:43|2021-01-04 19:19:25|         229|         113|   null|
|           HV0003|              B02882|2021-01-02 20:28:53|2021-01-02 20:46:38|         159|          20|   null|
|           HV0003|              B02395|2021-01-03 22:26:46|2021-01-03 22:33:23|

In [39]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [40]:
crazy_stuff('B02884')

's/b44'

In [66]:
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [55]:
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
  .filter(df.hvfhs_license_num == 'HV0003')


[Row(pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 13), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 30, 35), PULocationID=147, DOLocationID=159),
 Row(pickup_datetime=datetime.datetime(2021, 1, 6, 11, 43, 12), dropoff_datetime=datetime.datetime(2021, 1, 6, 11, 55, 7), PULocationID=79, DOLocationID=164),
 Row(pickup_datetime=datetime.datetime(2021, 1, 4, 15, 35, 32), dropoff_datetime=datetime.datetime(2021, 1, 4, 15, 52, 2), PULocationID=174, DOLocationID=18),
 Row(pickup_datetime=datetime.datetime(2021, 1, 4, 13, 42, 15), dropoff_datetime=datetime.datetime(2021, 1, 4, 14, 4, 57), PULocationID=201, DOLocationID=180),
 Row(pickup_datetime=datetime.datetime(2021, 1, 3, 18, 42, 3), dropoff_datetime=datetime.datetime(2021, 1, 3, 19, 12, 22), PULocationID=132, DOLocationID=72)]

In [50]:
!head -n 10 head.csv











