In [25]:
import matplotlib.pyplot as plt
import platform
import pandas as pd
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Korean font setup: pick the font family by operating system.
# Windows -> Malgun Gothic, macOS (Darwin) -> AppleGothic, anything else
# (e.g. Linux / Colab) -> NanumGothic.
plt.rc(
    "font",
    family={"Windows": "Malgun Gothic", "Darwin": "AppleGothic"}.get(
        platform.system(), "NanumGothic"
    ),
)

# Prevent the minus sign from rendering as a broken glyph.
plt.rcParams["axes.unicode_minus"] = False


# Load the inputs: the Line 8 prediction results (read as UTF-8) and the
# processed gate table (read as EUC-KR).
df_pred = pd.read_csv(filepath_or_buffer="8호선_예측결과.csv", encoding="utf-8")
df_gate = pd.read_csv(
    filepath_or_buffer="../../data/결과/가까운탑승구/전철역 가까운 탑승구_가공_최종.csv",
    encoding="euc-kr",
)

In [26]:
# Number of cars per train, keyed by line name; only "8호선" (Line 8) is configured.
# NOTE(review): Seoul Line 8 is usually described as running 6-car trains —
# confirm that 8 is the intended value before relying on the per-car split.
line_car_count = {"8호선": 8}
# Car count used by the rest of the notebook (looked up for "8호선").
target_car_count = line_car_count["8호선"]


def extract_cars(cell, max_car=8):
    """Extract car numbers from a comma-separated list of ``<car>-<suffix>`` tokens.

    Every token that contains a hyphen is split at its first hyphen and the
    part before the hyphen is taken as the car number (``"3-1"`` -> ``"3"``).
    Tokens without a hyphen, or whose leading part is not a whole number in
    ``1..max_car``, are ignored.

    Args:
        cell: Raw cell value. NaN/None yields an empty list; any other value
            is converted with ``str()`` before parsing.
        max_car: Highest valid car number (inclusive). Defaults to 8.

    Returns:
        list[str]: Car numbers as canonical decimal strings (no zero padding,
        no surrounding whitespace), in order of appearance. A car that occurs
        in several tokens is returned once per occurrence.
    """
    if pd.isna(cell):
        return []

    cars = []
    for token in str(cell).split(","):
        if "-" not in token:
            continue
        # Strip *after* splitting so that "3 - 1" still yields car "3".
        head = token.split("-", 1)[0].strip()
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, on which int() raises ValueError.
        if not head.isdecimal():
            continue
        number = int(head)
        if 1 <= number <= max_car:
            # Normalise ("02" -> "2") so the label matches the str(i) car
            # labels the rest of the notebook builds.
            cars.append(str(number))
    return cars


# Parse the per-direction car lists. The "상행" list is read from the
# "가까운 출구" column; the "하행" list is read from whichever column sits at
# position 7 of df_gate.
# NOTE(review): selecting a column by position breaks silently if the CSV
# column order changes — consider referring to it by name.
df_gate["상행_칸번호"] = df_gate["가까운 출구"].apply(
    extract_cars, args=(target_car_count,)
)
df_gate["하행_칸번호"] = df_gate[df_gate.columns[7]].apply(
    extract_cars, args=(target_car_count,)
)

def _car_share_weights(cars):
    """Return ``{car: share}`` where every listed entry contributes ``1 / len(cars)``.

    A car that is listed more than once accumulates its shares, so the weights
    always sum to 1. (Zipping the list straight into a dict would keep a single
    share per car and silently drop the rest.)
    """
    share = 1 / len(cars)
    weights = {}
    for car in cars:
        weights[car] = weights.get(car, 0.0) + share
    return weights


# (station, direction) -> {car number: share of the station-level value}.
# Rows are processed in order, so when a station appears on several rows the
# last row with a non-empty car list wins.
car_weight_dict = {}
for _, row in df_gate.iterrows():
    역 = row["역명"]
    for 방향, 칸목록 in (("상행", row["상행_칸번호"]), ("하행", row["하행_칸번호"])):
        if 칸목록:
            car_weight_dict[(역, 방향)] = _car_share_weights(칸목록)

