# 本セクションの目次

# kafka / Spark Streaming / Avro
kafka / Spark Streaming / Avro はストリーミングシステムを作成する上での3種の神器です。
kafka と Spark Stramingについては前セクションで紹介しましたが、今回はそれに加えてAvroフォーマットについて学んでいきましょう

## Avro フォーマットとは？

もう一つがHadoopの生みの親であるDoug Cutting氏によりプロジェクト化されたAvro（アブロ）フォーマットです【URL】https://avro.apache.org。Avroフォーマットはおもにストリーミングでのやり取りで効力を発揮するフォーマットです余談ですが、同様のしくみとしてプロトコルバッファー（Protocol Buffers）は有名です。　【URL】https://developers.google.com/protocol-buffers/。 　元々AvroはHadoopの弱点であったJavaでしか読み書きできないという言語のポータビリティを解決するために生まれました言語のポータビリティーが低いということはそのままAvroファイルと連携する対向のシステムの利用言語まで縛ってしまう可能性があります。。 　Avroフォーマットの特徴は以下です。 ＠＠＠＠＠半行空き ・行指向フォーマット ・前方互換性と、後方互換性、完全互換を持ち複数のシステム間で速度の違う開発を行うことが可能 ・スキーマエボリューションを提供する ・Parquetに比べてJSONのようなリッチなフォーマットを表現可能

## スキーマファイルを保存する場所
スキーマファイルを保存する場所をスキーマレジストリと呼んだりします。
スキーマレジストリとは、共有してファイルを読み取れる場所である必要があります。

有名な企業としてConfulentが存在しますが、ただし今回はschemaのディレクトリをスキーマレジストリとします。


# Avroにおける前方互換、後方互換、完全互換

さまざまな言語で利用可能になった今、さらに注目されているAvroの特徴が開発スピードの違いを吸収することができるということです。一般にデータ基盤が相手をするシステムはスモールデータシステム含め社内のシステムすべてです。そのシステム群の開発のスピードを合わせようと思ったら組織が大きくなるにつれて調整のコストが増大し調整自体が不可能に近くなります。 　そこで、後方互換性や前方互換性というしくみが活躍します。後方互換性とは、新しい製品が、古い製品を扱えることを指します。前方互換性とは、古い製品が新しい製品を扱えることを指します。たとえば、Excel2010がExcel2003を扱えるようにすることを後方互換。Excel2003がExcel2010を扱えるようにすることを前方互換。ということを指します。 　後方互換や前方互換の機能を利用することによって、一方のシステムへ変更があったときでも、ほかのシステムの稼働を維持しつつ自システムの変更を行うことができるのです。このようなしくみを提供することをスキーマエボリューション（schema evolution）といいますParquetフォーマットはスキーマエボリューションの機能を有していません。なぜならば一方のシステムで変更を加えた場合、もう一方のシステムにも同時に変更を加えないとならないからです。。 　また、Avroフォーマットのもう一つの大きな特徴は、リッチなスキーマを表現できるという点にあります。キーバリューで表現できるmapやenumも表現可能でフォーマットの機能としてバリデーションも行ってくれます。 　Avroフォーマットにおけるスキーマ定義は以下のような形をしています。この定義に従ってデータ部分をシリアライズしたり、デシリアライズすることでデータを操作していきます。

# KafkaとAvroを連携してみよう
1. python(send avro with schema_ver1.avsc) -> kafka <- spark streaming(read avro with schema_ver1.avsc) -> memory
2. python(send avro with schema_ver1.avsc) -> kafka <- spark streaming(read avro with schema_ver1.avsc) -> parquet


1.の場合は今回はスキーマファイル(schema_ver1.avsc)を使ってシリアライズを実行しkafkaに転送します。  
kafkaに到着したデータはSpark Streamingによって弟子リアライズされよみだされ、コンソールに出力されます  

2.の場合は最後の出力がparquetに変換され出力されています。

In [1]:
# コンソールで設定したSparkとNoteBookを接続します(動かす前に毎度実行する必要があります)
import findspark
findspark.init("/home/pyspark/spark")

# スキーマファイルの確認(schema_ver1.avsc)

まず今回のレクチャーで利用するAvroのシリアライズ/デシリアライズ用のスキーマです。  
送信したデータ(name/action/sendtime)がkafkaのvalueへ格納されることになります。

```
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "action", "type": ["string", "null"]},
    {"name": "sendtime", "type": ["int", "null"]}
  ]
}

```

In [17]:
from confluent_kafka import Producer
import avro.schema
import avro.io
import io
import random

avro_json_schema = open("/home/pyspark/pyspark_streaming/schema/schema_ver1.avsc", "r").read()

In [4]:
# コンソールで設定したSparkとNoteBookを接続します(動かす前に毎度実行する必要があります)
import findspark
findspark.init("/home/pyspark/spark")

