In [None]:
# Mount Google Drive at /content/drive (Colab only); the data folder used
# below lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession
# Later cells call F.col / F.hour / F.lit etc.; without this import they
# raise NameError on a fresh kernel.
from pyspark.sql import functions as F

# Create (or reuse) the notebook-wide Spark session.
spark = (SparkSession.builder
         .appName("NYC_TLC")
         .config("spark.sql.shuffle.partitions", "200")
         .getOrCreate())

In [None]:
import os, glob

base_path = "/content/drive/MyDrive/Mining of massive dataset/data"

# One sub-folder per year, 2009 through 2024 inclusive.
years = list(map(str, range(2009, 2025)))
year_paths = [os.path.join(base_path, year) for year in years]

# Count the parquet files in each year folder; years with no files are skipped.
for year, year_dir in zip(years, year_paths):
    files = glob.glob(os.path.join(year_dir, "*.parquet"))
    if not files:
        continue
    print(year, "->", len(files), "files | example:", os.path.basename(files[0]))

2009 -> 12 files | example: yellow_tripdata_2009-01.parquet
2010 -> 12 files | example: yellow_tripdata_2010-01.parquet
2011 -> 13 files | example: yellow_tripdata_2011-01 (1).parquet
2012 -> 12 files | example: yellow_tripdata_2012-01.parquet
2013 -> 12 files | example: yellow_tripdata_2013-01.parquet
2014 -> 12 files | example: yellow_tripdata_2014-01.parquet
2015 -> 12 files | example: yellow_tripdata_2015-01.parquet
2016 -> 12 files | example: yellow_tripdata_2016-01.parquet
2017 -> 12 files | example: yellow_tripdata_2017-01.parquet
2018 -> 12 files | example: yellow_tripdata_2018-01.parquet
2019 -> 12 files | example: yellow_tripdata_2019-01.parquet
2020 -> 12 files | example: yellow_tripdata_2020-01.parquet
2021 -> 12 files | example: yellow_tripdata_2021-01.parquet
2022 -> 12 files | example: yellow_tripdata_2022-01.parquet
2023 -> 12 files | example: yellow_tripdata_2023-01.parquet
2024 -> 6 files | example: yellow_tripdata_2024-01.parquet


In [None]:
# Smoke test: read the whole 2024 folder, then print its schema and first rows.
test_year = "2024"
test_path = os.path.join(base_path, test_year)
df_2024 = spark.read.parquet(test_path)

df_2024.printSchema()
df_2024.show(n=5, truncate=False)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------

In [None]:
# Compare the schemas Spark reports for two year folders.
y1, y2 = "2010", "2024"

df1 = spark.read.parquet(os.path.join(base_path, y1))
df2 = spark.read.parquet(os.path.join(base_path, y2))

# Map column name -> string form of its Spark data type.
schema1 = dict((field.name, str(field.dataType)) for field in df1.schema.fields)
schema2 = dict((field.name, str(field.dataType)) for field in df2.schema.fields)

common_cols = set(schema1.keys()) & set(schema2.keys())

# Columns that share a name across both years but differ in type.
diff_type = []
for col_name in common_cols:
    if schema1[col_name] != schema2[col_name]:
        diff_type.append((col_name, schema1[col_name], schema2[col_name]))

print("Common columns:", len(common_cols))
print("Columns with different types:", len(diff_type))
for item in diff_type[:30]:
    print(item)

Common columns: 9
Columns with different types: 1
('payment_type', 'StringType()', 'LongType()')


## Phân tích Demand cơ bản

In [None]:
# NOTE(review): `df_all` is not defined in any cell visible in this notebook;
# confirm where it is built before relying on Restart & Run All.
pickup_candidates = ["tpep_pickup_datetime", "lpep_pickup_datetime", "pickup_datetime"]

# Use the first candidate name that exists in df_all (None if none match).
pickup_col = None
for candidate in pickup_candidates:
    if candidate in df_all.columns:
        pickup_col = candidate
        break
print("pickup_col:", pickup_col)

# Derive timestamp, hour-of-day and calendar date from the pickup column.
# When no pickup column is found, df_feat is df_all unchanged.
df_feat = df_all
if pickup_col:
    df_feat = (
        df_feat
        .withColumn("pickup_ts", F.to_timestamp(F.col(pickup_col)))
        .withColumn("pickup_hour", F.hour("pickup_ts"))
        .withColumn("pickup_date", F.to_date("pickup_ts"))
    )

