In [4]:
#!/usr/bin/env python
# coding: utf-8

import requests # type: ignore
import json
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import insert

from datetime import datetime

# ETL Pipeline for loading cryptocurrency data

# 1. API Call to Fetch Cryptocurrency Data
def fetch_crypto_data(api_key, url, parameters, headers):
    try:
        response = requests.get(url, headers=headers, params=parameters)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from API: {e}")
        return None

# 2. Process the Raw Data into a DataFrame
def process_crypto_data(raw_data):
    organized_data = []
    for item in raw_data.get('data', []):
        btc_info = {
            "id": item['id'],
            "name": item['name'],
            "symbol": item['symbol'],
            "cmc_rank": item['cmc_rank'],
            "price": item['quote']['USD']['price'],
            "volume_24h": item['quote']['USD']['volume_24h'],
            "market_cap": item['quote']['USD']['market_cap'],
            "market_cap_dominance": item['quote']['USD']['market_cap_dominance'],
            "circulating_supply": item['circulating_supply'],
            "max_supply": item['max_supply'],
            "percent_change_1h": item['quote']['USD']['percent_change_1h'],
            "percent_change_24h": item['quote']['USD']['percent_change_24h'],
            "percent_change_7d": item['quote']['USD']['percent_change_7d'],
            "last_updated": item['quote']['USD']['last_updated']
        }
        organized_data.append(btc_info)
    df = pd.DataFrame(organized_data)
    # Convert last_updated to datetime and add pulled_at
    df["last_updated"] = pd.to_datetime(df["last_updated"])
    df["pulled_at"] = datetime.now()
    return df

# 3. Validate and Truncate Data to Match Database Constraints
def validate_and_truncate(df):
    df["symbol"] = df["symbol"].str[:10]
    df["name"] = df["name"].str[:50]
    df["price"] = df["price"].clip(upper=10**12 - 1)
    df["volume_24h"] = df["volume_24h"].clip(upper=10**12 - 1)
    df["market_cap"] = df["market_cap"].clip(upper=10**12 - 1)
    df["market_cap_dominance"] = df["market_cap_dominance"].clip(upper=999.99)
    df["circulating_supply"] = df["circulating_supply"].clip(upper=10**12 - 1)
    df["max_supply"] = df["max_supply"].clip(upper=10**12 - 1)
    df["percent_change_1h"] = df["percent_change_1h"].clip(-9999.9999, 9999.9999)
    df["percent_change_24h"] = df["percent_change_24h"].clip(-9999.9999, 9999.9999)
    df["percent_change_7d"] = df["percent_change_7d"].clip(-9999.9999, 9999.9999)
    df = df.dropna(subset=["symbol", "last_updated"])
    return df

# 4. Upsert Data into the Database
from sqlalchemy import Table, MetaData
from sqlalchemy.dialects.postgresql import insert

def upsert_crypto_data(df, table_name, engine):
    """
    Upsert data into the database table, ensuring no duplicate records are inserted.
    Records are considered unique based on 'symbol' and 'last_updated'.
    """
    # Drop the 'id' column if it exists, as it should be auto-generated by the database
    if "id" in df.columns:
        df = df.drop(columns=["id"])

    # Convert 'last_updated' to UTC timezone to ensure consistency
    df["last_updated"] = pd.to_datetime(df["last_updated"]).dt.tz_convert("UTC")

    # Convert DataFrame to a list of dictionaries for bulk insert
    records = df.to_dict(orient="records")

    # Create a MetaData instance and reflect the table schema
    metadata = MetaData()
    metadata.reflect(bind=engine)
    table = Table(table_name, metadata, autoload_with=engine)  # Define the table object

    # Prepare the insert statement with ON CONFLICT logic
    stmt = insert(table).values(records)
    stmt = stmt.on_conflict_do_nothing(index_elements=["symbol", "last_updated"])

    # Execute the upsert operation
    with engine.connect() as connection:
        try:
            result = connection.execute(stmt)
            print(f"Upsert completed. {result.rowcount} new records inserted.")
        except Exception as e:
            print(f"Error during upsert: {e}")


