Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c6d0978
State improvements
Apr 13, 2023
410e0ad
Implement TopicState
Apr 13, 2023
487a862
In c# integration tests use platform specific image
Apr 17, 2023
e0e7e26
Refactor State
Apr 18, 2023
ef27433
More StreamState work
Apr 18, 2023
54a1021
Up version to 0.5.3 and dev 1
Apr 18, 2023
ffd4f14
add management API on top of stream states and also add unit test
Apr 18, 2023
ba24ff6
Update InteropGenerator
Apr 19, 2023
5db5498
Rework state managers
Apr 25, 2023
b022ae6
Remove TopicState
Apr 25, 2023
96e4230
Change how prefixing works for sub streams
Apr 26, 2023
33b9422
Simplify the naming for localfilestorage
Apr 26, 2023
fa6879e
Fix documentation for raw_topic
Apr 26, 2023
263389d
InMemoryStorage exposed in python
Apr 27, 2023
5580997
up to 0.5.4.dev1
Apr 27, 2023
9ced32a
WIP
Apr 28, 2023
e49a484
Fix package imports at places to avoid using 2 version of the lib
Apr 28, 2023
ad19cb3
Several fixed to get state working in python
Apr 29, 2023
5593c86
dev2
Apr 29, 2023
3bf6c31
Fix reference type values
May 7, 2023
1ac86ac
Dev3
May 7, 2023
c3021b0
Dev 4 after rebase
May 8, 2023
d7aa5e0
Fix same type not being accepted for state
May 8, 2023
27c09d7
Add some unit test around possible threading problems
May 9, 2023
3ceefc3
Add option to reset state
May 9, 2023
819a167
update to dev 5
May 9, 2023
67dcb03
Update how python deals with state type
May 11, 2023
9a1d965
dev7 to test native arm64 build
May 15, 2023
124804a
PR Fixes
May 17, 2023
3af02fa
Rename GetState to GetDictionaryState
May 17, 2023
fb7039e
Remove native from generated python
May 17, 2023
a2cdf4a
Generate docs for state
May 17, 2023
532a72b
Missed PR comments fixed
May 18, 2023
9d16548
Merge remote-tracking branch 'origin/main' into feature/state-helpers
May 19, 2023
5450f7d
Add documentation, some missing annotations and methods
May 19, 2023
7d9f6b0
regen docs
May 19, 2023
94ec78e
Fix docs
May 23, 2023
95bfd83
Merge remote-tracking branch 'origin/main' into feature/state-helpers
May 25, 2023
4523ed2
Regen docs
May 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/api-reference/csharp/App.GetStateManager().md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming](QuixStreams.Streaming.md 'QuixStreams.Streaming').[App](App.md 'QuixStreams.Streaming.App')

## App.GetStateManager() Method

Retrieves the state manager for the application

```csharp
public static QuixStreams.Streaming.States.AppStateManager GetStateManager();
```

#### Returns
[AppStateManager](AppStateManager.md 'QuixStreams.Streaming.States.AppStateManager')
17 changes: 17 additions & 0 deletions docs/api-reference/csharp/App.SetStateStorage(IStateStorage).md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming](QuixStreams.Streaming.md 'QuixStreams.Streaming').[App](App.md 'QuixStreams.Streaming.App')

## App.SetStateStorage(IStateStorage) Method

Sets the state storage for the app

```csharp
public static void SetStateStorage(QuixStreams.State.Storage.IStateStorage stateStorage);
```
#### Parameters

<a name='QuixStreams.Streaming.App.SetStateStorage(QuixStreams.State.Storage.IStateStorage).stateStorage'></a>

