# ML

### 필요 라이브러리 import 및 데이터 로드

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sin, cos, pi, lag, avg,year,max as spark_max, min as spark_min, expr, when, concat
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from xgboost.spark import SparkXGBRegressor
from pyspark.ml import Pipeline
from lightgbm import LGBMRegressor
import mlflow
from databricks import automl
import databricks.automl as db_automl
import lightgbm
from lightgbm import LGBMRegressor

In [0]:
df=spark.read.option("header", "true").csv("/Volumes/project/default/project/UHI_data_test.csv")

In [0]:
display(df.limit(5))

In [0]:
df_processed = df.withColumn("green_rate", col("green_rate").cast("double")) \
                 .withColumn("Building_Density", col("Building_Density").cast("double")) \
                 .withColumn("car_registration_count", col("car_registration_count").cast("long")) \
                 .withColumn("population_density", col("population_density").cast("double")) \
                 .withColumn("avg_km_per_road_km", col("avg_km_per_road_km").cast("double")) \
                 .withColumn("UHII", col("UHII").cast("double")) \
                 .withColumn("timestamp", col("timestamp").cast("timestamp")) \
                 .withColumn("suburban_temp_current", col("suburban_temp_current").cast("double"))

### feature column 선택 및 vectorassembler 적용

In [0]:
# 레이블 컬럼 정의
LABEL_COL = 'UHII'

# 범주형 피처 정의
categorical_cols = ['District'] 

# 수치형 피처 정의
numerical_cols = [
    'green_rate',
    'Building_Density',
    'car_registration_count',
    'population_density',
    'avg_km_per_road_km',
    'suburban_temp_current'
]

In [0]:
# 파이프라인 스테이지 정의
stages = []

# StringIndexer: 문자열 카테고리를 숫자 인덱스로 변환
indexed_district_col = 'District_indexed'
string_indexer = StringIndexer() \
    .setInputCol('District') \
    .setOutputCol(indexed_district_col) \
    .setHandleInvalid("skip") # 'skip'은 학습 데이터에 없는 카테고리를 무시합니다.
stages.append(string_indexer)

# VectorAssembler: 모든 피처를 하나의 벡터로 묶기
final_feature_cols = numerical_cols

assembler = VectorAssembler() \
    .setInputCols(final_feature_cols) \
    .setOutputCol("features")
stages.append(assembler)

# MLlib Pipeline 생성 및 적용
pipeline = Pipeline(stages=stages)
# 이 시점에는 df_processed가 이미 준비되어 있어야 합니다.
# (df_processed = 이전에 CSV를 읽고 피처 엔지니어링이 완료된 DataFrame)
pipeline_model = pipeline.fit(df_processed)
assembled_df = pipeline_model.transform(df_processed)

print("--- 파이프라인 적용 후 DataFrame 스키마 ---")
assembled_df.printSchema()
print("\n--- 파이프라인 적용 후 DataFrame (일부) ---")
# 'features' 컬럼에 모든 전처리된 피처들이 벡터 형태로 들어있을 겁니다.
assembled_df.select('District', 'timestamp', 'features', LABEL_COL).show(5, truncate=False)

In [0]:
# 1. df_processed의 전체 스키마 확인 (컬럼 이름과 데이터 타입 집중)
print("--- df_processed 스키마 ---")
df_processed.printSchema()

# 2. df_processed의 상위 5개 행 데이터 확인 (값들이 제대로 들어있는지)
print("\n--- df_processed 상위 5개 행 ---")
df_processed.show(5, truncate=False)

# 3. LABEL_COL과 timestamp 컬럼을 제외한 모든 피처 컬럼 이름들을 확인
# 이 리스트가 비어있다면, AutoML이 학습할 다른 피처가 없다는 의미입니다.
all_cols_except_time_and_target = [
    col for col in df_processed.columns
    if col not in [LABEL_COL, "timestamp"]
]
print(f"\n--- timestamp와 {LABEL_COL}을 제외한 피처 컬럼들: ---")
print(all_cols_except_time_and_target)

# 만약 여기에 예상했던 피처 컬럼들이 없다면, df_processed를 만드는 과정에서 누락된 것입니다.
# 만약 있다면, 해당 컬럼들의 데이터 타입을 개별적으로 점검해야 합니다.

# 4. 각 피처 컬럼의 고유값, 결측치, 통계량 확인 (의심되는 컬럼에 대해)
# 예시: '활성녹지율' 컬럼에 대해
# df_processed.select('활성녹지율').summary().show()
# df_processed.filter(col('활성녹지율').isNull()).count()
# df_processed.select('활성녹지율').distinct().count() # 고유값 개수

