<img align="right" width="200" height="200" src="https://static.tildacdn.com/tild6236-6337-4339-b337-313363643735/new_logo.png">

# Spark Structured Streaming I
**Андрей Титов**  
tenke.iu8@gmail.com  

## На этом занятии
+ Общие сведения
+ Rate streaming
+ File streaming
+ Kafka streaming

## Общие сведения

Системы поточной обработки данных:
- работают с непрерывным потоком данных
- нужно хранить состояние стрима
- результат обработки быстро появляется в целевой системе
- должны проектироваться с учетом требований к высокой доступности
- важная скорость обработки данных и время зажержки (лаг)

### Примеры систем поточной обработки данных

#### Карточный процессинг
- нельзя терять платежи
- нельзя дублировать платежи
- простой сервиса недопустим
- максимальное время задержки ~ 1 сек
- небольшой поток событий
- OLTP

#### Обработка логов безопасности
- потеря единичных событий допустима
- дублирование единичных событий допустимо
- простой сервиса допустим
- максимальное время задержки ~ 1 час
- большой поток событий
- OLAP

### Виды стриминг систем

#### Real-time streaming
- низкие задержки на обработку
- низкая пропускная способность
- подходят для критичных систем
- пособытийная обработка
- OLTP
- exactly once consistency (нет потери данных и нет дубликатов)

#### Micro batch streaming
- высокие задержки
- высокая пропускная способность
- не подходят для критичных систем
- обработка батчами
- OLAP
- at least once consistency (во время сбоев могут возникать дубликаты)

### Выводы:
+ Существуют два типа систем поточной обработки данных - real-time и micro-batch
+ Spark Structured Streaming является micro-batch системой
+ При работе с большими данными обычно пропускная способность важнее, чем время задержки


## Rate streaming

Самый простой способ создать стрим - использовать `rate` источник. Созданный DF является streaming, о чем нам говорит метод создания `readStream` и атрибут `isStreaming`. `rate` хорошо подходит для тестирования приложений, когда нет возможности подключится к потоку реальных данных

In [1]:
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 8 pyspark-shell'
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.app.name", "lab3 lr ALS app") 

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
spark

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType , StringType , ArrayType
import pyspark.sql.functions as F

# Data

In [25]:
! hdfs dfs -ls /labs/slaba04/

Found 1 items
-rw-r--r--   3 hdfs hdfs  655090069 2021-02-27 22:13 /labs/slaba04/gender_age_dataset.txt


In [26]:
schema = StructType([StructField('gender', StringType()), 
                     StructField('age', StringType()),
                     StructField('uid', StringType()),
                     StructField('user_json', StringType())
                    ]
                   )
train_data = spark.read.format("csv")\
                       .option("inferSchema", "true")\
                       .schema(schema)\
                       .option("header", "true")\
                       .option("delimiter", "\\t")\
                       .load("/labs/slaba04/gender_age_dataset.txt")
visits_schema = StructType([
    StructField("visits", ArrayType(
      StructType([
          StructField("url", StringType()),
          StructField("timestamp", LongType())
      ])
   ))
]) 
train_data = train_data.withColumn('visits', 
                                   F.from_json(F.col('user_json'),
                                               schema=visits_schema)
                                              )

