In [14]:
import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split

In [None]:
# Configuration: ask for the three CSV base names and build the file paths.
# NOTE(review): input() blocks a non-interactive "Run All"; consider constants instead.
def _strip_csv_suffix(name):
    """Return `name` without surrounding whitespace or a trailing '.csv'.

    The prompt asks for names without the extension, but a user who types it
    anyway would otherwise end up with 'name.csv.csv' and a failed read.
    """
    name = name.strip()
    if name.lower().endswith('.csv'):
        name = name[:-len('.csv')]
    return name


print("파일명들을 입력해주세요 (.csv는 입력 x)")
real_data_name = _strip_csv_suffix(input("실제 데이터 파일명 : "))
timetable_name = _strip_csv_suffix(input("timetable 데이터 파일명 : "))
output_name = _strip_csv_suffix(input("예측 파일명 : "))

# NOTE(review): '../~~' looks like a placeholder prefix — set it to the real data directory.
PATH = '../~~'
real_data_path = f'{PATH}{real_data_name}.csv'
timetable_path = f'{PATH}{timetable_name}.csv'
OUTPUT_CSV_NAME = f'{output_name}.csv'

In [None]:
# Basic helper functions
def time_to_minutes(time_str):
    """Convert a clock string to minutes since midnight.

    Args:
        time_str: Time formatted as "HH:MM". A trailing seconds field
            ("HH:MM:SS") is accepted and ignored.

    Returns:
        int: hours * 60 + minutes.

    Raises:
        ValueError: If the string has fewer than two ':'-separated fields
            or a field is not an integer.
    """
    # Only the first two fields are used, so "08:30:00" no longer fails
    # with "too many values to unpack".
    hours, minutes = map(int, time_str.split(':')[:2])
    return hours * 60 + minutes

def minutes_to_time(minutes):
    """Format minutes since midnight as an "HH:MM" string.

    Args:
        minutes: Minute count as an int or a float (e.g. a value read from a
            pandas column that was upcast to float). Floats are rounded to
            the nearest whole minute.

    Returns:
        str: Zero-padded "HH:MM". Values of 1440 or more are not wrapped,
        so 1445 is rendered as "24:05".
    """
    # The ':02d' format spec rejects floats, so coerce to int first.
    hours, mins = divmod(int(round(minutes)), 60)
    return f'{hours:02d}:{mins:02d}'

In [15]:
# 1. Load the observed arrivals and the planned timetable from CSV.
real_data = pd.read_csv(real_data_path)
timetable = pd.read_csv(timetable_path)

# Preview the first rows of each frame (timetable first, then arrivals).
for caption, frame in (
    ("\ntimetable data:\n", timetable),
    ("\narrival data:\n", real_data),
):
    print(caption, frame.head())

NameError: name 'real_data_path' is not defined

In [16]:
# 2. Preprocessing: convert every clock-time column to minutes since midnight.
days = ['MON', 'TUE', 'WED', 'THU', 'FRI']


def _to_minutes_once(column):
    """Apply time_to_minutes to `column` unless it is already numeric.

    The conversion overwrites the columns in place, so without this guard a
    second run of the cell would call time_to_minutes on integers and crash.
    """
    if pd.api.types.is_numeric_dtype(column):
        return column
    return column.apply(time_to_minutes)


# Timetable: three time columns per weekday.
for day in days:
    for suffix in ('SCHOOL_DEPART', 'STATION_DEPART', 'SCHOOL_ARRIVE'):
        column_name = f'{day}_{suffix}'
        timetable[column_name] = _to_minutes_once(timetable[column_name])

# Observed arrivals: a single time column.
real_data['ARRIVAL_TIME'] = _to_minutes_once(real_data['ARRIVAL_TIME'])

print("\ntimetable data:\n", timetable.head())
print("\narrival data:\n", real_data.head())

In [None]:
# Build the training records: one dict per observed arrival that can be
# matched to a non-fixed timetable entry.
training_data = []

# Time windows ("HH:MM" start / end, both inclusive) in which no shuttle ever runs.
NO_SHUTTLE_TIME = [
    ["00:00", "07:40"],
    ["08:30", "08:40"],
    ["09:30", "09:40"],
    ["10:30", "11:45"],
    ["12:30", "12:45"],
    ["13:30", "13:45"],
    ["14:30", "14:45"],
    ["18:45", "19:05"],
    ["19:45", "24:00"]
]

# Express the window bounds in minutes, like the ARRIVAL_TIME column.
NO_SHUTTLE_TIME = [
    [time_to_minutes(window[0]), time_to_minutes(window[1])]
    for window in NO_SHUTTLE_TIME
]

