# Crypto Queries

## Utility classes used in ingestion

Used to return a filestream to a bucket in GCP

In [1]:
import io, traceback

from google.cloud import storage


class BucketFileStreamReader:

    def __init__(self,bucket_name):

        self._bucket_name = bucket_name

    def read(self,filename):

        # get client connection
        client = storage.Client()

        try:
            # get handle to the bucket
            bucket = client.get_bucket(self._bucket_name)
            # get the blob from the bucket
            blob = bucket.get_blob(filename)

            # Return the file as a stream to the caller
            return io.BytesIO(blob.download_as_string())

        except:
            print(traceback.format_exc())
            raise


Write/Upload files to bucket

In [2]:
import io
import traceback

from google.cloud import storage


class BucketFileWriter:

    def __init__(self,bucket_name):

        self._bucket_name = bucket_name

    def upload(self,filename):

        # get client connection
        client = storage.Client()

        try:
            # get handle to the bucket
            bucket = client.get_bucket(self._bucket_name)
            # get the blob from the bucket
            blob = bucket.blob(filename)
            blob.upload_from_filename(filename=filename)

        except:
            print(traceback.format_exc())
            raise



This reads in the .yml config file

In [3]:
import traceback, yaml
# from BucketFileStreamReader import BucketFileStreamReader


class ConfigReader:


    def __init__(self):

        self._bucket_file_reader = BucketFileStreamReader(bucket_name="gcp-challenge-javen-caserta")


    @property
    def config(self):
        #TODO: cache the config
        return self._read_config()

    def _read_config(self):

        with self._bucket_file_reader.read(filename="config.yml") as stream:
            try:
                return yaml.load(stream)
            except:
                print(traceback.format_exc())
                raise

In [4]:
class CryptoIngesterError(Exception):
    pass

Identify errors which are due to API response

In [5]:
class CryptoIngestResponseError(Exception):
    pass

This class is the main engine. It is designed to run indefinitely and update every 5-10 minutes. 

In [8]:
import datetime, time, traceback, urllib.parse, json
import pandas as pd
from pandas.io.json import json_normalize
from requests import get
import numpy as np

# from BucketFileWriter import BucketFileWriter
# from ConfigReader import ConfigReader
# from CryptoIngestResponseError import CryptoIngestResponseError
# from CryptoIngesterError import CryptoIngesterError


