In [1]:
# -*- coding: utf-8 -*-
import QUANTAXIS as QA
import pandas as pd

def MACD_JCSC(dataframe, SHORT=12, LONG=26, M=9):
    """
    MACD lines together with DIFF/DEA crossover flags.

    1. DIFF breaking above DEA: reference buy signal.
    2. DIFF falling below DEA: reference sell signal.

    :param dataframe: object exposing a ``close`` price series
    :param SHORT: period of the fast EMA
    :param LONG: period of the slow EMA
    :param M: period of the EMA applied to DIFF to obtain DEA
    :return: DataFrame with columns DIFF, DEA, MACD, CROSS_JC, CROSS_SC, ZERO
    """
    close = dataframe.close
    fast_ema = QA.EMA(close, SHORT)
    slow_ema = QA.EMA(close, LONG)
    dif = fast_ema - slow_ema
    dea = QA.EMA(dif, M)

    columns = {
        'DIFF': dif,
        'DEA': dea,
        'MACD': 2 * (dif - dea),
        'CROSS_JC': QA.CROSS(dif, dea),  # QA.CROSS(DIFF, DEA)
        'CROSS_SC': QA.CROSS(dea, dif),  # QA.CROSS(DEA, DIFF)
        'ZERO': 0,                       # constant zero reference column
    }
    return pd.DataFrame(columns)

In [2]:
def QSDD(dataframe, SHORT=12, LONG=26, M=9):
    """
    QSDD indicator: crossover flags between a mid-term and a long-term line.

    1. line_mid breaking above line_long: reference buy signal.
    2. line_mid falling below line_long: reference sell signal.

    Both lines are smoothed versions of the same 34-bar oscillator
    ``-100 * (HHV(high, 34) - close) / (HHV(high, 34) - LLV(low, 34))``,
    shifted up by 100: line_long uses QA.MA over 19 bars, line_mid uses
    QA.EMA over 4 bars.

    :param dataframe: object exposing ``high``, ``low`` and ``close`` series
    :param SHORT: unused; kept so existing callers keep working
    :param LONG: unused; kept so existing callers keep working
    :param M: unused; kept so existing callers keep working
    :return: DataFrame with columns line_mid, line_long, CROSS_JC, CROSS_SC
    """
    HIGH = dataframe.high
    LOW = dataframe.low
    CLOSE = dataframe.close

    # The 34-bar oscillator feeds both lines, so build it once instead of
    # repeating the HHV/LLV scans for each smoothing pass.
    highest_34 = QA.HHV(HIGH, 34)
    oscillator_34 = -100 * (highest_34 - CLOSE) / (highest_34 - QA.LLV(LOW, 34))

    line_long = QA.MA(oscillator_34, 19) + 100
    line_mid = QA.EMA(oscillator_34, 4) + 100  # signal line

    CROSS_JC = QA.CROSS(line_mid, line_long)
    CROSS_SC = QA.CROSS(line_long, line_mid)
    return pd.DataFrame({'line_mid': line_mid, 'line_long': line_long, 'CROSS_JC': CROSS_JC, 'CROSS_SC': CROSS_SC})

In [3]:
import QUANTAXIS as QA
import numpy as np
import talib
import pandas as pd
import scipy.signal as signal
import matplotlib.pyplot as plt
%matplotlib inline

In [4]:
# 定义MACD函数
def TA_MACD(prices, fastperiod=12, slowperiod=26, signalperiod=9):
    '''
    MACD via talib.MACD, with the histogram doubled and its bar-to-bar change.

    Parameters:
        prices: price array handed to talib.MACD
        fastperiod = 12
        slowperiod = 26
        signalperiod = 9

    Returns: 2-D array whose columns are
        macd (dif), signal (dea), hist * 2 (bar), delta (first difference of bar)
    '''
    dif, dea, hist = talib.MACD(prices,
                                fastperiod=fastperiod,
                                slowperiod=slowperiod,
                                signalperiod=signalperiod)
    bar = hist * 2
    # Prepend NaN so the difference column has one row per input row.
    bar_delta = np.r_[np.nan, np.diff(bar)]
    return np.c_[dif, dea, bar, bar_delta]