pickup_col: tpep_pickup_datetime


##### demand theo giờ

In [None]:
# Number of trips per pickup hour, listed in hour order.
hourly = df_feat.groupBy("pickup_hour").count().sort("pickup_hour")

hourly.show(n=24)

+-----------+-------+
|pickup_hour|  count|
+-----------+-------+
|          0|2778992|
|          1|1839474|
|          2|1216665|
|          3| 805214|
|          4| 558697|
|          5| 607038|
|          6|1433794|
|          7|2750724|
|          8|3745490|
|          9|4193930|
|         10|4568650|
|         11|4959515|
|         12|5383531|
|         13|5537720|
|         14|5930167|
|         15|6084058|
|         16|6086972|
|         17|6632542|
|         18|6959590|
|         19|6229726|
|         20|5496722|
|         21|5443009|
|         22|5043751|
|         23|4012446|
+-----------+-------+



##### demand theo zone

In [None]:
# Trip counts per (pickup zone, pickup hour), busiest combinations first.
if "PULocationID" not in df_feat.columns:
    print("Dataset không có PULocationID")
else:
    zone_hour = (
        df_feat
        .groupBy("PULocationID", "pickup_hour")
        .count()
        .orderBy(F.col("count").desc())
    )
    zone_hour.show(20)

+------------+-----------+------+
|PULocationID|pickup_hour| count|
+------------+-----------+------+
|         161|         18|368758|
|         161|         17|366561|
|         237|         14|352554|
|         237|         15|351050|
|         237|         18|348061|
|         237|         17|346660|
|         236|         15|343674|
|         132|         16|337735|
|         237|         16|333872|
|         161|         16|330065|
|         132|         15|328171|
|         132|         22|326146|
|         161|         19|325499|
|         132|         17|316984|
|         236|         17|315709|
|         237|         13|315507|
|         132|         21|312796|
|         237|         12|311950|
|         132|         19|310793|
|         236|         18|309139|
+------------+-----------+------+
only showing top 20 rows


### Kiểm tra tất cả các cột của giai đoạn 2011–2014 và 2015–2024

In [None]:
def list_year_files(year):
    """Return the sorted paths of every ``*.parquet`` file in ``base_path/<year>``.

    ``year`` may be an int or a string; it is converted with ``str``.
    The list is empty when the folder has no parquet files.
    """
    pattern = os.path.join(base_path, str(year), "*.parquet")
    return sorted(glob.glob(pattern))

def pick_one_file(year):
    """Return the first parquet path (in sorted order) for ``year``, or None if there is none."""
    candidates = list_year_files(year)
    if not candidates:
        return None
    return candidates[0]

years_11_14 = list(range(2011, 2015))
years_15_24 = list(range(2015, 2025))

# Print the sample file chosen for each year: every year of the first range,
# and only the first five years of the second.
for header, sample_years in (
    ("2011-2014 samples:", years_11_14),
    ("\n2015-2024 samples:", years_15_24[:5]),
):
    print(header)
    for y in sample_years:
        print(y, pick_one_file(y))

2011-2014 samples:
2011 /content/drive/MyDrive/Mining of massive dataset/data/2011/yellow_tripdata_2011-01 (1).parquet
2012 /content/drive/MyDrive/Mining of massive dataset/data/2012/yellow_tripdata_2012-01.parquet
2013 /content/drive/MyDrive/Mining of massive dataset/data/2013/yellow_tripdata_2013-01.parquet
2014 /content/drive/MyDrive/Mining of massive dataset/data/2014/yellow_tripdata_2014-01.parquet

2015-2024 samples:
2015 /content/drive/MyDrive/Mining of massive dataset/data/2015/yellow_tripdata_2015-01.parquet
2016 /content/drive/MyDrive/Mining of massive dataset/data/2016/yellow_tripdata_2016-01.parquet
2017 /content/drive/MyDrive/Mining of massive dataset/data/2017/yellow_tripdata_2017-01.parquet
2018 /content/drive/MyDrive/Mining of massive dataset/data/2018/yellow_tripdata_2018-01.parquet
2019 /content/drive/MyDrive/Mining of massive dataset/data/2019/yellow_tripdata_2019-01.parquet


In [None]:
from pyspark.sql import DataFrame