`stateStorage` [QuixStreams.State.Storage.IStateStorage](https://docs.microsoft.com/en-us/dotnet/api/QuixStreams.State.Storage.IStateStorage 'QuixStreams.State.Storage.IStateStorage')

The state storage to use for app's state manager
2 changes: 2 additions & 0 deletions docs/api-reference/csharp/App.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ Inheritance [System.Object](https://docs.microsoft.com/en-us/dotnet/api/System.O

| Methods | |
| :--- | :--- |
| [GetStateManager()](App.GetStateManager().md 'QuixStreams.Streaming.App.GetStateManager()') | Retrieves the state manager for the application |
| [Run(CancellationToken, Action)](App.Run(CancellationToken,Action).md 'QuixStreams.Streaming.App.Run(System.Threading.CancellationToken, System.Action)') | Helper method to handle default streaming behaviors and handle automatic resource cleanup on shutdown<br/>It also ensures topic consumers defined at the time of invocation are subscribed to receive messages. |
| [SetStateStorage(IStateStorage)](App.SetStateStorage(IStateStorage).md 'QuixStreams.Streaming.App.SetStateStorage(QuixStreams.State.Storage.IStateStorage)') | Sets the state storage for the app |
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming.States](QuixStreams.Streaming.States.md 'QuixStreams.Streaming.States').[AppStateManager](AppStateManager.md 'QuixStreams.Streaming.States.AppStateManager')

## AppStateManager(IStateStorage, ILoggerFactory) Constructor

Initializes a new instance of the AppStateManager class.

```csharp
public AppStateManager(QuixStreams.State.Storage.IStateStorage storage, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory=null);
```
#### Parameters

<a name='QuixStreams.Streaming.States.AppStateManager.AppStateManager(QuixStreams.State.Storage.IStateStorage,Microsoft.Extensions.Logging.ILoggerFactory).storage'></a>

`storage` [QuixStreams.State.Storage.IStateStorage](https://docs.microsoft.com/en-us/dotnet/api/QuixStreams.State.Storage.IStateStorage 'QuixStreams.State.Storage.IStateStorage')

The state storage the use for the application state

<a name='QuixStreams.Streaming.States.AppStateManager.AppStateManager(QuixStreams.State.Storage.IStateStorage,Microsoft.Extensions.Logging.ILoggerFactory).loggerFactory'></a>

`loggerFactory` [Microsoft.Extensions.Logging.ILoggerFactory](https://docs.microsoft.com/en-us/dotnet/api/Microsoft.Extensions.Logging.ILoggerFactory 'Microsoft.Extensions.Logging.ILoggerFactory')

The logger factory to use
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming.States](QuixStreams.Streaming.States.md 'QuixStreams.Streaming.States').[AppStateManager](AppStateManager.md 'QuixStreams.Streaming.States.AppStateManager')

## AppStateManager.DeleteTopicState(string) Method

Deletes the topic state with the specified name

```csharp
public bool DeleteTopicState(string topicName);
```
#### Parameters

<a name='QuixStreams.Streaming.States.AppStateManager.DeleteTopicState(string).topicName'></a>

`topicName` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

#### Returns
[System.Boolean](https://docs.microsoft.com/en-us/dotnet/api/System.Boolean 'System.Boolean')
Whether the topic state was deleted
14 changes: 14 additions & 0 deletions docs/api-reference/csharp/AppStateManager.DeleteTopicStates().md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming.States](QuixStreams.Streaming.States.md 'QuixStreams.Streaming.States').[AppStateManager](AppStateManager.md 'QuixStreams.Streaming.States.AppStateManager')

## AppStateManager.DeleteTopicStates() Method

Deletes all topic states for the current app.

```csharp
public int DeleteTopicStates();
```

#### Returns
[System.Int32](https://docs.microsoft.com/en-us/dotnet/api/System.Int32 'System.Int32')
The number of topic states that were deleted.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming.States](QuixStreams.Streaming.States.md 'QuixStreams.Streaming.States').[AppStateManager](AppStateManager.md 'QuixStreams.Streaming.States.AppStateManager')

## AppStateManager.GetTopicStateManager(string) Method

Gets an instance of the [TopicStateManager](TopicStateManager.md 'QuixStreams.Streaming.States.TopicStateManager') class for the specified [topicName](AppStateManager.GetTopicStateManager(string).md#QuixStreams.Streaming.States.AppStateManager.GetTopicStateManager(string).topicName 'QuixStreams.Streaming.States.AppStateManager.GetTopicStateManager(string).topicName').

```csharp
public QuixStreams.Streaming.States.TopicStateManager GetTopicStateManager(string topicName);
```
#### Parameters

<a name='QuixStreams.Streaming.States.AppStateManager.GetTopicStateManager(string).topicName'></a>

`topicName` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

The topic name

#### Returns
[TopicStateManager](TopicStateManager.md 'QuixStreams.Streaming.States.TopicStateManager')
The newly created [TopicStateManager](TopicStateManager.md 'QuixStreams.Streaming.States.TopicStateManager') instance.
14 changes: 14 additions & 0 deletions docs/api-reference/csharp/AppStateManager.GetTopicStates().md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming.States](QuixStreams.Streaming.States.md 'QuixStreams.Streaming.States').[AppStateManager](AppStateManager.md 'QuixStreams.Streaming.States.AppStateManager')

## AppStateManager.GetTopicStates() Method

Returns an enumerable collection of all available topic states for the current app.

```csharp
public System.Collections.Generic.IEnumerable<string> GetTopicStates();
```

#### Returns
[System.Collections.Generic.IEnumerable&lt;](https://docs.microsoft.com/en-us/dotnet/api/System.Collections.Generic.IEnumerable-1 'System.Collections.Generic.IEnumerable`1')[System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')[&gt;](https://docs.microsoft.com/en-us/dotnet/api/System.Collections.Generic.IEnumerable-1 'System.Collections.Generic.IEnumerable`1')
An enumerable collection of string values representing the topic state names.
23 changes: 23 additions & 0 deletions docs/api-reference/csharp/AppStateManager.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming.States](QuixStreams.Streaming.States.md 'QuixStreams.Streaming.States')

## AppStateManager Class

Manages the states of a app.

```csharp
public class AppStateManager
```

Inheritance [System.Object](https://docs.microsoft.com/en-us/dotnet/api/System.Object 'System.Object') &#129106; AppStateManager

| Constructors | |
| :--- | :--- |
| [AppStateManager(IStateStorage, ILoggerFactory)](AppStateManager.AppStateManager(IStateStorage,ILoggerFactory).md 'QuixStreams.Streaming.States.AppStateManager.AppStateManager(QuixStreams.State.Storage.IStateStorage, Microsoft.Extensions.Logging.ILoggerFactory)') | Initializes a new instance of the AppStateManager class. |

| Methods | |
| :--- | :--- |
| [DeleteTopicState(string)](AppStateManager.DeleteTopicState(string).md 'QuixStreams.Streaming.States.AppStateManager.DeleteTopicState(string)') | Deletes the topic state with the specified name |
| [DeleteTopicStates()](AppStateManager.DeleteTopicStates().md 'QuixStreams.Streaming.States.AppStateManager.DeleteTopicStates()') | Deletes all topic states for the current app. |
| [GetTopicStateManager(string)](AppStateManager.GetTopicStateManager(string).md 'QuixStreams.Streaming.States.AppStateManager.GetTopicStateManager(string)') | Gets an instance of the [TopicStateManager](TopicStateManager.md 'QuixStreams.Streaming.States.TopicStateManager') class for the specified [topicName](AppStateManager.GetTopicStateManager(string).md#QuixStreams.Streaming.States.AppStateManager.GetTopicStateManager(string).topicName 'QuixStreams.Streaming.States.AppStateManager.GetTopicStateManager(string).topicName'). |
| [GetTopicStates()](AppStateManager.GetTopicStates().md 'QuixStreams.Streaming.States.AppStateManager.GetTopicStates()') | Returns an enumerable collection of all available topic states for the current app. |
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming](QuixStreams.Streaming.md 'QuixStreams.Streaming').[IKafkaStreamingClient](IKafkaStreamingClient.md 'QuixStreams.Streaming.IKafkaStreamingClient')

## IKafkaStreamingClient.GetRawTopicConsumer(string, string, Nullable<AutoOffsetReset>) Method

Gets a topic consumer capable of subscribing to receive non-quixstreams incoming messages.

```csharp
QuixStreams.Streaming.Raw.IRawTopicConsumer GetRawTopicConsumer(string topic, string consumerGroup=null, System.Nullable<QuixStreams.Telemetry.Kafka.AutoOffsetReset> autoOffset=null);
```
#### Parameters

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetRawTopicConsumer(string,string,System.Nullable_QuixStreams.Telemetry.Kafka.AutoOffsetReset_).topic'></a>

`topic` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

Name of the topic.

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetRawTopicConsumer(string,string,System.Nullable_QuixStreams.Telemetry.Kafka.AutoOffsetReset_).consumerGroup'></a>

`consumerGroup` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

The consumer group id to use for consuming messages. If null, consumer group is not used and only consuming new messages.

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetRawTopicConsumer(string,string,System.Nullable_QuixStreams.Telemetry.Kafka.AutoOffsetReset_).autoOffset'></a>

`autoOffset` [System.Nullable&lt;](https://docs.microsoft.com/en-us/dotnet/api/System.Nullable-1 'System.Nullable`1')[QuixStreams.Telemetry.Kafka.AutoOffsetReset](https://docs.microsoft.com/en-us/dotnet/api/QuixStreams.Telemetry.Kafka.AutoOffsetReset 'QuixStreams.Telemetry.Kafka.AutoOffsetReset')[&gt;](https://docs.microsoft.com/en-us/dotnet/api/System.Nullable-1 'System.Nullable`1')

The offset to use when there is no saved offset for the consumer group.

#### Returns
[IRawTopicConsumer](IRawTopicConsumer.md 'QuixStreams.Streaming.Raw.IRawTopicConsumer')
Instance of [IRawTopicConsumer](IRawTopicConsumer.md 'QuixStreams.Streaming.Raw.IRawTopicConsumer')
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming](QuixStreams.Streaming.md 'QuixStreams.Streaming').[IKafkaStreamingClient](IKafkaStreamingClient.md 'QuixStreams.Streaming.IKafkaStreamingClient')

## IKafkaStreamingClient.GetRawTopicProducer(string) Method

Gets a topic producer capable of publishing non-quixstreams messages.

```csharp
QuixStreams.Streaming.Raw.IRawTopicProducer GetRawTopicProducer(string topic);
```
#### Parameters

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetRawTopicProducer(string).topic'></a>

`topic` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

Name of the topic.

#### Returns
[IRawTopicProducer](IRawTopicProducer.md 'QuixStreams.Streaming.Raw.IRawTopicProducer')
Instance of [IRawTopicProducer](IRawTopicProducer.md 'QuixStreams.Streaming.Raw.IRawTopicProducer')
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming](QuixStreams.Streaming.md 'QuixStreams.Streaming').[IKafkaStreamingClient](IKafkaStreamingClient.md 'QuixStreams.Streaming.IKafkaStreamingClient')

## IKafkaStreamingClient.GetTopicConsumer(string, string, CommitOptions, AutoOffsetReset) Method

Gets a topic consumer capable of subscribing to receive incoming streams.

```csharp
QuixStreams.Streaming.ITopicConsumer GetTopicConsumer(string topic, string consumerGroup=null, QuixStreams.Transport.Fw.CommitOptions options=null, QuixStreams.Telemetry.Kafka.AutoOffsetReset autoOffset=QuixStreams.Telemetry.Kafka.AutoOffsetReset.Latest);
```
#### Parameters

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetTopicConsumer(string,string,QuixStreams.Transport.Fw.CommitOptions,QuixStreams.Telemetry.Kafka.AutoOffsetReset).topic'></a>

`topic` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

Name of the topic.

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetTopicConsumer(string,string,QuixStreams.Transport.Fw.CommitOptions,QuixStreams.Telemetry.Kafka.AutoOffsetReset).consumerGroup'></a>

`consumerGroup` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

The consumer group id to use for consuming messages. If null, consumer group is not used and only consuming new messages.

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetTopicConsumer(string,string,QuixStreams.Transport.Fw.CommitOptions,QuixStreams.Telemetry.Kafka.AutoOffsetReset).options'></a>

`options` [QuixStreams.Transport.Fw.CommitOptions](https://docs.microsoft.com/en-us/dotnet/api/QuixStreams.Transport.Fw.CommitOptions 'QuixStreams.Transport.Fw.CommitOptions')

The settings to use for committing

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetTopicConsumer(string,string,QuixStreams.Transport.Fw.CommitOptions,QuixStreams.Telemetry.Kafka.AutoOffsetReset).autoOffset'></a>

`autoOffset` [QuixStreams.Telemetry.Kafka.AutoOffsetReset](https://docs.microsoft.com/en-us/dotnet/api/QuixStreams.Telemetry.Kafka.AutoOffsetReset 'QuixStreams.Telemetry.Kafka.AutoOffsetReset')

The offset to use when there is no saved offset for the consumer group.

#### Returns
[ITopicConsumer](ITopicConsumer.md 'QuixStreams.Streaming.ITopicConsumer')
Instance of [ITopicConsumer](ITopicConsumer.md 'QuixStreams.Streaming.ITopicConsumer')
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming](QuixStreams.Streaming.md 'QuixStreams.Streaming').[IKafkaStreamingClient](IKafkaStreamingClient.md 'QuixStreams.Streaming.IKafkaStreamingClient')

## IKafkaStreamingClient.GetTopicProducer(string) Method

Gets a topic producer capable of publishing stream messages.

```csharp
QuixStreams.Streaming.ITopicProducer GetTopicProducer(string topic);
```
#### Parameters

<a name='QuixStreams.Streaming.IKafkaStreamingClient.GetTopicProducer(string).topic'></a>

`topic` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

Name of the topic.

#### Returns
[ITopicProducer](ITopicProducer.md 'QuixStreams.Streaming.ITopicProducer')
Instance of [ITopicProducer](ITopicProducer.md 'QuixStreams.Streaming.ITopicProducer')
18 changes: 18 additions & 0 deletions docs/api-reference/csharp/IKafkaStreamingClient.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming](QuixStreams.Streaming.md 'QuixStreams.Streaming')

## IKafkaStreamingClient Interface

```csharp
public interface IKafkaStreamingClient
```

Derived
&#8627; [KafkaStreamingClient](KafkaStreamingClient.md 'QuixStreams.Streaming.KafkaStreamingClient')

| Methods | |
| :--- | :--- |
| [GetRawTopicConsumer(string, string, Nullable&lt;AutoOffsetReset&gt;)](IKafkaStreamingClient.GetRawTopicConsumer(string,string,Nullable_AutoOffsetReset_).md 'QuixStreams.Streaming.IKafkaStreamingClient.GetRawTopicConsumer(string, string, System.Nullable<QuixStreams.Telemetry.Kafka.AutoOffsetReset>)') | Gets a topic consumer capable of subscribing to receive non-quixstreams incoming messages. |
| [GetRawTopicProducer(string)](IKafkaStreamingClient.GetRawTopicProducer(string).md 'QuixStreams.Streaming.IKafkaStreamingClient.GetRawTopicProducer(string)') | Gets a topic producer capable of publishing non-quixstreams messages. |
| [GetTopicConsumer(string, string, CommitOptions, AutoOffsetReset)](IKafkaStreamingClient.GetTopicConsumer(string,string,CommitOptions,AutoOffsetReset).md 'QuixStreams.Streaming.IKafkaStreamingClient.GetTopicConsumer(string, string, QuixStreams.Transport.Fw.CommitOptions, QuixStreams.Telemetry.Kafka.AutoOffsetReset)') | Gets a topic consumer capable of subscribing to receive incoming streams. |
| [GetTopicProducer(string)](IKafkaStreamingClient.GetTopicProducer(string).md 'QuixStreams.Streaming.IKafkaStreamingClient.GetTopicProducer(string)') | Gets a topic producer capable of publishing stream messages. |
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#### [QuixStreams.Streaming](index.md 'index')
### [QuixStreams.Streaming](QuixStreams.Streaming.md 'QuixStreams.Streaming').[IQuixStreamingClient](IQuixStreamingClient.md 'QuixStreams.Streaming.IQuixStreamingClient')

## IQuixStreamingClient.GetRawTopicConsumer(string, string, Nullable<AutoOffsetReset>) Method

Gets a topic consumer capable of subscribing to receive non-quixstreams incoming messages.

```csharp
QuixStreams.Streaming.Raw.IRawTopicConsumer GetRawTopicConsumer(string topicIdOrName, string consumerGroup=null, System.Nullable<QuixStreams.Telemetry.Kafka.AutoOffsetReset> autoOffset=null);
```
#### Parameters

<a name='QuixStreams.Streaming.IQuixStreamingClient.GetRawTopicConsumer(string,string,System.Nullable_QuixStreams.Telemetry.Kafka.AutoOffsetReset_).topicIdOrName'></a>

`topicIdOrName` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

Id or name of the topic. If name is provided, workspace will be derived from environment variable or token, in that order

<a name='QuixStreams.Streaming.IQuixStreamingClient.GetRawTopicConsumer(string,string,System.Nullable_QuixStreams.Telemetry.Kafka.AutoOffsetReset_).consumerGroup'></a>

`consumerGroup` [System.String](https://docs.microsoft.com/en-us/dotnet/api/System.String 'System.String')

The consumer group id to use for consuming messages. If null, consumer group is not used and only consuming new messages.

<a name='QuixStreams.Streaming.IQuixStreamingClient.GetRawTopicConsumer(string,string,System.Nullable_QuixStreams.Telemetry.Kafka.AutoOffsetReset_).autoOffset'></a>

`autoOffset` [System.Nullable&lt;](https://docs.microsoft.com/en-us/dotnet/api/System.Nullable-1 'System.Nullable`1')[QuixStreams.Telemetry.Kafka.AutoOffsetReset](https://docs.microsoft.com/en-us/dotnet/api/QuixStreams.Telemetry.Kafka.AutoOffsetReset 'QuixStreams.Telemetry.Kafka.AutoOffsetReset')[&gt;](https://docs.microsoft.com/en-us/dotnet/api/System.Nullable-1 'System.Nullable`1')

The offset to use when there is no saved offset for the consumer group.

#### Returns
[IRawTopicConsumer](IRawTopicConsumer.md 'QuixStreams.Streaming.Raw.IRawTopicConsumer')
Instance of [IRawTopicConsumer](IRawTopicConsumer.md 'QuixStreams.Streaming.Raw.IRawTopicConsumer')
Loading