# 定义MA函数
def TA_MA(prices, timeperiod=5):
    '''
    Moving average via talib.MA.

    Parameters:
        prices: price array handed to talib.MA
        timeperiod = 5

    Returns: ma, exactly as produced by talib.MA
    '''
    return talib.MA(prices, timeperiod=timeperiod)


# 定义RSI函数
def TA_RSI(prices, timeperiod=12):
    '''
    RSI via talib.RSI, plus its bar-to-bar change.

    Parameters:
        prices: price array handed to talib.RSI
        timeperiod = 12

    Returns: 2-D array whose columns are
        rsi, delta (first difference of rsi, NaN in the first row)
    '''
    rsi_values = talib.RSI(prices, timeperiod=timeperiod)
    # Prepend NaN so the difference column has one row per input row.
    rsi_change = np.r_[np.nan, np.diff(rsi_values)]
    return np.c_[rsi_values, rsi_change]


# Define the BBANDS (Bollinger Bands) function
def TA_BBANDS(prices, timeperiod=5, nbdevup=2, nbdevdn=2, matype=0):
    '''
    Bands from talib.BBANDS plus channel-width features derived from them.

    Parameters:
        prices: price array handed to talib.BBANDS
        timeperiod = 5
        nbdevup = 2
        nbdevdn = 2
        matype = 0

    Returns: 2-D array whose columns are, in order,
        up, middle, low,
        ch             (up - low) / low,
        ch_ma30        talib.MA of ch over 30 periods,
        delta          first difference of ch (NaN in the first row),
        padding_delta  first difference of (ma30 - low) / low, where ma30 is
                       TA_MA(prices, 30) (NaN in the first row)
    '''
    upper, mid, lower = talib.BBANDS(prices, timeperiod, nbdevup, nbdevdn, matype)

    # Channel width relative to the lower band, its change and its 30-period MA.
    width = (upper - lower) / lower
    width_delta = np.r_[np.nan, np.diff(width)]
    width_ma30 = talib.MA(width, timeperiod=30)

    # Gap between the 30-period price MA and the lower band, relative to the lower band.
    padding = (TA_MA(prices, timeperiod=30) - lower) / lower
    padding_delta = np.r_[np.nan, np.diff(padding)]

    return np.c_[upper, mid, lower, width, width_ma30, width_delta, padding_delta]


def TA_KDJ(hight, low, close, fastk_period=9, slowk_matype=0, slowk_period=3, slowd_period=3):
    '''
    K/D lines from talib.STOCH, the derived J line, and J's bar-to-bar change.

    Parameters:
        hight, low, close: price arrays handed to talib.STOCH
        fastk_period = 9
        slowk_matype = 0
        slowk_period = 3
        slowd_period = 3

    Returns: 2-D array whose columns are
        K, D, J (= 3 * K - 2 * D), delta (first difference of J, NaN first row)
    '''
    stoch_kwargs = dict(
        fastk_period=fastk_period,
        slowk_matype=slowk_matype,
        slowk_period=slowk_period,
        slowd_period=slowd_period,
    )
    k_line, d_line = talib.STOCH(hight, low, close, **stoch_kwargs)
    j_line = 3 * k_line - 2 * d_line
    # Prepend NaN so the difference column has one row per input row.
    j_change = np.r_[np.nan, np.diff(j_line)]
    return np.c_[k_line, d_line, j_line, j_change]


