-
Notifications
You must be signed in to change notification settings - Fork 3
/
socket_service.hpp
225 lines (196 loc) · 8.62 KB
/
socket_service.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
#pragma once
#include "fly/net/socket/concepts.hpp"
#include "fly/net/socket/types.hpp"
#include <functional>
#include <memory>
#include <vector>
namespace fly::task {
class SequencedTaskRunner;
} // namespace fly::task
namespace fly::net {
class NetworkConfig;
/**
* Class to monitor asynchronous socket handles for IO readiness. Sockets handles are monitored on a
* per-IO basis.
*
* @author Timothy Flynn (trflynn89@pm.me)
* @version March 6, 2021
*/
class SocketService : public std::enable_shared_from_this<SocketService>
{
public:
/**
* Destructor. Deinitialize the socket service.
*/
~SocketService() noexcept;
/**
* Create a socket service.
*
* @param task_runner Task runner for posting socket service tasks onto.
* @param config Reference to network configuration.
*
* @return The created socket service.
*/
static std::shared_ptr<SocketService> create(
std::shared_ptr<fly::task::SequencedTaskRunner> task_runner,
std::shared_ptr<NetworkConfig> config);
/**
* Create an asynchronous socket armed with this socket service for performing IO operations.
*
* @return The created socket.
*/
template <Socket SocketType>
std::shared_ptr<SocketType> create_socket();
/**
* Remove a socket handle from the service if it is being monitored. This is not guaranteed to
* cancel a pending IO readiness notification. If the service is ready to notify a socket about
* IO readiness, that notification will still occur.
*
* @param handle The socket handle to remove.
*/
void remove_socket(socket_type handle);
/**
* Monitor a socket handle for readiness to be written to.
*
* The provided callback may be any callable type which accepts a single argument, a strong
* pointer to the socket being monitored. The callback is protected by the provided strong
* socket pointer. When the monitor is queued, the strong pointer is stored as a weak pointer
* until the socket becomes ready for writing. It is then converted back to a strong pointer to
* invoke the callback; if the lock fails, the callback is dropped.
*
* Note: The provided callback will be triggered directly on the sequence that is monitoring all
* sockets. Thus, the callback should not perform any blocking operations.
*
* @tparam SocketType Type of the socket to monitor.
* @tparam Callback Type of the callback to invoke when the socket is ready for writing.
*
* @param socket The socket to monitor.
* @param callback The callback to invoke when the socket is ready for writing.
*/
template <Socket SocketType, SocketNotification<SocketType> Callback>
void notify_when_writable(std::shared_ptr<SocketType> const &socket, Callback &&callback);
/**
* Monitor a socket handle for readiness to be read from.
*
* The provided callback may be any callable type which accepts a single argument, a strong
* pointer to the socket being monitored. The callback is protected by the provided strong
* socket pointer. When the monitor is queued, the strong pointer is stored as a weak pointer
* until the socket becomes ready for reading. It is then converted back to a strong pointer to
* invoke the callback; if the lock fails, the callback is dropped.
*
* Note: The provided callback will be triggered directly on the sequence that is monitoring all
* sockets. Thus, the callback should not perform any blocking operations.
*
* @tparam SocketType Type of the socket to monitor.
* @tparam Callback Type of the callback to invoke when the socket is ready for reading.
*
* @param socket The socket to monitor.
* @param callback The callback to invoke when the socket is ready for reading.
*/
template <Socket SocketType, SocketNotification<SocketType> Callback>
void notify_when_readable(std::shared_ptr<SocketType> const &socket, Callback &&callback);
private:
using Notification = std::function<void()>;
struct Request
{
Request(socket_type handle, Notification callback) noexcept;
socket_type m_handle;
Notification m_callback;
};
/**
* Private constructor to ensure the serivce is created as a shared pointer.
*
* @param task_runner Task runner for posting socket service tasks onto.
*/
SocketService(
std::shared_ptr<fly::task::SequencedTaskRunner> task_runner,
std::shared_ptr<NetworkConfig> config) noexcept;
/**
* Monitor a socket handle for readiness to be written to. Once queued, if the polling task is
* not already armed, it will be triggered.
*
* Note: The provided callback will be triggered directly on the sequence that is monitoring all
* sockets. Thus, the callback should not perform any blocking operations.
*
* @param handle The socket handle to monitor.
* @param callback The callback to invoke when the socket is ready for writing.
*/
void notify_when_writable(socket_type handle, Notification &&callback);
/**
* Monitor a socket handle for readiness to be read from. Once queued, if the polling task is
* not already armed, it will be triggered.
*
* Note: The provided callback will be triggered directly on the sequence that is monitoring all
* sockets. Thus, the callback should not perform any blocking operations.
*
* @param handle The socket handle to monitor.
* @param callback The callback to invoke when the socket is ready for reading.
*/
void notify_when_readable(socket_type handle, Notification &&callback);
/**
* Wrap a callback in a lambda to be protected by the provied strong socket pointer. The strong
* pointer is bound to the lambda as a weak pointer. When the callback is ready to be executed,
* if the weak pointer fails to be locked, the callback is dropped.
*
* @tparam SocketType Type of the socket to monitor.
* @tparam Callback Type of the callback to wrap.
*
* @param socket The socket to monitor.
* @param callback The callback to wrap.
*
* @return The wrapped callback.
*/
template <Socket SocketType, SocketNotification<SocketType> Callback>
Notification wrap_callback(std::shared_ptr<SocketType> const &socket, Callback &&callback);
/**
* Check if any sockets are ready for IO. Trigger the callback for all ready sockets. Upon
* completion, if any sockets are still waiting to be ready for IO, the task re-arms itself.
*/
void poll();
std::shared_ptr<fly::task::SequencedTaskRunner> m_task_runner;
std::shared_ptr<NetworkConfig> m_config;
std::vector<Request> m_write_requests;
std::vector<Request> m_read_requests;
};
//==================================================================================================
template <Socket SocketType>
std::shared_ptr<SocketType> SocketService::create_socket()
{
return SocketType::create_socket(shared_from_this(), m_config);
}
//==================================================================================================
template <Socket SocketType, SocketNotification<SocketType> Callback>
void SocketService::notify_when_writable(
std::shared_ptr<SocketType> const &socket,
Callback &&callback)
{
notify_when_writable(socket->handle(), wrap_callback(socket, std::forward<Callback>(callback)));
}
//==================================================================================================
template <Socket SocketType, SocketNotification<SocketType> Callback>
void SocketService::notify_when_readable(
std::shared_ptr<SocketType> const &socket,
Callback &&callback)
{
notify_when_readable(socket->handle(), wrap_callback(socket, std::forward<Callback>(callback)));
}
//==================================================================================================
template <Socket SocketType, SocketNotification<SocketType> Callback>
auto SocketService::wrap_callback(std::shared_ptr<SocketType> const &socket, Callback &&callback)
-> Notification
{
// Further wrap the callback in a structure to allow perfect forwarding into the lambda below.
struct CallbackHolder
{
Callback m_callback;
};
std::weak_ptr<SocketType> weak_socket = socket;
CallbackHolder holder {std::forward<Callback>(callback)};
return [weak_socket = std::move(weak_socket), holder = std::move(holder)]() mutable {
if (std::shared_ptr<SocketType> strong_socket = weak_socket.lock(); strong_socket)
{
std::invoke(std::move(holder.m_callback), std::move(strong_socket));
}
};
}
} // namespace fly::net