Skip to content

tspannhw/pulsar-thermal-pinot

Repository files navigation

pulsar-thermal-pinot

Apache Pulsar - Apache Pinot - Thermal Sensor Data

img

Meetup December 2022

https://www.meetup.com/new-york-city-apache-pulsar-meetup/events/289817171/

meetup

flipnkitten

Create Topic in Pulsar

bin/pulsar-admin topics delete persistent://public/default/thermalsensors

bin/pulsar-admin topics create persistent://public/default/thermalsensors

bin/pulsar-admin topics create-partitioned-topic --partitions 1 persistent://public/default/thermalsensors

Consume Data in Pulsar


bin/pulsar-client consume "persistent://public/default/thermalsensors" -s "thrmlsnosconsumer" -n 0

DevOps Pulsar

curl http://localhost:8080/admin/v2/persistent/public/default

curl http://localhost:8080/admin/v2/persistent/public/default/thermalsensors-partition-0/stats

http://localhost:8080/admin/v2/persistent/public/default/thermalsensors/partitions?createLocalTopicOnly=false

Data


{
 "uuid": "thrml_qsx_20221121215610",
 "ipaddress": "192.168.1.179",
 "cputempf": 115,
 "runtime": 0,
 "host": "thermal",
 "hostname": "thermal",
 "macaddress": "e4:5f:01:7c:3f:34",
 "endtime": "1669067770.6400402",
 "te": "0.0005550384521484375",
 "cpu": 4.5,
 "diskusage": "102676.2 MB",
 "memory": 9.7,
 "rowid": "20221121215610_8e753591-cb7c-4e1c-886d-85cb3dba6c50",
 "systemtime": "11/21/2022 16:56:15",
 "ts": 1669067775,
 "starttime": "11/21/2022 16:56:10",
 "datetimestamp": "2022-11-21 21:56:14.404291+00:00",
 "temperature": 27.9069,
 "humidity": 24.89,
 "co2": 698.0,
 "totalvocppb": 0.0,
 "equivalentco2ppm": 65535.0,
 "pressure": 102048.65,
 "temperatureicp": 82.0
}

Continuous Analytics with Flink SQL (Pulsar-Flink 1.15+ Connector)

Reference: https://github.com/tspannhw/pulsar-transit-function


CREATE CATALOG pulsar WITH (
   'type' = 'pulsar-catalog',
   'catalog-service-url' = 'pulsar://localhost:6650',
   'catalog-admin-url' = 'http://localhost:8080'
);

SHOW CURRENT DATABASE;
SHOW DATABASES;

USE CATALOG pulsar;

set table.dynamic-table-options.enabled = true;

show databases;

use `public/default`;

SHOW TABLES;

describe `thermalsensors`;

show create table `thermalsensors`;
CREATE TABLE `pulsar`.`public/default`.`thermalsensors` (
  `uuid` VARCHAR(2147483647) NOT NULL,
  `ipaddress` VARCHAR(2147483647) NOT NULL,
  `cputempf` INT NOT NULL,
  `runtime` INT NOT NULL,
  `host` VARCHAR(2147483647) NOT NULL,
  `hostname` VARCHAR(2147483647) NOT NULL,
  `macaddress` VARCHAR(2147483647) NOT NULL,
  `endtime` VARCHAR(2147483647) NOT NULL,
  `te` VARCHAR(2147483647) NOT NULL,
  `cpu` FLOAT NOT NULL,
  `diskusage` VARCHAR(2147483647) NOT NULL,
  `memory` FLOAT NOT NULL,
  `rowid` VARCHAR(2147483647) NOT NULL,
  `systemtime` VARCHAR(2147483647) NOT NULL,
  `ts` INT NOT NULL,
  `starttime` VARCHAR(2147483647) NOT NULL,
  `datetimestamp` VARCHAR(2147483647) NOT NULL,
  `temperature` FLOAT NOT NULL,
  `humidity` FLOAT NOT NULL,
  `co2` FLOAT NOT NULL,
  `totalvocppb` FLOAT NOT NULL,
  `equivalentco2ppm` FLOAT NOT NULL,
  `pressure` FLOAT NOT NULL,
  `temperatureicp` FLOAT NOT NULL
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/thermalsensors',
  'format' = 'json',
  'admin-url' = 'http://localhost:8080',
  'service-url' = 'pulsar://localhost:6650'
)

select * from thermalsensors;

