In [1]:
#
# Job for storing the heatpump data to a parquet file on HDFS
#
# Import Libraries
from pyspark.sql.types import StructType, StructField, FloatType, BooleanType
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, LongType
from pyspark.sql import functions
from hops import kafka
from hops import tls
from hops import hdfs


# Schema of one heatpump reading as carried in the Kafka message payload.
# Every sensor value is declared as a nullable integer; the reading's
# timestamp is a nullable long.
# NOTE(review): presumably the producer sends whole-number sensor values —
# confirm against the producer.
_INT_SENSOR_FIELDS = (
    "BrineOutTemp",
    "BrineInTemp",
    "OutdoorTemp",
    "SupplylineTemp",
    "ReturnlineTemp",
    "BrinepumpSpeed",
    "HotwaterTemp",
)

schema = StructType(
    [StructField(name, IntegerType(), True) for name in _INT_SENSOR_FIELDS]
    + [StructField("timestamp", LongType(), True)]
)

# Name of the Kafka topic the heatpump readings are read from
TOPIC_NAME = "heatpump"
# NOTE(review): `config` is not used by any code visible in this cell; it is
# kept in case a later cell reads it — confirm before removing.
config = kafka.get_kafka_default_config()

# Set up the Spark Structured Streaming source that reads from Kafka.
# Each store is paired with its own password getter: the trust store with the
# trust-store password, the key store and its private key with the key-store
# password.
dfk = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka.get_broker_endpoints())
    .option("kafka.security.protocol", kafka.get_security_protocol())
    .option("kafka.ssl.truststore.location", tls.get_trust_store())
    .option("kafka.ssl.truststore.password", tls.get_trust_store_pwd())
    .option("kafka.ssl.keystore.location", tls.get_key_store())
    .option("kafka.ssl.keystore.password", tls.get_key_store_pwd())
    .option("kafka.ssl.key.password", tls.get_key_store_pwd())
    # An empty algorithm turns off server hostname verification
    # (see the Kafka client configuration documentation).
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("subscribe", TOPIC_NAME)
    .load()
)

# To read the topic from the beginning, add this option before .load():
#  .option("startingOffsets", "earliest")

# Stream to a Parquet file: target locations inside the project's
# Jupyter/Data directory on HDFS.
_DATA_DIR = "/Projects/" + hdfs.project_name() + "/Jupyter/Data/"
PARQ_PATH = _DATA_DIR + "heatpump3.parquet"
CHECK_PATH = _DATA_DIR + "heatpump3_checkpoint/"

# Cast the Kafka message value to a string and parse it as JSON using
# `schema`. The resulting frame has a single struct column named "data".
_parsed_reading = functions.from_json("value", schema=schema).alias("data")
df_output = dfk.selectExpr("CAST(value AS STRING)").select(_parsed_reading)

# Parquet sink: append the parsed readings to PARQ_PATH, tracking progress in
# CHECK_PATH. With trigger(once=True) the query runs a single time and then
# stops instead of running continuously.
# Alternative trigger for a continuously running job:
#    .trigger(processingTime="120 seconds")
_parquet_writer = (
    df_output.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", PARQ_PATH)
    .option("checkpointLocation", CHECK_PATH)
    .trigger(once=True)
)

targetParquetHDFS = _parquet_writer.start()

# Block until the one-off streaming query has finished.
targetParquetHDFS.awaitTermination()



Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
84,application_1638826983412_0024,pyspark,idle,Link,Link


SparkSession available as 'spark'.
