In [13]:
from pyspark.sql import SparkSession, functions as F
import math

spark = SparkSession.builder.appName("ERA5-NearestPoint").getOrCreate()

hourly = spark.read.parquet("data_processed/era5_hourly_*_jochiwon.parquet")
hourly = hourly.withColumn("time", F.to_timestamp("time"))

# ★★★ 여기서 연도 필터 추가 ★★★
hourly = hourly.filter(
    (F.col("time") >= F.lit("2020-01-01")) &
    (F.col("time") <  F.lit("2024-01-01"))
)

CENTER_LAT = 36.6110
CENTER_LON = 127.2870

# distinct grid points (이제 2020–2023 기준)
grid = hourly.select("lat","lon").distinct()

# 근사 거리로 nearest 계산
grid2 = grid.withColumn(
    "dist2",
    (F.col("lat") - F.lit(CENTER_LAT))**2 +
    (F.col("lon") - F.lit(CENTER_LON))**2
)

nearest = grid2.orderBy("dist2").limit(1)
nearest.show(truncate=False)


+----+------+--------------------+
|lat |lon   |dist2               |
+----+------+--------------------+
|36.5|127.25|0.013689999999999812|
+----+------+--------------------+



In [14]:
nearest_row = nearest.collect()[0]
NEAR_LAT = float(nearest_row["lat"])
NEAR_LON = float(nearest_row["lon"])
print("Nearest grid:", NEAR_LAT, NEAR_LON)

point = hourly.filter((F.col("lat")==F.lit(NEAR_LAT)) & (F.col("lon")==F.lit(NEAR_LON)))
point.select("time","lat","lon","t2m","tp").show(5, truncate=False)

print("hourly rows:", point.count())


Nearest grid: 36.5 127.25
+-------------------+----+------+---------+------------+
|time               |lat |lon   |t2m      |tp          |
+-------------------+----+------+---------+------------+
|2022-08-01 00:00:00|36.5|127.25|301.2179 |4.529953E-5 |
|2022-08-01 01:00:00|36.5|127.25|302.51147|3.9100647E-5|
|2022-08-01 02:00:00|36.5|127.25|303.39014|2.1457672E-5|
|2022-08-01 03:00:00|36.5|127.25|304.45898|4.2438507E-5|
|2022-08-01 04:00:00|36.5|127.25|304.64307|3.6239624E-5|
+-------------------+----+------+---------+------------+
only showing top 5 rows

hourly rows: 35064


In [16]:
point = (point
    .withColumn("t2m_C", F.col("t2m") - F.lit(273.15))
    .withColumn("tp_mm", F.col("tp") * F.lit(1000.0))
    .withColumn("date", F.to_date("time"))
)

daily = (point.groupBy("date")
    .agg(
        F.avg("t2m_C").alias("tmean_C"),
        F.max("t2m_C").alias("tmax_C"),
        F.min("t2m_C").alias("tmin_C"),
        F.stddev("t2m_C").alias("tstd_C"),
        F.sum("tp_mm").alias("prcp_mm_sum"),
        F.max("tp_mm").alias("prcp_mm_max_hour"),
        F.avg("sp").alias("sp_mean"),
        F.avg("d2m").alias("d2m_mean"),
        F.sqrt(F.avg(F.col("u10")*F.col("u10") + F.col("v10")*F.col("v10"))).alias("wind_speed_mean"),
        F.sum("ssrd").alias("ssrd_sum"),
    )
    .withColumn("dtr_C", F.col("tmax_C") - F.col("tmin_C"))
    .orderBy("date")
)

daily.show(5, truncate=False)
print("daily rows:", daily.count())


+----------+-------------------+------------------+-------------------+------------------+--------------------+-------------------+------------------+------------------+------------------+----------+-----------------+
|date      |tmean_C            |tmax_C            |tmin_C             |tstd_C            |prcp_mm_sum         |prcp_mm_max_hour   |sp_mean           |d2m_mean          |wind_speed_mean   |ssrd_sum  |dtr_C            |
+----------+-------------------+------------------+-------------------+------------------+--------------------+-------------------+------------------+------------------+------------------+----------+-----------------+
|2020-01-01|-0.5439615885416439|2.9770751953125227|-3.8031982421874773|1.879465653703158 |0.030517578125      |0.00762939453125   |101548.5390625    |268.36273193359375|1.3022124442621101|6768768.0 |6.7802734375     |
|2020-01-02|1.076570129394554  |5.580224609375023 |-1.8761962890624773|2.476738338058776 |0.006198883056640625|0.001907348632812

