In [None]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pathlib import Path

In [1]:
from datetime import datetime, timezone

# The sample timestamp is presumably expressed in nanoseconds since the UNIX epoch.
timestamp_ns = 1750520804967000000
# Convert nanoseconds to (fractional) seconds.
timestamp_s = timestamp_ns / 1e9
# datetime.utcfromtimestamp() is deprecated since Python 3.12. Convert explicitly in
# UTC instead, then drop the tzinfo so the printed value keeps the same naive format
# as before (the value is still UTC wall-clock time).
readable_time = datetime.fromtimestamp(timestamp_s, tz=timezone.utc).replace(tzinfo=None)
print(readable_time)

2025-06-21 15:46:44.967000


In [None]:
# 1. Create a streaming TableEnvironment on top of a DataStream execution environment.
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.in_streaming_mode()
table_env = StreamTableEnvironment.create(env, environment_settings=settings)

# 2. Collect every jar in the 'jars' folder under the current working directory.
# Sorting makes the resulting list deterministic across runs and file systems.
base_dir = Path.cwd() / "jars"
jar_files = sorted(base_dir.glob("*.jar"))
# Build the semicolon-separated list of jar URLs. Path.cwd() is absolute, so every
# globbed path is absolute and as_uri() can produce a well-formed file URI; unlike a
# hand-built f"file://{jar}" it also handles Windows drive paths and percent-encodes
# spaces and other special characters.
jars_str = ";".join(jar.as_uri() for jar in jar_files)
# Add the jars to the TableEnvironment configuration.
table_env.get_config().set("pipeline.jars", jars_str)



In [None]:
# Echo the jar URL string built in the setup cell as this cell's output.
jars_str

In [None]:
# Source table over the 'boat_data' Kafka topic (JSON records), with `time` declared
# as the event-time attribute via the WATERMARK clause.
# Drop any previous definition first so the cell can be re-run in the same kernel:
# a bare CREATE TABLE fails on the second execution because the table already exists.
table_env.execute_sql("DROP TABLE IF EXISTS KafkaBoatData")
table_env.execute_sql("""
CREATE TABLE KafkaBoatData (
  asset_id STRING,
  `timestamp` STRING,
  source STRING,
  heading_value DOUBLE,
  gnss_position_altitude DOUBLE,
  gnss_position_latitude DOUBLE,
  gnss_position_longitude DOUBLE,
  speed_over_ground_value DOUBLE,
  course_over_ground_value DOUBLE,
  true_wind_direction_value DOUBLE,
  true_wind_speed_value DOUBLE,
  `time` TIMESTAMP_LTZ(3),
  tstamp STRING,
  boat STRING,
  TWA DOUBLE,
  TWAvalue DOUBLE,
  race_course_course_axis DOUBLE,
  CWA DOUBLE,
  CWAvalue DOUBLE,
  VMG DOUBLE,
  VMC DOUBLE,
  X DOUBLE,
  Y DOUBLE,
  Yrolling STRING,
  duration DOUBLE,
  time_to_prev DOUBLE,
  distance_to_prev DOUBLE,
  speed_over_ground_calculated DOUBLE,
  status STRING,
  foiling STRING,
  direction STRING,
  bord STRING,
  class STRING,
  turn STRING,
  tack STRING,
  gybe STRING,
  maneuver STRING,
  distance_to_start DOUBLE,
  distance_derivative DOUBLE,
  time_to_tack DOUBLE,
  time_to_gybe DOUBLE,
  time_to_turn DOUBLE,
  cumulative_VMG DOUBLE,
  leg STRING,
  Ydist DOUBLE,
  race STRING,
  opponent STRING,
  event_time AS `time`,
  processing_time AS PROCTIME(),
  WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'boat_data',
  'properties.bootstrap.servers' = 'kafka:19091',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'json.timestamp-format.standard' = 'ISO-8601'
)
""")

In [None]:
# Two-column Kafka table over the 'test' topic (JSON records).
# Drop any previous definition first so the cell can be re-run in the same kernel:
# a bare CREATE TABLE fails on the second execution because the table already exists.
# NOTE(review): this table points at 'localhost:9091' while the other Kafka tables in
# this notebook use 'kafka:19091' — confirm both addresses are reachable from where
# the Flink job actually runs.
table_env.execute_sql("DROP TABLE IF EXISTS KafkaBoatData2")
table_env.execute_sql("""
CREATE TABLE KafkaBoatData2 (
  asset_id STRING,
  `timestamp` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'localhost:9091',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
)
""")

