# In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import os
import requests
import numpy as np

# Arguments applied to every task of the DAG unless a task overrides them:
# one retry, five minutes apart, and no dependency on the previous run's outcome.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2025, 2, 24),
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Directory that receives both raw CSV files, plus the full path of each file.
RAW_DATA_PATH = "/opt/airflow/data/raw"
KAGGLE_CSV_PATH, SYNTHETIC_CSV_PATH = (
    os.path.join(RAW_DATA_PATH, file_name)
    for file_name in ("kaggle_telco_churn.csv", "synthetic_telco_churn.csv")
)

# Created when this module is imported, so the tasks can write without checking first.
os.makedirs(name=RAW_DATA_PATH, exist_ok=True)

# Source of the downloaded churn CSV. Despite the "kaggle" in the name, the file
# is fetched from this GitHub raw URL.
kaggle_dataset_url = "https://raw.githubusercontent.com/dsrscientist/DSData/master/Telecom_customer_churn.csv"

# Function to fetch Kaggle dataset
def fetch_kaggle_data(timeout=30):
    """Download the churn CSV at ``kaggle_dataset_url`` and save it to ``KAGGLE_CSV_PATH``.

    Any network or HTTP failure propagates as an exception. Airflow therefore
    marks the task failed and applies the DAG's retry policy, instead of
    reporting success while no file was written.

    Args:
        timeout: Seconds ``requests`` waits on the server before giving up.
            Without a timeout, a stalled server could block the worker indefinitely.

    Raises:
        requests.RequestException: if the request fails, times out, or the
            server answers with a 4xx/5xx status.
    """
    response = requests.get(kaggle_dataset_url, timeout=timeout)
    response.raise_for_status()
    with open(KAGGLE_CSV_PATH, 'wb') as f:
        f.write(response.content)
    print(f"Kaggle dataset saved at {KAGGLE_CSV_PATH}")

# Function to generate synthetic dataset
def generate_synthetic_data(num_rows=1000, seed=42, output_path=None):
    """Write a random dataset with Telco churn column names to a CSV file.

    Values are drawn from a private NumPy generator seeded with ``seed``. The
    same seed always yields the same file, and the process-wide ``np.random``
    state is left untouched for any other code running in the worker.

    Dependent service columns are kept mutually consistent:
    ``MultipleLines`` is ``"No phone service"`` exactly when ``PhoneService``
    is ``"No"``, and the six internet add-on columns are
    ``"No internet service"`` exactly when ``InternetService`` is ``"No"``.

    Args:
        num_rows: Number of rows to generate (0 writes a header-only CSV).
        seed: Seed for the private random generator.
        output_path: Destination CSV path; defaults to ``SYNTHETIC_CSV_PATH``.
    """
    if output_path is None:
        output_path = SYNTHETIC_CSV_PATH

    rng = np.random.default_rng(seed)

    def pick(options):
        # One independent draw per row from ``options``.
        return rng.choice(options, num_rows)

    yes_no = ["Yes", "No"]

    phone_service = pick(yes_no)
    # "No phone service" is required for, and only valid for, rows without phone service.
    multiple_lines = np.where(phone_service == "No", "No phone service", pick(yes_no))

    internet_service = pick(["DSL", "Fiber optic", "No"])
    no_internet = internet_service == "No"

    def internet_addon():
        # "No internet service" is required for, and only valid for, rows without internet.
        return np.where(no_internet, "No internet service", pick(yes_no))

    synthetic_data = pd.DataFrame({
        "customerID": [f"ID{i}" for i in range(num_rows)],
        "gender": pick(["Male", "Female"]),
        "SeniorCitizen": pick([0, 1]),
        "Partner": pick(yes_no),
        "Dependents": pick(yes_no),
        "tenure": rng.integers(1, 72, num_rows),  # 1..71 inclusive (high bound exclusive)
        "PhoneService": phone_service,
        "MultipleLines": multiple_lines,
        "InternetService": internet_service,
        "OnlineSecurity": internet_addon(),
        "OnlineBackup": internet_addon(),
        "DeviceProtection": internet_addon(),
        "TechSupport": internet_addon(),
        "StreamingTV": internet_addon(),
        "StreamingMovies": internet_addon(),
        "Contract": pick(["Month-to-month", "One year", "Two year"]),
        "PaperlessBilling": pick(yes_no),
        "PaymentMethod": pick(["Electronic check", "Mailed check", "Bank transfer", "Credit card"]),
        "MonthlyCharges": np.round(rng.uniform(18.25, 118.75, num_rows), 2),
        # NOTE(review): drawn independently of tenure and MonthlyCharges, so a
        # row's total need not equal tenure * MonthlyCharges.
        "TotalCharges": np.round(rng.uniform(20.0, 8600.0, num_rows), 2),
        "Churn": pick(yes_no),
    })

    synthetic_data.to_csv(output_path, index=False)
    print(f"Synthetic dataset saved at {output_path}")

# Define Airflow DAG
dag = DAG(
    'data_ingestion',
    default_args=default_args,
    description='Fetch Kaggle and Synthetic Telco Customer Churn Data',
    schedule_interval='@daily',
    # Neither task uses the run's logical date; both rewrite the same fixed
    # file paths. Catching up every missed day since start_date would only
    # repeat identical work, so only the latest interval is scheduled.
    catchup=False,
)

# Define tasks: one downloads the churn CSV, the other writes the synthetic CSV.
fetch_kaggle_task = PythonOperator(
    dag=dag,
    task_id='fetch_kaggle_data',
    python_callable=fetch_kaggle_data,
)

generate_synthetic_task = PythonOperator(
    dag=dag,
    task_id='generate_synthetic_data',
    python_callable=generate_synthetic_data,
)

# Synthetic generation is scheduled downstream of the download task.
generate_synthetic_task.set_upstream(fetch_kaggle_task)
