In [1]:
import os
import logging

from pyflink.common import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.common import Types, Row
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.datastream.connectors.kafka import KafkaTopicPartition
import json
from datetime import datetime

# Name of the runtime environment, taken from the RUNTIME_ENV variable and
# defaulting to "local". (Not referenced by the other cells visible here.)
RUNTIME_ENV = os.getenv("RUNTIME_ENV", "local")
# Kafka bootstrap servers passed to the KafkaSource builder; defaults to a
# broker on localhost:9092 when BOOTSTRAP_SERVERS is not set.
BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")

In [12]:
# Execution environment that the later cells attach jars, the Kafka source
# and the JDBC sink to.
env = StreamExecutionEnvironment.get_execution_environment()
# Run the pipeline in streaming mode (as opposed to batch mode).
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

JavaObject id=o149

In [13]:
# Jar file names added to the job's classpath by the jar-loading cell:
# Kafka connector, PostgreSQL driver and Flink JDBC connector.
jar_files = [
    "flink-sql-connector-kafka-3.2.0-1.18.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-3.1.2-1.18.jar",
]

In [14]:
# Kernel working directory at the time this cell runs. The jar-loading cell
# looks for the connector jars in the "Downloads" folder beneath it, so the
# notebook must be started from the directory that contains that folder.
CURRENT_DIR = os.getcwd()

In [15]:
# Named-row type descriptors for the JSON messages consumed from Kafka.
# Nested JSON objects are described by nested named rows.

# Fields of the nested "position" object.
position_info = Types.ROW_NAMED(
    ['latitude', 'longitude', 'bearing', 'odometer', 'speed'],
    [Types.FLOAT(), Types.FLOAT(), Types.INT(), Types.INT(), Types.INT()],
)

# Fields of the nested "vehicle" descriptor object.
vehicle_type_info = Types.ROW_NAMED(
    ['id', 'label', 'license_plate'],
    [Types.STRING(), Types.STRING(), Types.STRING()],
)

# Fields of the nested "trip" object.
trip_update_trip_info = Types.ROW_NAMED(
    ['trip_id', 'start_time', 'start_date', 'schedule_relationship', 'route_id', 'direction_id'],
    [Types.STRING(), Types.STRING(), Types.STRING(), Types.INT(), Types.STRING(), Types.INT()],
)

# The "vehicle" payload of a message; embeds the three rows defined above.
vehicle_info_type = Types.ROW_NAMED(
    [
        'trip',
        'position',
        'current_stop_sequence',
        'current_status',
        'timestamp',
        'congestion_level',
        'stop_id',
        'vehicle',
        'occupancy_status',
        'occupancy_percentage',
    ],
    [
        trip_update_trip_info,
        position_info,
        Types.INT(),
        Types.INT(),
        Types.STRING(),
        Types.INT(),
        Types.STRING(),
        vehicle_type_info,
        Types.INT(),
        Types.INT(),
    ],
)

# Top-level message. 'trip_update' and 'alert' are declared as plain strings.
vehicle_info_row = Types.ROW_NAMED(
    ['id', 'is_deleted', 'trip_update', 'vehicle', 'alert'],
    [Types.STRING(), Types.BOOLEAN(), Types.STRING(), vehicle_info_type, Types.STRING()],
)

# Deserializer that turns each JSON record into a row of the type above.
json_format = (
    JsonRowDeserializationSchema.builder()
    .type_info(vehicle_info_row)
    .build()
)

In [16]:
# Build a file:// URL for every jar under <CURRENT_DIR>/Downloads and
# register them all with the execution environment.
jar_paths = tuple(
    "file://" + os.path.join(CURRENT_DIR, 'Downloads', jar_name)
    for jar_name in jar_files
)
logging.info("adding local jars - " + ', '.join(jar_files))
env.add_jars(*jar_paths)

In [17]:
# The source is pinned to partition 1 of the "transitStream" topic.
partition_set = {KafkaTopicPartition("transitStream", 1)}

# Kafka source that starts at the latest offsets and decodes each record
# value with the JSON row deserializer (json_format).
flink_simple_json_source = (
    KafkaSource.builder()
    .set_bootstrap_servers(BOOTSTRAP_SERVERS)
    .set_partitions(partition_set)
    .set_group_id("whatIsThis")
    .set_starting_offsets(KafkaOffsetsInitializer.latest())
    .set_value_only_deserializer(json_format)
    .build()
)

In [18]:
# Attach the Kafka source to the environment; no watermark strategy is used.
flink_stream = env.from_source(
    flink_simple_json_source,
    WatermarkStrategy.no_watermarks(),
    source_name="fthusd",
)

