# CoinGecko API OHLC Price Ingestion Notebook
--------------------------------------------------------------------------------
## Description

This notebook fetches Open-High-Low-Close (OHLC) price data for selected cryptocurrencies from the CoinGecko API. It processes coins in batches, applies basic rate-limit mitigation, and writes the price data as JSON to a raw landing volume for downstream processing.

--------------------------------------------------------------------------------
### Key Features:

-  Supports configurable batch size to control API request volume  
-  Allows flexible time ranges for OHLC data using the `days` parameter  
-  Implements basic retry logic for CoinGecko API rate limits  
-  Extracts OHLC price records and structures them for ingestion  
-  Writes raw JSON output to a Databricks Volume under a timestamp-based unique path (Spark writes a directory of part files at that path)  
-  Serves as the upstream source for the Crypto Prices DLT pipeline  

In [0]:
# --------------------------------------------------------------------------------
# Import Required Libraries
# --------------------------------------------------------------------------------
from datetime import datetime, timezone
import time

import requests
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType, DoubleType


# --------------------------------------------------------------------------------
# Define Coins to Track and Global Variables
# --------------------------------------------------------------------------------
# CoinGecko coin id -> ticker symbol. Dicts preserve insertion order, so the
# resulting list of (coin_id, symbol) tuples keeps exactly this fetch order.
coins = list({
    "bitcoin": "BTC",
    "ethereum": "ETH",
    "binancecoin": "BNB",
    "solana": "SOL",
    "ripple": "XRP",
    "dogecoin": "DOGE",
    "cardano": "ADA",
    "polkadot": "DOT",
}.items())

# Quote currency requested from the OHLC endpoint.
vs_currency = "usd"

# Filled by the fetch loop with one tuple per OHLC row; starts empty.
ohlc_records = list()


# --------------------------------------------------------------------------------
# Notebook Parameters: Batch Size and Days
# --------------------------------------------------------------------------------
dbutils.widgets.text("batch_size", "3", "Batch Size")
dbutils.widgets.text("days", "30", "Days for OHLC Data")

# batch_size is the number of coins fetched between the longer inter-batch pauses.
# int() already raises ValueError on non-numeric text. Non-positive values are
# rejected here because a step of 0 makes range() fail with an unhelpful error,
# and a negative step would silently fetch nothing.
batch_size = int(dbutils.widgets.get("batch_size"))
if batch_size < 1:
    raise ValueError(f"Invalid batch_size parameter: {batch_size}. It must be a positive integer.")

# Strip and lower-case so inputs such as " 30 " or "MAX" are accepted.
days = dbutils.widgets.get("days").strip().lower()

# Allowed values as per CoinGecko documentation
allowed_days = {"1", "7", "14", "30", "90", "180", "365", "max"}
if days not in allowed_days:
    # Sort numerically, with "max" last, so the message is stable between runs.
    # Iteration order of a set of strings varies with hash randomization.
    allowed_listing = ", ".join(
        sorted(allowed_days, key=lambda d: float("inf") if d == "max" else int(d))
    )
    raise ValueError(f"Invalid days parameter: {days}. Allowed values are: {allowed_listing}")



# --------------------------------------------------------------------------------
# Fetch OHLC Data in Batches with Basic Rate Limit Handling
# --------------------------------------------------------------------------------
OHLC_URL_TEMPLATE = "https://api.coingecko.com/api/v3/coins/{coin_id}/ohlc"
REQUEST_TIMEOUT_SECONDS = 30  # requests.get has no default timeout and could block forever
RATE_LIMIT_RETRY_WAIT_SECONDS = 60
MAX_ATTEMPTS = 2  # the initial request plus one retry after an HTTP 429
PER_COIN_DELAY_SECONDS = 1
PER_BATCH_DELAY_SECONDS = 5


