In [None]:
%env JAVA_TOOL_OPTIONS='--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED'
%env FLINK_CONF_DIR=/Users/lgfquentin/dev/medical-dashboard/back-end-synthea-flink/config

In [None]:
%load_ext streaming_jupyter_integrations.magics

In [None]:
%flink_connect --execution-target remote --remote-hostname localhost --remote-port 8081
#%flink_connect 

In [None]:
%%flink_execute_sql
CREATE TABLE patient (
  `id` STRING,
  `name` ROW(`family` STRING, `given` STRING ARRAY) ARRAY,
  `gender` STRING,
  `birthDate` STRING,
  `telecom` ROW ( `value` STRING ) ARRAY,
  `address` ROW (
    `line` STRING ARRAY,
    `city` STRING,
    `state` STRING,
    `postalCode` STRING,
    `country` STRING
  ) ARRAY,
  `maritalStatus` ROW (
    `text` STRING
  )
) WITH (
  'connector' = 'kafka',
  'topic' = 'Patient',
  'properties.bootstrap.servers' = 'kafka-edge1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In [None]:
%%flink_execute_sql
CREATE TABLE diagnosticReport (
  `id` STRING,
  `subject` ROW (`reference` STRING),
  `code` ROW(`coding` ROW(`display` STRING) ARRAY)
) WITH (
  'connector' = 'kafka',
  'topic' = 'DiagnosticReport',
  'properties.bootstrap.servers' = 'kafka-edge1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In [None]:
%%flink_execute_sql
CREATE OR REPLACE TABLE hospital (
  `id` STRING,
  `identifier` ROW (`value` STRING) ARRAY,
  `status` STRING,
  `name` STRING,
  `telecom` ROW (`system` STRING, `value` STRING) ARRAY,
  `address` ROW (
    `line` STRING ARRAY,
    `city` STRING,
    `state` STRING,
    `postalCode` STRING,
    `country` STRING
  ),
  `position` ROW (
    `longitude` DOUBLE,
    `latitude` DOUBLE
  ),
  `managingOrganization` ROW (
    `identifier` ROW (`value` STRING)
  )
) WITH (
  'connector' = 'kafka',
  'topic' = 'Location',
  'properties.bootstrap.servers' = 'kafka-edge1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In [None]:
%%flink_execute_sql
CREATE TABLE practitionerRole (
  `id` STRING,
  `practitioner` ROW (
    `identifier` ROW (
      `value` STRING
    )
  ),
  `organization` ROW (
    `identifier` ROW (
      `value` STRING
    )
  ),
  `specialty` ROW (
    `text` STRING
  ) ARRAY
) WITH (
  'connector' = 'kafka',
  'topic' = 'PractitionerRole',
  'properties.bootstrap.servers' = 'kafka-edge1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In [None]:
%%flink_execute_sql
CREATE TABLE practitioner (
  `identifier` ROW (
    `value` STRING
  ) ARRAY,
  `active` BOOLEAN,
  `name` ROW (
    `family` STRING,
    `given` STRING ARRAY
  ) ARRAY,
  `telecom` ROW (
    `value` STRING
  ) ARRAY,
  `address` ROW (
    `line` STRING ARRAY,
    `city` STRING,
    `state` STRING,
    `postalCode` STRING,
    `country` STRING
  ) ARRAY,
  `gender` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'Practitioner',
  'properties.bootstrap.servers' = 'kafka-edge1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In [None]:
%%flink_execute_sql
CREATE TABLE encounter (
  `id` STRING,
  `status` STRING,
  `type` ROW (
    `coding` ROW (
      `code` STRING,
      `display` STRING
    ) ARRAY  ) ARRAY,
  `subject` ROW (
    `reference` STRING,
    `display` STRING
  ),
  `participant` ROW (
    `individual` ROW (
      `reference` STRING,
      `display` STRING
    )
  ) ARRAY,
  `period` ROW (
    `start` STRING,
    `end` STRING
  ),
  `location` ROW (
    `location` ROW (
      `reference` STRING,
      `display` STRING
    )
  ) ARRAY
) WITH (
  'connector' = 'kafka',
  'topic' = 'Encounter',
  'properties.bootstrap.servers' = 'kafka-edge1:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)

In [None]:
%%flink_execute_sql
SHOW TABLES

In [None]:
%%flink_execute_sql
SELECT * FROM patient

In [None]:
%%flink_execute_sql
SELECT * FROM diagnosticReport

In [None]:
%%flink_execute_sql
SELECT * FROM hospital

In [None]:
%%flink_execute_sql
SELECT * FROM practitioner

In [None]:
%%flink_execute_sql
SELECT * FROM practitionerRole

In [None]:
%%flink_execute_sql
SELECT * FROM encounter

In [None]:
%%flink_execute_sql
SELECT id, name[1].family as familyName, gender FROM patient limit 5

In [None]:
%%flink_execute_sql
SELECT id,subject.reference, code.coding[1].display as name FROM diagnosticReport limit 5

In [None]:
%%flink_execute_sql
SELECT p.name[1].family as familyName, p.gender, dr.id, dr.code.coding[1].display as diagnosticReportLabel
FROM diagnosticReport dr
JOIN patient p ON CONCAT('urn:uuid:', p.id) = dr.subject.reference

## Insert into Postgres

Create table in Postgres (replace container name by yours)

```bash
docker exec -it flink-sql_tutorial-db-1 psql -U example
> create table patient(identifier VARCHAR, familyName VARCHAR, gender VARCHAR, diagnosticReportLabel VARCHAR, PRIMARY KEY(identifier));
```

In [None]:
%%flink_execute_sql
CREATE TABLE hospital_psql (
  `id_organization` STRING,
  `id_location` STRING,
  `status` STRING,
  `name` STRING,
  `telecom` STRING,
  `address_line` STRING,
  `city` STRING,
  `state` STRING,
  `postalCode` STRING,
  `country` STRING,
  `longitude` DOUBLE,
  `latitude` DOUBLE,
  PRIMARY KEY (`id_organization`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db:5432/example',
  'table-name' = 'public.hospital',
  'username' = 'example',
  'password' = 'example'
);


In [None]:
%%flink_execute_sql
CREATE TABLE practitioner_psql (
  `identifier` STRING,
  `name` STRING,
  `email` STRING,
  `adress` STRING,
  `city` STRING,
  `state` STRING,
  `postalCode` STRING,
  `country` STRING,
  `gender` STRING,
  `role` STRING,
  `id_organization` STRING,
  PRIMARY KEY (`identifier`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db:5432/example',
  'table-name' = 'public.practitioner',
  'username' = 'example',
  'password' = 'example'
);


In [None]:
%%flink_execute_sql
CREATE TABLE patient_psql (
  `identifier` STRING,
  `surname` STRING,
  `familyName` STRING,
  `gender` STRING,
  `birthDate` STRING,
  `telecom` STRING,
  `adress` STRING,
  `city` STRING,
  `state` STRING,
  `postalCode` STRING,
  `country` STRING,
  `maritalStatus` STRING,
  PRIMARY KEY (`identifier`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db:5432/example',
  'table-name' = 'public.patient',
  'username' = 'example',
  'password' = 'example'
);


In [None]:
%%flink_execute_sql
CREATE TABLE visit_psql (
  `identifier` STRING,
  `id_organization` STRING,
  `id_practitioner` STRING,
  `id_patient` STRING,
  `status` STRING,
  `startTime` TIMESTAMP,
  `endTime` TIMESTAMP,
  `type` STRING,
  PRIMARY KEY (`identifier`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db:5432/example',
  'table-name' = 'public.visit',
  'username' = 'example',
  'password' = 'example'
);


In [None]:
%%flink_execute_sql
INSERT INTO patient_psql
SELECT id, p.name[1].family as familyname, p.name[1].given[1] as surname, gender, birthDate, p.telecom[1][1] as telecom, p.address[1].line[1] as address,  p.address[1].city as city, p.address[1].state as state, p.address[1].country as country, p.address[1].postalCode as postalCode, maritalStatus.text as maritalStatus from patient p;

In [None]:
%%flink_execute_sql
SELECT id, p.name[1].family as familyname, p.name[1].given[1] as surname, gender, birthDate, p.telecom[1][1] as telecom, p.address[1].line[1] as address,  p.address[1].city as city, p.address[1].state as state, p.address[1].country as country, p.address[1].postalCode as postalCode, maritalStatus.text as maritalStatus from patient p;