In [1]:
import os
# Make sure HADOOP_CONF_DIR is absent from this process's environment.
try:
    del os.environ["HADOOP_CONF_DIR"]
except KeyError:
    # The variable was not set, so there is nothing to remove.
    pass

In [2]:
"HADOOP_CONF_DIR" in os.environ

False

In [3]:
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField, TimestampType
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from jinja2 import Environment, FileSystemLoader


# setting constants
# Application name, plus a variant with '/' and ':' replaced by '_' for use in file names.
APP_NAME = "jupsparkapp"
NORMALIZED_APP_NAME = APP_NAME.translate(str.maketrans("/:", "__"))

# Working directories, all located under the current working directory.
APPS_TMP_DIR, APPS_CONF_DIR, APPS_LOGS_DIR = (
    os.path.join(os.getcwd(), subdir) for subdir in ("tmp", "conf", "logs")
)

# Per-application log4j configuration file and driver log file.
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, f"pyspark-log4j-{NORMALIZED_APP_NAME}.properties")
LOG_FILE = os.path.join(APPS_LOGS_DIR, f"pyspark-{NORMALIZED_APP_NAME}.log")

# JVM flags handed to the driver (see spark.driver.extraJavaOptions below).
EXTRA_JAVA_OPTIONS = " ".join((
    f"-Dlog4j.configuration=file://{LOG4J_PROP_FILE}",
    "-Dspark.hadoop.dfs.replication=1",
    "-Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3",
))

# IP address that this host's name resolves to; passed to Spark below as spark.driver.host.
LOCAL_IP = socket.gethostbyname(socket.gethostname())

# preparing configuration files from templates
# Create the conf/logs/tmp directories. exist_ok=True keeps the cell idempotent on
# re-runs and avoids the race between a separate existence check and the creation.
for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    os.makedirs(directory, exist_ok=True)

# Load the log4j properties template from /opt and render it into LOG4J_PROP_FILE,
# passing LOG_FILE as the `logfile` template variable.
env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)

# run spark
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .master("k8s://https://10.32.7.103:6443")
    # driver networking
    .config("spark.driver.host", LOCAL_IP)
    .config("spark.driver.bindAddress", "0.0.0.0")
    # executor / driver sizing
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", '3')
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.6")
    .config("spark.executor.memory", '3g')
    .config("spark.driver.memory", "3g")
    .config("spark.driver.maxResultSize", "1g")
    .config("spark.kubernetes.memoryOverheadFactor", "0.3")
    .config("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS)
    # kubernetes namespace, labels and executor image
    .config("spark.kubernetes.namespace", "szhumabaev-307617")
    .config("spark.kubernetes.driver.label.appname", APP_NAME)
    .config("spark.kubernetes.executor.label.appname", APP_NAME)
    .config("spark.kubernetes.container.image", "node03.st:5000/spark-executor:szhumabaev-307617")
    # local scratch directory and extra jar on both driver and executor class paths
    .config("spark.local.dir", "/tmp/spark")
    .config("spark.driver.extraClassPath", "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar")
    .config("spark.executor.extraClassPath", "/home/jovyan/shared-data/my-project-name-jar-with-dependencies.jar")
    # executor volumes: an emptyDir mounted at /tmp/spark and a read-only hostPath for shared data
    .config("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.path", "/tmp/spark")
    .config("spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-tmp-spark.mount.readOnly", "false")
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.mount.path", "/home/jovyan/shared-data")
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.options.path", "/nfs/shared")
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.options.type", "Directory")
    .config("spark.kubernetes.executor.volumes.hostPath.depdir.mount.readOnly", "true")
    .getOrCreate()
)

# printing important urls and paths
# One print call; the "\n" separator plus the leading "\n" in each label
# reproduces the blank line between entries.
print(
    f"Web UI: {spark.sparkContext.uiWebUrl}",
    f"\nlog4j file: {LOG4J_PROP_FILE}",
    f"\ndriver log file: {LOG_FILE}",
    sep="\n",
)

Web UI: http://10.128.251.168:4040

log4j file: /home/jovyan/conf/pyspark-log4j-jupsparkapp.properties

driver log file: /home/jovyan/logs/pyspark-jupsparkapp.log


In [4]:
# Schema applied to the JSON payload of each incoming message.
schema = StructType([
    StructField("user_id", LongType()),
    StructField("sex", StringType()),
    StructField("age", IntegerType()),
    StructField("text", StringType()),
    StructField("timestamp", TimestampType()),
])

In [5]:
# Streaming DataFrame: read `main_topic` from Kafka, parse each value as JSON using
# `schema`, flatten the parsed struct into columns and derive an `age_group` label.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-svc:9092")
    .option("subscribe", "main_topic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn(
        "age_group",
        # A missing age falls into the youngest bucket together with ages below 18.
        when(col("age").isNull() | (col("age") < lit(18)), lit("0-18"))
        .when(col("age").between(lit(18), lit(26)), lit("18-27"))
        .when(col("age").between(lit(27), lit(39)), lit("27-40"))
        .when(col("age").between(lit(40), lit(59)), lit("40-60"))
        .when(col("age") >= 60, lit("60+")),
    )
)

