In [None]:
import json
import math
import time

import pandas as pd
from confluent_kafka import Producer

# Path to the source CSV; every row of it is published as one Kafka message.
file_path = 'air_quality_index.csv'

# Load the whole CSV up front; rows are streamed from this DataFrame further below.
df = pd.read_csv(file_path)

# Kafka configuration: broker address and destination topic
# (the Spark consumer cell must use the same two values).
bootstrap_servers = 'localhost:9092'

topic = 'air_quality'

# Kafka producer settings; client.id is an identifier this client reports to the broker.
producer_config = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': 'kafka-producer.csv'
}

# Create the Kafka producer shared by the send/shutdown code below.
producer = Producer(producer_config)

# Publish one CSV row to Kafka, then pause so the rows arrive as a slow "live" feed.
def send_data_to_kafka(row, delay_seconds=15):
    """Send a single DataFrame row to ``topic`` as a JSON object, then sleep.

    Uses the module-level ``producer`` and ``topic`` defined earlier in this cell.

    Parameters
    ----------
    row : pandas.Series
        One row as yielded by ``DataFrame.iterrows()``; its index becomes the
        JSON object's keys.
    delay_seconds : float, default 15
        Pause after the message has been flushed, to simulate a sensor stream.

    Raises
    ------
    RuntimeError
        If the producer reports a delivery failure for the message.
    """
    # Missing CSV cells arrive as float NaN; json.dumps would emit the bare token
    # NaN, which is not valid JSON, so send null instead.
    record = {
        key: (None if isinstance(value, float) and math.isnan(value) else value)
        for key, value in row.to_dict().items()
    }
    message = json.dumps(record)

    delivery_errors = []

    def _on_delivery(err, msg):
        # Delivery report callback served by flush(); record failures instead of
        # letting them pass unnoticed.
        if err is not None:
            delivery_errors.append(err)

    producer.produce(topic, value=message, on_delivery=_on_delivery)

    # Block until the message is acknowledged or has definitively failed.
    producer.flush()
    if delivery_errors:
        raise RuntimeError(
            f"Failed to deliver message to topic {topic!r}: {delivery_errors[0]}"
        )

    time.sleep(delay_seconds)

# Stream every CSV row to Kafka, one message at a time.
try:
    for _, row in df.iterrows():
        send_data_to_kafka(row)
finally:
    # The documented confluent_kafka Producer API has no close(); flush() is how
    # queued messages are drained. Running it in `finally` also covers
    # Kernel -> Interrupt. The timeout (seconds) keeps an unreachable broker from
    # hanging the kernel; flush() returns how many messages are still queued.
    undelivered = producer.flush(10)
    if undelivered:
        print(f"{undelivered} message(s) still undelivered when the producer was shut down")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

# Consume the air-quality topic with Spark Structured Streaming and show rows as they arrive.
#
# NOTE(review): the "kafka" source needs the spark-sql-kafka-0-10 connector on the
# classpath (e.g. via the `spark.jars.packages` config, matching this Spark/Scala
# version); without it `.load()` fails to find the data source. TODO confirm how
# this environment provides it.

# Initialize Spark session
spark = SparkSession.builder.appName("KafkaSparkStreaming").getOrCreate()

# Kafka configuration (must match the producer cell)
bootstrap_servers = 'localhost:9092'
topic = 'air_quality'

# How often (seconds) to refresh the displayed snapshot, and how many latest rows to show.
refresh_seconds = 10
rows_to_show = 20

# Schema of the JSON messages published by the producer; all fields are read as strings.
# Presumably the CSV has more columns than these four -- add them here as needed.
schema = (
    StructType()
    .add("City", StringType())
    .add("Datetime", StringType())
    .add("PM2.5", StringType())
    .add("PM10", StringType())
)

# Read from the Kafka topic. When the notebook runs top to bottom, the producer cell
# blocks until every row has been sent, so the messages are already in the topic when
# this cell starts; the default "latest" offset would skip all of them.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the Kafka `value` payload and expand its JSON fields into columns.
parsed_stream_df = (
    kafka_stream_df
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", schema).alias("data"))
    .select("data.*")
)

# Collect the stream into an in-memory table that spark.sql can query.
query = (
    parsed_stream_df
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("air_quality_table")
    .start()
)

# A single display() right after start() only shows a one-off (usually empty)
# snapshot, and a bare awaitTermination() would then block without ever refreshing it.
# Instead, update one output area in place until the query stops or the kernel is
# interrupted.
live_view = display("Waiting for streaming data...", display_id=True)
try:
    while query.isActive:
        snapshot = spark.sql("SELECT * FROM air_quality_table").toPandas()
        live_view.update(snapshot.tail(rows_to_show))
        # Waits up to refresh_seconds; re-raises if the streaming query failed.
        query.awaitTermination(refresh_seconds)
except KeyboardInterrupt:
    # Kernel -> Interrupt ends the demo; stop the stream rather than leave it running.
    query.stop()