def schema_dict_from_year(year):
    """Return ``{column name: str(data type)}`` for one sample file of ``year``.

    Only the single file returned by ``pick_one_file`` is read, so schema
    differences between files of the same year are not reflected.
    Returns None when the year has no parquet file.
    """
    sample_file = pick_one_file(year)
    if sample_file is None:
        return None
    fields = spark.read.parquet(sample_file).schema.fields
    return dict((fld.name, str(fld.dataType)) for fld in fields)

In [None]:
import pandas as pd

def build_presence_table(years):
    """Build a column-presence matrix for the given years.

    Parameters
    ----------
    years : iterable
        Years passed one by one to ``schema_dict_from_year``; years for
        which it returns None are left out.

    Returns
    -------
    (table, schemas)
        ``table`` is a pandas DataFrame indexed by the sorted union of all
        column names, with one column per loaded year (labelled ``str(year)``)
        holding "✓" where the column exists and "" where it does not.
        ``schemas`` maps each loaded year to its ``{column: dtype}`` dict.
        Column names are compared case-sensitively.
    """
    schemas = {}
    for year in years:
        year_schema = schema_dict_from_year(year)
        if year_schema is None:
            continue
        schemas[year] = year_schema

    # Union of every column name seen in any loaded year.
    seen_cols = set()
    for year_schema in schemas.values():
        seen_cols.update(year_schema.keys())
    all_cols = sorted(seen_cols)

    table = pd.DataFrame(index=all_cols)
    for year, year_schema in schemas.items():
        table[str(year)] = ["✓" if col in year_schema else "" for col in all_cols]

    return table, schemas

# Presence matrix for the 2011-2014 sample files, with a preview of up to 40 rows.
presence_11_14, schemas_11_14 = build_presence_table(years_11_14)

print("Num years loaded:", len(schemas_11_14))
print("Total unique columns:", len(presence_11_14.index))
presence_11_14.head(40)

Num years loaded: 4
Total unique columns: 19


Unnamed: 0,2011,2012,2013,2014
DOLocationID,✓,✓,✓,✓
PULocationID,✓,✓,✓,✓
RatecodeID,✓,✓,✓,✓
VendorID,✓,✓,✓,✓
airport_fee,✓,✓,✓,✓
congestion_surcharge,✓,✓,✓,✓
extra,✓,✓,✓,✓
fare_amount,✓,✓,✓,✓
improvement_surcharge,✓,✓,✓,✓
mta_tax,✓,✓,✓,✓


In [None]:
# Presence matrix for the 2015-2024 sample files, with a preview of up to 40 rows.
presence_15_24, schemas_15_24 = build_presence_table(years_15_24)

print("Num years loaded:", len(schemas_15_24))
print("Total unique columns:", len(presence_15_24.index))
presence_15_24.head(40)

Num years loaded: 10
Total unique columns: 20


Unnamed: 0,2015,2016,2017,2018,2019,2020,2021,2022,2023,2024
Airport_fee,,,,,,,,,,✓
DOLocationID,✓,✓,✓,✓,✓,✓,✓,✓,✓,✓
PULocationID,✓,✓,✓,✓,✓,✓,✓,✓,✓,✓
RatecodeID,✓,✓,✓,✓,✓,✓,✓,✓,✓,✓
VendorID,✓,✓,✓,✓,✓,✓,✓,✓,✓,✓
airport_fee,✓,✓,✓,✓,✓,✓,✓,✓,✓,
congestion_surcharge,✓,✓,✓,✓,✓,✓,✓,✓,✓,✓
extra,✓,✓,✓,✓,✓,✓,✓,✓,✓,✓
fare_amount,✓,✓,✓,✓,✓,✓,✓,✓,✓,✓
improvement_surcharge,✓,✓,✓,✓,✓,✓,✓,✓,✓,✓


In [None]:
# Compare the column names seen in the two periods. The comparison is
# case-sensitive, so names differing only in case count as different columns.
cols_11_14 = set(presence_11_14.index)
cols_15_24 = set(presence_15_24.index)

only_11_14 = sorted(cols_11_14.difference(cols_15_24))
only_15_24 = sorted(cols_15_24.difference(cols_11_14))
common_cols = sorted(cols_11_14.intersection(cols_15_24))

print("Only in 2011-2014:", len(only_11_14))
print(only_11_14[:60])

print("\nOnly in 2015-2024:", len(only_15_24))
print(only_15_24[:60])

print("\nCommon columns:", len(common_cols))
print(common_cols[:30])

Only in 2011-2014: 0
[]

Only in 2015-2024: 1
['Airport_fee']

