In [None]:
from pyspark.sql.functions import col, expr, sum, month, count, year, dayofmonth, hour, minute, dayofweek, unix_timestamp, avg, stddev  
import pyspark.sql.functions as F

In [None]:
# Build one (year, month, path) entry for every monthly Bronze file, 2019-01 .. 2021-12.
#
# A comprehension is used on purpose: its loop variables do not leak into the
# notebook namespace. A module-level `for year in ...` loop would rebind `year`,
# which is imported from pyspark.sql.functions at the top of the notebook, to a
# plain int for the rest of the session.
#
# No try/except is needed here: range() and f-string formatting of ints cannot fail.
file_paths = [
    (
        year_val,
        month_val,
        f"Files/Bronze/{year_val}/{month_val}/green_tripdata_{year_val}-{month_val:02d}.parquet",
    )
    for year_val in range(2019, 2022)  # 2019, 2020, 2021
    for month_val in range(1, 13)      # 1 .. 12
]
print("File paths created successfully.")

# Display the generated list (debugging aid)
file_paths

In [None]:
### User-defined function
def process_dataframe(df, filter_month):
    """Clean one monthly batch of green-taxi trip records.

    Processing order:
      1. Drop the unused columns, then drop every row that still has a null.
      2. Select, cast and rename the eight columns that are kept.
      3. Keep rows whose pickup AND dropoff month both equal ``filter_month``.
         Only the month is compared; the year is constrained solely by the
         2019-2021 range filter in step 5.
      4. Derive pickup_year/month/day/hour/minute/weekday and trip_duration
         (dropoff minus pickup, in minutes); drop dropoff_datetime.
      5. Apply fixed range filters on IDs, counts, amounts and date parts.
      6. Remove outliers: 1.5*IQR fence on trip_distance, 0.5%/99.5% quantile
         trim on total_amount, |z-score| <= 2 on trip_duration.

    Args:
        df: Spark DataFrame containing the raw columns referenced below.
        filter_month: Month number compared with F.month() of both timestamps.

    Returns:
        The cleaned DataFrame, or None if any step raised. The error is
        printed, not re-raised, so the caller must check for None.
    """
    try:
        # Columns that are not needed downstream
        columns_to_drop = ["VendorID", "store_and_fwd_flag", "RateCodeID", "extra", "mta_tax", "tolls_amount",
                           "ehail_fee", "improvement_surcharge", "fare_amount", "tip_amount", "Trip_type",
                           "congestion_surcharge"]

        # Drop them first, THEN drop rows with nulls, so that nulls in the
        # discarded columns do not cause rows to be removed.
        df = df.drop(*columns_to_drop).dropna()

        # Select the required columns, casting and renaming them
        df = df.select(
            F.col("lpep_pickup_datetime").cast("timestamp").alias("pickup_datetime"),
            F.col("lpep_dropoff_datetime").cast("timestamp").alias("dropoff_datetime"),
            F.col("PULocationID").cast("int").alias("pickup_LocationID"),
            F.col("DOLocationID").cast("int").alias("dropoff_LocationID"),
            F.col("payment_type").cast("int").alias("payment_type"),
            F.col("passenger_count").cast("int").alias("passenger_count"),
            F.col("trip_distance").cast("double").alias("trip_distance"),
            F.col("total_amount").cast("double").alias("total_amount")
        )

        # Keep only trips that start and end in the requested month
        df = df.filter(
            (F.month(F.col("pickup_datetime")) == filter_month)
            & (F.month(F.col("dropoff_datetime")) == filter_month)
        )

        # Date/time parts derived from the pickup timestamp
        df = df.withColumn("pickup_year", F.year(F.col("pickup_datetime"))) \
               .withColumn("pickup_month", F.month(F.col("pickup_datetime"))) \
               .withColumn("pickup_day", F.dayofmonth(F.col("pickup_datetime"))) \
               .withColumn("pickup_hour", F.hour(F.col("pickup_datetime"))) \
               .withColumn("pickup_minute", F.minute(F.col("pickup_datetime"))) \
               .withColumn("pickup_weekday", F.dayofweek(F.col("pickup_datetime")))

        # Trip duration in minutes (difference of epoch seconds divided by 60)
        df = df.withColumn(
            "trip_duration",
            (F.unix_timestamp(F.col("dropoff_datetime")) - F.unix_timestamp(F.col("pickup_datetime"))) / 60
        )

        # dropoff_datetime is no longer needed once the duration exists
        df = df.drop("dropoff_datetime")

        # Pin the output column order and types explicitly
        df = df.selectExpr(
            "cast(pickup_datetime as timestamp)",
            "cast(pickup_LocationID as int)",
            "cast(dropoff_LocationID as int)",
            "cast(payment_type as int)",
            "cast(passenger_count as int)",
            "cast(trip_distance as double)",
            "cast(total_amount as double)",
            "cast(pickup_year as int)",
            "cast(pickup_month as int)",
            "cast(pickup_day as int)",
            "cast(pickup_hour as int)",
            "cast(pickup_minute as int)",
            "cast(pickup_weekday as int)",
            "cast(trip_duration as double)"
        )

        # Keep only rows whose values fall inside the accepted ranges
        df = df.filter(
            (F.col("pickup_LocationID").between(1, 265)) &
            (F.col("dropoff_LocationID").between(1, 265)) &
            (F.col("payment_type").between(1, 5)) &
            (F.col("passenger_count").between(1, 6)) &
            (F.col("trip_distance") > 0) &
            (F.col("total_amount") > 0) &
            (F.col("pickup_year").between(2019, 2021)) &
            (F.col("pickup_month").between(1, 12)) &
            (F.col("pickup_day").between(1, 31)) &
            (F.col("pickup_hour").between(0, 23)) &
            (F.col("pickup_minute").between(0, 59)) &
            (F.col("trip_duration") > 0)
        )

        # --- Outlier removal: trip_distance, 1.5 * IQR fence -----------------
        quantiles = df.approxQuantile("trip_distance", [0.25, 0.75], 0.01)
        if len(quantiles) < 2:
            # approxQuantile yields no values for an empty frame; fail with a
            # readable message instead of "list index out of range".
            raise ValueError("no rows remain after range filtering")
        q1, q3 = quantiles
        iqr = q3 - q1
        distance_low, distance_high = q1 - 1.5 * iqr, q3 + 1.5 * iqr
        df = df.filter((F.col("trip_distance") >= distance_low) & (F.col("trip_distance") <= distance_high))

        # --- Outlier removal: total_amount, trim below 0.5% / above 99.5% ----
        # relativeError=0.0 requests exact quantiles.
        bounds = df.approxQuantile("total_amount", [0.005, 0.995], 0.0)
        if len(bounds) < 2:
            raise ValueError("no rows remain after trip_distance outlier removal")
        amount_low, amount_high = bounds
        df = df.filter((F.col("total_amount") >= amount_low) & (F.col("total_amount") <= amount_high))

        # --- Outlier removal: trip_duration, |z-score| <= 2 ------------------
        stats = df.select(
            F.avg(F.col("trip_duration")).alias("mean"),
            F.stddev(F.col("trip_duration")).alias("stddev")
        ).collect()[0]
        mean_trip_duration = stats["mean"]
        stddev_trip_duration = stats["stddev"]

        # Skip the z-score step when the spread is undefined (None/NaN, e.g. a
        # single row) or zero (all durations identical). Dividing by such a
        # value makes the predicate null and would silently drop every row.
        if stddev_trip_duration is not None and stddev_trip_duration > 0:
            z_score = (F.col("trip_duration") - mean_trip_duration) / stddev_trip_duration
            df = df.filter(F.abs(z_score) <= 2)

        return df
    except Exception as e:
        # Report the failure and signal it to the caller with None
        print(f"Error processing dataframe: {e}")
        return None