In [27]:
train_data.groupby('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|17440|
|     M|18698|
|     -| 5000|
+------+-----+



In [28]:
train_data.groupby('age').count().show()

+-----+-----+
|  age|count|
+-----+-----+
| >=55| 1679|
|45-54| 4744|
|    -| 5000|
|35-44| 9360|
|25-34|15457|
|18-24| 4898|
+-----+-----+



In [29]:
train_data = train_data.withColumn('urls' , train_data['visits']['visits']['url'] )

In [30]:
train_data = train_data.withColumn("urls_hosts", F.expr("transform(urls, x -> parse_url(x, 'HOST' ))"))

In [31]:
# train_data.show()

# train_data.limit(100).toPandas()['urls_hosts'][0]

# train_data.limit(100).toPandas()['urls'][0]

In [32]:
train_data.limit(100).toPandas()['urls_hosts'][0]

  Unsupported type in conversion to Arrow: StructType(List(StructField(visits,ArrayType(StructType(List(StructField(url,StringType,true),StructField(timestamp,LongType,true))),true),true)))
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.


['zebra-zoya.ru',
 'news.yandex.ru',
 'www.sotovik.ru',
 'news.yandex.ru',
 'www.sotovik.ru']

In [33]:
train_data.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- user_json: string (nullable = true)
 |-- visits: struct (nullable = true)
 |    |-- visits: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- timestamp: long (nullable = true)
 |-- urls: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- urls_hosts: array (nullable = true)
 |    |-- element: string (containsNull = true)



# ОБучаем модели

In [34]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml import Pipeline

In [35]:
train_data

DataFrame[gender: string, age: string, uid: string, user_json: string, visits: struct<visits:array<struct<url:string,timestamp:bigint>>>, urls: array<string>, urls_hosts: array<string>]

In [36]:
train_data = train_data.withColumn("gender_label", F.when(F.col("gender")=='F', 1).otherwise(0))

In [37]:
train_data = train_data.withColumn("age_label", F.when(F.col("age")=='18-24', 0) \
                                   .when(F.col("age")=='25-34', 1).when(F.col("age")=='35-44', 2) \
                                   .when(F.col("age")=='45-54', 3).otherwise(4))

In [43]:
train_data = train_data.cache()

In [46]:
urls = train_data.rdd.flatMap(lambda x: x[6])
num_urls = urls.groupBy(lambda x: x).count()

print(num_urls)
print(urls.take(5))

119009


In [133]:
# возьмем поменьше num_urls = 10000

num_urls = 5000

In [134]:
hashingTF = HashingTF(inputCol="urls_hosts", outputCol="rawFeatures", numFeatures=num_urls)
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [135]:
train_data

DataFrame[gender: string, age: string, uid: string, user_json: string, visits: struct<visits:array<struct<url:string,timestamp:bigint>>>, urls: array<string>, urls_hosts: array<string>, gender_label: int, age_label: int]

## Гендер

In [159]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [137]:
lr_gender = LogisticRegression(featuresCol="features",
                            rawPredictionCol='rawPred_gender',
                            predictionCol='pred_gender',
                            labelCol="gender_label",
                            maxIter=100, regParam=0.5)

In [138]:
pipe_gender = Pipeline(stages=[hashingTF, idf, lr_gender])

In [139]:
train = train_data.sampleBy("gender_label", fractions={0: 0.8, 1: 0.8}, seed=42)
test = train_data.join(train, on="uid", how="leftanti")

In [140]:
pipeline_gender = pipe_gender.fit(train)

In [141]:
pred_gender = pipeline_gender.transform(test)
print(pred_gender.columns)

['uid', 'gender', 'age', 'user_json', 'visits', 'urls', 'urls_hosts', 'gender_label', 'age_label', 'rawFeatures', 'features', 'rawPred_gender', 'probability', 'pred_gender']


In [142]:
evaluator_gender = BinaryClassificationEvaluator(rawPredictionCol="probability",
                                          labelCol="gender_label", metricName='areaUnderROC')

In [143]:
evaluator_gender.evaluate(pred_gender)

0.6395395682104358

## Возраст

In [173]:
lr_age = LogisticRegression(featuresCol="features",
                            rawPredictionCol='rawPred_age',
                            predictionCol='pred_age',
                            labelCol="age_label",
                            maxIter=100, regParam=0.5)

In [174]:
pipe_age = Pipeline(stages=[hashingTF, idf, lr_age])

In [175]:
train = train_data.sampleBy("age_label", fractions={0: 0.8, 1: 0.8, 2:0.8, 3:0.8, 4:0.8}, seed=42)
test = train_data.join(train, on="uid", how="leftanti")

In [176]:
pipeline_age = pipe_age.fit(train)

In [177]:
pred_age = pipeline_age.transform(test)
print(pred_age.columns)

['uid', 'gender', 'age', 'user_json', 'visits', 'urls', 'urls_hosts', 'gender_label', 'age_label', 'rawFeatures', 'features', 'rawPred_age', 'probability', 'pred_age']


In [181]:
evaluator_age = MulticlassClassificationEvaluator(labelCol="age_label",
                                                  predictionCol="pred_age", metricName="f1")

In [183]:
evaluator_age.evaluate(pred_age)

0.2411948580754272

#  Стримингоыфй вариант

In [None]:
# kafka_params = {
#     "kafka.bootstrap.servers": "localhost:9092",
#     "subscribe": "test_topic0",
#     "startingOffsets": """earliest""",
#     "maxOffsetsPerTrigger": "5"
# }

# sdf = spark.readStream.format("kafka").options(**kafka_params).load()
# parsed_sdf = sdf.select(col("value").cast("string"), col("topic"), col("partition"), col("offset"))

# sink = create_console_sink(parsed_sdf)

# sq = sink.start()

# def create_console_sink_with_checkpoint(chk_name, df): 
#     return df \
#         .writeStream \
#         .format("console") \
#         .trigger(processingTime="10 seconds") \
#         .option("checkpointLocation", "chk/{n}".format(n=chk_name)) \
#         .option("truncate", "false") \
#         .option("numRows", "20")

# sink = create_console_sink_with_checkpoint("test0", parsed_sdf)
# sq = sink.start()

In [45]:
spark.stop()

In [46]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.app.name", "lab3 lr ALS app") 

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [47]:
spark

In [48]:
read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-node-1.newprolab.com:6667',
    "subscribe": "input_alexander.okhilkov",
    "startingOffsets": "latest"
}

