# Using Quix Streams

In the following documentation you will learn how to use Quix Streams to perform two types of data processing:

1. **Stateless processing** - Here one message is processed at a time, and the message received contains all required data for processing. No state needs to be preserved between messages, or between replicas. The data from the message is used to calculate new information, which is then published to the output stream.
2. **Stateful processing** - This is where you need to keep track of data between messages, such as keeping a running total of a variable. This is more complicated, as state needs to be preserved between messages, and potentially between replicas, where multiple replicas are deployed. In addition, state may need to be preserved in the event of the failure of a deployment; Quix Streams supports checkpointing as a way to enable this.

The following sections will explore these methods of data processing in more detail.

The main structure used for data organization in Quix is the topic. For example, the topic might be `iot-telemetry`. To allow for horizontal scaling, a topic is typically divided into multiple streams. You may have multiple devices, or sources, writing data into a topic, so to ensure scaling and message ordering, each source writes into its own stream. Device 1 would write to stream 1, device 2 to stream 2, and so on. This is the idea of [stream context](./features/streaming-context.md).
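
In code, this per-source separation is simply a stream per device on the producer side. The following is a minimal sketch (the `topic_producer` variable, the stream naming scheme, and the `timestamp` column are assumptions for illustration):

```python
import pandas as pd
import quixstreams as qx

def publish_reading(topic_producer: qx.TopicProducer, device_id: str, df: pd.DataFrame):
    # Each device writes to its own stream, so message ordering is preserved
    # within that device's data. df is assumed to carry a 'timestamp' column.
    stream = topic_producer.get_or_create_stream(f"device-{device_id}")
    stream.timeseries.publish(df)
```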

In some use cases you may want to aggregate data beyond the stream context of a source. This can be done by writing data to new streams in the pipeline, based on a different key. For example, imagine a scenario where invoices are being streamed from stores, and where each stream is based on `StoreId`. Now, let's say you want to calculate totals of a specific item sold across all stores (streams). To do this, you can create a transform that first writes invoices into new streams based on `StockCode`, and then another transform can perform aggregation for each of these `StockCode` streams, in order to calculate how much of each item was sold.
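
A sketch of that first re-keying transform, assuming invoices arrive as data frames with a `StockCode` column (the `topic_producer` variable and the stream naming are illustrative):

```python
def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    # Route each invoice row to a stream keyed by stock code, so a downstream
    # transform can aggregate per item across all stores
    for stock_code, rows in df.groupby("StockCode"):
        topic_producer.get_or_create_stream(f"stock-{stock_code}").timeseries.publish(rows)
```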

Quix Streams ensures that stream context is preserved, that is, messages inside one stream are always published to the same single partition. This means that inside one stream, a consumer can rely on the order of messages. A partition can contain multiple streams, but a stream is always confined to one partition.

It is possible to organize the code that processes the streams in a topic using the idea of a consumer group. This indicates to the broker that you will process the topic with all available replicas in the consumer group, sharing the processing of all streams in the topic. Horizontal scaling occurs automatically, because when you deploy multiple replicas in a consumer group, a stream (or group of streams) is assigned to a replica. For example, if there are three streams and three replicas, each replica will process a single stream. If you had only one replica, it would need to process all streams in that topic. If you have three streams and two replicas, one replica would process two streams, and the other replica a single stream. If you don't specify a consumer group in your code, then all streams in a topic will be processed by all replicas.

When you create the consumer you specify the consumer group as follows:

```python
topic_consumer = client.get_topic_consumer(os.environ["input"], consumer_group="empty-transformation")
```

!!! note

    Best practice is to make sure the consumer group name matches the name of the service.

!!! warning

    If you don't specify a consumer group, then all messages in all streams in a topic will be processed by all replicas in the microservice deployment.

For further information read about how [Quix Streams works with Kafka](kafka.md).

## Stream data formats

There are two main formats of stream data:
1. Event data
2. Time-series data

Event data refers to data that is independent, whereas time-series data is a variable tracked over time.

Time-series data is a variable that is tracked over time, such as temperature from a sensor, or the g-forces in a racing car.

Time-series data has three different representations in Quix Streams, to serve different use cases and developer preferences. The underlying data is the same in each case, however. The three representations are:

1. Data (represented by the `qx.TimeseriesData` class)
2. Pandas Data Frame (represented by the `pd.DataFrame` class)
3. Raw data (represented by the `qx.TimeseriesDataRaw` class)

In this topic you'll learn about the `TimeseriesData` and `pd.DataFrame` formats.
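
As a brief illustration of the first two representations, the following sketch builds the same single-timestamp data both ways (the parameter names are placeholders):

```python
import pandas as pd
import quixstreams as qx

# TimeseriesData: add a timestamp, then attach parameter values to it
ts = qx.TimeseriesData()
ts.add_timestamp_nanoseconds(1_000_000_000) \
    .add_value("gForceX", 0.1) \
    .add_value("gForceY", 0.9)

# The DataFrame equivalent: a 'timestamp' column plus one column per parameter
df = pd.DataFrame([{"timestamp": 1_000_000_000, "gForceX": 0.1, "gForceY": 0.9}])
```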

## Registering a callback for stream data

The first step is to register a stream callback that is invoked when data is first received on a stream.

```python
topic_consumer.on_stream_received = on_stream_received_handler
```

Sometimes you need to convert time-series data into Pandas `DataFrame` format for processing:

```python
df = ts.to_dataframe()
```
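
Once converted, ordinary pandas operations apply. For example (assuming the data frame contains the g-force parameters used later in this topic):

```python
print(df.head())             # inspect the first few rows
print(df["gForceX"].mean())  # a simple per-parameter statistic
```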

## "One message at a time" processing
## Stateless processing

Now that you have learned about stream data formats and callbacks, the following example shows a simple data processor. This will be an example of stateless processing, where messages are processed one at a time, and contain all information required for that processing.

This processor receives (consumes) data, processes it (transforms), and then publishes generated data (produces) on an output topic. This encapsulates the typical processing pipeline which consists of:

1. Consume (read the inbound data)
2. Transform (process the data)
3. Produce (publish the resulting data to an output topic)
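
The following is a minimal sketch of such a processor. It assumes the topic names are supplied in `input` and `output` environment variables, and that inbound data frames contain `gForceX`, `gForceY`, and `gForceZ` columns (as in the discussion below):

```python
import os

import pandas as pd
import quixstreams as qx

client = qx.QuixStreamingClient()

topic_consumer = client.get_topic_consumer(os.environ["input"], consumer_group="gforce-transformation")
topic_producer = client.get_topic_producer(os.environ["output"])

def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    # Transform: total g-force is the magnitude of the (X, Y, Z) vector
    df["gForceTotal"] = (df["gForceX"] ** 2 + df["gForceY"] ** 2 + df["gForceZ"] ** 2) ** 0.5
    # Produce: publish to a stream with the same ID on the output topic
    topic_producer.get_or_create_stream(stream_consumer.stream_id).timeseries.publish(df)

def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    stream_consumer.timeseries.on_dataframe_received = on_dataframe_received_handler

topic_consumer.on_stream_received = on_stream_received_handler

qx.App.run()
```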

In this example the stream data is inbound in Pandas `DataFrame` [format](https://pandas.pydata.org/docs/reference/frame.html){target=_blank}.

Note that all information required to calculate `gForceTotal` is contained in the inbound data frame (the X, Y, and Z components of g-force). This is an example of stateless, or "one message at a time", processing: no state needs to be preserved between messages.

Further, if multiple replicas were used here, it would require no changes to your code, as each replica, running its own instance of the callback for the target stream, would simply calculate a value for `gForceTotal` based on the data in the data frame it received.

Expand All @@ -159,7 +163,7 @@ With stateful processing, additional complexity is introduced, as data now needs

There are problems with using global variables in your code to track state. The first is that callbacks are registered per-stream. This means that if you modify a global variable in a callback, it will be modified by all streams.

For example, consider the following **problematic** code:

```python
...
g_running_total = 0

def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    global g_running_total

    ...

    # Every stream's callback updates this single shared global
    # (the counter logic is illustrative)
    g_running_total += some_value
...
```

!!! warning

    With the previous example code, all streams modify the global variable.

You might think this would give you the running total for a stream, but because the callback is registered for each stream, you'd actually get all streams modifying the global.

If you were running across multiple replicas, you'd get a running total for each replica, because each replica would have its own instance of the global variable. Again, the results would not be as you might expect.
The solution is to track state per stream, using the stream ID as a key:

```python
...
g_running_total_per_stream = {}

def callback_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):

    if stream_consumer.stream_id not in g_running_total_per_stream:
        g_running_total_per_stream[stream_consumer.stream_id] = 0

    ...

    g_running_total_per_stream[stream_consumer.stream_id] += some_value
...
```

!!! warning

    With the previous example code, you should note that the running total will not be preserved in the event of system crashes or restarts.

The key point here is that data is tracked per stream context. You keep running totals on a per-stream basis by using the stream ID, `stream_consumer.stream_id`, to index a dictionary containing running totals for each stream.

!!! note

    A stream will only ever be processed by one replica.

## Handling system restarts and crashes

One issue you may run into is that in-memory data is not persisted across instance restarts, shutdowns, and instance crashes. This can be mitigated by using the Quix Streams `LocalFileStorage` facility. This will ensure that specified variables are persisted on permanent storage, and this data is preserved across restarts, shutdowns, and system crashes.

The following example code demonstrates a simple use of `LocalFileStorage` to **persist data across system restarts and crashes**:

```python
...
g_running_total_per_stream = qx.InMemoryStorage(qx.LocalFileStorage())

def callback_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):

    if stream_consumer.stream_id not in g_running_total_per_stream:
        g_running_total_per_stream[stream_consumer.stream_id] = 0

    ...

    g_running_total_per_stream[stream_consumer.stream_id] += some_value
    data.add_value("RunningTotal", g_running_total_per_stream[stream_consumer.stream_id])

    topic_producer.get_or_create_stream(stream_consumer.stream_id).timeseries.publish(data)

# read streams
def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    stream_consumer.timeseries.on_data_received = callback_handler

topic_consumer.on_stream_received = on_stream_received_handler
topic_consumer.on_committed = g_running_total_per_stream.flush
...
```

This ensures that the variable `g_running_total_per_stream` is persisted: it is periodically flushed (every 20 seconds by default) to local file storage.
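
Because the storage is file-backed, values flushed before a shutdown are available again when the service restarts. A minimal sketch (the key is illustrative):

```python
state = qx.InMemoryStorage(qx.LocalFileStorage())

# Any totals flushed before the restart can be read back on startup
if "my-stream-id" in state:
    print("Restored running total:", state["my-stream-id"])
```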

If the system crashes (or is restarted), Kafka resumes message processing from the last committed message. This facility is built into Kafka.

!!! tip

    For this facility to work in Quix Platform you need to enable the State Management feature. You can enable it in the `Deployment` dialog, where you can also specify the size of storage required. When using Quix Streams with a third-party broker such as Kafka, no configuration is required, and data is automatically stored on the local file system.

## Tracking running totals across multiple streams

Sometimes you want to track a running total across all streams in a topic. The problem is that when you scale using replicas, there is no way to share data between all replicas in a consumer group.

The solution is to write the running total per stream (with stream ID) to an output topic. You can then have another processor in the pipeline to calculate total values from inbound messages. The following code demonstrates how to do this:

```python
g_running_total_per_stream = qx.InMemoryStorage(qx.LocalFileStorage())

def callback_handler(stream_consumer: qx.StreamConsumer, data: qx.TimeseriesData):

    if stream_consumer.stream_id not in g_running_total_per_stream:
        g_running_total_per_stream[stream_consumer.stream_id] = 0

    ...

    g_running_total_per_stream[stream_consumer.stream_id] += some_value
    data.add_value("RunningTotal", g_running_total_per_stream[stream_consumer.stream_id])

    topic_producer.get_or_create_stream(stream_consumer.stream_id).timeseries.publish(data)
...
```

In this case the running total is published to its own stream in the output topic. The next service in the data processing pipeline would be able to sum all running totals across all streams in the output topic.

Also, in this example, the running total is persisted in file storage, and so is preserved in the event of service restarts and system crashes.
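
A sketch of that next service (names are illustrative; it keeps the latest `RunningTotal` reported by each inbound stream and sums across them):

```python
g_latest_total_per_stream = {}

def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    # Remember the most recent running total reported by each stream...
    g_latest_total_per_stream[stream_consumer.stream_id] = df["RunningTotal"].iloc[-1]
    # ...then the grand total is the sum across all streams seen so far
    grand_total = sum(g_latest_total_per_stream.values())
    print("Total across all streams:", grand_total)
```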

## Conclusion

In this documentation you have learned:

* How to perform stateless "one message at a time" processing.
* How to handle the situation where state needs to be preserved, and problems that can arise in naive code.
* How to persist state, so that your data is preserved in the event of restarts or crashes.

## Next steps

Continue your Quix Streams learning journey by reading the following more in-depth documentation:

* [Publishing data](publish.md)
* [Subscribing to data](subscribe.md)
* [Processing data](process.md)
* [State management](state-management.md)

The corresponding navigation entry in `mkdocs.yml`:

```yaml
nav:
  - 'Introduction': '../client-library-intro.html'
  - 'Quickstart': 'quickstart.md'
  - 'Using Quix Streams': 'using.md'
  - 'Kafka and Quix Streams': 'kafka.md'
  - 'Features overview':
      - 'Streaming context': 'features/streaming-context.md'
```