Skip to content

tspannhw/FLiP-Pi-DeltaLake-Thermal

main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Code

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
 
 
 
 

FLiP-Pi-DeltaLake-Thermal

Apache Pulsar -> Sink -> DeltaLake

Diagram

Python Pulsar Record

*** Note: the current version requires records that contain no null values and no map fields. ***

# Pulsar schema record describing one sensor reading published to the pi-sensors topic.
# Every field is declared required=True; the resulting Delta/Spark schema shown later
# in this document reports every column as nullable = false.
class thermal(Record):
    # Per-message identifier, e.g. 'thrml_fui_20220715130806' in the sample output.
    uuid = String(required=True)
    ipaddress = String(required=True)
    # Presumably CPU temperature in degrees Fahrenheit (name suggests it) -- TODO confirm.
    cputempf = Integer(required=True)
    runtime = Integer(required=True)
    host = String(required=True)
    hostname = String(required=True)
    macaddress = String(required=True)
    # Epoch-seconds value carried as a string, e.g. '1657890486.6318262'.
    endtime = String(required=True)
    # Small fractional number carried as a string, e.g. '0.0005743503570556641';
    # presumably an elapsed time -- TODO confirm against the producer script.
    te = String(required=True)
    cpu = Float(required=True)
    # Free-form text including the unit, e.g. '105078.3 MB'.
    diskusage = String(required=True)
    memory = Float(required=True)
    # Timestamp prefix plus a UUID, e.g. '20220715130806_60188a3a-...'.
    rowid = String(required=True)
    # Formatted as 'MM/DD/YYYY HH:MM:SS' in the sample output.
    systemtime = String(required=True)
    # Integer epoch seconds, e.g. 1657890492.
    ts = Integer(required=True)
    # Formatted as 'MM/DD/YYYY HH:MM:SS' in the sample output.
    starttime = String(required=True)
    # Timestamp string with a +00:00 offset, e.g. '2022-07-15 13:08:11.308836+00:00'.
    datetimestamp = String(required=True)
    # Sensor readings; units are not stated in this document -- TODO confirm
    # (sample values: temperature 27.6959, humidity 31.7, co2 1360.0).
    temperature = Float(required=True)
    humidity = Float(required=True)
    co2 =  Float(required=True)

Raspberry Pi Sensor Python Run