def TA_CCI(high, low, close, timeperiod=14):
    """
    Commodity Channel Index via talib.CCI, plus its bar-to-bar change.

    :param high: high-price array handed to talib.CCI
    :param low: low-price array handed to talib.CCI
    :param close: close-price array handed to talib.CCI
    :param timeperiod: look-back period forwarded to talib.CCI (default 14)
    :return: 2-D array whose columns are
        cci, delta (first difference of cci, NaN in the first row)
    """
    # Forward the caller's period; it used to be pinned to 14 regardless.
    real = talib.CCI(high, low, close, timeperiod=timeperiod)
    # Prepend NaN so the difference column has one row per input row.
    delta = np.r_[np.nan, np.diff(real)]
    return np.c_[real, delta]
# Fetch the stock list for the whole market as a plain Python list of codes.
code  = QA.QA_fetch_stock_list_adv().code.tolist()
# Codes belonging to the '沪深300' (CSI 300) block; the first 30 are printed as a sanity check.
codelist = QA.QA_fetch_stock_block_adv().get_block('沪深300').code
print(codelist[0:30])

# Fetch data in QADataStruct format for 2019-10-01 .. 2020-02-18:
# data1 covers every code in `code`, data2 only the codes in `codelist`.
data1 =  QA.QA_fetch_stock_day_adv(code, '2019-10-01','2020-02-18')
data2 =  QA.QA_fetch_stock_day_adv(codelist, '2019-10-01','2020-02-18')

['000001', '000002', '000063', '000066', '000069', '000100', '000157', '000166', '000333', '000338', '000425', '000538', '000568', '000596', '000625', '000627', '000651', '000656', '000661', '000671', '000703', '000708', '000709', '000723', '000725', '000728', '000768', '000776', '000783', '000786']


In [5]:
# 写个指标 so easy
def ifup20(data):
    """
    Flag bars where the 5-period MA of close is above the 20-period MA.

    Uses QUANTAXIS' built-in QA.MA. Rows where the MA difference is NaN are
    dropped before the comparison.

    :param data: object exposing a ``close`` series
    :return: boolean series, True where MA5 - MA20 > 0
    """
    close = data.close
    ma_fast = QA.MA(close, 5)
    ma_slow = QA.MA(close, 20)
    spread = (ma_fast - ma_slow).dropna()
    return spread > 0

def ifup20_TA(data):
    """
    Flag bars where the 5-period MA of close is above the 20-period MA.

    Same rule as the QA.MA-based variant, but the averages come from TA_MA.
    Rows where the MA difference is NaN are dropped before the comparison.

    :param data: object exposing a ``close`` series
    :return: boolean series, True where MA5 - MA20 > 0
    """
    close = data.close
    ma_fast = TA_MA(close, 5)
    ma_slow = TA_MA(close, 20)
    spread = (ma_fast - ma_slow).dropna()
    return spread > 0

# apply到 QADataStruct上

# 写个自定义指标 MAX_FACTOR QA内建指标计算 Python原生代码
def ifmaxfactor_greater(data):
    """
    Custom MAX_FACTOR signal built from QUANTAXIS' RSI, CCI and KDJ indicators.

    MAX_FACTOR = CCI + (RSI1 - 50) * 4 + (KDJ_J - 50) * 4.
    The signal is True where MAX_FACTOR plus its one-bar change exceeds the
    baseline (RSI1 - 50) * 4 - 133. Rows where that difference is NaN
    (at least the first row) are dropped.

    :param data: input handed unchanged to the three QA_indicator_* functions
    :return: boolean series
    """
    rsi1 = QA.QA_indicator_RSI(data)['RSI1']
    cci = QA.QA_indicator_CCI(data)['CCI']
    kdj_j = QA.QA_indicator_KDJ(data)['KDJ_J']

    baseline = (rsi1 - 50) * 4
    factor = cci + baseline + (kdj_j - 50) * 4
    # Prepend NaN so the change lines up row-for-row with factor.
    factor_delta = np.r_[np.nan, np.diff(factor)]

    score = (factor + factor_delta) - (baseline - 133)
    return score.dropna() > 0

