Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use Channels as queueing mechanism for periodic websocket messages #11092

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 2 additions & 5 deletions Emby.Server.Implementations/Session/SessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ private static string GetSessionKey(string appName, string deviceId)

if (!_activeConnections.TryGetValue(key, out var sessionInfo))
{
_activeConnections[key] = await CreateSession(key, appName, appVersion, deviceId, deviceName, remoteEndPoint, user).ConfigureAwait(false);
sessionInfo = _activeConnections[key];
sessionInfo = await CreateSession(key, appName, appVersion, deviceId, deviceName, remoteEndPoint, user).ConfigureAwait(false);
_activeConnections[key] = sessionInfo;
}

sessionInfo.UserId = user?.Id ?? Guid.Empty;
Expand Down Expand Up @@ -614,9 +614,6 @@ private async void CheckForIdlePlayback(object state)
_logger.LogDebug(ex, "Error calling OnPlaybackStopped");
}
}

playingSessions = Sessions.Where(i => i.NowPlayingItem is not null)
.ToList();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ protected override void Start(WebSocketMessageInfo message)
base.Start(message);
}

private async void OnEntryCreated(object? sender, GenericEventArgs<ActivityLogEntry> e)
private void OnEntryCreated(object? sender, GenericEventArgs<ActivityLogEntry> e)
{
await SendData(true).ConfigureAwait(false);
SendData(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,20 @@ protected override void Dispose(bool dispose)
base.Dispose(dispose);
}

private async void OnTaskCompleted(object? sender, TaskCompletionEventArgs e)
private void OnTaskCompleted(object? sender, TaskCompletionEventArgs e)
{
e.Task.TaskProgress -= OnTaskProgress;
await SendData(true).ConfigureAwait(false);
SendData(true);
}

private async void OnTaskExecuting(object? sender, GenericEventArgs<IScheduledTaskWorker> e)
private void OnTaskExecuting(object? sender, GenericEventArgs<IScheduledTaskWorker> e)
{
await SendData(true).ConfigureAwait(false);
SendData(true);
e.Argument.TaskProgress += OnTaskProgress;
}

private async void OnTaskProgress(object? sender, GenericEventArgs<double> e)
private void OnTaskProgress(object? sender, GenericEventArgs<double> e)
{
await SendData(false).ConfigureAwait(false);
SendData(false);
}
}
28 changes: 14 additions & 14 deletions Jellyfin.Api/WebSocketListeners/SessionInfoWebSocketListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,38 +85,38 @@ protected override void Start(WebSocketMessageInfo message)
base.Start(message);
}

private async void OnSessionManagerSessionActivity(object? sender, SessionEventArgs e)
private void OnSessionManagerSessionActivity(object? sender, SessionEventArgs e)
{
await SendData(false).ConfigureAwait(false);
SendData(false);
}

private async void OnSessionManagerCapabilitiesChanged(object? sender, SessionEventArgs e)
private void OnSessionManagerCapabilitiesChanged(object? sender, SessionEventArgs e)
{
await SendData(true).ConfigureAwait(false);
SendData(true);
}

private async void OnSessionManagerPlaybackProgress(object? sender, PlaybackProgressEventArgs e)
private void OnSessionManagerPlaybackProgress(object? sender, PlaybackProgressEventArgs e)
{
await SendData(!e.IsAutomated).ConfigureAwait(false);
SendData(!e.IsAutomated);
}

private async void OnSessionManagerPlaybackStopped(object? sender, PlaybackStopEventArgs e)
private void OnSessionManagerPlaybackStopped(object? sender, PlaybackStopEventArgs e)
{
await SendData(true).ConfigureAwait(false);
SendData(true);
}

private async void OnSessionManagerPlaybackStart(object? sender, PlaybackProgressEventArgs e)
private void OnSessionManagerPlaybackStart(object? sender, PlaybackProgressEventArgs e)
{
await SendData(true).ConfigureAwait(false);
SendData(true);
}

private async void OnSessionManagerSessionEnded(object? sender, SessionEventArgs e)
private void OnSessionManagerSessionEnded(object? sender, SessionEventArgs e)
{
await SendData(true).ConfigureAwait(false);
SendData(true);
}

private async void OnSessionManagerSessionStarted(object? sender, SessionEventArgs e)
private void OnSessionManagerSessionStarted(object? sender, SessionEventArgs e)
{
await SendData(true).ConfigureAwait(false);
SendData(true);
}
}
150 changes: 94 additions & 56 deletions MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using MediaBrowser.Controller.Net.WebSocketMessages;
using MediaBrowser.Model.Session;
Expand All @@ -25,22 +26,34 @@ public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType>
where TStateType : WebSocketListenerState, new()
where TReturnDataType : class
{
private readonly Channel<bool> _channel = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions
cvium marked this conversation as resolved.
Show resolved Hide resolved
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = false
});