Common columns: 19
['DOLocationID', 'PULocationID', 'RatecodeID', 'VendorID', 'airport_fee', 'congestion_surcharge', 'extra', 'fare_amount', 'improvement_surcharge', 'mta_tax', 'passenger_count', 'payment_type', 'store_and_fwd_flag', 'tip_amount', 'tolls_amount', 'total_amount', 'tpep_dropoff_datetime', 'tpep_pickup_datetime', 'trip_distance']


In [None]:
from collections import defaultdict

# schemas: {year: {col: dtype}}
def find_type_conflicts(schemas):
    """Find columns whose data type is not the same in every schema.

    Parameters
    ----------
    schemas : dict
        ``{year: {column name: dtype string}}``.

    Returns
    -------
    (conflicts, col2year_types)
        ``conflicts`` maps each column seen with more than one dtype to the
        sorted list of those dtypes. ``col2year_types`` maps every column to
        its ``(year, dtype)`` pairs in the iteration order of ``schemas``.
    """
    types_by_col = defaultdict(set)
    history_by_col = defaultdict(list)

    for year, col_types in schemas.items():
        for col, dtype in col_types.items():
            types_by_col[col].add(dtype)
            history_by_col[col].append((year, dtype))

    conflicts = {}
    for col, seen in types_by_col.items():
        if len(seen) > 1:
            conflicts[col] = sorted(seen)
    return conflicts, history_by_col

# Detect per-column type conflicts within each period and print up to 30 of them.
conf_11_14, detail_11_14 = find_type_conflicts(schemas_11_14)
conf_15_24, detail_15_24 = find_type_conflicts(schemas_15_24)

print("Type conflicts 2011-2014:", len(conf_11_14))
for c, types in list(conf_11_14.items())[:30]:
    print(c, "->", types)

print("\nType conflicts 2015-2024:", len(conf_15_24))
for c, types in list(conf_15_24.items())[:30]:
    print(c, "->", types)

Type conflicts 2011-2014: 2
congestion_surcharge -> ['DoubleType()', 'IntegerType()']
airport_fee -> ['DoubleType()', 'IntegerType()']

Type conflicts 2015-2024: 7
VendorID -> ['IntegerType()', 'LongType()']
passenger_count -> ['DoubleType()', 'LongType()']
RatecodeID -> ['DoubleType()', 'LongType()']
PULocationID -> ['IntegerType()', 'LongType()']
DOLocationID -> ['IntegerType()', 'LongType()']
congestion_surcharge -> ['DoubleType()', 'IntegerType()']
airport_fee -> ['DoubleType()', 'IntegerType()']


## Tiền xử lý dữ liệu cơ bản

##### test 2024

In [None]:
# Year processed by the preprocessing cells below.
YEAR = "2024"
year_path = os.path.join(base_path, YEAR)
df_raw = spark.read.parquet(year_path)

print("Columns:", len(df_raw.columns))
df_raw.printSchema()
df_raw.show(n=5, truncate=False)

Columns: 19
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+

In [None]:
# Canonical names of the columns carried into preprocessing.
base_cols = [
    "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "PULocationID", "DOLocationID",
    "passenger_count", "trip_distance",
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "congestion_surcharge", "Airport_fee",
    "total_amount",
    "VendorID", "RatecodeID", "payment_type", "store_and_fwd_flag"
]

# Match names case-insensitively: the sampled files spell the airport fee
# column both "Airport_fee" and "airport_fee", and a case-sensitive
# membership test silently drops it for years using the other spelling.
actual_by_lower = {name.lower(): name for name in df_raw.columns}
keep = [c for c in base_cols if c.lower() in actual_by_lower]

# Select each available column under its canonical base_cols name so the
# output schema is spelled the same for every year, then tag the year.
df = (df_raw
      .select(*[F.col(actual_by_lower[c.lower()]).alias(c) for c in keep])
      .withColumn("year", F.lit(int(YEAR))))

df.printSchema()

root
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- year: integer (nullable = false)



In [None]:
# Convert the pickup/dropoff columns with F.to_timestamp when present.
# NOTE(review): these columns print as timestamp_ntz above; confirm the
# session time zone does not shift values in this conversion.
for ts_col in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    if ts_col not in df.columns:
        continue
    df = df.withColumn(ts_col, F.to_timestamp(F.col(ts_col)))

