In [1]:
!pip install requests mysql-connector-python



In [1]:
!pip install pymysql

Collecting pymysql
  Downloading PyMySQL-1.1.1-py3-none-any.whl.metadata (4.4 kB)
Downloading PyMySQL-1.1.1-py3-none-any.whl (44 kB)
   ---------------------------------------- 0.0/45.0 kB ? eta -:--:--
   ------------------ --------------------- 20.5/45.0 kB ? eta -:--:--
   ---------------------------------------- 45.0/45.0 kB 739.0 kB/s eta 0:00:00
Installing collected packages: pymysql
Successfully installed pymysql-1.1.1


In [3]:
import pymysql

try:
    print("🔌 Connecting using PyMySQL...")
    conn = pymysql.connect(
        host="127.0.0.1",
        port=3306,
        user="root",
        password="root",
        database="crypto_db",
        connect_timeout=5
    )
    print("✅ Connected using PyMySQL!")
    conn.close()
except Exception as e:
    print("❌ Error:", e)


🔌 Connecting using PyMySQL...
✅ Connected using PyMySQL!


In [5]:
import requests
import pymysql
from datetime import datetime

try:
    # Step 1: Fetch BTC price from CoinGecko API
    print("📡 Fetching BTC price...")
    url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd"
    response = requests.get(url)
    response.raise_for_status()
    data = response.json()
    
    symbol = 'BTC'
    price = data['bitcoin']['usd']
    timestamp = datetime.now()

    print(f"💰 BTC Price: ${price} at {timestamp}")

    # Step 2: Connect to MySQL
    print("🔌 Connecting to MySQL...")
    conn = pymysql.connect(
        host="127.0.0.1",
        port=3306,
        user="root",
        password="root",
        database="crypto_db"
    )
    cursor = conn.cursor()

    # Step 3: Insert price into the database
    print("📝 Inserting into database...")
    cursor.execute("""
        INSERT INTO crypto_prices (symbol, price, timestamp)
        VALUES (%s, %s, %s)
    """, (symbol, price, timestamp))

    conn.commit()
    conn.close()

    print("✅ BTC price inserted successfully!")

except Exception as e:
    print("❌ Error:", e)


📡 Fetching BTC price...
💰 BTC Price: $85006 at 2025-04-14 15:41:14.137743
🔌 Connecting to MySQL...
📝 Inserting into database...
✅ BTC price inserted successfully!


In [7]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pymysql

# Default arguments
default_args = {
    'owner': 'airflow',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

# Function to fetch BTC price and store in MySQL
def fetch_and_store_btc_price():
    # Get BTC price
    url = "https://api.coindesk.com/v1/bpi/currentprice/BTC.json"
    response = requests.get(url)
    data = response.json()
    btc_price = data["bpi"]["USD"]["rate_float"]
    timestamp = data["time"]["updatedISO"]

    # Connect to MySQL
    conn = pymysql.connect(
        host="host.docker.internal",  # maps to your local machine
        user="root",
        password="root",
        database="crypto"
    )
    cursor = conn.cursor()

    # Insert data
    cursor.execute("CREATE TABLE IF NOT EXISTS btc_price (timestamp DATETIME, price FLOAT)")
    cursor.execute("INSERT INTO btc_price (timestamp, price) VALUES (%s, %s)", (timestamp, btc_price))
    conn.commit()
    cursor.close()
    conn.close()

# Define DAG
with DAG(
    dag_id='btc_price_fetcher',
    default_args=default_args,
    description='Fetch BTC price and load into MySQL every 10 minutes',
    schedule_interval='*/10 * * * *',  # Every 10 minutes
    start_date=datetime(2024, 4, 1),
    catchup=False,
    tags=['btc', 'mysql']
) as dag:

    fetch_price = PythonOperator(
        task_id='fetch_and_store_btc_price',
        python_callable=fetch_and_store_btc_price
    )


ImportError: cannot import name 'DAG' from 'airflow' (unknown location)