In [16]:
import pandas as pd
import os
import glob
import matplotlib.pyplot as plt
from statsmodels.tsa.holtwinters import SimpleExpSmoothing
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score

In [17]:
# Locations and file pattern for the forecasting test run.
FILE_DIR = 'datasets/cleaned/'
OUT_DIR = 'datasets/test_forecast'
PATTERN = '*_cleaned.csv'
# Derived from OUT_DIR so the metrics file cannot drift away from the output directory.
METRICS_FILE = f'{OUT_DIR}/metrics_summary.csv'
TEST_DAYS = 7  # number of days held out as the test sample

In [18]:
def read_and_agg(fp):
    """
    Load a cleaned CSV and collapse it to one average price per calendar day.

    Rows whose ``IsMissing`` flag is not 0, or whose ``Final_Close`` is empty,
    are discarded before averaging.

    Parameters
    ----------
    fp : path to a CSV containing ``Datetime``, ``IsMissing`` and
        ``Final_Close`` columns.

    Returns
    -------
    DataFrame with two columns: ``Date`` (the calendar day as a timestamp)
    and ``Price`` (mean ``Final_Close`` of that day), in ascending date order.
    """
    raw = pd.read_csv(fp, parse_dates=['Datetime']).sort_values('Datetime')

    # Keep only genuine observations that carry a usable close price.
    is_observed = raw['IsMissing'] == 0
    valid = raw[is_observed].dropna(subset=['Final_Close'])

    # Drop the time-of-day part so each row is tagged with its calendar date only.
    valid['Date'] = pd.to_datetime(valid['Datetime'].dt.date)

    # One value per day: the average close becomes that day's price.
    day_means = valid.groupby('Date')['Final_Close'].mean()
    return day_means.rename('Price').reset_index()

def _to_daily_series(part):
    """Index ``part`` by ``Date`` on a regular daily grid, filling gaps in ``Price`` by time interpolation."""
    part = part.copy()
    part['Date'] = pd.to_datetime(part['Date'])
    part = part.set_index('Date')
    if part.empty:
        # Nothing to regularize; skip resampling/interpolating an empty index.
        return part
    part = part.asfreq('D')
    part['Price'] = part['Price'].interpolate(method='time')
    return part


def split_data(daily, test_days):
    """
    Split the daily price table into train and test parts.

    The last ``test_days`` rows of ``daily`` become the test part and
    everything before them the train part. Each part is then indexed by
    ``Date``, put on a daily frequency and its ``Price`` gaps are filled by
    time-based interpolation.

    Note that the two parts are regularized independently: calendar days that
    lie between the last train row and the first test row belong to neither
    part, and interpolated test days are synthetic values, not observations.

    Parameters
    ----------
    daily : DataFrame with ``Date`` and ``Price`` columns, ordered by date.
    test_days : number of trailing rows to hold out (0 keeps everything in train).

    Returns
    -------
    (train, test) : two DataFrames indexed by ``Date``.

    Raises
    ------
    ValueError : if ``test_days`` is negative.
    """
    if test_days < 0:
        raise ValueError("test_days must be non-negative")

    # An explicit cut position avoids the ``iloc[:-0]`` pitfall, which would
    # return an empty train set when test_days == 0.
    cut = max(len(daily) - test_days, 0)

    train = _to_daily_series(daily.iloc[:cut])
    test = _to_daily_series(daily.iloc[cut:])
    return train, test

def stationarize(train):
    """
    Check ``train['Price']`` for stationarity with the ADF test.

    If the ADF p-value exceeds 0.05 the series is treated as non-stationary
    and a first-order difference is added as a ``Diff`` column; the first row
    (which has no difference) is dropped.

    The input frame is never modified: the differenced result is a new frame.

    Parameters
    ----------
    train : DataFrame with a ``Price`` column.

    Returns
    -------
    (frame, d) : ``(differenced_frame, 1)`` when differencing was applied,
        otherwise ``(train, 0)`` with the original object returned as-is.
    """
    p_value = adfuller(train['Price'])[1]
    if p_value > 0.05:
        # assign() builds a new frame, so the caller's ``train`` keeps its columns.
        differenced = train.assign(Diff=train['Price'].diff())
        return differenced.dropna(subset=['Diff']), 1
    return train, 0

In [19]:
# SES
def ses_forecast(train, steps):
    """
    Produce ``steps`` one-day-ahead forecasts with simple exponential smoothing.

    The model is refitted before every step; each forecast is appended to the
    working copy of the series (dated one day after the previous last point)
    and so feeds the next fit. ``train`` itself is left unchanged.

    Parameters
    ----------
    train : DataFrame with a ``Price`` column and a date index.
    steps : number of consecutive days to forecast.

    Returns
    -------
    List of ``(forecast, lower, upper)`` tuples, where the bounds are a fixed
    -5% / +5% band around the forecast (not a statistical interval).
    """
    history = train['Price'].copy()
    next_date = history.index[-1]
    one_day = pd.Timedelta(days=1)
    forecasts = []

    for _step in range(steps):
        fitted = SimpleExpSmoothing(history).fit(optimized=True)
        point = fitted.forecast(1).iloc[-1]

        # Fixed +/-5% band around the point forecast.
        forecasts.append((point, point * 0.95, point * 1.05))

        # Extend the history with the forecast so the next refit sees it.
        next_date = next_date + one_day
        history.loc[next_date] = point

    return forecasts