In [5]:
#pysparkに必要なライブラリを読み込む
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#spark sessionの作成
# spark.ui.enabled trueとするとSparkのGUI画面を確認することができます
# spark.eventLog.enabled true　とすると　GUIで実行ログを確認することができます
# GUIなどの確認は最後のセクションで説明を行います。
spark = SparkSession.builder \
    .appName("chapter1") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.session.timeZone", "JST") \
    .config("spark.ui.enabled","true") \
    .config("spark.eventLog.enabled","true") \
    .config("spark.jars.packages", "org.apache.spark:spark-streaming_2.13:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-avro_2.12:3.2.0") \
    .enableHiveSupport() \
    .getOrCreate()

# パッケージを複数渡したい時は「,」で繋いで渡します。
# Sparkのバージョンにしっかりと合わせます(今回はSparkのバージョンが3.2を使っています。)。



:: loading settings :: url = jar:file:/home/pyspark/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/pyspark/.ivy2/cache
The jars for the packages stored in: /home/pyspark/.ivy2/jars
org.apache.spark#spark-streaming_2.13 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b1a0d75e-bf79-459c-a70a-f302b8d52d93;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in ce

In [15]:
# PySparkをKafkaと接続します(kafka <- spark streaming(read avro))
# ストリームの経路を作成行います。

# kafkaからデータを読み取る設定を行います。

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9092") \
  .option("subscribe", "pyspark-topic1") \
  .load()

In [18]:
# spark streaming(read avro) -> console

console_stream_check = df \
  .select(from_avro("value", avro_json_schema).alias("json_col")) \
  .writeStream \
  .trigger(processingTime="5 seconds") \
  .format("console") \
  .option("checkpointLocation", "/tmp/kafka/console_check/") \
  .start()

21/12/09 09:31:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/12/09 09:31:52 ERROR MicroBatchExecution: Query [id = 40ba5078-fb5b-4b44-97ee-df34058c6428, runId = b8427a47-e8c2-473e-8791-2786639ba783] terminated with error
java.lang.IllegalStateException: Set(pyspark-topic2-0) are gone. Some data may have been missed.. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
    
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.reportDataLoss(KafkaMicroBatchStream.scala:296)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$planInputPartitions$1(KafkaMicroBatchStream.scala:183)
	at org.apache.spark.sql.kafka010.KafkaMicroBatch

In [50]:
# spark streaming(read avro) -> parquet
from pyspark.sql.avro.functions import from_avro, to_avro

kafka_parquet = df \
  .select(from_avro("value", avro_json_schema).alias("json_col")) \
  .writeStream \
  .format("parquet") \
  .option("path", "/tmp/avro_parquet/") \
  .outputMode("append") \
  .trigger(processingTime="5 seconds") \
  .option("checkpointLocation", "/tmp/kafka/avro_parquet/") \
  .start()

21/12/09 09:53:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [111]:
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql import functions as F

# spark streaming(read avro) -> avro
kafka_avro = df.repartition(1) \
  .select(from_avro("value", avro_json_schema).alias('avro')) \
  .writeStream \
  .format("avro") \
  .option("path", "/tmp/avro_file/") \
  .outputMode("append") \
  .trigger(processingTime="5 seconds") \
  .option("checkpointLocation", "/tmp/kafka/avro_kafka/") \
  .start()

21/12/09 13:02:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [60]:
memory = df \
  .select(from_avro("value", avro_json_schema).alias("json_col")) \
  .writeStream \
  .format("memory") \
  .queryName("aggregates8") \
  .start()

21/12/09 10:01:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4a4b02a1-6786-476b-85f8-cbe7d3ae9c3c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/12/09 10:01:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [110]:
kafka_avro.stop()

In [114]:
# 送信するpyspark 

conf = {'bootstrap.servers': 'kafka:9092'}
producer = Producer(**conf)

# Kafka topic
topic = "pyspark-topic1"

# Path to user.avsc avro schema
schema_path = "/home/pyspark/pyspark_streaming/schema/schema_ver1.avsc"
schema = avro.schema.parse(open(schema_path).read())

