Skip to content

Latest commit

 

History

History
58 lines (49 loc) · 1.81 KB

environment.adoc

File metadata and controls

58 lines (49 loc) · 1.81 KB
docker-compose up -d
docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
CREATE SOURCE CONNECTOR DOGS WITH (
    'connector.class'                = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'genkp.dogs.with'                = '#{Internet.uuid}',
    'genv.dogs.name.with'            = '#{Dog.name}',
    'genv.dogs.dogsize.with'         = '#{Dog.size}',
    'genv.dogs.age.with'             = '#{Dog.age}',
    'topic.dogs.throttle.ms'         = 1000
);
CREATE STREAM DOGS (ID STRING KEY, NAME STRING, DOGSIZE STRING, AGE STRING)
  WITH (KAFKA_TOPIC='dogs', VALUE_FORMAT='JSON');
SET 'auto.offset.reset' = 'earliest';

CREATE TABLE DOGS_BY_SIZE AS
  SELECT DOGSIZE AS DOG_SIZE, COUNT(*) AS DOGS_CT
    FROM DOGS WINDOW TUMBLING (SIZE 15 MINUTE)
    GROUP BY DOGSIZE;

Test a pull query

SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START,
       TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END,
       DOG_SIZE, DOGS_CT
FROM DOGS_BY_SIZE
WHERE DOG_SIZE='medium';

Test a push query

SELECT ROWTIME, ID, NAME, DOGSIZE, AGE FROM DOGS EMIT CHANGES;