In [1]:
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.sql.types import *
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *
import json
import textblob
from textblob import TextBlob

In [2]:
# Obtain the Spark session: getOrCreate() returns the already-active session if
# one exists (e.g. a notebook-managed one), otherwise it starts a new one.
# NOTE(review): master "local" means a single worker thread when a new session
# is created here — consider "local[*]" if this runs outside a managed cluster.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Reddit-News")
    .getOrCreate()
)

In [3]:
# Schema of one JSON record on the stream; the field names mirror the keys that
# parse_submission reads. Every field is nullable.
pythonSchema = StructType([
    StructField("id", StringType(), True),
    StructField("submission", StringType(), True),
    StructField("comment_number", IntegerType(), True),
    StructField("score", IntegerType(), True),
])

In [4]:
import os

# AWS / Kinesis connection settings are read from the environment instead of
# being typed into the notebook, so secrets never get saved into the .ipynb.
# Each value falls back to "" (the previous hard-coded placeholder) when unset.
awsAccessKeyId = os.environ.get("AWS_ACCESS_KEY_ID", "")
awsSecretKey = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
kinesisStreamName = os.environ.get("KINESIS_STREAM_NAME", "")
kinesisRegion = os.environ.get("KINESIS_REGION", "")

In [5]:
# Structured-streaming source over the configured Kinesis stream, starting from
# the newest records. The record payload is decoded later with
# `cast(data as string)`.
# NOTE(review): "format" and "inferSchema" are presumably ignored by the kinesis
# source (the payload arrives in the `data` column) — confirm and drop if so.
kinesis_options = {
    "streamName": kinesisStreamName,
    "region": kinesisRegion,
    "initialPosition": "latest",
    "format": "json",
    "awsAccessKey": awsAccessKeyId,
    "awsSecretKey": awsSecretKey,
    "inferSchema": "true",
}
kinesisDF = spark.readStream.format("kinesis").options(**kinesis_options).load()

In [6]:
# Start the streaming query. Each micro-batch from kinesisDF is appended to an
# in-memory table named "news" that later cells query with spark.sql.
# `df` holds the StreamingQuery handle, not a DataFrame.
# The memory sink is intended for debugging: its output is kept in driver memory.
df = (
    kinesisDF.writeStream
    .format("memory")
    .outputMode("append")
    .queryName("news")
    .start()
)

In [7]:
# Current state of the streaming query started above (status message and
# data-available / trigger-active flags).
query_status = df.status
query_status

In [8]:
# Query the rows the memory sink has collected in the "news" table, decoding the
# Kinesis payload column `data` to a string (the column keeps the name `data`).
news_query = "select cast(data as string) from news"
news = spark.sql(news_query)

In [9]:
def parse_submission(x):
    """Decode one record payload into its submission fields.

    Args:
        x: JSON text of a single record (the ``data`` column cast to string),
            or None when the payload is null.

    Returns:
        Tuple ``(id, submission, comment_number, score)``. Keys absent from the
        JSON object come back as None. A None payload yields four Nones, so the
        Spark UDFs built on this produce nulls instead of failing the task.

    Raises:
        ValueError (json.JSONDecodeError): if ``x`` is not valid JSON.
    """
    # json.loads(None) raises TypeError, which would abort the whole Spark task.
    if x is None:
        return (None, None, None, None)
    data = json.loads(x)
    # .get() instead of [] so one record missing a field doesn't abort the batch;
    # this also avoids shadowing the builtin `id` with a local.
    return (
        data.get('id'),
        data.get('submission'),
        data.get('comment_number'),
        data.get('score'),
    )

In [10]:
def _field_udf(position, return_type):
    """Build a Spark UDF returning element `position` of parse_submission's tuple."""
    return UserDefinedFunction(lambda raw: parse_submission(raw)[position], return_type)


# Positions follow parse_submission's (id, submission, comment_number, score) order.
# StringType() for getID matches UserDefinedFunction's default return type.
getID = _field_udf(0, StringType())
getSubmission = _field_udf(1, StringType())
getCommentNum = _field_udf(2, IntegerType())
getScore = _field_udf(3, IntegerType())

In [11]:
# Parse each JSON payload once with Spark's native from_json, using
# pythonSchema, instead of four Python UDFs that each re-run json.loads on the
# same string. Rows whose `data` is null or not valid JSON yield null fields
# rather than raising. The output columns are still:
# data, id, submission, comment_number, score.
news_normalized = (
    news
    .withColumn("parsed", from_json(col("data"), pythonSchema))
    .select("data", "parsed.*")
)

In [12]:
# Preview the first five parsed rows.
news_normalized.show(n=5)

In [13]:
def get_sentiment(submission):
    """Label the polarity of a piece of text as "NEG", "NEUT" or "POS".

    Uses TextBlob's sentiment polarity: negative -> "NEG", exactly zero ->
    "NEUT", positive -> "POS".

    Args:
        submission: Text to classify, or None when the upstream field is null
            (e.g. missing from the record or unparseable JSON).

    Returns:
        One of "NEG", "NEUT", "POS", or None when ``submission`` is None.
        In the StringType UDF column, None appears as a SQL NULL.
    """
    # TextBlob raises TypeError for non-string input; a null submission maps to
    # a null label instead of failing the whole Spark task.
    if submission is None:
        return None
    # Read polarity once rather than re-evaluating the property in each branch.
    polarity = TextBlob(submission).sentiment.polarity
    if polarity < 0:
        return "NEG"
    if polarity == 0:
        return "NEUT"
    return "POS"
  

In [14]:
# Wrap the plain-Python classifier as a Spark UDF producing a string column,
# then attach its label for each row's submission text.
getSentiment = UserDefinedFunction(
    lambda text: get_sentiment(text),
    returnType=StringType(),
)
sentiment_col = getSentiment(col("submission"))
news_normalized = news_normalized.withColumn('sentiment', sentiment_col)

In [15]:
# Preview the first five rows, now including the sentiment label.
news_normalized.show(n=5)

In [16]:
# Expose the labelled rows to SQL under a session-scoped temporary view name.
sentiment_view_name = "news_normalized_sentiment"
news_normalized.createOrReplaceTempView(sentiment_view_name)

In [17]:
%sql
-- Number of submissions per sentiment label.
SELECT sentiment,
       COUNT(*) AS submissions
FROM news_normalized_sentiment
GROUP BY sentiment

sentiment,submissions
NEG,31
POS,59
NEUT,105
