In [1]:
import unittest
from np_base_func import *
import numpy as np
import pandas as pd
import numpy.testing as npt
import statsmodels.api as sm
# Load the price table used for the pandas cross-checks in the tests below
# (presumably adjusted close prices, going by the file name - TODO confirm).
# Later cells index it by its "TrdDate" column and read the '688588' column.
df = pd.read_csv("./test/WindPrices.adjclose.csv")

In [2]:
# pandas versions of the operators (used for reference results)
# region Auxiliary functions
  
import numpy as np
import pandas as pd
from numpy import abs
from numpy import log
from numpy import sign
from scipy.stats import rankdata
def pd_ts_sum(df, window=10):
    """
    Rolling sum over a trailing window.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: number of consecutive rows in each window.
    :return: a pandas object shaped like df holding the sum of the last 'window' rows;
             rows that do not yet have a full window are NaN.
    """
    windowed = df.rolling(window=window)
    return windowed.sum()

def pd_sma(df, window=10):
    """
    Simple moving average over a trailing window.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: number of consecutive rows in each window.
    :return: a pandas object shaped like df holding the mean of the last 'window' rows;
             rows that do not yet have a full window are NaN.
    """
    windowed = df.rolling(window=window)
    return windowed.mean()

def pd_stddev(df, window=10):
    """
    Rolling standard deviation over a trailing window.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: number of consecutive rows in each window.
    :return: a pandas object shaped like df holding the standard deviation
             (pandas default degrees of freedom) of the last 'window' rows.
    """
    windowed = df.rolling(window=window)
    return windowed.std()

def pd_correlation(x, y, window=10):
    """
    Rolling correlation between two series.
    :param x: a pandas Series/DataFrame providing the rolling windows.
    :param y: the pandas object x is correlated against.
    :param window: number of consecutive rows in each window.
    :return: a pandas object with the correlation of x and y over the last 'window' rows.
    """
    x_windows = x.rolling(window=window)
    return x_windows.corr(y)

def pd_covariance(x, y, window=10):
    """
    Rolling covariance between two series.
    :param x: a pandas Series/DataFrame providing the rolling windows.
    :param y: the pandas object whose covariance with x is taken.
    :param window: number of consecutive rows in each window.
    :return: a pandas object with the covariance of x and y over the last 'window' rows.
    """
    x_windows = x.rolling(window=window)
    return x_windows.cov(y)

def pd_rolling_rank(na):
    """
    Auxiliary function applied to each window by pd_ts_rank.
    :param na: numpy array holding one window of values.
    :return: The rank (scipy.stats.rankdata, ties get the average rank) of the
             last value in the array.
    """
    return rankdata(na)[-1]

def pd_ts_rank(df, window=10):
    """
    Wrapper function to estimate rolling rank.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: the rolling window.
    :return: a pandas object with the rank of each value among the past 'window' values;
             rows that do not yet have a full window are NaN.
    """
    # The helper is named `pd_rolling_rank`; the earlier call to `rolling_rank`
    # referred to a name that is not defined in this notebook.
    # raw=True hands the helper a plain numpy array, as its docstring states.
    return df.rolling(window).apply(pd_rolling_rank, raw=True)

def rolling_prod(na):
    """
    Auxiliary function applied to each window by pd_product.
    :param na: array-like holding one window of values.
    :return: The product of all values in the window.
    """
    window_product = np.prod(na)
    return window_product

def pd_product(df, window=10):
    """
    Rolling product over a trailing window.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: number of consecutive rows in each window.
    :return: a pandas object shaped like df holding the product of the last 'window' rows.
    """
    windowed = df.rolling(window=window)
    # rolling_prod is evaluated once per full window.
    return windowed.apply(rolling_prod)

def pd_ts_min(df, window=10):
    """
    Rolling minimum over a trailing window.

    This returns the smallest value inside each window; pd_ts_argmin returns the
    position of that smallest value within the window instead.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: number of consecutive rows in each window.
    :return: a pandas object shaped like df holding the minimum of the last 'window' rows.
    """
    windowed = df.rolling(window=window)
    return windowed.min()