In [None]:
# Forward the asset_id and `timestamp` columns of KafkaBoatData into KafkaBoatData2.
table_env.execute_sql("""
INSERT INTO KafkaBoatData2
SELECT
  asset_id,
  `timestamp`
FROM KafkaBoatData
""")

# Analisi Navigazione

In [None]:
# Sink table over the 'boat_data_navigation' Kafka topic: one JSON record per boat
# and aggregation window (window bounds plus the aggregated navigation metrics).
# Drop any previous definition first so the cell can be re-run in the same kernel:
# a bare CREATE TABLE fails on the second execution because the table already exists.
table_env.execute_sql("DROP TABLE IF EXISTS Navigation")
table_env.execute_sql("""
CREATE TABLE Navigation (
  boat STRING,
  window_start TIMESTAMP_LTZ(3),
  window_end TIMESTAMP_LTZ(3),
  avg_speed DOUBLE,
  avg_vmg DOUBLE,
  avg_heading DOUBLE,
  avg_true_wind_speed DOUBLE,
  avg_true_wind_direction DOUBLE,
  is_foiling BIGINT,
  leg BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'boat_data_navigation',
  'properties.bootstrap.servers' = 'kafka:19091',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
""")

In [None]:
# Aggregate KafkaBoatData per boat over 1-second tumbling windows on `time` and write
# the result to the Navigation sink. Select-list columns are matched to the sink
# columns by position, so the `leg_number` alias feeds the sink's `leg` column.
#
# heading_value and true_wind_direction_value are angles in degrees (the WHERE clause
# bounds them to [0, 360]), so a plain AVG is wrong across the 0/360 wrap: readings of
# 359 and 1 would average to 180 instead of 0. They are therefore averaged as unit
# vectors (circular mean). Negating both vector components rotates the angle by 180
# degrees, and adding 180 back maps ATAN2's [-180, 180] output onto [0, 360] without
# needing a modulo.
table_result = table_env.execute_sql("""
INSERT INTO Navigation
SELECT
  boat,
  TUMBLE_START(`time`, INTERVAL '1' SECOND) AS window_start,
  TUMBLE_END(`time`, INTERVAL '1' SECOND) AS window_end,
  AVG(speed_over_ground_value) AS avg_speed,
  AVG(VMG) AS avg_vmg,
  DEGREES(ATAN2(
    -AVG(SIN(RADIANS(heading_value))),
    -AVG(COS(RADIANS(heading_value)))
  )) + 180 AS avg_heading,
  AVG(true_wind_speed_value) AS avg_true_wind_speed,
  DEGREES(ATAN2(
    -AVG(SIN(RADIANS(true_wind_direction_value))),
    -AVG(COS(RADIANS(true_wind_direction_value)))
  )) + 180 AS avg_true_wind_direction,
  MAX(
  CASE
    WHEN LOWER(foiling) = 'foiling' THEN 1
    ELSE 0
  END) AS is_foiling,
  MAX(
  CASE leg
  WHEN 'prestart' THEN 0
  WHEN 'up1' THEN 1
  WHEN 'down1' THEN 2
  WHEN 'up2' THEN 3
  WHEN 'down2' THEN 4
  WHEN 'up3' THEN 5
  WHEN 'down3' THEN 6
  WHEN 'up4' THEN 7
  WHEN 'down4' THEN 8
  WHEN 'up5' THEN 9
  WHEN 'postrace' THEN 10
  ELSE -1
END) AS leg_number
FROM KafkaBoatData
WHERE
  boat IS NOT NULL AND boat <> ''
  AND speed_over_ground_value BETWEEN 0 AND 60
  AND heading_value BETWEEN 0 AND 360
  AND true_wind_speed_value BETWEEN 0 AND 60
  AND true_wind_direction_value BETWEEN 0 AND 360
GROUP BY
  boat,
  TUMBLE(`time`, INTERVAL '1' SECOND)
""")