2022-07-15 09:08:06.115 INFO  [3034530880] HandlerBase:64 | [persistent://public/default/pi-sensors-partition-0, ] Getting connection from pool
2022-07-15 09:08:06.117 INFO  [3034530880] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000
2022-07-15 09:08:06.117 INFO  [3034530880] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6650
2022-07-15 09:08:06.120 INFO  [3034530880] ClientConnection:370 | [192.168.1.204:47626 -> 192.168.1.230:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650
2022-07-15 09:08:06.127 INFO  [3034530880] ProducerImpl:189 | [persistent://public/default/pi-sensors-partition-0, ] Created producer on broker [192.168.1.204:47626 -> 192.168.1.230:6650]
SCD4X, Serial: d3efd3efd3ef
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrml_fui_20220715130806', 'ipaddress': '192.168.1.204', 'cputempf': 106, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1657890486.6318262', 'te': '0.0005743503570556641', 'cpu': 0.0, 'diskusage': '105078.3 MB', 'memory': 9.0, 'rowid': '20220715130806_60188a3a-57f0-4bc3-819b-5b643e0de5b9', 'systemtime': '07/15/2022 09:08:12', 'ts': 1657890492, 'starttime': '07/15/2022 09:08:06', 'datetimestamp': '2022-07-15 13:08:11.308836+00:00', 'temperature': 27.6959, 'humidity': 31.7, 'co2': 1360.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrml_wcc_20220715130812', 'ipaddress': '192.168.1.204', 'cputempf': 106, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1657890492.320293', 'te': '0.0007376670837402344', 'cpu': 0.0, 'diskusage': '105078.3 MB', 'memory': 9.0, 'rowid': '20220715130812_c59a07d1-8994-49e8-888a-44f82f7ed441', 'systemtime': '07/15/2022 09:08:17', 'ts': 1657890497, 'starttime': '07/15/2022 09:08:12', 'datetimestamp': '2022-07-15 13:08:16.084772+00:00', 'temperature': 27.2527, 'humidity': 32.3, 'co2': 1365.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrml_wvx_20220715130817', 'ipaddress': '192.168.1.204', 'cputempf': 106, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1657890497.0999746', 'te': '0.0007336139678955078', 'cpu': 3.3, 'diskusage': '105078.3 MB', 'memory': 9.0, 'rowid': '20220715130817_af0783b4-12f4-49f3-b5e2-d4ad912d369b', 'systemtime': '07/15/2022 09:08:21', 'ts': 1657890501, 'starttime': '07/15/2022 09:08:17', 'datetimestamp': '2022-07-15 13:08:20.862417+00:00', 'temperature': 26.983, 'humidity': 32.78, 'co2': 1381.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrml_cgk_20220715130821', 'ipaddress': '192.168.1.204', 'cputempf': 105, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1657890501.8731027', 'te': '0.0007915496826171875', 'cpu': 0.0, 'diskusage': '105078.3 MB', 'memory': 9.0, 'rowid': '20220715130821_bb918b7c-f074-4cd0-8315-ab7d773cd9d2', 'systemtime': '07/15/2022 09:08:26', 'ts': 1657890506, 'starttime': '07/15/2022 09:08:21', 'datetimestamp': '2022-07-15 13:08:25.636997+00:00', 'temperature': 26.7052, 'humidity': 33.3, 'co2': 1349.0}
^C2022-07-15 09:08:30.165 INFO  [3069380992] ClientImpl:495 | Closing Pulsar client with 1 producers and 0 consumers
2022-07-15 09:08:30.166 INFO  [3069380992] ProducerImpl:686 | [persistent://public/default/pi-sensors-partition-0, standalone-1-22] Closing producer for topic persistent://public/default/pi-sensors-partition-0
2022-07-15 09:08:30.169 INFO  [3034530880] ProducerImpl:729 | [persistent://public/default/pi-sensors-partition-0, standalone-1-22] Closed producer
2022-07-15 09:08:30.170 INFO  [3034530880] ClientConnection:1548 | [192.168.1.204:47626 -> 192.168.1.230:6650] Connection closed
2022-07-15 09:08:30.171 INFO  [3034530880] ClientConnection:1548 | [192.168.1.204:47624 -> 192.168.1.230:6650] Connection closed

Example Message


{
 "uuid": "thrml_zda_20220715182748",
 "ipaddress": "192.168.1.204",
 "cputempf": 108,
 "runtime": 0,
 "host": "thermal",
 "hostname": "thermal",
 "macaddress": "e4:5f:01:7c:3f:34",
 "endtime": "1657909668.7279365",
 "te": "0.0007398128509521484",
 "cpu": 1.8,
 "diskusage": "105078.0 MB",
 "memory": 9.0,
 "rowid": "20220715182748_fc4cbbb1-79da-4c1a-8991-78bd23c9f221",
 "systemtime": "07/15/2022 14:27:53",
 "ts": 1657909673,
 "starttime": "07/15/2022 14:27:48",
 "datetimestamp": "2022-07-15 18:27:52.492469+00:00",
 "temperature": 28.238,
 "humidity": 29.61,
 "co2": 992.0
}

Pulsar Delta Sink Configuration

  • conf/deltalakesink.yml


# Connector-specific settings, passed to `pulsar-admin sinks create` via --sink-config-file.
configs:
        # Selects the Delta Lake table format for the lakehouse sink.
        type: "delta"
        # NOTE(review): unit is not shown in this document -- presumably seconds; confirm
        # against the connector documentation.
        maxCommitInterval: 120
        # Underscores are digit separators; `sinks get` reports this value as 10000000.
        maxRecordsPerCommit: 10_000_000
        # Table location on the local filesystem; the Output section lists this directory.
        tablePath: "file:///opt/demo/lakehouse"
        # NOTE(review): the create command passes --processing-guarantees EFFECTIVELY_ONCE and
        # `sinks get` reports both values; confirm which setting the connector honors.
        processingGuarantees: "EXACTLY_ONCE"
        # Data files are written as parquet (the Output section shows *.snappy.parquet files).
        deltaFileType: "parquet"
        subscriptionType: "Failover"

Pulsar 2.9.* Sink Setup


bin/pulsar-admin topics delete "persistent://public/default/pi-sensors" --force
bin/pulsar-admin schemas delete "persistent://public/default/pi-sensors"
bin/pulsar-admin topics unload "persistent://public/default/pi-sensors"

# Stop and then delete any existing delta_sink so it can be recreated below.
# Uses the `sinks` command group, consistent with every other sink command in this document.
bin/pulsar-admin sinks stop --name delta_sink --namespace default --tenant public
bin/pulsar-admin sinks delete --tenant public --namespace default --name delta_sink

bin/pulsar-client consume "persistent://public/default/pi-sensors" -s "pisensorwatch" --subscription-type Failover

        
bin/pulsar-admin sinks create --archive ./connectors/pulsar-io-lakehouse-2.9.2.24.nar --tenant public --namespace default --name delta_sink --sink-config-file conf/deltalakesink.yml --inputs "persistent://public/default/pi-sensors" --parallelism 1  --subs-name pisensorwatch --processing-guarantees EFFECTIVELY_ONCE


bin/pulsar-admin sinks get --tenant public --namespace default --name delta_sink

{
  "tenant": "public",
  "namespace": "default",
  "name": "delta_sink",
  "className": "org.apache.pulsar.ecosystem.io.lakehouse.SinkConnector",
  "sourceSubscriptionName": "pisensorwatch",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "persistent://public/default/pi-sensors"
  ],
  "inputSpecs": {
    "persistent://public/default/pi-sensors": {
      "isRegexPattern": false,
      "schemaProperties": {},
      "consumerProperties": {},
      "poolMessages": false
    }
  },
  "configs": {
    "maxRecordsPerCommit": 10000000,
    "processingGuarantees": "EXACTLY_ONCE",
    "tablePath": "file:///opt/demo/lakehouse",
    "subscriptionType": "Failover",
    "deltaFileType": "parquet",
    "type": "delta",
    "maxCommitInterval": 120
  },
  "parallelism": 1,
  "processingGuarantees": "EFFECTIVELY_ONCE",
  "retainOrdering": true,
  "autoAck": true
}

bin/pulsar-admin sinks status --tenant public --namespace default --name delta_sink
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 66,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ ],
      "numWrittenToSink" : 66,
      "lastReceivedTime" : 1657890506653,
      "workerId" : "c-standalone-fw-127.0.0.1-8080"
    }
  } ]
}

