In [None]:
import os
import sys
# Cluster-specific PySpark bootstrap: worker interpreter, Spark installation
# and the arguments used when the pyspark shell is submitted.
os.environ.update({
    "PYSPARK_PYTHON": '/opt/anaconda/envs/bd9/bin/python',
    "SPARK_HOME": '/usr/hdp/current/spark2-client',
    "PYSPARK_SUBMIT_ARGS": '--num-executors 5 --executor-memory 4g --executor-cores 1 --driver-memory 2g pyspark-shell',
})

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Prepend Spark's Python sources and the bundled py4j archive to the import path.
# The py4j archive ends up first and the `python` directory right after it.
sys.path[:0] = [
    os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'),
    os.path.join(spark_home, 'python'),
]

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Spark configuration carrying only the application name.
conf = SparkConf().set("spark.app.name", "Konstantin Diakvnishvili lab 4 app")

# Reuse an existing session if there is one, otherwise create it with `conf`.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

In [None]:
# Show the session object as the cell output.
spark

In [None]:
# Библиотеки 
import pyspark.sql.types as t
import pyspark.sql.functions as f
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer, StringIndexer, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.param.shared import HasOutputCol, HasInputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml import Transformer
from pyspark import keyword_only

In [None]:
# Build the training Spark DataFrame from the tab-separated dataset.
path = '/labs/slaba04/gender_age_dataset.txt'

# Every column is read as a plain string.
schema = t.StructType(fields=[
    t.StructField(column_name, t.StringType())
    for column_name in ('gender', 'age', 'uid', 'user_json')
])

train_data = spark.read.csv(path, schema=schema, sep='\t', header=True)

In [None]:
# Schema for the JSON with visits: an array of {url, timestamp} structs.
visit_schema = t.ArrayType(
    t.StructType()
    .add('url', t.StringType(), True)
    .add('timestamp', t.LongType(), True)
)

In [None]:
# Filter out rows with unknown labels, parse the JSON payload and extract the URLs.
#
# The original cell referenced an undefined name `visits_schema` (only
# `visit_schema` exists), which raises NameError on a fresh kernel. The
# 'visits.visits.url' path below requires the parsed value to be a struct with a
# `visits` array field, so the array schema is wrapped accordingly.
# NOTE(review): presumably `user_json` looks like {"visits": [...]} - confirm
# against the dataset.
visits_schema = t.StructType([
    t.StructField('visits', visit_schema, True)
])

train_data = (
    train_data
    .filter(train_data.age != '-')
    .filter(train_data.gender != '-')
    .withColumn('visits', f.from_json(f.col('user_json'), visits_schema))
    .withColumn('url', f.col('visits.visits.url'))
    .drop('visits', 'user_json')
)
train_data.printSchema()

In [None]:
# Label-indexing models for gender and age, each fitted on the training data.
indexerG = StringIndexer(inputCol='gender', outputCol='gender_i')
indexModelG = indexerG.fit(train_data)

indexerA = StringIndexer(inputCol='age', outputCol='age_i')
indexModelA = indexerA.fit(train_data)

In [None]:
# Transformer that parses the URLs. It drops the source column because
# groupBy takes too long when that column is kept.
# DefaultParamsReadable, DefaultParamsWritable - needed to save the model.
class ParseURLTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Replace an array-of-URLs column with a list of lower-cased host names.

    The input column is exploded, each URL is reduced to its host with Spark SQL
    ``parse_url(..., "HOST")`` and lower-cased, and the hosts are collected back
    into one list per combination of the remaining columns.

    Parameters
    ----------
    inputCol : str, optional
        Name of the column holding the array of URLs.
    outputCol : str, optional
        Name of the column that receives the list of hosts.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        """Create the transformer, setting only the params that were supplied."""
        super(ParseURLTransformer, self).__init__()
        if inputCol is not None:
            self.setInputCol(inputCol)
        if outputCol is not None:
            self.setOutputCol(outputCol)

    def _transform(self, dataset):
        """Return ``dataset`` with the input column replaced by the output column.

        The result is produced by a ``groupBy`` over every column except the
        input one, so it contains one row per distinct combination of those
        columns. ``explode`` emits nothing for null or empty arrays, so such
        rows are absent from the result.
        """
        input_col = self.getInputCol()

        # One row per URL; 'url_inter' is overwritten step by step:
        # raw URL -> host -> lower-cased host.
        exploded = (
            dataset
            .withColumn('url_inter', f.explode(input_col))
            .withColumn('url_inter', f.expr('parse_url(url_inter, "HOST")'))
            .withColumn('url_inter', f.lower(f.col('url_inter')))
            .drop(input_col)
        )

        # Group by everything that is left except the intermediate host column.
        group_cols = [name for name in exploded.columns if name != 'url_inter']
        return exploded.groupBy(group_cols).agg(
            f.collect_list('url_inter').alias(self.getOutputCol())
        )

In [None]:
# URL-parsing stage: reads the `url` column and writes `url_parsed`.
urlpt = ParseURLTransformer(inputCol='url', outputCol='url_parsed')

In [None]:
# Hashing term-frequency stage: `url_parsed` -> `url_freq`, non-binary
# counts hashed into 130000 features.
hasher = HashingTF(
    inputCol="url_parsed",
    outputCol="url_freq",
    numFeatures=130000,
    binary=False,
)

