In [None]:
%load_ext jupyflink.magics

In [None]:
%flink_gateway_init --debug

In [None]:
%%flink_gateway_sql_prepare
ADD JAR '/opt/gateway/jars/flink-sql-connector-kafka-3.2.0-1.19.jar';

In [None]:
%%flink_gateway_sql_prepare
drop table if exists attributes;
CREATE TABLE attributes (
  `entityId` STRING,
  `name` STRING,
  `nodeType` STRING,
  `valueType` STRING,
  `type` STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue` STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  `deleted` BOOLEAN,
  `synched` BOOLEAN,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'kafka',
  'topic' = 'iff.ngsild.attributes',
'json.fail-on-missing-field' = 'False',
    'json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
DROP VIEW IF EXISTS `attributes_view`;
CREATE VIEW `attributes_view` AS
SELECT `type`,
`entityId`,
`name`,
`nodeType`,
`valueType`,
`https://uri.etsi.org/ngsi-ld/datasetId`,
`https://uri.etsi.org/ngsi-ld/hasValue`,
`https://uri.etsi.org/ngsi-ld/hasObject`,
`deleted`,
`ts` FROM (
  SELECT *,
ROW_NUMBER() OVER (PARTITION BY `entityId`,`name`, `https://uri.etsi.org/ngsi-ld/datasetId`
ORDER BY ts DESC) AS rownum
FROM `attributes` )
WHERE rownum = 1;


In [None]:
%%flink_gateway_sql_prepare
drop table if exists ngsild_updates;
CREATE TABLE ngsild_updates (
  `op` STRING,
  `overwriteOrReplace` Boolean,
  `noForward` Boolean,
  `entities` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'iff.ngsild-updates',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

In [None]:
%%flink_gateway_sql_prepare
DROP TABLE IF EXISTS `propertyChecksTable`;

CREATE TABLE `propertyChecksTable` (
    `targetClass` STRING,
    `propertyPath` STRING,
    `propertyClass` STRING,
    `propertyNodetype` STRING,
    `maxCount` STRING,
    `minCount` STRING,
    `severity` STRING,
    `minExclusive` STRING,
    `maxExclusive` STRING,
    `minInclusive` STRING,
    `maxInclusive` STRING,
    `minLength` STRING,
    `maxLength` STRING,
    `pattern` STRING,
    `ins` STRING, 
    PRIMARY KEY (targetClass,propertyPath) NOT ENFORCED) WITH (
    'connector' = 'upsert-kafka',
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'False',
    'value.json.ignore-parse-errors' = 'True', 
    'key.format' = 'json',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'topic' = 'propertyChecksTable');


In [None]:
%%flink_gateway_sql_prepare
DROP TABLE IF EXISTS `relationshipChecksTable`;
CREATE TABLE `relationshipChecksTable` (
    `targetClass` STRING,
    `propertyPath` STRING,
    `propertyClass` STRING,
    `maxCount` STRING,
    `minCount` STRING,
    `severity` STRING, 
    PRIMARY KEY (targetClass,propertyPath) NOT ENFORCED) WITH (
    'connector' = 'upsert-kafka',
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'False',
    'value.json.ignore-parse-errors' = 'True',
    'key.format' = 'json',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'topic' = 'relationshipChecksTable');


In [None]:
%%flink_gateway_sql_prepare
DROP TABLE IF EXISTS `entity`;
CREATE TABLE `entity` (
    `id` STRING,
    `type` STRING,
    `deleted` BOOLEAN,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    watermark FOR `ts` AS `ts` - INTERVAL '0.001' SECONDS
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'json.fail-on-missing-field' = 'False',
    'json.ignore-parse-errors' = 'True',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
    'scan.startup.mode' = 'earliest-offset', 
    'topic' = 'iff.ngsild.entities'
);
CREATE VIEW `entity_view` AS SELECT `type`,
 `id`, `deleted`,
 `ts` FROM (
  SELECT *,
ROW_NUMBER() OVER (PARTITION BY `id`
ORDER BY ts DESC) AS rownum
FROM `entity` )
WHERE rownum = 1 and `type` IS NOT NULL;

In [None]:
%%flink_gateway_sql_prepare
DROP TABLE IF EXISTS `rdf`;
CREATE TABLE `rdf` (
`subject` STRING,
`predicate` STRING,
`object` STRING,
`index` INTEGER,
PRIMARY KEY(`subject`,`predicate`,`index`)  NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'iff.rdf',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'value.format' = 'json',
  'key.format' = 'json'
);

In [None]:
%%flink_gateway_sql_prepare
CREATE TABLE `alerts_bulk` (
    `resource` STRING,
    `event` STRING,
    `environment` STRING,
    `service` ARRAY<STRING>,
    `severity` STRING,
    `customer` STRING,
    `text` STRING,watermark FOR ts AS ts,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, 
    PRIMARY KEY (resource,event) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'value.format' = 'json',
    'value.json.fail-on-missing-field' = 'False',
    'value.json.ignore-parse-errors' = 'True',
    'key.format' = 'json',
    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092', 
    'topic' = 'iff.alerts.bulk'
);

In [None]:
%%flink_gateway_sql_prepare
drop table if exists attributes_insert;
CREATE TABLE attributes_insert (
  `entityId` STRING,
  `name` STRING,
  `nodeType` STRING,
  `valueType` STRING,
  `type` STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue` STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  PRIMARY KEY (entityId,name) NOT ENFORCED,
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'iff.ngsild.attributes_insert',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
DROP VIEW IF EXISTS `attributes_insert_view`;
CREATE VIEW `attributes_insert_view` AS
SELECT `type`,
`entityId`,
`name`,
`nodeType`,
`valueType`,
`https://uri.etsi.org/ngsi-ld/datasetId`,
`https://uri.etsi.org/ngsi-ld/hasValue`,
`https://uri.etsi.org/ngsi-ld/hasObject`,
`ts` FROM (
  SELECT *,
ROW_NUMBER() OVER (PARTITION BY `entityId`,`name`, `https://uri.etsi.org/ngsi-ld/datasetId`
ORDER BY ts DESC) AS rownum
FROM `attributes` )
WHERE rownum = 1;

In [None]:
%%flink_gateway_sql_prepare
drop table if exists attributes_writeback;
CREATE TABLE attributes_writeback (
  `entityId` STRING,
  `name` STRING,
  `nodeType` STRING,
  `valueType` STRING,
  `type` STRING,
  `https://uri.etsi.org/ngsi-ld/datasetId` STRING,
  `https://uri.etsi.org/ngsi-ld/hasValue` STRING,
  `https://uri.etsi.org/ngsi-ld/hasObject` STRING,
    PRIMARY KEY (entityId,name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'iff.ngsild.attributes',
  'value.json.fail-on-missing-field' = 'False',
  'value.json.ignore-parse-errors' = 'True',
  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);