In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook: local[*] master, with
# ./holidays_package.zip registered under spark.submit.pyFiles.
spark = (
    SparkSession.builder
    .appName("FeatureEngineering")
    .master("local[*]")
    .config("spark.submit.pyFiles", "./holidays_package.zip")
    .getOrCreate()
)

# Data_LoaderのPySpark検証

In [None]:
import os
import zipfile

import pandas as pd

# Unpack every ZIP archive found in the power-usage directory.
zip_dir = "../data/power_usage"
extract_dir = "../data/power_extracted"
os.makedirs(extract_dir, exist_ok=True)

# Entries are visited in sorted filename order; anything that is not a .zip is skipped.
for entry in sorted(os.listdir(zip_dir)):
    if not entry.endswith(".zip"):
        continue
    with zipfile.ZipFile(os.path.join(zip_dir, entry), "r") as archive:
        archive.extractall(extract_dir)

In [None]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

In [None]:
def read_and_extract_max_power(
    csv_path,
    skiprows=54,
    encoding="shift-jis",
    value_col="当日実績(５分間隔値)(万kW)",
):
    """Return the largest value of one column of a daily power-usage CSV.

    The file is parsed with pandas. The first ``skiprows`` lines are skipped so
    that the following line is used as the header row.

    Args:
        csv_path: Path to the CSV file. The part of the file name before the
            first ``_`` is returned as the date key (the callers expect it to
            be ``YYYYMMDD``).
        skiprows: Forwarded to ``pandas.read_csv``. Defaults to 54.
        encoding: Text encoding of the file. Defaults to ``"shift-jis"``.
        value_col: Header name of the column whose maximum is extracted.

    Returns:
        ``{"date": <str>, "max_power": <int>}`` on success. The maximum is
        converted with ``int()``, so any fractional part is truncated.
        ``None`` if the file cannot be read or parsed; the error is printed
        rather than raised so that one bad file does not stop a batch run.
    """
    try:
        df = pd.read_csv(csv_path, encoding=encoding, skiprows=skiprows)
        max_power = int(df[value_col].max())
        # File-name prefix before the first underscore (expected: YYYYMMDD).
        date = os.path.basename(csv_path).split("_")[0]
        return {"date": date, "max_power": max_power}
    except Exception as e:
        # Deliberate best-effort: report the failing file and let the caller skip it.
        print(f"Error in {csv_path}: {e}")
        return None

# Extract one record per CSV with pandas, then turn the records into a Spark DataFrame.
# The listing is sorted (like the ZIP loop earlier) so the records are collected
# in the same order on every run, independent of the filesystem's listing order.
records = []
for fname in sorted(os.listdir(extract_dir)):
    if not fname.endswith(".csv"):
        continue
    path = os.path.join(extract_dir, fname)
    record = read_and_extract_max_power(path)
    # Files that could not be parsed come back as None and are left out.
    if record:
        records.append(record)

# List of dicts -> Spark DataFrame with an explicit schema
power_usage_df = spark.createDataFrame(records, StructType([
    StructField("date", StringType(), True),
    StructField("max_power", LongType(), True),
]))

# Convert the yyyyMMdd string into a date column
power_usage_df = power_usage_df.withColumn("date", to_date(col("date"), "yyyyMMdd"))

# Preview the first rows
power_usage_df.show(5)

In [None]:
def load_weather_data(path, encoding="shift-jis", skiprows=None) -> pd.DataFrame:
    """Load the weather CSV into a pandas DataFrame with English column names.

    The file is read with pandas first because, per the original author's
    note, skiprows cannot be configured when reading with PySpark.

    Args:
        path: Path to the weather CSV file.
        encoding: File encoding. Defaults to ``"shift-jis"``.
        skiprows: List of 0-based line numbers to skip. Defaults to
            ``[0, 1, 2, 4, 5]``, which leaves line 3 as the header row.

    Returns:
        pd.DataFrame: Columns ``date`` (datetime64), ``max_temp``,
        ``min_temp`` and ``weather``.
    """
    if skiprows is None:
        # Resolved here rather than in the signature to avoid a mutable default.
        skiprows = [0, 1, 2, 4, 5]

    raw = pd.read_csv(path, encoding=encoding, skiprows=skiprows)

    # Source column -> English name; only these columns are kept, in this order.
    column_map = {
        "年月日": "date",
        "最高気温(℃)": "max_temp",
        "最低気温(℃)": "min_temp",
        "天気概況(昼：06時〜18時)": "weather",
    }
    weather = raw[list(column_map)].rename(columns=column_map)

    # Convert the date strings to datetime
    weather["date"] = pd.to_datetime(weather["date"], format="%Y/%m/%d")

    return weather