def pd_ts_max(df, window=10):
    """
    Rolling maximum over a trailing window.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: number of consecutive rows in each window.
    :return: a pandas object shaped like df holding the maximum of the last 'window' rows.
    """
    windowed = df.rolling(window=window)
    return windowed.max()

def pd_delta(df, period=1):
    """
    Difference between each value and the value 'period' rows earlier.
    :param df: a pandas DataFrame (a Series works as well).
    :param period: how many rows back to look; negative values look forward.
    :return: a pandas object shaped like df with today's value minus the value 'period' days ago.
    """
    differenced = df.diff(periods=period)
    return differenced

def pd_delay(df, period=1):
    """
    Lag a time series by 'period' rows.
    :param df: a pandas DataFrame (a Series works as well).
    :param period: number of rows to shift by; negative values shift the other way.
    :return: a pandas object shaped like df with the lagged time series.
    """
    lagged = df.shift(periods=period)
    return lagged

def pd_rank(df):
    """
    Percentile rank of every value.

    Ranking uses pandas' default axis, i.e. each column is ranked on its own
    down the rows.
    :param df: a pandas DataFrame (a Series works as well).
    :return: a pandas object shaped like df with ranks expressed as fractions in (0, 1].
    """
    # Alternative kept from the original (rank across columns within each row):
    #return df.rank(axis=1, pct=True)
    pct_ranks = df.rank(pct=True)
    return pct_ranks

def pd_scale(df, k=1):
    """
    Rescale a time series by its total absolute value.
    :param df: a pandas DataFrame (a Series works as well).
    :param k: scaling factor.
    :return: a pandas object shaped like df where every value is multiplied by k and
             divided by sum(abs(df)), so that sum(abs(result)) = k for positive k.
    """
    abs_total = np.abs(df).sum()
    scaled_up = df.mul(k)
    return scaled_up.div(abs_total)

def pd_ts_argmax(df, window=10):
    """
    Position inside each trailing window at which the window maximum occurs.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: number of consecutive rows in each window.
    :return: a pandas object shaped like df holding np.argmax of the last 'window' rows
             (0 is the oldest row of the window).
    """
    windowed = df.rolling(window=window)
    return windowed.apply(np.argmax)

def pd_ts_argmin(df, window=10):
    """
    Position inside each trailing window at which the window minimum occurs,
    i.e. which element of the window is the smallest one.
    :param df: a pandas DataFrame (a Series works as well).
    :param window: number of consecutive rows in each window.
    :return: a pandas object shaped like df holding np.argmin of the last 'window' rows
             (0 is the oldest row of the window).
    """
    windowed = df.rolling(window=window)
    return windowed.apply(np.argmin)

In [3]:
# Index the price table by trading date. The guard keeps this cell re-runnable:
# once "TrdDate" has become the index it is no longer a column, and calling
# set_index("TrdDate") a second time would raise KeyError.
if "TrdDate" in df.columns:
    df = df.set_index("TrdDate")
# Convert the index to datetimes via its string form
# (presumably the raw values are yyyymmdd-style dates - TODO confirm).
df.index = pd.to_datetime(df.index.astype('str'))

