<img align="right" width="200" height="200" src="https://static.tildacdn.com/tild6236-6337-4339-b337-313363643735/new_logo.png">

# Spark Structured Streaming I
**Андрей Титов**  
tenke.iu8@gmail.com  

## На этом занятии
+ Общие сведения
+ Rate streaming
+ File streaming
+ Kafka streaming

## Общие сведения

Системы поточной обработки данных:
- работают с непрерывным потоком данных
- нужно хранить состояние стрима
- результат обработки быстро появляется в целевой системе
- должны проектироваться с учетом требований к высокой доступности
- важная скорость обработки данных и время зажержки (лаг)

### Примеры систем поточной обработки данных

#### Карточный процессинг
- нельзя терять платежи
- нельзя дублировать платежи
- простой сервиса недопустим
- максимальное время задержки ~ 1 сек
- небольшой поток событий
- OLTP

#### Обработка логов безопасности
- потеря единичных событий допустима
- дублирование единичных событий допустимо
- простой сервиса допустим
- максимальное время задержки ~ 1 час
- большой поток событий
- OLAP

### Виды стриминг систем

#### Real-time streaming
- низкие задержки на обработку
- низкая пропускная способность
- подходят для критичных систем
- пособытийная обработка
- OLTP
- exactly once consistency (нет потери данных и нет дубликатов)

#### Micro batch streaming
- высокие задержки
- высокая пропускная способность
- не подходят для критичных систем
- обработка батчами
- OLAP
- at least once consistency (во время сбоев могут возникать дубликаты)

### Выводы:
+ Существуют два типа систем поточной обработки данных - real-time и micro-batch
+ Spark Structured Streaming является micro-batch системой
+ При работе с большими данными обычно пропускная способность важнее, чем время задержки


## Rate streaming

Самый простой способ создать стрим - использовать `rate` источник. Созданный DF является streaming, о чем нам говорит метод создания `readStream` и атрибут `isStreaming`. `rate` хорошо подходит для тестирования приложений, когда нет возможности подключится к потоку реальных данных

In [3]:
val sdf = spark.readStream.format("rate").load

sdf = [timestamp: timestamp, value: bigint]


[timestamp: timestamp, value: bigint]

In [4]:
sdf.isStreaming

true

У `sdf`, как и у любого DF, есть схема и план выполнения:

In [20]:
sdf.printSchema
sdf.explain(true)

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)

== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@1a90d227, rate, [timestamp#4, value#5L]

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint
StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@1a90d227, rate, [timestamp#4, value#5L]

== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@1a90d227, rate, [timestamp#4, value#5L]

== Physical Plan ==
StreamingRelation rate, [timestamp#4, value#5L]


В отличии от обычных DF, у `sdf` нет таких методов, как `show`, `collect`, `take`. Для них также недоступен Dataset API. Поэтому для того, чтобы посмотреть их содержимое, мы должны использовать `console` синк и создать `StreamingQuery`. Процессинг начинается только после вызова метода `start`. `trigger` позволяет настроить, как часто стрим будет читать новые данные и обрабатывать их

In [19]:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame

def createConsoleSink(df: DataFrame) = {
    df
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("truncate", "false")
    .option("numRows", "20")
}

createConsoleSink: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]


In [None]:
val sink = createConsoleSink(sdf)

In [32]:
val sq = sink.start

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+



sq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@53560ece


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@53560ece

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2020-06-07 15:40:41.723|0    |
|2020-06-07 15:40:42.723|1    |
|2020-06-07 15:40:43.723|2    |
|2020-06-07 15:40:44.723|3    |
|2020-06-07 15:40:45.723|4    |
|2020-06-07 15:40:46.723|5    |
|2020-06-07 15:40:47.723|6    |
|2020-06-07 15:40:48.723|7    |
+-----------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2020-06-07 15:40:49.723|8    |
|2020-06-07 15:40:50.723|9    |
|2020-06-07 15:40:51.723|10   |
|2020-06-07 15:40:52.723|11   |
|2020-06-07 15:40:53.723|12   |
|2020-06-07 15:40:54.723|13   |
|2020-06-07 15:40:55.723|14   |
|2020-06-07 15:40:56.723|15   |
|2020-06-07 15:40:57.723|16   |
|2020-06-07 15:40:58.723|17   |
+----

Чтобы остановить DF, можно вызвать метод `stop` к `sdf`, либо получить список всех streming DF и остановить их:

In [24]:
import org.apache.spark.sql.SparkSession

def killAll() = {
    SparkSession
        .active
        .streams
        .active
        .foreach { x =>
                    val desc = x.lastProgress.sources.head.description
                    x.stop
                    println(s"Stopped ${desc}")
        }               
}

killAll: ()Unit


In [25]:
killAll()

Stopped KafkaV2[Subscribe[test_topic0]]


Создадим стрим, выполняющий запись в `parquet` файл:

In [46]:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame

def createParquetSink(df: DataFrame, 
                      fileName: String) = {
    df
    .writeStream
    .format("parquet")
    .option("path", s"datasets/$fileName")
    .option("checkpointLocation", s"chk/$fileName")
    .trigger(Trigger.ProcessingTime("10 seconds"))
}

lastException: Throwable = null
createParquetSink: (df: org.apache.spark.sql.DataFrame, fileName: String)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]


In [47]:
val sink = createParquetSink(sdf, "s1.parquet")

sink = org.apache.spark.sql.streaming.DataStreamWriter@343c5ff0


org.apache.spark.sql.streaming.DataStreamWriter@343c5ff0

In [48]:
val sq = sink.start

sq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@188efa2d


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@188efa2d

In [None]:
Убедимся, что стрим пишется в файл:

In [51]:
import sys.process._
"ls -alh datasets/s1.parquet".!!

"total 1520
drwxr-xr-x  193 t3nq  staff   6.0K Jun  7 15:56 .
drwxr-xr-x   17 t3nq  staff   544B Jun  7 15:52 ..
-rw-r--r--    1 t3nq  staff    16B Jun  7 15:56 .part-00000-0586f3cc-6308-4ea8-b8b0-17b738a477bc-c000.snappy.parquet.crc
-rw-r--r--    1 t3nq  staff    12B Jun  7 15:54 .part-00000-07eb2750-a44b-470d-83c8-0f2933e42402-c000.snappy.parquet.crc
-rw-r--r--    1 t3nq  staff    16B Jun  7 15:56 .part-00000-499b60b1-359e-4085-99af-93b0dd20d702-c000.snappy.parquet.crc
-rw-r--r--    1 t3nq  staff    16B Jun  7 15:55 .part-00000-58c09c80-2f1d-4f04-8970-2673401838f6-c000.snappy.parquet.crc
-rw-r--r--    1 t3nq  staff    16B Jun  7 15:55 .part-00000-5a84c643-5909-4598-ac0f-7a5f047d10aa-c000.snappy.parquet.crc
-rw-r--r--    1 t3nq  staff    16B Jun  7 ...


Прочитаем файл с помощью Spark:

In [58]:
val rates = spark.read.parquet("datasets/s1.parquet")
println(rates.count)
rates.printSchema
rates.show(5, false)

244
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)

+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2020-06-07 15:54:55.049|0    |
|2020-06-07 15:54:56.049|1    |
|2020-06-07 15:54:57.049|2    |
|2020-06-07 15:54:58.049|3    |
|2020-06-07 15:54:59.049|4    |
+-----------------------+-----+
only showing top 5 rows



rates = [timestamp: timestamp, value: bigint]


[timestamp: timestamp, value: bigint]

Параллельно внутри одного Spark приложения может работать несколько стримов:

In [57]:
val consoleSink = createConsoleSink(sdf)
val consoleSq = consoleSink.start

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+



consoleSink = org.apache.spark.sql.streaming.DataStreamWriter@499700e8
consoleSq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@15ff7ea9


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@15ff7ea9

-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------+-----+
|timestamp            |value|
+---------------------+-----+
|2020-06-07 15:58:49.8|0    |
|2020-06-07 15:58:50.8|1    |
|2020-06-07 15:58:51.8|2    |
|2020-06-07 15:58:52.8|3    |
|2020-06-07 15:58:53.8|4    |
|2020-06-07 15:58:54.8|5    |
|2020-06-07 15:58:55.8|6    |
|2020-06-07 15:58:56.8|7    |
|2020-06-07 15:58:57.8|8    |
|2020-06-07 15:58:58.8|9    |
+---------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+---------------------+-----+
|timestamp            |value|
+---------------------+-----+
|2020-06-07 15:58:59.8|10   |
|2020-06-07 15:59:00.8|11   |
|2020-06-07 15:59:01.8|12   |
|2020-06-07 15:59:02.8|13   |
|2020-06-07 15:59:03.8|14   |
|2020-06-07 15:59:04.8|15   |
|2020-06-07 15:59:05.8|16   |
|2020-06-07 15:59:06.8|17   |
|2020-06-07 15:59:07.8|18   |
|2020-06-07 15:59:08.8|19

In [60]:
killAll

Напишем функцию, которая добавляет к нашей колонке случайный `ident` аэропорта из датасета [Airport Codes](https://datahub.io/core/airport-codes)  

In [61]:
val csvOptions = Map("header" -> "true", "inferSchema" -> "true")
val airports = spark.read.options(csvOptions).csv("datasets/airport-codes.csv")
airports.printSchema
airports.show(numRows = 1, truncate = 100, vertical = true)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

-RECORD 0------------------------------------------
 ident        | 00A                                
 type         | heliport                           
 name         | Total Rf Heliport                  
 elevation_ft | 11                                 
 continent    | NA                                 
 iso_country  | US                                 
 iso_region   | US-PA                              
 municipality | Bensalem                           
 gps_code     | 00A                 

csvOptions = Map(header -> true, inferSchema -> true)
airports = [ident: string, type: string ... 10 more fields]


[ident: string, type: string ... 10 more fields]

In [76]:
val idents = airports.select('ident).limit(200).distinct.as[String].collect

idents = Array(00A, 00AA, 00AK, 00AL, 00AR, 00AS, 00AZ, 00CA, 00CL, 00CN, 00CO, 00FA, 00FD, 00FL, 00GA, 00GE, 00HI, 00ID, 00IG, 00II, 00IL, 00IN, 00IS, 00KS, 00KY, 00LA, 00LL, 00LS, 00MD, 00MI, 00MN, 00MO, 00MT, 00N, 00NC, 00NJ, 00NK, 00NY, 00OH, 00OI, 00OK, 00OR, 00PA, 00PN, 00PS, 00S, 00SC, 00SD, 00TA, 00TE, 00TN, 00TS, 00TX, 00UT, 00VA, 00VI, 00W, 00WA, 00WI, 00WN, 00WV, 00WY, 00XS, 01A, 01AK, 01AL, 01AR, 01AZ, 01C, 01CA, 01CL, 01CN, 01CO, 01CT, 01FA, 01FD, 01FL, 01GA, 01GE, 01IA, 01ID, 01II, 01IL, 01IN, 01IS, 01J, 01K, 01KS, 01KY, 01LA, 01LL, 01LS, 01MA, 01MD, 01ME, 01MI, 01MN, 01MO, 01MT, 01NC, 01NE, 01NH, 01NJ, 01NM, 01NV, 01NY, 01OI, 01OK, 01OR, 01PA, 01PN, 01PS, 01SC, 01TA, 01TE, 01TN, 01TS, 01TX, 01U, 01UT, 01VA, 01WA, 01WI, 01WN, 01WT, 01WY, 01XA, 01XS, 02AK, 02...


Array(00A, 00AA, 00AK, 00AL, 00AR, 00AS, 00AZ, 00CA, 00CL, 00CN, 00CO, 00FA, 00FD, 00FL, 00GA, 00GE, 00HI, 00ID, 00IG, 00II, 00IL, 00IN, 00IS, 00KS, 00KY, 00LA, 00LL, 00LS, 00MD, 00MI, 00MN, 00MO, 00MT, 00N, 00NC, 00NJ, 00NK, 00NY, 00OH, 00OI, 00OK, 00OR, 00PA, 00PN, 00PS, 00S, 00SC, 00SD, 00TA, 00TE, 00TN, 00TS, 00TX, 00UT, 00VA, 00VI, 00W, 00WA, 00WI, 00WN, 00WV, 00WY, 00XS, 01A, 01AK, 01AL, 01AR, 01AZ, 01C, 01CA, 01CL, 01CN, 01CO, 01CT, 01FA, 01FD, 01FL, 01GA, 01GE, 01IA, 01ID, 01II, 01IL, 01IN, 01IS, 01J, 01K, 01KS, 01KY, 01LA, 01LL, 01LS, 01MA, 01MD, 01ME, 01MI, 01MN, 01MO, 01MT, 01NC, 01NE, 01NH, 01NJ, 01NM, 01NV, 01NY, 01OI, 01OK, 01OR, 01PA, 01PN, 01PS, 01SC, 01TA, 01TE, 01TN, 01TS, 01TX, 01U, 01UT, 01VA, 01WA, 01WI, 01WN, 01WT, 01WY, 01XA, 01XS, 02AK, 02...

In [78]:
import org.apache.spark.sql.functions._
val identSdf = sdf.withColumn("ident", shuffle(array(idents.map(lit(_)):_*))(0))

identSdf = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [79]:
val identPqSink = createParquetSink(identSdf, "s2.parquet")
val identPqSq = identPqSink.start

identPqSink = org.apache.spark.sql.streaming.DataStreamWriter@75bdd6f9
identPqSq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@78fbaec6


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@78fbaec6

Проверим, что данные записываются в `parquet`

In [1]:
val identPq = spark.read.parquet("datasets/s2.parquet")
println(identPq.count)
identPq.printSchema
identPq.show(5, false)

267
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- ident: string (nullable = true)

+-----------------------+-----+-------+
|timestamp              |value|ident  |
+-----------------------+-----+-------+
|2020-06-07 20:31:54.019|2    |PL-0152|
|2020-06-07 20:31:56.019|4    |FR-0254|
|2020-06-07 20:32:00.019|8    |CZ-0107|
|2020-06-07 20:32:03.019|11   |KR-0256|
|2020-06-07 20:32:05.019|13   |KR-0306|
+-----------------------+-----+-------+
only showing top 5 rows



identPq = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

Временно остановим стрим, он понадобится нам для следующих экспериментов

In [97]:
killAll

Stopped FileStreamSource[file:/Users/t3nq/Projects/smz/de-spark-scala/datasets/s2.parquet]


### Выводы:
- `rate` - самый простой способ создать стрим для тестирования приложений
- стрим начинает работу после вызова метода `start` и не блокирует основной поток программы
- в одном Spark приложении может работать несколько стримов одновременно

## File Streaming
Spark позволяет запустить стрим, который будет "слушать" директорию и читать из нее новые файлы. При этом за раз будет прочитано количество файлов, установленное в параметре `maxFilesPerTrigger` [ссылка](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources). В этом кроется одна из основных проблем данного источника. Поскольку стрим, сконфигурированный под чтение небольших файлов, может "упасть", если в директорию начнут попадать файлы большого объема. Создадим стрим из директории `datasets/s2.parquet`:

In [85]:
val sdfFromParquet = spark
        .readStream
        .format("parquet")
        .option("maxFilesPerTrigger", "1")
        .option("path", "datasets/s2.parquet")
        .load

sdfFromParquet.printSchema

lastException = null


Name: java.lang.IllegalArgumentException
Message: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
StackTrace:   at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:95)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:95)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)

Поскольку в директорию могут попасть любые данные, а df должен иметь фиксированную схему, то Spark не позволяет нам создавать SDF на основе файлов без указания схемы.

In [98]:
import org.apache.spark.sql.functions._

val letters = List("a", "b", "c", "d", "e", "f", "g", "i")
val condition = letters.map { x => col("ident").startsWith(x) }.reduce { (x,y) => x or y }

val sdfFromParquet = spark
        .readStream
        .format("parquet")
        .schema(identPq.schema)
        .option("maxFilesPerTrigger", "10")
        .option("path", "datasets/s2.parquet")
        .load
        .withColumn("ident", lower('ident))

sdfFromParquet.printSchema

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- ident: string (nullable = true)



letters = List(a, b, c, d, e, f, g, i)
condition = (((((((startswith(ident, a) OR startswith(ident, b)) OR startswith(ident, c)) OR startswith(ident, d)) OR startswith(ident, e)) OR startswith(ident, f)) OR startswith(ident, g)) OR startswith(ident, i))
sdfFromParquet = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [99]:
val consoleSink = createConsoleSink(sdfFromParquet)
consoleSink.start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------------------+-----+-------+
|timestamp              |value|ident  |
+-----------------------+-----+-------+
|2020-06-07 20:31:54.019|2    |pl-0152|
|2020-06-07 20:31:56.019|4    |fr-0254|
|2020-06-07 20:32:00.019|8    |cz-0107|
|2020-06-07 20:31:52.019|0    |sspl   |
|2020-06-07 20:31:53.019|1    |vijn   |
|2020-06-07 20:31:57.019|5    |la30   |
|2020-06-07 20:31:58.019|6    |wi13   |
|2020-06-07 20:31:59.019|7    |ku36   |
|2020-06-07 20:31:55.019|3    |k13    |
+-----------------------+-----+-------+



consoleSink = org.apache.spark.sql.streaming.DataStreamWriter@dd89654


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5d484ca5

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+-------+
|timestamp              |value|ident  |
+-----------------------+-----+-------+
|2020-06-07 20:32:03.019|11   |kr-0256|
|2020-06-07 20:32:05.019|13   |kr-0306|
|2020-06-07 20:32:07.019|15   |py-0041|
|2020-06-07 20:32:08.019|16   |us-0250|
|2020-06-07 20:32:01.019|9    |30cl   |
|2020-06-07 20:32:02.019|10   |29ll   |
|2020-06-07 20:32:04.019|12   |kh68   |
|2020-06-07 20:32:06.019|14   |zukj   |
|2020-06-07 20:32:09.019|17   |pn29   |
|2020-06-07 20:32:10.019|18   |sd99   |
+-----------------------+-----+-------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+-------+
|timestamp              |value|ident  |
+-----------------------+-----+-------+
|2020-06-07 20:32:17.019|25   |fr-0387|
|2020-06-07 20:32:18.019|26   |sk-0071|
|2020-06-07 20:32:14.019|22   |ly-mra |
|2020

In [100]:
killAll

Stopped FileStreamSource[file:/Users/t3nq/Projects/smz/de-spark-scala/datasets/s2.parquet]


File source позволяет со всеми типами файлов, с которыми умеет работать Spark: `parquet`, `orc`, `csv`, `json`, `text`.

### Выводы:
- Spark позволяет создавать SDF на базе всех поддерживаемых типов файлов
- При создании SDF вы должны указать схему данных
- File streaming имеет несколько серьезных недостатков:
  + Входной поток можно ограничить только макисмальным количество файлов, попадающих в батч
  + Если стрим упадает посередине файла, то при перезапуске эти данные будут обработаны еще раз

<img align="right" width="100" height="100" src="https://upload.wikimedia.org/wikipedia/commons/thumb/0/05/Apache_kafka.svg/1200px-Apache_kafka.svg.png">

## Kafka streaming

https://kafka.apache.org

**Apache Kafka** - самая распространенная в мире система, на основе которой строятся приложения для поточной обработки данных. Она имеет несколько преимуществ:
- высокая пропускная способность
- высокая доступность за счет распределенной архитектуры и репликации
- у каждого сообщения есть свой номер, который называется offset, что позволяет гранулярно сохранять состояние стрима

### Архитектура системы

#### Topic
Топик - это таблицы в Kafka. Мы пишем данные в топик и читаем данные из топика. Топик как правило распределен по нескольким узлам кластера для обеспечения высокой доступности и скорости работы с данными

<img align="center" width="500" height="500" src="https://kafka.apache.org/25/images/log_anatomy.png">

#### Partition
Партиции - это блоки, из которых состоят топики. Партиция представляет собой неделимый блок, который хранится на одном из узлов. Топик может иметь произвольное количество партиций. Чем больше партиций - тем выше параллелзим при чтении и записи, однако слишком большое число партиций в топике может привести к замедлению работы всей системы.

#### Replica
Каждая партиция имеет (может иметь) несколько реплик. Внешние приложения всегда работают (читают и пишут) с основной репликой. Остальные реплики являются дочерними и не используются во внешнем IO. Если узел, на котором расположена основная реплика, падает, то одна из дочерних реплик становится основной и работа с данными продолжается

#### Message
Сообщения - это данные, которые мы пишем и читаем в Kafka. Они представлены кортежем (Key, Value), но ключ может быть иметь значение `null` (используется не всегда). Сереализация и десереализация данных всегда происходит на уровне клиентов Kafka. Сама Kafka ничего о типах данных не знает и хранит ключи и значения в виде массива байт

#### Offset
Оффсет - это порядковый номер сообщения в партиции. Когда мы пишем сообщение (сообщение всегда пишется в одну из партиций топика), Kafka помещает его в топик с номер `n+1`, где `n` - номер последнего сообщения в этом топике

<img align="center" width="400" height="400" src="https://kafka.apache.org/25/images/log_consumer.png">

#### Producer
Producer - это приложение, которое пишет в топик. Producer'ов может быть много. Параллельная запись достигается за счет того, что каждое новое сообщение попадает в случайную партицию топика (если не указан `key`)

#### Consumer
Consumer - это приложение, читающее данные из топика. Consumer'ов может быть много, в этом случае они называются `consumer group`. Параллельное чтение достигается за счет распределения партиций топика между consumer'ами в рамках одной группы. Каждый consumer читает данные из "своих" партиций и ничего про другие не знает. Если consumer падает, то "его" партиции переходят другим consumer'ам.

#### Commit
Коммитом в Kafka называют сохранение информации о факте обработки сообщения с определенным оффсетом. Поскольку оффсеты для каждой партиции топика свои, то и информация о последнем обработанном оффсете хранится по каждой партиции отдельно. Обычные приложения пишут коммиты в специальный топик Kafka, который имеет название `__consumer_offsets`. Spark хранит обработанные оффсеты по каждому батчу в ФС (например, в HDFS).

#### Retention
Поскольку кластер Kafka не может хранить данные вечно, то в ее конфигурации задаются пороговые значение по **объему** и **времени хранения** для каждого топика, при превышении которых данные удаляются. Например, если у топика A установлен renention по времени 1 месяц, то данные будут хранится в системе не менее одного месяца (и затем будут удалены одной из внутренних подсистем)

### Spark connector
https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10  
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html  

### Запуск Kafka в docker
```shell
docker run --rm \
   -p 2181:2181 \
   --name=test_zoo \
   -e ZOOKEEPER_CLIENT_PORT=2181 \
   confluentinc/cp-zookeeper
```

```shell
docker run --rm \
    -p 9092:9092 \
    --name=test_kafka \
    -e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka
```

### Работа с Kafka с помощь Static Dataframe

Spark позволяет работать с кафкой как с обычной базой данных. Запишем данные в топик `test_topic0`. Для этого нам необходимо подготовить DF, в котором будет две колонки:
- `value: String` - данные, которые мы хотим записать
- `topic: String` - топик, куда писать каждую строку DF

In [2]:
val identPq = spark.read.parquet("datasets/s2.parquet")
println(identPq.count)
identPq.printSchema
identPq.show(5, false)

267
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- ident: string (nullable = true)

+-----------------------+-----+-------+
|timestamp              |value|ident  |
+-----------------------+-----+-------+
|2020-06-07 20:31:54.019|2    |PL-0152|
|2020-06-07 20:31:56.019|4    |FR-0254|
|2020-06-07 20:32:00.019|8    |CZ-0107|
|2020-06-07 20:32:03.019|11   |KR-0256|
|2020-06-07 20:32:05.019|13   |KR-0306|
+-----------------------+-----+-------+
only showing top 5 rows



identPq = [timestamp: timestamp, value: bigint ... 1 more field]


[timestamp: timestamp, value: bigint ... 1 more field]

In [4]:
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

def writeKafka[T](topic: String, data: Dataset[T]): Unit = {
    val kafkaParams = Map(
        "kafka.bootstrap.servers" -> "localhost:9092"
    )
    
    data.toJSON.withColumn("topic", lit(topic)).write.format("kafka").options(kafkaParams).save
}

writeKafka: [T](topic: String, data: org.apache.spark.sql.Dataset[T])Unit


In [5]:
writeKafka("test_topic0", identPq)

Прочитаем данные из Kafka:

In [7]:
val kafkaParams = Map(
        "kafka.bootstrap.servers" -> "localhost:9092",
        "subscribe" -> "test_topic0"
    )


val df = spark.read.format("kafka").options(kafkaParams).load

df.printSchema
df.show

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 74 69 6D 6...|test_topic0|        0|     0|2020-06-10 18:08:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|     1|2020-06-10 18:08:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|     2|2020-06-10 18:08:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|     3|2020-06-10 18:08:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|     4|2020-06-10 18:08:

kafkaParams = Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> test_topic0)
df = [key: binary, value: binary ... 5 more fields]


[key: binary, value: binary ... 5 more fields]

Чтение из Kafka имеет несколько особенностей:
- по умолчанию читается все содержимое топика. Поскольку обычно в нем много данных, эта операция может создать большую нагрузку на кластер Kafka и Spark приложение
- колонки `value` и `key` имеют тип `binary`, который необходимо десереализовать

Чтобы прочитать только определенную часть топика, нам необходимо задать минимальный и максимальный оффсет для чтения с помощью параметров `startingOffsets` , `endingOffsets`. Возьмем два случайных события:

In [10]:
df.sample(0.1).limit(2).select('topic, 'partition, 'offset).show

+-----------+---------+------+
|      topic|partition|offset|
+-----------+---------+------+
|test_topic0|        0|    20|
|test_topic0|        0|    27|
+-----------+---------+------+



lastException: Throwable = null


На основании этих событий подготовим параметры `startingOffsets` и `endingOffsets`

In [11]:
val kafkaParams = Map(
        "kafka.bootstrap.servers" -> "localhost:9092",
        "subscribe" -> "test_topic0",
        "startingOffsets" -> """ { "test_topic0": { "0": 20 } } """,
        "endingOffsets" -> """ { "test_topic0": { "0": 27 } }  """
    )


val df = spark.read.format("kafka").options(kafkaParams).load

df.printSchema
df.show(20)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 74 69 6D 6...|test_topic0|        0|    20|2020-06-10 18:08:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|    21|2020-06-10 18:08:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|    22|2020-06-10 18:08:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|    23|2020-06-10 18:08:...|            0|
|null|[7B 22 74 69 6D 6...|test_topic0|        0|    24|2020-06-10 18:08:

kafkaParams = Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> test_topic0, startingOffsets -> " { "test_topic0": { "0": 20 } } ", endingOffsets -> " { "test_topic0": { "0": 27 } }  ")
df = [key: binary, value: binary ... 5 more fields]


[key: binary, value: binary ... 5 more fields]

По умолчанию параметр `startingOffsets` имеет значение `earliest`, а `endingOffsets` - `latest`. Поэтому, когда мы не указывали эти параметры, Spark прочитал содержимое всего топика

Чтобы получить наши данные, которые мы записали в топик, нам необходимо их десереализовать. В нашем случае достаточно использовать `.cast("string")`, однако это работает не всегда, т.к. формат данных может быть произвольным.

In [17]:
val jsonString = df.select('value.cast("string")).as[String]

jsonString.show(20, false)

val parsed = spark.read.json(jsonString)
parsed.printSchema
parsed.show(20, false)

+-------------------------------------------------------------------------+
|value                                                                    |
+-------------------------------------------------------------------------+
|{"timestamp":"2020-06-07T20:31:56.019+03:00","value":4,"ident":"FR-0254"}|
|{"timestamp":"2020-06-07T20:32:21.019+03:00","value":29,"ident":"KHHG"}  |
|{"timestamp":"2020-06-07T20:36:02.019+03:00","value":250,"ident":"02MO"} |
|{"timestamp":"2020-06-07T20:35:06.019+03:00","value":194,"ident":"00WY"} |
|{"timestamp":"2020-06-07T20:35:23.019+03:00","value":211,"ident":"02IS"} |
|{"timestamp":"2020-06-07T20:35:42.019+03:00","value":230,"ident":"00GE"} |
|{"timestamp":"2020-06-07T20:34:14.019+03:00","value":142,"ident":"01AK"} |
+-------------------------------------------------------------------------+

root
 |-- ident: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- value: long (nullable = true)

+-------+-----------------------------+-----

jsonString = [value: string]
parsed = [ident: string, timestamp: string ... 1 more field]


[ident: string, timestamp: string ... 1 more field]

### Работа с Kafka с помощью Streaming DF
При создании SDF из Kafka необходимо помнить, что:
- `startingOffsets` по умолчанию имеет значение `latest`
- `endingOffsets` использовать нельзя
- количество сообщений за батч можно (и нужно) ограничить параметром `maxOffsetPerTrigger` (по умолчанию он не задан и первый батч будет содержать данные всего топика

In [26]:
val kafkaParams = Map(
        "kafka.bootstrap.servers" -> "localhost:9092",
        "subscribe" -> "test_topic0",
        "startingOffsets" -> """earliest""",
        "maxOffsetsPerTrigger" -> "5"
    )

val sdf = spark.readStream.format("kafka").options(kafkaParams).load
val parsedSdf = sdf.select('value.cast("string"), 'topic, 'partition, 'offset)

val sink = createConsoleSink(parsedSdf)

val sq = sink.start

kafkaParams = Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> test_topic0, startingOffsets -> earliest, maxOffsetsPerTrigger -> 5)
sdf = [key: binary, value: binary ... 5 more fields]
parsedSdf = [value: string, topic: string ... 2 more fields]
sink = org.apache.spark.sql.streaming.DataStreamWriter@33bf26c8
sq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@65595976


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@65595976

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------------------------------------------------------------+-----------+---------+------+
|value                                                                    |topic      |partition|offset|
+-------------------------------------------------------------------------+-----------+---------+------+
|{"timestamp":"2020-06-07T20:32:48.019+03:00","value":56,"ident":"SJWT"}  |test_topic0|0        |0     |
|{"timestamp":"2020-06-07T20:35:05.019+03:00","value":193,"ident":"02FA"} |test_topic0|0        |1     |
|{"timestamp":"2020-06-07T20:33:37.019+03:00","value":105,"ident":"CJB7"} |test_topic0|0        |2     |
|{"timestamp":"2020-06-07T20:32:20.019+03:00","value":28,"ident":"KDEW"}  |test_topic0|0        |3     |
|{"timestamp":"2020-06-07T20:31:54.019+03:00","value":2,"ident":"PL-0152"}|test_topic0|0        |4     |
+--------------------------------------------------------------

Если мы перезапустим этот стрим, он повторно прочитает все данные. Чтобы обеспечить сохранение состояния стрима после обработки каждого батча, нам необходимо добавить параметр `checkpointLocation` в опции `writeStream`:

In [28]:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame

def createConsoleSinkWithCheckpoint(chkName: String, df: DataFrame) = {
    df
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("checkpointLocation", s"chk/$chkName")
    .option("truncate", "false")
    .option("numRows", "20")
}

createConsoleSinkWithCheckpoint: (chkName: String, df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]


In [33]:
val sink = createConsoleSinkWithCheckpoint("test0", parsedSdf)
val sq = sink.start

sink = org.apache.spark.sql.streaming.DataStreamWriter@134d7092
sq = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@63bc271f


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@63bc271f

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------------------------------------------------------------------+-----------+---------+------+
|value                                                                    |topic      |partition|offset|
+-------------------------------------------------------------------------+-----------+---------+------+
|{"timestamp":"2020-06-07T20:31:56.019+03:00","value":4,"ident":"FR-0254"}|test_topic0|0        |20    |
|{"timestamp":"2020-06-07T20:32:21.019+03:00","value":29,"ident":"KHHG"}  |test_topic0|0        |21    |
|{"timestamp":"2020-06-07T20:36:02.019+03:00","value":250,"ident":"02MO"} |test_topic0|0        |22    |
|{"timestamp":"2020-06-07T20:35:06.019+03:00","value":194,"ident":"00WY"} |test_topic0|0        |23    |
|{"timestamp":"2020-06-07T20:35:23.019+03:00","value":211,"ident":"02IS"} |test_topic0|0        |24    |
+--------------------------------------------------------------

In [34]:
killAll

Stopped KafkaV2[Subscribe[test_topic0]]


### Выводы:
- Apache Kafka - распределенная система, обеспечивающая передачу потока данных в слабосвязанных системах
- Работать с Kafka можно как с использованием Static DF, так и с помощью Streaming DF
- Чтобы стрим запоминал свое состояние после остановки, необходимо использовать checkpoint - директорию на HDFS (или локальной ФС), в которую будет сохранятся состояние стрима после каждого батча

В конце работы не забудьте остановить Spark:

In [None]:
spark.stop