diff --git a/osu.Server.QueueProcessor.Tests/InputOnlyQueueTests.cs b/osu.Server.QueueProcessor.Tests/InputOnlyQueueTests.cs index 25ee536..7427506 100644 --- a/osu.Server.QueueProcessor.Tests/InputOnlyQueueTests.cs +++ b/osu.Server.QueueProcessor.Tests/InputOnlyQueueTests.cs @@ -73,6 +73,35 @@ public void SendThenReceive_Multiple() Assert.Equal(objects, receivedObjects); } + [Fact] + public void SendThenReceive_MultipleUsingSingleCall() + { + const int send_count = 10000; + + var cts = new CancellationTokenSource(10000); + + var objects = new HashSet(); + for (int i = 0; i < send_count; i++) + objects.Add(FakeData.New()); + + var receivedObjects = new HashSet(); + + processor.PushToQueue(objects); + + processor.Received += o => + { + lock (receivedObjects) + receivedObjects.Add(o); + + if (receivedObjects.Count == send_count) + cts.Cancel(); + }; + + processor.Run(cts.Token); + + Assert.Equal(objects, receivedObjects); + } + /// /// If the processor is cancelled mid-operation, every item should either be processed or still in the queue. /// diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index c5a7455..4ccb1e0 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -208,8 +208,19 @@ private void outputStats() } } - public void PushToQueue(T obj) => - redis.GetDatabase().ListLeftPush(QueueName, JsonConvert.SerializeObject(obj)); + /// + /// Push a single item to the queue. + /// + /// + public void PushToQueue(T item) => + redis.GetDatabase().ListLeftPush(QueueName, JsonConvert.SerializeObject(item)); + + /// + /// Push multiple items to the queue. + /// + /// + public void PushToQueue(IEnumerable items) => + redis.GetDatabase().ListLeftPush(QueueName, items.Select(obj => new RedisValue(JsonConvert.SerializeObject(obj))).ToArray()); public long GetQueueSize() => redis.GetDatabase().ListLength(QueueName);