# read readStream
kafka_sdf_sh = spark.readStream.format("kafka").options(**read_kafka_params).option("failOnDataLoss", 'False').load()

In [49]:
kafka_sdf_sh

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [50]:
kafka_sdf_sh = kafka_sdf_sh.withColumn("value", kafka_sdf_sh["value"].cast("string"))

In [51]:
schema = StructType([StructField('gender', StringType()), 
                     StructField('age', StringType()),
                     StructField('uid', StringType()),
                     StructField('user_json', StringType())
                    ]
                   )

kafka_sdf_sh = kafka_sdf_sh.withColumn('value_parsed', 
                                   F.from_json(F.col('value'),
                                               schema=schema)
                                      )


# kafka_sdf_sh=kafka_sdf_sh.withColumn('gender', kafka_sdf_sh['value_parsed']['gender'])
# kafka_sdf_sh=kafka_sdf_sh.withColumn('age', kafka_sdf_sh['value_parsed']['age'])
kafka_sdf_sh=kafka_sdf_sh.withColumn('uid', kafka_sdf_sh['value_parsed']['uid'])
kafka_sdf_sh=kafka_sdf_sh.withColumn('user_json', kafka_sdf_sh['value_parsed']['user_json'])




visits_schema = StructType([
    StructField("visits", ArrayType(
      StructType([
          StructField("url", StringType()),
          StructField("timestamp", LongType())
      ])
   ))
]) 
kafka_sdf_sh = kafka_sdf_sh.withColumn('visits', 
                                   F.from_json(F.col('user_json'),
                                               schema=visits_schema)
                                              )

kafka_sdf_sh = kafka_sdf_sh.withColumn('urls' , kafka_sdf_sh['visits']['visits']['url'] )
kafka_sdf_sh = kafka_sdf_sh.withColumn("urls_hosts", F.expr("transform(urls, x -> parse_url(x, 'HOST' ))"))

In [52]:
kafka_sdf_sh

DataFrame[key: binary, value: string, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int, value_parsed: struct<gender:string,age:string,uid:string,user_json:string>, uid: string, user_json: string, visits: struct<visits:array<struct<url:string,timestamp:bigint>>>, urls: array<string>, urls_hosts: array<string>]

In [53]:
kafka_sdf_sh = kafka_sdf_sh.select(['uid']).withColumn('gender', F.lit('M'))\
                            .withColumn('age', F.lit('25-34'))

