This repository has been archived by the owner on Sep 15, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 18
/
LogChangeHandler.cs
65 lines (58 loc) · 2.04 KB
/
LogChangeHandler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
using RethinkDb.Driver;
using RethinkDbLogProvider;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RethinkDb.Driver.Net;
namespace LogWatcher
{
/// <summary>
/// Subscribes to a RethinkDB changefeed on the "Logs" table and pushes every
/// new log entry to all clients connected to the <c>LogHub</c> SignalR hub.
/// </summary>
/// <remarks>
/// The handler remembers the timestamp of the last entry it forwarded so that,
/// after a failure, the changefeed is re-opened from that point onwards.
/// </remarks>
public class LogChangeHandler
{
    // Back-off schedule used between reconnect attempts:
    // 500 ms, 1 s, 2 s, ... capped at MaxRetryDelayMs.
    private const int InitialRetryDelayMs = 500;
    private const int MaxRetryDelayMs = 30000;
    // 500 ms << 6 already exceeds the cap; limiting the shift avoids int overflow.
    private const int MaxBackoffExponent = 6;

    private static readonly RethinkDB R = RethinkDB.R;
    private readonly Microsoft.AspNetCore.SignalR.Infrastructure.IConnectionManager _signalManager;
    private readonly IRethinkDbConnectionFactory _rethinkDbFactory;
    private readonly RethinkDbOptions _options;
    private Connection _conn;
    // Lower bound of the changefeed range; starts one second in the past.
    private DateTime _lastLogTimestamp = DateTime.UtcNow.AddSeconds(-1);
    // Consecutive failures since the last change was received.
    private int _retryCount = 0;

    /// <summary>
    /// Creates a handler that reads from RethinkDB and publishes to SignalR.
    /// </summary>
    /// <param name="rethinkDbFactory">Factory used to obtain the RethinkDB connection.</param>
    /// <param name="options">Options providing the database name.</param>
    /// <param name="signalManager">SignalR connection manager used to resolve the hub context.</param>
    public LogChangeHandler(IRethinkDbConnectionFactory rethinkDbFactory,
        IOptions<RethinkDbOptions> options,
        Microsoft.AspNetCore.SignalR.Infrastructure.IConnectionManager signalManager)
    {
        _rethinkDbFactory = rethinkDbFactory;
        _options = options.Value;
        _signalManager = signalManager;
    }

    /// <summary>
    /// Opens the changefeed and forwards changes until the feed ends.
    /// Any failure (connecting or while streaming) triggers a reconnect.
    /// </summary>
    /// <remarks>
    /// Retries are performed in a loop with capped exponential back-off.
    /// The previous implementation retried by calling itself from the catch
    /// block, which spun without delay and grew the call stack on every
    /// failure until the process died with a stack overflow.
    /// This method blocks the calling thread while the feed is open.
    /// </remarks>
    public void HandleUpdates()
    {
        while (true)
        {
            try
            {
                _conn = _rethinkDbFactory.CreateConnection();
                RunChangefeed();
                // The feed completed without an error: nothing left to do.
                return;
            }
            catch (Exception)
            {
                // Best effort: no logger is available in this class, so the
                // failure is only counted and the feed is re-opened below.
                _retryCount++;
            }

            System.Threading.Thread.Sleep(GetRetryDelayMs(_retryCount));
        }
    }

    /// <summary>
    /// Computes the pause before the given reconnect attempt.
    /// </summary>
    /// <param name="attempt">1-based number of consecutive failures.</param>
    /// <returns>Delay in milliseconds, never more than <see cref="MaxRetryDelayMs"/>.</returns>
    private static int GetRetryDelayMs(int attempt)
    {
        var exponent = Math.Min(Math.Max(attempt - 1, 0), MaxBackoffExponent);
        return Math.Min(InitialRetryDelayMs << exponent, MaxRetryDelayMs);
    }

    /// <summary>
    /// Runs the changefeed query on the current connection and pushes each
    /// new value to all SignalR clients of <c>LogHub</c>.
    /// </summary>
    private void RunChangefeed()
    {
        var hubContext = _signalManager.GetHubContext<LogHub>();
        var feed = R.Db(_options.Database).Table("Logs")
            .Between(_lastLogTimestamp, R.Maxval())[new { index = nameof(LogEntry.Timestamp) }]
            .Changes().RunChanges<LogEntry>(_conn);

        foreach (var log in feed)
        {
            // The feed is delivering again, so the next failure starts
            // from the shortest back-off delay.
            _retryCount = 0;

            var entry = log.NewValue;
            if (entry == null)
            {
                // A change without a new value (e.g. a deleted row) carries
                // nothing to broadcast and has no timestamp to resume from.
                continue;
            }

            // push new value to SignalR hub
            hubContext.Clients.All.OnLog(entry);
            // start point on reconnect
            _lastLogTimestamp = entry.Timestamp;
        }
    }
}
}