In [None]:
# Stand-alone SQL script held in a plain string: the KafkaBoatData source DDL, the
# Navigation sink DDL and the INSERT INTO Navigation query, each terminated by ';'.
# The string is not passed to table_env; as the last expression of the cell it is
# only echoed as the cell output.
# NOTE(review): this overlaps with the statements executed by earlier cells —
# presumably kept for running the pipeline outside the notebook (e.g. from a SQL
# client); confirm, and keep the two copies in sync when either changes.
"""
CREATE TABLE KafkaBoatData (
  asset_id STRING,
  `timestamp` STRING,
  source STRING,
  heading_value DOUBLE,
  gnss_position_altitude DOUBLE,
  gnss_position_latitude DOUBLE,
  gnss_position_longitude DOUBLE,
  speed_over_ground_value DOUBLE,
  course_over_ground_value DOUBLE,
  true_wind_direction_value DOUBLE,
  true_wind_speed_value DOUBLE,
  `time` TIMESTAMP_LTZ(3),
  tstamp STRING,
  boat STRING,
  TWA DOUBLE,
  TWAvalue DOUBLE,
  race_course_course_axis DOUBLE,
  CWA DOUBLE,
  CWAvalue DOUBLE,
  VMG DOUBLE,
  VMC DOUBLE,
  X DOUBLE,
  Y DOUBLE,
  Yrolling STRING,
  duration DOUBLE,
  time_to_prev DOUBLE,
  distance_to_prev DOUBLE,
  speed_over_ground_calculated DOUBLE,
  status STRING,
  foiling STRING,
  direction STRING,
  bord STRING,
  class STRING,
  turn STRING,
  tack STRING,
  gybe STRING,
  maneuver STRING,
  distance_to_start DOUBLE,
  distance_derivative DOUBLE,
  time_to_tack DOUBLE,
  time_to_gybe DOUBLE,
  time_to_turn DOUBLE,
  cumulative_VMG DOUBLE,
  leg STRING,
  Ydist DOUBLE,
  race STRING,
  opponent STRING,
  event_time AS `time`,
  processing_time AS PROCTIME(),
  WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'boat_data',
  'properties.bootstrap.servers' = 'kafka:19091',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'json.timestamp-format.standard' = 'ISO-8601'
);


CREATE TABLE Navigation (
  boat STRING,
  window_start TIMESTAMP_LTZ(3),
  window_end TIMESTAMP_LTZ(3),
  avg_speed DOUBLE,
  avg_vmg DOUBLE,
  avg_heading DOUBLE,
  avg_true_wind_speed DOUBLE,
  avg_true_wind_direction DOUBLE,
  is_foiling BIGINT,
  leg BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'boat_data_navigation',
  'properties.bootstrap.servers' = 'kafka:19091',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
);

INSERT INTO Navigation
SELECT
  boat,
  TUMBLE_START(`time`, INTERVAL '1' SECOND) AS window_start,
  TUMBLE_END(`time`, INTERVAL '1' SECOND) AS window_end,
  AVG(speed_over_ground_value) AS avg_speed,
  AVG(VMG) AS avg_vmg,
  AVG(heading_value) AS avg_heading,
  AVG(true_wind_speed_value) AS avg_true_wind_speed,
  AVG(true_wind_direction_value) AS avg_true_wind_direction,
  MAX(
  CASE
    WHEN LOWER(foiling) = 'foiling' THEN 1
    ELSE 0
  END) AS is_foiling,
  MAX(
  CASE leg
  WHEN 'prestart' THEN 0
  WHEN 'up1' THEN 1
  WHEN 'down1' THEN 2
  WHEN 'up2' THEN 3
  WHEN 'down2' THEN 4
  WHEN 'up3' THEN 5
  WHEN 'down3' THEN 6
  WHEN 'up4' THEN 7
  WHEN 'down4' THEN 8
  WHEN 'up5' THEN 9
  WHEN 'postrace' THEN 10
  ELSE -1
END) AS leg_number
FROM KafkaBoatData
WHERE
  boat IS NOT NULL AND boat <> ''
  AND speed_over_ground_value BETWEEN 0 AND 60
  AND heading_value BETWEEN 0 AND 360
  AND true_wind_speed_value BETWEEN 0 AND 60
  AND true_wind_direction_value BETWEEN 0 AND 360
GROUP BY
  boat,
  TUMBLE(`time`, INTERVAL '1' SECOND);

"""