In [60]:
# Import required packages
import configparser
from confluent_kafka import Producer
import gensim
from gensim.utils import simple_preprocess
import gensim.corpora as corpora
import json
import logging
from multiprocessing import Process
import numpy as np
import os
import pandas as pd
import praw
import pyspark
from pyspark import broadcast, SparkContext
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, Tokenizer, RegexTokenizer, StopWordsRemover, OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer, Bucketizer
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, LocalLDAModel
from pyspark.ml.functions import vector_to_array
from pyspark.ml.pipeline import PipelineModel
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
# from pyspark.sql import SQLContext
import socket
import re
import sys
import scipy

In [144]:
# Set parameters
broker = "broker:29092"
num_topics = 20
model_cols = ["event_key","event_topic","event_timestamp","title","author_fullname","subreddit_name_prefixed","name","upvote_ratio","ups","created",
              "domain","url_overridden_by_dest","over_18","permalink","parent_whitelist_status","url"]
final_cols = model_cols + ["hour","day"]
cat_cols = ['domain','hour','day']

In [None]:
# Read passwords and secrets from config file
# May not be required
#config_parser = configparser.ConfigParser()
#config_parser.read("src/configuration/config.cfg")

In [5]:
#from pyspark.sql.functions import month, year, mean, count, dayofweek, hour, col, min, max, avg, sum, when, lit, desc, unix_timestamp, from_unixtime, udf, regexp_replace

In [396]:
# Build the spark session
spark = SparkSession.builder \
        .appName('kafka') \
        .getOrCreate()

## Load the raw stream and convert to dataframe objects for processing

In [397]:
# Load raw stream data for submissions and convert to df useable in processing to prediction
## NOTE - no need to do this for comments stream as the model is pre-trained and only applied to submission data

df_s = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "broker:29092") \
  .option("startingOffsets", "earliest") \
  .option("subscribe", "submissions") \
  .load()

In [379]:
df_s.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [398]:
string_stream_df_s = df_s \
    .withColumn("key", df_s["key"].cast(StringType())) \
    .withColumn("value", df_s["value"].cast(StringType()))

In [399]:
# Specify the structure of the value component
schema_submissions = StructType([
    StructField("id", StringType(),  True),
    StructField("author_fullname", StringType(),  True),
    StructField("title", StringType(),  True),
    StructField("subreddit_name_prefixed", StringType(),  True),
    StructField("name", StringType(), True),
    StructField("upvote_ratio", DoubleType(),  True),
    StructField("ups", IntegerType(), True),
    StructField("created", IntegerType(), True),
    StructField("domain", StringType(), True),
    StructField("url_overridden_by_dest", StringType(), True),
    StructField("over_18", StringType(), True),
    StructField("subreddit_id", StringType(),  True),
    StructField("permalink", StringType(),  True),
    StructField("parent_whitelist_status", StringType(),  True),
    StructField("url", StringType(),  True),
    StructField("created_utc", IntegerType(), True)
])

In [400]:
# To json to split our the values in message
json_stream_df = string_stream_df_s.withColumn("value", F.from_json("value", schema_submissions))

In [401]:
tokenizer = Tokenizer(inputCol="title", outputCol="words")

stop_words =StopWordsRemover.loadDefaultStopWords("english")
stop_words = stop_words + ['a','i']

remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stop_words)

# Load the vectorizer trained on the original training data and used for the LDA model training and hence the rf model relying on it.
cvmodel = CountVectorizerModel.load('count_vectorizer_model')

# Load the LDA model trained on the original training data 
lda_model = LocalLDAModel.load('lda_distributed_model')

X_titles = [f'T_{i}' for i in range(1, num_topics + 1)]

# Load the pre-trained random forest model
pipeline_model = PipelineModel.load('pipeline_model')

# Required variables for the pipeline
 # categorical columns

# Change title of one hot encoded categoricals created in the pipeline
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]

In [402]:
# Create the final usable df for submissions
submissions_stream_df = json_stream_df \
    .select( \
        F.col("key").alias("event_key"), \
        F.col("topic").alias("event_topic"), \
        F.col("timestamp").alias("event_timestamp"), \
            "value.id", \
            "value.author_fullname", \
            "value.title", \
            "value.subreddit_name_prefixed", \
            "value.name", \
            "value.upvote_ratio", \
            "value.ups", \
            "value.created", \
            "value.domain", \
            "value.url_overridden_by_dest", \
            "value.over_18", \
            "value.subreddit_id", \
            "value.permalink", \
            "value.parent_whitelist_status", \
            "value.url",
            "value.created_utc"
           )