In [54]:
kafka_sdf_sh = kafka_sdf_sh.withColumn('key',  F.lit(None) )
kafka_sdf_sh = kafka_sdf_sh.withColumn('value',  F.to_json(F.struct("uid","gender","age")) )                                               

In [55]:
kafka_sdf_sh = kafka_sdf_sh.select('key' , 'value')

In [56]:
kafka_sdf_sh

DataFrame[key: null, value: string]

In [57]:
sink = kafka_sdf_sh \
        .select('key' , 'value').selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", 'spark-node-1.newprolab.com:6667') \
        .option("topic", 'alexander.okhilkov') \
        .option("checkpointLocation", "streaming/chk/chk_kafka/rr" ) \
        .outputMode("append")

# .trigger(processingTime="10 seconds") \
#         .option("checkpointLocation", "streaming/chk/chk_kafka" ) \
#         .option("truncate", "false") \

In [58]:
sq = sink.start()

In [59]:
sq.isActive

True

In [60]:
sq.lastProgress

In [61]:
#sq.stop()

In [70]:
sq.isActive

True

In [71]:
sq.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [72]:
sq.lastProgress

{'id': '343accf0-e41e-4888-a9fd-bbf925627256',
 'runId': '6f82bb97-d49e-402d-84a1-62e8b387a9fc',
 'name': None,
 'timestamp': '2021-03-22T20:28:44.196Z',
 'batchId': 41,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'getEndOffset': 0, 'setOffsetRange': 2, 'triggerExecution': 2},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[input_alexander.okhilkov]]',
   'startOffset': {'input_alexander.okhilkov': {'0': 10000}},
   'endOffset': {'input_alexander.okhilkov': {'0': 10000}},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider@1d03e1f'}}

In [73]:
sq.recentProgress

[{'id': '343accf0-e41e-4888-a9fd-bbf925627256',
  'runId': '6f82bb97-d49e-402d-84a1-62e8b387a9fc',
  'name': None,
  'timestamp': '2021-03-22T20:27:50.485Z',
  'batchId': 1,
  'numInputRows': 1,
  'processedRowsPerSecond': 0.0759589821496392,
  'durationMs': {'addBatch': 12620,
   'getBatch': 10,
   'queryPlanning': 64,
   'triggerExecution': 13165},
  'stateOperators': [],
  'sources': [{'description': 'KafkaV2[Subscribe[input_alexander.okhilkov]]',
    'startOffset': {'input_alexander.okhilkov': {'0': 5000}},
    'endOffset': {'input_alexander.okhilkov': {'0': 5001}},
    'numInputRows': 1,
    'processedRowsPerSecond': 0.0759589821496392}],
  'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider@1d03e1f'}},
 {'id': '343accf0-e41e-4888-a9fd-bbf925627256',
  'runId': '6f82bb97-d49e-402d-84a1-62e8b387a9fc',
  'name': None,
  'timestamp': '2021-03-22T20:28:03.711Z',
  'batchId': 2,
  'numInputRows': 405,
  'inputRowsPerSecond': 30.621503099954634,
  'processedRowsPe

In [37]:
def kill_all():
    streams = SparkSession.builder.getOrCreate().streams.active
    for s in streams:
        desc = s.lastProgress["sources"][0]["description"]
        s.stop()
        print("Stopped {s}".format(s=desc))

In [38]:
kill_all()

# for each batch

In [None]:
spark.stop()

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.app.name", "lab3 lr ALS app") 

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [6]:
spark

In [7]:
# read_kafka_params = {
#     "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
#     "subscribe": "input_alexander.okhilkov",
#     "startingOffsets": "latest"
# }


read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-node-1.newprolab.com:6667',
    "subscribe": "input_alexander.okhilkov",
    "startingOffsets": "latest"
}

# read readStream
kafka_sdf_sh = spark.readStream.format("kafka").options(**read_kafka_params).option("failOnDataLoss", 'False').load()


# "maxOffsetsPerTrigger": "100"

In [8]:
kafka_sdf_sh

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [9]:
schema = StructType([StructField('gender', StringType()), 
                     StructField('age', StringType()),
                     StructField('uid', StringType()),
                     StructField('user_json', StringType())
                    ])

