In [56]:
import requests
import pyspark.sql as ps
from dotenv import load_dotenv
import os
import psycopg
import logging
import json

In [33]:
# Load variables from a local .env file into the process environment so the
# os.getenv lookups further down (API key, database settings) can see them.
load_dotenv()

True

In [62]:
# Logging configuration: every record is timestamped and sent both to a log
# file on disk and to the console stream.
LOG_FILE = "crypto_data.log"
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
)

In [34]:
# API credential, read from the environment (None when the variable is unset).
API_KEY = os.getenv("API_KEY")

# Connection keyword -> environment variable that supplies its value.
DB_PARAMS = {
    keyword: os.getenv(env_var)
    for keyword, env_var in (
        ("dbname", "DB_NAME"),
        ("user", "DB_USER"),
        ("password", "DB_PASSWORD"),
        ("host", "DB_HOST"),
        ("port", "DB_PORT"),
    )
}

In [42]:
# Fetch data from the API

def fetch_data(API_KEY, CURRENCY, timeout=30):
    """Fetch coin market data from the CoinGecko ``/coins/markets`` endpoint.

    Args:
        API_KEY: API key sent in the ``x-cg-pro-api-key`` request header.
        CURRENCY: Quote currency code (e.g. ``"usd"``), sent as the
            ``vs_currency`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the API answers with any status code other than 200.
        requests.RequestException: On connection errors or timeouts.
    """
    # NOTE(review): the "pro" key header is sent to the public api.coingecko.com
    # host -- confirm this host/header pairing matches the plan of the key in use.
    headers = {
        "accept": "application/json",
        "x-cg-pro-api-key": API_KEY,
    }
    url = "https://api.coingecko.com/api/v3/coins/markets"

    # Let requests build the query string so the currency value is URL-encoded.
    response = requests.get(
        url,
        headers=headers,
        params={"vs_currency": CURRENCY},
        timeout=timeout,
    )

    if response.status_code != 200:
        raise RuntimeError(f"Failed to fetch data: {response.status_code}")
    return response.json()

In [43]:
# Quote currency for the market data request, then the request itself.
CURRENCY = "usd"
crypto_data = fetch_data(API_KEY, CURRENCY)

In [49]:
# Inspect the first record to see which fields the API returns for each coin.
crypto_data[0]

{'id': 'bitcoin',
 'symbol': 'btc',
 'name': 'Bitcoin',
 'image': 'https://coin-images.coingecko.com/coins/images/1/large/bitcoin.png?1696501400',
 'current_price': 96734,
 'market_cap': 1920858747880,
 'market_cap_rank': 1,
 'fully_diluted_valuation': 1920860198757,
 'total_volume': 30636770766,
 'high_24h': 97341,
 'low_24h': 95368,
 'price_change_24h': 1365.92,
 'price_change_percentage_24h': 1.43227,
 'market_cap_change_24h': 28531166050,
 'market_cap_change_percentage_24h': 1.50773,
 'circulating_supply': 19858953.0,
 'total_supply': 19858968.0,
 'max_supply': 21000000.0,
 'ath': 108786,
 'ath_change_percentage': -11.06318,
 'ath_date': '2025-01-20T09:11:54.494Z',
 'atl': 67.81,
 'atl_change_percentage': 142580.75464,
 'atl_date': '2013-07-06T00:00:00.000Z',
 'roi': None,
 'last_updated': '2025-05-02T09:31:25.267Z'}

In [63]:
# Insert Crypto data into PostgreSQL
# Column order shared by the INSERT column list and every parameter tuple.
_CRYPTO_COLUMNS = (
    "id", "symbol", "name", "image", "current_price", "market_cap", "market_cap_rank",
    "fully_diluted_valuation", "total_volume", "high_24h", "low_24h",
    "price_change_24h", "price_change_percentage_24h",
    "market_cap_change_24h", "market_cap_change_percentage_24h",
    "circulating_supply", "total_supply", "max_supply",
    "ath", "ath_change_percentage", "ath_date",
    "atl", "atl_change_percentage", "atl_date",
    "roi", "last_updated",
)


def _build_upsert_query():
    """Return the parameterized upsert statement for the crypto_data table."""
    column_list = ", ".join(_CRYPTO_COLUMNS)
    placeholders = ", ".join(["%s"] * len(_CRYPTO_COLUMNS))
    # DO UPDATE requires a SET clause; EXCLUDED refers to the row that was
    # proposed for insertion, so an existing coin is refreshed with new values.
    assignments = ", ".join(
        f"{column} = EXCLUDED.{column}"
        for column in _CRYPTO_COLUMNS
        if column != "id"
    )
    return (
        f"INSERT INTO crypto_data ({column_list}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT (id) DO UPDATE SET {assignments}"
    )


def _crypto_row(crypto):
    """Convert one API record (dict) into a tuple ordered like _CRYPTO_COLUMNS."""
    row = []
    for column in _CRYPTO_COLUMNS:
        value = crypto.get(column)
        if column == "roi":
            # roi is either None or a dict; store the dict as JSON text.
            value = json.dumps(value) if value else None
        row.append(value)
    return tuple(row)


def insert_crypto_data(crypto_data, currency=None):
    """Upsert coin market records into the PostgreSQL ``crypto_data`` table.

    Each record is written as one row keyed by ``id``; when a row with the
    same ``id`` already exists, its other columns are overwritten with the
    new values. Entries that are not dicts or that have no ``id`` are skipped.

    Args:
        crypto_data: Iterable of dicts, one per coin, with the keys listed in
            ``_CRYPTO_COLUMNS``. Missing keys are stored as NULL.
        currency: Currency label used only in log messages. When omitted, the
            notebook-level ``CURRENCY`` variable is used if it exists.

    Returns:
        None. Database errors are logged and not re-raised, so a failed load
        does not stop the notebook.
    """
    if currency is None:
        # Backward compatibility with callers that rely on the global CURRENCY,
        # without raising NameError when it has not been defined yet.
        currency = globals().get("CURRENCY", "unknown")

    if not crypto_data:
        logging.info(f"No data to insert for {currency}.")
        return

    try:
        values = []
        skipped = 0
        for crypto in crypto_data:
            if isinstance(crypto, dict) and crypto.get("id"):
                values.append(_crypto_row(crypto))
            else:
                skipped += 1

        if skipped:
            logging.warning(f"Skipped {skipped} records without a usable id.")

        if not values:
            logging.info("No valid crypto data to insert.")
            return

        with psycopg.connect(**DB_PARAMS) as conn:
            with conn.cursor() as cur:
                cur.executemany(_build_upsert_query(), values)
            conn.commit()
        logging.info(f"Inserted {len(values)} crypto records.")

    except Exception as e:
        logging.error(f"Error inserting Crypto Data for {currency}: {e}")

In [64]:
# Write the fetched records to the crypto_data table in PostgreSQL.
insert_crypto_data(crypto_data)