submissions_stream_df = submissions_stream_df.select('id','title','domain','subreddit_id','event_timestamp') \
    .withColumn("title", F.regexp_replace(F.col("title"), '[^\sa-zA-Z]', '')) \
    .withColumn("hour", F.hour(F.col("event_timestamp"))).withColumn("day", F.dayofweek(F.col("event_timestamp"))) \
    .withColumn("hour", F.col("hour").astype(StringType())).withColumn("day", F.col("day").astype(StringType()))

submissions_stream_df = tokenizer.transform(submissions_stream_df)
submissions_stream_df = remover.transform(submissions_stream_df)
submissions_stream_df = cvmodel.transform(submissions_stream_df)
#submissions_stream_df = submissions_stream_df.select('vectors', 'id')
submissions_stream_df = lda_model.transform(submissions_stream_df)

submissions_stream_df = submissions_stream_df.withColumn("T_", vector_to_array("topicDistribution")) \
    .drop('vectors', 'topicDistribution') \
    .select(["domain","hour","day"] + [F.col(f"T_")[i] for i in range(1,21)]) \
    .withColumnRenamed("T_[1]", "T_1") \
    .withColumnRenamed("T_[2]", "T_2") \
    .withColumnRenamed("T_[3]", "T_3") \
    .withColumnRenamed("T_[4]", "T_4") \
    .withColumnRenamed("T_[5]", "T_5") \
    .withColumnRenamed("T_[6]", "T_6") \
    .withColumnRenamed("T_[7]", "T_7") \
    .withColumnRenamed("T_[8]", "T_8") \
    .withColumnRenamed("T_[9]", "T_9") \
    .withColumnRenamed("T_[10]", "T_10") \
    .withColumnRenamed("T_[11]", "T_11") \
    .withColumnRenamed("T_[12]", "T_12") \
    .withColumnRenamed("T_[13]", "T_13") \
    .withColumnRenamed("T_[14]", "T_14") \
    .withColumnRenamed("T_[15]", "T_15") \
    .withColumnRenamed("T_[16]", "T_16") \
    .withColumnRenamed("T_[17]", "T_17") \
    .withColumnRenamed("T_[18]", "T_18") \
    .withColumnRenamed("T_[19]", "T_19") \
    .withColumnRenamed("T_[20]", "T_20")

# Apply it to the new feature dataframe for new submissions
submissions_stream_df = pipeline_model.transform(submissions_stream_df)

In [403]:
# Create prediction stream
submissions_stream = submissions_stream_df \
    .writeStream \
    .format("memory") \
    .queryName("submissions_view") \
    .start()

In [412]:
subsmissions_data = spark.sql('SELECT * FROM submissions_view')
subsmissions_data.show(5)
print(subsmissions_data.count())
# subs_data.show(5, truncate = 40)

+------+----+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----------+----------+--------+--------+-------+-------+--------+-------------+-----------+----------+
|domain|hour|day|T_1|T_2|T_3|T_4|T_5|T_6|T_7|T_8|T_9|T_10|T_11|T_12|T_13|T_14|T_15|T_16|T_17|T_18|T_19|T_20|domain_ind|domain_ohe|hour_ind|hour_ohe|day_ind|day_ohe|features|rawPrediction|probability|prediction|
+------+----+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----------+----------+--------+--------+-------+-------+--------+-------------+-----------+----------+
+------+----+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----------+----------+--------+--------+-------+-------+--------+-------------+-----------+----------+

0


## Process new titles data ready for modelling and prediction

In [407]:
# Create the final usable df for submissions
test_submissions_stream_df = json_stream_df \
    .select( \
        F.col("key").alias("event_key"), \
        F.col("topic").alias("event_topic"), \
        F.col("timestamp").alias("event_timestamp"), \
            "value.id", \
            "value.author_fullname", \
            "value.title", \
            "value.subreddit_name_prefixed", \
            "value.name", \
            "value.upvote_ratio", \
            "value.ups", \
            "value.created", \
            "value.domain", \
            "value.url_overridden_by_dest", \
            "value.over_18", \
            "value.subreddit_id", \
            "value.permalink", \
            "value.parent_whitelist_status", \
            "value.url",
            "value.created_utc"
           )

