# CoinCap API ingestion



In [None]:
import requests
import pandas as pd
import os
from datetime import datetime
from sqlalchemy import create_engine, text
import time

In [None]:
class CoinCap_monitor:
    """Poll the CoinCap REST API and persist assets, exchanges and markets.

    Each monitoring cycle does the following:

    * fetches the three endpoints;
    * archives the raw frames as parquet files in a local "lake";
    * refines them into typed, de-duplicated DataFrames;
    * archives the refined frames;
    * appends the refined frames to PostgreSQL.

    Configuration is read from environment variables in ``__init__``:
    ``API_URI``, ``API_KEY``, ``DB_URI``, ``DB_PORT``, ``POSTGRES_USER``,
    ``POSTGRES_PASSWORD`` and ``POSTGRES_DB``. A missing variable raises
    ``KeyError``.

    Public methods report failures by printing the exception and returning
    ``False``. They return ``True`` on success and never raise.
    """

    # Values accepted by get_endpoint / write_lake.
    VALID_ENDPOINTS = ('/assets', '/exchanges', '/markets')
    VALID_LAYERS = ('raw', 'trusted', 'refined')
    VALID_TABLES = ('assets', 'exchanges', 'markets')

    # Page size for /markets pagination; a page shorter than this ends the loop.
    PAGE_SIZE = 2000
    # Seconds to wait on the API before giving up, so a stalled connection
    # cannot hang the monitor forever.
    REQUEST_TIMEOUT = 30
    # Root directory of the local parquet lake.
    LAKE_ROOT = '/home/jovyan/data'
    # DDL scripts executed by create_tables, in order (relative to the CWD).
    DDL_PATHS = ('./assets_ddl.sql', './exchanges_ddl.sql', './markets_ddl.sql')

    # SQLAlchemy engine, created lazily by _get_engine and reused afterwards.
    _engine = None

    def __init__(self):
        """Read API and Postgres settings from the environment (KeyError if unset)."""
        self.__host = os.environ['API_URI']
        self.__key = os.environ['API_KEY']
        self.__pg_uri = os.environ['DB_URI']
        self.__pg_port = os.environ['DB_PORT']
        self.__pg_user = os.environ['POSTGRES_USER']
        self.__pg_password = os.environ['POSTGRES_PASSWORD']
        self.__pg_database = os.environ['POSTGRES_DB']

    def _get_engine(self):
        """Return the shared SQLAlchemy engine, creating it on first use.

        The URL is built with ``URL.create`` rather than string formatting,
        so credentials containing ``@``, ``:`` or ``/`` are escaped correctly.
        """
        if self._engine is None:
            from sqlalchemy.engine import URL

            url = URL.create(
                drivername='postgresql',
                username=self.__pg_user,
                password=self.__pg_password,
                host=self.__pg_uri,
                port=int(self.__pg_port),
                database=self.__pg_database,
            )
            self._engine = create_engine(url)
        return self._engine

    def _fetch_data(self, endpoint:str, extra_params:dict) -> list:
        """GET ``endpoint`` once and return its ``data`` list (``[]`` when absent)."""
        params = {"key": self.__key, **extra_params}
        response = requests.get(self.__host + endpoint, params=params,
                                timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json().get("data") or []

    def get_endpoint(self, endpoint:str) -> bool:
        """Fetch ``endpoint`` and store it as a DataFrame in ``self.raw_<name>``.

        ``/markets`` is paginated with ``limit``/``offset`` (PAGE_SIZE rows per
        request) until a short page comes back. ``/assets`` and ``/exchanges``
        are fetched with a single request.

        Args:
            endpoint: one of ``'/assets'``, ``'/exchanges'``, ``'/markets'``.

        Returns:
            True when the data was fetched and stored. Returns False, after
            printing the error, for an invalid endpoint, an HTTP/network error
            or an unexpected payload. On failure the previous ``raw_*``
            attribute is left untouched.
        """
        try:
            if endpoint not in self.VALID_ENDPOINTS:
                raise ValueError('The endpoint is not valid.')

            if endpoint == '/markets':
                records = []
                offset = 0
                while True:
                    page = self._fetch_data(endpoint, {"limit": self.PAGE_SIZE, 'offset': offset})
                    records += page
                    if len(page) < self.PAGE_SIZE:
                        break
                    offset += self.PAGE_SIZE
            else:
                records = self._fetch_data(endpoint, {})

            # '/assets' -> raw_assets, '/exchanges' -> raw_exchanges, ...
            setattr(self, f'raw_{endpoint[1:]}', pd.DataFrame(records))
            return True

        except Exception as e:
            print(f"something unexpected happened: {e}")
            return False

    def write_lake(self, layer:str, table:str, df:pd.DataFrame, now:datetime) -> bool:
        """Write ``df`` to ``LAKE_ROOT/<layer>/<table>/<table>-<timestamp>.parquet``.

        Args:
            layer: one of ``'raw'``, ``'trusted'``, ``'refined'``.
            table: one of ``'assets'``, ``'exchanges'``, ``'markets'``.
            df: frame to persist (written without its index).
            now: timestamp embedded in the file name (second resolution).

        Returns:
            True on success. Returns False, after printing the error, for an
            invalid layer/table or a write failure.
        """
        try:
            if layer not in self.VALID_LAYERS:
                raise ValueError('The layer is not valid.')
            if table not in self.VALID_TABLES:
                raise ValueError('The table is not valid.')

            path = os.path.join(self.LAKE_ROOT, layer, table)
            # exist_ok avoids the check-then-create race of exists() + makedirs().
            os.makedirs(path, exist_ok=True)
            file_name = f'{table}-{now.strftime("%Y-%m-%d_%H-%M-%S")}.parquet'
            df.to_parquet(path=os.path.join(path, file_name), index=False)
            return True
        except Exception as e:
            print(f"something unexpected happened: {e}")
            return False

    def refine_assets(self, now:datetime) -> bool:
        """Build ``self.refined_assets`` from ``self.raw_assets``.

        The method:

        * keeps the modelled columns;
        * de-duplicates on ``id``, keeping the last occurrence;
        * casts rank to int64 and the numeric columns to float64;
        * stamps every row with ``extracted = now``.

        Returns:
            True on success, False (after printing the error) otherwise.
        """
        try:
            df_assets = self.raw_assets[['id', 'rank', 'symbol', 'name', 'supply',
                                         'maxSupply', 'marketCapUsd', 'volumeUsd24Hr', 'priceUsd']]
            df_assets = df_assets.drop_duplicates(subset=['id'], keep='last')
            df_assets = df_assets.astype({'rank': 'int64',
                                          'supply': 'float64',
                                          'maxSupply': 'float64',
                                          'marketCapUsd': 'float64',
                                          'volumeUsd24Hr': 'float64',
                                          'priceUsd': 'float64'})
            self.refined_assets = df_assets.assign(extracted=now)
            return True
        except Exception as e:
            print(f"something unexpected happened: {e}")
            return False

    def refine_exchanges(self, now:datetime) -> bool:
        """Build ``self.refined_exchanges`` from ``self.raw_exchanges``.

        The method:

        * drops exchanges whose ``tradingPairs`` equals the string ``'0'``;
        * keeps the modelled columns;
        * de-duplicates on ``exchangeId``, keeping the last occurrence;
        * casts the numeric and timestamp columns;
        * drops rows with no volume figures;
        * stamps every row with ``extracted = now``.

        Returns:
            True on success, False (after printing the error) otherwise.
        """
        try:
            raw = self.raw_exchanges
            df_exchanges = raw.loc[raw['tradingPairs'] != '0',
                                   ['exchangeId', 'name', 'rank', 'percentTotalVolume',
                                    'volumeUsd', 'exchangeUrl', 'updated']]
            df_exchanges = df_exchanges.drop_duplicates(subset=['exchangeId'], keep='last')
            # NOTE(review): if the API reports `updated` as epoch milliseconds,
            # casting integers straight to datetime64[us] reads them as
            # microseconds. Confirm the unit; pd.to_datetime(..., unit='ms')
            # may be what is intended.
            df_exchanges = df_exchanges.astype({'rank': 'int64',
                                                'percentTotalVolume': 'float64',
                                                'volumeUsd': 'float64',
                                                'updated': 'datetime64[us]'})
            # dropna returns a new frame; it must be assigned, or the
            # NaN-volume rows survive.
            df_exchanges = df_exchanges.dropna(subset=['percentTotalVolume', 'volumeUsd'])

            self.refined_exchanges = df_exchanges.assign(extracted=now)
            return True
        except Exception as e:
            print(f"something unexpected happened: {e}")
            return False

    def refine_markets(self, now:datetime) -> bool:
        """Build ``self.refined_markets`` from ``self.raw_markets``.

        The method:

        * keeps the modelled columns;
        * de-duplicates on (exchangeId, baseId, quoteId), keeping the last
          occurrence;
        * casts priceUsd and updated;
        * keeps only markets whose exchange is in ``self.refined_exchanges``
          and whose base and quote assets are in ``self.refined_assets``;
        * stamps every row with ``extracted = now``.

        The refined assets and exchanges must therefore be built first.

        Returns:
            True on success, False (after printing the error) otherwise.
        """
        try:
            df_markets = self.raw_markets[['exchangeId', 'baseId', 'quoteId', 'priceUsd', 'updated']]
            df_markets = df_markets.drop_duplicates(subset=['exchangeId', 'baseId', 'quoteId'],
                                                    keep='last')
            # NOTE(review): same `updated` unit question as in refine_exchanges.
            df_markets = df_markets.astype({'priceUsd': 'float64',
                                            'updated': 'datetime64[us]'})

            asset_ids = self.refined_assets['id']
            known = (df_markets['exchangeId'].isin(self.refined_exchanges['exchangeId'])
                     & df_markets['quoteId'].isin(asset_ids)
                     & df_markets['baseId'].isin(asset_ids))

            # assign() returns a new frame, avoiding chained assignment on a filtered slice.
            self.refined_markets = df_markets[known].assign(extracted=now)
            return True
        except Exception as e:
            print(f"something unexpected happened: {e}")
            return False

    def create_tables(self) -> bool:
        """Execute each DDL script in DDL_PATHS against Postgres, committing after each.

        Returns:
            True when every script ran, False (after printing the error)
            otherwise. The connection is always closed.
        """
        try:
            with self._get_engine().connect() as conn:
                for filepath in self.DDL_PATHS:
                    with open(filepath, 'r', encoding='utf-8') as file:
                        sql = file.read()
                    conn.execute(text(sql))
                    conn.commit()
            return True
        except Exception as e:
            print(f"something unexpected happened: {e}")
            return False

    def insert_data_to_pg(self, table_name:str, df:pd.DataFrame) -> bool:
        """Append ``df`` to the Postgres table ``table_name`` (without the index).

        Returns:
            True on success, False (after printing the error) otherwise.
        """
        try:
            df.to_sql(table_name, self._get_engine(), if_exists='append', index=False)
            return True
        except Exception as e:
            print(f"something unexpected happened: {e}")
            return False

    def _run_cycle(self, now:datetime) -> bool:
        """Run one fetch -> lake -> refine -> lake -> Postgres pass stamped with ``now``.

        A failed fetch or refine aborts the cycle, so data that is stale (left
        over from a previous cycle) or missing is never written downstream.
        Lake writes are best-effort: write_lake reports a failure, but the
        cycle continues.

        Returns:
            True when every dataset was fetched and refined and every Postgres
            insert succeeded, otherwise False.
        """
        for name in ('assets', 'exchanges', 'markets'):
            if not self.get_endpoint(f'/{name}'):
                print(f"skipping cycle {now}: fetching /{name} failed")
                return False
            self.write_lake('raw', name, getattr(self, f'raw_{name}'), now)

        # Markets go last: they are filtered against the refined assets/exchanges.
        refiners = (('assets', self.refine_assets),
                    ('exchanges', self.refine_exchanges),
                    ('markets', self.refine_markets))
        for name, refine in refiners:
            if not refine(now):
                print(f"skipping cycle {now}: refining {name} failed")
                return False
            self.write_lake('refined', name, getattr(self, f'refined_{name}'), now)

        inserted = True
        for table_name, name in (('Assets', 'assets'),
                                 ('Exchanges', 'exchanges'),
                                 ('Markets', 'markets')):
            # The insert is evaluated first, so every table is attempted.
            inserted = self.insert_data_to_pg(table_name, getattr(self, f'refined_{name}')) and inserted
        return inserted

    def start_monitor(self, interval:float = 60) -> None:
        """Create the tables once, then run a cycle every ``interval`` seconds forever.

        A failure in create_tables does not stop the monitor, because the
        tables may already exist. The shared engine is disposed when the loop
        exits, for example on KeyboardInterrupt.
        """
        self.create_tables()
        try:
            while True:
                self._run_cycle(datetime.now())
                time.sleep(interval)
        finally:
            if self._engine is not None:
                self._engine.dispose()
        
        


In [None]:
# Instantiate the monitor. __init__ reads the API and Postgres settings from
# environment variables and raises KeyError if any of them is unset.
monitor = CoinCap_monitor()

In [None]:
# Blocks forever: runs create_tables once, then repeats the fetch/refine/store
# cycle, sleeping 60 seconds between cycles. Interrupt the kernel to stop it.
monitor.start_monitor()