Пример Spark Structured Streaming + Kafka
=========================================

На основе [Structured Streaming + Kafka Integration Guide][kafka-streaming].

- [Structured Streaming Programming Guide][guide]
- [Spark Structured Streaming | Databricks][databricks-2016]
- [Spark Structured Streaming – Overview | Kaizen][kaizen]

[kafka-streaming]: https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html
[guide]: https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html
[databricks-2016]: https://www.databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
[kaizen]: https://kaizen.itversity.com/courses/hdp-certified-spark-developer-hdpcsd-scala/lessons/apache-spark-2-streaming/topic/hdpcsd-spark-structured-streaming-overview-scala/

## Avro-схемы данных

In [None]:
// Avro schema (JSON text) passed to from_avro for messages of the
// "datapoint-raw-ismdetobs" topic. The payload is an array of DataPointIsmdetobs
// records with the fields Timestamp, NavigationSystem (enum), SignalType (enum),
// Satellite, Prn, GloFreq and Power.
val ismdetobsScheme = """
    |{
    |  "type": "array",
    |  "items": {
    |    "name": "NovAtelLogReader.DataPoints.DataPointIsmdetobs",
    |    "type": "record",
    |    "fields": [
    |      {
    |        "name": "Timestamp",
    |        "type": "long"
    |      },
    |      {
    |        "name": "NavigationSystem",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.NavigationSystem",
    |          "type": "enum",
    |          "symbols": [
    |            "GPS",
    |            "GLONASS",
    |            "SBAS",
    |            "Galileo",
    |            "BeiDou",
    |            "QZSS",
    |            "Reserved",
    |            "Other"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "SignalType",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.SignalType",
    |          "type": "enum",
    |          "symbols": [
    |            "Unknown",
    |            "L1CA",
    |            "L2C",
    |            "L2CA",
    |            "L2P",
    |            "L2P_codeless",
    |            "L2Y",
    |            "L5Q"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "Satellite",
    |        "type": "string"
    |      },
    |      {
    |        "name": "Prn",
    |        "type": "int"
    |      },
    |      {
    |        "name": "GloFreq",
    |        "type": "int"
    |      },
    |      {
    |        "name": "Power",
    |        "type": "double"
    |      }
    |    ]
    |  }
    |}""".stripMargin

// Avro schema (JSON text) passed to from_avro for messages of the
// "datapoint-raw-ismrawtec" topic. The payload is an array of DataPointIsmrawtec
// records with the fields Timestamp, NavigationSystem (enum), Satellite, Prn, GloFreq,
// PrimarySignal, SecondarySignal and Tec. The SignalType enum is declared inline on
// PrimarySignal and referenced by its full name on SecondarySignal.
val ismrawtecScheme = """
    |{
    |  "type": "array",
    |  "items": {
    |    "name": "NovAtelLogReader.DataPoints.DataPointIsmrawtec",
    |    "type": "record",
    |    "fields": [
    |      {
    |        "name": "Timestamp",
    |        "type": "long"
    |      },
    |      {
    |        "name": "NavigationSystem",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.NavigationSystem",
    |          "type": "enum",
    |          "symbols": [
    |            "GPS",
    |            "GLONASS",
    |            "SBAS",
    |            "Galileo",
    |            "BeiDou",
    |            "QZSS",
    |            "Reserved",
    |            "Other"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "Satellite",
    |        "type": "string"
    |      },
    |      {
    |        "name": "Prn",
    |        "type": "int"
    |      },
    |      {
    |        "name": "GloFreq",
    |        "type": "int"
    |      },
    |      {
    |        "name": "PrimarySignal",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.SignalType",
    |          "type": "enum",
    |          "symbols": [
    |            "Unknown",
    |            "L1CA",
    |            "L2C",
    |            "L2CA",
    |            "L2P",
    |            "L2P_codeless",
    |            "L2Y",
    |            "L5Q"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "SecondarySignal",
    |        "type": "NovAtelLogReader.LogData.SignalType"
    |      },
    |      {
    |        "name": "Tec",
    |        "type": "double"
    |      }
    |    ]
    |  }
    |}""".stripMargin