def _fetch_ohlc(coin_id, symbol):
    """Fetch raw OHLC rows for one coin from the CoinGecko OHLC endpoint.

    After an HTTP 429 (rate limit) the request is retried once, following a
    fixed wait. Any other failure, including network-level errors, is printed
    and None is returned, so one bad coin does not abort the whole run.

    Args:
        coin_id: CoinGecko coin id placed in the URL path (e.g. "bitcoin").
        symbol: Ticker symbol, used only in log messages.

    Returns:
        The decoded JSON body on HTTP 200, otherwise None.
    """
    url = OHLC_URL_TEMPLATE.format(coin_id=coin_id)
    # Let requests build and URL-encode the query string.
    params = {"vs_currency": vs_currency, "days": days}

    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT_SECONDS)
        except requests.exceptions.RequestException as err:
            # Connection errors and timeouts are reported like HTTP failures
            # instead of crashing the notebook.
            print(f"Request error while fetching OHLC data for {symbol}: {err}")
            return None

        if response.status_code == 200:
            return response.json()

        if response.status_code == 429 and attempt < MAX_ATTEMPTS:
            print(f"Rate limit hit for {symbol}. Retrying in {RATE_LIMIT_RETRY_WAIT_SECONDS}s...")
            time.sleep(RATE_LIMIT_RETRY_WAIT_SECONDS)
            continue

        prefix = "Failed" if attempt == 1 else "Still failed"
        print(f"{prefix} to fetch OHLC data for {symbol}: {response.status_code} - {response.text}")
        return None

    return None


def _to_record(symbol, entry):
    """Convert one CoinGecko OHLC row into a (symbol, timestamp, open, high, low, close) tuple.

    The row is read as [epoch milliseconds, open, high, low, close]. The
    timestamp is built as a timezone-aware UTC datetime. PySpark interprets a
    naive datetime in the driver's local time zone, so an aware value keeps
    the instant correct regardless of cluster settings.
    """
    ts_ms, open_price, high_price, low_price, close_price = entry[:5]
    return (
        symbol,
        datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc),
        float(open_price),
        float(high_price),
        float(low_price),
        float(close_price),
    )


for batch_start in range(0, len(coins), batch_size):
    batch = coins[batch_start:batch_start + batch_size]
    for coin_id, symbol in batch:
        data = _fetch_ohlc(coin_id, symbol)
        if data:
            ohlc_records.extend(_to_record(symbol, entry) for entry in data)
        # Short pause between coins to reduce the chance of hitting the rate limit.
        time.sleep(PER_COIN_DELAY_SECONDS)

    # Pause between batches, but not after the last one; nothing follows it.
    if batch_start + batch_size < len(coins):
        print("Batch complete. Waiting before next batch...")
        time.sleep(PER_BATCH_DELAY_SECONDS)



# --------------------------------------------------------------------------------
# Create Spark DataFrame and Write to Raw Landing Volume
# --------------------------------------------------------------------------------
if ohlc_records:
    # Price columns use DoubleType (64-bit, same as Python float). FloatType is
    # 32-bit and keeps only ~7 significant digits, so longer prices would be
    # silently rounded before being written.
    schema = StructType([
        StructField("symbol", StringType(), False),
        StructField("timestamp", TimestampType(), True),
        StructField("open", DoubleType(), True),
        StructField("high", DoubleType(), True),
        StructField("low", DoubleType(), True),
        StructField("close", DoubleType(), True),
    ])

    df = spark.createDataFrame(ohlc_records, schema)

    volume_path = "/Volumes/tabular/dataexpert/sakethg/capstone/raw/crypto_prices"
    # A UTC run timestamp keeps each run's output under its own path. Spark
    # writes a directory of part files at full_path, not a single file.
    timestamp_str = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    unique_filename = f"coingecko_ohlc_{timestamp_str}.json"
    full_path = f"{volume_path}/{unique_filename}"

    df.write.mode("overwrite").json(full_path)

    print(f"OHLC price data written to {full_path}")
else:
    print("No OHLC records inserted.")