In [14]:
import requests
import pandas as pd

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from kraken_api import get_market_data

In [15]:
# Kraken asset-pair code for BTC/USD.
TICKER = "XXBTZUSD"

# First date the DAG is scheduled from (naive datetime, no tzinfo).
START_DATE = datetime(year=2025, month=1, day=1)

# How long Airflow waits before retrying a failed task (5 minutes).
RETRY_DELAY = timedelta(seconds=5 * 60)

In [17]:
def get_market_data(**kwargs):
    """Fetch daily OHLC candles for ``TICKER`` from Kraken and push them to XCom.

    NOTE(review): this definition shadows the ``get_market_data`` imported from
    ``kraken_api`` at the top of the notebook; the DAG uses this local version.

    Keyword Args:
        ti: Airflow TaskInstance (injected by PythonOperator), used to push the
            ``api_data`` XCom that ``convert_json_to_df`` pulls.

    Raises:
        requests.exceptions.RequestException: On timeout, connection failure or
            a non-2xx HTTP status, so Airflow marks the task failed and retries.
        RuntimeError: If Kraken reports errors in the response body.
    """
    # Daily interval: Kraken's OHLC interval is expressed in minutes (60 * 24 = 1440).
    interval = str(60 * 24)

    url = "https://api.kraken.com/0/public/OHLC"
    # Let requests build and encode the query string. The previous f-string
    # referenced an undefined lowercase `ticker` and raised NameError.
    params = {"pair": TICKER, "interval": interval}
    headers = {"Accept": "application/json"}

    try:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
    except requests.exceptions.Timeout as e:
        # Log, then re-raise. Swallowing the timeout left `response` unbound
        # and failed later with a misleading UnboundLocalError.
        print(e)
        raise

    payload = response.json()
    # Kraken's response envelope carries an `error` list next to `result`;
    # a non-empty list means the request failed even with HTTP 200.
    if payload.get("error"):
        raise RuntimeError(f"Kraken API error: {payload['error']}")

    kwargs["ti"].xcom_push(
        key="api_data",
        value=payload["result"][TICKER],
    )



def convert_json_to_df(**kwargs):
    """Convert the OHLC rows pulled from XCom into a DataFrame and save it as CSV.

    Pulls the ``api_data`` XCom pushed by ``fetch_data_task`` and builds a
    DataFrame with columns ``timestamp, open, high, low, close, vwap, volume,
    count``, all cast to float. It then converts ``timestamp`` from epoch seconds
    to datetimes and writes the frame, without an index, to
    ``btc_data_<YYYY-MM-DD>.csv``, named after the current local date.

    Keyword Args:
        ti: Airflow TaskInstance (injected by PythonOperator), used for the XCom pull.
        output_dir: Optional directory for the CSV. When omitted, the file is
            written to the current working directory, as before.

    Raises:
        ValueError: If the upstream XCom is missing or empty. Previously this
            silently produced an empty CSV and the task still succeeded.
    """
    import os

    data = kwargs["ti"].xcom_pull(
        key="api_data",
        task_ids="fetch_data_task",
    )
    if data is None or len(data) == 0:
        raise ValueError(
            "No 'api_data' XCom from 'fetch_data_task'; refusing to write an empty CSV"
        )

    # Build the frame; dtype=float casts every column (including count) to float.
    df = pd.DataFrame(
        data,
        columns=["timestamp", "open", "high", "low",
                 "close", "vwap", "volume", "count"],
        dtype=float,
    )

    # Convert epoch-second timestamps to datetimes. The column stays a regular
    # column (it is not made the index) because the CSV is written with index=False.
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")

    # Save to CSV, named after today's date.
    today = datetime.now().strftime("%Y-%m-%d")
    file_name = f"btc_data_{today}.csv"
    output_dir = kwargs.get("output_dir")
    file_path = os.path.join(output_dir, file_name) if output_dir else file_name

    df.to_csv(
        path_or_buf=file_path,
        index=False,
    )

    print(f"csv saved on {today} to {file_path}")

In [20]:
# Per-task defaults applied to every operator in the DAG.
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": RETRY_DELAY,
}

# Define a DAG that fetches daily Kraken BTC OHLC data and stores it as CSV.
with DAG(
    "api_to_csv_dag",
    description="Fetch daily Kraken market API BTC data",
    schedule="@daily",
    start_date=START_DATE,
    catchup=False,
    # Previously `default_args` was built but never passed to the DAG, so the
    # configured owner/retries/retry_delay were silently ignored.
    default_args=default_args,
) as dag:

    # Extract: fetch OHLC data from the Kraken API and push it to XCom.
    fetch_data_task = PythonOperator(
        task_id="fetch_data_task",
        python_callable=get_market_data,
    )

    # Transform/Load: pull the XCom, build a DataFrame and write it to CSV.
    json_to_df_task = PythonOperator(
        task_id="json_to_df_task",
        python_callable=convert_json_to_df,
    )

    # The transform consumes the XCom pushed by the fetch task, so it runs second.
    fetch_data_task >> json_to_df_task