# yfinance 数据限制
## 每分钟 (1m) 数据：只能获取最近 30 天内的数据，并且每次请求只能获取 7 天的数据。

## 只能获取日间交易数据


In [None]:
pip install dash yfinance plotly dash-bootstrap-components pandas pandas_ta

In [None]:
import yfinance as yf
import pandas as pd
import datetime
from datetime import datetime, timedelta

class StockIndicators:
    """Download price/volume bars for one ticker and derive technical indicators.

    The bars are fetched once, in ``__init__``, and stored in ``self.data``.
    Every ``calculate_*`` method works on a copy of that frame, so computing
    one indicator never changes the input seen by another.
    """

    def __init__(self, ticker, start, end, interval='1m'):
        """
        params:
        ticker (str): ticker symbol, e.g. 'AAPL'
        start (str): start date, formatted 'YYYY-MM-DD'
        end (str): end date, formatted 'YYYY-MM-DD'
        interval (str): bar size, '1m' by default. Accepted values are '1m',
            '2m', '5m', '15m', '30m', '60m', '90m', '1h', '1d', '5d', '1wk',
            '1mo', '3mo'
        """
        self.ticker = ticker
        self.start = start
        self.end = end
        self.interval = interval
        self.data = self.get_data()

    @staticmethod
    def _reset_time_index(frame, fallback):
        """Move the index into the first column, naming it `fallback` if unnamed."""
        if frame.index.name is None:
            frame = frame.rename_axis(fallback)
        return frame.reset_index()

    @staticmethod
    def _volume_by_period(trading_data, period):
        """Return `trading_data` as is for '1m', else total Volume per `period`."""
        if period == '1m':
            # '1m' means "use the bars exactly as downloaded", whatever their size.
            return trading_data
        # min_count=1 makes buckets without any bar (nights, weekends) come out
        # as NaN instead of 0, so that dropna can actually discard them.
        totals = trading_data.resample(period)['Volume'].sum(min_count=1)
        return totals.to_frame().dropna(how='all')

    def get_data(self):
        """Download the bars between ``self.start`` and ``self.end``.

        The range is requested in slices of at most 7 days and the slices are
        concatenated in chronological order.

        returns:
        DataFrame with one row per bar.

        raises:
        ValueError: if ``self.start`` is not earlier than ``self.end``, or if
            either date does not match 'YYYY-MM-DD'.
        """
        start_date = datetime.strptime(self.start, '%Y-%m-%d')
        end_date = datetime.strptime(self.end, '%Y-%m-%d')
        if start_date >= end_date:
            # Without this guard pd.concat fails with "No objects to concatenate".
            raise ValueError(
                f"start ({self.start}) must be earlier than end ({self.end})"
            )

        chunks = []
        while start_date < end_date:
            chunk_end = min(start_date + timedelta(days=7), end_date)
            chunks.append(
                yf.download(
                    self.ticker,
                    start=start_date.strftime('%Y-%m-%d'),
                    end=chunk_end.strftime('%Y-%m-%d'),
                    interval=self.interval,
                )
            )
            start_date = chunk_end

        # Leave out slices that came back empty; fall back to the raw list when
        # every slice is empty so the result is still a DataFrame.
        non_empty = [chunk for chunk in chunks if not chunk.empty]
        data = pd.concat(non_empty or chunks)

        # NOTE(review): newer yfinance releases appear to return two-level
        # (field, ticker) columns even for one ticker - confirm against the
        # installed version. Dropping the constant level keeps data['Close']
        # a Series, which the indicator methods rely on.
        if (
            isinstance(data.columns, pd.MultiIndex)
            and data.columns.get_level_values(-1).nunique() == 1
        ):
            data.columns = data.columns.droplevel(-1)

        print("Downloaded data:")
        print(data.head())
        return data

    def filter_trading_hours(self, data):
        """Return a copy of `data` limited to bars between 09:30 and 16:00.

        The copy gets a timezone-naive index; `data` itself is left untouched.
        Frames whose timestamps carry no time of day (all at midnight, as with
        daily bars) are returned unfiltered, because a time-of-day filter
        would discard every row.
        """
        filtered = data.copy()
        filtered.index = pd.to_datetime(filtered.index).tz_localize(None)

        if (filtered.index == filtered.index.normalize()).all():
            return filtered
        # .copy() so callers can add columns without chained-assignment warnings.
        return filtered.between_time('09:30', '16:00').copy()

    def calculate_volume_delta(self, period='1m'):
        """Return Volume per `period` and its change from the previous period.

        params:
        period (str): pandas resample frequency; '1m' skips resampling.

        returns:
        DataFrame with the timestamp column, 'Volume' and 'Volume_Delta' (plus
        the remaining bar columns when period is '1m'). Rows whose Volume is 0
        are removed after the delta has been computed.
        """
        volume = self._volume_by_period(
            self.filter_trading_hours(self.data), period
        )
        volume['Volume_Delta'] = volume['Volume'].diff()
        volume = volume[volume['Volume'] != 0]
        return volume.reset_index()

    def calculate_n_days_diff(self, n):
        """Return closing-price differences over `n` trading days.

        returns:
        DataFrame indexed by day with two columns:
        'Close_{n}d_diff_forward'  = close of the day - close n rows earlier
        'Close_{n}d_diff_backward' = close n rows later - close of the day
        """
        trading_data = self.filter_trading_hours(self.data)
        daily = trading_data.resample('1D').agg({'Close': 'last'}).dropna(how='all')

        forward_col = f'Close_{n}d_diff_forward'
        backward_col = f'Close_{n}d_diff_backward'
        daily[forward_col] = daily['Close'] - daily['Close'].shift(n)
        daily[backward_col] = daily['Close'].shift(-n) - daily['Close']

        return daily[[forward_col, backward_col]]

    def calculate_price_change_percentage(self, period='1m'):
        """Return the percentage change of Close from one period to the next.

        params:
        period (str): pandas resample frequency; '1m' skips resampling.

        returns:
        DataFrame with the timestamp column, 'Close' and
        'Price_Change_Percentage' (plus the remaining bar columns when period
        is '1m').
        """
        trading_data = self.filter_trading_hours(self.data)

        if period != '1m':
            closes = trading_data.resample(period).agg({'Close': 'last'}).dropna(how='all')
        else:
            closes = trading_data

        closes['Price_Change_Percentage'] = closes['Close'].pct_change() * 100
        return closes.reset_index()

    def calculate_cr(self, period='1d', window=14):
        """Return the CR indicator over a rolling `window` of `period` bars.

        As implemented here, CR is the rolling sum of (High - previous Close)
        divided by the rolling sum of (previous Close - Low).

        returns:
        DataFrame with the timestamp column and 'CR'; empty (after printing a
        notice) when fewer than `window` periods are available.
        """
        bars = self.data.resample(period).agg(
            {'High': 'max', 'Low': 'min', 'Close': 'last'}
        ).dropna()

        # At least `window` periods are required for one rolling value.
        if len(bars) < window:
            print("数据不足以计算 CR 指标")
            return pd.DataFrame(columns=[bars.index.name or 'Datetime', 'CR'])

        bars['Previous_Close'] = bars['Close'].shift(1)
        bars['H'] = bars['High'] - bars['Previous_Close']
        bars['L'] = bars['Previous_Close'] - bars['Low']
        bars['CR'] = (
            bars['H'].rolling(window=window).sum()
            / bars['L'].rolling(window=window).sum()
        )

        # The timestamp column keeps the index's own name ('Date' for daily
        # downloads, 'Datetime' for intraday) rather than assuming one.
        result = self._reset_time_index(bars.dropna(), 'Date')
        return result[[result.columns[0], 'CR']]

    def calculate_volume_max_min(self, period='1m', window=14):
        """Return the rolling maximum and minimum of Volume.

        params:
        period (str): pandas resample frequency; '1m' skips resampling.
        window (int): number of periods in the rolling window.

        returns:
        DataFrame with the timestamp column, 'Volume_Max' and 'Volume_Min';
        rows without a full window are dropped.
        """
        trading_data = self.filter_trading_hours(self.data)
        print("Filtered trading hours data (first 5 rows):")
        print(trading_data.head())

        volume = self._volume_by_period(trading_data, period)

        print(f"Resampled data ({period}):")
        print(volume.head())

        rolling_volume = volume['Volume'].rolling(window=window)
        volume['Volume_Max'] = rolling_volume.max()
        volume['Volume_Min'] = rolling_volume.min()
        volume = volume.dropna(subset=['Volume_Max', 'Volume_Min'])

        result = self._reset_time_index(volume, 'Datetime')
        return result[[result.columns[0], 'Volume_Max', 'Volume_Min']]

    def calculate_kdj(self, period=9):
        """Return the KDJ indicator computed on the bars as downloaded.

        RSV is the position of Close inside the `period`-bar Low/High range
        (0-100); K and D are successive exponential averages (alpha = 1/3) and
        J = 3K - 2D.

        returns:
        DataFrame with the timestamp column, 'K', 'D' and 'J'.
        """
        data = self.data.copy()
        data['Low_min'] = data['Low'].rolling(window=period).min()
        data['High_max'] = data['High'].rolling(window=period).max()
        data['RSV'] = (
            (data['Close'] - data['Low_min'])
            / (data['High_max'] - data['Low_min'])
            * 100
        )

        data['K'] = data['RSV'].ewm(alpha=1/3).mean()
        data['D'] = data['K'].ewm(alpha=1/3).mean()
        data['J'] = 3 * data['K'] - 2 * data['D']

        result = self._reset_time_index(data.dropna(), 'Date')
        return result[[result.columns[0], 'K', 'D', 'J']]

    def calculate_sma(self, period=14):
        """Return Close with its simple moving average over `period` bars.

        returns:
        DataFrame with the timestamp column, 'Close' and 'SMA'; the first
        period - 1 rows have NaN in 'SMA'.
        """
        data = self.filter_trading_hours(self.data)
        data['SMA'] = data['Close'].rolling(window=period).mean()

        result = self._reset_time_index(data, 'Datetime')
        return result[[result.columns[0], 'Close', 'SMA']]

    def calculate_macd(self, short_period=12, long_period=26, signal_period=9):
        """Return MACD, its signal line and the histogram.

        MACD is the short EMA of Close minus the long EMA, 'Signal' is the EMA
        of MACD and 'MACD_Hist' their difference.

        returns:
        DataFrame with the timestamp column, 'Close', 'MACD', 'Signal' and
        'MACD_Hist'.
        """
        trading_data = self.filter_trading_hours(self.data)
        close = trading_data['Close']
        trading_data['EMA_short'] = close.ewm(span=short_period, adjust=False).mean()
        trading_data['EMA_long'] = close.ewm(span=long_period, adjust=False).mean()
        trading_data['MACD'] = trading_data['EMA_short'] - trading_data['EMA_long']
        trading_data['Signal'] = trading_data['MACD'].ewm(span=signal_period, adjust=False).mean()
        trading_data['MACD_Hist'] = trading_data['MACD'] - trading_data['Signal']

        return trading_data[['Close', 'MACD', 'Signal', 'MACD_Hist']].dropna().reset_index()

    def calculate_boll(self, period=20, std_dev=2):
        """Return Bollinger bands: rolling mean of Close +/- std_dev rolling stds.

        returns:
        DataFrame with the timestamp column, 'Close', 'Middle_Band',
        'Upper_Band' and 'Lower_Band'; rows without a full window are dropped.
        """
        trading_data = self.filter_trading_hours(self.data)
        rolling_close = trading_data['Close'].rolling(window=period)
        band_width = rolling_close.std() * std_dev

        trading_data['Middle_Band'] = rolling_close.mean()
        trading_data['Upper_Band'] = trading_data['Middle_Band'] + band_width
        trading_data['Lower_Band'] = trading_data['Middle_Band'] - band_width

        return trading_data[['Close', 'Middle_Band', 'Upper_Band', 'Lower_Band']].dropna().reset_index()

    def calculate_rsi(self, period=14):
        """Return the RSI from simple rolling means of gains and losses.

        returns:
        DataFrame with the timestamp column, 'Close' and 'RSI'.
        """
        trading_data = self.filter_trading_hours(self.data)
        delta = trading_data['Close'].diff()
        gain = (delta.where(delta > 0, 0)).fillna(0)
        loss = (-delta.where(delta < 0, 0)).fillna(0)

        avg_gain = gain.rolling(window=period).mean()
        avg_loss = loss.rolling(window=period).mean()

        rs = avg_gain / avg_loss
        trading_data['RSI'] = 100 - (100 / (1 + rs))
        return trading_data[['Close', 'RSI']].dropna().reset_index()

    def calculate_wr(self, period=14):
        """Return Williams %R over `period` bars (values between -100 and 0).

        returns:
        DataFrame with the timestamp column, 'Close' and 'W%R'.
        """
        # Copy so the helper column does not leak into self.data.
        bars = self.data.copy()
        high_max = bars['High'].rolling(window=period).max()
        low_min = bars['Low'].rolling(window=period).min()
        bars['W%R'] = (high_max - bars['Close']) / (high_max - low_min) * -100

        return self._reset_time_index(bars[['Close', 'W%R']].dropna(), 'Date')

    def calculate_cci(self, period=20):
        """Return the CCI of the typical price (High + Low + Close) / 3.

        returns:
        DataFrame with the timestamp column, 'Close' and 'CCI'.
        """
        # Copy so the helper column does not leak into self.data.
        bars = self.data.copy()
        tp = (bars['High'] + bars['Low'] + bars['Close']) / 3
        sma_tp = tp.rolling(window=period).mean()

        # Mean absolute deviation of one rolling window.
        def mean_deviation(x):
            return (x - x.mean()).abs().mean()

        mean_deviation_tp = tp.rolling(window=period).apply(mean_deviation)
        bars['CCI'] = (tp - sma_tp) / (0.015 * mean_deviation_tp)

        return self._reset_time_index(bars[['Close', 'CCI']].dropna(), 'Date')

    def calculate_atr(self, period=14):
        """Return the average true range: rolling mean of the true range.

        The true range of a bar is the largest of High - Low,
        |High - previous Close| and |Low - previous Close|.

        returns:
        DataFrame with the timestamp column, 'Close' and 'ATR'.
        """
        # Copy so the helper column does not leak into self.data.
        bars = self.data.copy()
        previous_close = bars['Close'].shift()
        true_range = pd.concat(
            [
                bars['High'] - bars['Low'],
                (bars['High'] - previous_close).abs(),
                (bars['Low'] - previous_close).abs(),
            ],
            axis=1,
        ).max(axis=1)
        bars['ATR'] = true_range.rolling(window=period).mean()

        return self._reset_time_index(bars[['Close', 'ATR']].dropna(), 'Date')

    def calculate_dma(self, short_period=5, long_period=10, displacement=5):
        """Return two simple moving averages of Close, shifted by `displacement` bars.

        returns:
        DataFrame with the timestamp column, 'Close', 'DMA_Short' and
        'DMA_Long'; rows where either average is missing are dropped.
        """
        # Copy so the helper columns do not leak into self.data.
        bars = self.data.copy()

        # Short and long moving averages of the close.
        bars['SMA_Short'] = bars['Close'].rolling(window=short_period).mean()
        bars['SMA_Long'] = bars['Close'].rolling(window=long_period).mean()

        # Displace both averages by the same number of bars.
        bars['DMA_Short'] = bars['SMA_Short'].shift(displacement)
        bars['DMA_Long'] = bars['SMA_Long'].shift(displacement)

        return self._reset_time_index(
            bars[['Close', 'DMA_Short', 'DMA_Long']].dropna(), 'Date'
        )
     

