<a href="https://colab.research.google.com/github/ociponan/streamlit/blob/main/stock_data_retriever.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import pandas_datareader as pdr
import datetime as dt
from dateutil.relativedelta import relativedelta
import yfinance as yf
import numpy as np
from pandas.tseries.frequencies import to_offset

In [2]:
def read_ticker_file(file="./tickers.csv"):
    tickers = pd.read_csv(file)
    tickers["現在値"] = tickers["現在値"].replace("-", None)
    tickers = tickers.dropna(subset=["現在値"], axis=0).reset_index(drop=True)
    tickers = tickers.loc[
        (tickers["市場"] == "東P") | (tickers["市場"] == "東S") |
        (tickers["市場"] == "東G") | (tickers["市場"] == "東ETF") |
        (tickers["市場"] == "東REIT") | (tickers["市場"] == "東優") |
        (tickers["市場"] == "東IF"), :
    ].reset_index(drop=True)
    return tickers

def _retrieve_data(code, start=None, end=None, src="yfinance"):
    if src == "yfinance":
        if f"{code}"[0].isdigit():
            ticker = f"{code}.T"
        else:
            ticker = f"{code}"
        stock = yf.Ticker(ticker)
        if start is None:
            data = stock.history(period="max").tz_localize(None)
        else:
            if end is None:
                data = stock.history(start=start, end=dt.date.today()).tz_localize(None)
            else:
                data = stock.history(start=start, end=end + relativedelta(days=1)).tz_localize(None)
        # data = yf.download(f"{ticker}.T", start=start, end=dt.date.today() + relativedelta(days=1)).tz_localize(None).sort_index()
    else:
        if f"{code}"[0].isdigit():
            ticker = f"{code}.JP"
        else:
            ticker = f"{code}"
        data = pdr.DataReader(ticker, data_source="stooq", start=start).sort_index()
    return data

def get_stock_price(code, start, end=dt.date.today()+dt.timedelta(days=1)):
    if code[0].isdigit():
        ticker = f"{code}.T"
    else:
        ticker = code
    stock = yf.Ticker(ticker)
    if start is not None:
        data = stock.history(start=start).tz_localize(None)
    else:
        data = stock.history(period="max").tz_localize(None)

    return data


def read_index_data(filename):
    df = pd.read_csv(filename)
    df["Date"] = pd.to_datetime(df["Date"])
    df.set_index("Date", inplace=True)
    return df


def get_resample(df, interval):
    if interval.lower() in ("w", "weekly"):
        rdf = df.resample("W").agg({
        "Open": "first", "High": "max", "Low": "min",
        "Close": "last", "Volume": "sum"
        })
        rdf.index = rdf.index - to_offset("2D")
    elif interval.lower() in ("m", "monthly"):
        rdf = df.resample("ME").agg({
        "Open": "first", "High": "max", "Low": "min",
        "Close": "last", "Volume": "sum"
        })
    elif interval.lower() in ("d", "daily"):
        rdf = df

    return rdf

In [3]:
interval = "Daily"
def _historical_high(ser, window):
    return ser.rolling(window=window).max()

def _historical_low(ser, window):
    return ser.rolling(window=window).min()

def _sma(ser, window=9):
    sma_series = ser.rolling(window=window).mean()
    return sma_series

def _rma(ser, period=9):
    """
    Calculate the Relative Moving Average (RMA) of a time series.

    Parameters:
    series (pd.Series): The input time series.
    period (int): The length of the RMA.

    Returns:
    pd.Series: The RMA of the input time series.
    """

    rma_series = ser.ewm(com=(period-1), min_periods=period).mean()
    return rma_series

def _ema(ser, span=9):
    ema_series = ser.ewm(span=span, min_periods=span, adjust=False).mean()
    return ema_series

def _multiple_mavs(df, ma_period):
    data = pd.DataFrame(index=df.index)
    ma_period = sorted(ma_period)
    for period in ma_period:
        col = f"ma{period}"
        data[col] = _sma(df["Close"], period)
    return data

