In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor
import os

In [None]:
def run_notebook(notebook_path, output_path=None):
    """Execute a Jupyter notebook and write the executed copy to disk.

    Args:
        notebook_path: Path to the ``.ipynb`` file to execute.
        output_path: Destination for the executed notebook. When omitted,
            the result is written next to the input as
            ``<input name>_output<input extension>`` so the source notebook
            is never overwritten.

    Raises:
        FileNotFoundError: If ``notebook_path`` does not exist.
        Exception: Whatever ``nbformat`` / ``ExecutePreprocessor`` raise when
            the notebook is malformed or a cell fails; nothing is caught here
            so that the calling task is marked as failed.
    """
    # Resolve once so the working directory below is never an empty string.
    notebook_path = os.path.abspath(notebook_path)
    if output_path is None:
        stem, ext = os.path.splitext(notebook_path)
        output_path = f"{stem}_output{ext or '.ipynb'}"

    with open(notebook_path, encoding='utf-8') as nb_file:
        nb_contents = nbformat.read(nb_file, as_version=4)

    ep = ExecutePreprocessor(timeout=600, kernel_name='python3')
    # resources['metadata']['path'] is the directory the notebook is executed
    # in, so it must be the notebook's folder rather than the file itself.
    run_dir = os.path.dirname(notebook_path)
    ep.preprocess(nb_contents, {'metadata': {'path': run_dir}})

    with open(output_path, mode='wt', encoding='utf-8') as f:
        nbformat.write(nb_contents, f)

In [None]:
# Arguments handed to the DAG constructor as `default_args`:
# owner, failure/retry e-mail notification settings, and the retry policy
# (3 retries, 5 minutes apart).
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['vaibhavshanbhag96@gmail.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=3,
    retry_delay=timedelta(minutes=5),
)

In [None]:
# DAG object for the user-interactions ETL: one run per day, starting from
# 2024-07-07, with task defaults taken from `default_args`.
# NOTE(review): `catchup` is not passed, so whether missed intervals since
# start_date are backfilled depends on the Airflow deployment's default —
# confirm that is intended.
dag = DAG(
    dag_id='user_interactions_etl_dag',
    description='ETL pipeline for user interactions data',
    default_args=default_args,
    start_date=datetime(2024, 7, 7),
    schedule_interval=timedelta(days=1),
    tags=['user interactions', 'transformation'],
)

In [21]:
# Define the task using PythonOperator.
# NOTE(review): both paths are relative, so they resolve against the working
# directory of whichever process executes the task — confirm that matches
# the deployment layout.
notebook_path = '../scripts/run_etl_script.ipynb'
# The executed copy goes to a separate file so the source notebook is not
# overwritten by its own outputs.
executed_notebook_path = '../scripts/run_etl_script_output.ipynb'

run_notebook_task = PythonOperator(
    task_id='run_notebook',
    python_callable=run_notebook,
    # run_notebook takes an input and an output path; passing them by name
    # keeps the call valid and makes the mapping explicit.
    op_kwargs={
        'notebook_path': notebook_path,
        'output_path': executed_notebook_path,
    },
    dag=dag,
)

In [None]:
# Only one task is defined in the cells shown here, so there are no
# upstream/downstream dependencies to declare. The bare expression below
# sets nothing; it just displays the operator when the cell is run.
run_notebook_task