// Avro schema (JSON text) passed to from_avro for messages of the
// "datapoint-raw-ismredobs" topic. The payload is an array of DataPointIsmredobs
// records with the fields Timestamp, NavigationSystem (enum), SignalType (enum),
// Satellite, Prn, GloFreq, AverageCmc, CmcStdDev, TotalS4, CorrS4, PhaseSigma1Second,
// PhaseSigma30Second and PhaseSigma60Second.
val ismredobsScheme = """
    |{
    |  "type": "array",
    |  "items": {
    |    "name": "NovAtelLogReader.DataPoints.DataPointIsmredobs",
    |    "type": "record",
    |    "fields": [
    |      {
    |        "name": "Timestamp",
    |        "type": "long"
    |      },
    |      {
    |        "name": "NavigationSystem",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.NavigationSystem",
    |          "type": "enum",
    |          "symbols": [
    |            "GPS",
    |            "GLONASS",
    |            "SBAS",
    |            "Galileo",
    |            "BeiDou",
    |            "QZSS",
    |            "Reserved",
    |            "Other"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "SignalType",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.SignalType",
    |          "type": "enum",
    |          "symbols": [
    |            "Unknown",
    |            "L1CA",
    |            "L2C",
    |            "L2CA",
    |            "L2P",
    |            "L2P_codeless",
    |            "L2Y",
    |            "L5Q"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "Satellite",
    |        "type": "string"
    |      },
    |      {
    |        "name": "Prn",
    |        "type": "int"
    |      },
    |      {
    |        "name": "GloFreq",
    |        "type": "int"
    |      },
    |      {
    |        "name": "AverageCmc",
    |        "type": "double"
    |      },
    |      {
    |        "name": "CmcStdDev",
    |        "type": "double"
    |      },
    |      {
    |        "name": "TotalS4",
    |        "type": "double"
    |      },
    |      {
    |        "name": "CorrS4",
    |        "type": "double"
    |      },
    |      {
    |        "name": "PhaseSigma1Second",
    |        "type": "double"
    |      },
    |      {
    |        "name": "PhaseSigma30Second",
    |        "type": "double"
    |      },
    |      {
    |        "name": "PhaseSigma60Second",
    |        "type": "double"
    |      }
    |    ]
    |  }
    |}""".stripMargin

// Avro schema (JSON text) passed to from_avro for messages of the
// "datapoint-raw-psrpos" topic. The payload is an array of DataPointPsrpos records
// with the fields Timestamp (long) and Lat, Lon, Hgt, LatStdDev, LonStdDev, HgtStdDev
// (all double).
val psrposScheme = """
    |{
    |  "type": "array",
    |  "items": {
    |    "name": "NovAtelLogReader.DataPoints.DataPointPsrpos",
    |    "type": "record",
    |    "fields": [
    |      {
    |        "name": "Timestamp",
    |        "type": "long"
    |      },
    |      {
    |        "name": "Lat",
    |        "type": "double"
    |      },
    |      {
    |        "name": "Lon",
    |        "type": "double"
    |      },
    |      {
    |        "name": "Hgt",
    |        "type": "double"
    |      },
    |      {
    |        "name": "LatStdDev",
    |        "type": "double"
    |      },
    |      {
    |        "name": "LonStdDev",
    |        "type": "double"
    |      },
    |      {
    |        "name": "HgtStdDev",
    |        "type": "double"
    |      }
    |    ]
    |  }
    |}""".stripMargin

// Avro schema (JSON text) passed to from_avro for messages of the
// "datapoint-raw-range" topic. The payload is an array of DataPointRange records with
// the fields Timestamp, NavigationSystem (enum), SignalType (enum), Satellite, Prn,
// GloFreq, Psr, Adr, CNo, LockTime and Power.
val rangeScheme = """
    |{
    |  "type": "array",
    |  "items": {
    |    "name": "NovAtelLogReader.DataPoints.DataPointRange",
    |    "type": "record",
    |    "fields": [
    |      {
    |        "name": "Timestamp",
    |        "type": "long"
    |      },
    |      {
    |        "name": "NavigationSystem",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.NavigationSystem",
    |          "type": "enum",
    |          "symbols": [
    |            "GPS",
    |            "GLONASS",
    |            "SBAS",
    |            "Galileo",
    |            "BeiDou",
    |            "QZSS",
    |            "Reserved",
    |            "Other"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "SignalType",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.SignalType",
    |          "type": "enum",
    |          "symbols": [
    |            "Unknown",
    |            "L1CA",
    |            "L2C",
    |            "L2CA",
    |            "L2P",
    |            "L2P_codeless",
    |            "L2Y",
    |            "L5Q"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "Satellite",
    |        "type": "string"
    |      },
    |      {
    |        "name": "Prn",
    |        "type": "int"
    |      },
    |      {
    |        "name": "GloFreq",
    |        "type": "int"
    |      },
    |      {
    |        "name": "Psr",
    |        "type": "double"
    |      },
    |      {
    |        "name": "Adr",
    |        "type": "double"
    |      },
    |      {
    |        "name": "CNo",
    |        "type": "double"
    |      },
    |      {
    |        "name": "LockTime",
    |        "type": "double"
    |      },
    |      {
    |        "name": "Power",
    |        "type": "double"
    |      }
    |    ]
    |  }
    |}""".stripMargin