In [None]:
# Read the weather CSV into a pandas DataFrame via load_weather_data.
path = "../data/weather_data.csv"
df = load_weather_data(path)

In [None]:
# Hand the pandas weather frame over to Spark.
weather_df = spark.createDataFrame(df)
# To end up with a date rather than a timestamp, convert explicitly to a date-only value after the data has been passed to Spark.
weather_df = weather_df.withColumn("date", to_date("date"))

In [None]:
# Preview the first five rows of the Spark weather frame.
weather_df.show(5)

In [None]:
# Join the two datasets.
# Both sides are first repartitioned into 60 partitions keyed on "date",
# then inner-joined on that same column.
weather_df = weather_df.repartition(60, "date")
power_usage_df = power_usage_df.repartition(60, "date")

merge_data = weather_df.join(power_usage_df, on=["date"], how="inner")

In [None]:
# Number of partitions backing the joined DataFrame.
merge_data.rdd.getNumPartitions()

# 前処理、特徴量エンジニアリングの確認

### 天気のカテゴリ変数まとめ

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


@udf(returnType=StringType())
def categorize_weather(weather):
    """Collapse a free-text weather summary into a coarse category label.

    Matching is by substring and the first rule that applies wins, checked in
    this order: snow, thunder, clear sky ("快晴"), sunny, cloudy, rain.

    Args:
        weather: Weather summary text, or None.

    Returns:
        The category label. "不明" is returned for a None input and "その他"
        when none of the keywords occur in the text.
    """
    if weather is None:
        return "不明"

    # Keyword checks that several branches share.
    has_rain = "雨" in weather or "あめ" in weather
    has_sun = "晴" in weather
    has_cloud = "曇" in weather

    if "雪" in weather or "ゆき" in weather:
        return "雪"

    if "雷" in weather:
        if has_rain:
            return "雷雨"
        if has_sun:
            return "晴れ(雷あり)"
        if has_cloud:
            return "曇り(雷あり)"
        return "雷"

    # From here on the text is known not to contain "雷": every thunder case
    # returned above, so the sunny branch only needs to look for rain.
    if "快晴" in weather:
        return "快晴"

    if has_sun:
        if has_cloud:
            return "晴れ時々曇り"
        if has_rain:
            return "晴れ時々雨"
        return "晴れ"

    if has_cloud:
        if has_rain:
            return "曇り時々雨"
        return "曇り"

    if has_rain:
        return "雨"

    return "その他"

In [None]:
def categorize_weather_spark(df, weather_col="weather"):
    """Replace the raw weather text column with a ``weather_category`` column.

    Args:
        df: Spark DataFrame containing ``weather_col``.
        weather_col: Name of the column holding the weather text.

    Returns:
        A DataFrame with ``weather_category`` (produced by the
        ``categorize_weather`` UDF) added and ``weather_col`` dropped.
    """
    category = categorize_weather(df[weather_col])
    with_category = df.withColumn("weather_category", category)
    return with_category.drop(weather_col)

In [None]:
# Apply the weather categorisation to the joined weather/power frame.
temp = categorize_weather_spark(merge_data)

In [None]:
# Print the categorised frame to check the result.
temp.show()

### 数値系特徴量作成

In [None]:
def create_numeric_features_spark(df, base_temp=18, hot_threshold=30, cold_threshold=5):
    """Add temperature-derived numeric features to ``df``.

    Expects ``max_temp`` and ``min_temp`` columns and adds:

    * ``avg``  - mean of ``max_temp`` and ``min_temp``
    * ``rng``  - ``max_temp - min_temp``
    * ``cdd``  - cooling degree days: ``avg - base_temp`` when positive, else 0.0
    * ``hdd``  - heating degree days: ``base_temp - avg`` when positive, else 0.0
    * ``hot``  - boolean, ``max_temp >= hot_threshold``
    * ``cold`` - boolean, ``min_temp <= cold_threshold``

    ``cdd`` and ``hdd`` are null when ``avg`` is null.

    Args:
        df: Spark DataFrame with ``max_temp`` and ``min_temp`` columns.
        base_temp: Base temperature for the degree-day features.
        hot_threshold: Lower bound on ``max_temp`` for the ``hot`` flag.
        cold_threshold: Upper bound on ``min_temp`` for the ``cold`` flag.

    Returns:
        The input DataFrame with the six feature columns added.
    """
    # Function-scope import: these two helpers are only needed here.
    from pyspark.sql.functions import lit, when

    avg = (col("max_temp") + col("min_temp")) / 2
    rng = col("max_temp") - col("min_temp")

    # Degree days are one-sided. Without the floor at zero, hdd is exactly -cdd
    # and the two columns carry the same information. Both branches are spelled
    # out (no otherwise()) so a null avg stays null instead of becoming 0.
    cdd = (
        when(avg > base_temp, avg - base_temp)
        .when(avg <= base_temp, lit(0.0))
        .cast("double")
    )
    hdd = (
        when(avg < base_temp, base_temp - avg)
        .when(avg >= base_temp, lit(0.0))
        .cast("double")
    )

    return (
        df.withColumn("avg", avg)
          .withColumn("rng", rng)
          .withColumn("cdd", cdd)
          .withColumn("hdd", hdd)
          .withColumn("hot", (col("max_temp") >= hot_threshold))
          .withColumn("cold", (col("min_temp") <= cold_threshold))
    )