# 5. 혹시 명시적으로 feature_columns를 넣는 파라미터가 다시 생겼는지 확인 (AutoML 버전 업데이트 가능성)
# help(automl.forecast)를 다시 한번 실행하여 최신 파라미터 목록을 확인하세요.
# 만약 feature_columns 파라미터가 있다면, 명시적으로 넣어주세요.
# (하지만 제공해주신 help 출력에는 없었습니다.)

In [0]:
# AutoML 실행
print("--- Databricks AutoML (회귀) 시작 ---")
summary = db_automl.forecast(
    dataset=df_processed,       # 전처리 및 피처 벡터화가 완료된 전체 데이터셋
    target_col=LABEL_COL,       # 'UHII'
    time_col="timestamp",
    identity_col="District",
    timeout_minutes=30,         # 1시간 동안 최적의 모델 탐색 (조절 가능)
    primary_metric="rmse",      # 주된 최적화 지표를 RMSE로 설정
)

print("\n--- Databricks AutoML 완료 ---")

In [0]:
help(db_automl.forecast)

In [0]:

# AutoML 결과 확인
# summary 객체는 최적의 모델, 실험 결과 등에 대한 정보를 포함합니다.
# 최적의 모델을 재현하는 노트북 링크를 얻을 수 있습니다.
#print(f"최적의 모델 링크: {summary.best_model_notebook_url}")
print(f"모든 실험 결과 링크: {summary.experiment.url}")

In [0]:
# --- 여기에 최적의 모델 객체를 받아오는 코드 추가 ---

import mlflow
from mlflow.tracking import MlflowClient

# 1. MlflowClient 인스턴스 생성
client = MlflowClient()

# 2. AutoML 실행이 포함된 MLflow Experiment ID 가져오기
experiment_id = summary.experiment.experiment_id

# 3. Experiment 내의 모든 런(runs)을 검색하여 최적의 런 찾기
# 'metrics.rmse' (주된 최적화 지표)를 기준으로 오름차순(ASC) 정렬하여 가장 좋은 런을 가져옵니다.
runs = client.search_runs(
    experiment_ids=[experiment_id],
    # 수정 후 (RMSE는 낮을수록 좋으므로 ASC)
    order_by=["metrics.var_root_mean_squared_error ASC"],
    max_results=1
)

best_model = None # 최적의 모델 객체를 저장할 변수 초기화

if runs:
    best_run = runs[0]
    print(f"\n--- 최적의 MLflow 런 정보 ---")
    print(f"최적의 MLflow 런 ID: {best_run.info.run_id}")
    print(f"최적의 런 지표 (RMSE): {best_run.data.metrics.get('rmse', 'N/A')}")

    # 4. 최적의 런에 로깅된 모델 로드
    # Databricks AutoML은 일반적으로 모델을 "model"이라는 이름으로 로깅합니다.
    logged_model_path = f"runs:/{best_run.info.run_id}/model"
    
    try:
        # MLflow API를 사용하여 모델 로드 (PySpark MLlib 모델의 경우)
        # AutoML이 로깅하는 모델은 주로 PySpark MLlib PipelineModel 형태입니다.
        best_model = mlflow.spark.load_model(logged_model_path)
        print("\n--- 최적의 PySpark MLlib 모델 객체가 성공적으로 로드되었습니다. ---")
        print(f"로드된 모델 타입: {type(best_model)}")
        
        # 이제 best_model을 사용하여 새로운 데이터에 대해 예측을 수행할 수 있습니다.
        # 예: predictions = best_model.transform(new_data_df)
        
    except Exception as e:
        print(f"\n--- 모델 로드 중 오류 발생 ---")
        print(f"오류 메시지: {e}")
        print(f"MLflow 런에 'model'이라는 이름으로 Spark MLlib 모델이 로깅되었는지 확인하세요.")
        print(f"로그된 아티팩트 경로: {best_run.info.artifact_uri}")

else:
    print(f"\nExperiment ID {experiment_id}에서 최적의 런을 찾을 수 없습니다.")

In [0]:
# --- . 학습/검증/테스트 데이터셋 분할 (시간 순서 고려) ---
print("\n--- 4. 학습/검증/테스트 데이터셋 분할 (시간 순서 고려) ---")
min_date, max_date = assembled_df.agg(
    spark_min(col("timestamp")), spark_max(col("timestamp"))
).head()
print(f"전체 데이터 기간: {min_date} ~ {max_date}")

# 시계열 데이터이므로 timestamp 기준으로 정렬 후 분할합니다.
sorted_df = assembled_df.orderBy("timestamp")
train_df, val_df, test_df = sorted_df.randomSplit([0.8, 0.1, 0.1], seed=42)

print(f"학습 데이터셋 크기: {train_df.count()} 행")
print(f"검증 데이터셋 크기: {val_df.count()} 행")
print(f"테스트 데이터셋 크기: {test_df.count()} 행")