# Fallback: every station/direction present in the predictions but missing
# from the gate data gets an equal share for each car.
for 역 in df_pred["역명"].unique():
    for 방향 in ["상행", "하행"]:
        key = (역, 방향)
        if key not in car_weight_dict:
            car_weight_dict[key] = {
                str(k): 1 / target_car_count for k in range(1, target_car_count + 1)
            }

In [27]:
# Expand each prediction row into one record per car, scaling the row's
# "혼잡도기반" and "최종보정값" values by that car's weight. Rows whose
# (station, direction) pair has no entry in car_weight_dict produce no records.
records = [
    {
        "역명": pred_row["역명"],
        "방향": pred_row["방향"],
        "평일/주말": pred_row["평일/주말"],
        "시간대": pred_row["시간대"],
        "혼잡도": pred_row["혼잡도기반"] * 비율,
        "인원수": pred_row["최종보정값"] * 비율,
        "칸번호": 칸,
    }
    for _, pred_row in df_pred.iterrows()
    for 칸, 비율 in car_weight_dict.get(
        (pred_row["역명"], pred_row["방향"]), {}
    ).items()
]
df_cars = pd.DataFrame(records)


def classify_level(c):
    """Map a congestion value to its level label using fixed cut-offs.

    Upper bounds are inclusive: ``<= 100`` -> "낮음", ``<= 200`` -> "보통",
    ``<= 250`` -> "혼잡". Anything that passes none of these comparisons
    (larger values, and values such as NaN that compare false) is "매우 혼잡".
    """
    for upper_bound, label in ((100, "낮음"), (200, "보통"), (250, "혼잡")):
        if c <= upper_bound:
            return label
    return "매우 혼잡"


# Label every per-car record with its congestion level. The labels come from
# classify_level, which applies fixed thresholds to the "혼잡도" value.
df_cars["혼잡레벨"] = df_cars["혼잡도"].map(classify_level)

In [28]:
# One LabelEncoder per categorical column. They are kept as separate names
# because the prediction helpers further down use them to encode their inputs.
le_st, le_dir, le_day, le_time, le_car = (LabelEncoder() for _ in range(5))

# Fit each encoder on its source column and store the resulting integer codes
# in a sibling "<column>_enc" column.
for _col, _encoder in (
    ("역명", le_st),
    ("방향", le_dir),
    ("평일/주말", le_day),
    ("시간대", le_time),
    ("칸번호", le_car),
):
    df_cars[_col + "_enc"] = _encoder.fit_transform(df_cars[_col])


# Drop rows whose target value is missing.
df_cars = df_cars.dropna(subset=["혼잡도"])

# Encoded categorical features and the regression target.
X = df_cars[["역명_enc", "방향_enc", "평일/주말_enc", "시간대_enc", "칸번호_enc"]]
y = df_cars["혼잡도"]

# 80/20 split with a fixed seed.
# NOTE(review): X_test / y_test are not used anywhere in the visible cells —
# consider scoring the model on them.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit the forest once. Both the split and the estimator are seeded, so
# repeating the split + fit would only rebuild the same model at twice the cost.
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

0,1,2
,n_estimators,100
,criterion,'squared_error'
,max_depth,
,min_samples_split,2
,min_samples_leaf,1
,min_weight_fraction_leaf,0.0
,max_features,1.0
,max_leaf_nodes,
,min_impurity_decrease,0.0
,bootstrap,True


