In [3]:
# Imports
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_csv
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType, IntegerType
from time import sleep

# Configure spark session
sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("twitter-streamer")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

# Create a spark session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()


# Define the schema of the source data
dataSchema = StructType([
    StructField("id", LongType(), True),
    StructField("user_id", LongType(), True),
    StructField("created_at", FloatType(), True),
    StructField("party", StringType(), True),
    StructField("sentiment", FloatType(), True),
    StructField("tweet", StringType(), True)])

# Connect to the kafka source input
df_raw = spark.readStream.format("kafka")\
              .option("kafka.bootstrap.servers", "kafka1:9093")\
              .option("subscribe", "twitter_politics")\
              .option("startingOffsets", "earliest")\
              .load()

# Convert input bytes into string and convert to columns
lines = df_raw.selectExpr("CAST(value AS STRING)")
df = lines.select(from_csv(lines.value, dataSchema.simpleString()))
df.printSchema()

df_gs = df.select(col("from_csv(value).*"))
df_gs.printSchema()

# Create event_time column for use with a Window
df_gs = df_gs.selectExpr("*", "CAST(created_at as timestamp) as event_time")
df_gs.printSchema()

# Compute the average score for each party
df_results = df_gs.groupBy(window(col("event_time"), "7 days"), "party")\
                  .agg(avg("sentiment").alias("value"))
df_results = df_results.select(
    col("party").alias("key"),
    concat(col("party"), lit(","), col("window").cast("string"), lit(","), col("value")).cast("string").alias("value"))

df_results.printSchema()


query = df_results.writeStream.format("kafka")\
                  .option("kafka.bootstrap.servers", "kafka1:9093")\
                  .option("checkpointLocation", "/home/jovyan/checkpoint")\
                  .option("topic", "avg_sentiment")\
                  .outputMode("complete")\
                  .start()

#Write the output to a stream
# df_query = df_results.writeStream.queryName("debugging")\
#                      .format("memory")\
#                      .outputMode("complete")\
#                      .start()

# try:
#     for x in range(10):
#         spark.sql("SELECT * FROM debugging").show()
#         sleep(10)
# except KeyboardInterrupt:
#     df_query.stop()
#     spark.stop()
#     print("Stopped the streaming query and spark context")
# finally:
#     spark.stop()


root
 |-- from_csv(value): struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- user_id: long (nullable = true)
 |    |-- created_at: float (nullable = true)
 |    |-- party: string (nullable = true)
 |    |-- sentiment: float (nullable = true)
 |    |-- tweet: string (nullable = true)

root
 |-- id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- created_at: float (nullable = true)
 |-- party: string (nullable = true)
 |-- sentiment: float (nullable = true)
 |-- tweet: string (nullable = true)

root
 |-- id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- created_at: float (nullable = true)
 |-- party: string (nullable = true)
 |-- sentiment: float (nullable = true)
 |-- tweet: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [18]:
query.stop()

In [19]:
spark.stop()