In [19]:
def extract_vehicle_position_info(row):
    """Flatten one decoded feed message into the row written by the JDBC sink.

    Args:
        row: A message decoded with the nested vehicle schema. The function
            reads ``row.vehicle`` and its ``trip``, ``position`` and
            ``vehicle`` sub-rows.

    Returns:
        A ``Row`` whose fields are, in order: vehicle_id, vehicle_label,
        trip_id, start_date, latitude, longitude, bearing,
        vehicle_time_updated_at, occupancy_status, occupancy_percentage.

    Raises:
        AttributeError: if ``row.vehicle`` or one of the sub-rows read here
            is ``None``.
        TypeError, ValueError: if ``start_date`` is missing or not in
            ``YYYYMMDD`` form, or ``timestamp`` is missing or not an integer
            string.
    """
    info = row.vehicle
    trip = info.trip
    position = info.position
    descriptor = info.vehicle

    # The sink's output type declares this field as SQL_DATE, so emit a plain
    # date rather than a datetime at midnight.
    start_date = datetime.strptime(trip.start_date, "%Y%m%d").date()
    # The schema carries the timestamp as a string of epoch seconds;
    # fromtimestamp() converts it using the worker's local time zone.
    vehicle_time_updated_at = datetime.fromtimestamp(int(info.timestamp))

    return Row(
        descriptor.id,
        descriptor.label,
        trip.trip_id,
        start_date,
        position.latitude,
        position.longitude,
        position.bearing,
        vehicle_time_updated_at,
        info.occupancy_status,
        info.occupancy_percentage,
    )

In [20]:
# Output schema of extract_vehicle_position_info, in the same order as the
# INSERT column list and its ten "?" placeholders below.
row_type_info = Types.ROW([
    Types.STRING(),         # vehicle_id
    Types.STRING(),         # vehicle_label
    Types.STRING(),         # trip_id
    Types.SQL_DATE(),       # start_date
    Types.FLOAT(),          # latitude
    Types.FLOAT(),          # longitude
    Types.INT(),            # bearing
    Types.SQL_TIMESTAMP(),  # vehicle_time_updated_at
    Types.INT(),            # occupancy_status
    Types.INT(),            # occupancy_percentage
])

# Upsert one row per (trip_id, start_date). On conflict every inserted column
# is refreshed, including vehicle_time_updated_at; the statement used to set
# `updated_at`, a column the INSERT never populates, so the vehicle timestamp
# was never refreshed.
# NOTE(review): if the table also has a separate `updated_at` audit column
# with a default, add `updated_at = EXCLUDED.updated_at` back to the SET list.
#
# Connection settings come from the environment (same pattern as
# BOOTSTRAP_SERVERS); the defaults are the values used for local development.
flink_stream.map(extract_vehicle_position_info, output_type=row_type_info).add_sink(
    JdbcSink.sink(
        """INSERT INTO public.vehicle_position_updates (vehicle_id, vehicle_label, trip_id, start_date, latitude, longitude, bearing, vehicle_time_updated_at, occupancy_status, occupancy_percentage)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (trip_id, start_date)
            DO UPDATE SET
            vehicle_id = EXCLUDED.vehicle_id,
            vehicle_label = EXCLUDED.vehicle_label,
            trip_id = EXCLUDED.trip_id,
            start_date = EXCLUDED.start_date,
            latitude = EXCLUDED.latitude,
            longitude = EXCLUDED.longitude,
            bearing = EXCLUDED.bearing,
            vehicle_time_updated_at = EXCLUDED.vehicle_time_updated_at,
            occupancy_status = EXCLUDED.occupancy_status,
            occupancy_percentage = EXCLUDED.occupancy_percentage
        """,
        row_type_info,
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url(os.getenv("JDBC_URL", 'jdbc:postgresql://localhost:5432/transitstreamtest'))
            .with_driver_name('org.postgresql.Driver')
            .with_user_name(os.getenv("JDBC_USER", 'root'))
            .with_password(os.getenv("JDBC_PASSWORD", 'root'))
            .build(),
        JdbcExecutionOptions.builder()
            .with_batch_size(50)
            # Flush at least once per second so a partial batch (< 50 rows)
            # does not sit in the buffer until more records arrive.
            .with_batch_interval_ms(1000)
            # No retries: a failed batch surfaces as a job failure.
            .with_max_retries(0)
            .build()
    )
)


<pyflink.datastream.data_stream.DataStreamSink at 0x129df1660>

In [21]:
# Submit the pipeline; the job is logged and displayed under this name.
env.execute("vehicleUpdcsdcates")

KeyboardInterrupt: 

In [16]:
# Close the execution environment after the job has been stopped.
env.close()