In [None]:
import configargparse
from pathlib import Path
from spark_utils.streaming_utils import event_hub_parse, preview_stream

# Build the argument parser for the data generator. The single required
# setting, the output Event Hub connection string, is declared with a CLI flag,
# an environment variable name, and a default config file location.
default_config_path = Path().joinpath('configuration/run_args_data_generator.conf').resolve().as_posix()

p = configargparse.ArgParser(prog='streaming.py',
                             description='Streaming Job Sample',
                             default_config_files=[default_config_path],
                             formatter_class=configargparse.ArgumentDefaultsHelpFormatter)
p.add('--output-eh-connection-string', type=str, required=True,
      help='Output Event Hub connection string', env_var='GENERATOR_OUTPUT_EH_CONNECTION_STRING')

# parse_known_args hands back undeclared arguments separately so they can be
# reported below. NOTE(review): presumably used because the notebook kernel
# passes its own argv entries - confirm.
args, unknown_args = p.parse_known_args()

if unknown_args:
    print("Unknown args:")
    # Plain loop instead of a throwaway list comprehension: nothing is
    # allocated just to be discarded, and `_` (used by IPython for the last
    # cell output) is left untouched.
    for arg in unknown_args:
        print(arg)


In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session from a SparkConf built with loadDefaults=True.
spark_conf = SparkConf(loadDefaults=True)

spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)

sc = spark.sparkContext

# Print the effective configuration as key=value lines.
# NOTE(review): this prints every configuration entry verbatim; check that no
# secrets are stored in the Spark conf before sharing the notebook output.
print("Spark Configuration:")
# Plain loop instead of a side-effect list comprehension assigned to `_`,
# which IPython uses for the last cell output.
for conf_key, conf_value in sc.getConf().getAll():
    print(f"{conf_key}={conf_value}")


In [None]:

# Streaming source for the generator: the "rate" format configured for
# 10 rows per second.
rateStream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)

# Preview the stream through the spark_utils helper, with await_seconds=3.
preview_stream(rateStream, await_seconds=3)

In [None]:
from pyspark.sql.functions import col, lit, struct

# Turn the rate stream into synthetic meter observations.
# "value" is first multiplied by 3019; every derived column below is a modulo
# of that scaled value, and "value" itself is dropped at the end.
measurement = struct(
    (col("value") % lit(59)).alias("Value"),
    lit("kWH").alias("Unit"),
)

generatedData = (
    rateStream
    .withColumn("value", col("value") * 3019)
    .withColumnRenamed("timestamp", "ObservationTime")
    .withColumn("MeterId", col("value") % lit(127))
    .withColumn("SupplierId", col("value") % lit(23))
    .withColumn("Measurement", measurement)
    .drop("value")
)

preview_stream(generatedData, await_seconds=3)

In [None]:
from pyspark.sql.functions import to_json

# Serialize each row (all columns wrapped in one struct) into a single JSON
# string column named "body".
all_columns = struct(col("*"))
jsonData = generatedData.select(
    to_json(all_columns).cast("string").alias("body")
)

preview_stream(jsonData, await_seconds=3)

In [None]:
from spark_utils.schemas import message_schema
from pyspark.sql.functions import col, from_json
# Parse the JSON "body" column back with message_schema and flatten the
# resulting "message" struct into top-level columns.
parsed_message = from_json(col("body").cast("string"), message_schema).alias("message")
fromjsondata = (
    jsonData
    .select(parsed_message)
    .select(col("message.*"))
)

preview_stream(fromjsondata, await_seconds=3)

In [None]:
from pyspark.sql.functions import to_json

# Sink options for the "eventhubs" format. The connection string from the
# parsed arguments is passed through the connector's EventHubsUtils.encrypt
# (reached via the JVM gateway) before being handed to the writer.
eh_conf = {
    'eventhubs.connectionString':
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(args.output_eh_connection_string)
}

# Start writing the JSON stream to Event Hubs. The handle is deliberately not
# called `exec`, which would shadow the builtin exec() for the rest of the
# kernel session.
eventhub_query = (
    jsonData.writeStream
    .format("eventhubs")
    .options(**eh_conf)
    .option("checkpointLocation", '.checkpoint/data-generator3')
    .start()
)

# Block until the query ends. The finally clause makes sure the query is
# stopped even when awaitTermination raises or the cell is interrupted.
try:
    eventhub_query.awaitTermination()
finally:
    eventhub_query.stop()