/
MessageMemorizerActor.cs
99 lines (86 loc) · 3.31 KB
/
MessageMemorizerActor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// -----------------------------------------------------------------------
// <copyright file="MessageMemorizerActor.cs" company="Petabridge, LLC">
// Copyright (C) 2017 - 2017 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
namespace Petabridge.Cmd.QuickStart
{
/// <summary>
/// Actor responsible for memorizing messages that will be saved on the server.
/// </summary>
public class MessageMemorizerActor : ReceiveActor
{
private readonly SortedSet<Message> _messages = new SortedSet<Message>();
public MessageMemorizerActor()
{
Receive<Message>(m =>
{
_messages.Add(m);
Sender.Tell(CommandResponse.Empty);
});
Receive<FetchMessages>(f => f.Since == null, f => // all messages
{
foreach (var msg in _messages)
Sender.Tell(new CommandResponse(msg.ToString(), false));
// by setting final:false we signal to client that more responses are coming
Sender.Tell(CommandResponse.Empty); // tells the client not to expect any more responses (final == true)
});
Receive<FetchMessages>(f =>
{
var acceptableTime = DateTime.UtcNow - f.Since;
var matchingMessages =
_messages.Where(x => x.TimeStamp >= acceptableTime).OrderBy(x => x.TimeStamp).ToList();
foreach (var msg in matchingMessages)
Sender.Tell(new CommandResponse(msg.ToString(), false));
// by setting final:false we signal to client that more responses are coming
Sender.Tell(CommandResponse.Empty); // tells the client not to expect any more responses (final == true)
});
Receive<PurgeMessages>(_ =>
{
_messages.Clear();
Sender.Tell(CommandResponse.Empty);
});
}
public class Message : IComparable<Message>
{
public Message(string msg, DateTime timeStamp, string ip)
{
Msg = msg;
TimeStamp = timeStamp;
Ip = ip;
}
public DateTime TimeStamp { get; }
public string Msg { get; }
public string Ip { get; }
public int CompareTo(Message other)
{
if (ReferenceEquals(this, other)) return 0;
if (ReferenceEquals(null, other)) return 1;
return TimeStamp.CompareTo(other.TimeStamp);
}
public override string ToString()
{
return $"[{Ip}][{TimeStamp.ToShortTimeString()}]: {Msg}";
}
}
public class FetchMessages
{
public FetchMessages(TimeSpan? since = null)
{
Since = since;
}
public TimeSpan? Since { get; }
}
public class PurgeMessages
{
public static readonly PurgeMessages Instance = new PurgeMessages();
private PurgeMessages()
{
}
}
}
}