diff --git a/docs/develop/dotnet/streams/blocking-reads/blocking-reads.md b/docs/develop/dotnet/streams/blocking-reads/blocking-reads.md new file mode 100644 index 0000000000..5e30ddf4a8 --- /dev/null +++ b/docs/develop/dotnet/streams/blocking-reads/blocking-reads.md @@ -0,0 +1,22 @@ +--- +id: blocking-reads +title: Blocking Stream Reads +sidebar_label: Blocking Stream Reads +slug: /develop/dotnet/streams/blocking-reads +--- + +[Redis Streams](https://redis.io/topics/streams-intro) can be used to build a message bus for our applications. The ability of multiple readers to consume messages from a Redis Stream in a consumer group makes Redis Streams ideal for a variety of use cases where you want the assurance of message delivery and where you have high volumes of data you want to distribute across multiple consumers. + +One of the great things about Redis Streams is that you can reduce the number of requests you need to make to Redis by having consumers use blocking requests and wait for new messages to come into the stream. In terms of commands, this would look something like this: + +```bash +XREADGROUP GROUP average avg1 COUNT 1 BLOCK 1000 STREAMS numbers > +``` + +Or, for a simple XREAD, you can wait for the next message to come in: + +```bash +127.0.0.1:6379> XREAD BLOCK 1000 STREAMS numbers $ +``` + +The main .NET Redis client [StackExchange.Redis](https://github.com/StackExchange/StackExchange.Redis) does not support this particular feature. The reason for this lack of support is architectural, the StackExchange.Redis client centers all commands to Redis around a single connection. Because of this, blocking that connection for a single client will block all other requests to Redis. If we want to do blocking stream reads with Redis in .NET we'll need to use different clients to do so. Contained in this section are tutorials for doing so with both [ServiceStack.Redis](blocking-reads/service-stack) and [CsRedis](blocking-reads/cs-redis) \ No newline at end of file diff --git a/docs/develop/dotnet/streams/blocking-reads/cs-redis/cs-redis.md b/docs/develop/dotnet/streams/blocking-reads/cs-redis/cs-redis.md new file mode 100644 index 0000000000..75500f9d1e --- /dev/null +++ b/docs/develop/dotnet/streams/blocking-reads/cs-redis/cs-redis.md @@ -0,0 +1,166 @@ +--- +id: cs-redis +title: Blocking Stream reads with CSRedis +sidebar_label: Blocking Stream Reads with CSRedis +slug: /develop/dotnet/streams/blocking-reads/cs-redis +--- + +[CSRedis](https://github.com/2881099/csredis) is an MIT Licensed Open source project which provides a straightforward interface for executing commands. CSRedis can be used effectively for performing blocking stream reads with the one major downside that it does not support any async API for them. + +## Start Redis + +Before we begin, we'll start up Redis. If you are developing locally, which we'll assume you are for the duration of this tutorial, you can start Redis with a simple docker command. + +```bash +docker run -p 6379:6379 redis +``` + +## Create the app + +We will build a simple console application for streaming telemetry using the library. To do so, use the `dotnet new` command: + +```bash +dotnet new console -n StreamsWithCSRedis +``` + +## Add the package to your app + +Run the `cd StreamsWithCSRedis` command to change directories into the application's directory and run the following to add the CSRedis package + +```bash +dotnet add package CSRedisCore +``` + +## Create group + +When we start up our app, the first thing we'll do is create our `avg` group. To make this group, open up `Program.cs` and add to it the following: + +```csharp +var cancellationTokenSource = new CancellationTokenSource(); +var token = cancellationTokenSource.Token; + +var client = new CSRedisClient("localhost"); +if (!client.Exists("stream") || client.XInfoStream("stream").groups == 0) +{ + client.XGroupCreate("stream", "avg", "$", MkStream: true); +} +``` + +This code will create a cancellation token for the threads we'll spin up to do the writes/reads to the stream, create a client, check if our `avg` group already exists, and finally create the `avg` group if it doesn't. + +## Write to the stream + +Next, we'll write out to the stream. We'll call the stream `stream`, and send a `temp` and `time` field along with the stream. We'll do this every 2 seconds. We'll put this on its own thread, since this operation isn't actually 'blocking' in the Redis sense, it may be alright to spin it out on its task, but as the other two operations in here are blocking, it's better to spin it off on its own thread as well. Add the following to your `Program.cs` file: + +```csharp +var writeThread = new Thread(() => +{ + var writeClient = new CSRedisClient("localhost"); + var random = new Random(); + while (!token.IsCancellationRequested) + { + writeClient.XAdd("stream", new (string, string)[]{new ("temp", random.Next(50,65).ToString()), new ("time", DateTimeOffset.Now.ToUnixTimeSeconds().ToString())}); + Thread.Sleep(2000); + } +}); +``` + +## Parsing read results + +The next issue we'll need to dispose of is parsing the read results from the `XREAD` and `XREADGROUP` commands. CSRedis handles return types generally as tuples in a reply, so we'll need a way to parse the result into something more useable. In this case, we'll parse the results into a dictionary. For the sake of brevity, we will keep everything in this project in `Program.cs` on the top-level method, so we'll declare a `Func` to handle the parsing. This function will pull the first message from the first stream and arrange the values returned into a dictionary. A couple of things to consider here if you wanted to expand this further is that you could reply with a dictionary of dictionaries if you were pulling back multiple messages from multiple streams. This complexity is intentionally left out. + + +```csharp +Func<(string key, (string id, string[] items)[] data), Dictionary> parse = delegate((string key, (string id, string[] items)[] data) streamResult) +{ + var message = streamResult.data.First().items; + var result = new Dictionary(); + for (var i = 0; i < message.Length; i += 2) + { + result.Add(message[i], message[i+1]); + } + + return result; +}; +``` + +## Blocking XREAD + +There are two primary types of 'read' methods, `XREAD` and `XREADGROUP`, this is in addition to the various range methods, which are their category and operate semantically differently from the read operations. `XREAD` lets you read off a given stream and read the *next* item that hit's the stream. You can do this with the special `$` id. For our purposes here, we are going to block for two seconds, or whenever we get a response back from redis, whichever comes first: + +```csharp +var readThread = new Thread(() => +{ + var readClient = new CSRedisClient("localhost"); + while (!token.IsCancellationRequested) + { + var result = readClient.XRead(1, 5000, new (string key, string id)[] {new("stream", "$")}); + if (result != null) + { + var dictionary = parse(result[0]); + Console.WriteLine($"Most recent message, time: {dictionary["time"]} temp: {dictionary["temp"]}"); + } + } +}); +``` + +## Blocking XREADGROUP + +Blocking `XREADGROUP` commands operate very similarly to `XREAD`. In this case, however, the creation of the group told us what id to start at, and by passing in the `>` we necessarily start off at the next message in the queue. Because we are reading out of a group, we'll also want to `XACK` to any messages that we pull down. Also, since this is our average group, we'll maintain an average for our stream's temperatures. + +```csharp +var total = 0; +var count = 0; +var groupReadThread = new Thread(() => +{ + var groupReadClient = new CSRedisClient("localhost"); + var id = string.Empty; + while (!token.IsCancellationRequested) + { + if (!string.IsNullOrEmpty(id)) + { + client.XAck("stream", "avg", id); + } + var result = + groupReadClient.XReadGroup("avg", "avg-1", 1, 5000, new (string key, string id)[] {new("stream", ">")}); + if (result != null) + { + id = result.First().data.First().id; + var dictionary = parse(result[0]); + if (dictionary.ContainsKey("temp")) + { + count++; + total += int.Parse(dictionary["temp"]); + double avg = (double) total / count; + Console.WriteLine($"Most recent group message, time: {dictionary["time"]} temp: {dictionary["temp"]} avg: {avg:00.00}"); + } + } + } +}); +``` + +## Spin up threads + +The last thing we'll need to do is start up all the threads, set a cancellation timeout (so the app doesn't run forever), and join all the threads back together: + +```csharp +readThread.Start(); +writeThread.Start(); +groupReadThread.Start(); + +cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(10)); + +readThread.Join(); +writeThread.Join(); +groupReadThread.Join(); +``` + +## Run the app + +Now that the app is written, all that's left to do is run it. You can do so by running `dotnet run in your terminal. + +## Resources: + +* The source for this tutorial is in [GitHub](https://github.com/redis-developer/redis-streams-with-dotnet/tree/main/StreamsWithCSRedis) +* Redis University has an extensive [course](https://university.redis.com/courses/ru202/) on Redis Streams where you can learn everything you need to know about them. +* You can learn more about Redis Streams in the [Streams Info](https://redis.io/topics/streams-intro) article on redis.io \ No newline at end of file diff --git a/docs/develop/dotnet/streams/blocking-reads/service-stack/service-stack.md b/docs/develop/dotnet/streams/blocking-reads/service-stack/service-stack.md new file mode 100644 index 0000000000..28a86d7046 --- /dev/null +++ b/docs/develop/dotnet/streams/blocking-reads/service-stack/service-stack.md @@ -0,0 +1,196 @@ +--- +id: service-stack +title: How to handle blocking stream reads with ServiceStack.Redis +sidebar_label: Blocking Stream Reads with ServiceStack.Redis +slug: /develop/dotnet/streams/blocking-reads/service-stack +--- + +[ServiceStack.Redis](https://github.com/ServiceStack/ServiceStack.Redis) is part of the ServiceStack suite, it has some restrictions when used for commercial purposes - see their [license](https://github.com/ServiceStack/ServiceStack.Redis/blob/master/license.txt) + +## Start Redis + +If you're developing locally (which is what we will assume for the balance of this tutorial), you can start Redis fairly quickly with docker: + +```bash +docker run -p 6379:6379 redis +``` + +## Create the app + +We will build a simple console application for streaming telemetry using the library. To do so, use the `dotnet new` command: + +```bash +dotnet new console -n StreamsWithServiceStack +``` + +### Add the package to your app + +You can add this package to your app with: + +```bash +dotnet add package ServiceStack.Redis +``` + +### Initialize the client manager + +To initialize a client with ServiceStack, you'll need to create a [`RedisClientManager`](https://github.com/ServiceStack/ServiceStack.Redis#redis-client-managers). Then, add the following to `Program.cs`. + +```csharp +var manager = new BasicRedisClientManager("localhost"); +``` + +### Add items to streams + +Redis streams are not yet fully supported by ServiceStack.Redis, however, you can run raw commands easily with the `CustomAsync` method. So let's create a new class called `Producer.cs` and add the following to it. + +```csharp +public static class Producer +{ + public static async Task Produce(BasicRedisClientManager manager, CancellationToken token) + { + var client = await manager.GetClientAsync(token); + var random = new Random(); + while (!token.IsCancellationRequested) + { + await client.CustomAsync("XADD", "telemetry", "*", "temp",random.Next(50,65), "time", DateTimeOffset.Now.ToUnixTimeSeconds()); + await Task.Delay(10000, token); + } + } +} +``` + +This code will send new telemetry every 10 seconds to the `telemetry` stream, with a `temp` record and a `time` record. + +### Reading messages + +As mentioned earlier, ServiceStack does not have native support for the Streams API, so we need to do a bit of work after retrieving a record from a stream. However, this isn't a complex operation since the resulting structure is a predictable set of nested arrays going from an array of the streams requested to an array of messages retrieved from each stream to the message itself split between its id and its attributes. Finally, the field value pairs within a message; this looks something like this: + +``` +127.0.0.1:6379> XREAD COUNT 1 BLOCK 20000 STREAMS telemetry $ +1) 1) "telemetry" + 2) 1) 1) "1642857504469-0" + 2) 1) "temp" + 2) "57" + 3) "time" + 4) "1642857504" +``` + +This data structure is pretty predictable to parse, so we'll add a little parsing method. First, Create `Consumer.cs` and add the following to it: + +```csharp +using ServiceStack.Redis; + +namespace StreamsWithServicestack; + +public static class Consumer +{ + public static IDictionary ParseStreamResult(RedisText text, out string id) + { + var result = new Dictionary(); + + var fieldValPairs = text.Children[0].Children[1].Children[0].Children[1].Children; + id = text.Children[0].Children[1].Children[0].Children[0].Text; + for (var i = 0; i < fieldValPairs.Count; i += 2) + { + result.Add(fieldValPairs[i].Text, fieldValPairs[i+1].Text); + } + + return result; + } +} +``` + +`ParseStreamResult` will yield the first message from the first stream of an `XREAD` or `XREADGROUP`, this isn't a fully generalized solution but will serve our purposes here. + +### Reading a stream outside a group with XREAD + +To read the next message in a stream, which is necessarily a blocking operation, you will use the `XREAD` command with the `BLOCK` option and the special `$` id. Then, in the `Consumer` class, add the following, which will read off the stream in a continuous loop, blocking for 20 seconds at each request. + +```csharp +public static async Task Consume(IRedisClientsManagerAsync manager, CancellationToken token) +{ + var client = await manager.GetClientAsync(token); + while (!token.IsCancellationRequested) + { + string id; + var result = await client.CustomAsync("XREAD", "COUNT", 1, "BLOCK", 20000, "STREAMS", "telemetry", "$"); + var fvp = ParseStreamResult(result, out id); + Console.WriteLine($"read: result {id} - temp: {fvp["temp"]} time: {fvp["time"]}"); + } +} +``` + +### Reading with consumer groups + +Reading messages in a consumer group can be helpful in cases where you have a common task that you want to distribute across many consumers in a high-throughput environment. It's a two-step process: + +1. Read the stream +2. Acknowledge receipt of the message + +This task can be done by running an `XREADGROUP` and a `XACK` back to back. The `XREADGROUP` will take, in addition to the parameters we spoke about for the `XREAD`, the `GROUP` name, the consumer's name, and instead of taking the special `$` id, it will take the special `>` id, which will have it take the next unassigned id for the group. We'll then extract the information from it, update our average, and then acknowledge the receipt of the message. + +```csharp +public static async Task ConsumeFromGroup(IRedisClientsManagerAsync manager, CancellationToken token) +{ + var client = await manager.GetClientAsync(token); + var total = 0; + var num = 0; + while (!token.IsCancellationRequested) + { + string id; + var result = await client.CustomAsync("XREADGROUP", "GROUP", "avg", "avg-1", "COUNT", "1", "BLOCK", + 20000, "STREAMS", "telemetry", ">"); + var fvp = ParseStreamResult(result, out id); + total += int.Parse(fvp["temp"]); + num++; + Console.WriteLine( + $"Group-read: result {id} - temp: {fvp["temp"]} time: {fvp["time"]}, current average: {total / num}"); + await client.CustomAsync("XACK", "telemetry", "avg", id); + } +} +``` + +### Create the group and start the tasks + +The final bit we need is to create the group and start up all the tasks. We'll use the `XGROUP` command with the `MKSTREAM` option to create the group. We'll then start up all the tasks we need for our producer and consumers, and we'll await everything. Add the following to your `Program.cs` file: + +```csharp +using ServiceStack.Redis; +using StreamsWithServicestack; + +var manager = new BasicRedisClientManager("localhost"); +var asyncClient = await manager.GetClientAsync(); + +var tokenSource = new CancellationTokenSource(); +var token = tokenSource.Token; + +try +{ + await asyncClient.CustomAsync("XGROUP", "CREATE", "telemetry", "avg", "0-0", "MKSTREAM"); +} +catch (Exception ex) +{ + Console.WriteLine(ex); +} + +var writeTask = Producer.Produce(manager, token); +var readTask = Consumer.Consume(manager, token); +var groupReadTask = Consumer.ConsumeFromGroup(manager, token); + +await Task.WhenAll(writeTask, readTask, groupReadTask); + +``` + +## Run the app + +All that's left to do is to run the app, and you'll see a continuous stream of messages coming in every 10 seconds. You can run the app by running: + +```bash +dotnet run +``` + +## Resources: + +* The source for this tutorial is in [GitHub](https://github.com/redis-developer/redis-streams-with-dotnet/tree/main/StreamsWithServicestack) +* Redis University has an extensive [course](https://university.redis.com/courses/ru202/) on Redis Streams where you can learn everything you need to know about them. +* You can learn more about Redis Streams in the [Streams Info](https://redis.io/topics/streams-intro) article on redis.io \ No newline at end of file diff --git a/docs/develop/dotnet/streams/streams-basics.md b/docs/develop/dotnet/streams/streams-basics.md new file mode 100644 index 0000000000..9d4054524e --- /dev/null +++ b/docs/develop/dotnet/streams/streams-basics.md @@ -0,0 +1,168 @@ +--- +id: stream-basics +title: How to use Redis Streams with .NET +sidebar_label: Using Redis Streams with .NET +slug: /develop/dotnet/streams/stream-basics +--- + +Redis Streams are a powerful data structure that allows you to use Redis as a sort of Message bus to transport messages between different application components. The way streams operate in Redis is very fast and memory efficient. This article will not go over the minutia of every command available for Redis Streams, but rather it's aimed to provide a high-level tutorial for how you can use Redis Streams with .NET. + +## Start Redis + +The first thing we'll want to do is start Redis. If you already have an instance of Redis, you can ignore this bit and adjust the connection step below to connect to your instance of Redis. Redis is straightforward to get up and running; you can do so using docker: + +```bash +docker run -p 6379:6379 redis +``` + +## Create your .NET app + +For simplicity's sake, we'll stick to a simple console app, from which we'll spin out a few tasks that will perform the various add/read operations that we'll use. Create a new console app with the `dotnet new` command: + +```bash +dotnet new console -n RedisStreamsBasics +``` + +## Add StackExchange.Redis package + +Next, we'll need to add the client library that we will use to interface with Redis StackExchange.Redis is the canonical package, thus, we will use that in this example. First cd into the RedisStreamsBasics directory and then run the `dotnet add package` directory: + + +```bash +cd RedisStreamsBasics +dotnet add package StackExchange.Redis +``` + +## Initialize the Multiplexer + +StackExchange.Redis centers more or less around the `ConnectionMultiplexer`, which handles the routing and queuing of all commands that you send to Redis. So our first step that's code-related is to initialize the Multiplexer. Creating the Multiplexer is pretty straightforward; open up `Program.cs` in your IDE and add the following bit to it: + +```csharp +using StackExchange.Redis; + +var tokenSource = new CancellationTokenSource(); +var token = tokenSource.Token; + +var muxer = ConnectionMultiplexer.Connect("localhost"); +var db = muxer.GetDatabase(); + +const string streamName = "telemetry"; +const string groupName = "avg"; +``` + +We're also initializing a `CancellationToken` and `CancellationTokenSource` here. We'll set these up towards the end of this tutorial so that this application does not run endlessly. Also, we're creating a couple of constants, the stream's name and the group's name, that we'll use later, and we are also grabbing an `IDatabase` object from the Multiplexer to use + +## Create the consumer group + +A Consumer Group in a Redis Stream allows you to group a bunch of consumers to pull messages off the stream for the group. This functionality is excellent when you have high throughput workloads, and you want to scale out the workers who will process your messages. To use a consumer group, you first need to create it. To create a consumer group, you'll use the `StreamCreateConsumerGroupAsync` method, passing in the `streamName` and `groupName`, as well as the starting id - we'll use the `0-0` id (the lowest id allowable in Redis Streams). Before invoking this call, it's wise to validate that the group doesn't exist yet, as creating an already existing user group will result in an error. So first, we'll check if the stream exists; if it doesn't, we can create the group. Next, we'll use the stream info method to see if any groups match the `avg` `groupName`. + +```csharp +if (!(await db.KeyExistsAsync(streamName)) || + (await db.StreamGroupInfoAsync(streamName)).All(x=>x.Name!=groupName)) +{ + await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0", true); +} +``` + +## Spin up producer task + +Three tasks will run in parallel for our program. The first is the `producerTask`. This Task will write a random number between 50 and 65 as the `temp` and send the current time as the `time`. + +```csharp +var producerTask = Task.Run(async () => +{ + var random = new Random(); + while (!token.IsCancellationRequested) + { + await db.StreamAddAsync(streamName, + new NameValueEntry[] + {new("temp", random.Next(50, 65)), new NameValueEntry("time", DateTimeOffset.Now.ToUnixTimeSeconds())}); + await Task.Delay(2000); + } +}); +``` + +## Parser helper function for reading results + +The results retrieved from Redis will be in a reasonably readable form; all the same, it is helpful for our purposes to parse the result into a dictionary. To do this, add an inline function to handle the parsing: + +```csharp +Dictionary ParseResult(StreamEntry entry) => entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString()); +``` + +> Note: Stream messages enforce no requirement that field names be unique. We use a dictionary for clarity sake in this example, but you will need to ensure that you are not passing in multiple fields with the same names in your usage to prevent an issue using a dictionary. + + +## Spin up most recent element task + +Next, we'll need to spin up a task to read the most recent element off of the stream. To do this, we'll use the `StreamRangeAsync` method passing in two special ids, `-` which means the lowest id, and `+`, which means the highest id. Running this command will result in some duplication. This redundancy is necessary because the `StackExchange.Redis` library does not support blocking stream reads and does not support the special `$` character for stream reads. Overcoming this behavior is explored in-depth in the [Blocking Reads](blocking-reads) tutorial. For this tutorial, you can manage these most-recent reads with the following code: + +```csharp +var readTask = Task.Run(async () => +{ + while (!token.IsCancellationRequested) + { + var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending); + if (result.Any()) + { + var dict = ParseResult(result.First()); + Console.WriteLine($"Read result: temp {dict["temp"]} time: {dict["time"]}"); + } + + await Task.Delay(1000); + } +}); +``` + +## Spin up consumer group read Task + +The final Task we'll spin up is the read task for the consumer group. Due to the nature of consumer groups, you can spin this Task up multiple times to scale out the processing as needed. It's the responsibility of Redis to keep track of which messages it's distributed to the consumer group. As well as tracking which messages Consumers have acknowledged. Acknowledging messages adds a layer of validation that all messages were processed. If something happens to one of your processing tasks or processes, you can more easily know what messages you missed. + +We'll check to see if we have a recent message-id to handle all of this. If we do, we will send an acknowledgment to the server that the id was processed. Then we will grab the next message to be processed from the stream, pull out the data and the id and print out the result. + +```csharp +double count = default; +double total = default; + +var consumerGroupReadTask = Task.Run(async () => +{ + string id = string.Empty; + while (!token.IsCancellationRequested) + { + if (!string.IsNullOrEmpty(id)) + { + await db.StreamAcknowledgeAsync(streamName, groupName, id); + id = string.Empty; + } + var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1); + if (result.Any()) + { + id = result.First().Id; + count++; + var dict = ParseResult(result.First()); + total += double.Parse(dict["temp"]); + Console.WriteLine($"Group read result: temp: {dict["temp"]}, time: {dict["time"]}, current average: {total/count:00.00}"); + } + await Task.Delay(1000); + } +}); +``` + +## Set timeout and await tasks + +Finally, we need to set the timeout and await the tasks at the end of our program: + +```csharp +tokenSource.CancelAfter(TimeSpan.FromSeconds(20)); +await Task.WhenAll(producerTask, readTask, consumerGroupReadTask); +``` + +## Run the app + +You can now run this app with the `dotnet run` command. + +## Resources: + +* The source for this tutorial is in [GitHub](https://github.com/redis-developer/redis-streams-with-dotnet/tree/main/RedisStreamsStackExchange) +* Redis University has an extensive [course](https://university.redis.com/courses/ru202/) on Redis Streams where you can learn everything you need to know about them. +* You can learn more about Redis Streams in the [Streams Info](https://redis.io/topics/streams-intro) article on redis.io \ No newline at end of file diff --git a/sidebars.js b/sidebars.js index 88e3308294..93eb42548c 100644 --- a/sidebars.js +++ b/sidebars.js @@ -217,6 +217,21 @@ module.exports = { ] } ] + }, + { + type: 'category', + label: 'Streams', + items:['develop/dotnet/streams/stream-basics', + { + type:'category', + label: "Blocking Reads", + items:[ + 'develop/dotnet/streams/blocking-reads/blocking-reads', + 'develop/dotnet/streams/blocking-reads/service-stack/service-stack', + 'develop/dotnet/streams/blocking-reads/cs-redis/cs-redis', + ] + } + ] } ] },