In [17]:
from pyspark.sql.window import Window

daily = daily.withColumn("site_id", F.lit("NEAREST_GRID"))
w = Window.partitionBy("site_id").orderBy("date")
w7  = w.rowsBetween(-6, 0)
w30 = w.rowsBetween(-29, 0)

feat = (daily
    # 타깃: 내일 평균기온
    .withColumn("y_tmean_next", F.lead("tmean_C", 1).over(w))

    # lag
    .withColumn("tmean_lag1", F.lag("tmean_C", 1).over(w))
    .withColumn("tmean_lag7", F.lag("tmean_C", 7).over(w))
    .withColumn("prcp_lag1",  F.lag("prcp_mm_sum", 1).over(w))
    .withColumn("prcp_lag7",  F.lag("prcp_mm_sum", 7).over(w))

    # rolling
    .withColumn("tmean_roll7",  F.avg("tmean_C").over(w7))
    .withColumn("tmean_roll30", F.avg("tmean_C").over(w30))
    .withColumn("prcp_roll7",   F.avg("prcp_mm_sum").over(w7))
    .withColumn("prcp_roll30",  F.avg("prcp_mm_sum").over(w30))
)

# 타깃이나 주요 피처 NULL 제거 (초기 30일, 마지막 1일)
feat = feat.dropna(subset=["y_tmean_next","tmean_roll30","prcp_roll30"])

feat.select("date","tmean_C","y_tmean_next","tmean_roll7","prcp_mm_sum").show(5, truncate=False)
print("usable rows:", feat.count())


+----------+-------------------+-------------------+-------------------+--------------------+
|date      |tmean_C            |y_tmean_next       |tmean_roll7        |prcp_mm_sum         |
+----------+-------------------+-------------------+-------------------+--------------------+
|2020-01-01|-0.5439615885416439|1.076570129394554  |-0.5439615885416439|0.030517578125      |
|2020-01-02|1.076570129394554  |0.1732040405273665 |0.26630427042645505|0.006198883056640625|
|2020-01-03|0.1732040405273665 |0.35636800130210605|0.23527086046009218|9.5367431640625E-4  |
|2020-01-04|0.35636800130210605|1.1391871134440332 |0.2655451456705957 |0.0                 |
|2020-01-05|1.1391871134440332 |4.610701497395856  |0.4402735392252832 |0.732421875         |
+----------+-------------------+-------------------+-------------------+--------------------+
only showing top 5 rows

usable rows: 1460


In [18]:
pdf = feat.toPandas()
pdf.head(), pdf.shape


(         date   tmean_C    tmax_C    tmin_C    tstd_C  prcp_mm_sum  \
 0  2020-01-01 -0.543962  2.977075 -3.803198  1.879466     0.030518   
 1  2020-01-02  1.076570  5.580225 -1.876196  2.476738     0.006199   
 2  2020-01-03  0.173204  6.526575 -4.113623  3.616834     0.000954   
 3  2020-01-04  0.356368  7.841089 -4.751074  4.126130     0.000000   
 4  2020-01-05  1.139187  7.012964 -3.859351  3.055253     0.732422   
 
    prcp_mm_max_hour        sp_mean    d2m_mean  wind_speed_mean  ...  \
 0          0.007629  101548.539062  268.362732         1.302212  ...   
 1          0.001907  101309.763021  270.739243         1.910210  ...   
 2          0.000954  101074.838542  269.682648         2.098175  ...   
 3          0.000000  101161.213542  269.364324         2.189079  ...   
 4          0.432014  101445.265625  267.538788         1.539129  ...   
 
         site_id  y_tmean_next tmean_lag1  tmean_lag7  prcp_lag1  prcp_lag7  \
 0  NEAREST_GRID      1.076570        NaN         NaN

In [1]:
import numpy as np
import pandas as pd

pdf = pdf.copy()
pdf["date"] = pd.to_datetime(pdf["date"])
pdf = pdf.sort_values("date").reset_index(drop=True)

pdf["doy"] = pdf["date"].dt.dayofyear.astype(int)
pdf["sin_doy"] = np.sin(2*np.pi*pdf["doy"]/365.25)
pdf["cos_doy"] = np.cos(2*np.pi*pdf["doy"]/365.25)

pdf[["date","tmean_C","y_tmean_next","sin_doy","cos_doy"]].head()


NameError: name 'pdf' is not defined