In [1]:
import os
if "HADOOP_CONF_DIR" in os.environ:
    del os.environ["HADOOP_CONF_DIR"]

In [2]:
"HADOOP_CONF_DIR" in os.environ

False

In [3]:
import json
import time
import random
from kafka import KafkaConsumer
from tqdm import tqdm
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField, TimestampType
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from jinja2 import Environment, FileSystemLoader


# setting constants
APP_NAME = "consume"
NORMALIZED_APP_NAME = APP_NAME.replace('/', '_').replace(':', '_')

APPS_TMP_DIR = os.path.join(os.getcwd(), "tmp")
APPS_CONF_DIR = os.path.join(os.getcwd(), "conf")
APPS_LOGS_DIR = os.path.join(os.getcwd(), "logs")
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, "pyspark-log4j-{}.properties".format(NORMALIZED_APP_NAME))
LOG_FILE = os.path.join(APPS_LOGS_DIR, 'pyspark-{}.log'.format(NORMALIZED_APP_NAME))
EXTRA_JAVA_OPTIONS = "-Dlog4j.configuration=file://{} -Dspark.hadoop.dfs.replication=1 -Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3"\
    .format(LOG4J_PROP_FILE)

LOCAL_IP = socket.gethostbyname(socket.gethostname())

# preparing configuration files from templates
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    if not os.path.exists(directory):
        os.makedirs(directory)

env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template\
    .stream(logfile=LOG_FILE)\
    .dump(LOG4J_PROP_FILE)

# run spark
spark = SparkSession\
    .builder\
    .appName(APP_NAME)\
    .master("k8s://https://10.32.7.103:6443")\
    .config("spark.driver.host", LOCAL_IP)\
    .config("spark.driver.bindAddress", "0.0.0.0")\
    .config("spark.executor.instances", "2")\
    .config("spark.executor.cores", '3')\
    .config("spark.memory.fraction", "0.8")\
    .config("spark.memory.storageFraction", "0.6")\
    .config("spark.executor.memory", '3g')\
    .config("spark.driver.memory", "3g")\
    .config("spark.driver.maxResultSize", "1g")\
    .config("spark.kubernetes.memoryOverheadFactor", "0.3")\
    .config("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS)\
    .config("spark.kubernetes.namespace", "sburmistrova-266294")\
    .config("spark.kubernetes.driver.label.appname", APP_NAME)\
    .config("spark.kubernetes.executor.label.appname", APP_NAME)\
    .config("spark.kubernetes.container.image", "node03.st:5000/spark-executor:sburmistrova-266294")\
    .config("spark.local.dir", "/tmp/spark")\
    .config("spark.driver.extraClassPath", "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar")\
    .config("spark.executor.extraClassPath", "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar")\
    .config("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.path", "/tmp/spark")\
    .config("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.readOnly", "false")\
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.mount.path", "/home/jovyan/shared-data")\
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.options.path", "/nfs/shared")\
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.options.type", "Directory")\
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.mount.readOnly", "false")\
    .getOrCreate()

# printing important urls and paths
print("Web UI: {}".format(spark.sparkContext.uiWebUrl))
print("\nlog4j file: {}".format(LOG4J_PROP_FILE))
print("\ndriver log file: {}".format(LOG_FILE))

Web UI: http://10.128.5.219:4041

log4j file: /home/jovyan/nfs-home/conf/pyspark-log4j-consume.properties

driver log file: /home/jovyan/nfs-home/logs/pyspark-consume.log


In [4]:
import json
import time
import random
from kafka import KafkaProducer
from tqdm import tqdm

In [5]:
from pyspark.ml.feature import RegexTokenizer
import nltk
nltk.data.path.append("/home/jovyan/nltk_data")
from nltk.corpus import stopwords

In [6]:
schema = StructType()\
        .add("user_id", LongType())\
        .add("text", StringType())\
        .add("timestamp", TimestampType())

In [7]:
df = spark.readStream \
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafka-svc:9092")\
    .option("subscribe", "posts")\
    .load()\
    .select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