# Main function to execute ETL pipeline
def main():
    # API Setup
    api_key = '42485936-1986-4342-9e0a-e854c8b0fe47'
    url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest'
    parameters = {'start': '1', 'limit': '100', 'convert': 'USD'}
    headers = {'Accepts': 'application/json', 'X-CMC_PRO_API_KEY': api_key}

    # Database Configuration
    DB_USER = "postgres"
    DB_HOST = "localhost"
    DB_PORT = "5432"
    DB_NAME = "signal"
    table_name = "crypto_price_history"
    DATABASE_URL = f"postgresql+psycopg2://{DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    engine = create_engine(DATABASE_URL)

    # Fetch, Process, Validate, and Load Data
    raw_data = fetch_crypto_data(api_key, url, parameters, headers)
    if raw_data:
        df = process_crypto_data(raw_data)
        df_cleaned = validate_and_truncate(df)
        upsert_crypto_data(df_cleaned, table_name, engine)

if __name__ == "__main__":
    main()



Upsert completed. 100 new records inserted.


In [5]:
raw_data = fetch_crypto_data(api_key, url, parameters, headers)
if raw_data:
    print(json.dumps(raw_data, indent=4))  # Print the raw data for debugging


{
    "status": {
        "timestamp": "2024-12-22T22:16:44.362Z",
        "error_code": 0,
        "error_message": null,
        "elapsed": 21,
        "credit_count": 1,
        "notice": null,
        "total_count": 10428
    },
    "data": [
        {
            "id": 1,
            "name": "Bitcoin",
            "symbol": "BTC",
            "slug": "bitcoin",
            "num_market_pairs": 11849,
            "date_added": "2010-07-13T00:00:00.000Z",
            "tags": [
                "mineable",
                "pow",
                "sha-256",
                "store-of-value",
                "state-channel",
                "coinbase-ventures-portfolio",
                "three-arrows-capital-portfolio",
                "polychain-capital-portfolio",
                "binance-labs-portfolio",
                "blockchain-capital-portfolio",
                "boostvc-portfolio",
                "cms-holdings-portfolio",
                "dcg-portfolio",
                "dragonfl

In [9]:
# Database Configuration
DB_USER = "postgres"
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "signal"
table_name = "crypto_price_history"
DATABASE_URL = f"postgresql+psycopg2://{DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(DATABASE_URL)

from sqlalchemy import inspect
inspector = inspect(engine)
if "crypto_price_history" not in inspector.get_table_names():
    print("Table 'crypto_price_history' does not exist.")
else:
    print(inspector.get_columns("crypto_price_history"))



[{'name': 'id', 'type': INTEGER(), 'nullable': False, 'default': "nextval('crypto_price_history_id_seq'::regclass)", 'autoincrement': True, 'comment': None}, {'name': 'symbol', 'type': VARCHAR(length=10), 'nullable': False, 'default': None, 'autoincrement': False, 'comment': None}, {'name': 'name', 'type': VARCHAR(length=50), 'nullable': True, 'default': None, 'autoincrement': False, 'comment': None}, {'name': 'cmc_rank', 'type': INTEGER(), 'nullable': True, 'default': None, 'autoincrement': False, 'comment': None}, {'name': 'price', 'type': NUMERIC(precision=20, scale=8), 'nullable': True, 'default': None, 'autoincrement': False, 'comment': None}, {'name': 'volume_24h', 'type': NUMERIC(precision=20, scale=8), 'nullable': True, 'default': None, 'autoincrement': False, 'comment': None}, {'name': 'market_cap', 'type': NUMERIC(precision=20, scale=8), 'nullable': True, 'default': None, 'autoincrement': False, 'comment': None}, {'name': 'market_cap_dominance', 'type': NUMERIC(precision=5, s

In [28]:
import requests
import pandas as pd


# replace the "demo" apikey below with your own key from https://www.alphavantage.co/support/#api-key
url = 'https://www.alphavantage.co/query?function=TREASURY_YIELD&interval=daily&maturity=10year&apikey=SFRHBUTCXB3RDG5S'
r = requests.get(url)
data = r.json()

df = pd.DataFrame(data['data'])
df.head()


KeyError: 'data'

In [26]:
import requests
import pandas as pd

# replace the "demo" apikey below with your own key from https://www.alphavantage.co/support/#api-key
url = 'https://www.alphavantage.co/query?function=FEDERAL_FUNDS_RATE&interval=daily&apikey=SFRHBUTCXB3RDG5S'
r = requests.get(url)
data = r.json()

df = pd.DataFrame(data['data'])
df.head()

Unnamed: 0,date,value
0,2025-01-02,4.33
1,2025-01-01,4.33
2,2024-12-31,4.33
3,2024-12-30,4.33
4,2024-12-29,4.33


In [24]:
import requests
import pandas as pd

# replace the "demo" apikey below with your own key from https://www.alphavantage.co/support/#api-key
url = 'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=VOO&apikey=SFRHBUTCXB3RDG5S'
r = requests.get(url)
data = r.json()

time_series = data['Time Series (Daily)']

df = pd.DataFrame.from_dict(time_series, orient='index')

df.rename(columns={
    '1. open': 'open',
    '2. high': 'high',
    '3. low': 'low',
    '4. close': 'close',
    '5. volume': 'volume'
}, inplace=True)

df.index = pd.to_datetime(df.index)
df.reset_index(inplace=True)

# Rename the index column to 'date'
df.rename(columns={'index': 'date'}, inplace=True)

# Ensure numerical columns are of the correct type
for col in ['open', 'high', 'low', 'close', 'volume']:
    df[col] = pd.to_numeric(df[col])

df.head()

Unnamed: 0,date,open,high,low,close,volume
0,2025-01-03,540.19,544.88,539.2,544.4,6419663
1,2025-01-02,542.02,543.5399,533.795,537.46,7142698
2,2024-12-31,542.45,543.0651,537.4001,538.81,6040750
3,2024-12-30,540.56,544.09,537.4,540.99,6505089
4,2024-12-27,549.37,549.62,543.2001,547.08,7077135


In [23]:
import requests
import pandas as pd

# replace the "demo" apikey below with your own key from https://www.alphavantage.co/support/#api-key
url = 'https://www.alphavantage.co/query?function=NEWS_SENTIMENT&CRYPTO:BTC&apikey=SFRHBUTCXB3RDG5S'
r = requests.get(url)
data = r.json()

feed_data = data['feed']

df = pd.DataFrame(feed_data)

df.head()

Unnamed: 0,title,url,time_published,authors,summary,banner_image,source,category_within_source,source_domain,topics,overall_sentiment_score,overall_sentiment_label,ticker_sentiment
0,"ROSEN, SKILLED INVESTOR COUNSEL, Encourages Tr...",https://www.benzinga.com/pressreleases/25/01/g...,20250105T172500,[Globe Newswire],"NEW YORK, Jan. 05, 2025 ( GLOBE NEWSWIRE ) -- ...",https://www.benzinga.com/next-assets/images/sc...,Benzinga,News,www.benzinga.com,"[{'topic': 'Energy & Transportation', 'relevan...",0.182901,Somewhat-Bullish,"[{'ticker': 'META', 'relevance_score': '0.0621..."
1,"ROSEN, A LEADING LAW FIRM, Encourages Cassava ...",https://www.benzinga.com/pressreleases/25/01/g...,20250105T171200,[Globe Newswire],"NEW YORK, Jan. 05, 2025 ( GLOBE NEWSWIRE ) -- ...",https://www.benzinga.com/next-assets/images/sc...,Benzinga,News,www.benzinga.com,"[{'topic': 'Life Sciences', 'relevance_score':...",0.217214,Somewhat-Bullish,"[{'ticker': 'META', 'relevance_score': '0.0600..."
2,"'Don't Invest In Rap, Restaurant And Liquor Co...",https://www.benzinga.com/news/25/01/42805180/d...,20250105T170217,[Adrian Volenik],"Mark Cuban, billionaire investor and part-owne...",https://cdn.benzinga.com/files/images/story/20...,Benzinga,News,www.benzinga.com,"[{'topic': 'Life Sciences', 'relevance_score':...",0.464739,Bullish,"[{'ticker': 'ABNB', 'relevance_score': '0.0641..."
3,Before you continue,https://consent.google.com/m,20250105T165341,[],"Looking back to look ahead to 2025: Economy, c...",,Business Standard,GoogleRSS,consent.google.com,"[{'topic': 'Technology', 'relevance_score': '1...",0.086343,Neutral,"[{'ticker': 'GOOG', 'relevance_score': '0.2130..."
4,Ray Dalio's Timeless Advice: 'If You Don't Own...,https://www.benzinga.com/markets/25/01/4280513...,20250105T164715,[Bibhu Pattnaik],"Ray Dalio predicts a looming economic shift, u...",https://cdn.benzinga.com/files/images/story/20...,Benzinga,Markets,www.benzinga.com,"[{'topic': 'Economy - Monetary', 'relevance_sc...",0.1111,Neutral,[]