// Avro schema (JSON text) passed to from_avro for messages of the
// "datapoint-raw-satvis" topic. The payload is an array of DataPointSatvis records
// with the fields Timestamp, NavigationSystem (enum), Satellite, Prn, GloFreq,
// SatVis (boolean), Health (long), Elev and Az.
val satvisScheme = """
    |{
    |  "type": "array",
    |  "items": {
    |    "name": "NovAtelLogReader.DataPoints.DataPointSatvis",
    |    "type": "record",
    |    "fields": [
    |      {
    |        "name": "Timestamp",
    |        "type": "long"
    |      },
    |      {
    |        "name": "NavigationSystem",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.NavigationSystem",
    |          "type": "enum",
    |          "symbols": [
    |            "GPS",
    |            "GLONASS",
    |            "SBAS",
    |            "Galileo",
    |            "BeiDou",
    |            "QZSS",
    |            "Reserved",
    |            "Other"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "Satellite",
    |        "type": "string"
    |      },
    |      {
    |        "name": "Prn",
    |        "type": "int"
    |      },
    |      {
    |        "name": "GloFreq",
    |        "type": "int"
    |      },
    |      {
    |        "name": "SatVis",
    |        "type": "boolean"
    |      },
    |      {
    |        "name": "Health",
    |        "type": "long"
    |      },
    |      {
    |        "name": "Elev",
    |        "type": "double"
    |      },
    |      {
    |        "name": "Az",
    |        "type": "double"
    |      }
    |    ]
    |  }
    |}""".stripMargin

// Avro schema (JSON text) passed to from_avro for messages of the
// "datapoint-raw-satxyz2" topic. The payload is an array of DataPointSatxyz2 records
// with the fields Timestamp, NavigationSystem (enum), Satellite, Prn, X, Y and Z.
// Unlike the other per-satellite schemas in this notebook it has no GloFreq field.
val satxyz2Scheme = """
    |{
    |  "type": "array",
    |  "items": {
    |    "name": "NovAtelLogReader.DataPoints.DataPointSatxyz2",
    |    "type": "record",
    |    "fields": [
    |      {
    |        "name": "Timestamp",
    |        "type": "long"
    |      },
    |      {
    |        "name": "NavigationSystem",
    |        "type": {
    |          "name": "NovAtelLogReader.LogData.NavigationSystem",
    |          "type": "enum",
    |          "symbols": [
    |            "GPS",
    |            "GLONASS",
    |            "SBAS",
    |            "Galileo",
    |            "BeiDou",
    |            "QZSS",
    |            "Reserved",
    |            "Other"
    |          ]
    |        }
    |      },
    |      {
    |        "name": "Satellite",
    |        "type": "string"
    |      },
    |      {
    |        "name": "Prn",
    |        "type": "int"
    |      },
    |      {
    |        "name": "X",
    |        "type": "double"
    |      },
    |      {
    |        "name": "Y",
    |        "type": "double"
    |      },
    |      {
    |        "name": "Z",
    |        "type": "double"
    |      }
    |    ]
    |  }
    |}""".stripMargin

## Запрос к Kafka с использованием Structured Streaming

### Импорты

In [None]:
import $ivy.`org.apache.spark::spark-sql:2.4.0`
import $ivy.`org.apache.spark::spark-sql-kafka-0-10:2.4.0`
import $ivy.`org.apache.spark::spark-avro:2.4.0`

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.avro._

### Первоначальная настройка

In [None]:
// Spark configuration of this notebook: the application name and the "local[*]" master.
val conf =
  new SparkConf()
    .setAppName("GNSS Stream Receiver")
    .setMaster("local[*]")

In [None]:
// Session built from `conf`; getOrCreate() hands back an already existing session if
// there is one.
val spark =
  SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

// Column functions (explode, ...) and the session implicits behind the $"column" syntax.
import org.apache.spark.sql.functions._
import spark.implicits._

### Общие функции

Функция для создания потока из Kafka:

In [None]:
import java.util.UUID

// Random identifier generated once per notebook session. createKafkaStream does not
// need it (see the note on consumer settings below); the value is kept so that any
// cell still referring to `clientUID` keeps working.
val clientUID = s"${UUID.randomUUID}"