In [None]:
# Random forests, one for gender and one for age. Both read `url_freq`;
# their output columns carry a G / A suffix so they do not collide.
forestG = RandomForestClassifier(
    featuresCol='url_freq',
    labelCol='gender_i',
    predictionCol='predictionG',
    probabilityCol='probabilityG',
    rawPredictionCol='rawPredictionG',
)
forestA = RandomForestClassifier(
    featuresCol='url_freq',
    labelCol='age_i',
    predictionCol='predictionA',
    probabilityCol='probabilityA',
    rawPredictionCol='rawPredictionA',
)

In [None]:
# Convert the predicted gender and age indices back to their string labels,
# using the label lists of the fitted indexer models.
stringerG = IndexToString(
    inputCol='predictionG',
    outputCol='gender_s',
    labels=indexModelG.labels,
)
stringerA = IndexToString(
    inputCol='predictionA',
    outputCol='age_s',
    labels=indexModelA.labels,
)

In [None]:
# Pipeline: URL parsing -> hashing -> both classifiers -> index-to-label conversion.
pipeline = Pipeline(stages=[
    urlpt,       # url -> url_parsed
    hasher,      # url_parsed -> url_freq
    forestG,     # url_freq -> predictionG
    forestA,     # url_freq -> predictionA
    stringerG,   # predictionG -> gender_s
    stringerA    # predictionA -> age_s
])

In [None]:
# Indexing: add `gender_i` first, then `age_i`, to the training data.
df_visits = indexModelA.transform(indexModelG.transform(train_data))
df_visits.printSchema()

In [None]:
# Training: fit the whole pipeline on the indexed training data.
pipeline_model = pipeline.fit(df_visits)

In [None]:
# Stream parameters: Kafka bootstrap server and the input / output topics.
KAFKA_BOOTSTRAP_SERVER = 'spark-master-1.newprolab.com:6667'
KAFKA_INPUT_TOPIC = 'input_konstantin.diakvnishvili'
KAFKA_OUTPUT_TOPIC = 'konstantin.diakvnishvili'

In [None]:
# Read the input topic as a stream, starting from the earliest offsets.
kafka_stream = (
    spark.readStream
    .format('kafka')
    .options(**{
        'kafka.bootstrap.servers': KAFKA_BOOTSTRAP_SERVER,
        'subscribe': KAFKA_INPUT_TOPIC,
        'startingOffsets': 'earliest',
        'failOnDataLoss': 'False',
    })
    .load()
)

In [None]:
# Schemas for the incoming JSON.
# Outer event: `uid` plus `visits`, where `visits` is itself a JSON string.
event_schema = (
    t.StructType()
    .add('uid', t.StringType(), True)
    .add('visits', t.StringType(), True)
)

# Inner payload of `visits`: an array of {url, timestamp} structs.
visit_schema2 = t.ArrayType(
    t.StructType()
    .add('url', t.StringType(), True)
    .add('timestamp', t.LongType(), True)
)

In [None]:
# Parse the input stream: decode the Kafka value, parse the outer JSON,
# parse the nested `visits` JSON string and keep `uid` plus the array of URLs.
parsed_sdf = (
    kafka_stream
    .select(f.col('value').cast('string').alias('value'))
    .select(f.from_json(f.col('value'), event_schema).alias('event'))
    .select(
        'event.uid',
        f.from_json(f.col('event.visits'), visit_schema2).alias('visits'),
    )
    .withColumn('url', f.col('visits.url'))
    .drop('visits')
)
parsed_sdf.printSchema()

In [None]:
# Predict: apply the fitted pipeline to the parsed stream.
predictions_df_test = pipeline_model.transform(parsed_sdf)
predictions_df_test.printSchema()

In [None]:
# Keep only the required columns, renaming the label columns to gender / age.
predictions_df = predictions_df_test.select(
    'uid',
    f.col('gender_s').alias('gender'),
    f.col('age_s').alias('age'),
)
predictions_df.printSchema()

In [None]:
# Write: serialize every prediction row to JSON and publish it to the output topic.
# outputMode="complete" because the pipeline contains an aggregation.
# NOTE(review): "update" mode would emit only the changed rows per trigger -
# confirm whether consumers rely on the full result being re-emitted.
kafka_write_stream = (
    predictions_df
    .select(f.to_json(f.struct(*predictions_df.columns)).alias('value'))
    .writeStream
    .format("kafka")
    .outputMode("complete")
    .option("checkpointLocation", "checkpoints/checkpoints_lab04")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("topic", KAFKA_OUTPUT_TOPIC)
)

# Keep the StreamingQuery returned by start() (previously discarded) so the
# running query can be inspected, awaited or stopped individually.
kafka_query = kafka_write_stream.start()
kafka_query

In [None]:
def kill_all():
    """Stop every active streaming query of the current Spark session.

    For each stopped query a line ``Stopped <description>`` is printed. The
    description is taken from the first source of the query's last progress
    report. A query that has not reported progress yet (``lastProgress`` is
    ``None``) or reports no sources is still stopped, and is identified by its
    name or, failing that, its id.
    """
    for query in SparkSession.builder.getOrCreate().streams.active:
        progress = query.lastProgress
        # lastProgress is None until the first progress update arrives;
        # indexing it unconditionally would raise and leave the query running.
        sources = progress.get("sources") if progress else None
        if sources:
            desc = sources[0]["description"]
        else:
            desc = query.name or query.id
        query.stop()
        print("Stopped {s}".format(s=desc))

In [None]:
# Kill the stream: stop all active streaming queries.
kill_all()

In [None]:
# Stop the Spark session.
spark.stop()