for i in range(1):
    writer = avro.io.DatumWriter(schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    # データの送信
    writer.write({"id": "yuki",
                    "type": "login2",
                    "sendtime": random.randint(0, 10)}, encoder)
    raw_bytes = bytes_writer.getvalue()
    producer.produce(topic, raw_bytes)
producer.flush()

0

In [89]:
spark.sql("select json_col.id from aggregates8").show()

+----+
|  id|
+----+
|yuki|
|yuki|
|yuki|
|yuki|
|yuki|
+----+



In [None]:
console_stream_check.stop()
kafka_parquet.stop()

# ファイルがスモール？
プロセシング時間の延長
repartitonの付与

# Avroファイルの読み書き
今回はストリーミングですが、一応Sparkでの読み込みも少しだけおさらいしておきましょう

Avroで出力するビッグデータとしての別のメリットはスプリッタブルである（データを分割して複数のサーバーに分けて処理を行うことが可能）という点です。

Avroに似ているフォーマットとしてJsonがありますが、こちらはスプリッタブルではなく（厳密には圧縮形式によります）データを分割して複数サーバーで処理するということに不向きです。

そのため「Jsonでいいや〜」と思わずに是非Avroでの実装を考えてみてください。

In [115]:
# Avro でのデータの読み込み
avro_df = spark.read.format("avro").load("/tmp/avro_file/")
avro_df.printSchema()
avro_df.show()

Py4JJavaError: An error occurred while calling o627.load.
: java.lang.IllegalStateException: /tmp/avro_file/_spark_metadata/9.compact doesn't exist (latestId: 18, compactInterval: 10)
	at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$allFiles$4(CompactibleFileStreamLog.scala:267)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$allFiles$2(CompactibleFileStreamLog.scala:265)
	at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$allFiles$2$adapted(CompactibleFileStreamLog.scala:263)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:75)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.allFiles(CompactibleFileStreamLog.scala:263)
	at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188)
	at jdk.internal.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
# Avroでのデータ書き込み
avro_df.select("name").write.format("avro").save("/tmp/avro_etl/")

In [None]:
! ls -al /tmp/avro_etl/

In [53]:
spark.read.parquet("/tmp/avro_parquet/").show()

+------------------+
|          json_col|
+------------------+
|{yuki, login2, 10}|
+------------------+



# Avroで後方互換をやってみよう
Avroの特徴の一つである後方互換をやってみましょう。

今回の後方互換のシナリオとしては、以下を想像してみてください。  
各家庭に冷蔵庫が配置されている(IoT機器、パブリッシャー)。  
そのIoT機器からは絶え間なくデータが流れている(ストリーミング)  
ストリーミングデータの受け口はKafkaを利用し、Spark Streamingでデータを読み込んでいる(コンシューマー)。

今回、IoTのソフトウェアがアップデートし温度(temp)も取得することが可能になったのでスキーマのアップデートを行いたい。  
しかし、各家庭に配置されている冷蔵庫のソフトウェアのアップデートを行うことは不可能である。

こんな時に活躍するのがAvroの後方互換です。

今回の後方互換はコンシューマー(Spark Streaming)が古いスキーマバージョンでシリアライズされたデータを新しいスキーマバージョンで読み取ることができるようにしたものです。  
この機能を使うことによって、システム側は早々にスキーマバージョンをアップデートし、ゆっくりと各家庭に配置されたソフトウェアが更新されるのを待てば良いことになります。

後方互換を行うために、shcema_ver2.avscというファイルを利用します。

```
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "type", "type": ["string", "null"]},
    {"name": "sendtime", "type": ["int", "null"]}
    {
      "name": "temp",
      "type": ["null", "string"],
      "default": "1"
    }
  ]
}

```

古いスキーマバージョンからの送信であればnullにするという命令を後方互換として設定します。

In [None]:
# Path to user.avsc avro schema
schema_path_ver1 = "/home/pyspark/pyspark_streaming/schema/schema_ver1.avsc"
schema_ver1 = avro.schema.parse(open(schema_path_ver1).read())

schema_path_ver2 = "/home/pyspark/pyspark_streaming/schema/schema_ver2.avsc"
schema_ver2 = avro.schema.parse(open(schema_path_ver2).read())

In [None]:
# スキーマバージョン1で送信するpyspark 

conf = {'bootstrap.servers': 'kafka:9092'}
producer = Producer(**conf)

# Kafka topic
topic = "pyspark-topic1"

for i in range(1):
    writer = avro.io.DatumWriter(schema_ver1)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    # データの送信
    writer.write({"name": "yuki_schema_ver1",
                    "action": "login2",
                    "sendtime": random.randint(0, 10)}, encoder)
    raw_bytes = bytes_writer.getvalue()
    producer.produce(topic, raw_bytes)
producer.flush()

In [None]:
# spark streaming(read avro) -> console

console_stream_check = df \
  .select(from_avro("value", schema_ver2).alias("json_col")) \
  .writeStream \
  .trigger(processingTime="5 seconds") \
  .format("console") \
  .option("checkpointLocation", "/tmp/kafka/console_check/") \
  .start()

In [None]:
# spark streaming(read avro) -> parquet
from pyspark.sql.functions import from_json, col

kafka_parquet = df \
  .select(from_avro("value", schema_ver2).alias("json_col")) \
  .writeStream \
  .format("parquet") \
  .option("path", "/tmp/avro/") \
  .outputMode("append") \
  .trigger(processingTime="5 seconds") \
  .option("checkpointLocation", "/tmp/kafka/avro/") \
  .start()

In [None]:
# 読み込みして確認してみる
avro_df = spark.read.format("avro").load("/tmp/avro/")
avro_df.show(truncate=False)

In [None]:
kafka_parquet.stop()
console_stream_check.stop()

In [None]:
spark.stop()
sparkCo