In [None]:
# Read every monthly Bronze file, clean it, and save it as a Silver Delta table.
#
# The loop variable is `year_val`, not `year`: `year` is imported from
# pyspark.sql.functions at the top of the notebook, and unpacking into that name
# would replace the function with an int for the rest of the session.
for year_val, month_val, file_path in file_paths:
    try:
        # Table name without the database prefix, shared by the table and the log lines
        table_suffix = f"cleaned_green_{year_val}_{month_val:02d}"

        # Load the raw Parquet file and run it through the cleaning function
        raw_df = spark.read.parquet(file_path)
        processed_df = process_dataframe(raw_df, month_val)

        if processed_df is None:
            # process_dataframe already printed the cause; do not write anything
            print(f"Skipping save for {table_suffix} due to processing error.")
            continue

        # Overwrite the Delta table (and its schema) with the cleaned data
        table_name = f"Silver_LH_NYC.{table_suffix}"
        processed_df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable(table_name)
        print(f"Processed and saved data for {table_suffix}")
    except Exception as e:
        # Best effort: a failure on one file (read or write) must not stop the others
        print(f"Error processing file {file_path}: {e}")

In [None]:
# Switch the session's current database to Silver_LH_NYC
spark.sql("USE Silver_LH_NYC")

# Fetch the names of all tables in the current database.
# SHOW TABLES returns one row per table; only the tableName column is kept,
# the rows are collected, and each single-field row is unwrapped into a
# plain Python string.
table_rows = spark.sql("SHOW TABLES").select("tableName").collect()
tables = [row[0] for row in table_rows]