visits_schema = StructType([
    StructField("visits", ArrayType(
      StructType([
          StructField("url", StringType()),
          StructField("timestamp", LongType())
      ])
   ))
]) 


def create_predictions(df, epoch_id): # epoch_id batch_id
    df = df.withColumn("value", df["value"].cast("string"))
    df = df.withColumn('value_parsed', F.from_json(F.col('value'), schema=schema))
    
    df=df.withColumn('uid', df['value_parsed']['uid'])
#     df=df.withColumn('user_json', df['value_parsed']['user_json'])
    
#     df = df.withColumn('visits', F.from_json(F.col('user_json'), schema=visits_schema))
    
#     df = df.withColumn('urls' , df['visits']['visits']['url'] )
#     df = df.withColumn("urls_hosts", F.expr("transform(urls, x -> parse_url(x, 'HOST' ))"))

    df = df.select(['uid']).withColumn('gender', F.lit('M'))\
                            .withColumn('age', F.lit('25-34'))
    print(df)
    
    df = df.withColumn('key',  F.lit(None) )
    df = df.withColumn('value',  F.to_json(F.struct("uid","gender","age")) )  
    print(df)
    
    df = df.select('key' , 'value')
    
    print(df)
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    
    return df

In [10]:
kafka_sdf_sh

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [11]:
sink = kafka_sdf_sh \
        .writeStream \
        .foreachBatch(create_predictions) \
        .format("kafka") \
        .option("kafka.bootstrap.servers", 'spark-node-1.newprolab.com:6667') \
        .option("topic", 'alexander.okhilkov') \
        .option("checkpointLocation", "streaming/chk/chk_kafka/oooou" ) \
        .outputMode("append")

# .trigger(processingTime="10 seconds") \
#         .option("checkpointLocation", "streaming/chk/chk_kafka" ) \
#         .option("truncate", "false") \
# .select('key' , 'value').selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \

# spark-node-1.newprolab.com
#         .option("kafka.bootstrap.servers", 'spark-master-1.newprolab.com:6667') \

In [12]:
sq = sink.start()

In [13]:
sq.isActive

True

In [14]:
sq.lastProgress

In [15]:
#sq.stop()

In [20]:
sq.isActive

True

In [21]:
sq.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [37]:
sq.lastProgress

{'id': '917d1b3b-0691-4684-9cfc-8990bdf3d89c',
 'runId': '8f317553-4356-4498-b5fa-86b789957e3d',
 'name': None,
 'timestamp': '2021-03-22T20:09:32.414Z',
 'batchId': 41,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'getEndOffset': 0, 'setOffsetRange': 1, 'triggerExecution': 2},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[input_alexander.okhilkov]]',
   'startOffset': {'input_alexander.okhilkov': {'0': 5000}},
   'endOffset': {'input_alexander.okhilkov': {'0': 5000}},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider@ac86893'}}

In [38]:
sq.recentProgress

