<a href="https://colab.research.google.com/github/shelan-de-livera/finalprojecttestrepo/blob/main/test_preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install ta

Collecting ta
  Downloading ta-0.11.0.tar.gz (25 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ta
  Building wheel for ta (setup.py) ... [?25l[?25hdone
  Created wheel for ta: filename=ta-0.11.0-py3-none-any.whl size=29412 sha256=a233584d3d327c30289ddd6a0b6ff1a508b3c30043c1822398d9bc0befd949d4
  Stored in directory: /root/.cache/pip/wheels/5f/67/4f/8a9f252836e053e532c6587a3230bc72a4deb16b03a829610b
Successfully built ta
Installing collected packages: ta
Successfully installed ta-0.11.0


In [2]:
! pip install pytrends

Collecting pytrends
  Downloading pytrends-4.9.2-py3-none-any.whl (15 kB)
Installing collected packages: pytrends
Successfully installed pytrends-4.9.2


# database.entities.crypto - crypto.py

In [3]:
class Crypto:
    def __init__(self, symbol: str, name: str, start_year: int):
        self._symbol = symbol
        self._name = name
        self._start_year = start_year

    @property
    def symbol(self) -> str:
        return self._symbol

    @property
    def name(self) -> str:
        return self._name

    @property
    def start_year(self) -> int:
        return self._start_year



# database.preprocessing.preprocessing - preprocessing.py

In [4]:
import pandas as pd
from abc import ABC, abstractmethod


class DatasetPreprocessing(ABC):
    @abstractmethod
    def preprocess(self, dataset_df: pd.DataFrame) -> pd.DataFrame:
        pass


# database.preprocessing.gtrends.gtrends - gtrends.py

In [5]:
import warnings
import numpy as np
import pandas as pd
# from database.preprocessing.preprocessing import DatasetPreprocessing


class GoogleTrendsPreprocessing(DatasetPreprocessing):
    def __init__(
            self,
            impute_missing_scores: bool = False,
            imputing_percentage_threshold: float = 0.1
    ):
        assert 0.0 < imputing_percentage_threshold < 1.0, \
            'AssertionError: imputing_percentage_threshold is expected to be a float value between [0.0, 1.0], ' \
            f'got: {imputing_percentage_threshold}'

        self._impute_missing_scores = impute_missing_scores
        self._imputing_percentage_threshold = imputing_percentage_threshold

    def _impute_missing_trend_scores(self, trend_scores: pd.Series) -> pd.Series:
        missing_scores_percentage = trend_scores.isna().mean()

        if missing_scores_percentage < self._imputing_percentage_threshold:
            trend_scores[trend_scores == 0] = np.nan
            trend_scores.interpolate(method='polynomial', order=5, inplace=True)
            trend_scores[trend_scores == np.nan] = 0
        else:
            warnings.warn('Expected missing scores percentage to be less than '
                          f'{self._imputing_percentage_threshold}, got {missing_scores_percentage}%. '
                          f'Imputation process is skipped')
        return trend_scores

    def preprocess(self, trends_df: pd.DataFrame) -> pd.DataFrame:
        trends_df.rename(columns={trends_df.columns[1]: 'trends'}, inplace=True)

        if self._impute_missing_scores:
            trends_df['trends'] = self._impute_missing_trend_scores(trend_scores=trends_df['trends'])
        return trends_df


# database.preprocessing.coinapi.ohlcv - ohlcv.py

In [6]:
import numpy as np
import pandas as pd
# from database.preprocessing.preprocessing import DatasetPreprocessing


class OHLCVPreprocessing(DatasetPreprocessing):
    def __init__(self):
        super().__init__()

    @staticmethod
    def _preprocess_ohlcv_columns(ohlcv_df: pd.DataFrame):
        ohlcv_df.drop(columns=['time_period_start', 'time_open', 'time_close'], inplace=True)
        ohlcv_df.rename(columns={
            'time_period_end': 'date',
            'price_open': 'open',
            'price_high': 'high',
            'price_low': 'low',
            'price_close': 'close',
            'volume_traded': 'volume',
            'trades_count': 'trades'
        }, inplace=True)
        ohlcv_df['date'] = ohlcv_df['date'].apply(lambda date: date.split('.')[0].replace('T', ' '))
        ohlcv_df['hour'] = ohlcv_df['date'].apply(lambda date: int(date.split(' ')[1].split(':')[0]))
        return ohlcv_df

    @staticmethod
    def _append_ohlcv_log_returns_to_df(ohlcv_df: pd.DataFrame) -> pd.DataFrame:
        ohlcv_df['open_log_returns'] = np.log(ohlcv_df['open']).diff()
        ohlcv_df['high_log_returns'] = np.log(ohlcv_df['high']).diff()
        ohlcv_df['low_log_returns'] = np.log(ohlcv_df['low']).diff()
        ohlcv_df['close_log_returns'] = np.log(ohlcv_df['close']).diff()
        ohlcv_df['volume_log_returns'] = np.log(ohlcv_df['volume']).diff()
        ohlcv_df['trades_log_returns'] = np.log(ohlcv_df['trades']).diff()
        return ohlcv_df

    def preprocess(self, ohlcv_df: pd.DataFrame) -> pd.DataFrame:
        ohlcv_df = self._preprocess_ohlcv_columns(ohlcv_df=ohlcv_df)
        ohlcv_df = self._append_ohlcv_log_returns_to_df(ohlcv_df=ohlcv_df)
        return ohlcv_df


# database.preprocessing.ta.ta - ta.py

In [7]:
import pandas as pd
# from database.preprocessing.preprocessing import DatasetPreprocessing


class TechnicalAnalysisPreprocessing(DatasetPreprocessing):
    def __init__(self, closes: pd.Series):
        self._closes = closes

    def preprocess(self, ta_df: pd.DataFrame) -> pd.DataFrame:
        ta_df_columns = set(ta_df.columns)

        if 'dema' in ta_df_columns:
            ta_df['close_dema'] = self._closes - ta_df['dema']
        if 'vwap' in ta_df_columns:
            ta_df['close_vwap'] = self._closes - ta_df['vwap']
        if 'bband_up' and 'bband_down' in ta_df_columns:
            ta_df['bband_up_close'] = ta_df['bband_up'] - self._closes
            ta_df['close_bband_down'] = self._closes - ta_df['bband_down']
        if 'adl' in ta_df_columns:
            ta_df['adl_diffs'] = ta_df['adl'].diff()
        if 'obv' in ta_df_columns:
            ta_df['obv_diffs'] = ta_df['obv'].diff()
        return ta_df


# analysis.technical.indicators.indicator - indicator.py

In [8]:
import pandas as pd
from abc import ABC, abstractmethod


class TechnicalIndicator(ABC):
    def __init__(self, name: str or tuple[str, str]):
        self._name = name

    @property
    def name(self) -> str or tuple[str, str]:
        return self._name

    def __hash__(self) -> int:
        return hash(self.name)

    def __call__(self, **kwargs) -> pd.Series or tuple[pd.Series, pd.Series]:
        return self.compute_indicator_values(**kwargs)

    def compute_indicator_values(self, **kwargs) -> pd.Series or tuple[pd.Series, pd.Series]:
        indicator_values = self._compute_indicator_values(**kwargs)

        assert (isinstance(self.name, str) and isinstance(indicator_values, pd.Series)) or \
               (isinstance(self.name, tuple) and isinstance(indicator_values, tuple)), \
            'AssertionError Indicator names does not match indicator values'

        return indicator_values

    @abstractmethod
    def _compute_indicator_values(self, **kwargs) -> pd.Series or tuple[pd.Series, pd.Series]:
        pass


# analysis.technical.indicators.adl - ADL.py

In [9]:
import pandas as pd
from ta.volume import AccDistIndexIndicator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class ADL(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='adl')

    def _compute_indicator_values(
            self, highs: pd.Series,
            lows: pd.Series,
            closes: pd.Series,
            volumes: pd.Series
    ) -> pd.Series:
        return AccDistIndexIndicator(
            high=highs,
            low=lows,
            close=closes,
            volume=volumes
        ).acc_dist_index()


# analysis.technical.indicators.adx - ADX.py

In [10]:
import pandas as pd
from ta.trend import ADXIndicator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class ADX(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='adx')

    def _compute_indicator_values(
            self,
            highs: pd.Series,
            lows: pd.Series,
            closes: pd.Series,
            window: int = 14
    ) -> pd.Series:
        return ADXIndicator(high=highs, low=lows, close=closes, window=window).adx()


# analysis.technical.indicators.aroons - AROONS.py

In [11]:
import pandas as pd
from ta.trend import AroonIndicator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class AROONS(TechnicalIndicator):
    def __init__(self):
        super().__init__(name=('aroon_up', 'aroon_down'))

    def _compute_indicator_values(self, closes: pd.Series, window: int = 25) -> tuple[pd.Series, pd.Series]:
        aroon_values = AroonIndicator(close=closes, window=window)
        return aroon_values.aroon_up(), aroon_values.aroon_down()


# analysis.technical.indicators.bbands - BBANDS.py

In [12]:
import pandas as pd
from ta.volatility import BollingerBands
# from analysis.technical.indicators.indicator import TechnicalIndicator


class BBANDS(TechnicalIndicator):
    def __init__(self):
        super().__init__(name=('bband_up', 'bband_down'))

    def _compute_indicator_values(
            self, closes: pd.Series,
            window: int = 20,
            window_deviation: int = 2
    ) -> tuple[pd.Series, pd.Series]:
        bbands_values = BollingerBands(close=closes, window=window, window_dev=window_deviation)
        return bbands_values.bollinger_hband(), bbands_values.bollinger_lband()


# analysis.technical.indicators.cci - CCI.py

In [13]:
import pandas as pd
from ta.trend import CCIIndicator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class CCI(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='cci')

    def _compute_indicator_values(
            self,
            highs: pd.Series,
            lows: pd.Series,
            closes: pd.Series,
            window: int = 20
    ) -> pd.Series:
        return CCIIndicator(
            high=highs,
            low=lows,
            close=closes,
            window=window
        ).cci()


# analysis.technical.indicators.dema - DEMA.py

In [14]:
import pandas as pd
from ta.trend import EMAIndicator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class DEMA(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='dema')

    def _compute_indicator_values(self, closes: pd.Series, window: int = 15) -> pd.Series:
        ema = EMAIndicator(close=closes, window=window).ema_indicator()
        ema_of_ema = EMAIndicator(close=ema, window=window).ema_indicator()
        return 2*ema - ema_of_ema


# analysis.technical.indicators.ema - EMA.py

In [15]:
import pandas as pd
from ta.trend import EMAIndicator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class EMA(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='ema')

    def _compute_indicator_values(self, closes: pd.Series, window: int = 14) -> pd.Series:
        return EMAIndicator(close=closes, window=window).ema_indicator()


# analysis.technical.indicators.macd - MACDSignalDiffs.py

In [16]:
import pandas as pd
from ta.trend import MACD
# from analysis.technical.indicators.indicator import TechnicalIndicator


class MACDSignalDiffs(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='macd_signal_diffs')

    def _compute_indicator_values(
            self, closes: pd.Series,
            short_window: int = 12,
            long_window: int = 26,
            signal_period=9
    ) -> pd.Series:
        return MACD(
            close=closes,
            window_slow=long_window,
            window_fast=short_window,
            window_sign=signal_period
        ).macd_diff()


# analysis.technical.indicators.obv - OBV.py


In [17]:
import pandas as pd
from ta.volume import OnBalanceVolumeIndicator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class OBV(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='obv')

    def _compute_indicator_values(self, closes: pd.Series, volumes: pd.Series) -> pd.Series:
        return OnBalanceVolumeIndicator(
            close=closes,
            volume=volumes
        ).on_balance_volume()


# analysis.technical.indicators.rsi - RSI.py

In [18]:
import pandas as pd
from ta.momentum import RSIIndicator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class RSI(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='rsi')

    def _compute_indicator_values(self, closes: pd.Series, window: int = 14) -> pd.Series:
        return RSIIndicator(close=closes, window=window).rsi()


# analysis.technical.indicators.stoch - STOCH.py

In [19]:
import pandas as pd
from ta.momentum import StochasticOscillator
# from analysis.technical.indicators.indicator import TechnicalIndicator


class STOCH(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='stoch')

    def _compute_indicator_values(
            self,
            highs: pd.Series,
            lows: pd.Series,
            closes: pd.Series,
            window: int = 14
    ) -> pd.Series:
        return StochasticOscillator(high=highs, low=lows, close=closes, window=window).stoch()


# analysis.technical.indicators.vwap - VWAP.py

In [20]:
import pandas as pd
from ta.volume import VolumeWeightedAveragePrice
# from analysis.technical.indicators.indicator import TechnicalIndicator


class VWAP(TechnicalIndicator):
    def __init__(self):
        super().__init__(name='vwap')

    def _compute_indicator_values(
            self,
            highs: pd.Series,
            lows: pd.Series,
            closes: pd.Series,
            volumes: pd.Series,
            window: int = 10
    ) -> pd.Series:
        return VolumeWeightedAveragePrice(
            high=highs,
            low=lows,
            close=closes,
            volume=volumes,
            window=window
        ).volume_weighted_average_price()


# analysis.technical.technical - technical.py

In [21]:
import pandas as pd
# from analysis.technical.indicators.indicator import TechnicalIndicator


class TechnicalAnalysis:
    def __init__(self, dates: pd.Series):
        self._dates = dates

    def compute_technical_indicators(self, ta_config_dict: dict[TechnicalIndicator, dict]) -> pd.DataFrame:
        ta = {'date': self._dates}

        for indicator, params in ta_config_dict.items():
            indicator_name = indicator.name
            indicator_values = indicator.compute_indicator_values(**params)

            if isinstance(indicator_name, str):
                ta[indicator_name] = indicator_values
            else:
                for name, values in zip(indicator_name, indicator_values):
                    ta[name] = values
        return pd.DataFrame(data=ta)


# analysis.technical.configs.config - config.py

In [22]:
import pandas as pd
from abc import ABC, abstractmethod
# from analysis.technical.indicators.indicator import TechnicalIndicator


class TAConfig(ABC):
    @abstractmethod
    def get_technical_analysis_config_dict(
            self,
            opens: pd.Series,
            highs: pd.Series,
            lows: pd.Series,
            closes: pd.Series,
            volumes: pd.Series
    ) -> dict[TechnicalIndicator, dict]:
        pass


# analysis.technical.configs.standard - standard.py

In [23]:
import pandas as pd
# from analysis.technical.configs.config import TAConfig
# from analysis.technical.indicators.indicator import TechnicalIndicator
# from analysis.technical.indicators.dema import DEMA
# from analysis.technical.indicators.vwap import VWAP
# from analysis.technical.indicators.macd import MACDSignalDiffs
# from analysis.technical.indicators.rsi import RSI
# from analysis.technical.indicators.stoch import STOCH
# from analysis.technical.indicators.cci import CCI
# from analysis.technical.indicators.adx import ADX
# from analysis.technical.indicators.aroons import AROONS
# from analysis.technical.indicators.bbands import BBANDS
# from analysis.technical.indicators.adl import ADL
# from analysis.technical.indicators.obv import OBV


class StandardTAConfig(TAConfig):
    def __init__(
            self,
            dema_window: int = 15,
            vwap_window: int = 10,
            macd_short_window: int = 12,
            macd_long_window: int = 26,
            macd_signal_period: int = 9,
            rsi_window: int = 14,
            stoch_window: int = 14,
            cci_window: int = 20,
            adx_window: int = 14,
            aroons_window: int = 25,
            bbands_window: int = 20
    ):
        super().__init__()

        self._dema_window = dema_window
        self._vwap_window = vwap_window
        self._macd_short_window = macd_short_window
        self._macd_long_window = macd_long_window
        self._macd_signal_period = macd_signal_period
        self._rsi_window = rsi_window
        self._stoch_window = stoch_window
        self._cci_window = cci_window
        self._adx_window = adx_window
        self._aroons_window = aroons_window
        self._bbands_window = bbands_window

    def get_technical_analysis_config_dict(
            self,
            opens: pd.Series,
            highs: pd.Series,
            lows: pd.Series,
            closes: pd.Series,
            volumes: pd.Series
    ) -> dict[TechnicalIndicator, dict]:
        return {
            DEMA(): {'closes': closes, 'window': self._dema_window},
            VWAP(): {
                'highs': highs,
                'lows': lows,
                'closes': closes,
                'volumes': volumes,
                'window': self._vwap_window
            },
            MACDSignalDiffs(): {
                'closes': closes,
                'short_window': self._macd_short_window,
                'long_window': self._macd_long_window,
                'signal_period': self._macd_signal_period
            },
            RSI(): {'closes': closes, 'window': self._rsi_window},
            STOCH(): {'highs': highs, 'lows': lows, 'closes': closes, 'window': self._stoch_window},
            CCI(): {'highs': highs, 'lows': lows, 'closes': closes, 'window': self._cci_window},
            ADX(): {'highs': highs, 'lows': lows, 'closes': closes, 'window': self._adx_window},
            AROONS(): {'closes': closes, 'window': self._aroons_window},
            BBANDS(): {'closes': closes, 'window': self._bbands_window},
            ADL(): {'highs': highs, 'lows': lows, 'closes': closes, 'volumes': volumes},
            OBV(): {'closes': closes, 'volumes': volumes}
        }


# database.network.network - network.py

In [24]:
import pandas as pd
# from database.entities.crypto import Crypto
from abc import ABC, abstractmethod


class DatasetDownloader(ABC):
    def __init__(self, date_column_name: str, verbose: bool):
        self._date_column_name = date_column_name
        self._verbose = verbose

    @property
    def date_column_name(self) -> str:
        return self._date_column_name

    @property
    def verbose(self) -> bool:
        return self._verbose

    def _store_dataset(self, dataset_df: pd.DataFrame, filepath: str, columns: list or None = None):
        assert not dataset_df.duplicated(subset=self.date_column_name).any(), \
            f'AssertionError: Date column is expected to be unique, got duplicates'

        assert dataset_df[self.date_column_name].is_monotonic_increasing, \
            f'AssertionError: Date column is expected to be monotonic and increasing'

        dataset_df.to_csv(filepath, columns=columns, index=False)

    @abstractmethod
    def download_historical_data(self, crypto: Crypto, history_filepath: str) -> bool:
        pass

    @abstractmethod
    def update_historical_data(self, crypto: Crypto, history_filepath: str) -> bool:
        pass


# database.network.coinapi.coinapi - coinapi.py

In [25]:
import requests
from abc import ABC, abstractmethod
from urllib.parse import urlencode
# from database.network.network import DatasetDownloader


class CoinAPIDownloader(DatasetDownloader, ABC):
    def __init__(self, verbose: bool):
        super().__init__(date_column_name=self._get_date_column_name(), verbose=verbose)

        self._api_key_list = [
            '70E10174-E29D-449F-9F2E-6E8362931DD9',
            '27E5E40C-7A6B-45EB-A5C8-8311B049A741',
            '8F6252DE-0AD7-478F-91C7-141141E8BE8B',
            '3B49210E-100B-4F8D-9011-2BA5D38274BA',
            'BF6BF46F-B44B-416E-9656-2D2AAFBC058B',
            'B21A98A2-C953-4C73-84CF-CFFB6F712200',
            '51667E99-7686-4496-B23D-6DA54F7E37AE',
            '0921F87B-BF55-4B78-B8B0-E023B4D7A2E2',
            '3F9E3251-029C-457A-9ADA-7F21A440AAF9',
            '41EBEA2D-1A4B-4654-8A41-186639B9AB9F',
            '6B93AEC2-910C-4064-80FB-91AED487AB97',
            '83049379-23DE-4CB0-8299-7137BB836D48',
            'B08FCA1F-F454-4C34-AC01-42F16354BCBC',
            '12E5D72C-25A6-4ED6-8384-7C291EC43768',
            '4F287859-5A00-47EF-AC91-8A2629F8C6A1',
            '3744F705-2C4A-406C-AA96-EB1B557A84EF',
            '3F77D500-457E-4A96-9CE1-1DEF3FC7033B',
            '455C2228-0D6F-4B62-8336-4BAA24C1A46E',
            '7E37E058-670C-4ED6-B7BE-DC00F309D9FF',
            '0F517C3D-162C-4C5E-AE18-544B201C9BC0'
        ]

    @property
    def api_key_list(self) -> list[str]:
        return self._api_key_list

    @abstractmethod
    def _get_date_column_name(self) -> str:
        pass

    @abstractmethod
    def _get_request_params(self) -> dict[str, str]:
        pass

    @staticmethod
    def _encode_request_url(base_url: str, request_params: dict, api_key: str) -> str:
        request_params['apikey'] = api_key
        encoded_params = urlencode(request_params)
        return f'{base_url}?{encoded_params}'

    def _get_response(self, base_url: str, request_params: dict) -> requests.Response or None:
        for api_key in self._api_key_list:
            if self._verbose:
                print(f'Using apikey: {api_key}')

            encoded_request_url = self._encode_request_url(
                base_url=base_url,
                request_params=request_params,
                api_key=api_key
            )
            response = requests.get(encoded_request_url)

            if self._verbose:
                print(f'Response Status: {response.status_code} - {response.reason}')

            if response.status_code == 200:
                return response
        return None


# database.network.coinapi.ohlcv - ohlcv.py

In [26]:
import io
import pandas as pd
from enum import Enum
# from database.entities.crypto import Crypto
# from database.network.coinapi.coinapi import CoinAPIDownloader


class OHLCVDownloader(CoinAPIDownloader):
    class HistoricalFrequency(Enum):
        MINUTE = '1MIN'
        HOUR = '1HRS'

    def __init__(self, historical_frequency: HistoricalFrequency or str, verbose: bool):
        if isinstance(historical_frequency, str):
            if historical_frequency == '1HRS':
                self._historical_frequency = self.HistoricalFrequency.HOUR
            elif historical_frequency == '1MIN':
                self._historical_frequency = self.HistoricalFrequency.MINUTE
            else:
                raise NotImplementedError(f'"{historical_frequency}" frequency has not been implemented yet')
        else:
            self._historical_frequency = historical_frequency

        super().__init__(verbose=verbose)

        self._history_request_url = 'https://rest.coinapi.io/v1/ohlcv/{}/USD/history'
        self._latest_request_url = 'https://rest.coinapi.io/v1/ohlcv/{}/USD/latest'
        self._download_limit = 100000
        self._update_limit = 1000

    def _get_date_column_name(self) -> str:
        return 'time_period_end'

    def _get_request_params(self) -> dict:
        return {
            'period_id': self._historical_frequency.value,
            'output_format': 'csv',
            'csv_set_delimiter': ',',
            'time_start': '{}-{}-{}T00:00:00',
            'limit': '{}'
        }

    def download_historical_data(self, crypto: Crypto, history_filepath: str) -> bool:
        if self.verbose:
            print(f'Downloading {crypto.name} market history data for {crypto.start_year}')

        request_params = self._get_request_params()
        request_params['time_start'] = request_params['time_start'].format(crypto.start_year, '01', '01')
        request_params['limit'] = request_params['limit'].format(self._download_limit)
        base_url = self._history_request_url.format(crypto.symbol)

        response = self._get_response(
            base_url=base_url,
            request_params=request_params
        )
        if response is not None and response.status_code == 200:
            ohlcv_df = pd.read_csv(io.StringIO(response.text), sep=',')
            super()._store_dataset(dataset_df=ohlcv_df, filepath=history_filepath)
            return True
        else:
            return False

    def update_historical_data(self, crypto: Crypto, history_filepath: str) -> bool:
        if self.verbose:
            print(f'Updating {crypto.name} market latest data for {crypto.start_year}')

        request_params = self._get_request_params()
        request_params['limit'] = request_params['limit'].format(self._update_limit)
        del request_params['time_start']
        base_url = self._latest_request_url.format(crypto.symbol)

        response = self._get_response(
            base_url=base_url,
            request_params=request_params
        )

        if response is not None and response.status_code == 200:
            history_df = pd.read_csv(history_filepath)
            latest_df = pd.read_csv(io.StringIO(response.text), sep=',').sort_values(
                by=self.date_column_name, ascending=True
            )
            merged_df = pd.concat((history_df, latest_df), ignore_index=True)
            merged_df.drop_duplicates(subset=self.date_column_name, inplace=True)

            super()._store_dataset(dataset_df=merged_df, filepath=history_filepath)
            return True
        return False


# database.datasets.builder - builder.py

In [27]:
import numpy as np
import pandas as pd
from functools import reduce
# from analysis.technical.configs.config import TAConfig
# from analysis.technical.configs.standard import StandardTAConfig
# from analysis.technical.technical import TechnicalAnalysis
# from database.preprocessing.gtrends.gtrends import GoogleTrendsPreprocessing
# from database.preprocessing.coinapi.ohlcv import OHLCVPreprocessing
# from database.preprocessing.ta.ta import TechnicalAnalysisPreprocessing


class DatasetBuilder:
    def __init__(self):
        self._key_column = 'date'

    @staticmethod
    def _import_ohlcv_dataset(ohlcv_history_filepath: str) -> pd.DataFrame:
        ohlcv_df = pd.read_csv(ohlcv_history_filepath)
        ohlcv_df = OHLCVPreprocessing().preprocess(ohlcv_df=ohlcv_df)
        return ohlcv_df

    @staticmethod
    def _compute_technical_indicators(
            ohlcv_df: pd.DataFrame,
            ta_config: TAConfig,
    ) -> pd.DataFrame:
        ta_config_dict = ta_config.get_technical_analysis_config_dict(
            opens=ohlcv_df['open'],
            highs=ohlcv_df['high'],
            lows=ohlcv_df['low'],
            closes=ohlcv_df['close'],
            volumes=ohlcv_df['volume']
        )
        ta_df = TechnicalAnalysis(dates=ohlcv_df['date']).compute_technical_indicators(ta_config_dict=ta_config_dict)
        ta_df = TechnicalAnalysisPreprocessing(closes=ohlcv_df['close']).preprocess(ta_df=ta_df)
        return ta_df

    @staticmethod
    def _import_gtrends_dataset(
            gtrends_history_filepath: str,
            impute_missing_gtrends: bool,
            gtrends_imputing_percentage_threshold: float
    ):
        trends_df = pd.read_csv(gtrends_history_filepath)
        return GoogleTrendsPreprocessing(
            impute_missing_scores=impute_missing_gtrends,
            imputing_percentage_threshold=gtrends_imputing_percentage_threshold
        ).preprocess(trends_df=trends_df)

    def _merge_datasets(self, dataset_df_list: list) -> pd.DataFrame:
        for df in dataset_df_list:
            assert self._key_column in df.columns, \
                f'AssertionError: Key column: "{self._key_column}" is missing from a dataset. Cannot merge datasets'

        return dataset_df_list[0] if len(dataset_df_list) == 1 else \
            reduce(lambda left, right: pd.merge(left, right, on=self._key_column, how='left'), dataset_df_list)

    @staticmethod
    def _handle_missing_values(dataset_df: pd.DataFrame) -> pd.DataFrame:
        dataset_df['hour'] = dataset_df['date'].apply(lambda date: int(date.split(' ')[1].split(':')[0]))

        if 'trends' in dataset_df.columns:
            dataset_df['trends'].replace({np.nan: 0.0}, inplace=True)

        dataset_df.dropna(inplace=True)

        assert not dataset_df.isna().any().any(), \
            f'AssertionError: Imputation failed or incomplete. ' \
            f'Found missing values at columns: {dataset_df.columns[dataset_df.isna().any()]}'

        return dataset_df

    def build_dataset(
            self,
            ohlcv_history_filepath: str,
            gtrends_history_filepath: str or None,
            dataset_save_filepath: str,
            ta_config: TAConfig = StandardTAConfig,
            impute_missing_gtrends: bool = True,
            gtrends_imputing_percentage_threshold: float = 0.1
    ):
        ohlcv_df = self._import_ohlcv_dataset(ohlcv_history_filepath=ohlcv_history_filepath)
        ta_df = self._compute_technical_indicators(ohlcv_df=ohlcv_df, ta_config=ta_config)
        gtrends_df = self._import_gtrends_dataset(
            gtrends_history_filepath=gtrends_history_filepath,
            impute_missing_gtrends=impute_missing_gtrends,
            gtrends_imputing_percentage_threshold=gtrends_imputing_percentage_threshold
        )

        num_expected_samples = ohlcv_df.shape[0]
        merged_dataset_df = self._merge_datasets(dataset_df_list=[ohlcv_df, ta_df, gtrends_df])

        assert num_expected_samples == merged_dataset_df.shape[0], \
            'AssertionError: Merged dataset size mismatch: ' \
            f'Expected {num_expected_samples} samples, got {merged_dataset_df.shape[0]}'

        dataset_df = self._handle_missing_values(dataset_df=merged_dataset_df)
        dataset_df.to_csv(dataset_save_filepath, index=False)


# database.network.gtrends.gtrends - gtrends.py

In [28]:
import pandas as pd
from datetime import date
from pytrends.request import TrendReq
# from database.entities.crypto import Crypto
# from database.network.network import DatasetDownloader


class GoogleTrendsDownloader(DatasetDownloader):
    def __init__(self, verbose: bool = True):
        super().__init__(date_column_name='date', verbose=verbose)

    def _download_year_trends(self, keyword: str, year: int) -> pd.DataFrame:
        today = date.today()

        if year == today.year:
            month_end = today.month
            day_end = today.day
        else:
            month_end = 12
            day_end = 31

        return TrendReq().get_historical_interest(
            keywords=[keyword],
            year_start=year,
            month_start=1,
            day_start=1,
            hour_start=0,
            year_end=year,
            month_end=month_end,
            day_end=day_end,
            hour_end=23
        ).reset_index().drop_duplicates(subset=self.date_column_name)

    def download_historical_data(self, crypto: Crypto, history_filepath: str) -> bool:
        if self.verbose:
            print(f'Downloading {crypto.name} trends. It might take some time...')

        trends_df_list = []
        today_year = date.today().year
        for year in range(crypto.start_year, today_year + 1):
            trends_df_list.append(self._download_year_trends(keyword=crypto.name, year=year))

        trends_df = pd.concat(trends_df_list, ignore_index=True)

        super()._store_dataset(
            dataset_df=trends_df,
            filepath=history_filepath,
            columns=[self.date_column_name, crypto.name]
        )
        return True

    def update_historical_data(self, crypto: Crypto, history_filepath: str) -> bool:
        if self.verbose:
            print(f'Updating {crypto.name} trends history')

        current_year = date.today().year
        history_df = pd.read_csv(history_filepath)
        history_df = history_df[history_df[self.date_column_name] < str(current_year)]
        latest_df = self._download_year_trends(keyword=crypto.name, year=current_year)
        merged_df = pd.concat((history_df, latest_df), ignore_index=True)
        merged_df.drop_duplicates(subset=self.date_column_name, inplace=True)

        assert not merged_df.duplicated(subset=self.date_column_name).any(), \
            'AssertionError: Duplicates found on Google Trends dates'

        super()._store_dataset(
            dataset_df=merged_df,
            filepath=history_filepath,
            columns=[self.date_column_name, crypto.name]
        )
        return True


# database.network.downloader - downloader.py

In [29]:
# from database.entities.crypto import Crypto
# from database.network.coinapi.ohlcv import OHLCVDownloader
# from database.network.gtrends.gtrends import GoogleTrendsDownloader


class CryptoDatasetDownloader:
    def download_crypto_datasets(
            self,
            crypto: Crypto,
            ohlcv_history_filepath: str,
            gtrends_history_filepath: str,
            historical_frequency: OHLCVDownloader.HistoricalFrequency or str,
            verbose: bool = True
    ) -> bool:
        ohlcv_downloader = OHLCVDownloader(historical_frequency=historical_frequency, verbose=verbose)
        gtrends_downloader = GoogleTrendsDownloader(verbose=verbose)

        if ohlcv_downloader.download_historical_data(
                crypto=crypto,
                history_filepath=ohlcv_history_filepath.format(crypto.symbol)
        ) and gtrends_downloader.download_historical_data(
            crypto=crypto,
            history_filepath=gtrends_history_filepath.format(crypto.symbol)
        ):
            if verbose:
                print(f'Successfully downloaded {crypto.symbol} dataset')

            return True

        if verbose:
            print('Download failed')

        return False


# config - import config.py

In [30]:
# from database.entities.crypto import Crypto
# from database.network.coinapi.ohlcv import OHLCVDownloader

# --- Database ---
supported_cryptos = {
    'BTC': Crypto(symbol='BTC', name='bitcoin', start_year=2017),
    'ETH': Crypto(symbol='ETH', name='ethereum', start_year=2017),
    'SOL': Crypto(symbol='SOL',  name='solana', start_year=2020),
    'ADA': Crypto(symbol='ADA', name='ada', start_year=2017),
    'BNB': Crypto(symbol='BNB', name='bnb', start_year=2019),
    'XRP': Crypto(symbol='XRP', name='xrp', start_year=2019),
    'DOGE': Crypto(symbol='DOGE', name='doge', start_year=2020),
    'MATIC': Crypto(symbol='MATIC', name='polygon', start_year=2020),
    'TRON': Crypto(symbol='TRON', name='tron', start_year=2018),
    'LTC': Crypto(symbol='LTC', name='litecoin', start_year=2018),
    'DOT': Crypto(symbol='DOT', name='polkadot', start_year=2021),
    'AVAX': Crypto(symbol='AVAX', name='avalanche', start_year=2021),
    'XMR': Crypto(symbol='XMR', name='monero', start_year=2018),
    'BAT': Crypto(symbol='BAT', name='basic authentication token', start_year=2018),
    'LRC': Crypto(symbol='LRC', name='loopring', start_year=2018)
}

ohlcv_dataset_period_id = OHLCVDownloader.HistoricalFrequency.HOUR
ohlcv_history_filepath = 'database/storage/downloads/ohlcv/{}.csv'
gtrends_history_filepath = 'database/storage/downloads/gtrends/{}.csv'
dataset_save_filepath = 'database/storage/datasets/{}.csv'
all_features = [
    'date', 'open', 'high', 'low', 'close', 'volume', 'trades',
    'open_log_returns', 'high_log_returns', 'low_log_returns',
    'close_log_returns', 'volume_log_returns', 'trades_log_returns', 'hour',
    'dema', 'vwap', 'bband_up', 'bband_down', 'adl', 'obv',
    'macd_signal_diffs', 'stoch', 'aroon_up', 'aroon_down', 'rsi', 'adx', 'cci',
    'close_dema', 'close_vwap', 'bband_up_close', 'close_bband_down', 'adl_diffs2', 'obv_diffs2', 'trends'
]
regression_features = [
    'open_log_returns', 'high_log_returns', 'low_log_returns',
    'close_log_returns', 'volume_log_returns', 'trades_log_returns', 'hour',
    'macd_signal_diffs', 'stoch', 'aroon_up', 'aroon_down', 'rsi', 'adx', 'cci',
    'close_dema', 'close_vwap', 'bband_up_close', 'close_bband_down', 'adl_diffs2', 'obv_diffs2', 'trends'
]

# --- Model ---
checkpoint_dir = 'database/storage/checkpoints/'

# --- Clustering ---
crypto_clusters = [
    ['BTC', 'ETH', 'SOL', 'ADA', 'XPR', 'DOGE', 'DOT', 'AVAX', 'BAT', 'LRC'],
    ['ETH', 'BNB', 'MATIC', 'TRON', 'LTC', 'XMR']
]




# download_datasets.py

In [31]:
import warnings
# import config
# from analysis.technical.configs.standard import StandardTAConfig
# from database.datasets.builder import DatasetBuilder
# from database.network.downloader import CryptoDatasetDownloader

warnings.filterwarnings('ignore')

ta_config = StandardTAConfig()
import_gtrends = True
impute_missing_gtrends = True
gtrends_imputing_percentage_threshold = 0.1
verbose = True


def main():
    downloader = CryptoDatasetDownloader()
    builder = DatasetBuilder()

    for crypto_symbol, crypto in supported_cryptos.items():
        if downloader.download_crypto_datasets(
            crypto=crypto,
            ohlcv_history_filepath=ohlcv_history_filepath,
            gtrends_history_filepath=gtrends_history_filepath,
            historical_frequency=ohlcv_dataset_period_id,
            verbose=verbose
        ):
            builder.build_dataset(
                ohlcv_history_filepath=ohlcv_history_filepath.format(crypto_symbol),
                gtrends_history_filepath=gtrends_history_filepath.format(crypto_symbol),
                dataset_save_filepath=dataset_save_filepath.format(crypto_symbol),
                ta_config=ta_config,
                impute_missing_gtrends=impute_missing_gtrends,
                gtrends_imputing_percentage_threshold=gtrends_imputing_percentage_threshold
            )

            if verbose:
                print(f'Successfully has built {crypto_symbol} dataset')
        else:
            if verbose:
                print(f'Download of {crypto_symbol} has been aborted')


if __name__ == '__main__':
    main()


Downloading bitcoin market history data for 2017
Using apikey: 70E10174-E29D-449F-9F2E-6E8362931DD9
Response Status: 404 - Not Found
Using apikey: 27E5E40C-7A6B-45EB-A5C8-8311B049A741
Response Status: 404 - Not Found
Using apikey: 8F6252DE-0AD7-478F-91C7-141141E8BE8B
Response Status: 404 - Not Found
Using apikey: 3B49210E-100B-4F8D-9011-2BA5D38274BA
Response Status: 404 - Not Found
Using apikey: BF6BF46F-B44B-416E-9656-2D2AAFBC058B
Response Status: 404 - Not Found
Using apikey: B21A98A2-C953-4C73-84CF-CFFB6F712200
Response Status: 404 - Not Found
Using apikey: 51667E99-7686-4496-B23D-6DA54F7E37AE
Response Status: 401 - Unauthorized
Using apikey: 0921F87B-BF55-4B78-B8B0-E023B4D7A2E2
Response Status: 401 - Unauthorized
Using apikey: 3F9E3251-029C-457A-9ADA-7F21A440AAF9
Response Status: 401 - Unauthorized
Using apikey: 41EBEA2D-1A4B-4654-8A41-186639B9AB9F
Response Status: 401 - Unauthorized
Using apikey: 6B93AEC2-910C-4064-80FB-91AED487AB97
Response Status: 401 - Unauthorized
Using apikey