Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions osu.Server.QueueProcessor.Tests/InputOnlyQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,35 @@ public void SendThenReceive_Multiple()
Assert.Equal(objects, receivedObjects);
}

[Fact]
public void SendThenReceive_MultipleUsingSingleCall()
{
const int send_count = 10000;

var cts = new CancellationTokenSource(10000);

var objects = new HashSet<FakeData>();
for (int i = 0; i < send_count; i++)
objects.Add(FakeData.New());

var receivedObjects = new HashSet<FakeData>();

processor.PushToQueue(objects);

processor.Received += o =>
{
lock (receivedObjects)
receivedObjects.Add(o);

if (receivedObjects.Count == send_count)
cts.Cancel();
};

processor.Run(cts.Token);

Assert.Equal(objects, receivedObjects);
}

/// <summary>
/// If the processor is cancelled mid-operation, every item should either be processed or still in the queue.
/// </summary>
Expand Down
15 changes: 13 additions & 2 deletions osu.Server.QueueProcessor/QueueProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,19 @@ private void outputStats()
}
}

public void PushToQueue(T obj) =>
redis.GetDatabase().ListLeftPush(QueueName, JsonConvert.SerializeObject(obj));
/// <summary>
/// Push a single item to the queue.
/// </summary>
/// <param name="item"></param>
public void PushToQueue(T item) =>
redis.GetDatabase().ListLeftPush(QueueName, JsonConvert.SerializeObject(item));

/// <summary>
/// Push multiple items to the queue.
/// </summary>
/// <param name="items"></param>
public void PushToQueue(IEnumerable<T> items) =>
redis.GetDatabase().ListLeftPush(QueueName, items.Select(obj => new RedisValue(JsonConvert.SerializeObject(obj))).ToArray());

public long GetQueueSize() =>
redis.GetDatabase().ListLength(QueueName);
Expand Down