if train_df.count() > 0:
    train_min_date, train_max_date = train_df.agg(spark_min(col("timestamp")), spark_max(col("timestamp"))).head()
    print(f"  학습 데이터 기간: {train_min_date} ~ {train_max_date}")
if val_df.count() > 0:
    val_min_date, val_max_date = val_df.agg(spark_min(col("timestamp")), spark_max(col("timestamp"))).head()
    print(f"  검증 데이터 기간: {val_min_date} ~ {val_max_date}")
if test_df.count() > 0:
    test_min_date, test_max_date = test_df.agg(spark_min(col("timestamp")), spark_max(col("timestamp"))).head()
    print(f"  테스트 데이터 기간: {test_min_date} ~ {test_max_date}")

In [0]:
# XGBoost Regressor 모델 정의
xgb_regressor = SparkXGBRegressor(
    features_col="features", # 이미 assembled_df에 features 컬럼이 있으므로 이 이름을 사용
    label_col="UHII",
    n_estimators=100,
    learning_rate=0.1,
    max_depth=5,
    seed=42,
)

# 모델 학습 시에는 StringIndexer, OneHotEncoder, VectorAssembler는 이미 적용되었으므로
# 모델 자체만 파이프라인에 넣거나, 바로 모델을 fit합니다.
# 여기서는 Pipeline 대신 바로 모델을 fit하는 방식을 선택합니다.

print("\n--- XGBoost 모델 학습 시작 (assembled_df로부터) ---")
# assembled_df를 train_df, val_df, test_df로 분할했으므로,
# 이제 train_df는 이미 features 컬럼을 가지고 있습니다.
# 따라서, xgb_regressor 모델 인스턴스만 사용하여 학습을 수행합니다.
model_xgb_fitted = xgb_regressor.fit(train_df) # train_df는 이미 features 컬럼을 가짐
print("--- XGBoost 모델 학습 완료 ---")

In [0]:
# AutoML이 찾아준 최적의 LightGBM 하이퍼파라미터
space = {
  "colsample_bytree": 0.7562949914479031,
  "learning_rate": 0.340128033935979,
  "max_depth": 7,
  "min_child_weight": 15,
  "n_estimators": 377,
  "n_jobs": 100,
  "subsample": 0.36705694207590633,
  "verbosity": 0,
  "random_state": 690069765,
}

# LightGBMRegressor 모델 정의
# 파라미터 이름은 "Best Trial Notebook"에서 정확히 확인하는 것이 좋습니다.
# 여기서는 synapse.ml.lightgbm의 일반적인 파라미터 이름을 사용합니다.
lgbm_regressor = LGBMRegressor(
    featuresCol="features",
    labelCol="UHII",
    numIterations=space["n_estimators"], # n_estimators -> numIterations
    learningRate=space["learning_rate"],
    maxDepth=space["max_depth"],
    numLeaves=space["num_leaves"],
    minChildSamples=space["min_child_samples"],
    subsample=space["subsample"],
    colsampleByTree=space["colsample_bytree"], # colsampleByTree
    regAlpha=space["lambda_l1"], # lambda_l1 -> regAlpha
    regLambda=space["lambda_l2"], # lambda_l2 -> regLambda
    seed=space["random_state"],
    # objective="regression" # 명시적으로 회귀 문제임을 지정할 수 있습니다.
)

# LightGBM Estimator를 단일 스테이지 Pipeline에 포함
# train_df는 이미 features와 UHII 컬럼을 가지고 있으므로, 추가적인 전처리 스테이지는 필요 없습니다.
pipeline_lgbm = Pipeline(stages=[lgbm_regressor])

print("\n--- LightGBM 모델 학습 시작 (Pipeline 사용) ---")
# 파이프라인을 train_df에 fit합니다.
# 파이프라인이 내부적으로 lgbm_regressor.fit(train_df)를 올바른 MLlib 방식으로 호출합니다.
model_lgbm_fitted = pipeline_lgbm.fit(train_df)
print("--- LightGBM 모델 학습 완료 ---")

# --- 모델 평가 (이전 코드와 동일하게 유지) ---
# 검증 데이터셋으로 예측 수행
predictions_val_lgbm = model_lgbm_fitted.transform(val_df)

# 테스트 데이터셋으로 예측 수행
predictions_test_lgbm = model_lgbm_fitted.transform(test_df)

# RegressionEvaluator를 사용하여 모델 평가
evaluator = RegressionEvaluator(
    labelCol="UHII",
    predictionCol="prediction",
    metricName="rmse"
)

rmse_val_lgbm = evaluator.evaluate(predictions_val_lgbm)
print(f"\n검증 데이터셋 RMSE (LightGBM): {rmse_val_lgbm:.4f}")

