Skip to content

Commit

Permalink
Fixed: Refreshing Plex Server series in high volume systems
Browse files Browse the repository at this point in the history
(cherry picked from commit 903aba5dee0284e571fc025883abbf2a9ccf4386)
  • Loading branch information
Taloth authored and ta264 committed Sep 28, 2020
1 parent 07517e3 commit 95e7d9d
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 30 deletions.
9 changes: 9 additions & 0 deletions src/NzbDrone.Common/Cache/CacheManager.cs
Expand Up @@ -8,6 +8,7 @@ public interface ICacheManager
{
ICached<T> GetCache<T>(Type host);
ICached<T> GetCache<T>(Type host, string name);
ICached<T> GetRollingCache<T>(Type host, string name, TimeSpan defaultLifeTime);
ICachedDictionary<T> GetCacheDictionary<T>(Type host, string name, Func<IDictionary<string, T>> fetchFunc = null, TimeSpan? lifeTime = null);
void Clear();
ICollection<ICached> Caches { get; }
Expand Down Expand Up @@ -43,6 +44,14 @@ public ICached<T> GetCache<T>(Type host, string name)
return (ICached<T>)_cache.Get(host.FullName + "_" + name, () => new Cached<T>());
}

public ICached<T> GetRollingCache<T>(Type host, string name, TimeSpan defaultLifeTime)
{
Ensure.That(host, () => host).IsNotNull();
Ensure.That(name, () => name).IsNotNullOrWhiteSpace();

return (ICached<T>)_cache.Get(host.FullName + "_" + name, () => new Cached<T>(defaultLifeTime, true));
}

public ICachedDictionary<T> GetCacheDictionary<T>(Type host, string name, Func<IDictionary<string, T>> fetchFunc = null, TimeSpan? lifeTime = null)
{
Ensure.That(host, () => host).IsNotNull();
Expand Down
73 changes: 54 additions & 19 deletions src/NzbDrone.Common/Cache/Cached.cs
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Linq;
using NzbDrone.Common.EnsureThat;
using NzbDrone.Common.Extensions;

namespace NzbDrone.Common.Cache
{
Expand All @@ -29,35 +30,49 @@ public bool IsExpired()
}

private readonly ConcurrentDictionary<string, CacheItem> _store;
private readonly TimeSpan? _defaultLifeTime;
private readonly bool _rollingExpiry;

public Cached()
public Cached(TimeSpan? defaultLifeTime = null, bool rollingExpiry = false)
{
_store = new ConcurrentDictionary<string, CacheItem>();
_defaultLifeTime = defaultLifeTime;
_rollingExpiry = rollingExpiry;
}

public void Set(string key, T value, TimeSpan? lifetime = null)
public void Set(string key, T value, TimeSpan? lifeTime = null)
{
Ensure.That(key, () => key).IsNotNullOrWhiteSpace();
_store[key] = new CacheItem(value, lifetime);
_store[key] = new CacheItem(value, lifeTime ?? _defaultLifeTime);
}

public T Find(string key)
{
CacheItem value;
_store.TryGetValue(key, out value);

if (value == null)
CacheItem cacheItem;
if (!_store.TryGetValue(key, out cacheItem))
{
return default(T);
}

if (value.IsExpired())
if (cacheItem.IsExpired())
{
_store.TryRemove(key, out value);
return default(T);
if (TryRemove(key, cacheItem))
{
return default(T);
}

if (!_store.TryGetValue(key, out cacheItem))
{
return default(T);
}
}

if (_rollingExpiry && _defaultLifeTime.HasValue)
{
_store.TryUpdate(key, new CacheItem(cacheItem.Object, _defaultLifeTime.Value), cacheItem);
}

return value.Object;
return cacheItem.Object;
}

public void Remove(string key)
Expand All @@ -72,20 +87,31 @@ public T Get(string key, Func<T> function, TimeSpan? lifeTime = null)
{
Ensure.That(key, () => key).IsNotNullOrWhiteSpace();

lifeTime = lifeTime ?? _defaultLifeTime;

CacheItem cacheItem;
T value;

if (!_store.TryGetValue(key, out cacheItem) || cacheItem.IsExpired())
if (_store.TryGetValue(key, out cacheItem) && !cacheItem.IsExpired())
{
value = function();
Set(key, value, lifeTime);
if (_rollingExpiry && lifeTime.HasValue)
{
_store.TryUpdate(key, new CacheItem(cacheItem.Object, lifeTime), cacheItem);
}
}
else
{
value = cacheItem.Object;
var newCacheItem = new CacheItem(function(), lifeTime);
if (cacheItem != null && _store.TryUpdate(key, newCacheItem, cacheItem))
{
cacheItem = newCacheItem;
}
else
{
cacheItem = _store.GetOrAdd(key, newCacheItem);
}
}

return value;
return cacheItem.Object;
}

public void Clear()
Expand All @@ -95,9 +121,11 @@ public void Clear()

public void ClearExpired()
{
foreach (var cached in _store.Where(c => c.Value.IsExpired()))
var collection = (ICollection<KeyValuePair<string, CacheItem>>)_store;

foreach (var cached in _store.Where(c => c.Value.IsExpired()).ToList())
{
Remove(cached.Key);
collection.Remove(cached);
}
}

Expand All @@ -108,5 +136,12 @@ public ICollection<T> Values
return _store.Values.Select(c => c.Object).ToList();
}
}