class CryptoDataIngesterEarlyTerm:
    """
    This class queries a crypto currency rate source. It will update it at most once every 5 minutes (as that is the
    rate at which the data is updated). The data once (and as) it is queried, is persisted to a data store through a
    data interactor, or to multiple locations through a collection of data interactors. This is not the most performant
    approach, but extra effort was given to make sure that the API's robot rules are obeyed.

    Numerous embedded (scoped) classes were used to control their use to this classes (intent).

    API called has docs here: https://coinmarketcap.com/api/
    """


    class InitializedStruct:

        @property
        def first_ticker_added(self):
            return self._first_ticker_added

        @first_ticker_added.setter
        def first_ticker_added(self,value):
            if isinstance(value, bool):
                self._first_ticker_added = value
            else:
                raise CryptoIngesterError(
                    'InitializedStruct.first_ticker_added must be set as a bool, not as a {}'.format(type(value)))

        @property
        def initialized(self):
            return self.listing_initialized and self.tickers_initialized and self.global_content_initialized

        @property
        def listing_initialized(self):
            return self._listing_initialized

        @listing_initialized.setter
        def listing_initialized(self, value):
            if isinstance(value, bool):
                self._listing_initialized = value
            else:
                raise CryptoIngesterError(
                    'InitializedStruct.listing_initialized must be set as a bool, not as a {}'.format(type(value)))

        @property
        def tickers_initialized(self):
            return self._tickers_initialized

        @tickers_initialized.setter
        def tickers_initialized(self, value):
            if isinstance(value, bool):
                self._tickers_initialized = value
            else:
                raise CryptoIngesterError(
                    'InitializedStruct.tickers_initialized must be set as a bool, not as a {}'.format(type(value)))

        @property
        def global_content_initialized(self):
            return self._global_content_initialized

        @global_content_initialized.setter
        def global_content_initialized(self, value):
            if isinstance(value, bool):
                self._global_content_initialized = value
            else:
                raise CryptoIngesterError(
                    'InitializedStruct.global_content_initialized must be set as a bool, not as a {}'.format(type(value)))

        def __init__(self):
            self.listing_initialized = False
            self.tickers_initialized = False
            self.global_content_initialized = False
            self.first_ticker_added = False

    class PageTracker:
        def __init__(self, page_length, max_page):
            self._page_length = page_length
            self._max_page = max_page

            self._next_page = None
            self._completed_full_cycle = False
            self._last_page = None
            self._comping_page = None

        @property
        def last_page(self):
            return self._last_page

        @property
        def max_page(self):
            return self._max_page

        @max_page.setter
        def max_page(self,value):
            self._max_page = value

        @property
        def completed_full_cycle(self):
            return self._completed_full_cycle

        @property
        def next_page(self):

            if self._next_page is None:
                page_to_return = 1
                self._last_page = page_to_return
                self._next_page = self._page_length + 1
            else:
                page_to_return = self._next_page
                self._last_page = page_to_return
                self._next_page = self._next_page + self._page_length
                if self._next_page > self.max_page:
                    self._next_page = 1
                    self._completed_full_cycle = True
                pass

            return page_to_return

        @property
        def coming_page(self):
            return self._next_page

    class CSV_Persistor:

        def __init__(self,bucket):
            self.bucket_writer = BucketFileWriter(bucket_name=bucket)

        def persist(self,blob_filename,dataframe):
            if not isinstance(dataframe,pd.DataFrame):
                raise ValueError('dataframe must be of type pandas.DataFrame')

            import os
            print(dataframe.columns)
            dataframe.to_csv(blob_filename, index = False)

            self.bucket_writer.upload(blob_filename)

    class Logger: #TODO This is blocking and simple. Change to operate on thread and write to BigTable, Mongo, or other noSQL DB at a later time
        '''
          This logger handles thrtee cases:

          1: errors - these are real error from execution
          2. api_errors - these are errors encountered in the API calls
          3. event_log - these are general, usually infomrational, events.
        '''

        error_log = 'errors.json'
        api_errors = 'api_errors.json'
        event_log = 'events.json'

        def __init__(self,bucket):
            self.bucket_writer = BucketFileWriter(bucket_name=bucket)

        def log(self,message,type_of_message):
            if type_of_message=='error':
                with open(self.error_log,'a') as fh:
                    fh.write(message)
                self.bucket_writer.upload(self.error_log)
            elif type_of_message=='api_error':
                with open(self.api_errors,'a') as fh:
                    fh.write(message)
                self.bucket_writer.upload(self.api_errors)
            elif type_of_message=='event':
                with open(self.event_log,'a') as fh:
                    fh.write(message)
                self.bucket_writer.upload(self.event_log)
            else:
                pass

    def __init__(self):

        # This has been added to provide a way to terminate. Later in code it will terminate not after first creation, but after first update.
        # Note that this is not in the git repo version of the code.
        self._num_updates = 0

        config_reader = ConfigReader()
        config = config_reader.config

        self._listing_config = config['api']['listings']
        self._ticker_config = config['api']['ticker']
        self._global_config = config['api']['global']
        self._persist_config = config['persist']

        self.pull_frequency_minimum_interval = config['api']['pull_frequency_minimum_interval']

        self._error_timeout = config['api']['error_timeout']
        self._timeout = config['api']['timeout']
        self._refresh_timeout = self._ticker_config['refresh_period']
        self._timeout_start = None
        self._in_timeout = False

        self._currency_df = pd.DataFrame.empty

        self._meta_data = None

        self._last_pull = self.current_time

        self._last_listing_update = self.current_time
        self._initilization_tracker = self.InitializedStruct()

        self.page_tracker = self.PageTracker(page_length=config['api']['ticker']['page_length'],
                                             max_page=None)

        self._persistor = self.CSV_Persistor(bucket=self._persist_config['bucket'])
        self._logger = self.Logger(bucket=self._persist_config['bucket'])

        self.running = False

        self._logger.log(message=json.dumps({'message': 'crypto data ingester crated',
                                  'time': self.current_time.strftime('%s'),
                                  'type': 'info'}),
                         type_of_message='event')

    @property
    def current_time(self):
        return datetime.datetime.now()

    @property
    def pull_allowed(self):
        condition_1 = (self.current_time - self._last_pull).total_seconds() > self.pull_frequency_minimum_interval
        condition_2 = self._timeout_start is None or ((self.current_time - self._timeout_start).total_seconds() > self._refresh_timeout)

        self._in_timeout = not condition_2

        return condition_1 and condition_2

    def run_updater(self):
        """
        This is the engine. It will make sure everything is initialized (data) and that it keep up to date. Any handlable
        exception will be handled and logged. This software should only shutdown on KeyboardInterrupt.
        """

        self._logger.log(message=json.dumps({'message': 'data updater started',
                                  'time': self.current_time.strftime('%s'),
                                  'type': 'info'}),
                         type_of_message='event')

        self.running = True

        while(self.running):

            try:
                self._logger.log(message=json.dumps({'message': 'Next processing step',
                                          'time': self.current_time.strftime('%s'),
                                          'type': 'info'}),
                                 type_of_message='event')
                if self._initilization_tracker.initialized:
                    self._update_data()
                else:
                    self._initialize_data()
                time.sleep(self._timeout)

            except KeyboardInterrupt:
                self._logger.log(message=json.dumps({'message': 'Keyboard interrupt: software shutting down',
                                          'time': self.current_time.strftime('%s'),
                                          'type': 'info'}),
                                 type_of_message='error')
                print('exiting software')
                msg = json.dumps({'message':'Exiting software by Keyboard Interrupt.','time':self.current_time.strftime('%s')})
                self._logger.log(message=msg,
                                 type_of_message='error')
                self.running = False

            except CryptoIngestResponseError as ex:
                self._logger.log(message=json.dumps({'message': 'error with ingestion',
                                          'time': self.current_time.strftime('%s'),
                                          'type': 'error'}),
                                 type_of_message='event')

                self._logger.log(message=str(ex),type_of_message='api_error')
                print('Encountered API error. Sleeping for {0} seconds.'.format(self._error_timeout))
                time.sleep(self._error_timeout)

            except:

                print(traceback.format_exc())
                self._logger.log(message=json.dumps({'message': 'an error has occurred',
                                          'time': self.current_time.strftime('%s'),
                                          'type': 'error'}),
                                 type_of_message='event')

                msg = json.dumps({'message':traceback.format_exc(),'time':self.current_time.strftime('%s')})
                self._logger.log(message=msg,
                                 type_of_message='error')
                print('Encountered error. Sleeping for {0} seconds.'.format(self._error_timeout))
                time.sleep(self._error_timeout)

    def _initialize_data(self):

        if not self._initilization_tracker.global_content_initialized:
            if self._update_global(override_update=True):
                self._logger.log(message=json.dumps({'message': 'cglobal content initialized',
                                          'time': self.current_time.strftime('%s'),
                                          'type': 'info'}),
                                 type_of_message='event')
                self._initilization_tracker.global_content_initialized = True
        elif not self._initilization_tracker.listing_initialized:
            if self._update_currency_list(override_update=True):
                self._logger.log(message=json.dumps({'message': 'ticker listings have been initialized',
                                          'time': self.current_time.strftime('%s'),
                                          'type': 'info'}),
                                 type_of_message='event')
                self._initilization_tracker.listing_initialized = True
        elif not self._initilization_tracker.tickers_initialized:
            self._update_next_ticker_set()
            if self.page_tracker.completed_full_cycle:
                self._logger.log(message=json.dumps({'message': 'ticker data has been initialized',
                                         'time': self.current_time.strftime('%s'),
                                         'type': 'info'}),
                                type_of_message='event')
                self._initilization_tracker.tickers_initialized = True

    def _update_data(self):
        self._update_currency_list()

        self._update_next_ticker_set()

    def _update_global(self, override_update = False):

        try:

            time = self.current_time

            if ((time - self._last_listing_update).total_seconds() > self._global_config['pull_frequency'] or override_update) \
                    and self.pull_allowed:
                result = get(self._global_config['address']).json()
                self._last_pull = time
                if result['data']:
                    self.page_tracker.max_page = result['data']['active_cryptocurrencies']
                else:
                    raise CryptoIngestResponseError(json.dumps(result['metadata']))
                self._logger.log(message=json.dumps({'message': 'global data base been updated',
                                         'time': self.current_time.strftime('%s'),
                                         'type': 'info'}),
                                type_of_message='event')
                return True
            else:
                return False

        except:
            self._logger.log(message=json.dumps({'message': 'error encountered when updating global data',
                                     'time': self.current_time.strftime('%s'),
                                     'type': 'error'}),
                            type_of_message='event')
            print(traceback.format_exc())
            raise

    def _update_next_ticker_set(self):
        try:

            if (self.pull_allowed):

                time = self.current_time

                query = '?start={0}&sort=id&structure=array'.format(self.page_tracker.next_page)
                get_url = urllib.parse.urljoin(self._ticker_config['address'],query)

                result = get(get_url).json()
                self._last_pull = time

                if result['data']:
                    records = json_normalize(result['data'])
                    response_df = pd.DataFrame.from_records(records,index='id')
                    response_df['last_updated'] = response_df['last_updated'].apply(
                        lambda x: datetime.datetime.fromtimestamp(x).strftime('%Y-%m-%d %H:%M:%S') if not np.isnan(x) else None)
                else:
                    raise CryptoIngestResponseError(json.dumps(result['metadata']))

                if self._initilization_tracker.first_ticker_added:
                    self._currency_df.update(response_df)
                else:
                    self._currency_df = pd.merge(left=self._currency_df, right=response_df, on=['name', 'symbol', 'website_slug'], how='left')
                    self._initilization_tracker.first_ticker_added = True

            self._logger.log(message=json.dumps({'message': 'ticker data updated',
                                     'time': self.current_time.strftime('%s'),
                                     'type': 'info'}),
                            type_of_message='event')
        except:
            self._logger.log(message=json.dumps({'message': 'error updating ticker info',
                                     'time': self.current_time.strftime('%s'),
                                     'type': 'error'}),
                            type_of_message='event')
            print(traceback.format_exc())
            raise

        finally:
            if self.page_tracker.coming_page == 1 and not self._in_timeout:
                # Following three lines added to make sure this terminates. Not in git repo.
                if self._num_updates == 1:
                    self.running = False
                self._num_updates = self._num_updates + 1
                self._timeout_start = self.current_time
                self._persistor.persist(blob_filename=self._persist_config['filename'],
                                        dataframe=self._currency_df)
                self._in_timeout = True

    def _update_currency_list(self, override_update = False):
        try:

            time = self.current_time

            if ((time - self._last_listing_update).total_seconds() > self._listing_config['pull_frequency'] or override_update)\
                    and self.pull_allowed:

                result = get(self._listing_config['address']).json()
                self._last_pull = time
                if result['data']:
                    response_df = pd.DataFrame.from_records( result['data'], index='id' )
                else:
                    raise CryptoIngestResponseError(json.dumps(result['metadata']))

                if self._currency_df == pd.DataFrame.empty:
                    self._currency_df = response_df
                else:
                    self._currency_df = pd.merge(left=self._currency_df, right=result, on=['name', 'symbol', 'website_slug'], how='right')

                self._logger.log(message=json.dumps({'message': 'ticker list updated',
                                         'time': self.current_time.strftime('%s'),
                                         'type': 'info'}),
                                type_of_message='event')

                return True
            else:
                return False

        except:
            self._logger.log(message=json.dumps({'message': 'error encountered while updating ticker list',
                                     'time': self.current_time.strftime('%s'),
                                     'type': 'info'}),
                            type_of_message='event')
            print(traceback.format_exc())
            raise

