Skip to content

tspannhw/StreamingAnalyticsUsingFlinkSQL

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

45 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

StreamingAnalyticsUsingFlinkSQL

FLiP: StreamNative: Cloud-Native: Streaming Analytics Using Apache Flink SQL on Apache Pulsar

Running on an NVIDIA Jetson Xavier NX: 6 CPU cores, GPU, 8 GB RAM

Xavier

Compile Java

jetson_clocks

mvn clean compile assembly:single

Create Your Topic and Schema

StreamNative Cloud Schema

Run Python and Java


#!/bin/bash
#
# Endless capture-and-publish loop:
#   1. run demo.py against /dev/video0 with the googlenet network, passing a
#      timestamped .jpg path under /home/nvidia/nvme/images;
#   2. send the last line of demo1.log as the message to the 'jetsoniot2'
#      topic through the IoTProducer jar;
#   3. sleep one second and repeat.

while true
do
    # Minute-resolution timestamp used as the image file name.
    DATE=$(date +"%Y-%m-%d_%H%M")

    # stderr is discarded on purpose, as in the original script.
    python3 -W ignore /home/nvidia/nvme/minifi-jetson-xavier/demo.py \
        --camera /dev/video0 \
        --network googlenet \
        "/home/nvidia/nvme/images/${DATE}.jpg" 2>/dev/null

    # $(...) replaces the legacy backticks and `tail -n 1` replaces the
    # obsolete `tail -1`; the message is still the last line of the log.
    java -jar IoTProducer-1.0-jar-with-dependencies.jar \
        --topic 'jetsoniot2' \
        --serviceUrl pulsar+ssl://cluster.org.snio.cloud:6651 \
        --audience urn:sn:pulsar:org:cluster \
        --issuerUrl https://auth.streamnative.cloud \
        --privateKey file:///home/nvidia/nvme/pulsar-demo/org-tspann.json \
        --message "$(tail -n 1 /home/nvidia/nvme/logs/demo1.log)"

    sleep 1
done


Create a New Topic

StreamNative Cloud Create New Topic

Create a New Subscription

StreamNative Cloud Create New Subscription

Browse Data

StreamNative Cloud Consumer

Created Schema

{
    "type": "record",
    "name": "IoTMessage",
    "namespace": "io.streamnative.examples.oauth2",
    "fields": [
        {
            "name": "camera",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "cpu",
            "type": "double"
        },
        {
            "name": "cputemp",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "cputempf",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "diskusage",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "filename",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "gputemp",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "gputempf",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "host",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "host_name",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "imageinput",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "ipaddress",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "macaddress",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "memory",
            "type": "double"
        },
        {
            "name": "networktime",
            "type": "double"
        },
        {
            "name": "runtime",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "systemtime",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "te",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "top1",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "top1pct",
            "type": "double"
        },
        {
            "name": "uuid",
            "type": [
                "null",
                "string"
            ],
            "default": null
        }
    ]
}

Create a Flink SQL Table on Pulsar


-- Table jetsoniot3, backed by the Pulsar topic
-- persistent://public/default/jetsoniot2 with JSON-formatted values and
-- 'scan.startup.mode' set to 'earliest'.
-- Every payload column is declared STRING; column order is unchanged.
-- publishTime is a METADATA column and carries the watermark, which trails
-- it by 5 seconds.
CREATE TABLE jetsoniot3 (
    `id`          STRING,
    uuid          STRING,
    ir            STRING,
    `end`         STRING,
    lux           STRING,
    gputemp       STRING,
    cputemp       STRING,
    `te`          STRING,
    systemtime    STRING,
    hum           STRING,
    memory        STRING,
    gas           STRING,
    pressure      STRING,
    `host`        STRING,
    diskusage     STRING,
    ipaddress     STRING,
    macaddress    STRING,
    gputempf      STRING,
    host_name     STRING,
    camera        STRING,
    filename      STRING,
    `runtime`     STRING,
    cpu           STRING,
    cputempf      STRING,
    imageinput    STRING,
    `networktime` STRING,
    top1          STRING,
    top1pct       STRING,
    publishTime   TIMESTAMP(3) METADATA,
    WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
) WITH (
    'topic' = 'persistent://public/default/jetsoniot2',
    'value.format' = 'json',
    'scan.startup.mode' = 'earliest'
);

-- Table topitems: seven STRING columns plus an insert_time timestamp.
-- No WITH clause is given, so no connector options are set here.
-- Column order is unchanged.
CREATE TABLE topitems (
    uuid        STRING,
    top1        STRING,
    top1pct     STRING,
    camera      STRING,
    systemtime  STRING,
    cputempf    STRING,
    gputempf    STRING,
    insert_time TIMESTAMP(3)
);

