In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import requests
import json
from kafka import KafkaProducer
import os

# Task-level defaults that the DAG passes to every operator it contains.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(year=2025, month=2, day=12),
    # No e-mail notifications on failure or retry.
    email_on_failure=False,
    email_on_retry=False,
    # A failed task is retried once, five minutes after the failure.
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Kafka and market-data API configuration.
#
# The broker address and the API key can be overridden through the
# KAFKA_BROKER and ALPHAVANTAGE_API_KEY environment variables, so a
# deployment can point at another broker or rotate the key without editing
# this file. When a variable is unset the previous hard-coded value is used.
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "localhost:9092")
KAFKA_TOPIC = "stock-data"
API_URL = "https://www.alphavantage.co/query"
# NOTE(review): the fallback below is a real-looking key committed to source
# control; consider rotating it and supplying it only via the environment.
API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY", "Y234F551SWLWHNQK")

def fetch_and_produce_to_kafka():
    """Fetch IBM 5-minute intraday bars and publish each one to Kafka.

    Sends a ``TIME_SERIES_INTRADAY`` request to ``API_URL`` and produces one
    JSON-encoded record per timestamp of the ``"Time Series (5min)"`` section
    to ``KAFKA_TOPIC`` on ``KAFKA_BROKER``. The producer is always closed,
    whether or not the run succeeds.

    Raises:
        Exception: If the HTTP request fails or times out, the response has
            no ``"Time Series (5min)"`` section, or a record cannot be built
            or sent. The underlying error is chained as ``__cause__``.
    """
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        value_serializer=lambda v: json.dumps(v).encode("utf-8")
    )

    params = {
        "function": "TIME_SERIES_INTRADAY",
        "symbol": "IBM",
        "interval": "5min",
        "outputsize": "compact",
        "apikey": API_KEY
    }

    try:
        # Bound the request so a stalled connection cannot hang the task.
        response = requests.get(API_URL, params=params, timeout=30)
        # Surface HTTP-level failures instead of trying to parse an error page.
        response.raise_for_status()
        data = response.json()

        if not isinstance(data, dict) or "Time Series (5min)" not in data:
            # A payload without the series (e.g. an error or throttling
            # message) must fail the task so the retry policy applies,
            # rather than reporting success with nothing produced.
            found = sorted(data) if isinstance(data, dict) else type(data).__name__
            raise ValueError(
                f"response has no 'Time Series (5min)' section (got: {found})"
            )

        for timestamp, values in data["Time Series (5min)"].items():
            record = {
                "timestamp": timestamp,
                "open": values["1. open"],
                "high": values["2. high"],
                "low": values["3. low"],
                "close": values["4. close"],
                "volume": values["5. volume"]
            }
            producer.send(KAFKA_TOPIC, record)
        # send() only queues the record; flush before the producer is closed.
        producer.flush()
    except Exception as e:
        # Keep the original exception type and message prefix, but chain the
        # cause so the real traceback is preserved in the task log.
        raise Exception(f"Error in fetch_and_produce_to_kafka: {e}") from e
    finally:
        producer.close()

def start_streamlit():
    """Launch the Streamlit dashboard as a detached background process.

    ``streamlit run`` is a long-running server, so waiting for it (as a
    blocking ``os.system`` call does) would keep the Airflow task running
    for as long as the dashboard stays up. The server is started without
    waiting so the task can finish.

    Raises:
        OSError: If the ``streamlit`` executable cannot be started.
    """
    import subprocess  # local import: the block needs nothing new at file level

    # NOTE(review): this callable runs on every DAG run, so each run starts
    # another Streamlit process; confirm whether that is intended.
    subprocess.Popen(
        # Argument list instead of a shell string: no shell parsing involved.
        ["streamlit", "run", "/path/to/your/streamlit_app.py"],
        # Do not let the detached server hold the task's standard streams.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # Run in its own session so it is not tied to the task's process group.
        start_new_session=True,
    )

