In the past, storing data was expensive.
Data therefore had to be transformed before being loaded into the database (ETL).
With the advent of cloud computing, storage costs have dropped, leading to the development of ELT pipelines: raw data is loaded first and then transformed directly in the database.
Here, the aim is to use Python only to extract and ingest the data, and to transform it with SQL.
More broadly, the goal is to show that different tools can be combined in one pipeline.
That said, I find extracting and ingesting data easier with Python (though I would like to try Java or JS).
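
To make the load-then-transform idea concrete, here is a minimal sketch. It uses Python's built-in `sqlite3` module as a stand-in for the PostgreSQL database used later in this notebook; the table name `raw_prices` and the three sample rows are made up for illustration.

```python
import sqlite3

# Hypothetical raw rows (Unix time in seconds, closing price), loaded untouched.
raw_rows = [
    (1739512800, 92602.05),
    (1739516400, 92685.17),
    (1739599200, 93000.00),
]

conn = sqlite3.connect(":memory:")  # in-memory stand-in for the real database
conn.execute("CREATE TABLE raw_prices (time INTEGER, close REAL)")

# Load: Python only moves the data into the database.
conn.executemany("INSERT INTO raw_prices VALUES (?, ?)", raw_rows)

# Transform: SQL does the reshaping, here an average close per UTC day.
daily = conn.execute(
    """
    SELECT date(time, 'unixepoch') AS day, AVG(close) AS avg_close, COUNT(*) AS n
    FROM raw_prices
    GROUP BY day
    ORDER BY day
    """
).fetchall()

for day, avg_close, n in daily:
    print(day, round(avg_close, 2), n)
```

The raw table stays untouched, so a different aggregation later only needs a new query, not a new extraction.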


In [1]:
import os
import requests
import json
import pandas as pd
from datetime import datetime
import time

In [2]:
API_KEY = os.environ.get("CRYPTO_API_KEY")
CRYPTO_CURRENCY = "BTC"
LOCAL_CURRENCY = "EUR"
BATCH_SIZE = 2000
API_ENDPOINT = "histohour"

In [3]:
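def get_historical_earliest_date() -> int:
    """Return the Unix timestamp of the earliest daily data point available for the pair."""
    # Assumes CRYPTO_API_KEY holds the bare key, so it is sent as the `api_key` query parameter
    request = requests.get(f"https://min-api.cryptocompare.com/data/v2/histoday?fsym={CRYPTO_CURRENCY}&tsym={LOCAL_CURRENCY}&allData=true&api_key={API_KEY}")
    rep = json.loads(request.content)

    return rep["Data"]["TimeFrom"]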

In [4]:
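def get_hourly_batch_data(toTs: int) -> tuple[list, int]:
    """Fetch one batch of hourly rows ending at toTs; return (rows, earliest timestamp in the batch)."""
    # Assumes CRYPTO_API_KEY holds the bare key, so it is sent as the `api_key` query parameter
    request = requests.get(f"https://min-api.cryptocompare.com/data/v2/{API_ENDPOINT}?fsym={CRYPTO_CURRENCY}&tsym={LOCAL_CURRENCY}&limit={BATCH_SIZE}&toTs={toTs}&api_key={API_KEY}")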
    
    if request.status_code != 200:
        raise Exception(request.content)

    rep = json.loads(request.content)
    data = rep["Data"]["Data"]
    batch_earliest_date = rep["Data"]["TimeFrom"]

    return (data, batch_earliest_date)
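
The request URLs above are assembled by hand inside f-strings, which makes a malformed parameter easy to miss. As a sketch of an alternative, the query string can be built from a dictionary with the standard library. `build_url` is a hypothetical helper, not part of the notebook, and sending the key as `api_key` is an assumption about how the API expects it.

```python
from urllib.parse import urlencode

BASE_URL = "https://min-api.cryptocompare.com/data/v2"


def build_url(endpoint, api_key=None, **params):
    """Hypothetical helper: return the full request URL for one endpoint."""
    query = dict(params)
    if api_key:  # assumption: the key is sent as the `api_key` query parameter
        query["api_key"] = api_key
    # urlencode joins the pairs with '&' and percent-encodes the values
    return f"{BASE_URL}/{endpoint}?{urlencode(query)}"


url = build_url("histohour", fsym="BTC", tsym="EUR", limit=2000, toTs=1739512800)
print(url)
# https://min-api.cryptocompare.com/data/v2/histohour?fsym=BTC&tsym=EUR&limit=2000&toTs=1739512800
```

With `requests`, passing the same dictionary through the `params` argument of `requests.get` gives the same encoding without building the string yourself.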

# 1. Extract

In [5]:
historical_earliest_date = get_historical_earliest_date()

now = int(time.time())  # Unix timestamp in whole seconds, matching the toTs: int annotation
hourly_data, batch_earliest_date = get_hourly_batch_data(now)

# Walk backwards in time, one batch per request, until the earliest available date is reached
while batch_earliest_date > historical_earliest_date:
    prev_hourly_data, batch_earliest_date = get_hourly_batch_data(batch_earliest_date)
    hourly_data += prev_hourly_data

hourly_df = pd.json_normalize(hourly_data)
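
The final row count reported at the end of this notebook, 122061, equals 61 × 2001, which suggests each request returns `BATCH_SIZE + 1` rows. If the row at `toTs` is included, consecutive batches would share one boundary row, and the concatenated list is not in chronological order. The sketch below shows one way to page backwards while keeping each timestamp once. `fake_fetch` is a made-up stand-in for the API call and `fetch_all` is a hypothetical helper, so this runs offline and says nothing definitive about the real endpoint.

```python
HOUR = 3600


def fake_fetch(to_ts, limit=4):
    """Stand-in for the API call: returns limit + 1 hourly rows ending at to_ts."""
    rows = [{"time": to_ts - i * HOUR, "close": 100.0} for i in range(limit, -1, -1)]
    return rows, rows[0]["time"]


def fetch_all(latest_ts, earliest_ts, fetch=fake_fetch):
    """Page backwards from latest_ts until earliest_ts is covered."""
    by_time = {}
    cursor = latest_ts
    while True:
        rows, batch_earliest = fetch(cursor)
        for row in rows:
            by_time[row["time"]] = row  # keyed by timestamp, so boundary rows are stored once
        if batch_earliest <= earliest_ts:
            break
        cursor = batch_earliest
    # Return rows oldest first, dropping anything older than the requested start.
    return [by_time[t] for t in sorted(by_time) if t >= earliest_ts]


rows = fetch_all(latest_ts=20 * HOUR, earliest_ts=10 * HOUR)
print(len(rows), rows[0]["time"] // HOUR, rows[-1]["time"] // HOUR)  # 11 10 20
```

Keying on the timestamp keeps the deduplication independent of batch size; the same clean-up could equally be left to SQL after loading, which is closer to the ELT spirit.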

In [6]:
hourly_df

index,time,high,low,open,volumefrom,volumeto,close,conversionType,conversionSymbol
0,1739512800,92643.500,92435.230,92531.210,30.85,2854809.59,92602.050,direct,
1,1739516400,92775.010,92580.810,92602.050,48.29,4475433.31,92685.170,direct,
2,1739520000,92740.570,92411.940,92685.170,72.07,6671331.42,92524.750,direct,
3,1739523600,92747.700,92499.730,92524.750,58.07,5378501.04,92691.800,direct,
4,1739527200,92810.430,92609.820,92691.800,53.86,4994069.73,92740.640,direct,
...,...,...,...,...,...,...,...,...,...
122056,1314698400,6.398,6.398,6.398,0.00,0.00,6.398,direct,
122057,1314702000,6.398,6.398,6.398,0.00,0.00,6.398,direct,
122058,1314705600,6.398,6.398,6.398,0.00,0.00,6.398,direct,
122059,1314709200,6.398,6.398,6.398,0.00,0.00,6.398,direct,
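
The `time` column is hard to read as is. Assuming its values are Unix timestamps in seconds, which their magnitude suggests, a short standard-library sketch converts them to UTC datetimes. `to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def to_utc(ts: int) -> datetime:
    """Convert a Unix timestamp in seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)


first = to_utc(1739512800)  # `time` of row 0 in the output above
last = to_utc(1314709200)   # `time` of the last row in the output above
print(first.isoformat())  # 2025-02-14T06:00:00+00:00
print(last.isoformat())   # 2011-08-30T13:00:00+00:00
```

Since the transformation step is meant to happen in SQL, the same conversion can be done after loading, for example with PostgreSQL's `to_timestamp(time)`; in pandas the equivalent is `pd.to_datetime(hourly_df["time"], unit="s", utc=True)`.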


# 2. Load

SQLAlchemy is a Python library that provides a high-level abstraction for interacting with relational databases. It lets developers write largely database-agnostic code by exposing systems such as SQLite, PostgreSQL, or MySQL through a consistent interface. A driver is still needed for each database (pg8000 for PostgreSQL in this notebook), but rather than coding against each driver's own API, you use SQLAlchemy to connect, insert data, and run queries, whether with raw SQL or with Python-based table models (ORM).

In [7]:
from sqlalchemy import create_engine

In [8]:
PG_USERNAME = "postgres"
PG_PASSWORD = os.environ.get("PG_PASSWORD")
DB_NAME = "dev"

In [None]:
# Create an Engine, the object that manages connections to the database (comparable to connect() in the sqlite3 library)
# engine = create_engine("sqlite:///../database/crypto.db")
engine = create_engine(f"postgresql+pg8000://{PG_USERNAME}:{PG_PASSWORD}@localhost:5432/{DB_NAME}")
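
One caveat with building the connection string in an f-string: a password containing characters such as `@`, `:` or `/` will break URL parsing unless it is percent-encoded first. The sketch below shows the escaping with the standard library only. `build_pg_url` is a hypothetical helper and the password is a made-up example.

```python
from urllib.parse import quote_plus


def build_pg_url(user, password, host, port, db, driver="pg8000"):
    """Hypothetical helper: build a SQLAlchemy-style PostgreSQL URL with escaped credentials."""
    return (
        f"postgresql+{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db}"
    )


url = build_pg_url("postgres", "p@ss:w/rd", "localhost", 5432, "dev")
print(url)  # postgresql+pg8000://postgres:p%40ss%3Aw%2Frd@localhost:5432/dev
```

SQLAlchemy 1.4 and later also offer `sqlalchemy.engine.URL.create(...)`, which takes the components separately and handles the escaping itself.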

In [14]:
hourly_df.to_sql("crypto_hourly_elt", engine, if_exists="append", index=False)

122061
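
Note that with `if_exists="append"`, running the load cell a second time appends the same rows again. One possible guard, sketched here with the built-in `sqlite3` module as a stand-in for PostgreSQL, is to make the timestamp a primary key and skip rows that already exist. The table name `crypto_hourly` and the two sample rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL database
conn.execute("CREATE TABLE crypto_hourly (time INTEGER PRIMARY KEY, close REAL)")

rows = [(1739512800, 92602.05), (1739516400, 92685.17)]


def load(conn, rows):
    """Insert rows, silently skipping timestamps that are already present."""
    conn.executemany("INSERT OR IGNORE INTO crypto_hourly VALUES (?, ?)", rows)
    conn.commit()


load(conn, rows)
load(conn, rows)  # simulated re-run of the load cell

count = conn.execute("SELECT COUNT(*) FROM crypto_hourly").fetchone()[0]
print(count)  # 2
```

In PostgreSQL the corresponding statement is `INSERT ... ON CONFLICT (time) DO NOTHING`, which requires a unique constraint on `time`.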