In [408]:
# Create prediction stream
test_submissions_stream = test_submissions_stream_df \
    .writeStream \
    .format("memory") \
    .queryName("test_submissions_view") \
    .start()

In [410]:
subs_data = spark.sql('SELECT * FROM test_submissions_view')
subs_data.show(5)
print(subs_data.count())
# subs_data.show(5, truncate = 40)

+---------+-----------+--------------------+------+---------------+--------------------+-----------------------+---------+------------+----+-------+-------------+----------------------+-------+------------+--------------------+-----------------------+--------------------+-----------+
|event_key|event_topic|     event_timestamp|    id|author_fullname|               title|subreddit_name_prefixed|     name|upvote_ratio| ups|created|       domain|url_overridden_by_dest|over_18|subreddit_id|           permalink|parent_whitelist_status|                 url|created_utc|
+---------+-----------+--------------------+------+---------------+--------------------+-----------------------+---------+------------+----+-------+-------------+----------------------+-------+------------+--------------------+-----------------------+--------------------+-----------+
|   ny36rp|submissions|2021-06-13 00:49:...|ny36rp|    t2_1mwvv4vy|Bangladesh signs ...|            r/worldnews|t3_ny36rp|        0.62|   9|   nu

In [367]:
# Only keep the needed columns and remove duplicate rows - submissions
subs_data = subs_data.select('id','title','domain','subreddit_id','event_timestamp')
subs_data = subs_data.distinct()
subs_data.count()

119

In [368]:
# Remove punctuation and numbers from titles
subs_data = subs_data.withColumn("title", F.regexp_replace(F.col("title"), '[^\sa-zA-Z]', ''))

# Prepare a tokenizer
tokenizer = Tokenizer(inputCol="title", outputCol="words")
wordsDataFrame = tokenizer.transform(subs_data)

# Define stopwords
stop_words = StopWordsRemover.loadDefaultStopWords("english")
stop_words = stop_words + ['a','i']

# Apply the stop words remover
remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stop_words)
wordsDataFrame = remover.transform(wordsDataFrame)

In [369]:
# Load the vectorizer trained on the original training data and used for the LDA model training and hence the rf model relying on it.
cvmodel = CountVectorizerModel.load('count_vectorizer_model')

# Then apply it to new titles data
df_vect = cvmodel.transform(wordsDataFrame)
basics = df_vect.select('vectors', 'id')
basics.show()

