# In [0]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType, TimestampType

# Create (or attach to an already-running) Spark session for this job.
spark = (
    SparkSession.builder
    .appName("KafkaToRDS")
    .getOrCreate()
)

# Kafka connection settings.
# Secrets come from the environment instead of being committed to source;
# a missing KAFKA_API_KEY / KAFKA_API_SECRET raises KeyError at startup.
# NOTE(review): the API key/secret that were previously hard-coded here have
# been exposed and should be revoked and rotated in Confluent Cloud.
kafka_bootstrap_servers = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS", "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
)
kafka_topic = os.environ.get("KAFKA_TOPIC", "topic-1")
api_key = os.environ["KAFKA_API_KEY"]
api_secret = os.environ["KAFKA_API_SECRET"]

# Options for the Spark Kafka source: SASL_SSL + PLAIN auth using the
# API key/secret as username/password.
kafka_security_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    ),
}

# Schema of the JSON stock records on the topic. All fields are nullable.
# "volume" is LongType (64-bit): share volumes, particularly split-adjusted
# historical ones, can exceed the IntegerType maximum of 2,147,483,647, and an
# out-of-range value makes from_json treat the whole record as malformed.
# NOTE(review): the stock_data.volume column in MySQL should be BIGINT to match.
# "timestamp" and "current_time" are kept as raw strings at parse time.
message_schema = StructType([
    StructField("ticker", StringType(), True),
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("adj_close", DoubleType(), True),
    StructField("volume", LongType(), True),
    StructField("timestamp", StringType(), True),
    StructField("current_time", StringType(), True),
])

# RDS (MySQL) connection options.
# The password is read from the environment instead of being committed to
# source; a missing RDS_PASSWORD raises KeyError at startup.
# NOTE(review): the password that was previously hard-coded here has been
# exposed and should be rotated on the RDS instance.
rds_url = os.environ.get(
    "RDS_JDBC_URL",
    "jdbc:mysql://database-2.czw88k040mub.us-east-1.rds.amazonaws.com:3306/kafka",
)
rds_user = os.environ.get("RDS_USER", "admin")
rds_password = os.environ["RDS_PASSWORD"]

# JDBC connection properties passed to DataFrameWriter.jdbc.
db_properties = {
    "user": rds_user,
    "password": rds_password,
    "driver": "com.mysql.cj.jdbc.Driver",
}

# Streaming source: subscribe to the Kafka topic using the SASL options above,
# beginning at the latest offsets.
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_security_options)
    .option("startingOffsets", "latest")
    .load()
)

# Kafka's "value" column is binary; expose it as a string named value_column.
df_events = df.selectExpr("CAST(value AS STRING) AS value_column")

# Parse the JSON payload into a struct, then flatten the struct into top-level
# columns. Two fields are renamed on the way out:
#   timestamp -> timestamps, current_time -> currenttime.
# NOTE(review): records whose JSON fails to parse are not filtered here and
# presumably reach the sink with null fields — confirm that is intended.
df_parsed = df_events.withColumn(
    "parsed_value", from_json(col("value_column"), message_schema)
).select(
    *[
        col(path).alias(name)
        for path, name in (
            ("parsed_value.ticker", "ticker"),
            ("parsed_value.open", "open"),
            ("parsed_value.high", "high"),
            ("parsed_value.low", "low"),
            ("parsed_value.close", "close"),
            ("parsed_value.adj_close", "adj_close"),
            ("parsed_value.volume", "volume"),
            ("parsed_value.timestamp", "timestamps"),
            ("parsed_value.current_time", "currenttime"),
        )
    ]
)

# Replace the textual "timestamps" column with a real TimestampType column;
# "currenttime" stays a string.
df_final = df_parsed.withColumn(
    "timestamps", col("timestamps").cast(TimestampType())
)

# Function to write to RDS in foreachBatch
def write_to_rds(batch_df, batch_id):
    """Append one streaming micro-batch to the ``stock_data`` table in RDS.

    Intended as the ``foreachBatch`` callback of the streaming query.

    Args:
        batch_df: The micro-batch DataFrame to append.
        batch_id: The id Spark assigned to this micro-batch (used for logging).

    Raises:
        Exception: Any error from the JDBC write is logged and then re-raised.
            Swallowing it would let Spark treat the batch as processed and
            commit its Kafka offsets, silently dropping those rows.
    """
    # NOTE(review): foreachBatch is at-least-once, so a batch retried after a
    # failure/restart may be appended twice; consider deduplicating on batch_id
    # or a natural key in stock_data.
    print(f"Writing batch {batch_id} to RDS.")
    try:
        batch_df.write.jdbc(
            url=rds_url,
            table="stock_data",
            mode="append",
            properties=db_properties,
        )
    except Exception as e:
        print(f"Error writing batch {batch_id} to RDS: {e}")
        raise
    print(f"Successfully wrote batch {batch_id} to RDS.")

# Write stream to RDS.
# The query has no aggregations, so "append" is the accurate output mode
# (Spark treats "update" identically for such queries).
_stream_writer = (
    df_final.writeStream
    .foreachBatch(write_to_rds)
    .outputMode("append")
)

# Without a checkpoint, every (re)start begins from startingOffsets="latest",
# skipping messages produced while the job was down. Setting
# STOCK_STREAM_CHECKPOINT_DIR (e.g. a durable S3/HDFS path) lets the query
# resume from its last committed offsets instead. Unset = previous behavior.
_checkpoint_dir = os.environ.get("STOCK_STREAM_CHECKPOINT_DIR")
if _checkpoint_dir:
    _stream_writer = _stream_writer.option("checkpointLocation", _checkpoint_dir)

query = _stream_writer.start()

# Block the driver until the query stops or fails (a failed batch surfaces
# here as an exception).
query.awaitTermination()
