Skip to content

Commit

Permalink
refactor: KeepAlive is now handled properly.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Aug 18, 2020
1 parent f6cf75a commit 8dfe389
Show file tree
Hide file tree
Showing 17 changed files with 269 additions and 211 deletions.
Expand Up @@ -49,6 +49,7 @@ protected override void OnInvalidated()
// We intentionally suppress ComputedRegistry.Unregister here,
// otherwise it won't be possible to find IReplica using
// old IComputed.
this.CancelKeepAlive();
}
}
}
47 changes: 21 additions & 26 deletions src/Stl.Fusion/Bridge/ReplicaRegistry.cs
@@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
Expand Down Expand Up @@ -126,60 +125,56 @@ public virtual bool Remove(IReplica replica)
return true;
}

// Private members

private void UpdatePruneCounterThreshold()
public Task PruneAsync()
{
lock (Lock) {
// Should be called inside Lock
var capacity = (long) _handles.GetCapacity();
var nextThreshold = (int) Math.Min(int.MaxValue >> 1, capacity);
_pruneCounterThreshold = nextThreshold;
if (_pruneTask == null || _pruneTask.IsCompleted)
_pruneTask = Task.Run(PruneInternal);
return _pruneTask;
}
}

// Protected members

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void OnOperation(int random)
protected void OnOperation(int random)
{
if (!_opCounter.Increment(random, out var opCounterValue))
return;
if (opCounterValue > _pruneCounterThreshold)
TryPrune();
}

private void TryPrune()
protected void TryPrune()
{
lock (Lock) {
// Double check locking
if (_opCounter.ApproximateValue <= _pruneCounterThreshold)
return;
_opCounter.ApproximateValue = 0;
Prune();
PruneAsync();
}
}

private void Prune()
protected virtual void PruneInternal()
{
foreach (var (key, gcHandle) in _handles) {
if (gcHandle.Target == null && _handles.TryRemove(key, gcHandle))
_gcHandlePool.Release(gcHandle, key.PublicationId.HashCode);
}
lock (Lock) {
if (_pruneTask == null || _pruneTask.IsCompleted)
_pruneTask = Task.Run(PruneInternal);
UpdatePruneCounterThreshold();
_opCounter.ApproximateValue = 0;
}
}

