# In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, concat, col, lit, from_json, udf
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, DateType, BooleanType, ArrayType, List
from time import sleep
from textblob import TextBlob

# Schema of one tweet record. Every field is declared as a nullable string.
dataSchemaString = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "user_name",
            "user_location",
            "user_description",
            "user_created",
            "user_followers",
            "user_friends",
            "user_favourites",
            "user_verified",
            "date",
            "text",
            "hashtags",
            "source",
            "is_retweet",
        )
    ]
)

# Schema of a whole message: a struct whose nullable "tweet" field is an
# array of tweet records as described above.
schema = StructType([StructField("tweet", ArrayType(dataSchemaString), True)])

# In [None]:
def return_sentiment(tweet):
    """Classify the sentiment of a piece of text using TextBlob.

    Args:
        tweet: The text to classify, or None.

    Returns:
        'positive', 'negative' or 'neutral' according to the sign of the
        TextBlob polarity score; '' when ``tweet`` is None or when the score
        compares neither greater than, less than, nor equal to zero (NaN).
    """
    if tweet is None:
        return ''

    score = TextBlob(tweet).sentiment.polarity
    if score > 0:
        return 'positive'
    if score < 0:
        return 'negative'
    if score == 0:
        return 'neutral'
    # Only reachable when all three comparisons are False, i.e. a NaN score.
    return ''
        
# Spark UDF exposing return_sentiment to DataFrame expressions; it produces a
# StringType column. Passing the function directly (instead of wrapping it in
# a lambda) also gives the UDF a readable name in query plans.
udf_func = udf(return_sentiment, returnType=StringType())

def sentiment_analysis(df, batch_id):
    """Label one micro-batch of tweets with a sentiment and write it to BigQuery.

    Intended as the ``foreachBatch`` callback of the streaming query: adds a
    ``sentiment`` column computed by ``udf_func`` from ``text``, casts ``date``
    to a date, prints the batch, then saves it to the BigQuery table.

    Args:
        df: The micro-batch DataFrame; must contain ``text`` and ``date``
            columns.
        batch_id: Micro-batch identifier supplied by Spark; unused here.
    """
    labelled = (
        df.withColumn("sentiment", udf_func(col("text")))
        .withColumn("date", col("date").cast('date'))
        # show() and save() below are two separate Spark actions; caching the
        # batch keeps the sentiment UDF (and the source read) from running
        # twice for every micro-batch.
        .persist()
    )
    try:
        labelled.show()
        # NOTE(review): mode("overwrite") replaces the whole table on every
        # micro-batch, so only the most recent batch survives. If the table is
        # meant to accumulate tweets, switch to "append" (and give the query a
        # checkpointLocation so a restart does not re-append old data).
        (
            labelled.write.format('bigquery')
            .option('table', 'de2022-362622.assignmentDatasets.tweetSentiments')
            .mode("overwrite")
            .save()
        )
        # Only report success once the write has actually completed.
        print('posted')
    finally:
        labelled.unpersist()

# In [None]:
# Standalone-cluster master, application name and driver/executor resources.
# Every SparkConf setter returns the same SparkConf, so the calls chain.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("SparkStreamer")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# Build (or reuse) the SparkSession, the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes for the "gs" scheme on
# the session's Hadoop configuration (reached through the private _jsc handle).
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Cloud Storage bucket the BigQuery connector stages temporary export data in.
bucket = "de_jads_temp-401"
spark.conf.set('temporaryGcsBucket', bucket)

# Subscribe to the "tweet" Kafka topic as an unbounded *stream* (readStream),
# starting from the earliest available offset. failOnDataLoss=false keeps the
# query running instead of failing when Kafka reports possible data loss
# (e.g. requested offsets are out of range).
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("subscribe", "tweet")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

# Each Kafka message value is JSON matching `schema`: parse it, emit one row
# per element of its "tweet" array, and flatten the record's fields into
# top-level columns. Messages that fail to parse or have no tweets yield no
# rows (explode drops null/empty arrays).
df = (
    df.select(
        explode(
            from_json(col("value").cast("string"), schema).getField("tweet")
        ).alias("exploded")
    )
    .select("exploded.*")
)

# Start the streaming query; every micro-batch is handed to sentiment_analysis.
# NOTE(review): no "checkpointLocation" option is set, so streaming progress is
# presumably not kept across restarts — confirm whether that is intended.
query = df.writeStream.foreachBatch(sentiment_analysis).start()


def _stop_streaming(streaming_query, session):
    """Stop *streaming_query*, then *session*, and report that both are down.

    ``session.stop()`` still runs if stopping the query raises.
    """
    try:
        streaming_query.stop()
    finally:
        # Stop the Spark session (and its Spark context) no matter what.
        session.stop()
    print("Stopped the streaming query and the spark context")


try:
    # Blocks until the query stops or fails.
    query.awaitTermination()
except KeyboardInterrupt:
    _stop_streaming(query, spark)
except Exception as e:
    # Notebook top-level boundary: report the failure first so it stays
    # visible even if shutdown itself raises, then release the resources.
    print(e)
    print("Unexpected error")
    _stop_streaming(query, spark)