In [29]:
def predict_crowd_all(역명, 방향, 평일주말, 시간대, 호선="8호선"):
    """Predict the congestion of every car for one station/direction/day/time.

    Args:
        역명: Station label, encoded with ``le_st``.
        방향: Direction label, encoded with ``le_dir``.
        평일주말: Day-type label, encoded with ``le_day``.
        시간대: Time-slot label, encoded with ``le_time``.
        호선: Line name used to look up the car count in ``line_car_count``;
            lines missing from that dict fall back to 8 cars.

    Returns:
        pandas.DataFrame: One row per car with the columns "칸번호" (car number
        as a string), "예측혼잡도" (prediction rounded to one decimal) and
        "혼잡레벨" (label from ``classify_level``). A car whose inputs contain a
        label the encoders have never seen is reported with a prediction of
        0.0 instead of raising.
    """
    total_cars = line_car_count.get(호선, 8)
    칸리스트 = [str(i) for i in range(1, total_cars + 1)]

    # The scenario part of the feature row is the same for every car, so it is
    # encoded once. LabelEncoder.transform raises ValueError for unseen labels;
    # only that error is treated as "cannot predict" — anything else propagates.
    try:
        scenario = [
            le_st.transform([역명])[0],
            le_dir.transform([방향])[0],
            le_day.transform([평일주말])[0],
            le_time.transform([시간대])[0],
        ]
    except ValueError:
        scenario = None

    # car label -> full feature row, only for cars that can be encoded.
    encoded = {}
    if scenario is not None:
        for 칸 in 칸리스트:
            try:
                encoded[칸] = scenario + [le_car.transform([칸])[0]]
            except ValueError:
                continue

    predictions = {}
    if encoded:
        batch = list(encoded.values())
        # When the model recorded its training column names, pass a DataFrame
        # with the same names so the input matches what it was fitted on.
        feature_names = getattr(model, "feature_names_in_", None)
        if feature_names is not None:
            batch = pd.DataFrame(batch, columns=list(feature_names))
        # One predict call for all cars instead of one call per car.
        predictions = dict(zip(encoded, model.predict(batch)))

    results = []
    for 칸 in 칸리스트:
        pred = predictions.get(칸, 0.0)
        results.append(
            {
                "칸번호": 칸,
                "예측혼잡도": round(pred, 1),
                "혼잡레벨": classify_level(pred),
            }
        )
    return pd.DataFrame(results)

In [30]:
def plot_crowd_and_recommend(역명, 방향, 평일주말, 시간대, 호선="8호선"):
    """Plot per-car predicted congestion as a one-row heatmap and print the least crowded car(s).

    Args:
        역명, 방향, 평일주말, 시간대, 호선: Passed through unchanged to
            ``predict_crowd_all``.

    Returns:
        None. The figure is shown with ``plt.show()`` and the recommendation is
        printed; every car tied for the lowest rounded prediction is listed,
        in car order.
    """
    df_result = predict_crowd_all(역명, 방향, 평일주말, 시간대, 호선).set_index(
        "칸번호"
    )

    fig, ax = plt.subplots(figsize=(8, 1.6))
    sns.heatmap(
        [df_result["예측혼잡도"]],
        annot=True,
        fmt=".1f",
        cmap="YlGnBu",
        cbar=True,
        # The data is a bare list, which carries no column labels; without
        # explicit labels the cells would be numbered from 0 rather than
        # showing the actual car numbers.
        xticklabels=df_result.index.tolist(),
        ax=ax,
    )
    ax.set_title(f"{역명} {방향} {평일주말} {시간대} 칸별 혼잡도 예측", fontsize=14)
    ax.set_xlabel("칸번호")
    # Single row: label it with the time slot, centred on the cell.
    ax.set_yticks([0.5])
    ax.set_yticklabels([시간대])
    fig.tight_layout()
    plt.show()

    min_value = df_result["예측혼잡도"].min()
    추천칸 = df_result.index[df_result["예측혼잡도"] == min_value].tolist()
    print(f"✅ {역명} {방향} {시간대} 기준 혼잡도 가장 낮은 칸:")
    for 칸 in 추천칸:
        print(f"  - 칸 {칸} (혼잡도 {min_value})")

In [31]:
from tqdm import tqdm


