In [201]:
def dm_test(actual_lst, pred1_lst, pred2_lst, h=1, crit="MSE", power=2):
    """
    这个是DM检验的代码
    :param actual_lst: 真实的序列值
    :param pred1_lst: 第一个模型预测的结果
    :param pred2_lst: 第二个模型预测的结果
    :param h: 预测模型是几步预测，h就是几
    :param crit: 计算连个模型的预测偏差，的差值 d 时，使用的公式：有MSE,MAD,MAPE,poly，推荐MSE
    :param power: 只有crit=poly是用这个，计算d时使用： (模型1的偏差)的power次方 - (模型2的偏差)的power次方
    :return:
    """

    # Routine for checking errors
    def error_check():
        rt = 0
        msg = ""
        # Check if h is an integer
        if not isinstance(h, int):
            rt = -1
            msg = "The type of the number of steps ahead (h) is not an integer."
            return rt, msg
        # Check the range of h
        if h < 1:
            rt = -1
            msg = "The number of steps ahead (h) is not large enough."
            return rt, msg
        len_act = len(actual_lst)
        len_p1 = len(pred1_lst)
        len_p2 = len(pred2_lst)
        # Check if lengths of actual values and predicted values are equal
        if len_act != len_p1 or len_p1 != len_p2 or len_act != len_p2:
            rt = -1
            msg = "Lengths of actual_lst, pred1_lst and pred2_lst do not match."
            return rt, msg
        # Check range of h
        if h >= len_act:
            rt = -1
            msg = "The number of steps ahead is too large."
            return rt, msg
        # Check if criterion supported
        if crit != "MSE" and crit != "MAPE" and crit != "MAD" and crit != "poly":
            rt = -1
            msg = "The criterion is not supported."
            return rt, msg
            # Check if every value of the input lists are numerical values
        from re import compile as re_compile
        comp = re_compile("^\d+?\.\d+?$")

        def compiled_regex(s):
            """ Returns True is string is a number. """
            if comp.match(s) is None:
                return s.isdigit()
            return True

        for actual, pred1, pred2 in zip(actual_lst, pred1_lst, pred2_lst):
            is_actual_ok = compiled_regex(str(abs(actual)))
            is_pred1_ok = compiled_regex(str(abs(pred1)))
            is_pred2_ok = compiled_regex(str(abs(pred2)))
            if not (is_actual_ok and is_pred1_ok and is_pred2_ok):
                msg = "An element in the actual_lst, pred1_lst or pred2_lst is not numeric."
                rt = -1
                return rt, msg
        return rt, msg

    # Error check
    error_code = error_check()
    # Raise error if cannot pass error check
    if error_code[0] == -1:
        raise SyntaxError(error_code[1])
        return
    # Import libraries
    from scipy.stats import t
    import collections
    import pandas as pd
    import numpy as np

    # Initialise lists
    e1_lst = []
    e2_lst = []
    d_lst = []

    # convert every value of the lists into real values
    actual_lst = pd.Series(actual_lst).apply(lambda x: float(x)).tolist()
    pred1_lst = pd.Series(pred1_lst).apply(lambda x: float(x)).tolist()
    pred2_lst = pd.Series(pred2_lst).apply(lambda x: float(x)).tolist()

    # Length of lists (as real numbers)
    T = float(len(actual_lst))

    # construct d according to crit
    if crit == "MSE":
        for actual, p1, p2 in zip(actual_lst, pred1_lst, pred2_lst):
            e1_lst.append((actual - p1) ** 2)
            e2_lst.append((actual - p2) ** 2)
        for e1, e2 in zip(e1_lst, e2_lst):
            d_lst.append(e1 - e2)
    elif crit == "MAD":
        for actual, p1, p2 in zip(actual_lst, pred1_lst, pred2_lst):
            e1_lst.append(abs(actual - p1))
            e2_lst.append(abs(actual - p2))
        for e1, e2 in zip(e1_lst, e2_lst):
            d_lst.append(e1 - e2)
    elif crit == "MAPE":
        for actual, p1, p2 in zip(actual_lst, pred1_lst, pred2_lst):
            e1_lst.append(abs((actual - p1) / actual))
            e2_lst.append(abs((actual - p2) / actual))
        for e1, e2 in zip(e1_lst, e2_lst):
            d_lst.append(e1 - e2)
    elif crit == "poly":
        for actual, p1, p2 in zip(actual_lst, pred1_lst, pred2_lst):
            e1_lst.append(((actual - p1)) ** (power))
            e2_lst.append(((actual - p2)) ** (power))
        for e1, e2 in zip(e1_lst, e2_lst):
            d_lst.append(e1 - e2)

            # Mean of d
    mean_d = pd.Series(d_lst).mean()

    # Find autocovariance and construct DM test statistics
    def autocovariance(Xi, N, k, Xs):
        autoCov = 0
        T = float(N)
        for i in np.arange(0, N - k):
            autoCov += ((Xi[i + k]) - Xs) * (Xi[i] - Xs)
        return (1 / (T)) * autoCov

    gamma = []
    for lag in range(0, h):
        gamma.append(autocovariance(d_lst, len(d_lst), lag, mean_d))  # 0, 1, 2
    V_d = (gamma[0] + 2 * sum(gamma[1:])) / T
    DM_stat = V_d ** (-0.5) * mean_d
    harvey_adj = ((T + 1 - 2 * h + h * (h - 1) / T) / T) ** (0.5)
    DM_stat = harvey_adj * DM_stat
    # Find p-value
    p_value = 2 * t.cdf(-abs(DM_stat), df=T - 1)

    # Construct named tuple for return
    dm_return = collections.namedtuple('dm_return', 'DM p_value')
    rt = dm_return(DM=DM_stat, p_value=p_value)
    return rt



