# Cryptocurrency Data Ingestion Pipeline

### Authors

|    Student Name                 |    Student Number  |
|---------------------------------|--------------------|
| Raj Sandhu                      | 101111960          |
| Akaash Kapoor                   | 101112895          |
| Ali Alvi                        | 101114940          |
| Hassan Jallad                   | 101109334          |
| Areeb Ul Haq                    | 101115337          |
| Ahmad Abuoudeh                  | 101072636          |


## Libraries to Import

In [1]:
import requests
import json
import pandas as pd
import re
import time
import logging
from datetime import datetime, date
import os
import path

## Initialize Logging Configurations

In [2]:
#This data ingestion notebook uses logging to record which data is being ingested and to note any errors that may
#occur. This follows common industry practice: it makes it easy to confirm that only the required data was ingested,
#and any errors that occur can be diagnosed and resolved quickly.

#Configure the path where the log file will be stored: <grandparent of the working directory>/references/logs.

parent_folder = os.path.dirname(os.path.dirname(os.getcwd()))
log_parent_folder = "references"
log_folder = "logs"
log_file_path = os.path.join(parent_folder, log_parent_folder, log_folder)

#Create the log directory if it was not pre-made. makedirs also creates a missing "references" parent folder
#(os.mkdir would fail in that case), and exist_ok makes re-running this cell harmless.
os.makedirs(log_file_path, exist_ok = True)

#Thin wrappers around the notebook logger so each cell can record progress, warnings and errors in the log file.
#`log` and `logstream` are looked up when a wrapper is called, so these can be defined before those globals exist.
def log_info(message) :
    """Write *message* to the ingestion log at INFO level."""
    log.log(logging.INFO, message)

def log_warning(message) :
    """Write *message* to the ingestion log at WARNING level."""
    log.log(logging.WARNING, message)

def log_error(message):
    """Write *message* at ERROR level, note that the notebook has ended, then close the log file stream."""
    log.log(logging.ERROR, message)
    log.log(logging.INFO, "Notebook has ended.")
    logstream.close()

#Create the notebook's logger and remove any handlers left over from a previous run of this cell.
log = logging.getLogger("cryptocurrency_data_ingestion")
log.setLevel(logging.INFO)

#Iterate over a copy: removing entries from log.handlers while iterating it directly would skip every other handler.
for old_handler in list(log.handlers):
    log.removeHandler(old_handler)
    old_handler.close()

#Open the log file (overwritten on every run). os.path.join inserts the correct path separator for the current OS,
#and UTF-8 encoding lets non-ASCII coin names be logged on any platform.
logstream = open(os.path.join(log_file_path, "data-ingestion-log.log"), mode = "w", encoding = "utf-8")
handler = logging.StreamHandler(logstream)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter("%(asctime)s | %(name)s | %(levelname)s | %(message)s"))
log.addHandler(handler)

## Compute Dates

In [3]:
#These dates are dynamic parameters fed into the API URL to retrieve coin information from one year ago up to
#the current date.
log_info("Notebook has begun...")
log_info("Aquiring configurable dates.")
today = date.today() #Read the clock once so every derived value refers to the same day (no midnight straddling).
current_date = today.strftime("%Y-%m-%d") #Today's date.
current_year = today.strftime("%Y")
current_month = today.strftime("%m")
current_day = today.strftime("%d")
prev_year = str(today.year - 1)
try :
    one_year_ago = today.replace(year = today.year - 1)
except ValueError :
    #Today is Feb 29 and last year was not a leap year, so "last year, same day" does not exist; use Feb 28.
    one_year_ago = today.replace(year = today.year - 1, day = 28)
prev_date = one_year_ago.strftime("%Y-%m-%d") #Date one year before today.

## Configure Destination Paths for Ingested Data to be Stored

In [4]:
log_info("Creating destination paths.")
data_parent_folder = "data"
raw_data_folder = "raw"
interim_data_folder = "interim"

#Raw per-coin CSVs go to <parent_folder>/data/raw; cleaned coin descriptions go to <parent_folder>/data/interim.
raw_data_file_path = os.path.join(parent_folder, data_parent_folder, raw_data_folder)
interim_data_file_path = os.path.join(parent_folder, data_parent_folder, interim_data_folder)

#Make sure both destinations exist; the ingestion cell writes files into them and would otherwise fail with
#FileNotFoundError. exist_ok makes re-running this cell harmless.
os.makedirs(raw_data_file_path, exist_ok = True)
os.makedirs(interim_data_file_path, exist_ok = True)

## Data Ingestion

In [5]:
#The first step in the ingestion process is to get both the name and the unique slug of each coin. The unique slug
#is what gets passed to the API to query the data for that specific coin.

coin_name = [] #Store the coin names here.
coin_slug = [] #Store the unique coin slugs here.
coin_description = [] #Store coin descriptions, each as a single line, here.