rmse_test_lgbm = evaluator.evaluate(predictions_test_lgbm)
print(f"테스트 데이터셋 RMSE (LightGBM): {rmse_test_lgbm:.4f}")

evaluator.setMetricName("r2")
r2_val_lgbm = evaluator.evaluate(predictions_val_lgbm)
print(f"검증 데이터셋 R2 (LightGBM): {r2_val_lgbm:.4f}")

r2_test_lgbm = evaluator.evaluate(predictions_test_lgbm)
print(f"테스트 데이터셋 R2 (LightGBM): {r2_test_lgbm:.4f}")

evaluator.setMetricName("mae")
mae_val_lgbm = evaluator.evaluate(predictions_val_lgbm)
print(f"검증 데이터셋 MAE (LightGBM): {mae_val_lgbm:.4f}")

mae_test_lgbm = evaluator.evaluate(predictions_test_lgbm)
print(f"테스트 데이터셋 MAE (LightGBM): {mae_test_lgbm:.4f}")

print("\n--- LightGBM 모델 평가 완료 ---")
predictions_test_lgbm.select("UHII", "prediction").show(5)

In [0]:
# 검증 데이터셋으로 예측 수행
# val_df 또한 이미 features 컬럼을 가지고 있습니다.
predictions_val_xgb = model_lgbm_fitted.transform(val_df)

# 테스트 데이터셋으로 예측 수행
# test_df 또한 이미 features 컬럼을 가지고 있습니다.
predictions_test_xgb = model_lgbm_fitted.transform(test_df)

# RegressionEvaluator를 사용하여 모델 평가 (이 부분은 동일)
evaluator = RegressionEvaluator(
    labelCol="UHII",
    predictionCol="prediction",
    metricName="rmse"
)

rmse_val_xgb = evaluator.evaluate(predictions_val_xgb)
print(f"\n검증 데이터셋 RMSE (XGBoost): {rmse_val_xgb:.4f}")

rmse_test_xgb = evaluator.evaluate(predictions_test_xgb)
print(f"테스트 데이터셋 RMSE (XGBoost): {rmse_test_xgb:.4f}")

evaluator.setMetricName("r2")
r2_val_xgb = evaluator.evaluate(predictions_val_xgb)
print(f"검증 데이터셋 R2 (XGBoost): {r2_val_xgb:.4f}")

r2_test_xgb = evaluator.evaluate(predictions_test_xgb)
print(f"테스트 데이터셋 R2 (XGBoost): {r2_test_xgb:.4f}")

evaluator.setMetricName("mae")
mae_val_xgb = evaluator.evaluate(predictions_val_xgb)
print(f"검증 데이터셋 MAE (XGBoost): {mae_val_xgb:.4f}")

mae_test_xgb = evaluator.evaluate(predictions_test_xgb)
print(f"테스트 데이터셋 MAE (XGBoost): {mae_test_xgb:.4f}")

print("\n--- XGBoost 모델 평가 완료 ---")
predictions_test_xgb.select("UHII", "prediction").show(5)

In [0]:
print("\n--- UHII와 수치형 피처 간의 상관관계 ---")

# UHII와 각 수치형 피처 간의 피어슨 상관계수 계산
for col_name in numerical_cols:
    try:
        correlation = assembled_df.stat.corr(LABEL_COL, col_name)
        print(f"UHII 와 {col_name:<25}: {correlation:.4f}")
    except Exception as e:
        print(f"Error calculating correlation for {col_name}: {e}")

# --- 수치형 피처 및 UHII 간의 상관관계 행렬 (Pandas 변환, 메모리 주의!) ---
# 데이터셋의 크기가 크지 않다면 이 방법이 편리합니다.
# 대용량 데이터셋에서는 Spark의 RDD 또는 VectorAssembler 후 행렬 연산 등의 고급 기법이 필요할 수 있습니다.
# 여기서는 예시로 train_df를 사용합니다. 실제로는 전체 assembled_df를 사용하거나 적절히 샘플링하여 사용하세요.

cols_for_corr_matrix = numerical_cols + [LABEL_COL]

try:
    # 필요한 컬럼만 선택하여 Pandas DataFrame으로 변환
    pandas_df_for_corr = assembled_df.select(cols_for_corr_matrix).toPandas()

    print("\n--- 수치형 피처 및 UHII 간의 상관관계 행렬 (Pandas) ---")
    correlation_matrix = pandas_df_for_corr.corr(method='pearson')
    print(correlation_matrix.round(2)) # 소수점 두 자리까지 표시

except Exception as e:
    print(f"\nError converting to Pandas for correlation matrix or calculation: {e}")
    print("Consider sampling the DataFrame or using Spark-native correlation methods if the dataset is too large.")