In [2]:
import pandas as pd
import numpy as np
from typing import List

In [3]:
# Turn off pandas' chained-assignment (SettingWithCopy) warning for this notebook.
pd.options.mode.chained_assignment = None
# Render every float with exactly three decimal places.
pd.set_option('display.float_format', '{:.3f}'.format)

소수점 표시를 위한 출력 설정 변경

# 데이터 불러오기

In [4]:
# CSV files to load; `data` holds one DataFrame per file, in this order.
names = ['bitcoin.csv', 'repl.csv', 'etherium.csv']
data = [pd.read_csv(name) for name in names]

# 결과 데이터를 저장할 데이터 프레임 생성

In [5]:
# Result frames: one per loaded coin, seeded with the close price and volume.
# .copy() makes every result frame explicitly independent of the raw frame it
# was selected from, so indicator columns can be added to it later without
# triggering (or having to silence) pandas' chained-assignment warning.
res = [ticker_data[['close', 'volume']].copy() for ticker_data in data]

res는 순서대로 bitcoin, ripple, ethereum의 기술지표를 저장할 리스트

In [6]:
time = data[0][['time']] # Time column of the first frame, kept separately; it is not going to be used as an independent variable

In [7]:
# Display the first result frame (the 'close' and 'volume' columns of data[0]).
res[0]

Unnamed: 0,close,volume
0,2996000.000,11139.871
1,3073000.000,8279.465
2,3119000.000,10731.677
3,3096000.000,9899.323
4,3083000.000,9472.610
...,...,...
4016,22320000.000,1666.680
4017,22403000.000,1587.818
4018,23004000.000,1675.061
4019,22790000.000,1505.955