## Execute the engine

In [9]:
di = CryptoDataIngesterEarlyTerm()

di.run_updater()

Index(['name', 'symbol', 'website_slug', 'circulating_supply', 'last_updated',
       'max_supply', 'quotes.USD.market_cap', 'quotes.USD.percent_change_1h',
       'quotes.USD.percent_change_24h', 'quotes.USD.percent_change_7d',
       'quotes.USD.price', 'quotes.USD.volume_24h', 'rank', 'total_supply'],
      dtype='object')
Index(['name', 'symbol', 'website_slug', 'circulating_supply', 'last_updated',
       'max_supply', 'quotes.USD.market_cap', 'quotes.USD.percent_change_1h',
       'quotes.USD.percent_change_24h', 'quotes.USD.percent_change_7d',
       'quotes.USD.price', 'quotes.USD.volume_24h', 'rank', 'total_supply'],
      dtype='object')


## Move to Big Query

This was accomplished through the UI (web console). Done more out of time than necessity. I spent a little time research transfering via API from google buckets to bigquery

## Answer questions

* How many coins have a values (In USD) greater than $8000

In [10]:
%%bq query
SELECT COUNT(*) as Number_Of_Coins_With_Value_Greater_Than_8000USD  FROM `crypto.crypto` 
  WHERE quotes_USD_price IS NOT NULL 
  AND quotes_USD_price > 8000