for day in days:
    # Observed arrivals recorded for this weekday.
    day_arrivals = real_data[real_data['DAY'] == day]

    # Drop observations that fall inside a no-shuttle window (treated as bogus data).
    for window_start, window_end in NO_SHUTTLE_TIME:
        observed = day_arrivals['ARRIVAL_TIME']
        inside_window = (observed >= window_start) & (observed <= window_end)
        day_arrivals = day_arrivals[~inside_window]

    for _, arrival in day_arrivals.iterrows():
        arrival_time = arrival['ARRIVAL_TIME']
        depart_at = arrival['DEPART_AT']

        # 'STA' observations are compared with the station departure column,
        # everything else with the school departure column.
        if depart_at == 'STA':
            planned_column = f'{day}_STATION_DEPART'
        else:
            planned_column = f'{day}_SCHOOL_DEPART'

        # Absolute distance from this observation to every planned departure.
        time_diffs = (timetable[planned_column] - arrival_time).abs()
        closest_idx = time_diffs.idxmin()
        min_diff = time_diffs.min()

        # Only accept a match that lies within 15 minutes of the plan.
        if not (min_diff <= 15):
            continue

        closest_row = timetable.loc[closest_idx]

        # Entries flagged FIXED for this weekday are excluded from training.
        if closest_row[f'{day}_FIXED']:
            continue

        training_data.append({
            'day': day,
            'planned_school_depart': closest_row[f'{day}_SCHOOL_DEPART'],
            'planned_station_depart': closest_row[f'{day}_STATION_DEPART'],
            'planned_school_arrive': closest_row[f'{day}_SCHOOL_ARRIVE'],
            'depart_at': depart_at,
            'arrival_time': arrival_time,
            # Target: signed offset (minutes) of the observation from the plan.
            'target': arrival_time - closest_row[planned_column],
        })

In [None]:
# 3. Turn the collected records into a DataFrame.
df = pd.DataFrame(training_data)

# Encode each weekday as its position in `days`.
day_mapping = dict(zip(days, range(len(days))))
df['day'] = df['day'].map(day_mapping)

# Encode the departure point numerically; labels missing from the mapping become NaN.
depart_mapping = dict(STA=0, SCH=1)
df['depart_at'] = df['depart_at'].map(depart_mapping)

In [None]:
# 4. Separate the model inputs (X) from the target offset (y).
features = [
    'day',
    'planned_school_depart',
    'planned_station_depart',
    'depart_at',
]
X, y = df[features], df['target']

In [None]:
# 5. Hold out 20% of the samples for testing (seeded for reproducibility).
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [None]:
# 6. Fit a random-forest regressor on the training split.
rf_params = {'n_estimators': 100, 'random_state': 42}
model = RandomForestRegressor(**rf_params)
model.fit(X_train, y_train)

In [None]:
# 7. Evaluate the model: score() is reported as R2 on both splits.
train_score, test_score = (
    model.score(X_train, y_train),
    model.score(X_test, y_test),
)
print(f"Training R2 Score: {train_score:.4f}")
print(f"Test R2 Score: {test_score:.4f}")

In [None]:
# 8. Build one output row per timetable row.
#
# Whether a day is copied or predicted is decided by that day's own
# `{day}_FIXED` flag, matching the training cell. Previously MON_FIXED decided
# for the whole row, so a TUE-FRI entry could be predicted although it was
# fixed, or copied although it was not.
all_predictions = []

# NOTE(review): 15 is presumably the station -> school travel time in minutes — confirm.
STATION_TO_SCHOOL_MINUTES = 15

# Keep the previous row order: MON_FIXED rows first, then the remaining rows.
mon_fixed_mask = timetable['MON_FIXED']
ordered_rows = pd.concat([timetable[mon_fixed_mask], timetable[~mon_fixed_mask]])