bin/pulsar-admin sinks list

[
  "scylla-airquality-sink",
  "delta_sink"
]


Output


/opt/demo/lakehouse/
total 12
-rw-r--r-- 1 root root    0 Jul 15 09:00 part-0000-f1bfd140-8da9-4507-8e8d-1d98c5faf67a-c000.snappy.parquet
drwxr-xr-x 2 root root 4096 Jul 15 09:00 _delta_log
-rw-r--r-- 1 root root 6968 Jul 15 09:00 part-0000-de100d45-4aae-444f-b078-85fd54662a8f-c000.snappy.parquet

cat /opt/demo/lakehouse/_delta_log/00000000000000000001.json
{"commitInfo":{"timestamp":1657890016312,"operation":"WRITE","operationParameters":{},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"pulsar-sink-connector-version-2.9.1 Delta-Standalone/0.3.0"}}
{"txn":{"appId":"pulsar-delta-sink-connector","version":1,"lastUpdated":1657890016310}}
{"add":{"path":"part-0000-de100d45-4aae-444f-b078-85fd54662a8f-c000.snappy.parquet","partitionValues":{},"size":4460,"modificationTime":1657890016310,"dataChange":true,"stats":"{}"}}

Querying Delta Lake (Scala Spark Shell)


bin/spark-shell --packages io.delta:delta-core_2.12:1.2.1 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

