# 構造化ストリーミングを使用した Azure Cosmos DB コレクションへのストリーミング取り込み

このノートでは、あなたは、

1. レートストリーミングソースを使用したストリーミングデータ生成のシミュレーションします
2. IoTSignals スキーマに従ってストリームデータフレームをフォーマットする
3. ストリーミングデータフレームを Azure Cosmos DB コレクションに書き込みます

>**ご存知ですか？** Azure Cosmos DB は、IoT の予測メンテナンスと異常検出のユースケースに最適です。Azure Cosmos DB 用の Azure Synapse Link の HTAP 機能を活用した IoT アーキテクチャの詳細については、[こちらをクリック](https://docs.microsoft.com/ja-jp/azure/cosmos-db/synapse-link-use-cases)してください。

>**ご存知ですか？** [Azure Cosmos DB 用 Azure Synapse Link](https://docs.microsoft.com/ja-jp/azure/cosmos-db/synapse-link) は、Azure Cosmos DBの運用データに対してほぼリアルタイムの分析を実行できるハイブリッドトランザクション分析処理 (HTAP) 機能です。
&nbsp;

>**ご存知ですか？** [Azure Cosmos DB 分析ストア](https://docs.microsoft.com/ja-jp/azure/cosmos-db/analytical-store-introduction)は、トランザクションワークロードに影響を与えることなく、Azure Cosmos DB の運用データに対して大規模な分析を可能にする完全に分離された列ストアです。
&nbsp;

### 1. レートストリーミングソースを使用したストリーミングデータ生成をシミュレーションする
* レートストリーミングソースは、ここでのソリューションを簡略化するために使用され、[Azure Event Hubs](https://azure.microsoft.com/ja-jp/services/event-hubs/) や [Apache Kafka](https://docs.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-introduction) などのサポートされているストリーミングソースに置き換えることができます。

* [こちらをクリック](https://github.com/Azure-Samples/streaming-at-scale)し、さまざまな Azure テクノロジーを選択して実装可能なエンドツーエンドのストリーミングソリューションの方法の詳細をご覧ください。

>**ご存知ですか？**  レートストリーミングソースは、指定された 1 秒あたりの行数でデータを生成し、各出力行にはタイムスタンプと値が含まれます。

In [None]:
dfStream = (spark
                .readStream
                .format("rate")
                .option("rowsPerSecond", 10)
                .load()
            )

### 2. IoTSignals スキーマに従いストリームデータフレームをフォーマットする


In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import uuid

numberOfDevices = 10
generate_uuid = F.udf(lambda : str(uuid.uuid4()), StringType())
              
dfIoTSignals = (dfStream
                    .withColumn("id", generate_uuid())
                    .withColumn("deviceId", F.concat(F.lit("dev-"), F.expr("mod(value, %d)" % numberOfDevices)))
                    .withColumn("dateTime", dfStream["timestamp"].cast(StringType()))
                    .withColumn("unit", F.expr("CASE WHEN rand() < 0.5 THEN 'Revolutions per Minute' ELSE 'MegaWatts' END"))
                    .withColumn("unitSymbol", F.expr("CASE WHEN rand() < 0.5 THEN 'RPM' ELSE 'MW' END"))
                    .withColumn("measureType", F.expr("CASE WHEN rand() < 0.5 THEN 'Rotation Speed' ELSE 'Output' END"))
                    .withColumn("measureValue", F.expr("CASE WHEN rand() > 0.95 THEN value * 10 WHEN rand() < 0.05 THEN value div 10 ELSE value END"))
                    .drop("timestamp", "value")
                )

### 3. Azure Cosmos DB コレクションへのストリーム書き込みを行う
>**ご存知ですか？** 「cosmos.oltp」は、Cosmos DB トランザクションストアへの接続を可能にする Spark フォーマットです。

>**ご存知ですか？** Cosmos DB コレクションへの取り込みは、分析ストアが有効になっているかどうかに関係なく、常にトランザクションストアを通じて実行されます。

In [None]:
streamQuery = dfIoTSignals\
                    .writeStream\
                    .format("cosmos.oltp")\
                    .outputMode("append")\
                    .option("spark.cosmos.connection.mode", "gateway") \
                    .option("spark.synapse.linkedService", "CosmosDBIoTDemo")\
                    .option("spark.cosmos.container", "IoTSignals")\
                    .option("checkpointLocation", "/writeCheckpointDir")\
                    .start()

streamQuery.awaitTermination()