Skip to content

Commit

Permalink
UI SignalR improvements / Move UI notification outside of loop blocks (
Browse files Browse the repository at this point in the history
  • Loading branch information
fassadlr committed Aug 31, 2021
1 parent 251698b commit c6dfa10
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 38 deletions.
32 changes: 13 additions & 19 deletions src/Stratis.Bitcoin.Features.SignalR/EventSubscriptionService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using NLog;
using Stratis.Bitcoin.EventBus;
using Stratis.Bitcoin.Signals;
Expand Down Expand Up @@ -33,31 +33,25 @@ public class EventSubscriptionService : IEventsSubscriptionService, IDisposable

public void Init()
{
MethodInfo subscribeMethod = this.signals.GetType().GetMethod("Subscribe");
MethodInfo onEventCallbackMethod = typeof(EventSubscriptionService).GetMethod("OnEvent");
foreach (IClientEvent eventToHandle in this.options.EventsToHandle)
{
this.logger.Debug("Create subscription for {0}", eventToHandle.NodeEventType);
MethodInfo subscribeMethodInfo = subscribeMethod.MakeGenericMethod(eventToHandle.NodeEventType);
Type callbackType = typeof(Action<>).MakeGenericType(eventToHandle.NodeEventType);
Delegate onEventDelegate = Delegate.CreateDelegate(callbackType, this, onEventCallbackMethod);

var token = (SubscriptionToken)subscribeMethodInfo.Invoke(this.signals, new object[] { onEventDelegate });
this.subscriptions.Add(token);
}
}
async Task callback(EventBase eventBase)
{
Type childType = eventBase.GetType();

/// <summary> This is invoked through reflection.</summary>
public void OnEvent(EventBase @event)
{
Type childType = @event.GetType();
IClientEvent clientEvent = this.options.EventsToHandle.FirstOrDefault(ev => ev.NodeEventType == childType);
if (clientEvent == null)
return;

IClientEvent clientEvent = this.options.EventsToHandle.FirstOrDefault(ev => ev.NodeEventType == childType);
if (clientEvent == null)
return;
clientEvent.BuildFrom(eventBase);

clientEvent.BuildFrom(@event);
this.eventsHub.SendToClientsAsync(clientEvent).ConfigureAwait(false).GetAwaiter().GetResult();
await this.eventsHub.SendToClientsAsync(clientEvent).ConfigureAwait(false);
}

this.signals.Subscribe(eventToHandle.NodeEventType, callback);
}
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ namespace Stratis.Bitcoin.Features.SignalR.Events
{
public sealed class WalletProcessedTransactionOfInterestClientEvent : IClientEvent
{
public string Source { get; set; }

public Type NodeEventType { get; } = typeof(WalletProcessedTransactionOfInterestEvent);

public void BuildFrom(EventBase @event)
{
if (@event is WalletProcessedTransactionOfInterestEvent progressEvent)
if (@event is WalletProcessedTransactionOfInterestEvent txEvent)
{
this.Source = txEvent.Source;
return;
}

throw new ArgumentException();
}
Expand Down
7 changes: 4 additions & 3 deletions src/Stratis.Bitcoin.Features.SignalR/EventsHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ public void UnSubscribeToIncomingSignalRMessages(string target)

public async Task SendToClientsAsync(IClientEvent @event)
{
// Check if any there are any connected clients
if (this.Clients == null) return;
// Check if any there are any connected clients.
if (this.Clients == null)
return;

try
{
await this.Clients.All.SendAsync("receiveEvent", @event);
await this.Clients.All.SendAsync("receiveEvent", @event).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ namespace Stratis.Bitcoin.Features.Wallet.Events
/// </summary>
public sealed class WalletProcessedTransactionOfInterestEvent : EventBase
{
public string Source { get; set; }
}
}
11 changes: 10 additions & 1 deletion src/Stratis.Bitcoin/EventBus/IEventBus.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace Stratis.Bitcoin.EventBus
{
Expand All @@ -8,13 +9,21 @@ namespace Stratis.Bitcoin.EventBus
public interface IEventBus
{
/// <summary>
/// Subscribes to the specified event type with the specified action
/// Subscribes to the specified event type with the specified action.
/// </summary>
/// <typeparam name="TEventBase">The type of event</typeparam>
/// <param name="action">The Action to invoke when an event of this type is published</param>
/// <returns>A <see cref="SubscriptionToken"/> to be used when calling <see cref="Unsubscribe"/></returns>
SubscriptionToken Subscribe<TEventBase>(Action<TEventBase> action) where TEventBase : EventBase;

/// <summary>
/// Subscribes to the specified event type with the specified function.
/// </summary>
/// <param name="eventType">The type of event</typeparam>
/// <param name="handler">The Function to invoke when an event of this type is published</param>
/// <returns>A <see cref="SubscriptionToken"/> to be used when calling <see cref="Unsubscribe"/></returns>
SubscriptionToken Subscribe(Type eventType, Func<EventBase, Task> handler);

/// <summary>
/// Unsubscribe from the Event type related to the specified <see cref="SubscriptionToken"/>
/// </summary>
Expand Down
21 changes: 21 additions & 0 deletions src/Stratis.Bitcoin/EventBus/InMemoryEventBus.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Stratis.Bitcoin.EventBus.PerformanceCounters.InMemoryEventBus;
using Stratis.Bitcoin.Utilities;
Expand Down Expand Up @@ -45,6 +46,26 @@ public InMemoryEventBus(ILoggerFactory loggerFactory, ISubscriptionErrorHandler
this.performanceCounter = new InMemoryEventBusPerformanceCounter();
}

/// <inheritdoc />
public SubscriptionToken Subscribe(Type eventType, Func<EventBase, Task> handler)
{
if (handler == null)
throw new ArgumentNullException(nameof(handler));

lock (this.subscriptionsLock)
{
if (!this.subscriptions.ContainsKey(eventType))
{
this.subscriptions.Add(eventType, new List<ISubscription>());
}

var subscriptionToken = new SubscriptionToken(this, eventType);
this.subscriptions[eventType].Add(new Subscription(handler, subscriptionToken));

return subscriptionToken;
}
}

/// <inheritdoc />
public SubscriptionToken Subscribe<TEvent>(Action<TEvent> handler) where TEvent : EventBase
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TracerAttributes;

namespace Stratis.Bitcoin.EventBus.PerformanceCounters.InMemoryEventBus
Expand All @@ -14,7 +15,13 @@ public ExecutionsCountAndDelay GetExecutionsCountAndDelay<TEvent>(ISubscription
{
var actionField = subscription.GetType().GetField("action", BindingFlags.NonPublic | BindingFlags.Instance);
var action = actionField.GetValue(subscription);
MethodInfo methodInfo = ((Action<TEvent>)action).Method;

MethodInfo methodInfo = null;
if (action is Action<TEvent> actionEvent)
methodInfo = actionEvent.Method;

if (action is Func<EventBase, Task> actionFunc)
methodInfo = actionFunc.Method;

if (!this.EventExecutionTime.TryGetValue(typeof(TEvent), out ConcurrentDictionary<MethodInfo, ExecutionsCountAndDelay> eventExecutionCountsAndDelay))
{
Expand Down
28 changes: 28 additions & 0 deletions src/Stratis.Bitcoin/EventBus/Subscription.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace Stratis.Bitcoin.EventBus
{
Expand Down Expand Up @@ -28,4 +29,31 @@ public void Publish(EventBase eventItem)
this.action.Invoke(eventItem as TEventBase);
}
}

internal class Subscription : ISubscription
{
/// <summary>
/// Token returned to the subscriber
/// </summary>
public SubscriptionToken SubscriptionToken { get; }

/// <summary>
/// The action to invoke when a subscripted event type is published.
/// </summary>
private readonly Func<EventBase, Task> action;

public Subscription(Func<EventBase, Task> del, SubscriptionToken token)
{
this.action = del ?? throw new ArgumentNullException(nameof(del));
this.SubscriptionToken = token ?? throw new ArgumentNullException(nameof(token));
}

public void Publish(EventBase eventItem)
{
if (!(eventItem is EventBase))
throw new ArgumentException("Event Item is not the correct type.");

this.action.Invoke(eventItem);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ public void ProcessBlocks(IEnumerable<(ChainedHeader header, Block block)> block

Parallel.ForEach(rounds, round =>
{
if (!ParallelProcessBlock(round, block, chainedHeader))
if (!ParallelProcessBlock(round, block, chainedHeader, out bool _))
done = true;
});

Expand All @@ -885,18 +885,34 @@ public void ProcessBlocks(IEnumerable<(ChainedHeader header, Block block)> block
ProcessBlocksInfo round = (walletName != null) ? this.Wallets[walletName] : this.processBlocksInfo;

if (this.StartBatch(round, blocks.First().header))
{
bool signalUI = false;

foreach ((ChainedHeader chainedHeader, Block block) in blocks.Append((null, null)))
{
this.logger.LogDebug("Processing '{0}'.", chainedHeader);

if (!ParallelProcessBlock(round, block, chainedHeader))
if (!ParallelProcessBlock(round, block, chainedHeader, out bool transactionOfInterestProcessed))
break;

if (transactionOfInterestProcessed)
signalUI = true;
}

if (signalUI)
{
// We only want to raise events for the UI (via SignalR) to query the wallet balance if a transaction pertaining to the wallet
// was processed.
this.signals?.Publish(new WalletProcessedTransactionOfInterestEvent() { Source = "processblock" });
}
}
}
}

private bool ParallelProcessBlock(ProcessBlocksInfo round, Block block, ChainedHeader chainedHeader)
private bool ParallelProcessBlock(ProcessBlocksInfo round, Block block, ChainedHeader chainedHeader, out bool transactionOfInterestProcessed)
{
transactionOfInterestProcessed = false;

try
{
HDWallet wallet = round.Wallet;
Expand Down Expand Up @@ -1018,10 +1034,7 @@ private bool ParallelProcessBlock(ProcessBlocksInfo round, Block block, ChainedH
ITransactionsToLists transactionsToLists = new TransactionsToLists(this.Network, this.ScriptAddressReader, round, this.dateTimeProvider);
if (transactionsToLists.ProcessTransactions(block.Transactions, new HashHeightPair(chainedHeader), blockTime: block.Header.BlockTime.ToUnixTimeSeconds()))
{
// We only want to raise events for the UI (via SignalR) to query the wallet balance if a transaction pertaining to the wallet
// was processed.
this.signals?.Publish(new WalletProcessedTransactionOfInterestEvent());

transactionOfInterestProcessed = true;
this.Metrics.ProcessCount++;
}

Expand Down Expand Up @@ -1224,17 +1237,15 @@ public void ProcessTransaction(string walletName, Transaction transaction, uint2

ProcessBlocksInfo processBlocksInfo = walletContainer;

bool notifyWallet = false;

try
{
IEnumerable<IEnumerable<string>> txToScript;
{
var transactionsToLists = new TransactionsToLists(this.Network, this.ScriptAddressReader, processBlocksInfo, this.dateTimeProvider);
if (transactionsToLists.ProcessTransactions(new[] { transaction }, null, fixedTxId))
{
// We only want to raise events for the UI (via SignalR) to query the wallet balance if a transaction pertaining to the wallet
// was processed.
this.signals?.Publish(new WalletProcessedTransactionOfInterestEvent());
}
notifyWallet = true;

txToScript = (new[] { processBlocksInfo.Outputs, processBlocksInfo.PrevOuts }).Select(list => list.CreateScript());
}
Expand Down Expand Up @@ -1278,6 +1289,13 @@ public void ProcessTransaction(string walletName, Transaction transaction, uint2
walletContainer.LockProcessBlocks.Release();
walletContainer.WriteLockRelease();
}

if (notifyWallet)
{
// We only want to raise events for the UI (via SignalR) to query the wallet balance if a transaction pertaining to the wallet
// was processed.
this.signals?.Publish(new WalletProcessedTransactionOfInterestEvent() { Source = "processtx" });
}
}

/// <inheritdoc />
Expand Down Expand Up @@ -1513,7 +1531,7 @@ public AccountHistory GetHistory(HdAccount account, int limit, int offset, strin
(HDWallet HDWallet, DBConnection conn) = (walletContainer.Wallet, walletContainer.Conn);

var result = HDTransactionData.GetHistory(conn, HDWallet.WalletId, account.Index, limit, offset, txId, accountAddress, forSmartContracts, this.Network.Name.Contains("Cirrus"));

// Filter ColdstakeUtxos
result = result.Where(r =>
{
Expand Down

0 comments on commit c6dfa10

Please sign in to comment.