In [8]:
class Indicator(object):
    """Compute technical indicators for several coins and store them as columns.

    ``data`` and ``res`` are parallel lists: the indicators computed from
    ``data[i]`` are written as new columns into ``res[i]``. Every ``get_*``
    method processes all coins in ``data`` and stores the updated frame back
    into ``self.res`` (the same list object that was passed in).

    Each frame in ``data`` must provide ``'close'`` and ``'volume'`` columns;
    ``get_mfi`` additionally needs ``'high'`` and ``'low'``.
    """

    # Window lengths shared by the moving-average, standard-deviation and
    # exponential-moving-average indicators.
    WINDOWS = (5, 10, 20, 60)

    def __init__(self,
                 data: List[pd.DataFrame],
                 res: List[pd.DataFrame]):
        """Keep references to the raw frames and to the result frames.

        Args:
            data: Raw price frames, one per coin.
            res: Result frames, one per coin, in the same order as ``data``.
        """
        self.data = data
        self.res = res

    def _apply(self, func) -> None:
        """Call ``func(res_frame, coin_frame)`` for every coin and store its result.

        Iterating over ``self.data`` (instead of a hard-coded ``range(3)``)
        lets the class work with any number of coins.
        """
        for i, coin in enumerate(self.data):
            self.res[i] = func(self.res[i], coin)

    @staticmethod
    def _volume_index(coin: pd.DataFrame, start, volume_rising: bool) -> list:
        """Build a positive/negative volume index as a plain list.

        The index starts at ``start``. On rows where the volume moved in the
        selected direction (up when ``volume_rising`` is true, down otherwise)
        the previous index value is changed by the close-price return;
        on every other row the previous value is carried forward.
        """
        # Positional numpy arrays: the recurrence must not depend on the
        # frame's index labels.
        close = coin['close'].to_numpy(dtype=float)
        volume = coin['volume'].to_numpy(dtype=float)

        values = [start] * len(close)
        for i in range(1, len(close)):
            if volume_rising:
                active = volume[i] > volume[i - 1]
            else:
                active = volume[i] < volume[i - 1]

            previous = values[i - 1]
            if active:
                values[i] = previous + (close[i] - close[i - 1]) * \
                            previous / close[i - 1]
            else:
                values[i] = previous
        return values

    def get_log_scale(self) -> None:
        """Add ``'close_log'``: the natural logarithm of the close price."""
        def log_scale(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            res['close_log'] = np.log(coin['close'])
            return res

        self._apply(log_scale)

    def get_nvi(self) -> None:
        """Add ``'nvi'``: negative volume index, starting at 1.

        NVI = NVI_prev + (close - close_prev) / close_prev * NVI_prev on rows
        where the volume fell; unchanged otherwise.
        """
        def nvi(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            res['nvi'] = self._volume_index(coin, 1, volume_rising=False)
            return res

        self._apply(nvi)

    def get_pvi(self) -> None:
        """Add ``'pvi'``: positive volume index, starting at 100.

        PVI = PVI_prev + (close - close_prev) / close_prev * PVI_prev on rows
        where the volume rose; unchanged otherwise.
        """
        def pvi(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            res['pvi'] = self._volume_index(coin, 100, volume_rising=True)
            return res

        self._apply(pvi)

    def get_ma(self) -> None:
        """Add ``'ma_<n>'``: simple moving average of close for each window."""
        def ma(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            close = coin['close']
            for day in self.WINDOWS:
                res['ma_' + str(day)] = close.rolling(day).mean()
            return res

        self._apply(ma)

    def get_rsi(self, m_N: int=14) -> None:
        """Add ``'rsi'``: relative strength index over ``m_N`` rows.

        RSI = 100 * AU / (AU + AD), where AU / AD are the rolling means of the
        upward / downward close-price changes.

        Args:
            m_N: Rolling window length (also the minimum number of rows).
        """
        def rsi(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            delta = coin['close'].diff(1)

            # The first row has no previous close (NaN delta); it counts as
            # "no change", i.e. 0 gain and 0 loss.
            gains = delta.where(delta > 0, 0.0)
            losses = (-delta).where(delta < 0, 0.0)

            avg_gain = gains.rolling(window=m_N, min_periods=m_N).mean()
            avg_loss = losses.rolling(window=m_N, min_periods=m_N).mean()

            # Assigned positionally so the result does not depend on index labels.
            res['rsi'] = (avg_gain / (avg_gain + avg_loss) * 100).to_numpy()
            return res

        self._apply(rsi)

    def get_vpt(self, last_vpt: float=2402.1359) -> None:
        """Add ``'vpt'``: volume price trend, reconstructed backwards in time.

        The last row is anchored at ``last_vpt`` and earlier rows are derived
        with VPT_prev = VPT - volume * (close - close_prev) / close_prev.

        Args:
            last_vpt: VPT value as of the most recent date (last row).
                NOTE(review): the same anchor is applied to every coin —
                confirm that this is intended.
        """
        def vpt(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            volume = coin['volume'].to_numpy(dtype=float)
            close = coin['close'].to_numpy(dtype=float)

            vpt_list = [0.0] * len(close)
            if vpt_list:  # nothing to anchor on an empty frame
                vpt_list[-1] = last_vpt

            for i in range(len(vpt_list) - 1, 0, -1):
                vpt_list[i - 1] = vpt_list[i] - volume[i] * \
                                  (close[i] - close[i - 1]) / \
                                  close[i - 1]
            res['vpt'] = vpt_list
            return res

        self._apply(vpt)

    def get_obv(self, first_obv: float=68734.4525) -> None:
        """Add ``'obv'``: on-balance volume.

        Starting from ``first_obv``, the row's volume is added when the close
        rose, subtracted when it fell, and ignored when it is unchanged.
        The raw ``data`` frames are not modified.

        Args:
            first_obv: OBV value of the first row.
                NOTE(review): the same starting value is applied to every
                coin — confirm that this is intended.
        """
        def obv(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            volume = coin['volume'].to_numpy(dtype=float)
            close = coin['close'].to_numpy(dtype=float)

            obv_list = [first_obv] * len(close)
            for i in range(1, len(close)):
                if close[i] > close[i - 1]:
                    obv_list[i] = obv_list[i - 1] + volume[i]
                elif close[i] == close[i - 1]:
                    obv_list[i] = obv_list[i - 1]
                else:
                    obv_list[i] = obv_list[i - 1] - volume[i]
            res['obv'] = obv_list
            return res

        self._apply(obv)

    def get_std(self) -> None:
        """Add ``'std_<n>'``: rolling standard deviation of close for each window."""
        def std(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            close = coin['close']
            for day in self.WINDOWS:
                res['std_' + str(day)] = close.rolling(day).std()
            return res

        self._apply(std)

    def get_mfi(self, period: int=14) -> None:
        """Add ``'mfi'``: money flow index over ``period`` rows.

        typical price = (close + high + low) / 3
        money flow    = typical price * volume
        MFI = 100 * positive flow / (positive flow + negative flow), where a
        row's money flow counts as positive (negative) when its typical price
        is above (below) the previous row's. The first ``period`` rows are NaN.

        Args:
            period: Number of rows summed into each MFI value.
        """
        def mfi(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            typical_price = (coin['close'] + coin['high'] + coin['low']) / 3
            money_flow = typical_price * coin['volume']
            change = typical_price.diff()

            # The flow of the *current* row is classified by the current
            # price change (the previous row's flow must not be used here).
            positive_flow = money_flow.where(change > 0, 0.0)
            negative_flow = money_flow.where(change < 0, 0.0)

            # The first row has no previous price, so it cannot be classified;
            # NaN keeps it out of every rolling window.
            if len(change) > 0:
                positive_flow.iloc[0] = np.nan
                negative_flow.iloc[0] = np.nan

            positive_mf = positive_flow.rolling(period).sum()
            negative_mf = negative_flow.rolling(period).sum()

            # Assigned positionally so the result does not depend on index labels.
            res['mfi'] = (100 * positive_mf /
                          (positive_mf + negative_mf)).to_numpy()
            return res

        self._apply(mfi)

    def get_ema(self) -> None:
        """Add ``'ema_<n>'``: exponential moving average of close for each span."""
        def ema(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            close = coin['close']
            for day in self.WINDOWS:
                res['ema_' + str(day)] = close.ewm(span=day, adjust=False).mean()
            return res

        self._apply(ema)

    def get_fi(self, period: int=14) -> None:
        """Add ``'fi'``: close-price change over ``period`` rows times volume.

        NOTE(review): the textbook force index uses the one-row price change
        smoothed with an EMA; this keeps the notebook's ``diff(period)``
        variant — confirm which one is wanted.

        Args:
            period: Number of rows between the two close prices compared.
        """
        def fi(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            res['fi'] = coin['close'].diff(period) * coin['volume']
            return res

        self._apply(fi)

    def get_bb(self, window: int=20, num_std: float=2.0) -> None:
        """Add Bollinger Bands: ``'bb_middle'``, ``'bb_upper'``, ``'bb_lower'``.

        The middle band is the rolling mean of close; the outer bands lie
        ``num_std`` population standard deviations (ddof=0) above and below it.

        Args:
            window: Rolling window length.
            num_std: Band width in standard deviations.
        """
        def bb(res: pd.DataFrame, coin: pd.DataFrame) -> pd.DataFrame:
            close = coin['close']
            middle = close.rolling(window).mean()
            spread = num_std * close.rolling(window).std(ddof=0)

            res['bb_middle'] = middle
            res['bb_upper'] = middle + spread
            res['bb_lower'] = middle - spread
            return res

        self._apply(bb)

In [65]:
# Bind the raw coin frames (data) and their result frames (res) to one Indicator instance.
indicator = Indicator(data, res)

In [66]:
# Compute the negative volume index and store it as the 'nvi' column of each result frame.
indicator.get_nvi()

# NVI
$$
NVI = NVI_{prev} + \frac{close - close_{prev}}{close_{prev}} \times NVI_{prev}
$$

# PVI
$$
PVI = PVI_{prev} + \frac{close - close_{prev}}{close_{prev}} \times PVI_{prev}
$$