-- Table jetsoniotresults. top1pct, cpu and memory are DOUBLE; all other
-- payload columns are STRING. Column order is unchanged.
-- publishTime is a METADATA column and carries the watermark, which trails
-- it by 5 seconds. No WITH clause is given.
CREATE TABLE jetsoniotresults (
    uuid          STRING,
    camera        STRING,
    ipaddress     STRING,
    `networktime` STRING,
    top1pct       DOUBLE,
    top1          STRING,
    cputemp       STRING,
    gputemp       STRING,
    gputempf      STRING,
    cputempf      STRING,
    `runtime`     STRING,
    `host`        STRING,
    `filename`    STRING,
    imageinput    STRING,
    host_name     STRING,
    macaddress    STRING,
    `te`          STRING,
    `systemtime`  STRING,
    cpu           DOUBLE,
    `diskusage`   STRING,
    `memory`      DOUBLE,
    publishTime   TIMESTAMP(3) METADATA,
    WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
);

Run Your Flink SQL

StreamNative Cloud Flink SQL

-- Rows from jetsoniot2 whose cputempf value is greater than 80.
-- The schema and table definitions in this file declare cputempf as a
-- string, so it is cast explicitly instead of relying on an implicit
-- string-to-number comparison.
SELECT
    cputempf,
    gputempf,
    diskusage,
    cpu,
    systemtime,
    uuid
FROM jetsoniot2
WHERE CAST(cputempf AS DOUBLE) > 80;


-- Inserts eight literal rows into jetsoniot2; each row has four values
-- (two integers, a numeric literal and CURRENT_TIMESTAMP).
-- NOTE(review): no column list is given, and none of the CREATE TABLE
-- statements in this file declares a four-column table. Confirm the
-- intended target table and columns before running this statement.
INSERT INTO jetsoniot2 VALUES
  (1, 100, 30.15, CURRENT_TIMESTAMP),
  (2, 200, 40, CURRENT_TIMESTAMP),
  (3, 300, 28000.56, CURRENT_TIMESTAMP),
  (4, 400, 42960.90, CURRENT_TIMESTAMP),
  (5, 500, 50000.1, CURRENT_TIMESTAMP),
  (6, 100, 688888888.7, CURRENT_TIMESTAMP),
  (7, 300, 20.99, CURRENT_TIMESTAMP),
  (8, 100, 6000, CURRENT_TIMESTAMP)
  
  
  -- Per-camera maximum, average and minimum of cputempf.
  -- cputempf is declared as a string in this file's schema and table
  -- definitions, so it is cast to DOUBLE: AVG needs a numeric argument, and
  -- MAX/MIN over strings would compare lexicographically.
  -- The alias avgcputtempf keeps its original spelling so the output column
  -- name does not change.
  SELECT
      camera,
      MAX(CAST(cputempf AS DOUBLE)) AS maxcputempf,
      AVG(CAST(cputempf AS DOUBLE)) AS avgcputtempf,
      MIN(CAST(cputempf AS DOUBLE)) AS mincputempf
  FROM jetsoniot2
  GROUP BY camera;


-- Per-camera maximum of cputempf, with the scan start overridden to
-- 'earliest' through a table hint.
-- cputempf is declared as a string in this file's schema and table
-- definitions; the cast makes MAX compare numbers rather than text.
SELECT
    camera,
    MAX(CAST(cputempf AS DOUBLE)) AS maxcputempf
FROM jetsoniot2 /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY camera;

-- Browse every column of jetsoniot2, with the scan start overridden to
-- 'earliest' through a table hint. The terminator keeps this statement
-- separate from the next one when the statements are run as a script.
SELECT *
FROM jetsoniot2 /*+ OPTIONS('scan.startup.mode'='earliest') */;

-- Per-camera minimum of cputempf, with the scan start overridden to
-- 'earliest' through a table hint.
-- cputempf is declared as a string in this file's schema and table
-- definitions; the cast makes MIN compare numbers rather than text.
SELECT
    camera,
    MIN(CAST(cputempf AS DOUBLE)) AS mincputempf
FROM jetsoniot2 /*+ OPTIONS('scan.startup.mode'='earliest') */
GROUP BY camera;


Checks


jtop

Resources (Ops, DevOps, Management, Administration, SQL, Compute, Deploy)

About

FLiP: StreamNative: Cloud-Native: Streaming Analytics Using Apache Flink SQL on Apache Pulsar

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published