# 写个自定义指标 MAX_FACTOR TA-lib计算
def ifmaxfactor_greater_TA(data):
    """
    Custom MAX_FACTOR signal built from the TA_RSI, TA_CCI and TA_KDJ helpers.

    MAX_FACTOR = CCI + (RSI - 50) * 4 + (J - 50) * 4, where RSI and CCI are
    column 0 of their helper's output and J is column 2 of TA_KDJ's output.
    The signal is True where MAX_FACTOR plus its one-bar change exceeds the
    baseline (RSI - 50) * 4 - 133. Rows where that difference is NaN
    (at least the first row) are dropped.

    :param data: object exposing ``high``, ``low``, ``close`` and ``index``
    :return: single-column boolean DataFrame indexed like ``data``
    """
    rsi = TA_RSI(data.close)[:, 0]
    cci = TA_CCI(data.high, data.low, data.close)[:, 0]
    kdj_j = TA_KDJ(data.high, data.low, data.close)[:, 2]

    baseline = (rsi - 50) * 4
    factor = cci + baseline + (kdj_j - 50) * 4
    # Prepend NaN so the change lines up row-for-row with factor.
    factor_delta = np.r_[np.nan, np.diff(factor)]

    score = (factor + factor_delta) - (baseline - 133)
    return pd.DataFrame(score, index=data.index).dropna() > 0

In [6]:
# Benchmark: apply ifup20 (QA.MA based) to data1 via add_func.
# NOTE(review): names assigned inside %timeit are not kept in the notebook namespace, so ind1 is not available afterwards.
%timeit ind1 = data1.add_func(ifup20)

6.56 s ± 402 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
# Benchmark: apply ifup20_TA (TA_MA based) to data1 via add_func.
# NOTE(review): names assigned inside %timeit are not kept in the notebook namespace, so ind1 is not available afterwards.
%timeit ind1 = data1.add_func(ifup20_TA)

4.48 s ± 44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
# Benchmark: apply ifmaxfactor_greater (QA_indicator_* based) to data2 via add_func.
# NOTE(review): names assigned inside %timeit are not kept in the notebook namespace, so ind2 is not available afterwards.
%timeit ind2 = data2.add_func(ifmaxfactor_greater)

7.59 s ± 84.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
# Benchmark: apply ifmaxfactor_greater_TA (TA_* helper based) to data2 via add_func.
# NOTE(review): names assigned inside %timeit are not kept in the notebook namespace, so ind2 is not available afterwards.
%timeit ind2 = data2.add_func(ifmaxfactor_greater_TA)

1.24 s ± 36.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
#!/usr/bin/env python3

import datetime
import os

import pandas as pd

import QUANTAXIS as QA

# Alternative kept for reference: use every code from the stock list instead of the two below.
# code_list = QA.QA_fetch_stock_list().index.tolist()
code_list = ["000001", "600000"]
current_date = str(datetime.date.today())
current_daily_bar = pd.DataFrame()
# Start of the warm-up window: the date returned by QA_util_get_pre_trade_date
# for n=100 relative to current_date.
start_date = QA.QA_util_get_pre_trade_date(current_date, n=100)
# Load the warm-up data for code_list from start_date through current_date.
# NOTE(review): to_qfq() presumably applies forward price adjustment - confirm.
daily_data = QA.QA_fetch_stock_day_adv(
    code=code_list, start=start_date, end=current_date
).to_qfq()

# Pivot each price field into its own table to make the calculations below easier.
closes = daily_data.pivot("close")
highs = daily_data.pivot("high")
lows = daily_data.pivot("low")