+--------------------+------+
|             vectors|    id|
+--------------------+------+
|(68283,[1,3,6,144...|nyc0xu|
|(68283,[1,6,47,11...|nyfeyp|
|(68283,[0,4,7,22,...|nyagde|
|(68283,[0,1751,18...|nyccrc|
|(68283,[1,335,501...|ny6ed0|
|(68283,[74,272,30...|nyc1vt|
|(68283,[2,17,47,7...|nyew7i|
|(68283,[55,67,202...|nyg537|
|(68283,[33,153,17...|ny6st7|
|(68283,[0,69,113,...|ny6ug6|
|(68283,[7,17,33,5...|nydqvu|
|(68283,[6,17,47,3...|nygp6t|
|(68283,[61,519,11...|nyifam|
|(68283,[211,719,1...|nybrib|
|(68283,[366,695,1...|nygt8j|
|(68283,[4,67,126,...|nyj2p8|
|(68283,[639,644,2...|nyk526|
|(68283,[0,1,7,9,4...|nyktrp|
|(68283,[3,56,335,...|ny7zzo|
|(68283,[0,1,2,61,...|nylk0u|
+--------------------+------+
only showing top 20 rows



In [370]:
# Load the LDA model trained on the original training data 
lda_model = LocalLDAModel.load('lda_distributed_model')

# Then apply it to new titles vectors
indiv = lda_model.transform(basics)

## Build other features into dataset

In [371]:
subs_data = subs_data.withColumn("hour", F.hour(F.col("event_timestamp"))).withColumn("day", F.dayofweek(F.col("event_timestamp")))
subs_data = subs_data.withColumn("hour", F.col("hour").astype(StringType())).withColumn("day", F.col("day").astype(StringType()))

In [372]:
# The training model used this number of topics, so need to know that for vector importance here
num_topics = 20

# Create partial features df by collecting topic distributions for each title
temp = indiv.select('topicDistribution')

# Create a list of sequential column titles to be used for populating this
X_titles = [f'T_{i}' for i in range(1, num_topics + 1)]
X_titles = ['id'] + X_titles # adding the id at start

# Populate topic features
temp = indiv.withColumn("T_", vector_to_array("topicDistribution")).select(["id"] + [F.col("T_")[i] for i in range(20)]).drop('vectors', 'topicDistribution')
temp = temp.toDF(*X_titles)
temp.show(truncate =40)

+------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|    id|                  T_1|                  T_2|                  T_3|                  T_4|                  T_5|                  T_6|                  T_7|                  T_8|                T_9|                 T_10|                 T_11|                 T_12|                 T_13|                 T_14|                 T_15|                 T_16|                 T_17|                 T_18|                 T_19|                 T_20|
+------+---------------------+---------------------+---------------------+---------------------+----------

In [373]:
# Pull together the topic features and other ones
rf_full = temp.join(subs_data, on='id', how='left').drop('id','subreddit_id','title','event_timestamp','created_utc','link_id')


In [375]:
rf_full.printSchema()
subs_data.printSchema()

root
 |-- T_1: double (nullable = true)
 |-- T_2: double (nullable = true)
 |-- T_3: double (nullable = true)
 |-- T_4: double (nullable = true)
 |-- T_5: double (nullable = true)
 |-- T_6: double (nullable = true)
 |-- T_7: double (nullable = true)
 |-- T_8: double (nullable = true)
 |-- T_9: double (nullable = true)
 |-- T_10: double (nullable = true)
 |-- T_11: double (nullable = true)
 |-- T_12: double (nullable = true)
 |-- T_13: double (nullable = true)
 |-- T_14: double (nullable = true)
 |-- T_15: double (nullable = true)
 |-- T_16: double (nullable = true)
 |-- T_17: double (nullable = true)
 |-- T_18: double (nullable = true)
 |-- T_19: double (nullable = true)
 |-- T_20: double (nullable = true)
 |-- domain: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- day: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- event_times

In [347]:
# Load the pre-trained random forest model
pipeline_model = PipelineModel.load('pipeline_model')

# Required variables for the pipeline
cat_cols = ['domain','hour','day'] # categorical columns

# Change title of one hot encoded categoricals created in the pipeline
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]

# Define the numerical columns
features_cols = rf_full.columns
num_cols = [x for x in features_cols if x not in cat_cols]

# Apply it to the new feature dataframe for new submissions
predicted_comments = pipeline_model.transform(rf_full)

In [349]:
predicted_comments.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+---+----------+------------------+--------+---------------+-------+---------+--------------------+--------------------+--------------------+----------+
|                 T_1|                 T_2|                 T_3|                 T_4|                 T_5|                 T_6|                 T_7|                 T_8|                T_9|                T_10|                T_11|                T_12|                T_13|                T_14|                T_15|                T_16|                T_17|                T_18|                T_19|   

In [None]:
prediction_stream = predicted_comments \
    .writeStream \
    .format("memory") \
    .outputMode("Complete") \
    .queryName("comment_predictions") \
    .start()

In [350]:
preds = predicted_comments.select(['prediction'])

In [None]:
predicted_comments.show(10)

In [351]:
preds.show(10)

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 10 rows



In [29]:
def acknowledged(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

In [28]:
def create_prodcuer(bootstrap_broker):
    # Create producer
    conf = {'bootstrap.servers': bootstrap_broker,
            'client.id': socket.gethostname()}
    producer = Producer(conf)
    return producer

In [None]:
data = {}

data['id'] = predicted_comments
data['prediction'] = predicted_comments
producer = create_producer(bootstrap_broker=broker)

try:
    producer.produce("comment_predictions", key=submission.id, value=dump_submission, callback=acknowledged)
except Exception as e:
    logging.error(e)

In [395]:
spark.stop()