for _, row in ordered_rows.iterrows():
    prediction_row = {}

    # Carry over the FIXED flag of every weekday.
    for day in days:
        prediction_row[f'{day}_FIXED'] = row[f'{day}_FIXED']

    for day in days:
        planned_school_depart = row[f'{day}_SCHOOL_DEPART']
        planned_station_depart = row[f'{day}_STATION_DEPART']

        if row[f'{day}_FIXED']:
            # Fixed entry: keep the planned times unchanged.
            school_depart = planned_school_depart
            station_depart = planned_station_depart
            school_arrive = row[f'{day}_SCHOOL_ARRIVE']
        else:
            # Two feature rows that differ only in `depart_at`, so both
            # offsets come from a single predict() call.
            day_features = pd.DataFrame({
                'day': [day_mapping[day]] * 2,
                'planned_school_depart': [planned_school_depart] * 2,
                'planned_station_depart': [planned_station_depart] * 2,
                'depart_at': [depart_mapping['SCH'], depart_mapping['STA']],
            })
            school_pred, station_pred = model.predict(day_features)

            # Planned time plus predicted offset; int() truncates toward zero.
            school_depart = int(planned_school_depart + school_pred)
            station_depart = int(planned_station_depart + station_pred)
            school_arrive = station_depart + STATION_TO_SCHOOL_MINUTES

        prediction_row[f'{day}_SCHOOL_DEPART'] = minutes_to_time(school_depart)
        prediction_row[f'{day}_STATION_DEPART'] = minutes_to_time(station_depart)
        prediction_row[f'{day}_SCHOOL_ARRIVE'] = minutes_to_time(school_arrive)

    # Original (planned) Monday school departure, kept as a sort key.
    prediction_row['sort_time'] = row['MON_SCHOOL_DEPART']
    all_predictions.append(prediction_row)

In [None]:
# 10. Convert the predictions to a DataFrame and sort each weekday independently.
result_df = pd.DataFrame(all_predictions)

# Column suffixes emitted for every weekday, in output order.
day_column_suffixes = ['FIXED', 'SCHOOL_DEPART', 'STATION_DEPART', 'SCHOOL_ARRIVE']

# Uses `days` as defined in the preprocessing cell.
sorted_dfs = []
for day in days:
    day_columns = [f'{day}_{suffix}' for suffix in day_column_suffixes]

    # Times are zero-padded "HH:MM" strings, so string order is chronological order.
    # reset_index(drop=True) is essential: pd.concat(axis=1) aligns on the index,
    # which would otherwise put every weekday back into the same row order and
    # undo the per-day sort.
    day_df = (
        result_df[day_columns]
        .sort_values(f'{day}_SCHOOL_DEPART', kind='stable')
        .reset_index(drop=True)
    )
    sorted_dfs.append(day_df)

# Place the independently sorted weekday blocks side by side.
final_df = pd.concat(sorted_dfs, axis=1)

# Final column order: weekday by weekday, FIXED flag first.
desired_columns = [
    f'{day}_{suffix}' for day in days for suffix in day_column_suffixes
]

result_df = final_df[desired_columns]

In [None]:
# 11. Write the prediction table to CSV under the PATH prefix, without the index column.
OUTPUT_PATH = f'{PATH}{OUTPUT_CSV_NAME}'
result_df.to_csv(path_or_buf=OUTPUT_PATH, index=False)
print(f"\n{OUTPUT_CSV_NAME}이 \n {OUTPUT_PATH}에 저장되었습니다")

In [None]:
# 12. Visualise model performance on the test split.
# Predictions are kept in `y_pred` so later cells can reuse them.
y_pred = model.predict(X_test)

fig, ax = plt.subplots(figsize=(10, 5))

# Actual vs. predicted offsets, with the y = x line as the ideal reference.
ax.scatter(y_test, y_pred, alpha=0.5)
ax.plot([-15, 15], [-15, 15], 'r--')
ax.set_xlabel('real time diff (m)')
ax.set_ylabel('predicted time diff (m)')
ax.set_title('compare real & predicted time diff')

# Display the figure (it is not written to disk), then release it.
plt.show()
plt.close(fig)

In [None]:
# 13. Visualise feature importances, least important first so the largest bar is on top.
feature_importance = (
    pd.DataFrame({
        'feature': features,
        'importance': model.feature_importances_
    })
    .sort_values('importance', ascending=True)
)

fig, ax = plt.subplots(figsize=(10, 5))
ax.barh(feature_importance['feature'], feature_importance['importance'])
ax.set_xlabel('importance')
ax.set_title('feature importance')
plt.show()
plt.close(fig)

# Show a sample of the final prediction table.
print("\n예측 결과 샘플 (처음 5개 행):")
print(result_df.head())

In [None]:
# 14. Detailed error metrics on the held-out test split.
# mean_absolute_error, mean_squared_error and np come from the notebook's
# import cell; `y_pred` is computed in the performance-plot cell.
mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)  # RMSE is in the same unit (minutes) as the target

print("\n모델 성능 지표:")
print(f"Mean Absolute Error: {mae:.2f} 분")
print(f"Root Mean Squared Error: {rmse:.2f} 분")