# In the above query, the idea is to find coins which have a value greater than 8000, given that they do not have a null valuation.

Number_Of_Coins_With_Value_Greater_Than_8000USD
2


* What is the total market cap of the top 100 cryptocurrencies (USD)

In [11]:
%%bq query
SELECT SUM(quotes_USD_market_cap) as Top_100_Market_Cap FROM 
( SELECT * FROM `crypto.crypto` 
  WHERE quotes_USD_market_cap IS NOT NULL 
  ORDER BY quotes_USD_market_cap DESC limit 100) as T

# The idea above is to first create a subquery (could have created a temp table as well) to obtain 
# all non-null market caps and order them in descending order - and only return the first 100. Summing
# over these 100 will give the market cap of the top 100 cryptocurrencies.

Top_100_Market_Cap
331867007388.0


* Which coins have an available supply > $5M (USD)

In [12]:
%%bq query
SELECT name, T.Computed_Market_Cap FROM 
(SELECT name, quotes_USD_price*circulating_supply as Computed_Market_Cap,quotes_USD_price,circulating_supply,quotes_USD_market_cap FROM `crypto.crypto` 
WHERE quotes_USD_price is not NULL
AND circulating_supply is not NULL) as T
WHERE T.Computed_Market_Cap < 5000000
ORDER BY T.Computed_Market_Cap DESC

