In [1]:
import os
import pyspark
from pyspark.sql import SparkSession

os.environ["SPARK_HOME"] = "/home/ximin/anaconda3/lib/python3.12/site-packages/pyspark"
os.environ["PYSPARK_PYTHON"] = "/home/ximin/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/07 06:45:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
spark.version

'3.5.5'

In [28]:
import pandas as pd

In [4]:
df = pd.read_parquet("data/raw/yellow/2024/10/yellow_tripdata_2024_10.parquet")

In [5]:
df = df.head()
print(df.dtypes)

VendorID                          int32
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int32
DOLocationID                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
Airport_fee                     float64
dtype: object


In [8]:
df['tpep_pickup_datetime'] = df['tpep_pickup_datetime'].astype('datetime64[ns]')
df['tpep_dropoff_datetime'] = df['tpep_dropoff_datetime'].astype('datetime64[ns]')
df['store_and_fwd_flag'] = df['store_and_fwd_flag'].astype(str)

In [9]:
print(df.dtypes)

VendorID                          int32
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int32
DOLocationID                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
Airport_fee                     float64
dtype: object


In [10]:
spark.createDataFrame(df).schema

StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', DoubleType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [12]:
df_yellow = spark.read \
    .option("header","true")\
    .parquet('data/raw/yellow/2024/10/yellow_tripdata_2024_10.parquet')

In [13]:
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [17]:
df_yellow = df_yellow.repartition(4)

In [20]:
df_yellow.show(5)

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-10-07 16:40:43|  2024-10-07 18:10:56|              1|         14.8|        99|                 N|         127|         225|           1|       47.5|  0.0|    0.5|       0.

In [22]:
from pyspark.sql import functions as F

In [19]:
df_yellow.write.parquet('output', mode='overwrite')

                                                                                

In [25]:
df_yellow.select('tpep_pickup_datetime')\
         .filter(F.to_date("tpep_pickup_datetime") == F.lit("2024-10-15"))\
         .groupBy(F.to_date("tpep_pickup_datetime").alias("date"))\
         .agg(F.count("*").alias("number_of_trips")).show()

+----------+---------------+
|      date|number_of_trips|
+----------+---------------+
|2024-10-15|         128893|
+----------+---------------+



In [27]:
df_yellow.withColumn(
    "duration_hours",
    (F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime")) / 3600
).agg(F.max("duration_hours").alias("max_duration_hours")).show()



+------------------+
|max_duration_hours|
+------------------+
|162.61777777777777|
+------------------+



                                                                                

In [29]:
zlu = pd.read_csv("taxi_zone_lookup/taxi_zone_lookup.csv")

In [30]:
zlu.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [37]:
zlu = spark.read \
    .option("header","true")\
    .csv("taxi_zone_lookup/taxi_zone_lookup.csv")

In [40]:
df_yellow\
        .join(zlu, df_yellow["PULocationID"]==zlu["LocationID"], how="left")\
        .groupBy(["PULocationID", "Zone"])\
        .agg(F.count("Zone").alias("number_of_pu_location_zone"))\
        .orderBy("number_of_pu_location_zone", ascending=True)\
        .show()

[Stage 43:>                                                         (0 + 4) / 4]

+------------+--------------------+--------------------------+
|PULocationID|                Zone|number_of_pu_location_zone|
+------------+--------------------+--------------------------+
|         105|Governor's Island...|                         1|
|           5|       Arden Heights|                         2|
|         199|       Rikers Island|                         2|
|           2|         Jamaica Bay|                         3|
|         111| Green-Wood Cemetery|                         3|
|         245|       West Brighton|                         4|
|         204|   Rossville/Woodrow|                         4|
|         187|       Port Richmond|                         4|
|          44|Charleston/Totten...|                         4|
|          84|Eltingville/Annad...|                         4|
|          59|        Crotona Park|                         6|
|         109|         Great Kills|                         6|
|         118|Heartland Village...|                    

                                                                                

In [34]:
print(df_yellow)  # 确保它不是 None
      # 确保它不是 None

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp_ntz, tpep_dropoff_datetime: timestamp_ntz, passenger_count: bigint, trip_distance: double, RatecodeID: bigint, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double]
None


In [35]:
print(zlu)  

None
