diff --git a/docs/tutorials/influxdb-alerting/add-alerting.md b/docs/tutorials/influxdb-alerting/add-alerting.md index 1444496c..60cf9c50 100644 --- a/docs/tutorials/influxdb-alerting/add-alerting.md +++ b/docs/tutorials/influxdb-alerting/add-alerting.md @@ -1,22 +1,18 @@ # Add an alerting service using PagerDuty -To add a PagerDuty alerting service to your pipeline: +You'll now add a destination that provides alerting. The following screenshot illustrates what the new service will look like when added to your pipeline: -1. Sign up for a free PagerDuty account. - -2. In PagerDuty, create a new service, using the Events v2 API as the integration. Make sure you obtain a routing key, also called the integration key, from the `Integrations` tab of the service. + -3. In Quix, create a new destination on the output of your CPU Threshold service, the following screenshot illustrates what the completed pipeline looks like: +To add a PagerDuty alerting destination to your pipeline: -  +1. Sign up for a free PagerDuty account. -4. Edit `requirements.txt` to use Quix Streams v2: +2. In PagerDuty, create a new service, using the Events v2 API as the integration. Make sure you obtain a routing key, also called the integration key, from the `Integrations` tab of the service. - ``` - quixstreams==2.2.0a0 - ``` +3. In Quix, create a new destination on the output of your CPU Threshold service. Choose the starter destination that uses Streaming Data Frames. -5. Edit `main.py` and replace the existing code with the following: +4. 
Edit `main.py` and replace the existing code with the following: ``` python import os @@ -27,8 +23,7 @@ To add a PagerDuty alerting service to your pipeline: from typing import Dict from typing import Optional from quixstreams import Application - from quixstreams.models.serializers.quix import JSONDeserializer - + def build_alert(title: str, alert_body: str, dedup: str) -> Dict[str, Any]: routing_key = os.environ["PAGERDUTY_ROUTING_KEY"] return { @@ -61,13 +56,12 @@ To add a PagerDuty alerting service to your pipeline: def pg_message(row): alert_title = "CPU Threshold Alert" - alert_msg = "Average CPU load has reached " + str(row["value"]) + " %" print("Sending PagerDuty alert") - send_alert(alert_title, alert_msg) + send_alert(alert_title, row["alert"]["message"]) return app = Application.Quix("pagerduty-v1", auto_offset_reset="latest") - input_topic = app.topic(os.environ["input"], value_deserializer=JSONDeserializer()) + input_topic = app.topic(os.environ["input"]) sdf = app.dataframe(input_topic) sdf = sdf.update(pg_message) @@ -78,11 +72,11 @@ To add a PagerDuty alerting service to your pipeline: This code is based on the PagerDuty example code. -6. Now create a new secret to store your routing key, `PAGERDUTY_ROUTING_KEY`. You'll also need to create a corresponding environment variable, `PAGERDUTY_ROUTING_KEY`: +5. Now create a new secret to store your routing key, `PAGERDUTY_ROUTING_KEY`. You'll also need to create a corresponding environment variable, `PAGERDUTY_ROUTING_KEY`:  -7. Now deploy your service. +6. Deploy the service. When a message, which is in the JSON format, is received by the alerting service, it means the average CPU load has exceeded its specified threshold. An alert is therefore created and sent to PagerDuty. The PagerDuty service then notifies your team of the incident. 
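As a rough sketch of what the alerting service sends, the payload built by `build_alert` follows the PagerDuty Events v2 trigger schema. The routing key, source name, and helper signature below are illustrative placeholders (the real key comes from the `PAGERDUTY_ROUTING_KEY` secret), not values from this tutorial:

``` python
import json

# Illustrative sketch of a PagerDuty Events v2 "trigger" payload.
# The routing key below is a placeholder; in the pipeline it comes from
# the PAGERDUTY_ROUTING_KEY secret.
def build_alert(title: str, alert_body: str, dedup: str, routing_key: str) -> dict:
    return {
        "routing_key": routing_key,
        "event_action": "trigger",
        "dedup_key": dedup,
        "payload": {
            # summary, source, and severity are required by the Events v2 API
            "summary": title,
            "source": "quix-alerting-pipeline",
            "severity": "critical",
            "custom_details": {"message": alert_body},
        },
    }

alert = build_alert(
    "CPU Threshold Alert",
    "CPU value is 35.5",
    "cpu-threshold-server-1",
    "PLACEHOLDER_ROUTING_KEY",
)
print(json.dumps(alert, indent=2))
```

If the routing key is wrong, PagerDuty rejects the event, so it's worth double-checking the key obtained from the service's `Integrations` tab before deploying.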
You can also check your activity in PagerDuty: @@ -90,4 +84,4 @@ When a message, which is in the JSON format, is received by the alerting service ## 🏃♀️ Next step -[Part 10 - Summary :material-arrow-right-circle:{ align=right }](./summary.md) +[Part 6 - Summary :material-arrow-right-circle:{ align=right }](./summary.md) diff --git a/docs/tutorials/influxdb-alerting/create-transform.md b/docs/tutorials/influxdb-alerting/create-transform.md deleted file mode 100644 index 2a325eff..00000000 --- a/docs/tutorials/influxdb-alerting/create-transform.md +++ /dev/null @@ -1,47 +0,0 @@ -# Create a transform to convert your data to Quix format - -You'll now add a transform to convert the JSON data to Quix format. There are two reasons for this: - -1. To be able to display the CPU load waveform (time series data) in the Quix data explorer. -2. To be compatible with the Quix InfluxDB destination connector, which expects messages in Quix format at time of writing. - -## Add the transform - -Add a transform as you saw in the [InfluxDB Quickstart](../../integrations/databases/influxdb/quickstart.md). In brief: - -1. Go to `Code Samples` and locate the `Starter transformation SDF` (this version uses Quix Streams v2). -2. Edit the application code. - -Ensure that the input topic is `cpu-load` and the output is `cpu-load-transform`. 
- -## Edit the code - -Change the code in `main.py` for the transform to the following: - -``` python -import os -from quixstreams import Application, State -from quixstreams.models.serializers.quix import JSONDeserializer, QuixTimeseriesSerializer - -app = Application.Quix("transformation-v1", auto_offset_reset="latest") - -input_topic = app.topic(os.environ["input"], value_deserializer=JSONDeserializer()) -output_topic = app.topic(os.environ["output"], value_serializer=QuixTimeseriesSerializer()) - -sdf = app.dataframe(input_topic) -sdf = sdf.update(lambda row: print(row)) -sdf = sdf.to_topic(output_topic) - -if __name__ == "__main__": - app.run(sdf) -``` - -The data on the input topic, from your external Python program, is in simple JSON format. You need to convert it to Quix to be able to display it in the Quix data explorer, and also to be compatible with the current InfluxDB destination connector. - -The new transform code reads data from the input topic as JSON, and publishes it to the output topic in Quix format. - -Once you've made your changes, tag your code (optional), and then deploy the transform. - -## 🏃♀️ Next step - -[Part 4 - Examine your data :material-arrow-right-circle:{ align=right }](./data-explorer.md) diff --git a/docs/tutorials/influxdb-alerting/data-explorer.md b/docs/tutorials/influxdb-alerting/data-explorer.md deleted file mode 100644 index fa27600a..00000000 --- a/docs/tutorials/influxdb-alerting/data-explorer.md +++ /dev/null @@ -1,17 +0,0 @@ -# Examine your data with Quix data explorer - -Once your deployed transform is running, you can view the data it publishes in the Quix data explorer. - -In the pipeline view, on the output topic `cpu-load-transform`, click `Explore live data` (or alternatively go to the data explorer). Select the stream you want to view (in this case `server-1-cpu`), and the parameter `CPULoad`. 
- -The waveform view displays something similar to the following: - - - -If you examine the format in the `Messages` tab, you'll see the data has the following format: - - - -## 🏃♀️ Next step - -[Part 5 - Add InfluxDB destination (sink) :material-arrow-right-circle:{ align=right }](./influxdb-destination.md) diff --git a/docs/tutorials/influxdb-alerting/downsampling.md b/docs/tutorials/influxdb-alerting/downsampling.md deleted file mode 100644 index d8dac388..00000000 --- a/docs/tutorials/influxdb-alerting/downsampling.md +++ /dev/null @@ -1,50 +0,0 @@ -# Downsampling with an aggregation - -While CPU spikes might be acceptable in the short term, they might be more concerning if such levels are sustained over a longer period of time. For detecting such a condition, an aggregation using a tumbling window could be implemented. Let's say you want to raise an alert if the CPU level exceeds a certain average level over some time period. You could use code such as the following: - -``` python -import os -from quixstreams import Application, State -from quixstreams.models.serializers.quix import QuixDeserializer, JSONSerializer -from datetime import timedelta - -def threshold_detect(row): - # If average value greater than 10 - if row['value'] > 10: - print ('CPU average', row['value']) - return row - -app = Application.Quix("transformation-v1", auto_offset_reset="latest") - -input_topic = app.topic(os.environ["input"], value_deserializer=QuixDeserializer()) -output_topic = app.topic(os.environ["output"], value_serializer=JSONSerializer()) - -sdf = app.dataframe(input_topic) -sdf = sdf.apply(lambda row: row["CPULoad"]) -sdf = sdf.tumbling_window(timedelta(seconds=10)).mean().final() -sdf = sdf.apply(threshold_detect) -sdf = sdf.filter(lambda row: row != None) -sdf = sdf.update(lambda row: print(row)) -sdf = sdf.to_topic(output_topic) - -if __name__ == "__main__": - app.run(sdf) -``` - -Note the threshold values and time for the tumbling window were chosen as quite low 
values to make testing easier. - -The code has been modified to take an average of the CPU load over the time span of the tumbling window. If this average is greater than the threshold, then a JSON object containing the start and end times of the window, and the CPU average value. This can be passed to further services as required. The output JSON has the following format: - -``` json -{ - "start": 1708014640000, - "end": 1708014650000, - "value": 14.5 -} -``` - -The code also filters the case where the function returns a `null` row (the threshold has not been exceeded), using the `filter` function. - -## 🏃♀️ Next step - -[Part 9 - Add alerting :material-arrow-right-circle:{ align=right }](./add-alerting.md) diff --git a/docs/tutorials/influxdb-alerting/external-source.md b/docs/tutorials/influxdb-alerting/external-source.md index 5bcb7251..04196b57 100644 --- a/docs/tutorials/influxdb-alerting/external-source.md +++ b/docs/tutorials/influxdb-alerting/external-source.md @@ -1,4 +1,4 @@ -# Create an external source +# Add an external source At this point you have an external program sending data into Quix, and it is writing into a Quix topic. However, you can't see this external program in the Pipeline view. To help you visualize what you've created, you can add an external source component, to provide a visual entity in the pipeline view. To do this, log into Quix Cloud: @@ -39,5 +39,5 @@ Note, this video is for a different project, but the principle is the same. 
## 🏃♀️ Next step -[Part 3 - Develop a transform :material-arrow-right-circle:{ align=right }](./create-transform.md) +[Part 3 - Add InfluxDB destination :material-arrow-right-circle:{ align=right }](./influxdb-destination.md) diff --git a/docs/tutorials/influxdb-alerting/images/add-transform-to-source.png b/docs/tutorials/influxdb-alerting/images/add-transform-to-source.png deleted file mode 100644 index dbc48918..00000000 Binary files a/docs/tutorials/influxdb-alerting/images/add-transform-to-source.png and /dev/null differ diff --git a/docs/tutorials/influxdb-alerting/images/alerting-pipeline.png b/docs/tutorials/influxdb-alerting/images/alerting-pipeline.png index 9248c737..4b3a699c 100644 Binary files a/docs/tutorials/influxdb-alerting/images/alerting-pipeline.png and b/docs/tutorials/influxdb-alerting/images/alerting-pipeline.png differ diff --git a/docs/tutorials/influxdb-alerting/images/data-explorer-cpu-waveform.png b/docs/tutorials/influxdb-alerting/images/data-explorer-cpu-waveform.png deleted file mode 100644 index 03a6ea24..00000000 Binary files a/docs/tutorials/influxdb-alerting/images/data-explorer-cpu-waveform.png and /dev/null differ diff --git a/docs/tutorials/influxdb-alerting/images/influxdb-alerting-pipeline.png b/docs/tutorials/influxdb-alerting/images/influxdb-alerting-pipeline.png deleted file mode 100644 index 52dcca3a..00000000 Binary files a/docs/tutorials/influxdb-alerting/images/influxdb-alerting-pipeline.png and /dev/null differ diff --git a/docs/tutorials/influxdb-alerting/images/influxdb-query.png b/docs/tutorials/influxdb-alerting/images/influxdb-query.png deleted file mode 100644 index c8dcd67a..00000000 Binary files a/docs/tutorials/influxdb-alerting/images/influxdb-query.png and /dev/null differ diff --git a/docs/tutorials/influxdb-alerting/images/timeseries-raw-data.png b/docs/tutorials/influxdb-alerting/images/timeseries-raw-data.png deleted file mode 100644 index 54621364..00000000 Binary files 
a/docs/tutorials/influxdb-alerting/images/timeseries-raw-data.png and /dev/null differ diff --git a/docs/tutorials/influxdb-alerting/influxdb-destination.md b/docs/tutorials/influxdb-alerting/influxdb-destination.md index b59d0430..66b29701 100644 --- a/docs/tutorials/influxdb-alerting/influxdb-destination.md +++ b/docs/tutorials/influxdb-alerting/influxdb-destination.md @@ -1,23 +1,25 @@ # Add an InfluxDB destination connector -You learned how to do this in the [InfluxDB Quickstart](../../integrations/databases/influxdb/quickstart.md). Make sure the input to the destination is the `cpu-load-transform` topic. +Now add an InfluxDB destination. In this case you'll subscribe to data coming from the external source (CPU load data from your laptop in this case) and then write it directly to InfluxDB for persistence. - +!!! tip + + You learned how to do this in the [InfluxDB Quickstart](../../integrations/databases/influxdb/quickstart.md). -Configure the connector with your InfluxDB credentials. Deploy your connector. +Make sure the input to the destination is the `cpu-load` topic. -Your pipeline now looks like this: +Configure the connector with your InfluxDB credentials. Deploy your connector. Raw CPU load data is stored in InfluxDB. - +You can now log into your InfluxDB Cloud account and query your bucket for data. -You can now log into your InfluxDB Cloud account and query your bucket for data. The following screenshot shows the results for a typical query: +## Optional filtering - +In this case you connected your InfluxDB destination (sink) directly to the External Source. All inbound data is therefore written to InfluxDB. In some cases you may prefer to filter the data before writing it to InfluxDB. To do this simply add a transform to the output of the External Source, add the filtering code suitable for your use case, and then connect the InfluxDB destination to the output of your transform. 
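As a minimal sketch of such a filtering step (assuming the JSON messages with a `cpu_load` field produced by the Python client, and a threshold chosen purely for illustration), the predicate you would pass to the transform's `filter` might look like:

``` python
# Hypothetical predicate for a filtering transform: keep only readings whose
# CPU load exceeds a threshold before they are written to InfluxDB.
def should_persist(row: dict, threshold: float = 20.0) -> bool:
    return row.get("cpu_load", 0.0) > threshold

# In a Streaming DataFrame this would be wired up as:
#   sdf = sdf.filter(should_persist)
readings = [
    {"timestamp": 1, "cpu_load": 12.0},
    {"timestamp": 2, "cpu_load": 35.5},
]
kept = [r for r in readings if should_persist(r)]
print(kept)
```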
See the next step for an [example on how to do a filtering transform](./threshold-detection.md), should you need to filter data before writing it to InfluxDB. -You have now concluded the first part of the pipeline, where you learned how to get data into Quix, transform it, and stream that data to InfluxDB. You saw that very little code and configuration was required, and you worked in Python. +## Optional reading back from InfluxDB -In the next part of the tutorial you build a pipline with an InfluxDB source (this queries InfluxDB using polling for new data), add a threshold detection transform, and add an alerting service. +You could optionally add an InfluxDB source connector to your pipeline. You learned how to do this in the [InfluxDB Quickstart](../../integrations/databases/influxdb/quickstart.md). This would enable you to read data from your InfluxDB database, and publish it to a topic of your choice. Once data is published to a topic, you can add any additional processing required by connecting transforms you develop in Python to this topic. For a detailed example of this, see the [Predictive maintenance tutorial](../predictive-maintenance/overview.md). ## 🏃♀️ Next step -[Part 6 - Add InfluxDB source :material-arrow-right-circle:{ align=right }](./influxdb-source.md) +[Part 4 - Add threshold detection :material-arrow-right-circle:{ align=right }](./threshold-detection.md) diff --git a/docs/tutorials/influxdb-alerting/influxdb-source.md b/docs/tutorials/influxdb-alerting/influxdb-source.md deleted file mode 100644 index 6de7e386..00000000 --- a/docs/tutorials/influxdb-alerting/influxdb-source.md +++ /dev/null @@ -1,15 +0,0 @@ -# Add an InfluxDB source connector - -You learned how to do this in the [InfluxDB Quickstart](../../integrations/databases/influxdb/quickstart.md). - -You can reuse your `INFLUXDB_ORG` and `INFLUXDB_TOKEN`secrets, and set the other variables to the same as you used when setting up the InfluxDB destination connector. 
- -Set the `task_interval` - you can set this to `1s` (one second) to ensure you see any new data promptly (this make testing a little easier as you don't need to wait too long for updates). - -Add a new topic `influxdb-cpu-load` for the configured output topic. This will help avoid confusion with the topics you created in the Quickstart. - -When you have completed the configuration, deploy the service. - -## 🏃♀️ Next step - -[Part 7 - Add threshold detection :material-arrow-right-circle:{ align=right }](./threshold-detection.md) diff --git a/docs/tutorials/influxdb-alerting/overview.md b/docs/tutorials/influxdb-alerting/overview.md index 6080f1d2..5009fdb4 100644 --- a/docs/tutorials/influxdb-alerting/overview.md +++ b/docs/tutorials/influxdb-alerting/overview.md @@ -1,7 +1,9 @@ -# Alerting with InfluxDB, Quix Streams and PagerDuty +# Event detection and alerting featuring InfluxDB and PagerDuty In this tutorial you learn how to create a CPU overload alerting pipeline with Quix, Quix Streams, InfluxDB, and PagerDuty. +You gather CPU data from your laptop, and store this directly in InfluxDB. You also add a real-time event detection transform that detects if your CPU load exceeds a threshold value and, if so, sends an alert to PagerDuty. + !!! note This tutorial uses Quix Streams v2. @@ -38,23 +40,15 @@ This tutorial is divided up into several parts, to make it a more manageable lea 1. [Write the Python client](./python-client.md) - you write a command-line program using Quix Streams to get CPU load data into your pipeline. -2. [Create an external source](./external-source.md) - you create an external source - this enables your command-line program to be visible in the pipeline. - -3. [Develop a transform](./create-transform.md) - you write a transform to convert inbound JSON data to a Quix format to be compatible with our InfluxDB connector and Quix data explorer. - -4. 
[Examine your data](./data-explorer.md) - you use Quix data explorer to examine the data produced by your transform. - -5. [Add an InfluxDB destination](./influxdb-destination.md) - you add a Quix InfluxDB destination connector to your pipeline. - -6. [Add an InfluxDB source](./influxdb-source.md) - you add a Quix InfluxDB source connector to your pipeline. +2. [Add an external source](./external-source.md) - you add an external source - this enables your command-line program to be visible in the pipeline. -7. [Add threshold detection](./threshold-detection.md) - you add a threshold detection transform. +3. [Add an InfluxDB destination](./influxdb-destination.md) - you add a Quix InfluxDB destination connector (sink) to your pipeline. CPU load data is stored directly in InfluxDB. -8. [Downsample your data](./downsampling.md) - you use an aggregation to downsample your data. +4. [Create a threshold detection transform](./threshold-detection.md) - you develop a threshold detection transform. This determines if a CPU load threshold has been exceeded and, if so, publishes a message to the output topic. -9. [Add alerting](./add-alerting.md) - add alerting using PagerDuty. +5. [Create an alerting sink](./add-alerting.md) - you add alerting using PagerDuty, with a PagerDuty destination (sink) in the pipeline. When a message is received by the sink, an alert is sent to PagerDuty. -10. [Summary](./summary.md) - conclusion and next steps. +6. [Summary](./summary.md) - conclusion and next steps. ## 🏃♀️ Next step diff --git a/docs/tutorials/influxdb-alerting/python-client.md b/docs/tutorials/influxdb-alerting/python-client.md index 1817cbfb..439b16be 100644 --- a/docs/tutorials/influxdb-alerting/python-client.md +++ b/docs/tutorials/influxdb-alerting/python-client.md @@ -2,37 +2,16 @@ In this part you write a command-line program to read the CPU load of your laptop, and publish that data to Quix. 
-## Set your environment variables - -First, in order to use Quix Streams on the command line (as opposed to working in Quix Cloud), you need to set the following environment variables: - -* `Quix__Sdk__Token` -* `Quix__Portal__Api` - -Note, these variables use **double** underscores. - -To obtain these values you can go to `Settings` in your environment, and then click on the `APIs and tokens tab`. You can obtain the `Streaming Token` and the Portal API URL from there. - -Create a `.env` file containing your environment variables: - -``` -Quix__Sdk__Token="sdk-12345" -Quix__Portal__Api="portal-api.platform.quix.io" -``` - -!!! note - - The SDK token and streaming token are the same thing. The SDK token is now called the streaming token in the UI. - ## Add the Python code Using your editor of choice, create a file called `cpu_load.py`. Add the following code: ```python import psutil +import os import time +import json from quixstreams import Application -from quixstreams.models.serializers.quix import JSONSerializer, SerializationContext from dotenv import load_dotenv load_dotenv() @@ -41,39 +20,65 @@ def get_cpu_load(): cpu_load = psutil.cpu_percent(interval=1) return cpu_load -app = Application.Quix( - consumer_group="cpu_load", - auto_create_topics=True, -) - -serializer = JSONSerializer() -output_topic = app.topic("cpu-load") +app = Application.Quix() +output_topic = app.topic(os.environ["output"]) producer = app.get_producer() def main(): while True: + cpu_load = get_cpu_load() print("CPU load: ", cpu_load) timestamp = int(time.time_ns()) # Quix timestamp is nano seconds - message = {"Timestamp": timestamp, "CPULoad": cpu_load} + message = {"timestamp": timestamp, "cpu_load": cpu_load} + with producer: - serialized_value = serializer( - value=message, ctx=SerializationContext(topic=output_topic.name) - ) producer.produce( topic=output_topic.name, key="server-1-cpu", - value=serialized_value + value=json.dumps(message) ) if __name__ == '__main__': try: main() 
except KeyboardInterrupt: - print('Exiting due to keyboard interrupt') + print('Exiting due to keyboard interrupt') +``` + +## Set environment variables + +If you're not using the Quix CLI, then to use Quix Streams from the command line (as opposed to working in Quix Cloud), you need to set the following environment variables: + +* `Quix__Sdk__Token` +* `Quix__Portal__Api` + +Note that these variables use **double** underscores. + +To obtain these values, go to `Settings` in your environment, and then click the `APIs and tokens` tab. You can obtain the `Streaming Token` and the Portal API URL from there. + +Create a `.env` file containing your environment variables: + +``` +Quix__Sdk__Token="sdk-12345" +Quix__Portal__Api="portal-api.platform.quix.io" +``` + +You'd then need to add the following lines near the top of your code: + +``` python +# Load env variables from .env +from dotenv import load_dotenv +load_dotenv() ``` -Save the file, and run it using a command similar to the following (the exact command you use depends on your Python set up): +!!! note + + The SDK token and streaming token are the same thing. The SDK token is now called the streaming token in the UI. + +## Run the code + +Run the code using a command similar to the following (the exact command you use depends on your Python setup): ``` python3 cpu_load.py diff --git a/docs/tutorials/influxdb-alerting/summary.md b/docs/tutorials/influxdb-alerting/summary.md index d31336cf..cf319619 100644 --- a/docs/tutorials/influxdb-alerting/summary.md +++ b/docs/tutorials/influxdb-alerting/summary.md @@ -1,6 +1,6 @@ # Summary -In this tutorial you have learned how to build a pipeline to get your laptop CPU load data into and out of InfluxDB using Quix Streams v2. +In this tutorial you have learned how to build a pipeline to get your laptop CPU load data into InfluxDB using Quix Streams v2. 
You have also seen how to add threshold detection to your pipeline, and built a PagerDuty destination to make sure alerts go to the engineer on duty. diff --git a/docs/tutorials/influxdb-alerting/threshold-detection.md b/docs/tutorials/influxdb-alerting/threshold-detection.md index ee57dd33..2c448621 100644 --- a/docs/tutorials/influxdb-alerting/threshold-detection.md +++ b/docs/tutorials/influxdb-alerting/threshold-detection.md @@ -1,47 +1,151 @@ # Add a CPU load threshold detection transform -You now add a transform to detect when CPU threshold is exceeded. Click `Add new` and locate the `Starter transformation SDF` again. - - +You now add a transform to detect when CPU threshold is exceeded. Click `Add new` on the output of your external source, and add the `Starter transformation SDF`. You can use the defaults, or rename your transform to something like `CPU Threshold`. Then click on `Edit code`. You can rename the output topic to `cpu-threshold-transform`. -You'll add new code to `main.py`. +You'll replace the code in `main.py` with the following: ``` python import os -from quixstreams import Application, State -from quixstreams.models.serializers.quix import QuixDeserializer, QuixTimeseriesSerializer +from quixstreams import Application + +app = Application.Quix("transformation-v1", auto_offset_reset="latest") + +input_topic = app.topic(os.environ["input"]) +output_topic = app.topic(os.environ["output"]) + +sdf = app.dataframe(input_topic) + +# Filter in all rows where CPU load is over 20. +sdf = sdf.filter(lambda row: row["cpu_load"] > 20) + +# Produce message payload with alert. 
+sdf = sdf.apply(lambda row: { + "alert": { + "timestamp": row["timestamp"], + "title": "CPU overload", + "message": "CPU value is " + str(row["cpu_load"]) + } +}) + +sdf = sdf.to_topic(output_topic) + +if __name__ == "__main__": + app.run(sdf) +``` + +Here, a very simple filter function checks if the inbound data contains a CPU load above a fixed limit (set to 20 here for ease of testing). The filter retains all rows where the CPU load is over the threshold. + +You can test that the application is running by loading some CPU-intensive apps on your laptop. When the threshold is exceeded it will send a message of the following format to the output topic: + +``` json +{ + "alert": { + "timestamp": 1710501507863622000, + "title": "CPU overload", + "message": "CPU value is 35.5" + } +} +``` + +## Windowing -While CPU spikes might be acceptable in the short term, they might be more concerning if such levels are sustained over a longer period of time. For detecting such a condition, an aggregation using a tumbling window could be implemented. Let's say you want to raise an alert if the CPU level exceeds a certain average level over some time period. You could use a time-based windowing function such as illustrated by the following code example: + +``` python +import os +from quixstreams import Application +from datetime import timedelta -def threshold_detect(row): - if row['CPULoad'] > 20: - print ('CPU overload') app = Application.Quix("transformation-v1", auto_offset_reset="latest") -input_topic = app.topic(os.environ["input"], value_deserializer=QuixDeserializer()) -output_topic = app.topic(os.environ["output"], value_serializer=QuixTimeseriesSerializer()) +input_topic = app.topic(os.environ["input"]) +output_topic = app.topic(os.environ["output"]) sdf = app.dataframe(input_topic) -sdf = sdf.update(threshold_detect) + +sdf = sdf.apply(lambda row: row["cpu_load"]) \ + .tumbling_window(timedelta(seconds=10)).mean().final() + +# Filter in rows where the average CPU load is over 20. 
+sdf = sdf.filter(lambda row: row["value"] > 20) + +sdf["window_duration_s"] = (sdf["end"] - sdf["start"]) / 1000 + +# Produce message payload with alert. +sdf = sdf.apply(lambda row: { + "alert": { + "timestamp": row["end"], + "title": "CPU overload", + "message": f"CPU {row['value']} for duration of {row['window_duration_s']} seconds." + } +}) + +sdf = sdf.to_topic(output_topic) + if __name__ == "__main__": app.run(sdf) ``` -Here, a very simple function checks if the inbound data contains a CPU load above a fixed limit (set to 20 here for ease of testing). +Replace the code in `main.py` with the windowing code if you want to test that out. + +??? "Advanced version" + + Version of the code that sends the alert only once: + + ``` python + import os + from quixstreams import Application, State + from datetime import timedelta + + app = Application.Quix("transformation-v1", auto_offset_reset="latest") + + input_topic = app.topic(os.environ["input"]) + output_topic = app.topic(os.environ["output"]) + + sdf = app.dataframe(input_topic) + + sdf = sdf.apply(lambda row: row["cpu_load"]) \ + .tumbling_window(timedelta(seconds=10)).mean().final() + + sdf["window_duration_s"] = (sdf["end"] - sdf["start"]) / 1000 + + def is_alert(row: dict, state: State): + + is_alert_sent_state = state.get("is_alert_sent", False) + + if row["value"] > 20: + if not is_alert_sent_state: + state.set("is_alert_sent", True) + return True + else: + return False + else: + state.set("is_alert_sent", False) + return False + + + sdf = sdf.filter(is_alert, stateful=True) -Note the data is unchanged, it is simply published as is to the output for now. + # Produce message payload with alert. + sdf = sdf.apply(lambda row: { + "alert": { + "timestamp": row["end"], + "title": "CPU overload", + "message": f"CPU {row['value']} for duration of {row['window_duration_s']} seconds." + } + }) -You can test the application is running by loading some CPU intensive apps on your laptop. 
You'll see messages printed to the console if the threshold is exceeded: + sdf = sdf.to_topic(output_topic) -!!! note + if __name__ == "__main__": + app.run(sdf) + ``` - This is a just an example approach. It would perhaps be better to put the threshold detection in the first pipeline, to detect this issue in real time. This code could easily be added to the conversion transform you created earlier. In this case you are querying the database for problematic values, just to show an alternative approach. ## 🏃♀️ Next step -[Part 8 - Downsample your data :material-arrow-right-circle:{ align=right }](./downsampling.md) +[Part 5 - Add PagerDuty alerting :material-arrow-right-circle:{ align=right }](./add-alerting.md) diff --git a/docs/tutorials/overview.md b/docs/tutorials/overview.md index 744eb671..28ca9d94 100644 --- a/docs/tutorials/overview.md +++ b/docs/tutorials/overview.md @@ -8,21 +8,21 @@ Some tutorials use [project templates](../get-started/project-templates.md) - th