# Create DAG: fetch quotes into Kafka, run the Spark consumer, then start the
# dashboard. The three tasks run strictly in that order.
with DAG(
    'stock_data_pipeline',
    default_args=default_args,
    description='Pipeline for stock data processing',
    schedule_interval='*/5 * * * *',  # Run every 5 minutes
    catchup=False  # Do not back-fill runs between start_date and now
) as dag:

    # Task 1: Fetch data and produce to Kafka
    fetch_and_produce = PythonOperator(
        task_id='fetch_and_produce',
        python_callable=fetch_and_produce_to_kafka
    )

    # Task 2: Spark job to consume from Kafka and write to S3
    spark_consume = SparkSubmitOperator(
        task_id='spark_consume',
        application='/path/to/your/spark_consumer.py',
        conn_id='spark_default',
        conf={
            # NOTE(review): the Kafka connector is pinned to Scala 2.11 /
            # Spark 2.4.5; confirm it matches the cluster's Spark build.
            'spark.jars.packages': 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.apache.hadoop:hadoop-aws:3.2.0',
            # S3A credentials come from the standard AWS environment
            # variables when set, so real keys never have to be written into
            # this file. The previous placeholder strings remain the fallback.
            'spark.hadoop.fs.s3a.access.key': os.environ.get(
                'AWS_ACCESS_KEY_ID', 'YOUR_AWS_ACCESS_KEY'
            ),
            'spark.hadoop.fs.s3a.secret.key': os.environ.get(
                'AWS_SECRET_ACCESS_KEY', 'YOUR_AWS_SECRET_KEY'
            ),
            'spark.hadoop.fs.s3a.endpoint': 's3.amazonaws.com'
        }
    )

    # Task 3: Start Streamlit dashboard
    # NOTE(review): this task is part of every 5-minute run, so the dashboard
    # launch is repeated on each run; confirm whether that is intended.
    streamlit_dashboard = PythonOperator(
        task_id='streamlit_dashboard',
        python_callable=start_streamlit
    )

    # Set dependencies
    fetch_and_produce >> spark_consume >> streamlit_dashboard

In [None]:
import os
import time
import json
import requests
from datetime import datetime, timedelta
from kafka import KafkaProducer
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Task-level defaults that the DAG passes to every operator it contains.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(year=2025, month=2, day=12),
    # No e-mail notifications on failure or retry.
    email_on_failure=False,
    email_on_retry=False,
    # A failed task is retried twice, five minutes apart.
    retries=2,
    retry_delay=timedelta(minutes=5),
)

# Kafka and market-data API configuration.
#
# The broker address and the API key can be overridden through the
# KAFKA_BROKER and ALPHAVANTAGE_API_KEY environment variables, so a
# deployment can point at another broker or rotate the key without editing
# this file. When a variable is unset the previous hard-coded value is used.
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "localhost:9092")
KAFKA_TOPIC = "stock-data"
API_URL = "https://www.alphavantage.co/query"
# NOTE(review): the fallback below is a real-looking key committed to source
# control; consider rotating it and supplying it only via the environment.
API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY", "Y234F551SWLWHNQK")

# Function: Fetch Data and Send to Kafka
def fetch_and_produce_to_kafka():
    """Fetch IBM 5-minute intraday bars from the API and publish them to Kafka.

    Sends a ``TIME_SERIES_INTRADAY`` request to ``API_URL`` and produces one
    JSON-encoded record per timestamp of the ``"Time Series (5min)"`` section
    to ``KAFKA_TOPIC`` on ``KAFKA_BROKER``. The producer is always closed,
    whether or not the run succeeds.

    Raises:
        Exception: If the HTTP request fails or times out, the response has
            no ``"Time Series (5min)"`` section, or a record cannot be built
            or sent. The underlying error is chained as ``__cause__``.
    """
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        value_serializer=lambda v: json.dumps(v).encode("utf-8")
    )

    params = {
        "function": "TIME_SERIES_INTRADAY",
        "symbol": "IBM",
        "interval": "5min",
        "outputsize": "compact",
        "apikey": API_KEY
    }

    try:
        # Bound the request so a stalled connection cannot hang the task.
        response = requests.get(API_URL, params=params, timeout=30)
        # Surface HTTP-level failures instead of trying to parse an error page.
        response.raise_for_status()
        data = response.json()

        if not isinstance(data, dict) or "Time Series (5min)" not in data:
            # A payload without the series (e.g. an error or throttling
            # message) must fail the task so the retry policy applies,
            # rather than reporting success with nothing produced.
            found = sorted(data) if isinstance(data, dict) else type(data).__name__
            raise ValueError(
                f"response has no 'Time Series (5min)' section (got: {found})"
            )

        # Only one API request is made per run, so no pause is needed between
        # records: sleeping here would delay the Kafka sends without reducing
        # the number of API calls.
        for timestamp, values in data["Time Series (5min)"].items():
            record = {
                "timestamp": timestamp,
                "open": values["1. open"],
                "high": values["2. high"],
                "low": values["3. low"],
                "close": values["4. close"],
                "volume": values["5. volume"]
            }
            producer.send(KAFKA_TOPIC, record)
        # send() only queues the record; flush before the producer is closed.
        producer.flush()
    except Exception as e:
        # Keep the original exception type and message prefix, but chain the
        # cause so the real traceback is preserved in the task log.
        raise Exception(f"Error in fetch_and_produce_to_kafka: {e}") from e
    finally:
        producer.close()