# ARIMA
def _select_arima_order(series, d, model_kwargs):
    """Return the (p, d, q) with the lowest AIC for p, q in {1, 2}; (1, d, 1) if no candidate can be fitted."""
    best_aic = float('inf')
    best_order = (1, d, 1)

    for p in range(1, 3):
        for q in range(1, 3):
            order = (p, d, q)
            try:
                aic = ARIMA(series, order=order, **model_kwargs).fit().aic
            except Exception:
                # A candidate that cannot be estimated is simply not eligible;
                # unlike a bare ``except`` this still lets Ctrl-C through.
                continue
            if aic < best_aic:
                best_aic, best_order = aic, order

    return best_order


def arima_forecast(train, steps):
    """
    Produce ``steps`` one-day-ahead forecasts with an ARIMA model.

    The differencing order ``d`` comes from ``stationarize``. The (p, q) pair
    is chosen by AIC on the *undifferenced* ``Price`` series with that ``d``,
    i.e. on exactly the series and model settings used for the final fits.
    (Fitting order ``(p, 1, q)`` on an already differenced series would
    difference the data twice.)

    The model is refitted before every step; each forecast is appended to the
    working copy of the series and feeds the next fit. ``train`` is left
    unchanged.

    Parameters
    ----------
    train : DataFrame with a ``Price`` column and a date index.
    steps : number of consecutive days to forecast.

    Returns
    -------
    List of ``(forecast, lower, upper)`` tuples, where the bounds are a fixed
    -5% / +5% band around the forecast (not a statistical interval).
    """
    # Only ``d`` is needed here; pass a copy so the helper cannot touch ``train``.
    _, d = stationarize(train.copy())

    # Same settings for order selection and forecasting keep the AICs comparable
    # with the model that is actually used.
    model_kwargs = {'enforce_stationarity': False, 'enforce_invertibility': False}
    best_order = _select_arima_order(train['Price'], d, model_kwargs)

    preds = []
    cur = train['Price'].copy()
    idx = cur.index[-1]

    for _ in range(steps):
        fitted = ARIMA(cur, order=best_order, **model_kwargs).fit()
        f = fitted.forecast(1).iloc[-1]

        # Fixed +/-5% band around the point forecast.
        preds.append((f, f * 0.95, f * 1.05))

        # Extend the history with the forecast so the next refit sees it.
        idx += pd.Timedelta(days=1)
        cur.loc[idx] = f

    return preds


In [20]:
def plot_forecast(test, ses_preds, ari_preds):
    """
    Plot recent actual prices together with the ARIMA and SES forecasts.

    Up to the last 14 rows of ``test`` are drawn as the actual series. Each
    forecast is placed on consecutive days starting the day after the last
    index entry of ``test`` and is connected to the last actual point, so the
    frame passed in must end on the day preceding the first forecast.

    Parameters
    ----------
    test : DataFrame with a ``Price`` column and a date index.
    ses_preds, ari_preds : lists of ``(forecast, lower, upper)`` tuples.
    """
    plt.figure(figsize=(12, 6))

    # Actual data (at most the last 14 rows).
    real_dates = test.index[-14:]
    real_vals = test['Price'].iloc[-14:]
    plt.plot(real_dates, real_vals, label='Реальные (посл. 14 дн.)',
             color='blue', marker='o', linestyle='-')

    anchor_date = real_dates[-1]
    anchor_val = real_vals.iloc[-1]

    # Each model's band uses the same colour as its line, so the shaded area
    # is attributable to the right model.
    for preds, color, label in ((ari_preds, 'red', 'ARIMA'),
                                (ses_preds, 'green', 'SES')):
        dates = [test.index[-1] + pd.Timedelta(days=i) for i in range(1, len(preds) + 1)]
        vals = [x[0] for x in preds]
        lows = [x[1] for x in preds]
        highs = [x[2] for x in preds]

        # Start the forecast line at the last actual point for visual continuity.
        plt.plot([anchor_date] + dates, [anchor_val] + vals,
                 color=color, marker='o', linestyle='-', label=label)
        plt.fill_between(dates, lows, highs, color=color, alpha=0.2)

    plt.title('Прогноз цены (последние 14 дней + 3 дня вперёд)')
    plt.xlabel('Дата')
    plt.ylabel('Цена')
    plt.xticks(rotation=90)
    plt.legend()
    plt.grid(True)
    plt.show()

