In [1]:
def sarima_model(train_data, parameter_dic):
    """Run the imputation -> VST -> fit -> forecast pipeline for one series.

    Args:
        train_data: Raw training series; handed unchanged to ``imputation``
            (a helper defined outside this cell).
        parameter_dic: Mapping that must contain ``"error_threshold"``,
            ``"imputation_window_size"`` and ``"seasonality"``; each value is
            converted with ``int``.

    Returns:
        ``(pred, pe1, errorcode)``. When ``SARIMA_fit`` reports success,
        ``errorcode`` is the integer ``0``. Otherwise ``pred`` and ``pe1`` are
        ``None`` and ``errorcode`` is the string ``"301"``.

    Raises:
        KeyError / ValueError: if a required setting is missing or is not
            convertible to ``int``.
    """
    # Parse every setting up front so a bad configuration fails before any work.
    threshold = int(parameter_dic["error_threshold"])
    # Parsed (and therefore validated) but not used further in this function.
    # NOTE(review): confirm whether ``imputation`` was meant to receive this.
    _window_size = int(parameter_dic["imputation_window_size"])
    period = int(parameter_dic["seasonality"])

    filled = imputation(train_data, period)
    transformed, shift = VST(filled)
    fitted, converged = SARIMA_fit(transformed, period)

    # SARIMA_fit returns a strict bool, so a plain truth test is sufficient.
    if not converged:
        return None, None, "301"

    forecast = SARIMA_prediction(fitted, shift)
    forecast_error = SARIMA_predictionError(fitted, filled, threshold)
    return forecast, forecast_error, 0

In [2]:
def SARIMA_fit(vst_value, seasonality):
    """Fit an AR(1) regression on the seasonally differenced series and forecast one step.

    With ``s = seasonality`` and ``d[t] = v[t] - v[t - s]``, the slope ``m`` and
    intercept ``c`` of ``d[t] ~ m * d[t - 1] + c`` are estimated by least
    squares. The one-step forecast is ``v[n - s] + m * d[n - 1] + c``.

    Args:
        vst_value: Series-like object supporting ``reset_index(drop=True)`` and
            positional slicing (a pandas Series is what the code exercises).
        seasonality: Seasonal lag ``s`` as an integer.

    Returns:
        ``(vst_pred, True)`` on success, where ``vst_pred.predicted_mean`` is the
        forecast and ``vst_pred.se_mean`` is the root mean squared residual of
        the regression. ``(None, False)`` if anything fails; the exception
        message is printed.
    """
    try:
        vst_value = vst_value.reset_index(drop=True)
        n_obs = len(vst_value)
        n_rows = n_obs - seasonality - 1  # number of regression equations

        # With <= 2 equations for 2 unknowns there is no residual to estimate
        # the error from; report failure explicitly instead of via IndexError.
        if seasonality < 1 or n_rows <= 2:
            raise ValueError(
                "SARIMA_fit needs seasonality >= 1 and more than "
                "seasonality + 3 observations"
            )

        # Regressor d[t-1] and response d[t], both re-indexed so they align.
        lagged_diff = vst_value[seasonality : n_obs - 1].reset_index(
            drop=True
        ) - vst_value[0:n_rows].reset_index(drop=True)
        target_diff = vst_value[seasonality + 1 : n_obs].reset_index(
            drop=True
        ) - vst_value[1 : n_obs - seasonality].reset_index(drop=True)

        A = np.vstack([lagged_diff, np.ones(len(lagged_diff))]).T

        # rcond=None selects numpy's current default cutoff explicitly and
        # avoids the FutureWarning raised by older numpy releases.
        coef, residuals, _rank, _singular = np.linalg.lstsq(
            A, target_diff, rcond=None
        )
        m, c = coef

        if len(residuals) > 0:
            sse = residuals[0]
        else:
            # lstsq returns an empty residual array when A is rank-deficient
            # (e.g. a constant or perfectly periodic series makes the lagged
            # difference column constant). Compute the residual sum directly
            # so such series still yield a forecast.
            fit_err = np.asarray(target_diff, dtype=float) - A @ coef
            sse = float(np.dot(fit_err, fit_err))

        lse_fit = (
            vst_value.iloc[n_obs - seasonality]
            + m * (vst_value.iloc[n_obs - 1] - vst_value.iloc[n_rows])
            + c
        )
        rmse = np.sqrt(sse / len(A))

        class vst_pred_class(object):
            """Minimal container mirroring the attributes read downstream."""

            def __init__(self, predicted_mean, se_mean):
                self.predicted_mean = predicted_mean
                self.se_mean = se_mean

        return vst_pred_class(lse_fit, rmse), True
    except Exception as ex:
        # Best-effort contract: any failure is reported as "did not converge".
        print(ex)
        return None, False

In [3]:

def SARIMA_prediction(vst_pred, value_min):
    """Map a forecast from the square-root scale back to the data scale.

    Args:
        vst_pred: Object exposing ``predicted_mean`` (forecast on the
            transformed scale).
        value_min: Minimum returned by ``VST``. When it is negative it is added
            back after squaring, undoing the shift applied before the root.

    Returns:
        The back-transformed forecast, or ``0`` if any exception occurs (the
        exception message is printed).
    """
    try:
        shift_back = bool(value_min < 0)
        squared = float(np.power(vst_pred.predicted_mean, 2))
        if not shift_back:
            return squared
        return squared + value_min
    except Exception as ex:
        print(ex)
        return 0


In [4]:

def SARIMA_predictionError(vst_pred, value, error_threshold):
    """Compute the prediction-error allowance on the squared (data) scale.

    The result is ``(mean + se * error_threshold)**2 - mean**2 + 0.5``, i.e. the
    distance from the squared forecast to the squared upper bound, plus a fixed
    offset of 0.5.

    Args:
        vst_pred: Object exposing ``predicted_mean`` and ``se_mean``.
        value: Not used by this function; kept for interface compatibility.
        error_threshold: Multiplier applied to ``se_mean``.

    Returns:
        The allowance as a float, or the string
        ``"SARIMA_predictionError error"`` if any exception occurs (the
        exception message is printed).
    """
    margin = 0.5  # fixed offset added on top of the interval width

    try:
        centre_sq = float(np.power(vst_pred.predicted_mean, 2))
        upper_root = vst_pred.predicted_mean + vst_pred.se_mean * error_threshold
        upper_sq = float(np.power(upper_root, 2))
        return (upper_sq - centre_sq) + margin
    except Exception as ex:
        print(ex)
        return "SARIMA_predictionError error"


In [5]:

def VST(value):
    """Apply a square-root transform, shifting negative data to start at zero.

    ``VST`` presumably stands for variance-stabilising transform; the code
    itself takes ``np.sqrt`` of the (possibly shifted) input.

    Args:
        value: Array-like numeric data supporting ``value - scalar`` (e.g. a
            numpy array or pandas Series).

    Returns:
        ``(vst_value, value_min)`` where ``value_min`` is ``np.min(value)`` and
        ``vst_value`` is ``sqrt(value - value_min)`` when ``value_min`` is
        negative, otherwise ``sqrt(value)``.

        On any exception the message is printed and ``(None, None)`` is
        returned. The failure result is a 2-tuple so that callers which unpack
        two values do not crash with an unrelated "too many values to unpack"
        error.
    """
    try:
        value_min = np.min(value)
        # For negative data, shift by the minimum so the square root is defined.
        shifted = value - value_min if value_min < 0 else value
        return np.sqrt(shifted), value_min
    except Exception as ex:
        print(ex)
        return None, None
