<a href="https://colab.research.google.com/github/merlin-quix/tutorial-code/blob/main/tutorial_reprocess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1. Install quixstreams, clone the tutorial repo and switch to the draft branch.

In [None]:
# Prepare the runtime: install the Quix Streams client, clone the tutorial
# repository, check out the tutorial branch and move into the folder for this
# tutorial (the next cell reads its CSV file from there).
import os
!pip install quixstreams
!git clone https://github.com/quixio/tutorial-code.git
# os.chdir is used rather than `!cd`: in IPython every `!` command runs in its
# own subshell, so `!cd` would not change the notebook's working directory.
# NOTE(review): both chdir calls are relative, so this cell assumes it starts in
# the runtime's default directory (/content on Colab, judging by the absolute
# CSV path used in the next cell).
os.chdir('tutorial-code')
!git checkout tutorial/reprocess-events
os.chdir('reprocess-events')

2. Import the libraries, read a CSV file of raw GPS trackpoints, and send them to a Kafka topic.

In [None]:
import getpass
import os
import pandas as pd
import time
import datetime as dt
import quixstreams as qx

# Initialize the Quix Streams client for LOCAL Kafka
# client = qx.KafkaStreamingClient('127.0.0.1:9092')

# Initialize the Quix Streams client for Quix.io Kafka.
# The SDK token is a credential, so it is not stored in the notebook: it is read
# from the QUIX_SDK_TOKEN environment variable, or prompted for interactively.
token = os.environ.get('QUIX_SDK_TOKEN') or getpass.getpass('Quix SDK token: ')
client = qx.QuixStreamingClient(token)

# Initialize the destination topic and open a new stream on it
print("Initializing topic")
topic_producer = client.get_topic_producer('raw-trackpoints')
output_stream = topic_producer.create_stream()

print(f'Initialized Quix Streams client at {dt.datetime.utcnow()}')

# Read in the CSV file (absolute Colab path of the repository cloned in step 1)
df = pd.read_csv("/content/tutorial-code/reprocess-events/go_track_trackspoints.csv")

try:
    for i in range(len(df)):
        # Create a one-row data frame for each message (the double brackets keep
        # it a DataFrame rather than a Series)
        df_r = df.iloc[[i]]

        # Print the message so you can see what is being sent
        print("Sending Message: \n", df_r.to_markdown())

        # Send the data with the Quix Streams client
        output_stream.timeseries.publish(df_r)

        # Optionally wait for half a second to slow down the stream
        # so that we can see what is happening.
        time.sleep(0.5)
finally:
    # Close the stream even if the loop is interrupted (e.g. by stopping the
    # cell), so pending data is flushed and consumers see the stream end.
    output_stream.close()

3. Consume messages from the raw-trackpoints topic, calculate the distance travelled, and write it to a new downstream topic.

In [None]:
from geopy.distance import distance
from collections import deque

# To use a standalone Kafka broker instead of the Quix.io client from step 2:
# client = qx.KafkaStreamingClient('127.0.0.1:9092')

# Consumer for the raw GPS trackpoints published in step 2.
# Auto-commit is disabled and the offset reset policy is Earliest.
# NOTE(review): no manual commit is visible in this notebook, so presumably
# every run re-reads the topic from its first message; confirm this is the
# intended "reprocess" behaviour.
commit_settings = qx.CommitOptions()
commit_settings.auto_commit_enabled = False
topic_consumer = client.get_topic_consumer(
    "raw-trackpoints",
    commit_settings=commit_settings,
    auto_offset_reset=qx.AutoOffsetReset.Earliest,
)

# Producer and stream that carry the computed distances downstream.
print("Initializing producer...")
topic_producer = client.get_topic_producer('distance-calcs')
output_stream = topic_producer.create_stream()

print(f"Initialized Kafka producer at {dt.datetime.utcnow()}")

from collections import defaultdict

# Running state for the distance calculation, kept separately for every track so
# that points from different tracks (devices) are never chained together.
total_distance = defaultdict(float)  # track_id -> kilometres travelled so far
last_location = {}                   # track_id -> (latitude, longitude) of its previous point