In [6]:
from pyspark.ml.feature import RegexTokenizer
import nltk
nltk.data.path.append("/home/jovyan/nltk_data")
from nltk.corpus import stopwords

# Regex patterns used to clean the message text before tokenizing.
# Raw strings keep the regex backslashes literal; the previous plain literals relied on
# invalid Python escape sequences (\., \-, \[, \], \d, \S), which emit warnings.
# The resulting string values are unchanged.

# Character class of punctuation marks, symbols and digits.
pattern_punct = r"""[!@"“’«»№#$%&'()*+\.,\-—/:;<=>?^_`{|}~\[\]\d]"""
# http:// or https:// links, and bare www. links, up to the next whitespace.
pattern_url = r"http[s]?://\S+|www\.\S+"
# Russian stop words from the NLTK stopwords corpus.
ru_stopwords = stopwords.words("russian")

# Tokenizer reading the "cleaned" column and writing "tokens", configured with a whitespace pattern.
regexTokenizer = RegexTokenizer(pattern=r"\s+", inputCol="cleaned", outputCol="tokens")

@udf(returnType=ArrayType(StringType()))
def preprocess_udf(tokens):
    """Filter and clean a list of tokens.

    A token is kept only if it is not in ``ru_stopwords`` and its first character
    is alphanumeric. Every kept token has all non-alphanumeric characters removed.

    :param tokens: iterable of non-empty token strings
    :return: list of cleaned tokens, in their original order
    """
    cleaned_words = []
    for token in tokens:
        if token in ru_stopwords:
            continue
        if not token[0].isalnum():
            continue
        cleaned_words.append("".join(filter(str.isalnum, token)))
    return cleaned_words

def explode_words_ru(base_df):
    """Turn each row's ``text`` into one row per cleaned word.

    URLs and then punctuation/digits in ``text`` are replaced by spaces, the result is
    tokenized with ``regexTokenizer``, the tokens are passed through ``preprocess_udf``
    and finally exploded into a ``word`` column. The ``text`` column and the
    intermediate columns are dropped from the result.

    :param base_df: DataFrame with a ``text`` column
    :return: DataFrame with the remaining input columns plus ``word``
    """
    without_urls = regexp_replace("text", pattern_url, " ")
    cleaned_text = regexp_replace(without_urls, pattern_punct, " ")
    tokenized = regexTokenizer.transform(base_df.withColumn("cleaned", cleaned_text))
    return (
        tokenized
        .withColumn("finished", preprocess_udf("tokens"))
        .withColumn("word", explode("finished"))
        .drop("text", "cleaned", "tokens", "finished")
    )

In [7]:
from pyspark.sql.dataframe import DataFrame

def get_queries_list(base_df: DataFrame) -> list:
    """Prepare one console streaming writer per sex / age group / window combination.

    Each writer counts words per time window for one (sex, age_group) slice of
    ``base_df`` and is configured for the console sink in "update" mode. Nothing is
    started here; callers invoke ``.start()`` on the ``"query"`` entry.

    :param base_df: streaming DataFrame with ``sex``, ``age_group``, ``timestamp``
        and ``text`` columns
    :return: list of dicts with keys ``query`` (the unstarted writer), ``sex``,
        ``age_group`` and ``win_dur`` (short window label: "1h", "1d" or "1w")
    """
    # The word explosion does not depend on the loop variables, so build it once.
    words_df = explode_words_ru(base_df)

    queries_list = list()
    for sex in ("F", "M"):
        # Covers every label assigned to age_group, including "40-60", which was
        # previously missing and left that group without any query.
        for age_group in ("0-18", "18-27", "27-40", "40-60", "60+"):
            for win_dur in (("1 hour", "1h"), ("1 day", "1d"), ("1 week", "1w")):
                query = words_df\
                    .where(f"sex = '{sex}' and age_group = '{age_group}'")\
                    .groupBy(window("timestamp", win_dur[0]).alias(win_dur[1]), "sex", "age_group", "word").count()\
                    .writeStream\
                    .outputMode("update")\
                    .format("console")\
                    .option("truncate", "false")
                query_dict = dict(query=query, sex=sex, age_group=age_group, win_dur=win_dur[1])
                queries_list.append(query_dict)
    return queries_list

In [8]:
# Build the (not yet started) streaming writers for every sex / age-group / window combination.
q_lst = get_queries_list(df)

In [9]:
# Start every prepared streaming query and keep its handle under "query_s".
# Entries whose handle is still active are skipped, so re-running this cell does not
# launch duplicate queries or overwrite (and lose) the handles of running ones.
# To start only a subset, filter q_lst on its "sex" / "age_group" / "win_dur" keys;
# a running query can be stopped with q_d["query_s"].stop().
for q_d in q_lst:
    if "query_s" in q_d and q_d["query_s"].isActive:
        continue
    q_d["query_s"] = q_d["query"].start()