import io.delta.tables._
import org.apache.spark.sql.functions._

// Load the Delta table written by the Pulsar sink and print its column schema.
val df = spark.read.format("delta").load("/opt/demo/lakehouse/")
df.printSchema()

// Show 5 rows, newest datetimestamp first; the second argument to show() truncates
// cell text longer than 100 characters.
df.select("uuid","humidity","co2","cputempf","datetimestamp","ts").orderBy(col("datetimestamp").desc).show(5,100)

Querying Delta Lake pyspark


pyspark --packages io.delta:delta-core_2.12:1.2.1 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

>>> df = spark.read.format("delta").load("/opt/demo/lakehouse/")
>>> df.show(10)
+--------------------+-------------+--------+-------+-------+--------+-----------------+------------------+--------------------+---+-----------+------+--------------------+-------------------+----------+-------------------+--------------------+-----------+--------+------+
|                uuid|    ipaddress|cputempf|runtime|   host|hostname|       macaddress|           endtime|                  te|cpu|  diskusage|memory|               rowid|         systemtime|        ts|          starttime|       datetimestamp|temperature|humidity|   co2|
+--------------------+-------------+--------+-------+-------+--------+-----------------+------------------+--------------------+---+-----------+------+--------------------+-------------------+----------+-------------------+--------------------+-----------+--------+------+
|thrml_aoi_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890015.3147886|0.000545024871826...|0.0|105078.3 MB|   9.0|20220715130015_dc...|07/15/2022 09:00:20|1657890020|07/15/2022 09:00:15|2022-07-15 13:00:...|    24.7132|    37.3|1332.0|
|thrml_crz_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890020.0944705|0.000758171081542...|0.0|105078.3 MB|   9.0|20220715130020_b1...|07/15/2022 09:00:24|1657890024|07/15/2022 09:00:20|2022-07-15 13:00:...|    24.6491|   37.46|1334.0|
|thrml_nre_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890024.8735719| 0.00072479248046875|0.0|105078.3 MB|   9.0|20220715130024_48...|07/15/2022 09:00:29|1657890029|07/15/2022 09:00:24|2022-07-15 13:00:...|    24.5797|   37.66|1336.0|
|thrml_umx_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890029.6493442|0.000723361968994...|0.0|105078.3 MB|   9.0|20220715130029_3d...|07/15/2022 09:00:34|1657890034|07/15/2022 09:00:29|2022-07-15 13:00:...|    24.4462|   37.83|1339.0|
|thrml_ijc_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890034.5295844|0.000723361968994...|6.5|105078.3 MB|   9.0|20220715130034_f9...|07/15/2022 09:00:39|1657890039|07/15/2022 09:00:34|2022-07-15 13:00:...|    24.3901|   38.07|1343.0|
|thrml_yfy_2022071...|192.168.1.204|     107|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890039.3061981|  0.0005035400390625|0.0|105078.3 MB|   9.0|20220715130039_f6...|07/15/2022 09:00:44|1657890044|07/15/2022 09:00:39|2022-07-15 13:00:...|      24.31|   38.23|1343.0|
|thrml_jzo_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890044.0833697|0.000773191452026...|0.0|105078.3 MB|   9.0|20220715130044_78...|07/15/2022 09:00:48|1657890048|07/15/2022 09:00:44|2022-07-15 13:00:...|    24.2486|   38.31|1345.0|
|thrml_onk_2022071...|192.168.1.204|     105|      0|thermal| thermal|e4:5f:01:7c:3f:34|  1657890048.86295|0.000727176666259...|4.9|105078.3 MB|   9.0|20220715130048_5b...|07/15/2022 09:00:53|1657890053|07/15/2022 09:00:48|2022-07-15 13:00:...|    24.1578|   38.41|1346.0|
|thrml_lrx_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890053.6392615|0.000728368759155...|0.0|105078.3 MB|   9.0|20220715130053_35...|07/15/2022 09:00:58|1657890058|07/15/2022 09:00:53|2022-07-15 13:00:...|    24.1311|   38.49|1347.0|
|thrml_wqr_2022071...|192.168.1.204|     106|      0|thermal| thermal|e4:5f:01:7c:3f:34|1657890058.4192047|0.000722885131835...|0.0|105078.3 MB|   9.0|20220715130058_4e...|07/15/2022 09:01:03|1657890063|07/15/2022 09:00:58|2022-07-15 13:01:...|    24.0536|   38.62|1348.0|
+--------------------+-------------+--------+-------+-------+--------+-----------------+------------------+--------------------+---+-----------+------+--------------------+-------------------+----------+-------------------+--------------------+-----------+--------+------+
only showing top 10 rows