In [4]:
class TestNan(unittest.TestCase):
    """
    Checks how the numpy operators imported from np_base_func treat NaN / inf
    inputs, with a few cross-checks against the pandas wrappers defined in this
    notebook on the module-level price table `df`.
    """

    def test_rank(self):
        """rank() with NaN, +/-inf and all-NaN inputs."""
        x = np.array([1, np.nan, np.nan, np.nan, -1, 4, 9])
        x_dst = np.array([2/4, np.nan, np.nan, np.nan, 1/4, 3/4, 4/4])
        npt.assert_array_almost_equal(rank(x), x_dst)

        y = np.array([-10, np.inf, -np.inf, 1, 0, 4])
        # By design, inf and -inf do not take part in the ranking.
        y_dst = np.array([1/4, np.nan, np.nan, 3/4, 2/4, 4/4])
        npt.assert_array_almost_equal(rank(y), y_dst)

        z = np.array([np.nan, np.nan, np.nan])
        z_dst = np.array([np.nan, np.nan, np.nan])
        npt.assert_array_almost_equal(rank(z), z_dst)

    def test_delay(self):
        """delay() with positive, over-long and negative periods, plus a pandas cross-check."""
        x = np.array([1, np.nan, np.nan, np.nan, -1, 4, 9])
        x_dst = np.array([np.nan, 1, np.nan, np.nan, np.nan, -1, 4])

        npt.assert_array_almost_equal(delay(x, 1), x_dst)

        x_dst_2 = np.array([np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan])
        npt.assert_array_almost_equal(delay(x, 7), x_dst_2)

        x_dst_3 = np.array([np.nan, np.nan, np.nan, -1, 4, 9, np.nan])
        npt.assert_array_almost_equal(delay(x, -1), x_dst_3)

        xx = df['688588']
        npt.assert_array_almost_equal(delay(xx, 1), pd_delay(xx, 1))

    def test_delta(self):
        """delta() with positive and negative periods, plus a pandas cross-check."""
        x = np.array([1, np.nan, np.nan, np.nan, -1, 4, 9])
        x_dst1 = np.array([np.nan, np.nan, np.nan, np.nan, np.nan, 5, 5])

        npt.assert_array_almost_equal(delta(x, 1), x_dst1)

        x_dst2 = np.array([np.nan, np.nan, np.nan, np.nan, -5, -5, np.nan])
        npt.assert_array_almost_equal(delta(x, -1), x_dst2)

        xx = df['688588']
        # Cross-check delta against its pandas counterpart. This assertion used
        # to compare delay/pd_delay (copied from test_delay), so delta itself
        # was never checked on real data.
        npt.assert_array_almost_equal(delta(xx, 1), pd_delta(xx, 1))

    def test_covariance(self):
        """covariance() when the inputs contain NaN."""
        x = np.array([1, np.nan, np.nan, np.nan, -1, 4, 9])
        y = np.array([np.nan, np.nan, np.nan, np.nan, -4, 10, 2])
        # The correctness of covariance has already been tested on NaN-free
        # input; here we only check that the result in the presence of NaN is
        # as expected.
        dst = covariance(np.array([-1, 4, 9]), np.array([-4, 10, 2]))

        npt.assert_array_almost_equal(covariance(x, y), dst)

        z = np.array([np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan])
        npt.assert_array_almost_equal(covariance(x, z), z)

    def test_correlation(self):
        """correlation() when the inputs contain NaN."""
        x = np.array([1, np.nan, np.nan, np.nan, -1, 4, 9])
        y = np.array([np.nan, np.nan, np.nan, np.nan, -4, 10, 2])
        # The correctness of correlation has already been tested on NaN-free
        # input; here we only check that the result in the presence of NaN is
        # as expected.
        dst = correlation(np.array([-1, 4, 9]), np.array([-4, 10, 2]))

        npt.assert_array_almost_equal(correlation(x, y), dst)

    def test_ts_covariance(self):
        """ts_covariance() with window 3 on input containing NaN and +/-inf."""
        x = np.array([1, np.nan, np.nan, np.nan, -1, 4, 9, np.inf, -np.inf, 10, 31, 100])
        y = np.array([1, 2,      3,      4,       5, 6, 7,  8,       9,      10, 11, 12])

        # window = 3
        dst = np.array([np.nan, np.nan, np.nan, np.nan, np.nan,
                        covariance(np.array([-1, 4]), np.array([5, 6])),
                        covariance(np.array([-1, 4, 9]), np.array([5, 6, 7])),
                        covariance(np.array([4, 9]), np.array([6, 7])),
                        np.nan,
                        np.nan,
                        covariance(np.array([10, 31]), np.array([10, 11])),
                        covariance(np.array([10,31,100]), np.array([10,11,12]))
                       ])
        npt.assert_array_almost_equal(ts_covariance(x, y, 3), dst)

    def test_ts_correlation(self):
        """ts_correlation() with window 3 on input containing NaN and +/-inf."""
        x = np.array([1, np.nan, np.nan, np.nan, -1, 4, 9, np.inf, -np.inf, 10, 31, 100])
        y = np.array([1, 2,      3,      4,       5, 6, 7,  8,       9,      10, 11, 12])

        # window = 3
        dst = np.array([np.nan, np.nan, np.nan, np.nan, np.nan,
                        correlation(np.array([-1, 4]), np.array([5, 6])),
                        correlation(np.array([-1, 4, 9]), np.array([5, 6, 7])),
                        correlation(np.array([4, 9]), np.array([6, 7])),
                        np.nan,
                        np.nan,
                        correlation(np.array([10, 31]), np.array([10, 11])),
                        correlation(np.array([10,31,100]), np.array([10,11,12]))
                       ])
        npt.assert_array_almost_equal(ts_correlation(x, y, 3), dst)

    def test_scale(self):
        """scale() with positive, zero and negative factors on input containing NaN/inf."""
        x = np.array([1,-2,np.nan, np.inf, -np.inf])
        # scale
        x_dst = np.array([2*1/3, 2*(-2)/3, np.nan, np.nan, np.nan])

        npt.assert_array_almost_equal(scale(x, 2), x_dst)

        x_dst1 = np.array([0, 0, np.nan, np.nan, np.nan])
        npt.assert_array_almost_equal(scale(x, 0), x_dst1)

        x_dst2 = np.array([-3*1/3, -3*(-2)/3, np.nan, np.nan, np.nan])
        npt.assert_array_almost_equal(scale(x, -3), x_dst2)

    def test_decay_linear(self):
        """decay_linear() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5])

        x_dst = np.array([np.nan, np.nan, (1*1 - 2*2)/(1+2), -2, np.nan, 5])

        npt.assert_array_almost_equal(decay_linear(x, 3), x_dst)

    def test_signedpower(self):
        """signedpower() with exponent 2 on input containing NaN/inf and zero."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5, 0])
        x_dst = np.array([1, -4, np.nan, np.nan, np.nan, 25, 0])

        npt.assert_array_almost_equal(signedpower(x, 2), x_dst)

    def test_ts_sum(self):
        """ts_sum() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5])

        x_dst = np.array([
            np.nan,
            np.nan,
            (1-2)*3/2,
            -2*3,
            np.nan,
            5*3
        ])

        npt.assert_array_almost_equal(ts_sum(x, 3), x_dst)

    def test_ts_sma(self):
        """ts_sma() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5])
        x_dst = np.array([
            np.nan,
            np.nan,
            (1-2)/2,
            -2,
            np.nan,
            5
        ])

        npt.assert_array_almost_equal(ts_sma(x, 3), x_dst)

    def test_stddev(self):
        """ts_stddev() with window 3, compared against a hand-written sample std."""
        x = np.array([1,2,4,np.nan,np.nan, np.nan,-5,6,0])
        def std_test(x):
            # Sample standard deviation (n - 1 denominator); NaN when fewer
            # than two values are available.
            if len(x) <=1:
                return np.nan

            x = np.array(x)
            return np.sqrt(np.sum((x - x.mean())**2)/(len(x) - 1))

        x_dst = np.array([
            np.nan, np.nan,
            std_test([1,2,4]),
            std_test([2,4]),
            std_test([4]),
            np.nan,
            std_test([-5]),
            std_test([-5,6]),
            std_test([-5,6,0])
        ])

        npt.assert_array_almost_equal(ts_stddev(x, 3), x_dst)

    def test_ts_rank(self):
        """ts_rank() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5, 4, 100, -3])
        x_dst = np.array([
            np.nan,
            np.nan,
            np.nan,
            np.nan,
            np.nan,
            1,
            1,
            3,
            1
        ])
        npt.assert_array_almost_equal(ts_rank(x, 3), x_dst)

    def test_ts_product(self):
        """ts_product() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5, 4, 100, -3])
        x_dst = np.array([
            np.nan,
            np.nan,
            -1*np.abs(1*-2)**(3/2),
            -1*np.abs(-2)**3,
            np.nan,
            5**3,
            (5*4)**(3/2),
            5*4*100,
            4*100*(-3)
        ])
        npt.assert_array_almost_equal(ts_product(x, 3), x_dst)

    def test_ts_min(self):
        """ts_min() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5, 4, 100, -3])

        x_dst = np.array([
            np.nan, np.nan,
            -2, -2,
            np.nan,
            5,
            4,
            4,
            -3
        ])
        npt.assert_equal(ts_min(x, 3), x_dst)

    def test_ts_max(self):
        """ts_max() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5, 4, 100, -3])

        x_dst = np.array([
            np.nan, np.nan,
            1, -2,
            np.nan,
            5,
            5,
            100,
            100
        ])
        npt.assert_array_almost_equal(ts_max(x, 3), x_dst)

    def test_ts_argmin(self):
        """ts_argmin() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5, 4, 100, -3])
        x_dst = np.array([
            np.nan, np.nan,
            1,
            0,
            np.nan,
            2,
            2,
            1,
            2
        ])
        npt.assert_array_almost_equal(ts_argmin(x, 3), x_dst)

    def test_ts_argmax(self):
        """ts_argmax() with window 3 on input containing NaN/inf."""
        x = np.array([1, -2, np.nan, np.inf, -np.inf, 5, 4, 100, -3])
        x_dst = np.array([
            np.nan, np.nan,
            0,
            0,
            np.nan,
            2,
            1,
            2,
            1
        ])
        npt.assert_array_almost_equal(ts_argmax(x, 3), x_dst)

    def test_indneutralize(self):
        """indneutralize() against statsmodels OLS residuals, without and with invalid rows."""
        # The NaN-free case is not tested elsewhere, so it is tested here too.
        y = np.array([1,2,3])
        X = np.array([[1,0], [0,1], [1,0]])

        npt.assert_array_almost_equal(indneutralize(y, X), sm.OLS(y, X).fit().resid)

        # Test the case where missing values are present.
        y_na = np.array([1,2,3,np.nan, 4,5,6, np.inf, -10, -np.inf, 7])
        X_na = np.array([[np.nan,0,0],
                         [0,0,1],
                         [1,0,0],
                         [1,0,0],
                         [0,1,0],
                         [9,100,3], # expected to be dropped
                         [0,0,1],
                         [0,1,0],
                         [0,0,1],
                         [1,0,0],
                         [0,1,0]
                        ])
        # The rows of y_na / X_na that are expected to survive the filtering.
        y_used = np.array([2,3,4,6,-10,7])
        X_used = np.array([[0,0,1],[1,0,0],[0,1,0],[0,0,1],[0,0,1], [0,1,0]])

        res = indneutralize(y_na, X_na)
        # Only the non-NaN residuals are compared with the reference fit.
        res = res[~np.isnan(res)]

        npt.assert_array_almost_equal(res, sm.OLS(y_used, X_used).fit().resid)
        
        


In [5]:
# How to use unittest inside a Jupyter notebook:
#https://blog.csdn.net/qq_42702821/article/details/88614718

In [6]:
# Run the TestCase classes defined in this notebook. A placeholder argv is
# passed so unittest does not try to parse the kernel's own command-line
# arguments, and exit=False stops unittest from calling sys.exit() afterwards.
unittest.main(argv=['first-arg-is-ignored'],exit=False)

....................
----------------------------------------------------------------------
Ran 20 tests in 0.024s

OK


<unittest.main.TestProgram at 0x2e19b155a00>