# Print the table names, one per line
print("List of tables in Silver_LH_NYC:")
for table_name in tables:
    print(table_name)

In [None]:
# Validation helper for the cleaned tables
def validate_processed_table(table_name):
    """Check that a cleaned table has the expected columns, types and value ranges.

    Three checks run in order, and the function stops at the first failing stage:
      1. Every expected column is present (all missing columns are reported).
      2. Every expected column has the expected Spark type.
      3. Every row satisfies all range conditions. A row where a condition
         evaluates to NULL (e.g. a null value) counts as a violation.

    Args:
        table_name: Name passed to ``spark.table``.

    Returns:
        True if every check passes, otherwise False. Any exception is printed
        and reported as False.
    """
    try:
        df = spark.table(table_name)

        # Expected columns and their types, expressed as DataType.simpleString()
        # values. These short names are used rather than str(dataType), whose
        # text ("IntegerType" vs "IntegerType()") differs between PySpark versions.
        expected_types = {
            "pickup_datetime": "timestamp",
            "pickup_LocationID": "int",
            "dropoff_LocationID": "int",
            "payment_type": "int",
            "passenger_count": "int",
            "trip_distance": "double",
            "total_amount": "double",
            "pickup_year": "int",
            "pickup_month": "int",
            "pickup_day": "int",
            "pickup_hour": "int",
            "pickup_minute": "int",
            "pickup_weekday": "int",
            "trip_duration": "double"
        }

        # 1. Column presence: report every missing column before giving up
        missing_columns = [name for name in expected_types if name not in df.columns]
        for name in missing_columns:
            print(f"Column {name} is missing in table {table_name}")
        if missing_columns:
            return False

        # 2. Column types
        for name, expected_type in expected_types.items():
            actual_type = df.schema[name].dataType.simpleString()
            if actual_type != expected_type:
                print(f"Column {name} has incorrect type in table {table_name}. Expected {expected_type}, got {actual_type}")
                return False

        # 3. Value ranges, keyed by the column each condition guards
        range_checks = {
            "pickup_LocationID": df["pickup_LocationID"].between(1, 265),
            "dropoff_LocationID": df["dropoff_LocationID"].between(1, 265),
            "payment_type": df["payment_type"].between(1, 5),
            "passenger_count": df["passenger_count"].between(1, 6),
            "trip_distance": df["trip_distance"] > 0,
            "total_amount": df["total_amount"] > 0,
            "pickup_year": df["pickup_year"].between(2019, 2021),
            "pickup_month": df["pickup_month"].between(1, 12),
            "pickup_day": df["pickup_day"].between(1, 31),
            "pickup_hour": df["pickup_hour"].between(0, 23),
            "pickup_minute": df["pickup_minute"].between(0, 59),
            "trip_duration": df["trip_duration"] > 0
        }

        # Count violations for all conditions in a single pass over the table.
        # when(cond, 0).otherwise(1) yields 1 for FALSE *and* for NULL, so null
        # values are flagged; filter(~cond) would silently skip them.
        violation_counts = df.agg(*[
            F.sum(F.when(condition, 0).otherwise(1)).alias(name)
            for name, condition in range_checks.items()
        ]).collect()[0]

        # sum() over an empty table is NULL -> treat it as zero violations
        failed_checks = [name for name in range_checks if (violation_counts[name] or 0) > 0]
        if failed_checks:
            print(f"Filter condition failed for table {table_name}: {', '.join(failed_checks)}")
            return False

        print(f"Table {table_name} passed all checks.")
        return True
    except Exception as e:
        print(f"Error validating table {table_name}: {e}")
        return False

In [None]:
# Run the validation for every table listed above, using its
# database-qualified name
for table_name in tables:
    qualified_name = f"Silver_LH_NYC.{table_name}"
    validate_processed_table(qualified_name)