In [1]:
# Import Libraries

import pandas as pd
import numpy as np
import datetime
import re
import json
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType
import pyspark.sql.functions as F

In [2]:
# Configuration
# Connection settings and input locations used by the cells below.

# Bootstrap server(s) passed to the Kafka sink as "kafka.bootstrap.servers".
KAFKA_HOSTS = "localhost:9092"

# Kafka protocol version tuple; not referenced by any of the visible cells.
KAFKA_VERSION = (0, 10, 2)

# Glob matching the follower-profile JSON files.
dataDirectory = "shared_data/bigdata20/followers_info.json/*.json"

In [3]:
# Spark
# Local session on all available cores. The Kafka connector package is
# requested here so that DataFrame writers can use format("kafka").

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Profile Stream Producer")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
    .getOrCreate()
)

In [4]:
def sendMessage(msg_df, topic):
    """Publish every row of ``msg_df`` to a Kafka topic as a JSON string.

    Each row is serialised into one JSON object keyed by column name and
    written as the Kafka record ``value``. The write is a batch
    ``DataFrame.write``; it targets the brokers configured in ``KAFKA_HOSTS``.

    Args:
        msg_df: Spark DataFrame whose rows should be published.
        topic: Name of the destination Kafka topic.

    Returns:
        None.
    """
    # Pack all columns into a single struct so each row becomes one JSON document.
    row_struct = F.struct([F.col(c).alias(c) for c in msg_df.columns])
    payload_df = msg_df.select(F.to_json(row_struct).cast("string").alias("value"))

    # DataFrameWriter.save() returns None, so the result is not bound to a name.
    (
        payload_df.write
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_HOSTS)
        .option("topic", topic)
        .save()
    )

In [5]:
# Load Data
# The profile glob comes from the `dataDirectory` constant in the configuration
# cell rather than a second copy of the same literal, so changing the
# configuration actually changes what is loaded.

profile_df = spark.read.json(dataDirectory)
posts_df = spark.read.json("shared_data/bigdata20/followers_posts_api_final.json")

In [6]:
# Add age and sex of the post owner to the posts dataset


def _year_suffix(bdate):
    """Return the last four characters of ``bdate``, or '0' when it is empty or None."""
    # Always return a str so the value matches the UDF's declared StringType.
    return bdate[-4:] if bdate else '0'


def _age_from_year(year):
    """Return the age for a birth ``year``; draw a random age in [15, 80) when it is unknown.

    ``year`` is 0 when no year was found and None when the year text could not
    be cast to an integer; both are treated as unknown.
    """
    if not year:
        # np.random.randint returns a numpy integer, which Spark cannot
        # serialise as a UDF result; convert it to a plain Python int.
        return int(np.random.randint(15, 80))
    return datetime.datetime.now().year - year


find_year = F.udf(_year_suffix, StringType())
# The age UDF draws random numbers, so tell the optimizer it is non-deterministic.
find_age = F.udf(_age_from_year, IntegerType()).asNondeterministic()

# bdate values that are null or shorter than 7 characters get the placeholder year '0'.
has_no_year = F.col('bdate').isNull() | (F.length(F.col('bdate')) < 7)

# Derived frames get their own names so profile_df / posts_df stay untouched
# and this cell can be re-run without failing on already-transformed inputs.
owner_df = (
    profile_df
    .withColumn("year", F.when(has_no_year, '0').otherwise(find_year(F.col('bdate'))))
    .withColumn('age', find_age(F.col("year").cast(IntegerType())))
    .select(F.col('id').alias('owner_id'), F.col('sex'), F.col('age'))
)

posts_with_owner_df = posts_df.join(owner_df, "owner_id", "left")

# Cache the result: several later actions read data_df, and without caching
# each one would re-run the random age imputation, letting the same owner land
# in different age buckets (best effort; evicted partitions are recomputed).
data_df = (
    posts_with_owner_df
    .withColumnRenamed('id', 'post_id')
    .na.drop(subset=['age', 'sex', 'post_id', 'owner_id'])
    .cache()
)

In [7]:
# Remove Stopwords

stopwords = spark.read.text('shared_data/bigdata20/stopwords.txt')
wordlist = [row['value'] for row in stopwords.select('value').collect()]

# Build the stopword array directly from the list. Joining on "," and splitting
# again would break any stopword that itself contains a comma. The cast keeps
# the element type as string even when the list is empty.
stopword_array = F.array(*[F.lit(word) for word in wordlist]).cast("array<string>")

tokens_df = (
    data_df.select('post_id', 'text')
    # Replace newlines with a space: deleting them glues the last word of one
    # line onto the first word of the next before tokenising.
    .withColumn("text", F.regexp_replace(F.col("text"), "\n", " "))
    .withColumn("unfilteredText", F.split("text", " "))
    .withColumn("filter_col", stopword_array)
    # NOTE(review): array_except also removes duplicate tokens within a post;
    # confirm that per-post de-duplication is wanted by the "words" consumer.
    .withColumn("filteredText", F.array_except("unfilteredText", "filter_col"))
    .na.drop(subset=['filteredText'])
)

# One row per remaining token, with commas stripped. Empty tokens (from
# repeated spaces or tokens that were only commas) are not words, so drop them.
text_df = (
    tokens_df
    .select(F.explode("filteredText").alias("filteredText"))
    .withColumn("filteredText", F.regexp_replace(F.col("filteredText"), ",", ""))
    .filter(F.col("filteredText") != "")
)

In [8]:
# Send Message
# Keep only the fields that are published, then route rows to one topic per
# sex value and one topic per age bracket; filtered words go to "words".

data_df = data_df.select(
    F.col("post_id"),
    F.col("owner_id"),
    F.col("sex").cast("int"),
    F.col("age").cast("int"),
)

sex_col = F.col("sex")
age_col = F.col("age")

# NOTE(review): sex code 1 is routed to "male" and 2 to "female". Confirm this
# against the data source's documentation; some APIs (e.g. VK) define
# 1 = female and 2 = male, in which case these two topics are swapped.
topic_filters = [
    ("male", sex_col == 1),
    ("female", sex_col == 2),
    ("eighteen", age_col < 18),
    ("twentyseven", (age_col >= 18) & (age_col < 27)),
    ("forty", (age_col >= 27) & (age_col < 40)),
    ("sixty", (age_col >= 40) & (age_col < 60)),
    ("senior", age_col >= 60),
]

# Topics are sent in the listed order.
for topic_name, row_filter in topic_filters:
    sendMessage(data_df.filter(row_filter), topic_name)

sendMessage(text_df.select('filteredText'), "words")

In [9]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()