private bool TryRemove(string key, CacheItem value)
{
var collection = (ICollection<KeyValuePair<string, CacheItem>>)_store;

return collection.Remove(new KeyValuePair<string, CacheItem>(key, value));
}
}
}
8 changes: 8 additions & 0 deletions src/NzbDrone.Core/MediaFiles/Events/RenameCompletedEvent.cs
@@ -0,0 +1,8 @@
using NzbDrone.Common.Messaging;

namespace NzbDrone.Core.MediaFiles.Events
{
public class RenameCompletedEvent : IEvent
{
}
}
4 changes: 4 additions & 0 deletions src/NzbDrone.Core/MediaFiles/RenameTrackFileService.cs
Expand Up @@ -157,6 +157,8 @@ public void Execute(RenameFilesCommand message)
_logger.ProgressInfo("Renaming {0} files for {1}", trackFiles.Count, artist.Name);
RenameFiles(trackFiles, artist);
_logger.ProgressInfo("Selected track files renamed for {0}", artist.Name);

_eventAggregator.PublishEvent(new RenameCompletedEvent());
}

public void Execute(RenameArtistCommand message)
Expand All @@ -171,6 +173,8 @@ public void Execute(RenameArtistCommand message)
RenameFiles(trackFiles, artist);
_logger.ProgressInfo("All track files renamed for {0}", artist.Name);
}

_eventAggregator.PublishEvent(new RenameCompletedEvent());
}
}
}
1 change: 1 addition & 0 deletions src/NzbDrone.Core/Notifications/INotification.cs
Expand Up @@ -14,6 +14,7 @@ public interface INotification : IProvider
void OnDownloadFailure(DownloadFailedMessage message);
void OnImportFailure(AlbumDownloadMessage message);
void OnTrackRetag(TrackRetagMessage message);
void ProcessQueue();
bool SupportsOnGrab { get; }
bool SupportsOnReleaseImport { get; }
bool SupportsOnUpgrade { get; }
Expand Down
4 changes: 4 additions & 0 deletions src/NzbDrone.Core/Notifications/NotificationBase.cs
Expand Up @@ -64,6 +64,10 @@ public virtual void OnTrackRetag(TrackRetagMessage message)
{
}

public virtual void ProcessQueue()
{
}