def _mtt_signal(df, ma_period, lookback):
    data = _multiple_mavs(df, ma_period)
    mav = data.columns

    data["flg1"] = (df["Close"] > data[mav[1]]) & (df["Close"] > data[mav[2]])
    data["flg2"] = data[mav[1]] > data[mav[2]]
    data["flg3"] = data[mav[2]] > data[mav[2]].shift(lookback[0])
    data["flg4"] = (data[mav[0]] >= data[mav[1]]) & (data[mav[0]] > data[mav[2]])
    data["flg5"] = df["Close"] > data[mav[0]]
    data["flg6"] = df["Close"] > (1.25 * _historical_low(df["Low"], lookback[1]))
    data["flg7"] = df["Close"] < (1.25 * _historical_high(df["High"], lookback[1]))

    data["mtt_signal"] = np.where(
        data["flg1"] & data["flg2"] & data["flg3"] & data["flg4"] & data["flg5"] & data["flg6"] & data["flg7"],
        1, 0
    )
    return data.drop(mav, axis=1)

def _dorsey_rs(df, interval, numeraire="TOPIX"):
    global topix
    if numeraire.upper() in ("TPX", "TOPIX"):
        df_m = topix
    elif numeraire.upper() in ("NIKKEI", "N225", "NIKKEI225", "NKX", "^N225", "^NKX"):
        df_m = read_index_data("./n225.csv")

    if interval != "Daily":
        df_m = get_resample(df_m, interval)

    if len(df_m) > len(df):
        if type(df.index) is pd.core.indexes.datetimes.DatetimeIndex:
            df_m = df_m.loc[df.first_valid_index().date():]
        else:
            df_m = df_m.loc[min(df["Date"]).date():]
    return 100 * df.Close / df_m.Close


def _mansfield_rs(df, period, interval, numeraire="TOPIX"):
    """Calculate Mansfield Relative Strength
    period: 200 for daily, 52 for weekly, 12 for monthly
    https://x.gd/WWmfn
    """
    global topix
    if numeraire.upper() in ("TPX", "TOPIX"):
        df_m = topix
    elif numeraire.upper() in ("NIKKEI", "N225", "NIKKEI225", "NKX", "^N225", "^NKX"):
        df_m = read_index_data("./n225.csv")

    if interval != "Daily":
        df_m = get_resample(df_m, interval)

    d_rs = _dorsey_rs(df, interval, numeraire)
    window = min(len(df), period)
    return (d_rs / _sma(d_rs, window=window) - 1) * 100

def get_relative_performance(df, interval):
    length = len(df)
    if interval == "Daily":
        if length > 200:
            period = 200
        elif length > 100:
            period = length
        else:
            period = 200
    elif interval == "Weekly":
        period = 52
    elif interval == "Monthly":
        period = 12
    numeraire = "TOPIX"
    return _mansfield_rs(df, period, interval, numeraire)


def _relative_strength(df, q1, q2, q3, q4):
    close = df.Close
    return 100 * (0.4 * (close.pct_change(q1, fill_method=None))
         + 0.2 * (close.pct_change(q2, fill_method=None))
         + 0.2 * close.pct_change(q3, fill_method=None)
          + 0.2 * close.pct_change(q4, fill_method=None))


def get_relative_strength(df, interval):
    length = len(df)
    if interval == "Daily":
        if length > 252:
            params = (63, 126, 189, 252)
        elif length > 100:
            params = np.linspace(0, length-1, 5, dtype=int).tolist()[1:]
        else:
            params = (63, 126, 189, 252)
    elif (interval == "Daily") and (len(df) <= 252):
        params = np.linspace(0, len(df) - 1, 5, dtype=int)[1:]
    elif (interval == "Weekly") and (len(df) > 52):
        params = np.linspace(0, len(df) - 1, 5, dtype=int)[1:]
    elif (interval == "Weekly") and (len(df) <= 52):
        params = (13, 26, 39, 52)
    else:
        params = (3, 6, 9, 12)

    return _relative_strength(df, *params)


In [4]:
tickers =read_ticker_file("screener_result.csv")
tickers["コード"] = tickers["コード"].astype(str)
ticker_lst = tickers["コード"].to_list()

jpx = pd.read_excel("https://www.jpx.co.jp/markets/statistics-equities/misc/tvdivq0000001vg2-att/data_j.xls")
jpx["コード"] = jpx["コード"].astype(str)
jpx = jpx[["コード", "銘柄名", "市場・商品区分", "33業種区分", "17業種区分", "規模区分"]]

In [6]:
start=dt.date(2009, 1, 1)
topix = pdr.DataReader("^TPX", start=start, data_source="stooq").sort_index()
topix.to_csv("./data/topix.csv")

OSError: Cannot save file into a non-existent directory: 'data'