def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    """Accumulate the distance travelled per track and publish the running totals.

    Called by Quix Streams for every DataFrame read from the ``raw-trackpoints``
    topic. Each row must provide ``latitude``, ``longitude`` and ``track_id``.
    For every row, the geodesic distance from the previous point of the same
    track is added to that track's total. A data point holding the total
    (``distance``, in km) and the ``track_id`` is then published to
    ``output_stream``.

    A DataFrame can carry several rows, so rows are handled one at a time.
    Converting whole columns with float()/int() only works for a single row.
    """
    # Log the receipt of the message in a human-readable format
    # print("Data received: \n", df.to_markdown(), "\n\n")

    rows = df[["latitude", "longitude", "track_id"]].itertuples(index=False, name=None)
    for latitude, longitude, track_id in rows:
        track_id = int(track_id)
        location = (float(latitude), float(longitude))

        # Add only the newest leg (previous point -> current point). Summing the
        # span of a sliding window on every message would count each leg several
        # times over.
        previous = last_location.get(track_id)
        if previous is not None:
            total_distance[track_id] += distance(previous, location).kilometers

        # Remember this point as the start of the track's next leg
        last_location[track_id] = location

        # Send the timeseries data using the built-in properties
        data = qx.TimeseriesData()
        data.add_timestamp(dt.datetime.utcnow()) \
            .add_value("distance", float(total_distance[track_id])) \
            .add_value("track_id", track_id)

        output_stream.timeseries.publish(data)
        # Print the current total distance traveled for this track
        print(f'Sending results: | timestamp: [{dt.datetime.utcnow()}] | total distance so far: [{total_distance[track_id]}] | device id: [{track_id}]  |')


def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    """Route the DataFrames of a newly discovered stream to on_dataframe_received_handler."""
    timeseries_reader = stream_consumer.timeseries
    timeseries_reader.on_dataframe_received = on_dataframe_received_handler


# Register the stream callback, then hand control to Quix Streams. qx.App.run()
# keeps consuming until the process is asked to stop (e.g. Ctrl-C), and exits
# gracefully on termination signals.
topic_consumer.on_stream_received = on_stream_received_handler
print("Listening to streams. Press CTRL-C to exit.")
qx.App.run()

4a. Consume the distance calculations and incrementally update my PostgreSQL database (hosted on Railway) with the totals for each device.

Railway PostgreSQL instance URL: https://railway.app/project/856071f2-e806-4a76-a138-d7d179d2e1e8/plugin/957afb6c-cbcc-499d-92c7-1c039c255702/Data

In [None]:
import getpass
import os
import psycopg2

# Create a connection to the downstream database.
# Every setting can be overridden with the standard libpq environment variables.
# The password is never stored in the notebook: it comes from PGPASSWORD or an
# interactive prompt.
conn = psycopg2.connect(
    host=os.environ.get('PGHOST', 'containers-us-west-204.railway.app'),
    port=int(os.environ.get('PGPORT', '6307')),
    dbname=os.environ.get('PGDATABASE', 'railway'),
    user=os.environ.get('PGUSER', 'postgres'),
    password=os.environ.get('PGPASSWORD') or getpass.getpass('Postgres password: '),
)

# Create a cursor object to execute SQL statements
cur = conn.cursor()

# Consumer for the distance totals published by step 3 to the distance-calcs
# topic. Auto-commit is disabled and the offset reset policy is Earliest.
# NOTE(review): no manual commit is visible in this notebook, so presumably each
# run re-reads the topic from the beginning; confirm this is intended.
commit_settings = qx.CommitOptions()
commit_settings.auto_commit_enabled = False
topic_consumer = client.get_topic_consumer("distance-calcs", commit_settings=commit_settings, auto_offset_reset=qx.AutoOffsetReset.Earliest)

def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    """Store the latest distance of each track in the ``dist_calc`` table.

    Called by Quix Streams for every DataFrame read from ``distance-calcs``.
    For every row, the ``dist_calc`` record whose ``tracking_id`` equals the
    row's ``track_id`` gets the row's ``distance``; if no record exists yet, one
    is inserted. All rows of one DataFrame are written in a single transaction,
    committed on success and rolled back on any error.
    """
    # Log the received calculations in a human-readable format
    print("Distance calc received: \n", df.to_markdown(), "\n\n")
    print("Updating DB...")

    # A DataFrame may hold several rows; int()/float() on a whole column would
    # fail for more than one, so each row is handled on its own.
    rows = df[["track_id", "distance"]].itertuples(index=False, name=None)

    # `with conn` commits on success and rolls back on exception. Without the
    # rollback, one failed statement would leave the connection in an aborted
    # transaction and every later message would fail too. It does not close
    # the connection.
    with conn:
        for track_id, track_distance in rows:
            track_id = int(track_id)
            track_distance = float(track_distance)

            # Values are sent as query parameters, not formatted into the SQL.
            cur.execute("SELECT 1 FROM dist_calc WHERE tracking_id = %s;", (track_id,))

            if cur.fetchone() is None:
                # No record for this track yet: create one
                print(f"Attempting to INSERT INTO dist_calc (tracking_id, distance) VALUES ({track_id}, {track_distance});")
                cur.execute(
                    "INSERT INTO dist_calc (tracking_id, distance) VALUES (%s, %s);",
                    (track_id, track_distance),
                )
            else:
                # Update the existing record with the latest total
                print(f"Attempting to UPDATE dist_calc SET distance = {track_distance} WHERE tracking_id = {track_id}")
                cur.execute(
                    "UPDATE dist_calc SET distance = %s WHERE tracking_id = %s;",
                    (track_distance, track_id),
                )