In [None]:
# Demo: displaced moving averages (DMA) for AAPL on daily bars.
ticker, start, end = 'AAPL', '2024-06-05', '2024-07-01'
interval = '1d'  # other bar sizes such as '1m' or '1h' work as well

# Window lengths (in bars) of the two averages, and the shift applied to both.
short_period, long_period, displacement = 5, 10, 5

indicators = StockIndicators(ticker, start, end, interval)

dma_indicators = indicators.calculate_dma(
    short_period=short_period,
    long_period=long_period,
    displacement=displacement,
)
print(f"{interval} 间隔:")
print(dma_indicators)

In [None]:
# Example usage: walks through every indicator of StockIndicators.
# NOTE(review): a StockIndicators object downloads its data once, when it is
# constructed. Reassigning the module-level `interval` variable further down
# therefore has no effect on an `indicators` object that already exists; only
# the lines that call StockIndicators(...) again pick up a new interval.
ticker = 'AAPL'
start = '2024-07-01'
end = '2024-07-04'
interval = '1m'

indicators = StockIndicators(ticker, start, end, interval)

# Daily Volume and Volume Delta
volume_indicators = indicators.calculate_volume_delta(period='1d')
print('volume_indicators:', volume_indicators, '\n')

# Closing-price difference to the previous and the next day (n = 1)
n = 1
n_days_diff_data = indicators.calculate_n_days_diff(n)
print('n_days_diff_data:', n_days_diff_data, '\n')