// Docker DNS resolves container names inside one Docker network, so the bootstrap
// server is addressed by the name of the Kafka container:
val kafkaServerAddress = "kafka:9092"

/** Builds a streaming reader subscribed to a single Kafka topic.
  *
  * Consumer-level settings such as `group.id`, `auto.offset.reset` and
  * `enable.auto.commit` are deliberately not set. Spark's Kafka source only forwards
  * options prefixed with `kafka.` to the consumer, so the unprefixed forms are ignored,
  * and the prefixed forms of these three are rejected by Spark: it generates a unique
  * group id per query, never commits offsets, and takes the start position from the
  * `startingOffsets` source option instead.
  *
  * @param topic name of the Kafka topic to subscribe to
  * @return a reader that has not been loaded yet; call `.load()` to obtain the
  *         streaming DataFrame
  */
def createKafkaStream(topic: String): org.apache.spark.sql.streaming.DataStreamReader = {
  spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaServerAddress)
    // Start from the newest offsets; this is also the default for streaming queries.
    .option("startingOffsets", "latest")
    .option("subscribe", topic)
}

Функция для отладки при помощи специальной таблицы движка в Catalyst, хранимой в памяти.  
Такой подход используется только при отладке, потому что содержимое таких временных таблиц не очищается и в итоге приводит к исчерпанию всей RAM!

In [None]:
/** Prepares (without starting) a writer that sends `stream` to the "memory" format
  * under the query name `tableName`, using the "update" output mode.
  *
  * Intended for debugging only. The caller starts the query with `.start()`.
  */
def memTableSink(stream: DataFrame, tableName: String) = {
  val writer = stream.writeStream
  writer
    .format("memory")
    .queryName(tableName)
    .outputMode("update")
}

### Получение сообщений из Kafka

Теперь создадим потоки:

In [None]:
// Loads the streaming DataFrame of one Kafka topic.
def loadTopic(topic: String): DataFrame = createKafkaStream(topic).load()

// One raw stream per "datapoint-raw-*" topic.
val ismdetobsStream = loadTopic("datapoint-raw-ismdetobs")
val ismrawtecStream = loadTopic("datapoint-raw-ismrawtec")
val ismredobsStream = loadTopic("datapoint-raw-ismredobs")
val psrposStream    = loadTopic("datapoint-raw-psrpos")
val rangeStream     = loadTopic("datapoint-raw-range")
val satvisStream    = loadTopic("datapoint-raw-satvis")
val satxyz2Stream   = loadTopic("datapoint-raw-satxyz2")

Получим срез сообщений из Kafka, отправляемых NovAtelLogReader:

In [None]:
// Starts a memory-table query for one raw stream and returns the running query.
def startRawTable(stream: DataFrame, table: String) =
  memTableSink(stream, table).start()

val ismdetobsMem = startRawTable(ismdetobsStream, "datapoint_raw_ismdetobs")
val ismrawtecMem = startRawTable(ismrawtecStream, "datapoint_raw_ismrawtec")
val ismredobsMem = startRawTable(ismredobsStream, "datapoint_raw_ismredobs")
val psrposMem    = startRawTable(psrposStream,    "datapoint_raw_psrpos")
val rangeMem     = startRawTable(rangeStream,     "datapoint_raw_range")
val satvisMem    = startRawTable(satvisStream,    "datapoint_raw_satvis")
val satxyz2Mem   = startRawTable(satxyz2Stream,   "datapoint_raw_satxyz2")

// Kept as separate expressions so that the notebook echoes the progress of each query.
ismdetobsMem.recentProgress
ismrawtecMem.recentProgress
ismredobsMem.recentProgress
psrposMem.recentProgress
rangeMem.recentProgress
satvisMem.recentProgress
satxyz2Mem.recentProgress

### Десериализация сообщений

Сообщения NovAtelLogReader сериализованы при помощи формата Avro. Для их обработки нужно провести десериализацию.

- [Документация на библиотеку][docs].
- [Read and write streaming Avro data | Databricks][databricks].

[docs]: https://spark.apache.org/docs/2.4.0/sql-data-sources-avro.html
[databricks]: https://docs.databricks.com/structured-streaming/avro-dataframe.html

Пример разложения десериализованного сообщения на составляющие поля:

In [None]:
// Decodes the Avro payload of the `value` column with `avroSchema` into the column
// `array`, keeps the `extra` columns next to it and adds the column `point` holding
// one array element per output row.
def explodeAvro(raw: DataFrame, avroSchema: String, extra: Column*): DataFrame = {
  val decoded = from_avro(col("value"), avroSchema).as("array")
  raw
    .select((decoded +: extra): _*)
    .withColumn("point", explode(col("array")))
}