def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    """Hook every new stream's DataFrames up to the database-writing handler."""
    reader = stream_consumer.timeseries
    reader.on_dataframe_received = on_dataframe_received_handler


# Wire the stream callback into the consumer and start the Quix Streams loop,
# which blocks until Ctrl-C / a termination signal and then exits gracefully.
topic_consumer.on_stream_received = on_stream_received_handler
print("Listening to streams. Press CTRL-C to exit.")
qx.App.run()

4b. Consume the distance calculations and incrementally update a local SQLite database file (`distance-calcs.db`) with the totals for each device. **Known issue:** running into threading errors.

In [None]:
import getpass
import os
import sqlite3
import pandas as pd
import time
import datetime as dt
import quixstreams as qx
import threading

# Local SQLite database file that receives the per-track distance totals.
# By default sqlite3 refuses to use a connection on any thread other than the
# one that created it. The Quix Streams callbacks below apparently run on a
# different thread, which would explain the threading errors seen with this
# cell. check_same_thread=False lifts that restriction; db_lock serialises all
# use of the shared connection and cursor instead.
conn = sqlite3.connect('distance-calcs.db', check_same_thread=False)
db_lock = threading.Lock()

# Initialize the Quix Streams client for LOCAL Kafka
# client = qx.KafkaStreamingClient('127.0.0.1:9092')

# Initialize the Quix Streams client for Quix.io Kafka.
# The SDK token is a credential: read it from QUIX_SDK_TOKEN or prompt for it.
token = os.environ.get('QUIX_SDK_TOKEN') or getpass.getpass('Quix SDK token: ')
client = qx.QuixStreamingClient(token)

cur = conn.cursor()

# Create the table on first run (a no-op if it already exists). The key column
# is named tracking_id to match the queries issued by the handler below.
cur.execute('''CREATE TABLE IF NOT EXISTS dist_calc
                (tracking_id INTEGER PRIMARY KEY, distance REAL)''')
conn.commit()

# Consumer for the distance totals published by step 3 to the distance-calcs
# topic. Auto-commit is disabled and the offset reset policy is Earliest.
commit_settings = qx.CommitOptions()
commit_settings.auto_commit_enabled = False
topic_consumer = client.get_topic_consumer("distance-calcs", commit_settings=commit_settings, auto_offset_reset=qx.AutoOffsetReset.Earliest)


def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    """Store the latest distance of each track in the local ``dist_calc`` table.

    Called by Quix Streams for every DataFrame read from ``distance-calcs``.
    For every row, the record whose ``tracking_id`` equals the row's
    ``track_id`` gets the row's ``distance``; if no record exists yet, one is
    inserted. All rows of one DataFrame are written in a single transaction.
    """
    # Log the received calculations in a human-readable format
    print("Distance calc received: \n", df.to_markdown(), "\n\n")
    print("Updating DB...")

    # A DataFrame may hold several rows; handle each one individually.
    rows = df[["track_id", "distance"]].itertuples(index=False, name=None)

    # db_lock: only one thread at a time touches the shared connection/cursor.
    # `with conn`: commit on success, roll back on error. The connection is
    # deliberately left open because it is reused for every later message.
    with db_lock, conn:
        for track_id, track_distance in rows:
            track_id = int(track_id)
            track_distance = float(track_distance)

            # Values are bound as parameters rather than formatted into the SQL.
            cur.execute("SELECT 1 FROM dist_calc WHERE tracking_id = ?;", (track_id,))

            if cur.fetchone() is None:
                # No record for this track yet: create one
                print(f"Attempting to INSERT INTO dist_calc (tracking_id, distance) VALUES ({track_id}, {track_distance});")
                cur.execute(
                    "INSERT INTO dist_calc (tracking_id, distance) VALUES (?, ?);",
                    (track_id, track_distance),
                )
            else:
                # Update the existing record with the latest total
                print(f"Attempting to UPDATE dist_calc SET distance = {track_distance} WHERE tracking_id = {track_id}")
                cur.execute(
                    "UPDATE dist_calc SET distance = ? WHERE tracking_id = ?;",
                    (track_distance, track_id),
                )

def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    """Attach the SQLite-writing DataFrame handler to each newly received stream."""
    series_reader = stream_consumer.timeseries
    series_reader.on_dataframe_received = on_dataframe_received_handler


# Listen for new streams, then run the Quix Streams loop. It blocks until
# Ctrl-C / a termination signal arrives and then exits gracefully.
topic_consumer.on_stream_received = on_stream_received_handler
print("Listening to streams. Press CTRL-C to exit.")
qx.App.run()