[{'id': '917d1b3b-0691-4684-9cfc-8990bdf3d89c',
  'runId': '8f317553-4356-4498-b5fa-86b789957e3d',
  'name': None,
  'timestamp': '2021-03-22T20:08:06.641Z',
  'batchId': 0,
  'numInputRows': 0,
  'processedRowsPerSecond': 0.0,
  'durationMs': {'addBatch': 1710,
   'getBatch': 16,
   'getEndOffset': 4,
   'queryPlanning': 1611,
   'setOffsetRange': 4717,
   'triggerExecution': 8704,
   'walCommit': 299},
  'stateOperators': [],
  'sources': [{'description': 'KafkaV2[Subscribe[input_alexander.okhilkov]]',
    'startOffset': None,
    'endOffset': {'input_alexander.okhilkov': {'0': 0}},
    'numInputRows': 0,
    'processedRowsPerSecond': 0.0}],
  'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider@ac86893'}},
 {'id': '917d1b3b-0691-4684-9cfc-8990bdf3d89c',
  'runId': '8f317553-4356-4498-b5fa-86b789957e3d',
  'name': None,
  'timestamp': '2021-03-22T20:08:15.413Z',
  'batchId': 1,
  'numInputRows': 0,
  'inputRowsPerSecond': 0.0,
  'processedRowsPerSecond': 0.0,
  

In [None]:
def kill_all():
    streams = SparkSession.builder.getOrCreate().streams.active
    for s in streams:
        desc = s.lastProgress["sources"][0]["description"]
        s.stop()
        print("Stopped {s}".format(s=desc))

In [None]:
kill_all()

In [42]:
KAFKA_BOOTSTRAP_SERVER = 'spark-node-1.newprolab.com:6667'
# KAFKA_BOOTSTRAP_SERVER = 'spark-master-1.newprolab.com:6667'
INPUT_KAFKA_TOPIC = 'input_alexander.okhilkov'
OUTPUT_KAFKA_TOPIC = 'alexander.okhilkov'

In [43]:
read_kafka_params = {
    'kafka.bootstrap.servers': KAFKA_BOOTSTRAP_SERVER,
    'subscribe': OUTPUT_KAFKA_TOPIC,
    'startingOffsets': 'earliest'
}
kafka_sdf = (
    spark
    .read
    .format('kafka')
    .options(**read_kafka_params)
    .load()
    .cache()
)
kafka_sdf.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [44]:
kafka_sdf.limit(10).show()

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



#  Стримингоыфй вариант + МОДЕЛЬ

In [45]:
spark.stop()

In [46]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.app.name", "lab3 lr ALS app") 

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [47]:
spark

In [48]:
read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-node-1.newprolab.com:6667',
    "subscribe": "input_alexander.okhilkov",
    "startingOffsets": "latest"
}

# read readStream
kafka_sdf_sh = spark.readStream.format("kafka").options(**read_kafka_params).option("failOnDataLoss", 'False').load()

In [49]:
kafka_sdf_sh

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [50]:
kafka_sdf_sh = kafka_sdf_sh.withColumn("value", kafka_sdf_sh["value"].cast("string"))

In [51]:
schema = StructType([StructField('gender', StringType()), 
                     StructField('age', StringType()),
                     StructField('uid', StringType()),
                     StructField('user_json', StringType())
                    ]
                   )

kafka_sdf_sh = kafka_sdf_sh.withColumn('value_parsed', 
                                   F.from_json(F.col('value'),
                                               schema=schema)
                                      )


# kafka_sdf_sh=kafka_sdf_sh.withColumn('gender', kafka_sdf_sh['value_parsed']['gender'])
# kafka_sdf_sh=kafka_sdf_sh.withColumn('age', kafka_sdf_sh['value_parsed']['age'])
kafka_sdf_sh=kafka_sdf_sh.withColumn('uid', kafka_sdf_sh['value_parsed']['uid'])
kafka_sdf_sh=kafka_sdf_sh.withColumn('user_json', kafka_sdf_sh['value_parsed']['user_json'])




visits_schema = StructType([
    StructField("visits", ArrayType(
      StructType([
          StructField("url", StringType()),
          StructField("timestamp", LongType())
      ])
   ))
]) 
kafka_sdf_sh = kafka_sdf_sh.withColumn('visits', 
                                   F.from_json(F.col('user_json'),
                                               schema=visits_schema)
                                              )

kafka_sdf_sh = kafka_sdf_sh.withColumn('urls' , kafka_sdf_sh['visits']['visits']['url'] )
kafka_sdf_sh = kafka_sdf_sh.withColumn("urls_hosts", F.expr("transform(urls, x -> parse_url(x, 'HOST' ))"))

In [52]:
kafka_sdf_sh = pipeline_age.transform(kafka_sdf_sh)
kafka_sdf_sh = kafka_sdf_sh.withColumn("age", F.when(F.col("pred_age")==0, '18-24') \
                                       .when(F.col("pred_age")==1, '25-34') \
                                       .when(F.col("pred_age")==2, '35-44') \
                                       .when(F.col("pred_age")==3, '45-54') \
                                       .otherwise(">=55")).select(["uid", "age", "urls_hosts"])