def get_short_strength(
    highs: pd.DataFrame, lows: pd.DataFrame, closes: pd.DataFrame, period: int = 27
) -> pd.Series:
    """
    Compute the short-side (bearish) strength on the latest row.

    Result per column: 100 * (HHV - close) / (HHV - LLV), taken from the last
    row, where HHV/LLV are QA.HHV / QA.LLV over ``period`` and close is
    forward-filled first.

    :param highs: pivoted high prices
    :param lows: pivoted low prices
    :param closes: pivoted close prices
    :param period: look-back window passed as N to QA.HHV / QA.LLV
    :return: one value per column of the pivoted inputs
    """
    highest = highs.apply(QA.HHV, N=period, axis=0)
    lowest = lows.apply(QA.LLV, N=period, axis=0)

    top = highest.iloc[-1]
    bottom = lowest.iloc[-1]
    # Forward-fill so a missing latest close falls back to the previous one.
    last_close = closes.ffill().iloc[-1]
    return 100 * (top - last_close) / (top - bottom)


def get_long_strength(
    highs: pd.DataFrame, lows: pd.DataFrame, closes: pd.DataFrame, period: int = 27
) -> pd.Series:
    """
    Compute the long-side (bullish) strength on the latest row.

    C = SMA((close - LLV) / (HHV - LLV), N=5, M=1) * 100 per column, and the
    result is the last row of 3 * C - 2 * SMA(C, N=3, M=1).

    :param highs: pivoted high prices
    :param lows: pivoted low prices
    :param closes: pivoted close prices
    :param period: look-back window passed as N to QA.HHV / QA.LLV
    :return: one value per column of the pivoted inputs
    """
    sma = QA.QAIndicator.base.SMA
    highest = highs.apply(QA.HHV, N=period, axis=0)
    lowest = lows.apply(QA.LLV, N=period, axis=0)

    position = (closes - lowest) / (highest - lowest)
    smoothed = position.apply(sma, N=5, M=1, axis=0) * 100
    resmoothed = smoothed.apply(sma, N=3, M=1)

    strength = 3 * smoothed - 2 * resmoothed
    return strength.iloc[-1]


def get_RSV(
    highs: pd.DataFrame, lows: pd.DataFrame, closes: pd.DataFrame, period: int = 30
) -> pd.DataFrame:
    """
    Compute the RSV value for every row and column.

    RSV = 100 * (close - LLV(low, period)) / (HHV(high, period) - LLV(low, period))

    :param highs: pivoted high prices
    :param lows: pivoted low prices
    :param closes: pivoted close prices
    :param period: look-back window passed as N to QA.HHV / QA.LLV
    :return: DataFrame of RSV values
    """
    lowest = lows.apply(QA.LLV, N=period, axis=0)
    highest = highs.apply(QA.HHV, N=period, axis=0)
    price_range = highest - lowest
    return 100 * (closes - lowest) / price_range


def get_KDJ(
    highs: pd.DataFrame,
    lows: pd.DataFrame,
    closes: pd.DataFrame,
    period: int = 30,
    M1: int = 3,
    M2: int = 3,
) -> dict:
    """
    Compute the KDJ lines.

    K is the SMA of RSV with N=M1, D is the SMA of K with N=M2 (both with
    M=1), and J = 3 * K - 2 * D.

    :param highs: pivoted high prices
    :param lows: pivoted low prices
    :param closes: pivoted close prices
    :param period: look-back window forwarded to get_RSV
    :param M1: smoothing period (N) used to turn RSV into K
    :param M2: smoothing period (N) used to turn K into D
    :return: dict with keys "KDJ_K", "KDJ_D" and "KDJ_J"
    """
    sma = QA.QAIndicator.base.SMA
    RSV = get_RSV(highs, lows, closes, period)
    # Honor M1 / M2: both passes used to be pinned to N=3, which silently
    # ignored the caller's smoothing periods (the defaults are still 3).
    KDJ_K = RSV.apply(sma, N=M1, M=1, axis=0)
    KDJ_D = KDJ_K.apply(sma, N=M2, M=1, axis=0)
    KDJ_J = 3 * KDJ_K - 2 * KDJ_D
    return {"KDJ_K": KDJ_K, "KDJ_D": KDJ_D, "KDJ_J": KDJ_J}