# Daily percentage price change
price_change_percentage = indicators.calculate_price_change_percentage(period='1d')
print('price_change_percentage:', price_change_percentage, '\n')

# Hourly Volume and Volume Delta
hourly_volume_indicators = indicators.calculate_volume_delta(period='1H')
print('hourly_volume_indicators:', hourly_volume_indicators, '\n')

# Hourly percentage price change
hourly_price_change_percentage = indicators.calculate_price_change_percentage(period='1H')
print('hourly_price_change_percentage:', hourly_price_change_percentage, '\n')

# Per-minute Volume and Volume Delta
minute_volume_indicators = indicators.calculate_volume_delta(period='1m')
print('minute_volume_indicators:', minute_volume_indicators, '\n')

# Per-minute percentage price change
minute_price_change_percentage = indicators.calculate_price_change_percentage(period='1m')
print('minute_price_change_percentage:', minute_price_change_percentage, '\n')


# Daily CR indicator
# interval = '1d'
cr_indicators = indicators.calculate_cr(period='1d', window=10)
print(cr_indicators)

# Rolling maximum and minimum of the per-minute volume
volume_max_min_indicators_min = indicators.calculate_volume_max_min(period='1m', window=14)
print("每分钟成交量的最大值和最小值:")
print(volume_max_min_indicators_min)