kafka_sdf_sh = pipeline_gender.transform(kafka_sdf_sh)
kafka_sdf_sh = kafka_sdf_sh.withColumn("gender", F.when(F.col("pred_gender")==1, 'F').otherwise('M'))


kafka_sdf_sh

DataFrame[key: binary, value: string, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int, value_parsed: struct<gender:string,age:string,uid:string,user_json:string>, uid: string, user_json: string, visits: struct<visits:array<struct<url:string,timestamp:bigint>>>, urls: array<string>, urls_hosts: array<string>]

In [53]:
# kafka_sdf_sh = kafka_sdf_sh.select(['uid']).withColumn('gender', F.lit('M'))\
#                             .withColumn('age', F.lit('25-34'))

In [54]:
kafka_sdf_sh = kafka_sdf_sh.withColumn('key',  F.lit(None) )
kafka_sdf_sh = kafka_sdf_sh.withColumn('value',  F.to_json(F.struct("uid","gender","age")) )                                               

In [55]:
kafka_sdf_sh = kafka_sdf_sh.select('key' , 'value')

In [56]:
kafka_sdf_sh

DataFrame[key: null, value: string]

In [57]:
sink = kafka_sdf_sh \
        .select('key' , 'value').selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", 'spark-node-1.newprolab.com:6667') \
        .option("topic", 'alexander.okhilkov') \
        .option("checkpointLocation", "streaming/chk/chk_kafka/rr" ) \
        .outputMode("append")

# .trigger(processingTime="10 seconds") \
#         .option("checkpointLocation", "streaming/chk/chk_kafka" ) \
#         .option("truncate", "false") \

In [58]:
sq = sink.start()

In [59]:
sq.isActive

True

In [60]:
sq.lastProgress

In [61]:
#sq.stop()

In [70]:
sq.isActive

True

In [71]:
sq.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [72]:
sq.lastProgress

{'id': '343accf0-e41e-4888-a9fd-bbf925627256',
 'runId': '6f82bb97-d49e-402d-84a1-62e8b387a9fc',
 'name': None,
 'timestamp': '2021-03-22T20:28:44.196Z',
 'batchId': 41,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'getEndOffset': 0, 'setOffsetRange': 2, 'triggerExecution': 2},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[input_alexander.okhilkov]]',
   'startOffset': {'input_alexander.okhilkov': {'0': 10000}},
   'endOffset': {'input_alexander.okhilkov': {'0': 10000}},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider@1d03e1f'}}

In [73]:
sq.recentProgress

[{'id': '343accf0-e41e-4888-a9fd-bbf925627256',
  'runId': '6f82bb97-d49e-402d-84a1-62e8b387a9fc',
  'name': None,
  'timestamp': '2021-03-22T20:27:50.485Z',
  'batchId': 1,
  'numInputRows': 1,
  'processedRowsPerSecond': 0.0759589821496392,
  'durationMs': {'addBatch': 12620,
   'getBatch': 10,
   'queryPlanning': 64,
   'triggerExecution': 13165},
  'stateOperators': [],
  'sources': [{'description': 'KafkaV2[Subscribe[input_alexander.okhilkov]]',
    'startOffset': {'input_alexander.okhilkov': {'0': 5000}},
    'endOffset': {'input_alexander.okhilkov': {'0': 5001}},
    'numInputRows': 1,
    'processedRowsPerSecond': 0.0759589821496392}],
  'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider@1d03e1f'}},
 {'id': '343accf0-e41e-4888-a9fd-bbf925627256',
  'runId': '6f82bb97-d49e-402d-84a1-62e8b387a9fc',
  'name': None,
  'timestamp': '2021-03-22T20:28:03.711Z',
  'batchId': 2,
  'numInputRows': 405,
  'inputRowsPerSecond': 30.621503099954634,
  'processedRowsPe

In [37]:
def kill_all():
    streams = SparkSession.builder.getOrCreate().streams.active
    for s in streams:
        desc = s.lastProgress["sources"][0]["description"]
        s.stop()
        print("Stopped {s}".format(s=desc))

In [38]:
kill_all()