df.printSchema()
root
 |-- uuid: string (nullable = false)
 |-- ipaddress: string (nullable = false)
 |-- cputempf: integer (nullable = false)
 |-- runtime: integer (nullable = false)
 |-- host: string (nullable = false)
 |-- hostname: string (nullable = false)
 |-- macaddress: string (nullable = false)
 |-- endtime: string (nullable = false)
 |-- te: string (nullable = false)
 |-- cpu: float (nullable = false)
 |-- diskusage: string (nullable = false)
 |-- memory: float (nullable = false)
 |-- rowid: string (nullable = false)
 |-- systemtime: string (nullable = false)
 |-- ts: integer (nullable = false)
 |-- starttime: string (nullable = false)
 |-- datetimestamp: string (nullable = false)
 |-- temperature: float (nullable = false)
 |-- humidity: float (nullable = false)
 |-- co2: float (nullable = false)
 
 
# Show up to 100 rows of a subset of columns.
df.select("humidity","co2","datetimestamp","cputempf","ts", "uuid").show(100)

# Show 5 rows, truncating cell text longer than 100 characters.
df.select("uuid","humidity","co2","cputempf","datetimestamp","ts").show(5,100)

# Same as above, sorted ascending by datetimestamp (a string column, so the
# ordering is lexicographic).
df.select("uuid","humidity","co2","cputempf","datetimestamp","ts").orderBy("datetimestamp").show(5,100)

Python3 Setup

pip3 install -U pip
pip3 install delta-spark==2.0.0
pip3 install pyspark==3.2.0

Example Python3 App

from delta.tables import *
from pyspark.sql.functions import *
import pyspark
from delta import *

