In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, count, sum
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType
import time

In [5]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Spark Structured Streaming Example") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

# Define schema for tweets
tweet_schema = StructType() \
    .add("id", StringType()) \
    .add("created_at", StringType()) \
    .add("followers_count", StringType()) \
    .add("location", StringType()) \
    .add("favorite_count", StringType()) \
    .add("retweet_count", StringType())

# Read Stream from Kafka
data_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test-amazon-crawler") \
    .option("startingOffsets", "latest") \
    .load()

# Process the Kafka data stream
data_stream_cleaned = data_stream \
    .selectExpr("CAST(value AS STRING) as string_value") \
    .selectExpr("split(string_value, ';') as parts") \
    .selectExpr(
        "CAST(parts[0] AS STRING) AS id",
        "CAST(parts[1] AS TIMESTAMP) AS created_at",
        "CAST(parts[2] AS INT) AS followers_count",
        "CAST(parts[3] AS STRING) AS location",
        "CAST(parts[4] AS INT) AS favorite_count",
        "CAST(parts[5] AS INT) AS retweet_count"
    ) \
    .filter(col("created_at") > current_date()) \
    .groupBy("location") \
    .agg(
        count("id").alias("count_id"),
        sum("followers_count").alias("sum_followers_count"),
        sum("favorite_count").alias("sum_favorite_count"),
        sum("retweet_count").alias("sum_retweet_count")
    )

# Define output sink and trigger
query = data_stream_cleaned.writeStream \
    .format("memory") \
    .queryName("demo") \
    .trigger(processingTime="30 seconds") \
    .outputMode("complete") \
    .start()

# Check the status of the query (e.g., to debug or monitor)
while query.isActive:
    time.sleep(10)
    print(query.status)

# Connect to AWS Redshift
redshift_url = "jdbc:redshift://data-warehouse.c3glymsgdgty.us-east-1.redshift.amazonaws.com:5439/lambda"
properties = {
    "user": "dorian",
    "password": "Demo1234",
    "driver": "com.amazon.redshift.jdbc42.Driver"
}

# Export data to Redshift every hour
def export_to_redshift():
    df = spark.sql("SELECT * FROM demo")
    df.write.jdbc(url=redshift_url, table="speed_layer", mode="overwrite", properties=properties)

# Schedule the function to run every hour
while True:
    export_to_redshift()
    time.sleep(3600)  # Run every hour

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.