</p>

# ETL Pipeline for Real-time Crypto Market Data Analysis

This is an ETL pipeline project that extracts data on crypto coin market from <a href="https://coinmarketcap.com/api/documentation/v1/#">CoinMarketCap API</a>, transforms the data by cleaning and normalizing it, and loads it into a CSV database. The data can then be used to create a bar chart or line graph visualization to show the consumption trends over time.

<h2 id="import_data">Import Libararies</h2>

In [None]:
# Import libraries

from requests import Request, Session
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
import json
import pandas as pd
import os
import datetime

<h2 id="import_data">1. Extract</h2>

The extract stage is where data is gathered from the API. In this code snippet, data is extracted from the CoinMarketCap API using the **requests** library. The API endpoint, parameters, and headers are defined, and a session is created to handle the API request. The API response is then parsed into a JSON object and returned.

In [2]:
#Pull data from CoinMarketCap API

def extract_data():
    url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest' 
    #Original Sandbox Environment: 'https://sandbox-api.coinmarketcap.com/v1/cryptocurrency/listings/latest'
    parameters = {
        'start':'1',
        'limit':'10', #limit to top 10 crypto coins
        'convert':'USD'
    }
    headers = {
        'Accepts': 'application/json',
        'X-CMC_PRO_API_KEY': '0ad53085-1cb2-4eb8-ad9e-3ffbd7e56509',
    }
    session = Session()
    session.headers.update(headers)
    try:
        response = session.get(url, params=parameters)
        data = json.loads(response.text)
        print(data)
        return data
    except (ConnectionError, Timeout, TooManyRedirects) as e:
        print(e)

<h2 id="import_data">2. Transform</h2>

The transform stage is where the data is cleaned, transformed, and prepared for loading into the database. In this code snippet, the data is first normalized using the **pandas** library, then various data cleaning and transformation operations are performed such as calculating the mean of different columns, stacking the dataframe, renaming columns and replacing values. The transformed data is then returned.

In [None]:
def transform_data(data):
    try:
        # convert the nested JSON object into dataframe
        df = pd.json_normalize(data['data'])
        # look at the coin trends over time
        df2 = df.groupby('name', sort=False)[['quote.USD.percent_change_1h','quote.USD.percent_change_24h','quote.USD.percent_change_7d','quote.USD.percent_change_30d','quote.USD.percent_change_60d','quote.USD.percent_change_90d']].mean()
        # stacked the data to be able to visualize
        df3 = df2.stack()
        # convert from series to dataframe and reset the index
        df3 = df3.to_frame(name='values').reset_index()
        # rename the column to represent the coin trend
        df3.rename(columns={'level_1':'percent_change'}, inplace = True)
        # rename ambiguous title to simplified ones
        df3['percent_change'] = df3['percent_change'].replace(['quote.USD.percent_change_1h','quote.USD.percent_change_24h','quote.USD.percent_change_7d','quote.USD.percent_change_30d','quote.USD.percent_change_60d','quote.USD.percent_change_90d'],['1h','24h','7d','30d','60d','90d'])
        # add time stamp the data was collected
        df3['timestamp'] = pd.to_datetime('now')
        return df3
    except Exception as e:
        print(e)

<h2 id="import_data">3. Load</h2>

The load stage is where the transformed data is loaded into the database. In this code snippet, the transformed data is saved as a CSV file using the **pandas** library, and then read back into a dataframe for further analysis or visualization. The function also calls **run_api_repeatedly()** to run the ETL pipeline again and again.

In [None]:
def load_data(transformed_data):
    try:
        print(transformed_data)
        # create a csv and append data to it
        if not os.path.isfile(r'C:\Users\Muham\Portfolio Projects\CryptoETL.csv'):
            transformed_data.to_csv(r'C:\Users\Muham\Portfolio Projects\CryptoETL.csv', header = 'column_names')
        else:
            transformed_data.to_csv(r'C:\Users\Muham\Portfolio Projects\CryptoETL.csv', mode = 'a', header = False)
        loaded_data = pd.read_csv(r'C:\Users\Muham\Portfolio Projects\CryptoETL.csv')
        run_api_repeatedly()
        return loaded_data
    except Exception as e:
        print(e)

#Run the API function based on the specific criteria      
def run_api_repeatedly():
    import os
    from time import time
    from time import sleep
    for i in range(333): #CoinMarketCap API allow 333 runs per day
        print('API Runner completed successfully')
        sleep(60) #sleep for 1 minute
    exit()

<h2 id="import_data">Calling ETL Pipeline</h2>

This function is responsible for executing all the steps of the ETL pipeline and also it takes care of any errors that occur during the process, it returns a message indicating whether the pipeline was executed successfully or not.

In [5]:
def run_etl():
    try:
        data = extract_data()
        transformed_data = transform_data(data)
        loaded_data = load_data(transformed_data)
        print("ETL pipeline completed successfully.")
        return loaded_data
    except Exception as e:
        print("Error in ETL pipeline: {}".format(str(e)))

In [6]:
#Call the function to execute ETL pipeline
run_etl()

{'status': {'timestamp': '2023-01-17T04:29:24.588Z', 'error_code': 0, 'error_message': None, 'elapsed': 18, 'credit_count': 1, 'notice': None, 'total_count': 8865}, 'data': [{'id': 1, 'name': 'Bitcoin', 'symbol': 'BTC', 'slug': 'bitcoin', 'num_market_pairs': 9933, 'date_added': '2013-04-28T00:00:00.000Z', 'tags': ['mineable', 'pow', 'sha-256', 'store-of-value', 'state-channel', 'coinbase-ventures-portfolio', 'three-arrows-capital-portfolio', 'polychain-capital-portfolio', 'binance-labs-portfolio', 'blockchain-capital-portfolio', 'boostvc-portfolio', 'cms-holdings-portfolio', 'dcg-portfolio', 'dragonfly-capital-portfolio', 'electric-capital-portfolio', 'fabric-ventures-portfolio', 'framework-ventures-portfolio', 'galaxy-digital-portfolio', 'huobi-capital-portfolio', 'alameda-research-portfolio', 'a16z-portfolio', '1confirmation-portfolio', 'winklevoss-capital-portfolio', 'usv-portfolio', 'placeholder-ventures-portfolio', 'pantera-capital-portfolio', 'multicoin-capital-portfolio', 'parad

  df3['timestamp'] = pd.to_datetime('now')


API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully
API Runner completed successfully


KeyboardInterrupt: 