# Function: Start Streamlit in Background
def start_streamlit():
    """Start the Streamlit dashboard as a detached background process.

    The server is launched directly rather than through a backgrounded shell
    command, so a launch that cannot start (for example a missing
    ``streamlit`` executable) raises here instead of going unnoticed.

    Raises:
        OSError: If the ``streamlit`` executable cannot be started.
    """
    import subprocess  # local import: the block needs nothing new at file level

    # NOTE(review): this callable runs on every DAG run, so each run starts
    # another Streamlit process; confirm whether that is intended.
    subprocess.Popen(
        # Argument list instead of a shell string: no shell parsing involved.
        ["streamlit", "run", "/path/to/your/streamlit_app.py"],
        # Do not let the detached server hold the task's standard streams.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # Run in its own session so it is not tied to the task's process group.
        start_new_session=True,
    )

# Define DAG: fetch quotes into Kafka, run the Spark consumer, then start the
# dashboard. The three tasks run strictly in that order.
with DAG(
    'stock_data_pipeline',
    default_args=default_args,
    description='Stock data processing pipeline with Kafka, Spark, S3, and Streamlit',
    schedule_interval='*/5 * * * *',  # Run every 5 minutes
    catchup=False  # Do not back-fill runs between start_date and now
) as dag:

    # Task 1: Fetch Stock Data and Send to Kafka
    fetch_and_produce = PythonOperator(
        task_id='fetch_and_produce',
        python_callable=fetch_and_produce_to_kafka
    )

    # Task 2: Spark Job to Consume Data from Kafka and Write to S3
    spark_consume = SparkSubmitOperator(
        task_id='spark_consume',
        application='/path/to/your/spark_consumer.py',
        conn_id='spark_default',
        conf={
            # NOTE(review): the Kafka connector is pinned to Scala 2.11 /
            # Spark 2.4.5; confirm it matches the cluster's Spark build.
            'spark.jars.packages': 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.apache.hadoop:hadoop-aws:3.2.0',
            # S3A credentials come from the standard AWS environment
            # variables when set, so real keys never have to be written into
            # this file. The previous placeholder strings remain the fallback.
            'spark.hadoop.fs.s3a.access.key': os.environ.get(
                'AWS_ACCESS_KEY_ID', 'YOUR_AWS_ACCESS_KEY'
            ),
            'spark.hadoop.fs.s3a.secret.key': os.environ.get(
                'AWS_SECRET_ACCESS_KEY', 'YOUR_AWS_SECRET_KEY'
            ),
            'spark.hadoop.fs.s3a.endpoint': 's3.amazonaws.com'
        },
        verbose=True
    )

    # Task 3: Start Streamlit Dashboard
    # NOTE(review): this task is part of every 5-minute run, so the dashboard
    # launch is repeated on each run; confirm whether that is intended.
    streamlit_dashboard = PythonOperator(
        task_id='streamlit_dashboard',
        python_callable=start_streamlit
    )

    # DAG Execution Flow
    fetch_and_produce >> spark_consume >> streamlit_dashboard