log_info("Retreiving coin names.")

try :
    #One request returns the whole coin list, so the loop below makes no network calls and needs no throttling sleep.
    response = requests.get("https://coincodex.com/apps/coincodex/cache/all_coins.json", timeout = 30) #API endpoint to get coin names and slugs.
    response.raise_for_status() #Fail with a clear HTTP error instead of an opaque JSON decoding error.
    coin_name_slug = response.json()
    #Take (up to) the first 200 coins: more than the 150 needed, leaving room for invalid slugs returned by the API.
    #Slicing also tolerates a list shorter than 200 instead of raising IndexError.
    for coin in coin_name_slug[:200] :
        coin_slug.append(coin['ccu_slug'])
        coin_name.append(coin["name"].lower().replace(" ", "-").replace("/", "-")) #Clean the name so it can be used as a file name.
    log_info("Retreived all coin names and slugs.")

except Exception as e :
    log_error("ERROR: " + str(e))
    raise #Terminates the notebook if there is an error.

#From here, the next step in the pipeline is to use the names and slugs obtained above to ingest data for 150 coins.
#Note that the API sometimes returns the wrong slug for a coin. To deal with this, the names and slugs of 200 coins
#were obtained to allow a margin for error: if a request for a slug fails, the code simply moves on to the next coin.


time.sleep(5) #Once again, for ethical reasons, sleep for 5 seconds so as not to overwhelm the server.
counter = 0 #Number of coins ingested so far; only 150 are wanted.

#The next step in the ingestion process is the actual ingestion of the data itself, and saving it.

new_name = [] #Names of coins whose data was ingested; kept index-aligned with coin_description.

#Compiled once, outside the loop: matches HTML tags and HTML entities (named, decimal and hex) so they can be removed.
html_tag_pattern = re.compile(r'<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});')

for slug, name in zip(coin_slug, coin_name) :
    #API endpoint for one coin: pass its unique slug, the date one year ago, and today's date.
    url = ("https://coincodex.com/api/coincodexcoins/get_historical_data_by_slug/"
           + slug + "/" + prev_date + "/" + current_date + "/1?t=5459791")

    try :
        response = requests.get(url, timeout = 30) #Issue the request once and reuse this response below.
        time.sleep(3) #Sleep after every request, successful or not, so as not to overwhelm the server.
        if response.status_code != 200 :
            log_warning("Invalid slug obtained from API with coin name: " + name + ". Finding new coin.") #Sometimes API returns incorrect slug.
            continue

        coin_data = response.json()
        coin_dataframe = pd.DataFrame(coin_data['data']) #Select all data for the specified coin.
        #Columns 2-8 by position; presumably the USD-denominated fields — TODO confirm against the API response.
        new_coin_dataframe = coin_dataframe.iloc[:, 2:9]

        #Save the coin's data as raw data. Passing a path (rather than an open handle) lets pandas write UTF-8 with
        #correct line endings on every OS.
        new_coin_dataframe.to_csv(os.path.join(raw_data_file_path, name + ".csv"), index = False)
        log_info("SUCCESS: Data aquired for: " + name)

        coin_text_data = coin_data['coin']['description'] or "" #Get the description; a missing (None) one counts as empty.
        clean_text = html_tag_pattern.sub('', coin_text_data) #Remove HTML tags and entities.
        #Collapse to a single line for the dataframe; whitespace runs (including line breaks) become one space so
        #words on adjacent lines are not glued together.
        clean_text = " ".join(clean_text.split())
        if not clean_text :
            clean_text = "Coin: " + name + " has no description." #Default if the API returns no usable description.
        coin_description.append(clean_text)
        new_name.append(name)
        log_info("SUCCESS: Description aquired for: " + name)
        counter += 1

        if counter == 150 :
            #Stop once the data for 150 coins has been ingested.
            log_info("Data for 150 coins has been obtained.")
            break

    except Exception as e:
        log_error("ERROR: " + str(e) + " (slug: " + slug + ")")
        raise

#Two-column table: coin name and its cleaned, single-line description (the two lists are appended in lockstep).
coin_description_dictionary = {"name": new_name, "description": coin_description}

coin_description_dataframe = pd.DataFrame.from_dict(coin_description_dictionary) #Convert the dictionary into a pandas dataframe.

#Write to the interim folder since this data has been transformed and has undergone intermediate cleaning. Passing
#the path lets pandas open the file itself with UTF-8 encoding and correct line endings on every OS.
coin_description_dataframe.to_csv(os.path.join(interim_data_file_path, "coin-descriptions.csv"), index = False)

#Begin ending notebook.
log_info("SUCCESS: All Coin data descriptions aquired.")
log.info("Notebook ending.")
#Detach handlers before closing their stream so any later logging call does not try to write to a closed file.
for open_handler in list(log.handlers):
    log.removeHandler(open_handler)
logstream.close()