# Continuously downsample data using InfluxDB and Quix Streams
This notebook lets you run a real-time data processing pipeline that downsamples data sourced from InfluxDB and then writes the result back to InfluxDB.

* The data processing uses the [Quix Streams library](https://github.com/quixio/quix-streams) as the processing engine
* Quix Streams is an open source Python library for building containerized stream processing applications with Apache Kafka.
* Quix Streams is designed to run continuously on a stream of data, so to run it in Google Colab, we'll run the processes in the background.

<br>![downsample-2024-02-19-2312.png](quix-pipeline-downsample.png)

## Notebook sections:

1. **Install the dependencies**
2. **Generate synthetic machine data (optional)**
3. **Read the raw data from InfluxDB**
4. <font color="blue">**Downsample the data**</font>
5. **Insert the downsampled data back into InfluxDB**

**Note:**
* If you just want to see the downsampling code, head straight to the **Downsample the data** section.

* If you want to run this kind of pipeline in production, check out [Quix Cloud](https://quix.io/product) where you can run Quix Streams processes in a scalable, fault-tolerant way.

## Setup

Install Quix Streams, the InfluxDB client, and Apache Kafka, then start the Kafka servers.

### 1. Install the main dependencies

Dependencies include the Quix Streams library, the InfluxDB 3 Python client, and Faker (used by the synthetic machine data generator).

In [None]:
!pip install quixstreams==2.3.1 influxdb3-python faker

### 3. Download and set up Kafka and Zookeeper instances

This step uses the default configurations provided by Apache Kafka to spin up the instances.

In [None]:
!curl -sSOL https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
!tar -xzf kafka_2.13-3.6.1.tgz

In [None]:
!./kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.1/config/zookeeper.properties
!./kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.1/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10
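
A fixed 10-second sleep is usually enough, but it can be too short on a slow runtime. As an alternative, here is a minimal sketch that polls the broker port until it accepts TCP connections. The helper name `wait_for_port` and its default timeout are assumptions made for illustration; they are not part of Kafka or Quix Streams.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

For example, `wait_for_port("localhost", 9092)` would block until the broker address used later in this notebook is reachable, or return `False` after 60 seconds. A successful TCP connection only shows that the port is open, not that the broker has finished starting, so treat it as a rough readiness check.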

### 4. Check that the Kafka Daemons are running

Show the running daemon processes by filtering the process list for the keyword "java" while excluding the grep process itself.

In [None]:
!ps aux | grep -E '[j]ava'

### 5. Clone the tutorial repository

Get the files you'll need to run in this notebook.

In [None]:
import os

!git clone https://github.com/quixio/template-influxdbv3-downsampling.git
os.chdir("template-influxdbv3-downsampling")
!git checkout dev

### 6. Rename all the files so that we can distinguish them better

This step is just for running the tutorial in Colab. Since we will be running multiple processes in the background, they are harder to manage when they are all called `main.py`. We want to be able to look at the running processes and stop any that are misbehaving.

In [None]:
# If "main.py" exists, rename it so that we can better identify it in Google Colab's process list:
if os.path.exists('/content/template-influxdbv3-downsampling/Machine data to InfluxDB/main.py'):
    os.rename(
        '/content/template-influxdbv3-downsampling/Machine data to InfluxDB/main.py',
        '/content/template-influxdbv3-downsampling/Machine data to InfluxDB/machine_generator_main.py'
        )

if os.path.exists('/content/template-influxdbv3-downsampling/InfluxDB V3 Data Source/main.py'):
    os.rename(
        '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Source/main.py',
        '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Source/influx_reader_main.py'
        )

if os.path.exists('/content/template-influxdbv3-downsampling/Downsampler/main.py'):
    os.rename(
        '/content/template-influxdbv3-downsampling/Downsampler/main.py',
        '/content/template-influxdbv3-downsampling/Downsampler/downsampler_main.py'
        )

if os.path.exists('/content/template-influxdbv3-downsampling/InfluxDB V3 Data Sink/main.py'):
    os.rename(
        '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Sink/main.py',
        '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Sink/influxwriter_main.py'
        )
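
The four rename blocks above follow the same pattern, so they can also be written as a loop. The following is a minimal sketch of that idea; `RENAMES` and `rename_entrypoints` are names introduced here for illustration, and the folder names are taken from the cell above.

```python
from pathlib import Path

# Folder name -> new script name, mirroring the renames in the cell above
RENAMES = {
    "Machine data to InfluxDB": "machine_generator_main.py",
    "InfluxDB V3 Data Source": "influx_reader_main.py",
    "Downsampler": "downsampler_main.py",
    "InfluxDB V3 Data Sink": "influxwriter_main.py",
}


def rename_entrypoints(repo_root: str, renames: dict) -> list:
    """Rename <folder>/main.py to <folder>/<new name>, skipping folders with no main.py."""
    renamed = []
    for folder, new_name in renames.items():
        source = Path(repo_root) / folder / "main.py"
        if source.exists():
            target = source.with_name(new_name)
            source.rename(target)
            renamed.append(str(target))
    return renamed
```

Calling `rename_entrypoints("/content/template-influxdbv3-downsampling", RENAMES)` a second time returns an empty list, so the cell stays safe to re-run.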

### 7. Set some environment variables

Set variables required by the different processes that you will run.

In [None]:
import os

# Variables needed only in this notebook
source_db_name = "machine-generator-data"
dest_db_name = "machine-generator-downsampled"
raw_data_topic_name = source_db_name + "_topic"
downsampled_data_topic_name = dest_db_name + "_topic"

# Kafka-related variables
os.environ['BROKER_ADDRESS'] = "localhost:9092"
os.environ['CONSUMER_GROUP_NAME'] = "influx-raw-data-consumer"

# Application-specific variables
os.environ['task_interval'] = "30s"
os.environ['target_field'] = "temperature"

# InfluxDB related variables
os.environ['INFLUXDB_ORG'] = "ContentSquad"
os.environ['INFLUXDB_HOST'] = "https://us-east-1-1.aws.cloud2.influxdata.com/"
os.environ['INFLUXDB_TAG_KEYS'] = "['machineID','barcode','provider']"
os.environ['INFLUXDB_FIELD_KEYS'] = "['temperature','load','power','vibration']"
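
Note that `INFLUXDB_TAG_KEYS` and `INFLUXDB_FIELD_KEYS` hold Python list literals as strings, because environment variables can only store text. The writer process shown later in this notebook parses them with `ast.literal_eval`. Here is a minimal sketch of that round trip; the helper name `read_list_env` and the type check are assumptions added for illustration.

```python
import ast
import os


def read_list_env(name: str) -> list:
    """Parse an environment variable that holds a Python list literal, e.g. "['a','b']"."""
    raw = os.environ.get(name, "[]")
    value = ast.literal_eval(raw)  # evaluates literals only, never arbitrary code
    if not isinstance(value, list):
        raise ValueError(f"{name} must be a list literal, got {type(value).__name__}")
    return value


os.environ["INFLUXDB_FIELD_KEYS"] = "['temperature','load','power','vibration']"
print(read_list_env("INFLUXDB_FIELD_KEYS"))
```

An unset variable falls back to an empty list, which matches the default used by the writer.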


Set your secret InfluxDB token in a way that does not expose it in the notebook:

In [None]:
from getpass import getpass
os.environ['INFLUXDB_TOKEN'] = getpass('Enter your InfluxDB Token: ')


Enter your InfluxDB Token: ··········


# 1. Generate synthetic machine data (Optional)

This step generates some artificial machine sensor data and writes it into your own InfluxDB database.

* Follow this step if you don't have existing data that you want to downsample, or want to work with test data first.
* You can find the source code at the bottom of this section or [in GitHub](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/Machine%20data%20to%20InfluxDB/main.py)

## Run the code

 1. Define the InfluxDB database and measurement that should store the raw data from the machine generator.

In [None]:
os.environ['INFLUXDB_DATABASE'] = source_db_name # 'source_db_name' is defined further up in the setup section
os.environ['INFLUXDB_MEASUREMENT_NAME'] = "machine-data-colab"

 2. Run the machine generator process in the background (so that we can run subsequent cells)

In [None]:
!nohup python -u '/content/template-influxdbv3-downsampling/Machine data to InfluxDB/machine_generator_main.py' > '/content/template-influxdbv3-downsampling/Machine data to InfluxDB/machine_generator.log' 2>&1 &

 3. Confirm that the process is running:

In [None]:
!ps auxww | grep "[m]achine_generator_main"

 4. Check the process logs to make sure there were no errors.

In [None]:
!cat '/content/template-influxdbv3-downsampling/Machine data to InfluxDB/machine_generator.log'


 5. (Optional) If there's a problem with the process, or you decide that it has generated enough data, kill it by searching for it by name.

In [None]:
!pgrep -f '[m]achine_generator_main' | grep -v grep | wc -l | xargs -I {} bash -c 'if [ {} -eq 1 ]; then pgrep -f "[m]achine_generator_main" | xargs kill; fi'
print("Sent a kill signal to the 'machine_generator_main.py' process (if it was running)")

### See the full machine generator source code

Expand this section to see the full source code of the machine generator:

---



In [None]:

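import datetime
import os
from random import randint
from time import sleep

from faker import Faker
from influxdb_client_3 import InfluxDBClient3

# Create an InfluxDBClient3 client for writing the generated points
write_client = InfluxDBClient3(host=os.environ['INFLUXDB_HOST'], token=os.environ['INFLUXDB_TOKEN'], org=os.environ['INFLUXDB_ORG'])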

fake = Faker()

machine_id_counter = 1
# Start timestamp
start_time = datetime.datetime.now()

# Target measurement and database for the generated points
measurement_name = os.environ['INFLUXDB_MEASUREMENT_NAME']
database = os.environ["INFLUXDB_DATABASE"]

class machine():
    def __init__(self) -> None:
        global machine_id_counter
        self.machine_id = "machine" + str(machine_id_counter)
        machine_id_counter += 1
        self.temperature = 0
        self.load = 0
        self.power = 0
        self.vibration = 0
        self.barcode = fake.ean(length=8)
        self.provider = fake.company()
        self.fault = False
        self.previous_fault_state = False

    def toggle_fault(self):
        self.previous_fault_state = self.fault
        self.fault = not self.fault

    def returnMachineID(self):
        return self.machine_id

    def returnTemperature(self):
        currentLoad = self.load
        if currentLoad >= 190:
            self.temperature = randint(95, 120)
        elif currentLoad > 110:
            self.temperature = randint(80, 90)
        elif currentLoad >= 40:
            self.temperature = randint(35, 40)
        elif currentLoad > 0:
            self.temperature = randint(29, 34)
        else:
            self.temperature = 20
        return self.temperature

    def setLoad(self, load):
        # TODO dont randomise
        self.load = load

    def returnPower(self):
        currentLoad = self.load
        if currentLoad >= 190:
            self.power = randint(400, 500)
        elif currentLoad > 110:
            self.power = randint(300, 320)
        elif currentLoad >= 40:
            self.power = randint(200, 220)
        elif currentLoad == 0:
            self.power = 0
        else:
            self.power = randint(180, 199)

        return self.power

    def returnVibration(self):
        currentLoad = self.load
        if currentLoad >= 190:
            self.vibration = randint(500, 600)
        elif currentLoad > 110:
            self.vibration = randint(300, 500)
        elif currentLoad == 0:
            self.vibration = 0
        elif currentLoad >= 40:
            self.vibration = randint(80, 90)
        else:
            self.vibration = randint(50, 79)
        return self.vibration

    def returnMachineHealth(self):
        # Trigger load first, as it needs to be consistent:
        return {"metadata": {"machineID": self.returnMachineID(),
                             "barcode": self.barcode, "provider": self.provider},
                "data": [{"temperature": self.returnTemperature()},
                         {"load": self.load},
                         {"power": self.returnPower()},
                         {"vibration": self.returnVibration()}]}


def runMachine(m):
    counter = 0
    counter2 = 0

    sleeptime = 0.25
    m.setLoad(randint(10, 50))
    increasing = True

    while True:

        # Check if fault state has changed from True to False
        if m.previous_fault_state and not m.fault:
            m.setLoad(50)
            m.previous_fault_state = False

        # Chance of fault
        if m.fault:
            if counter2 == 5:
                current_load = m.load
                if current_load < 200:
                    new_load = min(current_load + 20, 200)
                    m.setLoad(new_load)
                counter2 = 0
        else:
            if counter2 == 5:
                # Gradually change load between 50 and 99
                current_load = m.load
                if increasing:
                    new_load = current_load + 5
                    if new_load >= 99:
                        increasing = False
                else:
                    new_load = current_load - 5
                    if new_load <= 50:
                        increasing = True

                m.setLoad(new_load)
                counter2 = 0

        check_machine = m.returnMachineHealth()
        timestamp_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

        point = {
            "measurement": measurement_name,
            "tags": {
                'machineId': check_machine['metadata']['machineID'],
                'barcode': check_machine['metadata']['barcode'],
                'provider': check_machine['metadata']['provider'],
            },
            "fields": {
                'temperature': check_machine['data'][0]['temperature'],
                'load': check_machine['data'][1]['load'],
                'power': check_machine['data'][2]['power'],
                'vibration': check_machine['data'][3]['vibration'],
            },
            "time": timestamp_str
        }

        write_client.write(database=database, record=point)

        print(f"Wrote point {point}")

        sleep(sleeptime)
        counter = counter + 1
        counter2 = counter2 + 1


def main():
    # Initialize a machine instance
    m = machine()

    # Start running the machine to produce data
    runMachine(m)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Exiting.")



<br><br><br>

# 2. Read the raw data from InfluxDB




In this step, you will run a process that reads the raw data from your source InfluxDB database and sends it to a Kafka topic for the downsampling process to consume.

* You can find the source code at the bottom of this section or [in GitHub](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/InfluxDB%20V3%20Data%20Source/main.py)

## Summary of what this code does

First, it uses the Quix Streams library's `Application` class to initialize a connection to the local Apache Kafka broker.

```python
...
app = Application(
  broker_address=os.environ.get('BROKER_ADDRESS','localhost:9092'),
  consumer_group=consumer_group_name,
  auto_create_topics=True
)
...
```

It then uses the InfluxDB client's query feature to get data from the source database.

```python
...
myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= now() - {interval}'
table = influxdb3_client.query(
                        query=myquery,
                        mode="pandas",
                        language="influxql")
...
```

This query runs at regular intervals, as configured in the `task_interval` setting. This setting defines both how often the query runs against the database and the maximum age of the data to return.
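
To make the double role of `task_interval` concrete, here is a minimal sketch that derives both the sleep time and the query from one interval string. It assumes that the query is meant to select rows newer than `now() - interval`; `build_query` is a hypothetical helper introduced for illustration, and the validation is stricter than in the original code.

```python
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def interval_to_seconds(interval: str) -> int:
    """Convert an interval such as '30s' or '5m' into a number of seconds."""
    number, unit = interval[:-1], interval[-1:]
    if unit not in UNIT_SECONDS or not number.isdigit():
        raise ValueError(f"Expected <int><unit> such as '10h'; got {interval!r}")
    return int(number) * UNIT_SECONDS[unit]


def build_query(measurement: str, interval: str) -> str:
    """Build an InfluxQL query for rows no older than the given interval."""
    interval_to_seconds(interval)  # validate the format before embedding it
    return f'SELECT * FROM "{measurement}" WHERE time >= now() - {interval}'


interval = "30s"
print(build_query("machine-data-colab", interval))
print(f"Next query in {interval_to_seconds(interval)} seconds")
```

Because the same value controls the look-back window and the polling period, consecutive queries cover adjacent time ranges with little overlap.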

Finally, the results of the query are sent to Kafka using a producer obtained from the Quix Streams `get_producer()` method.

```python
...
message_key = obj['machineId']
logger.info(f"Produced message with key:{message_key}, value:{obj}")

serialized = topic.serialize(
    key=message_key, value=obj, headers={"uuid": str(uuid.uuid4())}
    )

# publish the data to the topic
producer.produce(
    topic=topic.name,
    headers=serialized.headers,
    key=serialized.key,
    value=serialized.value,
)
...
```
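
The message key matters because Kafka's default partitioner routes messages with the same key to the same partition, so readings from one machine stay in order. The sketch below isolates the row-to-message step using only the standard library. `to_messages` is a hypothetical helper, not part of Quix Streams, and the sample rows are made up.

```python
import json
import uuid


def to_messages(json_records: str, key_field: str = "machineId"):
    """Yield (key, value, headers) tuples, one per row in a JSON array of records."""
    for row in json.loads(json_records):
        headers = {"uuid": str(uuid.uuid4())}  # unique ID per message
        yield row[key_field], row, headers


sample = '[{"machineId": "machine1", "temperature": 31}, {"machineId": "machine2", "temperature": 36}]'
for key, value, headers in to_messages(sample):
    print(key, value["temperature"], headers["uuid"])
```

In the real process, each tuple would then be serialized with `topic.serialize(...)` and passed to `producer.produce(...)`, as shown in the excerpt above.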


 ## Run the code

 1. Define the name of the Kafka topic that should receive the raw data and the measurement to use in the query.

In [None]:
os.environ['output'] = raw_data_topic_name
os.environ['localdev'] = "true" # Set this so that Quixstreams looks for a local Kafka
os.environ['INFLUXDB_MEASUREMENT_NAME'] = "machine-data-colab" # Change this if you already have another measurement that you want to downsample

 2. Run the influx reader process in the background (so that we can run subsequent cells)

In [None]:
!nohup python -u '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Source/influx_reader_main.py' > '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Source/influx_reader.log' 2>&1 &

 3. Confirm that the process is running:

In [None]:
!ps auxww | grep '[i]nflux_reader_main'

root       43598  8.1  1.2 1581092 164060 ?      Sl   11:31   0:03 python3 -u influx_consumer_main.p


 4. Check the process logs to make sure there were no errors.

In [None]:
!cat '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Source/influx_reader.log'

 5. (Optional) If there's a problem with the process, kill it by searching for it by name<br>(for example, if it has a problem but does not terminate on its own)

In [None]:
!pgrep -f '[i]nflux_reader_main' | grep -v grep | wc -l | xargs -I {} bash -c 'if [ {} -eq 1 ]; then pgrep -f "[i]nflux_reader_main" | xargs kill; fi'
print("Sent a kill signal to the 'influx_reader_main' process (if it was running)")

### See the full InfluxDB reader source code

In [None]:
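import json
import logging
import os
import uuid
from time import sleep

import influxdb_client_3 as InfluxDBClient3
from quixstreams import Application
from quixstreams.models.serializers.quix import JSONSerializer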

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create an Application
app = Application.Quix(consumer_group="influxdbv3_source", auto_create_topics=True)

# Define a serializer for messages, using JSON Serializer for ease
serializer = JSONSerializer()

# Define the topic using the "output" environment variable
topic_name = os.environ["output"]
topic = app.topic(name=topic_name, value_serializer="json")

influxdb3_client = InfluxDBClient3.InfluxDBClient3(token=os.environ["INFLUXDB_TOKEN"],
                         host=os.environ["INFLUXDB_HOST"],
                         org=os.environ["INFLUXDB_ORG"],
                         database=os.environ["INFLUXDB_DATABASE"])

measurement_name = os.environ.get("INFLUXDB_MEASUREMENT_NAME", os.environ["output"])
interval = os.environ.get("task_interval", "5m")

# should the main loop run?
# Global variable to control the main loop's execution
run = True

# Helper function to convert time intervals (like 1h, 2m) into seconds for easier processing.
# This function is useful for determining the frequency of certain operations.
UNIT_SECONDS = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    "y": 31536000,
}

def interval_to_seconds(interval: str) -> int:
    try:
        return int(interval[:-1]) * UNIT_SECONDS[interval[-1]]
    except ValueError as e:
        # int() failed: the part before the unit is not a whole number
        raise ValueError(
            "interval format is {int}{unit} e.g. '10h'; "
            f"valid units: {list(UNIT_SECONDS.keys())}") from e
    except KeyError:
        raise ValueError(
            f"Unknown interval unit: {interval[-1]}; "
            f"valid units: {list(UNIT_SECONDS.keys())}")

interval_seconds = interval_to_seconds(interval)

# Function to fetch data from InfluxDB and send it to Quix
# It runs in a continuous loop, periodically fetching data based on the interval.
def get_data():
    # Run in a loop until the main thread is terminated
    while run:
        try:
            myquery = f'SELECT * FROM "{measurement_name}" WHERE time >= now() - {interval}'
            print(f"sending query {myquery}")
            # Query InfluxDB 3.0 using influxql or sql
            table = influxdb3_client.query(
                                    query=myquery,
                                    mode="pandas",
                                    language="influxql")

            table = table.drop(columns=["iox::measurement"])
            table.rename(columns={'time': 'time_recorded'}, inplace=True)
            # If there are rows to write to the stream at this time
            if not table.empty:
                json_result = table.to_json(orient='records', date_format='iso')
                yield json_result
                print("query success")
            else:
                print("No new data to publish.")

            # Wait for the next interval
            sleep(interval_seconds)

        except Exception as e:
            print("query failed", flush=True)
            print(f"error: {e}", flush=True)
            sleep(1)

def main():
    """
    Read data from the Query and publish it to Kafka
    """

    # Create a pre-configured Producer object.
    # Producer is already setup to use Quix brokers.
    # It will also ensure that the topics exist before producing to them if
    # Application.Quix is initialized with "auto_create_topics=True".

    with app.get_producer() as producer:
        for res in get_data():
            # Parse the JSON string into a Python object
            records = json.loads(res)
            for index, obj in enumerate(records):
                print(obj)
                # Generate a unique message_key for each row
                message_key = obj['machineId']
                logger.info(f"Produced message with key:{message_key}, value:{obj}")

                serialized = topic.serialize(
                    key=message_key, value=obj, headers={"uuid": str(uuid.uuid4())}
                    )

                # publish the data to the topic
                producer.produce(
                    topic=topic.name,
                    headers=serialized.headers,
                    key=serialized.key,
                    value=serialized.value,
                )

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Exiting.")

# 3. Downsample the data

In this step, you will run a process that reads the raw data from the input Kafka topic, downsamples it, then sends it to an output topic.

This is where the Quix Streams library excels: processing high volumes of data in real time.

* You can find the source code at the bottom of this section or [in GitHub](https://github.com/quixio/template-influxdbv3-downsampling/blob/dev/Downsampler/main.py)

## Summary of what this code does

As before, it uses the Quix Streams library's `Application` class to initialize a connection to our local Apache Kafka broker.

It then uses the built-in windowing feature to create a tumbling window that downsamples the data into 1-minute buckets.

```python
...
target_field = os.environ['target_field']  # By default this is "temperature"

def custom_ts_extractor(value):
    # ... custom code that returns the 'time_recorded' field as the timestamp to use for windowing ...
    ...

topic = app.topic(input_topic, timestamp_extractor=custom_ts_extractor)

sdf = (
    sdf.apply(lambda value: value[target_field])  # Extract temperature values
    .tumbling_window(timedelta(minutes=1))   # 1-minute tumbling windows
    .mean()                                  # Calculate average temperature
    .final()                                 # Emit results at window completion
)

sdf = sdf.apply(
    lambda value: {
        "time": value["end"],                  # End of the window
        "temperature_avg": value["value"],     # Average temperature
    }
)

sdf.to_topic(output_topic)
...
```
Explanation:
* **custom_ts_extractor**: ensures the downsampling uses the `time_recorded` field in the data, which gives us more control over the behaviour of time-based windows (otherwise Quix Streams uses the Kafka message timestamp).
* **tumbling_window**: ensures that the data is grouped into non-overlapping 1-minute windows.
* **Aggregation with `mean()`**: calculates the average temperature within each window.
* **Output Formatting**: Transforms the result into a format with time (window end) and the final windowed value `temperature_avg`.
* **Sending to Kafka**: `sdf.to_topic(output_topic)` sends downsampled results to the Kafka topic defined in the `output_topic` variable.

Note: "sdf" stands for "Streaming Dataframe".
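
To show what a tumbling window computes, here is a plain-Python sketch that buckets a small batch of records into 1-minute windows and averages one field. It is not how Quix Streams implements windows (the library processes messages one at a time and keeps window state between them). The function names and sample records are made up for illustration, and timestamps are assumed to be UTC.

```python
from collections import defaultdict
from datetime import datetime, timezone


def to_epoch_ms(iso_timestamp: str) -> int:
    """Parse a timestamp such as '2024-02-19T11:31:05.250' (treated as UTC) into epoch milliseconds."""
    parsed = datetime.strptime(iso_timestamp, "%Y-%m-%dT%H:%M:%S.%f")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def tumbling_mean(records, field, window_ms=60_000):
    """Group records into fixed, non-overlapping windows and average `field` per window."""
    buckets = defaultdict(list)
    for record in records:
        ts = to_epoch_ms(record["time_recorded"])
        window_start = ts - (ts % window_ms)  # align to the window boundary
        buckets[window_start].append(record[field])
    return [
        {"start": start, "end": start + window_ms, "value": sum(values) / len(values)}
        for start, values in sorted(buckets.items())
    ]


records = [
    {"time_recorded": "2024-02-19T11:31:05.000", "temperature": 30},
    {"time_recorded": "2024-02-19T11:31:45.000", "temperature": 34},
    {"time_recorded": "2024-02-19T11:32:10.000", "temperature": 40},
]
for window in tumbling_mean(records, "temperature"):
    print(window)
```

The first two readings fall into the same minute and are averaged together, while the third starts a new window. Each result carries `start`, `end`, and `value` keys, mirroring the shape that the output formatting step above reads from.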


 ## Run the code

 1. Define the name of the Kafka topic that should receive the downsampled data.

In [None]:
os.environ['input'] = raw_data_topic_name
os.environ['output'] = downsampled_data_topic_name

 2. Run the downsampling process in the background (so that we can run subsequent cells)

In [None]:
!nohup python -u '/content/template-influxdbv3-downsampling/Downsampler/downsampler_main.py' > '/content/template-influxdbv3-downsampling/Downsampler/downsampler.log' 2>&1 &

 3. Confirm that the process is running:

In [None]:
!ps auxww | grep '[d]ownsampler_main'

root       11740  3.0  0.3 1229080 47644 ?       Sl   09:19   0:01 python3 -u downsampler_main.py


 4. Check the process logs to make sure there were no errors.

In [None]:
!cat '/content/template-influxdbv3-downsampling/Downsampler/downsampler.log'

 5. (Optional) If there's a problem with the process, kill it by searching for it by name<br>(for example, if it has a problem but does not terminate on its own)

In [None]:
!pgrep -f '[d]ownsampler_main.py' | grep -v grep | wc -l | xargs -I {} bash -c 'if [ {} -eq 1 ]; then pgrep -f "[d]ownsampler_main.py" | xargs kill; fi'
print("Sent a kill signal to the 'downsampler_main.py' process (if it was running)")

### See the full downsampler source code

In [None]:
import logging
import os
from datetime import datetime, timedelta

from quixstreams import Application
from quixstreams.models.serializers.quix import JSONDeserializer, JSONSerializer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Application.Quix(consumer_group="downsampling-consumer-groupv6", auto_offset_reset="earliest")
input_topic = app.topic(os.environ["input"], value_deserializer=JSONDeserializer())
output_topic = app.topic(os.environ["output"], value_serializer=JSONSerializer())

data_key = os.environ["data_key"]
logger.info(f"Data key is: {data_key}")

sdf = app.dataframe(input_topic)
sdf = sdf.update(lambda value: logger.info(f"Input value received: {value}"))

def custom_ts_extractor(value):
    """
    Specifying a custom timestamp extractor to use the timestamp from the message payload
    instead of Kafka timestamp.
    """
    # Convert to a datetime object
    dt_obj = datetime.strptime(value["time_recorded"], "%Y-%m-%dT%H:%M:%S.%f")

    # Convert to milliseconds since the Unix epoch
    milliseconds = int(dt_obj.timestamp() * 1000)
    value["timestamp"] = milliseconds
    logger.info(f"Value of new timestamp is: {value['timestamp']}")
    return value["timestamp"]

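# Pass the timestamp extractor to the input topic and rebuild the dataframe from it.
# The window functions will then use the extracted timestamp instead of the Kafka timestamp.
input_topic = app.topic(
    os.environ["input"],
    value_deserializer=JSONDeserializer(),
    timestamp_extractor=custom_ts_extractor,
)
sdf = app.dataframe(input_topic)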

sdf = (
    # Extract the relevant field from the record
    sdf.apply(lambda value: value[data_key])

    # Define a tumbling window of 1 minute
    .tumbling_window(timedelta(minutes=1))

    # Specify the "mean" aggregation function to apply to values of the data key
    .mean()

    # Emit results only when the 1 minute window has elapsed
    .final()
    #.current() #for debug purposes.
)

sdf = sdf.apply(
    lambda value: {
        "time": value["end"],
        f"{data_key}": value["value"],
    }
)

# Produce the result to the output topic
sdf = sdf.to_topic(output_topic)
sdf = sdf.update(lambda value: logger.info(f"Produced value: {value}"))

if __name__ == "__main__":
    logger.info("Starting application")
    app.run(sdf)

# 4. Insert the downsampled data back into InfluxDB

 1. Define the name of the Kafka topic from which to read the downsampled data.<br>Also:
  * Set the name of the destination database.
  * Set the name of the measurement to use in the destination database.

In [None]:
os.environ['input'] = downsampled_data_topic_name
os.environ['INFLUXDB_DATABASE'] = dest_db_name
os.environ['INFLUXDB_MEASUREMENT_NAME'] = "machine-data-downsampled-colab"

2. Run the InfluxDB write process in the background (so that we can run subsequent cells)

In [None]:
!nohup python -u '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Sink/influxwriter_main.py' > '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Sink/influxwriter.log' 2>&1 &

3. Confirm that the process is running:


In [None]:
!ps aux | grep '[i]nfluxwriter_main'

4. Check the process logs to make sure there were no errors.

In [None]:
!cat '/content/template-influxdbv3-downsampling/InfluxDB V3 Data Sink/influxwriter.log'

5. (Optional) If there's a problem with the process, kill it by searching for it by name:

In [None]:
!pgrep -f '[i]nfluxwriter_main' | grep -v grep | wc -l | xargs -I {} bash -c 'if [ {} -eq 1 ]; then pgrep -f "[i]nfluxwriter_main" | xargs kill; fi'

### See the full InfluxDB writer source code

In [None]:
# import standard library modules
import ast
import datetime
import logging
import os

# import vendor-specific modules
from quixstreams import Application
from quixstreams.models.serializers.quix import JSONDeserializer
from influxdb_client_3 import InfluxDBClient3

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
consumer_group_name = os.environ.get('CONSUMER_GROUP_NAME', "influxdb-data-writer")

app = Application.Quix(consumer_group=consumer_group_name,
                       auto_offset_reset="earliest")

input_topic = app.topic(os.environ["input"], value_deserializer=JSONDeserializer())


# Read the environment variables and convert them to lists.
# For multiple keys, use a list literal such as "['field1','field2']"
tag_keys = ast.literal_eval(os.environ.get('INFLUXDB_TAG_KEYS', "[]"))
field_keys = ast.literal_eval(os.environ.get('INFLUXDB_FIELD_KEYS', "[]"))


# Create the InfluxDB client for the destination database

influx3_client = InfluxDBClient3(token=os.environ["INFLUXDB_TOKEN"],
                         host=os.environ["INFLUXDB_HOST"],
                         org=os.environ["INFLUXDB_ORG"],
                         database=os.environ["INFLUXDB_DATABASE"])

def send_data_to_influx(message):
    logger.info(f"Processing message: {message}")
    try:
        # The point below uses the 'time' value from the message (the window end).
        # `writetime` holds the current UTC time in case you prefer to use the
        # write time as the timestamp instead.

        writetime = datetime.datetime.utcnow()
        writetime = writetime.isoformat(timespec='milliseconds') + 'Z'

        measurement_name = os.environ.get('INFLUXDB_MEASUREMENT_NAME', "measurement1")

        # Initialize the tags and fields dictionaries
        tags = {}
        fields = {}

        # Iterate over tag_keys and field_keys to populate tags and fields
        for tag_key in tag_keys:
            if tag_key in message:
                tags[tag_key] = message[tag_key]

        for field_key in field_keys:
            if field_key in message:
                fields[field_key] = message[field_key]

        logger.info(f"Using tag keys: {', '.join(tags.keys())}")
        logger.info(f"Using field keys: {', '.join(fields.keys())}")

        # Construct the points dictionary
        points = {
            "measurement": measurement_name,
            "tags": tags,
            "fields": fields,
            "time": message['time']
        }

        influx3_client.write(record=points, write_precision="ms")

        print(f"{str(datetime.datetime.utcnow())}: Persisted points to influx: {points}")
    except Exception as e:
        print(f"{str(datetime.datetime.utcnow())}: Write failed")
        print(e)

sdf = app.dataframe(input_topic)
sdf = sdf.update(send_data_to_influx)

if __name__ == "__main__":
    print("Starting application")
    app.run(sdf)
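
The core of `send_data_to_influx` is the mapping from a Kafka message to an InfluxDB point. Here is that mapping isolated as a pure function so that it can be tried without a database connection. `build_point` is a name introduced for illustration, and the sample message is hypothetical, shaped like the downsampler's output.

```python
def build_point(message: dict, measurement: str, tag_keys: list, field_keys: list) -> dict:
    """Build the dictionary-style point that the writer passes to the InfluxDB client."""
    return {
        "measurement": measurement,
        # Only keys that are actually present in the message are copied
        "tags": {key: message[key] for key in tag_keys if key in message},
        "fields": {key: message[key] for key in field_keys if key in message},
        "time": message["time"],
    }


# Hypothetical downsampled message: window end in epoch milliseconds plus the averaged field
message = {"time": 1708342320000, "temperature": 32.0}
point = build_point(
    message,
    measurement="machine-data-downsampled-colab",
    tag_keys=["machineID", "barcode", "provider"],
    field_keys=["temperature", "load", "power", "vibration"],
)
print(point)
```

With a message like this one, `tags` ends up empty and `fields` contains only `temperature`, because the downsampler shown earlier emits just the window time and the aggregated value.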

### Run a test query on the destination

Query the destination database to make sure that the downsampled data has been written as expected. The cell below is a minimal check that reuses the query pattern from the reader process; it assumes that the writer has already persisted at least one window of data.

In [None]:
from influxdb_client_3 import InfluxDBClient3

dest_client = InfluxDBClient3(token=os.environ["INFLUXDB_TOKEN"],
                              host=os.environ["INFLUXDB_HOST"],
                              org=os.environ["INFLUXDB_ORG"],
                              database=dest_db_name)

# Fetch the downsampled points from the destination measurement
measurement = os.environ['INFLUXDB_MEASUREMENT_NAME']
table = dest_client.query(
    query=f'SELECT * FROM "{measurement}"',
    mode="pandas",
    language="influxql")

print(table.tail(10))