private readonly SemaphoreSlim _lock = new(1, 1);
crobibero marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// The _active connections.
/// </summary>
private readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>> _activeConnections =
new List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>>();
private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)> _activeConnections = new();

/// <summary>
/// The logger.
/// </summary>
protected readonly ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger;

private readonly Task _task;

cvium marked this conversation as resolved.
Show resolved Hide resolved
protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logger)
{
ArgumentNullException.ThrowIfNull(logger);

Logger = logger;

_task = HandleMessages();
cvium marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand Down Expand Up @@ -115,73 +128,83 @@ protected virtual void Start(WebSocketMessageInfo message)

lock (_activeConnections)
{
_activeConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>(message.Connection, cancellationTokenSource, state));
_activeConnections.Add((message.Connection, cancellationTokenSource, state));
}
}

protected async Task SendData(bool force)
protected void SendData(bool force)
{
Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>[] tuples;
_channel.Writer.TryWrite(force);
}

lock (_activeConnections)
private async Task HandleMessages()
{
await foreach (var force in _channel.Reader.ReadAllAsync())
{
tuples = _activeConnections
.Where(c =>
{
if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested)
{
var state = c.Item3;
try
{
(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)[] tuples;

if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs)
var now = DateTime.UtcNow;
await _lock.WaitAsync().ConfigureAwait(false);
try
{
tuples = _activeConnections
.Where(c =>
{
return true;
}
}
if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancellationRequested)
{
return false;
}

var state = c.State;
return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs;
})
.ToArray();
}
finally
{
_lock.Release();
}

return false;
})
.ToArray();
}
var data = await GetDataToSend().ConfigureAwait(false);
if (data is null || tuples.Length == 0)
{
continue;
}

IEnumerable<Task> GetTasks()
{
foreach (var tuple in tuples)
IEnumerable<Task> GetTasks()
{
foreach (var tuple in tuples)
{
yield return SendDataInternal(data, tuple);
}
}

await Task.WhenAll(GetTasks()).ConfigureAwait(false);
}
catch (Exception ex)
{
yield return SendData(tuple);
Logger.LogError(ex, "Failed to send updates to websockets");
}
}

await Task.WhenAll(GetTasks()).ConfigureAwait(false);
}

private async Task SendData(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> tuple)
private async Task SendDataInternal(TReturnDataType data, (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) tuple)
{
var connection = tuple.Item1;

try
{
var state = tuple.Item3;

var cancellationToken = tuple.Item2.Token;

var data = await GetDataToSend().ConfigureAwait(false);

if (data is not null)
{
await connection.SendAsync(
new OutboundWebSocketMessage<TReturnDataType>
{
MessageType = Type,
Data = data
},
cancellationToken).ConfigureAwait(false);
var (connection, cts, state) = tuple;
var cancellationToken = cts.Token;
await connection.SendAsync(
new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data },
cancellationToken).ConfigureAwait(false);

state.DateLastSendUtc = DateTime.UtcNow;
}
state.DateLastSendUtc = DateTime.UtcNow;
}
catch (OperationCanceledException)
{
if (tuple.Item2.IsCancellationRequested)
if (tuple.CancellationTokenSource.IsCancellationRequested)
{
DisposeConnection(tuple);
}
Expand All @@ -199,32 +222,37 @@ private async Task SendData(Tuple<IWebSocketConnection, CancellationTokenSource,
/// <param name="message">The message.</param>
private void Stop(WebSocketMessageInfo message)
{
lock (_activeConnections)
_lock.Wait();
try
{
var connection = _activeConnections.FirstOrDefault(c => c.Item1 == message.Connection);
var connection = _activeConnections.FirstOrDefault(c => c.Connection == message.Connection);

if (connection is not null)
if (connection != default)
{
DisposeConnection(connection);
}
}
finally
{
_lock.Release();
}
}

/// <summary>
/// Disposes the connection.
/// </summary>
/// <param name="connection">The connection.</param>
private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> connection)
private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) connection)
{
Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Item1.RemoteEndPoint, GetType().Name);
Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name);

// TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this function really...
// connection.Item1.Dispose();

try
{
connection.Item2.Cancel();
connection.Item2.Dispose();
connection.CancellationTokenSource.Cancel();
connection.CancellationTokenSource.Dispose();
}
catch (ObjectDisposedException ex)
{
Expand All @@ -237,10 +265,15 @@ private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSour
Logger.LogError(ex, "Error disposing websocket");
}

lock (_activeConnections)
_lock.Wait();
try
{
_activeConnections.Remove(connection);
}
finally
{
_lock.Release();
}
}

/// <summary>
Expand All @@ -251,13 +284,18 @@ protected virtual void Dispose(bool dispose)
{
if (dispose)
{
lock (_activeConnections)
_lock.Wait();
try
{
foreach (var connection in _activeConnections.ToArray())
{
DisposeConnection(connection);
}
}
finally
{
_lock.Release();
}
}
}

Expand Down