# Note that Computed_Market_Cap was used here instead, since it is a derived value (assumed). It was not doen so above for simplicity. If a productoin 
# situation you would only be using computed values to avoid errors which could come from rounding which could have occurred before persistence with 
# pre-computed vlaues.

name,Computed_Market_Cap
OKCash,4798672.6403109
Mintcoin,4740802.813126664
FoldingCoin,4640722.9782347
Golos,4541988.4237833
Sequence,4405733.1470789
BitConnect,4321057.273325
Dotcoin,4284267.128475
Pluton,4279877.5
MUSE,4258047.751866001
AsiaCoin,4209817.77752076


* Which 5 coins have seen the greatest percentage growth in the last week?

In [14]:
%%bq query
SELECT name, quotes_USD_percent_change_7d FROM `crypto.crypto` 
ORDER BY quotes_USD_percent_change_7d DESC
Limit 5

# The quotes_USD_percentage_change_7d contains the weekly change in 
# value of the currency, sorting in descending order and limiting to 
# 5 items gives us what we want.

name,quotes_USD_percent_change_7d
PayCon,254.43
Solarflarecoin,229.45
Debitcoin,120.94
PureVidz,105.71
Master Swiscoin,90.44


* How many ticker symbols contain the letter "X" ?

In [16]:
%%bq query
SELECT count(symbol) as Number_of_Coins_with_X_in_Ticker_Symbol FROM `crypto.crypto` 
WHERE symbol LIKE "%X%"

# Here we want to first find all symbols with X as a substring of their ticker symbol.
# Once we have this we simply count the number of such symbols returned.

Number_of_Coins_with_X_in_Ticker_Symbol
208