In [None]:
# Add short timestamp aliases, calendar features and the trip duration.
df = (
    df
    .withColumn("pickup_ts", F.col("tpep_pickup_datetime"))
    .withColumn("dropoff_ts", F.col("tpep_dropoff_datetime"))
    .withColumn("pickup_date", F.to_date("pickup_ts"))
    .withColumn("pickup_hour", F.hour("pickup_ts"))
    # Spark's dayofweek numbers days 1 (Sunday) through 7 (Saturday).
    .withColumn("pickup_dow", F.dayofweek("pickup_ts"))
    # Duration in minutes: both timestamps are cast to long, subtracted,
    # and the difference divided by 60.
    .withColumn(
        "trip_duration_min",
        (F.col("dropoff_ts").cast("long") - F.col("pickup_ts").cast("long")) / 60.0,
    )
)

In [None]:
# Preview the derived time features next to distance and total amount.
df.select(
    "pickup_ts",
    "dropoff_ts",
    "pickup_date",
    "pickup_dow",
    "pickup_hour",
    "trip_duration_min",
    "trip_distance",
    "total_amount",
).show(n=5, truncate=False)

+-------------------+-------------------+-----------+----------+-----------+------------------+-------------+------------+
|pickup_ts          |dropoff_ts         |pickup_date|pickup_dow|pickup_hour|trip_duration_min |trip_distance|total_amount|
+-------------------+-------------------+-----------+----------+-----------+------------------+-------------+------------+
|2024-05-01 00:59:15|2024-05-01 01:23:50|2024-05-01 |4         |0          |24.583333333333332|6.1          |42.45       |
|2024-04-30 23:58:26|2024-05-01 00:29:42|2024-04-30 |3         |23         |31.266666666666666|11.23        |66.87       |
|2024-05-01 00:57:17|2024-05-01 01:14:15|2024-05-01 |4         |0          |16.966666666666665|9.02         |65.16       |
|2024-05-01 00:24:47|2024-05-01 00:48:51|2024-05-01 |4         |0          |24.066666666666666|6.53         |42.36       |
|2024-05-01 00:11:20|2024-05-01 00:52:10|2024-05-01 |4         |0          |40.833333333333336|14.38        |66.8        |
+---------------

In [None]:
# Valid times: both timestamps must be present and the dropoff must be
# strictly after the pickup.
df_clean = (
    df
    .where(F.col("pickup_ts").isNotNull() & F.col("dropoff_ts").isNotNull())
    .where(F.col("dropoff_ts") > F.col("pickup_ts"))
)

In [None]:
# Keep only trips with a strictly positive distance. Rows with a null
# distance are dropped too, because the comparison is not true for them.
if "trip_distance" in df_clean.columns:
    df_clean = df_clean.where(F.col("trip_distance") > 0)

In [None]:
# Keep trips whose passenger count is zero or more.
# NOTE(review): rows with a null passenger_count are dropped here as well;
# confirm that is intended.
if "passenger_count" in df_clean.columns:
    df_clean = df_clean.where(F.col("passenger_count") >= 0)

In [None]:
# Keep trips whose total amount is not negative (null totals are dropped too).
if "total_amount" in df_clean.columns:
    df_clean = df_clean.where(F.col("total_amount") >= 0)

In [None]:
# Keep trips lasting more than 0 and at most 360 minutes.
df_clean = (
    df_clean
    .where(F.col("trip_duration_min") > 0)
    .where(F.col("trip_duration_min") <= 360)
)

In [None]:
# Preview the cleaned rows.
df_clean.select(
    "pickup_ts",
    "dropoff_ts",
    "trip_duration_min",
    "trip_distance",
    "passenger_count",
    "total_amount",
).show(n=5)

+-------------------+-------------------+------------------+-------------+---------------+------------+
|          pickup_ts|         dropoff_ts| trip_duration_min|trip_distance|passenger_count|total_amount|
+-------------------+-------------------+------------------+-------------+---------------+------------+
|2024-05-01 00:59:15|2024-05-01 01:23:50|24.583333333333332|          6.1|              1|       42.45|
|2024-04-30 23:58:26|2024-05-01 00:29:42|31.266666666666666|        11.23|              1|       66.87|
|2024-05-01 00:57:17|2024-05-01 01:14:15|16.966666666666665|         9.02|              1|       65.16|
|2024-05-01 00:24:47|2024-05-01 00:48:51|24.066666666666666|         6.53|              1|       42.36|
|2024-05-01 00:11:20|2024-05-01 00:52:10|40.833333333333336|        14.38|              1|        66.8|
+-------------------+-------------------+------------------+-------------+---------------+------------+
only showing top 5 rows