# Session builder with the Delta Lake SQL extension and catalog registered.
builder = (
    pyspark.sql.SparkSession.builder.appName("PulsarApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Let delta-spark finish configuring the builder, then create (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Read the Delta table produced by the Pulsar sink and print its schema.
df = spark.read.format("delta").load("/opt/demo/lakehouse/")
df.printSchema()

# Show 5 rows ordered by datetimestamp, truncating cells longer than 100 characters.
(
    df.select("uuid", "humidity", "co2", "cputempf", "datetimestamp", "ts")
    .orderBy("datetimestamp")
    .show(5, 100)
)


Validating Parquet Files


pip3 install parquet-tools -U

parquet-tools inspect /opt/demo/lakehouse/part-0000-f2185319-1e50-4ca9-a8dd-69b7e54e552e-c000.snappy.parquet

############ file meta data ############
created_by: parquet-mr version 1.12.0 (build db75a6815f2ba1d1ee89d1a90aeb296f1f3a8f20)
num_columns: 20
num_rows: 3
num_row_groups: 1
format_version: 1.0
serialized_size: 3289


############ Columns ############
uuid
ipaddress
cputempf
runtime
host
hostname
macaddress
endtime
te
cpu
diskusage
memory
rowid
systemtime
ts
starttime
datetimestamp
temperature
humidity
co2

############ Column(uuid) ############
name: uuid
path: uuid
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(ipaddress) ############
name: ipaddress
path: ipaddress
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(cputempf) ############
name: cputempf
path: cputempf
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(runtime) ############
name: runtime
path: runtime
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(host) ############
name: host
path: host
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(hostname) ############
name: hostname
path: hostname
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(macaddress) ############
name: macaddress
path: macaddress
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(endtime) ############
name: endtime
path: endtime
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(te) ############
name: te
path: te
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(cpu) ############
name: cpu
path: cpu
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(diskusage) ############
name: diskusage
path: diskusage
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(memory) ############
name: memory
path: memory
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(rowid) ############
name: rowid
path: rowid
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(systemtime) ############
name: systemtime
path: systemtime
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(ts) ############
name: ts
path: ts
max_definition_level: 0
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(starttime) ############
name: starttime
path: starttime
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(datetimestamp) ############
name: datetimestamp
path: datetimestamp
max_definition_level: 0
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(temperature) ############
name: temperature
path: temperature
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(humidity) ############
name: humidity
path: humidity
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(co2) ############
name: co2
path: co2
max_definition_level: 0
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

Flink SQL


CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

use catalog pulsar;

show tables;

describe `pi-sensors`;

select uuid, cputempf, temperature, humidity, co2, ts, datetimestamp from `pi-sensors`;

select max(cputempf) as MaxCPU, max(temperature) as MaxTemp, max(humidity) as MaxHumidity, max(co2) as MaxCO2 
from `pi-sensors` ;

FlinkSQLTableDef

FlinkSQLResults

FlinkSQLCluster

Pulsar SQL


presto> desc pulsar."public/default"."pi-sensors";
      Column       |   Type    | Extra |                                   Comment
-------------------+-----------+-------+-----------------------------------------------------------------------------
 uuid              | varchar   |       | "string"
 ipaddress         | varchar   |       | "string"
 cputempf          | integer   |       | "int"
 runtime           | integer   |       | "int"
 host              | varchar   |       | "string"
 hostname          | varchar   |       | "string"
 macaddress        | varchar   |       | "string"
 endtime           | varchar   |       | "string"
 te                | varchar   |       | "string"
 cpu               | real      |       | "float"
 diskusage         | varchar   |       | "string"
 memory            | real      |       | "float"
 rowid             | varchar   |       | "string"
 systemtime        | varchar   |       | "string"
 ts                | integer   |       | "int"
 starttime         | varchar   |       | "string"
 datetimestamp     | varchar   |       | "string"
 temperature       | real      |       | "float"
 humidity          | real      |       | "float"
 co2               | real      |       | "float"
 __partition__     | integer   |       | The partition number which the message belongs to
 __event_time__    | timestamp |       | Application defined timestamp in milliseconds of when the event occurred
 __publish_time__  | timestamp |       | The timestamp in milliseconds of when event as published
 __message_id__    | varchar   |       | The message ID of the message used to generate this row
 __sequence_id__   | bigint    |       | The sequence ID of the message used to generate this row
 __producer_name__ | varchar   |       | The name of the producer that publish the message used to generate this row
 __key__           | varchar   |       | The partition key for the topic
 __properties__    | varchar   |       | User defined properties
(28 rows)

presto> select co2, humidity,temperature, datetimestamp, ts, uuid, __publish_time__ from pulsar."public/default"."pi-sensors";
  co2   | humidity | temperature |          datetimestamp           |     ts     |           uuid           |    __publish_time__
--------+----------+-------------+----------------------------------+------------+--------------------------+-------------------------
 1042.0 |    32.55 |     26.4275 | 2022-07-15 18:44:34.803190+00:00 | 1657910675 | thrml_ska_20220715184430 | 2022-07-15 14:44:35.807
 1045.0 |    32.44 |     26.4676 | 2022-07-15 18:44:39.582550+00:00 | 1657910680 | thrml_udc_20220715184435 | 2022-07-15 14:44:40.587
 1046.0 |    32.49 |     26.4596 | 2022-07-15 18:44:44.358047+00:00 | 1657910685 | thrml_sqj_20220715184440 | 2022-07-15 14:44:45.362
 1047.0 |    32.48 |     26.4729 | 2022-07-15 18:44:49.133060+00:00 | 1657910690 | thrml_kff_20220715184445 | 2022-07-15 14:44:50.137
 1047.0 |    32.49 |     26.4649 | 2022-07-15 18:44:53.912035+00:00 | 1657910694 | thrml_xll_20220715184450 | 2022-07-15 14:44:54.916
 1048.0 |    32.43 |     26.4783 | 2022-07-15 18:44:58.790554+00:00 | 1657910699 | thrml_zkr_20220715184454 | 2022-07-15 14:44:59.793
 1049.0 |     32.5 |     26.4569 | 2022-07-15 18:45:03.567302+00:00 | 1657910704 | thrml_ogp_20220715184459 | 2022-07-15 14:45:04.571
 1050.0 |    32.53 |     26.4409 | 2022-07-15 18:45:08.347039+00:00 | 1657910709 | thrml_xto_20220715184504 | 2022-07-15 14:45:09.351
 1050.0 |    32.51 |     26.4409 | 2022-07-15 18:45:13.118426+00:00 | 1657910714 | thrml_kpz_20220715184509 | 2022-07-15 14:45:14.123
 1051.0 |    32.48 |     26.4382 | 2022-07-15 18:45:17.897249+00:00 | 1657910718 | thrml_ghy_20220715184514 | 2022-07-15 14:45:18.902
 1052.0 |     32.5 |     26.4676 | 2022-07-15 18:45:22.676866+00:00 | 1657910723 | thrml_uei_20220715184518 | 2022-07-15 14:45:23.680
 1052.0 |    32.41 |     26.4676 | 2022-07-15 18:45:27.552872+00:00 | 1657910728 | thrml_mzi_20220715184523 | 2022-07-15 14:45:28.557
 1051.0 |    32.36 |     26.4569 | 2022-07-15 18:45:32.331788+00:00 | 1657910733 | thrml_luk_20220715184528 | 2022-07-15 14:45:33.336
 1052.0 |    32.33 |     26.4783 | 2022-07-15 18:45:37.104204+00:00 | 1657910738 | thrml_tbp_20220715184533 | 2022-07-15 14:45:38.107
 1051.0 |    32.33 |     26.5317 | 2022-07-15 18:45:41.881281+00:00 | 1657910742 | thrml_dqf_20220715184538 | 2022-07-15 14:45:42.887
 1050.0 |    32.43 |     26.5557 | 2022-07-15 18:45:46.659983+00:00 | 1657910747 | thrml_nal_20220715184542 | 2022-07-15 14:45:47.665
 1051.0 |    32.38 |     26.5824 | 2022-07-15 18:45:51.537531+00:00 | 1657910752 | thrml_bxo_20220715184547 | 2022-07-15 14:45:52.543
 1050.0 |    32.39 |      26.553 | 2022-07-15 18:45:56.316338+00:00 | 1657910757 | thrml_gtv_20220715184552 | 2022-07-15 14:45:57.322
 1050.0 |     32.4 |     26.5824 | 2022-07-15 18:46:01.095270+00:00 | 1657910762 | thrml_keo_20220715184557 | 2022-07-15 14:46:02.099
 1050.0 |    32.31 |     26.5664 | 2022-07-15 18:46:05.865263+00:00 | 1657910766 | thrml_jmz_20220715184602 | 2022-07-15 14:46:06.869
 1050.0 |     32.4 |      26.529 | 2022-07-15 18:46:10.643571+00:00 | 1657910771 | thrml_hfg_20220715184606 | 2022-07-15 14:46:11.648
 1048.0 |    32.33 |     26.5317 | 2022-07-15 18:46:15.416817+00:00 | 1657910776 | thrml_xtg_20220715184611 | 2022-07-15 14:46:16.420
 1048.0 |    32.29 |     26.5557 | 2022-07-15 18:46:20.295350+00:00 | 1657910781 | thrml_vzo_20220715184616 | 2022-07-15 14:46:21.304
 1048.0 |    32.33 |     26.5637 | 2022-07-15 18:46:25.074648+00:00 | 1657910786 | thrml_mds_20220715184621 | 2022-07-15 14:46:26.084
 1047.0 |    32.36 |     26.5611 | 2022-07-15 18:46:29.851385+00:00 | 1657910790 | thrml_qgv_20220715184626 | 2022-07-15 14:46:30.856
 1049.0 |    32.41 |     26.5691 | 2022-07-15 18:46:34.629965+00:00 | 1657910795 | thrml_taj_20220715184630 | 2022-07-15 14:46:35.639
 1049.0 |    32.38 |     26.5797 | 2022-07-15 18:46:39.409163+00:00 | 1657910800 | thrml_zdj_20220715184635 | 2022-07-15 14:46:40.413
 1049.0 |    32.24 |     26.6225 | 2022-07-15 18:46:44.281726+00:00 | 1657910805 | thrml_pri_20220715184640 | 2022-07-15 14:46:45.290
 1048.0 |    32.18 |     26.5958 | 2022-07-15 18:46:49.060992+00:00 | 1657910810 | thrml_mhu_20220715184645 | 2022-07-15 14:46:50.066
 

PulsarSQL

Apache Spark Structured Streaming SQL

// Open a streaming read of the pi-sensors topic using the "pulsar" source format,
// configured with the broker address (service.url) and the admin HTTP address (admin.url).
// NOTE(review): the spark-shell invocation that puts a Pulsar connector on the classpath
// is not shown in this document -- confirm which package is required.
val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar1:6650").option("admin.url", "http://pulsar1:8080").option("topic", "persistent://public/default/pi-sensors").load()

// Print the schema of the streaming DataFrame.
dfPulsar.printSchema()

root
 |-- uuid: string (nullable = false)
 |-- ipaddress: string (nullable = false)
 |-- cputempf: integer (nullable = false)
 |-- runtime: integer (nullable = false)
 |-- host: string (nullable = false)
 |-- hostname: string (nullable = false)
 |-- macaddress: string (nullable = false)
 |-- endtime: string (nullable = false)
 |-- te: string (nullable = false)
 |-- cpu: float (nullable = false)
 |-- diskusage: string (nullable = false)
 |-- memory: float (nullable = false)
 |-- rowid: string (nullable = false)
 |-- systemtime: string (nullable = false)
 |-- ts: integer (nullable = false)
 |-- starttime: string (nullable = false)
 |-- datetimestamp: string (nullable = false)
 |-- temperature: float (nullable = false)
 |-- humidity: float (nullable = false)
 |-- co2: float (nullable = false)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

// Start a streaming query that writes every column of each incoming row to the console
// sink, with cell truncation disabled.
val pQuery = dfPulsar.selectExpr("*").writeStream.format("console").option("truncate", false).start()

HTML Table Display

  • To Run: python3 -m http.server 8000
  • See thermal.html

ThermalHTML

References