From 4d82fc4b9fec2d1bb3b63d20c4651c4b10e9d592 Mon Sep 17 00:00:00 2001 From: tbedford Date: Tue, 7 Feb 2023 15:52:26 +0000 Subject: [PATCH 1/4] [chore] - update to reflect changes to sdk --- README.md | 6 +++--- docs/connect.md | 4 ++-- docs/read.md | 12 ++++++------ docs/state-management.md | 6 +++--- docs/write.md | 2 +- src/PythonClient/local_testing/README.md | 2 +- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index c60f1cd55..48c267946 100644 --- a/README.md +++ b/README.md @@ -195,17 +195,17 @@ These scripts compile the C# base library and then use the `InteropGenerator` pr ``` ┌───────────────────────────┐ - │ Python client library │ /Python/lib/quixstreaming + │ Python client library │ /Python/lib/quixstreams └─────────────┬─────────────┘ │ │ ┌─────────────▼─────────────┐ - │ Python Interop wrapper │ /Python/lib/quixstreaming/native/Python (auto-generated) + │ Python Interop wrapper │ /Python/lib/quixstreams/native/Python (auto-generated) └─────────────┬─────────────┘ │ │ ┌─────────────▼─────────────┐ - │ C# AoT compiled library │ /Python/lib/quixstreaming/native/win64 (auto-generated) + │ C# AoT compiled library │ /Python/lib/quixstreams/native/win64 (auto-generated) └───────────────────────────┘ ``` diff --git a/docs/connect.md b/docs/connect.md index dcd16320c..745d9f418 100644 --- a/docs/connect.md +++ b/docs/connect.md @@ -52,14 +52,14 @@ The following code shows you how to set up the `SecurityOptions` for your connec ``` python security = SecurityOptions(CERTIFICATES_FOLDER, QUIX_USER, QUIX_PASSWORD) - client = StreamingClient('kafka-k1.quix.ai:9093,kafka-k2.quix.ai:9093,kafka-k3.quix.ai:9093', security) + client = KafkaStreamingClient('kafka-k1.quix.ai:9093,kafka-k2.quix.ai:9093,kafka-k3.quix.ai:9093', security) ``` === "C\#" ``` cs var security = new SecurityOptions(CERTIFICATES_FOLDER, QUIX_USER, QUIX_PASSWORD); - var client = new Quix.Sdk.Streaming.StreamingClient("kafka-k1.quix.ai:9093,kafka-k2.quix.ai:9093,kafka-k3.quix.ai:9093", security); + var client = new Quix.Sdk.Streaming.KafkaStreamingClient("kafka-k1.quix.ai:9093,kafka-k2.quix.ai:9093,kafka-k3.quix.ai:9093", security); ``` === "JavaScript" diff --git a/docs/read.md b/docs/read.md index 98b3fb3fd..35aaf6007 100644 --- a/docs/read.md +++ b/docs/read.md @@ -465,7 +465,7 @@ If you wish to use different automatic commit intervals, use the following code: === "Python" ``` python - from quixstreaming import CommitOptions + from quixstreams import CommitOptions commit_settings = CommitOptions() commit_settings.commit_every = 100 # note, you can set this to none @@ -494,7 +494,7 @@ Some use cases need manual committing to mark completion of work, for example wh === "Python" ``` python - from quixstreaming import CommitMode + from quixstreams import CommitMode input_topic = client.open_input_topic('yourtopic', commit_settings=CommitMode.Manual) ``` @@ -610,7 +610,7 @@ One or more streams are revoked from your client. You can no longer commit to th === "Python" ``` python - from quixstreaming import StreamReader + from quixstreams import StreamReader def on_streams_revoked_handler(readers: [StreamReader]): for reader in readers: @@ -668,9 +668,9 @@ This is a minimal code example you can use to read data from a topic using the Q === "Python" ``` python - from quixstreaming import * - from quixstreaming.app import App - from quixstreaming.models.parametersbufferconfiguration import ParametersBufferConfiguration + from quixstreams import * + from quixstreams.app import App + from quixstreams.models.parametersbufferconfiguration import ParametersBufferConfiguration import sys import signal import threading diff --git a/docs/state-management.md b/docs/state-management.md index cec89a931..af07bff46 100644 --- a/docs/state-management.md +++ b/docs/state-management.md @@ -8,12 +8,12 @@ The Quix SDK has state management built in to allow values to be used and persis ## Usage -To use the SDK’s state management feature create an instance of *LocalFileStorage*. This is in *quixstreaming.state.localfilestorage*. Then use the set, get, containsKey and clear methods to manipulate the state as needed. +To use the SDK’s state management feature create an instance of *LocalFileStorage*. This is in *quixstreams.state.localfilestorage*. Then use the `set`, `get`, `containsKey` (`contains_key` for Python), and `clear` methods to manipulate the state as needed. === "Python" ``` python - from quixstreaming.state.localfilestorage import LocalFileStorage + from quixstreams.state.localfilestorage import LocalFileStorage storage = LocalFileStorage() @@ -30,7 +30,7 @@ To use the SDK’s state management feature create an instance of *LocalFileStor storage.set("KEY4", False) #check if the storage contains key - storage.containsKey("KEY1") + storage.contains_key("KEY1") #get value value = storage.get("KEY1") diff --git a/docs/write.md b/docs/write.md index 83f54abc7..dd6f8cc9c 100644 --- a/docs/write.md +++ b/docs/write.md @@ -938,7 +938,7 @@ This is a minimal code example you can use to write data to a topic using the Qu import datetime import math - from quixstreaming import * + from quixstreams import * # Quix injects credentials automatically to the client. Alternatively, you can always pass an SDK token manually as an argument. client = QuixStreamingClient() diff --git a/src/PythonClient/local_testing/README.md b/src/PythonClient/local_testing/README.md index e6ecbcec1..74f5b1039 100644 --- a/src/PythonClient/local_testing/README.md +++ b/src/PythonClient/local_testing/README.md @@ -9,7 +9,7 @@ scp -r ./lib user@targetip:~/app/lib python3 -m virtualenv env && \ chmod +x ./env/bin/activate && \ # NOTE: for command below, use whichever python you have - mv ./quixstreaming ./env/lib/python3.8/site-packages/ && \ + mv ./quixstreams ./env/lib/python3.8/site-packages/ && \ . ./env/bin/activate python3 -m pip install -r requirements.txt --extra-index-url https://pkgs.dev.azure.com/quix-analytics/53f7fe95-59fe-4307-b479-2473b96de6d1/_packaging/public/pypi/simple/ From 4676a5c707def5f31a840b1aa54be4763c219f47 Mon Sep 17 00:00:00 2001 From: tbedford Date: Tue, 7 Feb 2023 16:04:34 +0000 Subject: [PATCH 2/4] [chore] - use with in on_parameter_data_handler --- docs/process.md | 39 ++++++++++++++++++++------------------- docs/read.md | 29 +++++++++++++++-------------- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/docs/process.md b/docs/process.md index 0c9df3ccd..4b5adfe18 100644 --- a/docs/process.md +++ b/docs/process.md @@ -19,16 +19,17 @@ Let’s see some examples of how to read and write data in a Data processor usin ``` python # Callback triggered for each new data frame def on_parameter_data_handler(data: ParameterData): + with data: - df = data.to_panda_frame() # Input data frame - output_df = pd.DataFrame() - output_df["time"] = df["time"] - output_df["TAG__LapNumber"] = df["TAG__LapNumber"] - - # If braking force applied is more than 50%, we mark HardBraking with True - output_df["HardBraking"] = df.apply(lambda row: "True" if row.Brake > 0.5 else "False", axis=1) - - stream_output.parameters.write(output_df) # Send data back to the stream + df = data.to_panda_frame() # Input data frame + output_df = pd.DataFrame() + output_df["time"] = df["time"] + output_df["TAG__LapNumber"] = df["TAG__LapNumber"] + + # If braking force applied is more than 50%, we mark HardBraking with True + output_df["HardBraking"] = df.apply(lambda row: "True" if row.Brake > 0.5 else "False", axis=1) + + stream_output.parameters.write(output_df) # Send data back to the stream ``` === "Python - Plain" @@ -36,16 +37,16 @@ Let’s see some examples of how to read and write data in a Data processor usin ``` python # Callback triggered for each new data frame def on_parameter_data_handler(data: ParameterData): - - for row in data.timestamps: - # If braking force applied is more than 50%, we mark HardBraking with True - hard_braking = row.parameters["Brake"].numeric_value > 0.5 - - stream_output.parameters \ - .add_timestamp(row.timestamp) \ - .add_tag("LapNumber", row.tags["LapNumber"]) \ - .add_value("HardBraking", hard_braking) \ - .write() + with data: + for row in data.timestamps: + # If braking force applied is more than 50%, we mark HardBraking with True + hard_braking = row.parameters["Brake"].numeric_value > 0.5 + + stream_output.parameters \ + .add_timestamp(row.timestamp) \ + .add_tag("LapNumber", row.tags["LapNumber"]) \ + .add_value("HardBraking", hard_braking) \ + .write() ``` === "C\#" diff --git a/docs/read.md b/docs/read.md index 35aaf6007..15d1b2541 100644 --- a/docs/read.md +++ b/docs/read.md @@ -109,10 +109,10 @@ For instance, in the following example we read and print the first timestamp and def on_stream_received_handler(new_stream: StreamReader): def on_parameter_data_handler(data: ParameterData): - - timestamp = data.timestamps[0].timestamp - num_value = data.timestamps[0].parameters['ParameterA'].numeric_value - print("ParameterA - " + str(timestamp) + ": " + str(num_value)) + with data: + timestamp = data.timestamps[0].timestamp + num_value = data.timestamps[0].parameters['ParameterA'].numeric_value + print("ParameterA - " + str(timestamp) + ": " + str(num_value)) new_stream.on_read += on_parameter_data_handler @@ -248,9 +248,10 @@ Reading data from that buffer is as simple as using its `OnRead` event. For each ``` python def on_parameter_data_handler(data: ParameterData): - timestamp = data.timestamps[0].timestamp - num_value = data.timestamps[0].parameters['ParameterA'].numeric_value - print("ParameterA - " + str(timestamp) + ": " + str(num_value)) + with data: + timestamp = data.timestamps[0].timestamp + num_value = data.timestamps[0].parameters['ParameterA'].numeric_value + print("ParameterA - " + str(timestamp) + ": " + str(num_value)) buffer.on_read += on_parameter_data_handler ``` @@ -383,10 +384,10 @@ def read_stream(new_stream: StreamReader): buffer = new_stream.parameters.create_buffer() def on_parameter_data_handler(data: ParameterData): - - # read from input stream - df = data.to_panda_frame() - print(df.to_string()) + with data: + # read from input stream + df = data.to_panda_frame() + print(df.to_string()) buffer.on_read += on_parameter_data_handler @@ -686,9 +687,9 @@ This is a minimal code example you can use to read data from a topic using the Q buffer = new_stream.parameters.create_buffer() def on_parameter_data_handler(data: ParameterData): - - df = data.to_panda_frame() - print(df.to_string()) + with data: + df = data.to_panda_frame() + print(df.to_string()) buffer.on_read += on_parameter_data_handler From 8af7cbb8144286c3db749e6beb75ba1467c4a8c1 Mon Sep 17 00:00:00 2001 From: tbedford Date: Tue, 7 Feb 2023 16:22:29 +0000 Subject: [PATCH 3/4] [chore] - use with in on_event_data_handler --- docs/read.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/read.md b/docs/read.md index 15d1b2541..389c57c84 100644 --- a/docs/read.md +++ b/docs/read.md @@ -422,7 +422,8 @@ Reading events from a stream is as easy as reading parameter data. In this case, ``` python def on_event_data_handler(data: EventData): - print("Event read for stream. Event Id: " + data.Id) + with data: + print("Event read for stream. Event Id: " + data.Id) new_stream.events.on_read += on_event_data_handler ``` From 8b0f1f418804e37887b49f4c18881910edb613e6 Mon Sep 17 00:00:00 2001 From: tbedford Date: Tue, 7 Feb 2023 16:27:08 +0000 Subject: [PATCH 4/4] [chore] - use with in from_panda_frame --- docs/write.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/write.md b/docs/write.md index dd6f8cc9c..49e6116dc 100644 --- a/docs/write.md +++ b/docs/write.md @@ -660,7 +660,8 @@ Alternatively, you can convert a Pandas Data Frame to a [ParameterData](#paramet ``` python data = ParameterData.from_panda_frame(df) -stream.parameters.buffer.write(data) +with data: + stream.parameters.buffer.write(data) ``` !!! tip