In [1]:
import os
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, length, when, col
from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField, DoubleType, TimestampType
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from jinja2 import Environment, FileSystemLoader

In [2]:
# --- Application identity, working directories and driver logging -----------
APP_NAME = "LAB-2-ANTONOV-CONSUMER"
# '/' and ':' are replaced so the name can be embedded safely in file names.
NORMALIZED_APP_NAME = APP_NAME.replace('/', '_').replace(':', '_')

APPS_TMP_DIR = os.path.join(os.getcwd(), "tmp")
APPS_CONF_DIR = os.path.join(os.getcwd(), "conf")
APPS_LOGS_DIR = os.path.join(os.getcwd(), "logs")
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, "pyspark-log4j-{}.properties".format(NORMALIZED_APP_NAME))
LOG_FILE = os.path.join(APPS_LOGS_DIR, 'pyspark-{}.log'.format(NORMALIZED_APP_NAME))
# JVM options for the driver: point log4j at the rendered config file below.
EXTRA_JAVA_OPTIONS = "-Dlog4j.configuration=file://{} -Dspark.hadoop.dfs.replication=1 -Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3"\
    .format(LOG4J_PROP_FILE)

# Address advertised to executors as spark.driver.host.
LOCAL_IP = socket.gethostbyname(socket.gethostname())

# Create the working directories. exist_ok=True keeps the cell re-runnable and
# avoids the check-then-create race of os.path.exists() + os.makedirs().
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    os.makedirs(directory, exist_ok=True)

# Render the driver's log4j properties from the shared template in /opt,
# directing log output to LOG_FILE.
env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

# --- Spark session -----------------------------------------------------------
# Master URL: run Spark locally with 4 worker threads.
SPARK_ADRESS = "local[4]"

# Every session option, applied to the builder in this order.
# NOTE(review): the master is local[4], so the spark.executor.* and
# spark.kubernetes.* entries are presumably inert here — confirm whether they
# are still needed or only matter when switching to a k8s master.
spark_session_options = {
    "spark.driver.host": LOCAL_IP,
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.executor.instances": "2",
    "spark.executor.cores": '3',
    "spark.memory.fraction": "0.8",
    "spark.memory.storageFraction": "0.6",
    "spark.executor.memory": '3g',
    "spark.driver.memory": "3g",
    "spark.driver.maxResultSize": "1g",
    "spark.kubernetes.memoryOverheadFactor": "0.3",
    "spark.driver.extraJavaOptions": EXTRA_JAVA_OPTIONS,
    "spark.kubernetes.namespace": "aantonov-310006",
    "spark.kubernetes.driver.label.appname": APP_NAME,
    "spark.kubernetes.executor.label.appname": APP_NAME,
    "spark.kubernetes.container.image": "node03.st:5000/spark-executor:aantonov-310006",
    # Default checkpoint root for the streaming queries started later.
    "spark.sql.streaming.checkpointLocation": "hdfs:///home/aantonov-310006/project",
    "spark.local.dir": "/tmp/spark",
    "spark.driver.extraClassPath": "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar",
    "spark.executor.extraClassPath": "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar",
    "spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.path": "/tmp/spark",
    "spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.readOnly": "false",
    "spark.kubernetes.executor.volumes.hostPath.depdir.mount.path": "/home/jovyan/shared-data",
    "spark.kubernetes.executor.volumes.hostPath.depdir.options.path": "/nfs/shared",
    "spark.kubernetes.executor.volumes.hostPath.depdir.options.type": "Directory",
    "spark.kubernetes.executor.volumes.hostPath.depdir.mount.readOnly": "true",
}

session_builder = SparkSession.builder.appName(APP_NAME).master(SPARK_ADRESS)
for option_key, option_value in spark_session_options.items():
    session_builder = session_builder.config(option_key, option_value)
spark = session_builder.getOrCreate()

In [3]:
# Show where to find the Spark web UI and the driver's log4j config and log file.
print(f"Web UI: {spark.sparkContext.uiWebUrl}")
for label, path in (("log4j file", LOG4J_PROP_FILE), ("driver log file", LOG_FILE)):
    print(f"\n{label}: {path}")

Web UI: http://10.128.9.86:4041

log4j file: /home/jovyan/nfs-home/LABS/LAB_KAFKA/CONS/conf/pyspark-log4j-LAB-2-ANTONOV-CONSUMER.properties

driver log file: /home/jovyan/nfs-home/LABS/LAB_KAFKA/CONS/logs/pyspark-LAB-2-ANTONOV-CONSUMER.log


## Step 1

Read the data sent by the producer to the Kafka topic `main`, and parse its JSON payload into columns.

In [4]:
# Layout of the JSON document carried in each Kafka message value.
# All fields use StructField's default nullability.
# NOTE(review): `user` is parsed as a double — confirm the producer really sends
# numeric ids (LongType/StringType may be more appropriate).
_message_fields = [
    ("user", DoubleType()),
    ("date_posted", TimestampType()),
    ("text", StringType()),
    ("sex", StringType()),
    ("age", IntegerType()),
]
schema = StructType([StructField(field_name, field_type) for field_name, field_type in _message_fields])

In [5]:
# Streaming source: every record published to the Kafka topic "main".
kafka_source = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-svc:9092")
    .option("subscribe", "main")
    .load()
)

# Fields lifted out of the parsed JSON struct, in output column order.
payload_fields = ["user", "date_posted", "text", "sex", "age"]

# Keep the raw JSON string alongside the parsed, flattened columns.
df = (
    kafka_source
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .select("value", from_json("value", schema).alias("value_struct"))
    .select("value", *["value_struct." + field for field in payload_fields])
)

df

DataFrame[value: string, user: double, date_posted: timestamp, text: string, sex: string, age: int]

## Step 2

Create a stop-word remover based on the Russian stop-word list. The UDF `get_age_group()` returns the age-group label for a given age.

In [6]:
import nltk

# Make the locally downloaded NLTK data (stop-word corpora) discoverable.
# The path is registered before nltk.corpus is imported.
nltk_data_dir = "/home/jovyan/nltk_data"
nltk.data.path.append(nltk_data_dir)

from nltk.corpus import stopwords
from pyspark.ml.feature import RegexTokenizer

In [7]:
from pyspark.ml.feature import StopWordsRemover

# Russian stop-word list shipped with NLTK.
RS = stopwords.words('russian')
# NOTE(review): this broadcast is not referenced anywhere in the visible
# notebook (the remover below receives the plain list); consider dropping it.
stopwords_broadcast = spark.sparkContext.broadcast(RS)
# Removes Russian stop words from the `tokens` array column into `cleaned_tokens`.
remover = StopWordsRemover(inputCol="tokens", outputCol="cleaned_tokens", stopWords=RS)


@udf(returnType=StringType())
def get_age_group(age):
    """Map an integer age to its age-group label.

    Buckets are half-open ``[lower, upper)``, so every age in ``[0, 500)`` gets
    exactly one label. The previous open intervals ``(lower, upper)`` left the
    boundary ages 0, 18, 27, 40 and 60 without a label.

    Returns None (SQL NULL) for a null age, or for an age outside ``[0, 500)``,
    instead of raising TypeError inside the Python worker.
    """
    if age is None:
        return None
    age_groups = (
        (0, 18, '0-18'),
        (18, 27, '18-27'),
        (27, 40, '27-40'),
        (40, 60, '40-60'),
        (60, 500, '60+'),
    )
    for lower, upper, label in age_groups:
        if lower <= age < upper:
            return label
    return None


## Step 3

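Split the stream by sex, age group and window duration (1 hour, 1 day, 1 week). Each combination's word counts are written to its own Kafka topic.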

In [8]:
from pyspark.sql.functions import col, count, explode, lit, split, window

SEX_GROUPS = ["M", "F"]
AGE_GROUPS = [(0, 18), (18, 27), (27, 40), (40, 60), (60, 500)]
# Label published in the `age_group` field for each bucket. These match the
# labels produced by get_age_group() in Step 2.
AGE_GROUP_LABELS = {
    (0, 18): '0-18',
    (18, 27): '18-27',
    (27, 40): '27-40',
    (40, 60): '40-60',
    (60, 500): '60+',
}
DURATION_GROUPS = ['1 hour', '1 day', '1 week']
# Backward-compatible alias for the original, misspelled constant name.
DURATION_GROPUS = DURATION_GROUPS


def build_topic_aggregation(source_df, sex, age_lower_bound, age_upper_bound, duration):
    """Build the streaming word-count aggregate for one (sex, age bucket, window) topic.

    Keeps rows where ``sex`` matches and ``age_lower_bound <= age < age_upper_bound``.
    The interval is half-open, so boundary ages such as 18 land in exactly one
    bucket instead of being dropped from every topic.

    ``text`` is tokenised on runs of whitespace, and Russian stop words are
    removed with ``remover``. The remaining non-empty tokens are counted per
    tumbling ``duration`` window of ``date_posted``.

    Returns a streaming DataFrame with a single JSON string column ``value``,
    ready for the Kafka sink.
    """
    bucket = source_df.where(
        (col("sex") == sex)
        & (col("age") >= age_lower_bound)
        & (col("age") < age_upper_bound)
    )
    # Each query covers a single age bucket, so its label is a constant; this
    # avoids a per-row Python UDF round-trip.
    tokenized = (
        bucket
        .withColumn("tokens", split(col("text"), "\\s+"))
        .withColumn("age_group", lit(AGE_GROUP_LABELS[(age_lower_bound, age_upper_bound)]))
    )
    return (
        remover.transform(tokenized)
        .select("date_posted", "age_group", "sex", explode("cleaned_tokens").alias("token"))
        # Leading whitespace yields an empty first token; it is not a word.
        .where(col("token") != "")
        # NOTE(review): no watermark is set, so state for old windows is never
        # dropped; consider .withWatermark("date_posted", ...) before grouping.
        .groupBy("sex", "age_group", window("date_posted", duration, duration))
        .agg(count("token").alias("tokens_count"))
        .selectExpr("to_json(struct(*)) as value")
    )


# Handles of the started queries (for monitoring or query.stop()).
topic_queries = []

for sex in SEX_GROUPS:
    for age_lower_bound, age_upper_bound in AGE_GROUPS:
        for dur in DURATION_GROUPS:
            dur_normalised = dur.replace(" ", "_")
            topic = f"{sex}_{age_lower_bound}_{age_upper_bound}_{dur_normalised}"
            print(f"dividing stream by topiic: {topic}")

            query = (
                build_topic_aggregation(df, sex, age_lower_bound, age_upper_bound, dur)
                .writeStream
                .outputMode("update")
                .format("kafka")
                .option("kafka.bootstrap.servers", "kafka-svc:9092")
                .option("topic", topic)
                .start()
            )
            topic_queries.append(query)

dividing stream by topiic: M_0_18_1_hour
dividing stream by topiic: M_0_18_1_day
dividing stream by topiic: M_0_18_1_week
dividing stream by topiic: M_18_27_1_hour
dividing stream by topiic: M_18_27_1_day
dividing stream by topiic: M_18_27_1_week
dividing stream by topiic: M_27_40_1_hour
dividing stream by topiic: M_27_40_1_day
dividing stream by topiic: M_27_40_1_week
dividing stream by topiic: M_40_60_1_hour
dividing stream by topiic: M_40_60_1_day
dividing stream by topiic: M_40_60_1_week
dividing stream by topiic: M_60_500_1_hour
dividing stream by topiic: M_60_500_1_day
dividing stream by topiic: M_60_500_1_week
dividing stream by topiic: F_0_18_1_hour
dividing stream by topiic: F_0_18_1_day
dividing stream by topiic: F_0_18_1_week
dividing stream by topiic: F_18_27_1_hour
dividing stream by topiic: F_18_27_1_day
dividing stream by topiic: F_18_27_1_week
dividing stream by topiic: F_27_40_1_hour
dividing stream by topiic: F_27_40_1_day
dividing stream by topiic: F_27_40_1_week
div

Let's create a test consumer to look at the topic for women aged 18-27, aggregated in one-week windows.

In [9]:
import json
from kafka import KafkaConsumer

# Topic written by the streaming queries above: women aged 18-27, 1-week windows.
TEST_TOPIC = 'F_18_27_1_week'

# consumer_timeout_ms=1000: iterating the consumer stops after 1 s without a
# new message instead of blocking indefinitely (kafka-python semantics).
consumer = KafkaConsumer(
    bootstrap_servers="kafka-svc:9092",
    consumer_timeout_ms=1000,
)
consumer.subscribe(TEST_TOPIC)

In [None]:
# Print every aggregate published to the test topic until the kernel is
# interrupted (Kernel -> Interrupt). The consumer is then closed, so re-run the
# previous cell before running this one again.
try:
    while True:
        # The `for` loop ends after consumer_timeout_ms without messages; poll again.
        for message in consumer:
            print(json.loads(message.value))
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop watching the topic,
    # so no traceback is shown.
    pass
finally:
    # Release the consumer's network connections.
    consumer.close()

{'sex': 'F', 'age_group': '18-27', 'window': {'start': '2018-12-27T00:00:00.000Z', 'end': '2019-01-03T00:00:00.000Z'}, 'tokens_count': 78}


In [None]:
# Stop every running streaming query before tearing down the session, so the
# queries shut down explicitly rather than having their context stopped under them.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()