In [21]:
def _score_forecast(actual, predicted):
    """Return (MAE, MAPE, R2) for one forecast; R2 is NaN when it cannot be computed."""
    mae = mean_absolute_error(actual, predicted)
    mape = mean_absolute_percentage_error(actual, predicted)
    try:
        r2 = r2_score(actual, predicted)
    except ValueError:
        r2 = float('nan')
    return mae, mape, r2


def main():
    """
    Run the hold-out forecasting test for every cleaned CSV in ``FILE_DIR``.

    For each file: aggregate to daily prices, hold out the last ``TEST_DAYS``
    rows, forecast 3 days ahead with SES and ARIMA from the train part, print
    the forecasts and their error metrics against the first test days, and
    plot the result. The per-ticker summary metrics are written to
    ``METRICS_FILE``.
    """
    horizon = 3  # forecast length in days; also baked into the metric column names

    # Sorted so that processing order (and the row order of the CSV) is stable.
    file_paths = sorted(glob.glob(os.path.join(FILE_DIR, PATTERN)))
    os.makedirs(OUT_DIR, exist_ok=True)

    metrics_list = []

    for fp in file_paths:
        print(f"\nОбработка файла: {fp}")

        daily = read_and_agg(fp)
        # Strip only the trailing "_cleaned.csv" part so tickers that contain
        # an underscore themselves are kept whole.
        ticker = os.path.basename(fp).rsplit('_', 1)[0]

        # "<=": with exactly TEST_DAYS rows the train part would be empty.
        if len(daily) <= TEST_DAYS:
            print("Слишком мало данных для прогноза.")
            continue

        train, test = split_data(daily, TEST_DAYS)

        ses_preds = ses_forecast(train, horizon)
        ari_preds = arima_forecast(train, horizon)

        print(f"\nПрогнозы для {ticker}:")
        print("SES:", [round(x[0], 3) for x in ses_preds])
        print("ARIMA:", [round(x[0], 3) for x in ari_preds])

        # Metrics over the forecast horizon; predictions are trimmed to the
        # number of available test days so the lengths always match.
        test_3 = test['Price'].iloc[:horizon]
        ses_vals_3 = [x[0] for x in ses_preds[:len(test_3)]]
        ari_vals_3 = [x[0] for x in ari_preds[:len(test_3)]]

        ses_mae, ses_mape, ses_r2 = _score_forecast(test_3, ses_vals_3)
        ari_mae, ari_mape, ari_r2 = _score_forecast(test_3, ari_vals_3)

        print("\n=== Итоговые метрики (3 дня) ===")
        print(f"SES:   MAE={ses_mae:.3f}, MAPE={ses_mape:.3f}, R2={ses_r2:.3f}")
        print(f"ARIMA: MAE={ari_mae:.3f}, MAPE={ari_mape:.3f}, R2={ari_r2:.3f}")

        # Per-day metrics (R2 is undefined for a single point, hence "nan").
        print("\n=== Метрики по каждому дню ===")
        for i in range(len(test_3)):
            act = test_3.iloc[i]
            ses_pred = ses_vals_3[i]
            ari_pred = ari_vals_3[i]

            mae_ses_day = mean_absolute_error([act], [ses_pred])
            mape_ses_day = mean_absolute_percentage_error([act], [ses_pred])

            mae_ari_day = mean_absolute_error([act], [ari_pred])
            mape_ari_day = mean_absolute_percentage_error([act], [ari_pred])

            print(f"День {i+1}:")
            print(f"  SES:   MAE={mae_ses_day:.4f}, MAPE={mape_ses_day:.4f}, R2=nan")
            print(f"  ARIMA: MAE={mae_ari_day:.4f}, MAPE={mape_ari_day:.4f}, R2=nan")

        # Collect the summary row for this ticker.
        metrics_list.append({
            'Ticker': ticker,
            'SES_MAE_3': ses_mae,
            'SES_MAPE_3': ses_mape,
            'SES_R2_3': ses_r2,
            'ARIMA_MAE_3': ari_mae,
            'ARIMA_MAPE_3': ari_mape,
            'ARIMA_R2_3': ari_r2
        })

        # The forecasts start the day after the last train date, and
        # plot_forecast places them right after the frame it receives, so the
        # train part is the frame that puts them on their true dates.
        plot_forecast(train, ses_preds, ari_preds)

    if metrics_list:
        # METRICS_FILE may point outside OUT_DIR; make sure its folder exists.
        metrics_dir = os.path.dirname(METRICS_FILE)
        if metrics_dir:
            os.makedirs(metrics_dir, exist_ok=True)

        out_df = pd.DataFrame(metrics_list)
        out_df.to_csv(METRICS_FILE, index=False)
        print(f"\nВсе метрики сохранены в файл {METRICS_FILE}")


In [22]:
# Entry point: run the whole pipeline when this code executes as the main module.
if __name__ == "__main__":
    main()