In [8]:
profile_df = spark.read.json("/home/jovyan/shared-data/bigdata20/followers_info.json")\
.select(col("id").alias("user_id"), (when(col("sex") == lit(1), lit("F"))
        .when(col("sex") == lit(2), lit("M")).otherwise(lit(None))).alias("sex"),
        (months_between(current_date(), concat_ws( "-", regexp_extract(col("bdate"), r"(\d{1,2})\.(\d{1,2})\.(\d{4})", 3).cast("int"), 
        regexp_extract(col("bdate"), r"(\d{1,2})\.(\d{1,2})\.(\d{4})", 2).cast("int"), 
        regexp_extract(col("bdate"), r"(\d{1,2})\.(\d{1,2})\.(\d{4})", 1).cast("int")).cast("date")) / lit(12)).cast("int").alias("age"))
profile_df.show(5)

+-------+---+----+
|user_id|sex| age|
+-------+---+----+
|     34|  F|null|
|    102|  F|null|
|    175|  M|null|
|    243|  F|  34|
|    533|  M|  32|
+-------+---+----+
only showing top 5 rows



In [9]:
user_post_df = df.join(broadcast(profile_df), ["user_id"])\
    .select("user_id", "sex", "age", "text", "timestamp")
user_post_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [10]:
import string
print(string.punctuation)

!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~


In [11]:
pattern_punct = string.punctuation+ "\[\]\d]"
pattern_url = "http[s]?://\S+|www\.\S+"
ru_stopwords = stopwords.words('russian')
regexTokenizer = RegexTokenizer(inputCol="cleaned", outputCol="tokens", pattern=r"\s+")

In [12]:
@udf(returnType=ArrayType(StringType()))
def preprocess_udf(tokens):
    return [word for word in tokens if word not in ru_stopwords]

In [13]:
def explode_words(df):
    return regexTokenizer.transform(df.withColumn("cleaned", \
        regexp_replace(regexp_replace("text", pattern_url, " "), pattern_punct, " ")))\
        .withColumn("finished", preprocess_udf("tokens"))\
        .withColumn("word", explode("finished"))\
        .drop("text", "cleaned", "tokens", "finished")

In [14]:
result = explode_words(user_post_df).withColumn("age_group", 
        when(col("age").isNull() | (col("age") < lit(18)), lit("0-18"))
        .when(col("age").between(lit(18), lit(26)), lit("18-27"))
        .when(col("age").between(lit(27), lit(39)), lit("27-40"))
        .when(col("age").between(lit(40), lit(59)), lit("40-60"))
        .when(col("age") >= 60, lit("60")))
#         .withWatermark(eventTime = 'timestamp', delayThreshold = '1 hour')\
#         .groupBy('age_group', 'sex', window(timeColumn = 'timestamp', windowDuration = '1 hour')).count()

In [15]:
result.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- word: string (nullable = true)
 |-- age_group: string (nullable = true)



In [16]:
# result=exploded_df.withWatermark(eventTime = 'timestamp', delayThreshold = '1 hour')\
#     .groupBy('age_group', 'sex', window(timeColumn = 'timestamp', windowDuration = '1 hour')).count()

In [17]:
def get_topic_list(df: DataFrame) -> list:
    topic_list = list()
    for sex in ("F", "M"):
        for age_group in ("18", "18-27", "27-40", "40-60", "60"):
            for dur in ("1 hour", "1 day", "1 week"):
                dur_w = dur.replace(' ', '')
                topic_name = f"topic_{sex}_{age_group}_{dur_w}"
                print(topic_name)
                topic = df\
                    .where(f"sex = '{sex}' and age_group = '{age_group}'")\
                    .withWatermark(eventTime = 'timestamp', delayThreshold = dur)\
                    .groupBy(window(timeColumn = 'timestamp', windowDuration = dur), "sex", "age_group").count()\
                    .selectExpr(f"CAST (count AS STRING) AS key", "to_json(struct(*)) AS value")\
                    .writeStream\
                    .outputMode("complete")\
                    .format("kafka")\
                    .option("kafka.bootstrap.servers", "kafka-svc:9092")\
                    .option("topic", topic_name)\
                    .option("checkpointLocation", os.path.join(APPS_TMP_DIR, topic_name))
                topic_dict = dict(topic=topic, sex=sex, age_group=age_group, dur=dur)
                topic_list.append(topic_dict)
    return topic_list

In [18]:
topics = get_topic_list(result)