// These streams keep the decoded array, the `timestamp` column and the exploded point.
val ismdetobsDeser = explodeAvro(ismdetobsStream, ismdetobsScheme, $"timestamp")
val ismrawtecDeser = explodeAvro(ismrawtecStream, ismrawtecScheme, $"timestamp")
val ismredobsDeser = explodeAvro(ismredobsStream, ismredobsScheme, $"timestamp")
val psrposDeser    = explodeAvro(psrposStream,    psrposScheme,    $"timestamp")
val satvisDeser    = explodeAvro(satvisStream,    satvisScheme,    $"timestamp")

// The range points are flattened into top-level columns.
val rangeDeser =
  explodeAvro(rangeStream, rangeScheme)
    .select(
      $"point.Timestamp".as("time"),
      $"point.NavigationSystem".as("system"),
      $"point.SignalType".as("freq"),
      $"point.Satellite".as("sat"),
      $"point.Prn".as("prn"),
      $"point.GloFreq".as("glofreq"),
      $"point.Psr".as("psr"),
      $"point.Adr".as("adr"),
      $"point.CNo".as("cno"),
      $"point.LockTime".as("locktime"))

// The satxyz2 points are flattened into top-level columns as well.
val satxyz2Deser =
  explodeAvro(satxyz2Stream, satxyz2Scheme)
    .select(
      $"point.Timestamp".as("time"),
      $"point.X".as("X"),
      $"point.Y".as("Y"),
      $"point.Z".as("Z"),
      $"point.Satellite".as("sat"),
      $"point.NavigationSystem".as("system"),
      $"point.Prn".as("prn"))

// Start one memory-table query per deserialized stream.
memTableSink(ismdetobsDeser, "ismdetobsdeser").start()
memTableSink(ismrawtecDeser, "ismrawtecdeser").start()
memTableSink(ismredobsDeser, "ismredobsdeser").start()
memTableSink(psrposDeser,    "psrposdeser").start()
memTableSink(rangeDeser,     "rangedeser").start()
memTableSink(satvisDeser,    "satvisdeser").start()
memTableSink(satxyz2Deser,   "satxyz2deser").start()

Все срезы можно наблюдать далее:

In [None]:
// List the tables known to the session: up to 50 rows, values not truncated.
spark
  .sql("SHOW tables")
  .show(numRows = 50, truncate = false)

In [None]:
// Raw Kafka records of the ismdetobs topic, newest first.
spark
  .sql("SELECT * FROM datapoint_raw_ismdetobs ORDER BY timestamp DESC")
  .show()

In [None]:
// Raw Kafka records of the ismrawtec, ismredobs and psrpos topics, newest first.
Seq("datapoint_raw_ismrawtec", "datapoint_raw_ismredobs", "datapoint_raw_psrpos")
  .foreach { table =>
    spark.sql(s"SELECT * FROM $table ORDER BY timestamp DESC").show()
  }

In [None]:
// Raw Kafka records of the range and satvis topics, newest first.
Seq("datapoint_raw_range", "datapoint_raw_satvis")
  .foreach { table =>
    spark.sql(s"SELECT * FROM $table ORDER BY timestamp DESC").show()
  }

In [None]:
// Raw Kafka records of the satxyz2 topic, newest first.
spark
  .sql("SELECT * FROM datapoint_raw_satxyz2 ORDER BY timestamp DESC")
  .show()

In [None]:
// Deserialized ismdetobs points, newest Kafka record first. The sort key is the
// top-level `timestamp` column of the table (not the nested `point.Timestamp`), so it
// is spelled exactly like the column instead of relying on case-insensitive resolution.
spark.sql("SELECT * FROM ismdetobsdeser ORDER BY timestamp DESC").show()

In [None]:
// Deserialized ismrawtec, ismredobs and psrpos points, newest Kafka record first.
Seq("ismrawtecdeser", "ismredobsdeser", "psrposdeser")
  .foreach { table =>
    spark.sql(s"SELECT * FROM $table ORDER BY timestamp DESC").show()
  }

In [None]:
// Flattened range points, ordered by their own `time` column (newest first).
spark
  .sql("SELECT * FROM rangedeser ORDER BY time DESC")
  .show()

// Deserialized satvis points, newest Kafka record first.
spark
  .sql("SELECT * FROM satvisdeser ORDER BY timestamp DESC")
  .show()

In [None]:
// Flattened satxyz2 points, ordered by their own `time` column (newest first).
spark
  .sql("SELECT * FROM satxyz2deser ORDER BY time DESC")
  .show()