In [1]:
import pandas as pd
import requests
import json
from urllib.parse import urlencode
from pytezos.michelson.micheline import blind_unpack
from pprint import pprint
import asyncio


class CoinbaseAPI:
    def __init__(self, uri='https://api.pro.coinbase.com'):
        self.uri = uri

    def get_history_prices(self, pair, start=None, stop=None, granularity=900):
        """ requests history data from coinbase, creates dataframe and makes
            all processing and type transformations """

        params = {
            'granularity': granularity
        }

        if start:
            params['start'] = start

        if stop:
            params['stop'] = stop

        method = f'{self.uri}/products/{pair}/candles?{urlencode(params)}'
        response = requests.get(method)
        headers = ['time', 'low', 'high', 'open', 'close', 'volume']
        df = pd.DataFrame(json.loads(response.text), columns=headers)

        assert not df.time.duplicated().any()

        return df.set_index('time')


class CoinbaseData:

    def __init__(self, pair_name='XTZ-USD'):
        self.pair_name = pair_name
        self.responses_history = []
        self.api = CoinbaseAPI()
        self.update()


    def update(self):
        self.last_prices = self.api.get_history_prices(pair=self.pair_name, granularity=60)

        # looks like history data in API can be changed,
        # to check this I am storing prev requested data:
        self.responses_history.append(self.last_prices)
        self.responses_history = self.responses_history[-100:]

        # Kolibri data using integers with 10**6 precision:
        self.last_prices_int = (self.last_prices * 1_000_000).astype(int)


    def has_close_timestamp(self, timestamp):
        return timestamp in self.last_prices_int.index


    def check_candle(self, candle, check_columns=['low', 'high', 'open', 'close', 'volume']):
        """ Checks that provided candle is the same as in self.last_prices """

        timestamp = candle.close_timestamp
        true_candle = self.last_prices_int.loc[ timestamp ][ check_columns ]
        candle = candle[ check_columns ]

        is_same_values = all(true_candle == candle)

        if not is_same_values:
            print(f'{timestamp}: data from kolibri differs from coinbase:')
            print('kolibri_candle: \n', candle)
            print('coinbase_candle: \n', true_candle)
            return False
        else:
            print(f'{timestamp}: data in kolibri is the same as in coinbase')
            return True


    async def autoupdate(self, period=30):
        while True:
            self.update()
            await asyncio.sleep(period)


    def get_timestamp_request_history(self, timestamp):
        """ Returns all history data for given timestamp """

        return pd.DataFrame([
            df.assign(last_index=max(df.index)).loc[timestamp]
            for df in self.responses_history
            if timestamp in df.index
        ])


class WrongDataException(Exception):
    pass


def unpack_xtz_price(oracle_data, pair_name):
    unpacked_data = [
        blind_unpack(bytes.fromhex(msg))
        for msg in oracle_data['messages']
    ]

    data = {
        pair[0]: pair for pair in unpacked_data
        # checking len of the pair because if it is > 2: this is signature string instead of data
        if len(pair) == 2
    }

    if not pair_name in data:
        raise WrongDataException(f'kolibri data have no {pair_name} pair: {data}')

    pair = data[pair_name]

    return pd.Series({
        'open_timestamp': pair[1][0],
        'close_timestamp': pair[1][1][0],
        'open': pair[1][1][1][0],
        'high': pair[1][1][1][1][0],
        'low': pair[1][1][1][1][1][0],
        'close': pair[1][1][1][1][1][1][0],
        'volume': pair[1][1][1][1][1][1][1]
    })


def get_last_kolibri_data(pair_name):
    response = requests.get('https://oracle-data.kolibri.finance/data.json')
    kolibri_raw = json.loads(response.text)
    return unpack_xtz_price(kolibri_raw, pair_name)


async def check_candle_while_succeed_task(coinbase_data, candle, attempts=10):
    for _ in range(attempts):
        if coinbase_data.has_close_timestamp(candle.close_timestamp):
            return coinbase_data.check_candle(candle)
        await asyncio.sleep(30)


async def check_data_in_cycle(coinbase_data, pair_name):
    last_close_timestamp = 0
    # coinbase_data = CoinbaseData()
    autoupdate_task = asyncio.create_task(coinbase_data.autoupdate())
    stats = {
        'same': 0,
        'different': 0
    }

    while True:
        try:
            kolibri_candle = get_last_kolibri_data(pair_name)

        except WrongDataException as e:
            """ This is strange exception when data in kolibri contains
                signature instead of candle data """

            # print(f'wrong data in kolibri oracle, {str(e)}')
            await asyncio.sleep(10)
            continue

        is_new_data = kolibri_candle['close_timestamp'] > last_close_timestamp

        if is_new_data:
            is_same = await check_candle_while_succeed_task(coinbase_data, kolibri_candle)

            if is_same:
                stats['same'] += 1
            else:
                stats['different'] += 1

            print(f"stats --- same: {stats['same']}, different: {stats['different']}")

        last_close_timestamp = kolibri_candle['close_timestamp']

        await asyncio.sleep(30)

    await autoupdate_task

### Running data checker:

In [None]:
# PAIR_NAME = 'XTZ-USD'
PAIR_NAME = 'BTC-USD'

coinbase_data = CoinbaseData(pair_name=PAIR_NAME)
await check_data_in_cycle(coinbase_data, pair_name=PAIR_NAME)

1626107280: data in kolibri is the same as in coinbase
stats --- same: 1, different: 0


### Check coinbase API requests history for given timestamp:

In [3]:
df = coinbase_data.get_timestamp_request_history(1626105000)
(df == df.iloc[0]).all()

low           True
high          True
open          True
close         True
volume        True
last_index    True
dtype: bool

In [4]:
df

Unnamed: 0,low,high,open,close,volume,last_index
1626105000,2.8417,2.8449,2.8417,2.8432,2201.67,1626105000.0
1626105000,2.8417,2.8449,2.8417,2.8432,2201.67,1626105000.0
1626105000,2.8417,2.8449,2.8417,2.8432,2201.67,1626105000.0
1626105000,2.8417,2.8449,2.8417,2.8432,2201.67,1626105000.0
1626105000,2.8417,2.8449,2.8417,2.8432,2201.67,1626105000.0