In [202]:
import pandas as pd
data=pd.read_excel('../5 sub-models/寿县_28.xlsx')
data.head()

Unnamed: 0,ETO,XGB,RF,CNN,GRU,MLP,Proposed
0,4.243704,3.1743,3.792897,4.212972,3.943855,3.914148,3.290865
1,3.635035,2.46688,3.36041,3.118935,3.643656,3.079274,2.594209
2,3.537506,3.293312,3.393853,3.748376,2.853267,1.950341,3.319929
3,3.440828,3.457177,3.678498,3.33621,2.712084,2.057823,3.455395
4,2.87874,3.976076,3.720689,3.634695,4.603443,4.256463,3.856467


In [203]:
Observed=data['ETO']
proposed=data['Proposed']
lst1=data['XGB']
lst2=data['RF']
lst3=data['CNN']
lst4=data['GRU']
lst5=data['MLP']


In [204]:
rt = dm_test(Observed, proposed, lst1, h=1, crit="MSE")
print("crit：MSE，", rt)

crit：MSE， dm_return(DM=-29.31587862385956, p_value=7.119324589400484e-174)


In [205]:
rt = dm_test(Observed, proposed, lst2, h=1, crit="MSE")
print("crit：MSE，", rt)

crit：MSE， dm_return(DM=-7.705535247761928, p_value=1.5740255056879218e-14)


In [206]:
rt = dm_test(Observed, proposed, lst3, h=1, crit="MSE")
print("crit：MSE，", rt)

crit：MSE， dm_return(DM=-0.8505104910055774, p_value=0.3950839305343212)


In [207]:
rt = dm_test(Observed, proposed, lst4, h=1, crit="MSE")
print("crit：MSE，", rt)

crit：MSE， dm_return(DM=-3.028465785844647, p_value=0.002471114082885568)


In [208]:
rt = dm_test(Observed, proposed, lst5, h=1, crit="MSE")
print("crit：MSE，", rt)

crit：MSE， dm_return(DM=-1.308238236516777, p_value=0.19085542070656586)


DM<0  模型1比模型2更好                      DM>0  模型2比模型1更好