private void PruneInternal()
protected void UpdatePruneCounterThreshold()
{
foreach (var (key, gcHandle) in _handles) {
if (gcHandle.Target != null)
continue;
if (!_handles.TryRemove(key, gcHandle))
continue;
var random = key.PublicationId.HashCode;
_gcHandlePool.Release(gcHandle, random);
}

lock (Lock) {
UpdatePruneCounterThreshold();
_opCounter.ApproximateValue = 0;
// Should be called inside Lock
var capacity = (long) _handles.GetCapacity();
var nextThreshold = (int) Math.Min(int.MaxValue >> 1, capacity);
_pruneCounterThreshold = nextThreshold;
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/Stl.Fusion/Computed.Static.cs
Expand Up @@ -9,7 +9,6 @@ namespace Stl.Fusion
{
public static class Computed
{
public static readonly TimeSpan DefaultKeepAliveTime = TimeSpan.FromSeconds(1);
private static readonly AsyncLocal<IComputed?> CurrentLocal = new AsyncLocal<IComputed?>();

// GetCurrent & ChangeCurrent
Expand Down
23 changes: 4 additions & 19 deletions src/Stl.Fusion/Computed.cs
Expand Up @@ -8,7 +8,6 @@
using Stl.Collections.Slim;
using Stl.Frozen;
using Stl.Fusion.Internal;
using Stl.Time;

namespace Stl.Fusion
{
Expand All @@ -29,10 +28,8 @@ public interface IComputed : IResult
ComputedState State { get; }
bool IsConsistent { get; }
event Action<IComputed> Invalidated;
Moment LastAccessTime { get; }

bool Invalidate();
void Touch();
TResult Apply<TArg, TResult>(IComputedApplyHandler<TArg, TResult> handler, TArg arg);

ValueTask<IComputed> UpdateAsync(bool addDependency, CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -70,7 +67,6 @@ public class Computed<TIn, TOut> : IComputed<TIn, TOut>, IComputedImpl
// ReSharper disable once InconsistentNaming
private event Action<IComputed>? _invalidated;
private bool _invalidateOnSetOutput;
private long _lastAccessTimeTicks;
private object Lock => this;

public ComputedOptions Options {
Expand All @@ -87,13 +83,6 @@ public class Computed<TIn, TOut> : IComputed<TIn, TOut>, IComputedImpl
public IFunction<TIn, TOut> Function => (IFunction<TIn, TOut>) Input.Function;
public LTag Version { get; }

public Moment LastAccessTime {
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => new Moment(Volatile.Read(ref _lastAccessTimeTicks));
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected set => Volatile.Write(ref _lastAccessTimeTicks, value.EpochOffset.Ticks);
}

public Type OutputType => typeof(TOut);
public Result<TOut> Output {
get {
Expand Down Expand Up @@ -140,7 +129,6 @@ public Computed(ComputedOptions options, TIn input, LTag version)
_options = options;
Input = input;
Version = version;
LastAccessTime = CoarseCpuClock.Now;
ComputedRegistry.Instance.Register(this);
}

Expand All @@ -153,7 +141,6 @@ public Computed(ComputedOptions options, TIn input, Result<TOut> output, LTag ve
_state = (int) (isConsistent ? ComputedState.Consistent : ComputedState.Invalidated);
_output = output;
Version = version;
LastAccessTime = CoarseCpuClock.Now;
if (isConsistent)
ComputedRegistry.Instance.Register(this);
}
Expand Down Expand Up @@ -290,7 +277,10 @@ public bool Invalidate()
}

protected virtual void OnInvalidated()
=> ComputedRegistry.Instance.Unregister(this);
{
ComputedRegistry.Instance.Unregister(this);
this.CancelKeepAlive();
}

// UpdateAsync

Expand All @@ -316,11 +306,6 @@ public async ValueTask<TOut> UseAsync(CancellationToken cancellationToken = defa
return computed.Value;
}

// Touch

public void Touch()
=> LastAccessTime = CoarseCpuClock.Now;

// Apply

public TResult Apply<TArg, TResult>(IComputedApplyHandler<TArg, TResult> handler, TArg arg)
Expand Down
25 changes: 25 additions & 0 deletions src/Stl.Fusion/ComputedEx.cs
@@ -1,13 +1,17 @@
using System;
using System.Reactive;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Stl.Async;
using Stl.Fusion.Internal;

namespace Stl.Fusion
{
public static partial class ComputedEx
{
private static readonly TimeSpan CancelKeepAliveThreshold = TimeSpan.FromSeconds(1.1);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static T Strip<T>(this IComputed<T>? computed)
=> computed != null ? computed.Value : default!;
Expand All @@ -20,5 +24,26 @@ public static Task WhenInvalidatedAsync<T>(this IComputed<T> computed, Cancellat
computed.Invalidated += c => ts.SetResult(default);
return ts.Task.WithFakeCancellation(cancellationToken);
}

public static void KeepAlive(this IComputed computed)
{
var keepAliveTime = computed.Options.KeepAliveTime;
if (keepAliveTime > TimeSpan.Zero && computed.State != ComputedState.Invalidated)
RefHolder.Hold(computed, keepAliveTime);
}

public static void CancelKeepAlive(this IComputed computed, TimeSpan threshold)
{
var keepAliveTime = computed.Options.KeepAliveTime;
if (keepAliveTime >= threshold)
RefHolder.Release(computed);
}

public static void CancelKeepAlive(this IComputed computed)
{
var keepAliveTime = computed.Options.KeepAliveTime;
if (keepAliveTime >= CancelKeepAliveThreshold)
RefHolder.Release(computed);
}
}
}
2 changes: 1 addition & 1 deletion src/Stl.Fusion/ComputedOptions.cs
Expand Up @@ -6,7 +6,7 @@ namespace Stl.Fusion
[Serializable]
public class ComputedOptions
{
public static readonly TimeSpan DefaultKeepAliveTime = TimeSpan.FromSeconds(1);
public static readonly TimeSpan DefaultKeepAliveTime = TimeSpan.Zero;
public static readonly TimeSpan DefaultErrorAutoInvalidateTime = TimeSpan.FromSeconds(1);
public static readonly TimeSpan DefaultAutoInvalidateTime = TimeSpan.MaxValue; // No auto invalidation

Expand Down

0 comments on commit 8dfe389

Please sign in to comment.