# Rolling maximum and minimum of the hourly volume
volume_max_min_indicators_hour = indicators.calculate_volume_max_min(period='1H', window=14)
print("每小时成交量的最大值和最小值:")
print(volume_max_min_indicators_hour)

# KDJ indicator
# interval = '1d'
kdj_indicators = indicators.calculate_kdj(period=9)
print("KDJ:")
print(kdj_indicators)

# "Hourly" SMA
# NOTE(review): `indicators` is still the object built with interval='1m' at
# the top of this cell, so this SMA is computed on 1-minute bars.
interval = '1H'
sma_indicators_hourly = indicators.calculate_sma(period=14)
print("计算每小时的 SMA:")
print(sma_indicators_hourly)

# "Daily" SMA
# NOTE(review): same object as above, so the result equals the previous one.
interval = '1d'
sma_indicators_daily = indicators.calculate_sma(period=14)
print("计算每天的 SMA:")
print(sma_indicators_daily)

# MACD indicator
interval = '1m'
## Custom periods for the short EMA, the long EMA and the signal line
short_period = 12  # adjust as needed
long_period = 26  # adjust as needed
signal_period = 9  # adjust as needed
macd_indicators = indicators.calculate_macd(short_period=short_period, long_period=long_period, signal_period=signal_period)
print("MACD:")
print(macd_indicators)