In [None]:
# Add the numeric temperature features; this rebinds `temp` to the result of transforming the previous `temp`.
temp = create_numeric_features_spark(temp)

### カレンダー系特徴量作成

In [None]:
# spark.stop()

# spark = SparkSession.builder \
#     .appName("holiday-udf") \
#     .master("local[*]") \
#     .config("spark.submit.pyFiles", "../holidays_package.zip") \
#     .getOrCreate()

In [None]:
import holidays
import numpy as np
from pyspark.sql.functions import cos, dayofmonth, dayofweek, month, sin, year

# Japanese holiday calendar; referenced from inside the UDF below.
jp_holidays = holidays.Japan()

# Holiday flag
@udf(returnType=IntegerType())
def is_holiday(date):
    """Return 1 if ``date`` is in the Japanese holiday calendar, else 0.

    A null date yields null.
    """
    # NOTE(review): None is presumably not a valid key for the calendar lookup;
    # returning null keeps one bad row from failing the whole job. Confirm
    # against the holidays package.
    if date is None:
        return None
    return int(date in jp_holidays)

def create_calendar_features_spark(df, date_col="date"):
    """Add calendar-derived feature columns computed from ``date_col``.

    Columns added, in this order: ``year``, ``month``, ``day``, ``dow``
    (1 = Sunday ... 7 = Saturday), ``dow_sin``, ``dow_cos``, ``mon_sin``,
    ``mon_cos``, ``weekend`` (0/1) and ``holiday`` (from ``is_holiday``).

    Args:
        df: Spark DataFrame containing ``date_col``.
        date_col: Name of the date column.

    Returns:
        The input DataFrame with the calendar feature columns added.
    """
    date = col(date_col)

    # Angles for the cyclical encodings; "dow" and "month" are resolved when
    # the columns are attached below, after those two columns exist.
    dow_angle = 2 * np.pi * (col("dow") - 1) / 7
    mon_angle = 2 * np.pi * col("month") / 12
    is_weekend = (col("dow") == 1) | (col("dow") == 7)

    # Order matters: derived columns must come after the ones they read.
    features = [
        ("year", year(date)),
        ("month", month(date)),
        ("day", dayofmonth(date)),
        # 1 = Sunday ... 7 = Saturday
        ("dow", dayofweek(date)),
        ("dow_sin", sin(dow_angle)),
        ("dow_cos", cos(dow_angle)),
        ("mon_sin", sin(mon_angle)),
        ("mon_cos", cos(mon_angle)),
        ("weekend", is_weekend.cast("int")),
        ("holiday", is_holiday(date)),
    ]

    result = df
    for name, expr in features:
        result = result.withColumn(name, expr)
    return result

In [None]:
# Add the calendar features on top of the weather-category and numeric features.
test = create_calendar_features_spark(temp)

In [None]:
# Preview the first five rows of the fully featured frame.
test.show(5)

In [None]:
# Scratch check: keep only the first two "-"-separated fields of a date string.
a = "2023-01-01"
yyyymm = "-".join(a.split("-", 2)[:2])


In [None]:
import pandas as pd

# Read one Spark-written parquet part file back with pandas for inspection.
# NOTE(review): the path points into the user's home directory and is
# machine-specific; this also rebinds `df`, which earlier held the weather
# frame. Confirm this scratch cell is still needed.
df = pd.read_parquet("~/downloads/part-00000-44afcced-ef61-4c52-a52f-38f13c083579.c000.snappy.parquet", engine="pyarrow")

In [None]:
# Summarise the loaded parquet frame.
df.info()

In [None]:
# Show the loaded frame as the cell output.
df