def generate_all_predictions_csv(save_path="8호선_혼잡도_전체예측결과.csv"):
    """Predict every station/direction/day-type/time-slot/car combination and save the table.

    Stations and time slots are the unique values found in ``df_pred``;
    directions are "상행"/"하행" and day types are "평일"/"주말". A combination
    is skipped when any of its labels — or any car label — is unknown to the
    corresponding encoder.

    Args:
        save_path: Destination CSV path (written as UTF-8 with BOM, no index).

    Returns:
        pandas.DataFrame: One row per emitted combination with the columns
        "역명", "방향", "평일/주말", "시간대", "칸번호", "예측혼잡도" (rounded to
        one decimal) and "혼잡레벨".
    """

    def _known_codes(encoder, labels):
        """Map each label the encoder knows to its integer code; skip unseen ones."""
        codes = {}
        for label in labels:
            try:
                codes[label] = encoder.transform([label])[0]
            except ValueError:  # raised by LabelEncoder for unseen labels
                continue
        return codes

    total_cars = line_car_count["8호선"]
    car_list = [str(i) for i in range(1, total_cars + 1)]

    역명목록 = df_pred["역명"].unique()
    방향목록 = ["상행", "하행"]
    요일목록 = ["평일", "주말"]
    시간목록 = df_pred["시간대"].unique()

    print("모든 경우의 수 예측 중...")

    # Encode every label once up front instead of once per generated row.
    station_codes = _known_codes(le_st, 역명목록)
    direction_codes = _known_codes(le_dir, 방향목록)
    day_codes = _known_codes(le_day, 요일목록)
    time_codes = _known_codes(le_time, 시간목록)
    car_codes = _known_codes(le_car, car_list)
    # A combination is only emitted when all of its cars can be encoded.
    all_cars_known = len(car_codes) == len(car_list)

    # When the model recorded its training column names, predict on a
    # DataFrame with the same names so the input matches what it was fitted on.
    feature_names = getattr(model, "feature_names_in_", None)

    pred_rows = []
    for 역 in tqdm(역명목록):
        if not all_cars_known or 역 not in station_codes:
            continue
        combos = [
            (방향, 요일, 시간, 칸)
            for 방향 in 방향목록
            if 방향 in direction_codes
            for 요일 in 요일목록
            if 요일 in day_codes
            for 시간 in 시간목록
            if 시간 in time_codes
            for 칸 in car_list
        ]
        if not combos:
            continue
        X_input = [
            [
                station_codes[역],
                direction_codes[방향],
                day_codes[요일],
                time_codes[시간],
                car_codes[칸],
            ]
            for 방향, 요일, 시간, 칸 in combos
        ]
        if feature_names is not None:
            X_input = pd.DataFrame(X_input, columns=list(feature_names))
        # One predict call per station keeps the progress bar meaningful while
        # avoiding a separate call for every combination.
        preds = model.predict(X_input)
        for (방향, 요일, 시간, 칸), pred in zip(combos, preds):
            pred_rows.append(
                {
                    "역명": 역,
                    "방향": 방향,
                    "평일/주말": 요일,
                    "시간대": 시간,
                    "칸번호": 칸,
                    "예측혼잡도": round(pred, 1),
                    "혼잡레벨": classify_level(pred),
                }
            )

    df_all = pd.DataFrame(pred_rows)
    df_all.to_csv(save_path, index=False, encoding="utf-8-sig")
    print(f"✅ 저장 완료: {save_path}")
    return df_all

In [32]:
# Build the full prediction table and write it to the default CSV path; the
# returned DataFrame is the cell's last expression, so it is also displayed.
generate_all_predictions_csv()

모든 경우의 수 예측 중...


100%|██████████| 18/18 [00:06<00:00,  2.74it/s]

✅ 저장 완료: 8호선_혼잡도_전체예측결과.csv





Unnamed: 0,역명,방향,평일/주말,시간대,칸번호,예측혼잡도,혼잡레벨
0,암사,상행,평일,06:00,1,9.8,낮음
1,암사,상행,평일,06:00,2,9.8,낮음
2,암사,상행,평일,06:00,3,9.8,낮음
3,암사,상행,평일,06:00,4,9.8,낮음
4,암사,상행,평일,06:00,5,9.8,낮음
...,...,...,...,...,...,...,...
10363,남위례,하행,주말,23:00,4,170.0,보통
10364,남위례,하행,주말,23:00,5,184.2,보통
10365,남위례,하행,주말,23:00,6,165.1,보통
10366,남위례,하행,주말,23:00,7,165.2,보통