topic_F_18_1hour
topic_F_18_1day
topic_F_18_1week
topic_F_18-27_1hour
topic_F_18-27_1day
topic_F_18-27_1week
topic_F_27-40_1hour
topic_F_27-40_1day
topic_F_27-40_1week
topic_F_40-60_1hour
topic_F_40-60_1day
topic_F_40-60_1week
topic_F_60_1hour
topic_F_60_1day
topic_F_60_1week
topic_M_18_1hour
topic_M_18_1day
topic_M_18_1week
topic_M_18-27_1hour
topic_M_18-27_1day
topic_M_18-27_1week
topic_M_27-40_1hour
topic_M_27-40_1day
topic_M_27-40_1week
topic_M_40-60_1hour
topic_M_40-60_1day
topic_M_40-60_1week
topic_M_60_1hour
topic_M_60_1day
topic_M_60_1week


In [19]:
for topic in topics:
    topic["topic_start"] = topic["topic"].start()

In [20]:
for topic in topics:
    print(topic["topic_start"].status)

{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Writing offsets to log', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
{'message': 'Processing new data', 'isDataAvailable': True, 'i

In [None]:
# spark.stop()

In [None]:
consumer = KafkaConsumer(bootstrap_servers=['kafka-svc:9092'],
                         group_id="jupyter",
                         session_timeout_ms=10000,
                         enable_auto_commit=False,
                         auto_offset_reset='earliest',
                         )
consumer.subscribe(topics='topic_M_18-27_1day')

for msg in consumer:
    print(msg.value.decode('utf-8'))

{"window":{"start":"2019-02-13T00:00:00.000Z","end":"2019-02-14T00:00:00.000Z"},"sex":"M","age_group":"18-27","word":"сделал)","count":1}
{"window":{"start":"2019-02-13T00:00:00.000Z","end":"2019-02-14T00:00:00.000Z"},"sex":"M","age_group":"18-27","word":"#gcam","count":1}
{"window":{"start":"2019-02-13T00:00:00.000Z","end":"2019-02-14T00:00:00.000Z"},"sex":"M","age_group":"18-27","word":"что,","count":1}
{"window":{"start":"2019-02-13T00:00:00.000Z","end":"2019-02-14T00:00:00.000Z"},"sex":"M","age_group":"18-27","word":"мотивации)","count":1}
{"window":{"start":"2019-02-13T00:00:00.000Z","end":"2019-02-14T00:00:00.000Z"},"sex":"M","age_group":"18-27","word":"#february","count":1}
{"window":{"start":"2019-02-13T00:00:00.000Z","end":"2019-02-14T00:00:00.000Z"},"sex":"M","age_group":"18-27","word":"-","count":1}
{"window":{"start":"2019-02-13T00:00:00.000Z","end":"2019-02-14T00:00:00.000Z"},"sex":"M","age_group":"18-27","word":"#snapseed","count":1}
{"window":{"start":"2019-02-13T00:00:0

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



{"window":{"start":"2019-01-02T00:00:00.000Z","end":"2019-01-03T00:00:00.000Z"},"sex":"M","age_group":"18-27","count":22}
{"window":{"start":"2019-01-02T00:00:00.000Z","end":"2019-01-03T00:00:00.000Z"},"sex":"M","age_group":"18-27","count":57}
{"window":{"start":"2019-01-03T00:00:00.000Z","end":"2019-01-04T00:00:00.000Z"},"sex":"M","age_group":"18-27","count":93}
{"window":{"start":"2019-01-04T00:00:00.000Z","end":"2019-01-05T00:00:00.000Z"},"sex":"M","age_group":"18-27","count":129}
{"window":{"start":"2019-01-02T00:00:00.000Z","end":"2019-01-03T00:00:00.000Z"},"sex":"M","age_group":"18-27","count":57}
{"window":{"start":"2019-01-03T00:00:00.000Z","end":"2019-01-04T00:00:00.000Z"},"sex":"M","age_group":"18-27","count":109}
{"window":{"start":"2019-01-04T00:00:00.000Z","end":"2019-01-05T00:00:00.000Z"},"sex":"M","age_group":"18-27","count":129}
{"window":{"start":"2019-01-02T00:00:00.000Z","end":"2019-01-03T00:00:00.000Z"},"sex":"M","age_group":"18-27","count":57}
{"window":{"start":"2