public bool SupportsOnGrab => HasConcreteImplementation("OnGrab");
public bool SupportsOnRename => HasConcreteImplementation("OnRename");
public bool SupportsOnReleaseImport => HasConcreteImplementation("OnReleaseImport");
Expand Down
29 changes: 28 additions & 1 deletion src/NzbDrone.Core/Notifications/NotificationService.cs
Expand Up @@ -21,7 +21,9 @@ public class NotificationService
IHandle<HealthCheckFailedEvent>,
IHandle<DownloadFailedEvent>,
IHandle<AlbumImportIncompleteEvent>,
IHandle<TrackFileRetaggedEvent>
IHandle<TrackFileRetaggedEvent>,
IHandleAsync<RenameCompletedEvent>,
IHandleAsync<HealthCheckCompleteEvent>
{
private readonly INotificationFactory _notificationFactory;
private readonly Logger _logger;
Expand Down Expand Up @@ -272,5 +274,30 @@ public void Handle(TrackFileRetaggedEvent message)
}
}
}

public void HandleAsync(RenameCompletedEvent message)
{
ProcessQueue();
}

public void HandleAsync(HealthCheckCompleteEvent message)
{
ProcessQueue();
}

private void ProcessQueue()
{
foreach (var notification in _notificationFactory.GetAvailableProviders())
{
try
{
notification.ProcessQueue();
}
catch (Exception ex)
{
_logger.Warn(ex, "Unable to process notification queue for " + notification.Definition.Name);
}
}
}
}
}
77 changes: 75 additions & 2 deletions src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs
@@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using FluentValidation.Results;
using NLog;
using NzbDrone.Common.Cache;
using NzbDrone.Common.Extensions;
using NzbDrone.Core.Exceptions;
using NzbDrone.Core.Music;
Expand All @@ -13,11 +16,23 @@ public class PlexServer : NotificationBase<PlexServerSettings>
{
private readonly IPlexServerService _plexServerService;
private readonly IPlexTvService _plexTvService;
private readonly Logger _logger;

public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService)
private class PlexUpdateQueue
{
public Dictionary<int, Artist> Pending { get; } = new Dictionary<int, Artist>();
public bool Refreshing { get; set; }
}

private readonly ICached<PlexUpdateQueue> _pendingArtistCache;

public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService, ICacheManager cacheManager, Logger logger)
{
_plexServerService = plexServerService;
_plexTvService = plexTvService;
_logger = logger;

_pendingArtistCache = cacheManager.GetRollingCache<PlexUpdateQueue>(GetType(), "pendingArtists", TimeSpan.FromDays(1));
}

public override string Link => "https://www.plex.tv/";
Expand All @@ -42,7 +57,65 @@ private void UpdateIfEnabled(Artist artist)
{
if (Settings.UpdateLibrary)
{
_plexServerService.UpdateLibrary(artist, Settings);
_logger.Debug("Scheduling library update for artist {0} {1}", artist.Id, artist.Name);
var queue = _pendingArtistCache.Get(Settings.Host, () => new PlexUpdateQueue());
lock (queue)
{
queue.Pending[artist.Id] = artist;
}
}
}

public override void ProcessQueue()
{
PlexUpdateQueue queue = _pendingArtistCache.Find(Settings.Host);
if (queue == null)
{
return;
}

lock (queue)
{
if (queue.Refreshing)
{
return;
}

queue.Refreshing = true;
}

try
{
while (true)
{
List<Artist> refreshingArtist;
lock (queue)
{
if (queue.Pending.Empty())
{
queue.Refreshing = false;
return;
}

refreshingArtist = queue.Pending.Values.ToList();
queue.Pending.Clear();
}

if (Settings.UpdateLibrary)
{
_logger.Debug("Performing library update for {0} artist", refreshingArtist.Count);
_plexServerService.UpdateLibrary(refreshingArtist, Settings);
}
}
}
catch
{
lock (queue)
{
queue.Refreshing = false;
}

throw;
}
}

Expand Down

0 comments on commit 95e7d9d

Please sign in to comment.