# BOLL indicator
# Custom window length and standard-deviation multiplier
interval = '1m'
boll_period = 20  # adjust as needed
boll_std_dev = 2  # adjust as needed
boll_indicators = indicators.calculate_boll(period=boll_period, std_dev=boll_std_dev)
print("BOLL:")
print(boll_indicators)

# RSI indicator (downloads a fresh 1m data set first)
interval = '1m'
rsi_period = 14  # RSI window length
indicators = StockIndicators(ticker, start, end, interval)
rsi_indicators = indicators.calculate_rsi(period=rsi_period)
print("RSI:")
print(rsi_indicators)

# W%R indicator
# NOTE(review): `indicators` was last rebuilt with interval='1m' for the RSI
# step, so W%R runs on 1-minute bars despite the assignment below.
interval = '1d'  # or '1m', '1h', '1d', etc.
wr_period = 5  # W%R window length
wr_indicators = indicators.calculate_wr(period=wr_period)
print("wr_indicators:")
print(wr_indicators)


# CCI indicator (downloads a fresh 1m data set first)
interval = '1m'  # or '1m', '1h', '1d', etc.
cci_period = 20  # CCI window length
indicators = StockIndicators(ticker, start, end, interval)
cci_indicators = indicators.calculate_cci(period=cci_period)
print("CCI:")
print(cci_indicators)


# ATR indicator
# NOTE(review): still the 1m object created for the CCI step.
interval = '1d'
atr_period = 14  # ATR window length
atr_indicators = indicators.calculate_atr(period=atr_period)
print("ATR：")
print(atr_indicators)



# DMA indicator
# NOTE(review): still the 1m object created for the CCI step; the header
# printed below shows the reassigned `interval`, not the bar size of the data.
interval = '1d'  # or '1m', '1h', '1d', etc.
short_period = 5  # window of the short DMA
long_period = 10  # window of the long DMA
displacement = 5  # number of bars both averages are shifted by
dma_indicators = indicators.calculate_dma(short_period=short_period, long_period=long_period, displacement=displacement)
print(f"{interval} 间隔:")
print(dma_indicators)