Create Postgresql Table


CREATE TABLE "public"."thermalalerts" (
systemtime VARCHAR(256),
humidity FLOAT,
temperature FLOAT,
uuid VARCHAR(256),
co2 FLOAT,
datetimestamp VARCHAR(256),
rowid VARCHAR(256),
diskusage VARCHAR(256)
);

Access Docker Container


docker exec -it pinot-controller /bin/bash

Delete Table and Delete Schema


curl -X DELETE "http://localhost:9000/tables/thermal?type=realtime" -H "accept: application/json"

curl -X DELETE "http://localhost:9000/schemas/thermal" -H "accept: application/json"

Build a Schema From JSON Data


docker exec -it pinot-controller bin/pinot-admin.sh JsonToPinotSchema \
  -timeColumnName ts \
  -metrics "temperature,humidity,co2,totalvocppb,equivalentco2ppm,pressure,temperatureicp,cputempf"\
  -dimensions "host,ipaddress" \
  -pinotSchemaName=thermal \
  -jsonFile=/data/thermal.json \
  -outputDir=/config
  

Add our schema


docker exec -it pinot-controller bin/pinot-admin.sh AddSchema   \
  -schemaFile /config/thermalschema.json \
  -exec
  

Load schema via REST

Add Table Via Swagger UI / Curl


curl -X POST "http://localhost:9000/tables" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"tableName\": \"thermal\", \"tableType\": \"REALTIME\", \"segmentsConfig\": { \"timeColumnName\": \"ts\", \"schemaName\": \"thermal\", \"replication\": \"1\", \"replicasPerPartition\": \"1\" }, \"ingestionConfig\": { \"batchIngestionConfig\": { \"segmentIngestionType\": \"APPEND\", \"segmentIngestionFrequency\": \"DAILY\" } }, \"tableIndexConfig\": { \"loadMode\": \"MMAP\", \"streamConfigs\": { \"streamType\": \"pulsar\", \"stream.pulsar.topic.name\": \"persistent://public/default/thermalsensors\", \"stream.pulsar.bootstrap.servers\": \"pulsar://192.168.1.153:6650\", \"stream.pulsar.consumer.type\": \"lowlevel\", \"stream.pulsar.fetch.timeout.millis\": \"10000\", \"stream.pulsar.consumer.prop.auto.offset.reset\": \"largest\", \"stream.pulsar.consumer.factory.class.name\": \"org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory\", \"stream.pulsar.decoder.class.name\": \"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder\", \"realtime.segment.flush.threshold.rows\": \"0\", \"realtime.segment.flush.threshold.time\": \"1h\", \"realtime.segment.flush.threshold.segment.size\": \"5M\" } }, \"tenants\": {}, \"metadata\": {}}"

Fix a segment

curl -X POST "{host}/segments/thermal_REALTIME/{segmentName}/reset"

Defining Pulsar-Pinot Realtime Table

If you use stream.pulsar.consumer.prop.auto.offset.reset=smallest than it goes back earliest which can be a lot of data.

https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/apache-pulsar

This could be millions or billions of records.

Pinot Cluster

Pinot Table Definition

Query Console Table Schema

Query Console Table Information

Query Console SQL results

Adding a Realtime Table via REST AI / Swagger Docs

Apache Pinot Query

select systemtime, totalvocppb, temperature, cputempf, humidity, co2, equivalentco2ppm,
       pressure, temperatureicp, ts, datetimestamp, cpu, diskusage, memory, rowid
from thermal 
order by ts desc
limit 200

Superset + Pinot

Run this to initialize: https://github.com/kbastani/climate-change-analysis/blob/master/docker/docker-init.sh

Let's Explore and Visualize Apache Pinot Data

Apache Superset

Add a Database

Edit Database

Configure a Pinot dataset

Datasets

Query and Validate the dataset

Query Dataset

Create a new chart from that dataset

Create a New Chart

Save the chart to a dashboard

Save Chart

Build a dashboard of charts and markdown

Dashboard

Build more charts

chart1

Build more charts

chart2

Superset CSV Chart Extract

Video Preview

https://youtu.be/KMbTlmoDXXA

img

Pulsar to NiFi

NiFI to Postgresql

img

img

img

img

img

img

img

img

Postgresql Exploration

img img img img img

References

mastodon

About

Apache Pulsar - Apache Pinot - Thermal Sensor Data

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published