-
Notifications
You must be signed in to change notification settings - Fork 43
/
driver.d
621 lines (486 loc) · 19.5 KB
/
driver.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
/** Definition of the core event driver interface.
This module contains all declarations necessary for defining and using
event drivers. Event driver implementations will usually inherit from
`EventDriver` using a `final` class to avoid virtual function overhead.
Callback_Behavior:
All callbacks follow the same rules to enable generic implementation
of high-level libraries, such as vibe.d. Except for "listen" style
callbacks, each callback will only ever be called at most once.
If the operation does not get canceled, the callback will be called
exactly once. In case it gets manually canceled using the corresponding
API function, the callback is guaranteed to not be called. However,
the associated operation might still finish - either before the
cancellation function returns, or afterwards.
*/
module eventcore.driver;
@safe: /*@nogc:*/ nothrow:
import core.time : Duration;
import std.socket : Address;
/** Encapsulates a full event driver.

This interface provides access to the individual driver features, as well as
a central `dispose` method that must be called before the driver gets
destroyed or before the process gets terminated.

Each property returns the sub-interface responsible for one feature area
(`EventDriverCore`, `EventDriverTimers`, `EventDriverEvents`,
`EventDriverSignals`, `EventDriverSockets`, `EventDriverDNS`,
`EventDriverFiles` and `EventDriverWatchers`).
*/
interface EventDriver {
@safe: /*@nogc:*/ nothrow:
/// Core event loop functionality (see `EventDriverCore`)
@property EventDriverCore core();
/// Single shot and recurring timers (see `EventDriverTimers`)
@property EventDriverTimers timers();
/// Cross-thread events (thread local access, see `EventDriverEvents`)
@property EventDriverEvents events();
/// Cross-thread events (cross-thread access); callable on a `shared` driver reference
@property shared(EventDriverEvents) events() shared;
/// UNIX/POSIX signal reception (see `EventDriverSignals`)
@property EventDriverSignals signals();
/// Stream and datagram sockets (see `EventDriverSockets`)
@property EventDriverSockets sockets();
/// DNS queries (see `EventDriverDNS`)
@property EventDriverDNS dns();
/// Local file operations (see `EventDriverFiles`)
@property EventDriverFiles files();
/// Directory change watching (see `EventDriverWatchers`)
@property EventDriverWatchers watchers();
/** Releases all resources associated with the driver.

Must be called before the driver gets destroyed or before the process gets
terminated.
*/
void dispose();
}
/** Provides generic event loop control.
*/
interface EventDriverCore {
@safe: /*@nogc:*/ nothrow:
/** The number of pending callbacks.

When this number drops to zero, the event loop can safely be quit. It is
guaranteed that no callbacks will be made anymore, unless new callbacks
get registered.
*/
size_t waiterCount();
/** Runs the event loop to process a chunk of events.

This method optionally waits for an event to arrive if none are present
in the event queue. The function will return after either the specified
timeout has elapsed, or once the event queue has been fully emptied.

Params:
timeout = Maximum amount of time to wait for an event. A duration of
zero will cause the function to only process pending events. A
duration of `Duration.max`, if necessary, will wait indefinitely
until an event arrives.

Returns:
An `ExitReason` value describing why the call returned.
`ExitReason.exited` is returned in response to a call to `exit`.
*/
ExitReason processEvents(Duration timeout);
/** Causes `processEvents` to return with `ExitReason.exited` as soon as
possible.

A call to `processEvents` that is currently in progress will be notified
so that it returns immediately. If no call is in progress, the next call
to `processEvents` will immediately return with `ExitReason.exited`.
*/
void exit();
/** Resets the exit flag.

`processEvents` will automatically reset the exit flag before it returns
with `ExitReason.exited`. However, if `exit` is called outside of
`processEvents`, the next call to `processEvents` will return with
`ExitReason.exited` immediately. This function can be used to avoid this.
*/
void clearExitFlag();
/** Low-level user data access. Use `userData` instead.

`userData` calls this with `size` set to the size of the stored type and
with `initialize`/`destroy` callbacks that construct and destroy a value
of that type in the storage the returned pointer refers to.

NOTE(review): at which points the driver invokes `initialize` and
`destroy` is up to the implementation - presumably on first access and
when the descriptor is released; confirm against the concrete driver.
*/
protected void* rawUserData(StreamSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
/// ditto
protected void* rawUserData(DatagramSocketFD descriptor, size_t size, DataInitializer initialize, DataInitializer destroy) @system;
/** Retrieves a reference to a user-defined value associated with a descriptor.

Forwards to `rawUserData`, requesting `T.sizeof` bytes and passing
callbacks that default-construct (`emplace`) and `destroy` a `T`. The
pointer returned by `rawUserData` is reinterpreted as a `T` and returned
by reference.

Params:
T = Type of the value to associate with the descriptor
FD = Descriptor type; must be accepted by one of the `rawUserData`
overloads
descriptor = The descriptor whose user data is accessed
*/
@property final ref T userData(T, FD)(FD descriptor)
@trusted {
import std.conv : emplace;
// type-erased constructor/destructor handed to the driver
static void init(void* ptr) { emplace(cast(T*)ptr); }
static void destr(void* ptr) { destroy(*cast(T*)ptr); }
return *cast(T*)rawUserData(descriptor, T.sizeof, &init, &destr);
}
}
/** Provides access to socket functionality.

The interface supports two classes of sockets - stream sockets and datagram
sockets.
*/
interface EventDriverSockets {
@safe: /*@nogc:*/ nothrow:
/** Connects to a stream listening socket.

Params:
peer_address = Address of the remote endpoint to connect to
bind_address = Local address to bind the socket to
on_connect = Callback receiving the socket and the `ConnectStatus` of
the connection attempt
*/
StreamSocketFD connectStream(scope Address peer_address, scope Address bind_address, ConnectCallback on_connect);
/** Adopts an existing stream socket.

The given socket must be in a connected state. It will be automatically
switched to non-blocking mode if necessary. Beware that this may have
side effects in other code that uses the socket and assumes blocking
operations.

Params:
socket = Socket file descriptor to adopt

Returns:
Returns a socket handle corresponding to the passed socket
descriptor. If the same file descriptor is already registered,
`StreamSocketFD.invalid` will be returned instead.
*/
StreamSocketFD adoptStream(int socket);
/// Creates a socket listening for incoming stream connections.
StreamListenSocketFD listenStream(scope Address bind_address, StreamListenOptions options, AcceptCallback on_accept);
/// Convenience overload of `listenStream` that uses `StreamListenOptions.defaults`.
final StreamListenSocketFD listenStream(scope Address bind_address, AcceptCallback on_accept) {
return listenStream(bind_address, StreamListenOptions.defaults, on_accept);
}
/// Starts to wait for incoming connections on a listening socket.
void waitForConnections(StreamListenSocketFD sock, AcceptCallback on_accept);
/// Determines the current connection state.
ConnectionState getConnectionState(StreamSocketFD sock);
/** Retrieves the bind address of a socket.

NOTE(review): presumably the address is written to `dst` and the return
value signals success - confirm against the driver implementation.
*/
bool getLocalAddress(StreamSocketFD sock, scope RefAddress dst);
/// Sets the `TCP_NODELAY` option on a socket
void setTCPNoDelay(StreamSocketFD socket, bool enable);
/// Sets the `SO_KEEPALIVE` socket option on a socket.
void setKeepAlive(StreamSocketFD socket, bool enable);
/** Reads data from a stream socket.

Note that only a single read operation is allowed at once. The caller
needs to make sure that either `on_read_finish` got called, or
`cancelRead` was called before issuing the next call to `read`.
However, concurrent writes are legal.

Waiting_for_data_availability:
With the special combination of a zero-length buffer and `mode`
set to either `IOMode.once` or `IOMode.all`, this function will
wait until data is available on the socket without reading
anything.

Note that in this case the `IOStatus` parameter of the callback
will not reliably reflect a passive connection close. It is
necessary to actually read some data to make sure this case
is detected.
*/
void read(StreamSocketFD socket, ubyte[] buffer, IOMode mode, IOCallback on_read_finish);
/** Cancels an ongoing read operation.

After this function has been called, the `IOCallback` specified in
the call to `read` is guaranteed to not be called.
*/
void cancelRead(StreamSocketFD socket);
/** Writes data to a stream socket.

Note that only a single write operation is allowed at once. The caller
needs to make sure that either `on_write_finish` got called, or
`cancelWrite` was called before issuing the next call to `write`.
However, concurrent reads are legal.
*/
void write(StreamSocketFD socket, const(ubyte)[] buffer, IOMode mode, IOCallback on_write_finish);
/** Cancels an ongoing write operation.

After this function has been called, the `IOCallback` specified in
the call to `write` is guaranteed to not be called.
*/
void cancelWrite(StreamSocketFD socket);
/** Waits for incoming data without actually reading it.
*/
void waitForData(StreamSocketFD socket, IOCallback on_data_available);
/** Initiates a connection close.

NOTE(review): `shut_read` and `shut_write` presumably select which
directions of the connection get shut down - confirm.
*/
void shutdown(StreamSocketFD socket, bool shut_read, bool shut_write);
/** Creates a connection-less datagram socket.

Params:
bind_address = The local bind address to use for the socket. It
will be able to receive any messages sent to this address.
target_address = Optional default target address. If this is
specified and the target address parameter of `send` is
left to `null`, it will be used instead.

Returns:
Returns a datagram socket handle if the socket was created
successfully. Otherwise returns `DatagramSocketFD.invalid`.
*/
DatagramSocketFD createDatagramSocket(scope Address bind_address, scope Address target_address);
/** Adopts an existing datagram socket.

The socket must be properly bound before this function is called.

Params:
socket = Socket file descriptor to adopt

Returns:
Returns a socket handle corresponding to the passed socket
descriptor. If the same file descriptor is already registered,
`DatagramSocketFD.invalid` will be returned instead.
*/
DatagramSocketFD adoptDatagramSocket(int socket);
/// Sets the `SO_BROADCAST` socket option.
bool setBroadcast(DatagramSocketFD socket, bool enable);
/// Receives a single datagram.
void receive(DatagramSocketFD socket, ubyte[] buffer, IOMode mode, DatagramIOCallback on_receive_finish);
/// Cancels an ongoing wait for an incoming datagram.
void cancelReceive(DatagramSocketFD socket);
/// Sends a single datagram.
void send(DatagramSocketFD socket, const(ubyte)[] buffer, IOMode mode, Address target_address, DatagramIOCallback on_send_finish);
/// Cancels an ongoing wait for an outgoing datagram.
void cancelSend(DatagramSocketFD socket);
/** Increments the reference count of the given socket.
*/
void addRef(SocketFD descriptor);
/** Decrements the reference count of the given socket.

Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.

Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(SocketFD descriptor);
}
/** Performs asynchronous DNS queries.
*/
interface EventDriverDNS {
@safe: /*@nogc:*/ nothrow:
/** Looks up addresses corresponding to the given DNS name.

Params:
name = The DNS name to resolve
on_lookup_finished = Callback receiving the lookup ID, a `DNSStatus`
and the resulting addresses

Returns:
An identifier for the lookup that can be passed to `cancelLookup`.
*/
DNSLookupID lookupHost(string name, DNSLookupCallback on_lookup_finished);
/// Cancels an ongoing DNS lookup.
void cancelLookup(DNSLookupID handle);
}
/** Provides read/write operations on the local file system.
*/
interface EventDriverFiles {
@safe: /*@nogc:*/ nothrow:
/** Opens the file at `path` using the given `FileOpenMode`.

NOTE(review): the value returned on failure is not visible here -
presumably `FileFD.invalid`; confirm against the driver implementation.
*/
FileFD open(string path, FileOpenMode mode);
/// Wraps an existing system file handle in a `FileFD`.
FileFD adopt(int system_file_handle);
/// Closes the given file.
void close(FileFD file);
/// Queries the size of the given file.
ulong getSize(FileFD file);
/** Writes `buffer` to the file, starting at `offset`.

The result is reported through `on_write_finish` (file, `IOStatus` and a
byte count). A pending write can be canceled using `cancelWrite`.
*/
void write(FileFD file, ulong offset, const(ubyte)[] buffer, IOMode mode, FileIOCallback on_write_finish);
/** Reads from the file into `buffer`, starting at `offset`.

The result is reported through `on_read_finish` (file, `IOStatus` and a
byte count). A pending read can be canceled using `cancelRead`.
*/
void read(FileFD file, ulong offset, ubyte[] buffer, IOMode mode, FileIOCallback on_read_finish);
/// Cancels an ongoing write operation on the given file.
void cancelWrite(FileFD file);
/// Cancels an ongoing read operation on the given file.
void cancelRead(FileFD file);
/** Increments the reference count of the given file.
*/
void addRef(FileFD descriptor);
/** Decrements the reference count of the given file.

Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.

Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(FileFD descriptor);
}
/** Cross-thread notifications

"Events" can be used to wake up the event loop of a foreign thread. This is
the basis for all kinds of thread synchronization primitives, such as
mutexes, condition variables, message queues etc. Such primitives, in case
of extended wait periods, should use events rather than traditional means
to block, such as busy loops or kernel based wait mechanisms to avoid
stalling the event loop.
*/
interface EventDriverEvents {
@safe: /*@nogc:*/ nothrow:
/// Creates a new cross-thread event.
EventID create();
/** Triggers an event owned by the current thread.

NOTE(review): `notify_all` presumably selects between waking all waiters
and waking just one of them - confirm against the driver implementation.
*/
void trigger(EventID event, bool notify_all);
/// Triggers an event possibly owned by a different thread.
void trigger(EventID event, bool notify_all) shared;
/** Waits until an event gets triggered.

Multiple concurrent waits are allowed.
*/
void wait(EventID event, EventCallback on_event);
/** Cancels an ongoing wait operation.

The wait to cancel is identified by the event together with the callback
that was passed to `wait`.
*/
void cancelWait(EventID event, EventCallback on_event);
/** Increments the reference count of the given event.
*/
void addRef(EventID descriptor);
/** Decrements the reference count of the given event.

Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.

Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(EventID descriptor);
}
/** Handling of POSIX signals.
*/
interface EventDriverSignals {
@safe: /*@nogc:*/ nothrow:
/** Starts listening for the specified POSIX signal.

Note that if a default signal handler exists for the signal, it will be
disabled by using this function.

Params:
sig = The number of the signal to listen for
on_signal = Callback that gets called whenever a matching signal gets
received

Returns:
Returns an identifier that identifies the resource associated with
the signal. Giving up ownership of this resource using `releaseRef`
will re-enable the default signal handler, if any was present.
For any error condition, `SignalListenID.invalid` will be returned
instead.
*/
SignalListenID listen(int sig, SignalCallback on_signal);
/** Increments the reference count of the given resource.
*/
void addRef(SignalListenID descriptor);
/** Decrements the reference count of the given resource.

Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.

Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(SignalListenID descriptor);
}
/** Single shot and recurring timers.
*/
interface EventDriverTimers {
@safe: /*@nogc:*/ nothrow:
/// Creates a new timer and returns its identifier.
TimerID create();
/** Arms the given timer.

NOTE(review): presumably `timeout` is the delay until the timer first
fires and `repeat` is the interval for recurring timers (with a zero
duration meaning single shot) - confirm against the driver implementation.
*/
void set(TimerID timer, Duration timeout, Duration repeat);
/// Stops the given timer.
void stop(TimerID timer);
/// Determines whether the given timer is currently pending.
bool isPending(TimerID timer);
/// Determines whether the given timer is periodic.
bool isPeriodic(TimerID timer);
/// Registers `callback` to be called with the timer's ID when the timer fires.
void wait(TimerID timer, TimerCallback callback);
/// Cancels an ongoing wait operation on the given timer.
void cancelWait(TimerID timer);
/** Increments the reference count of the given resource.
*/
void addRef(TimerID descriptor);
/** Decrements the reference count of the given resource.

Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.

Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(TimerID descriptor);
/// Determines if the given timer's reference count equals one.
bool isUnique(TimerID descriptor) const;
}
/** Directory change watching.
*/
interface EventDriverWatchers {
@safe: /*@nogc:*/ nothrow:
/** Watches a directory or a directory sub tree for changes.

Params:
path = Path of the directory to watch
recursive = Selects between watching the whole sub tree and watching
just the directory itself
callback = Called with the watcher ID and a `FileChange` describing each
detected change
*/
WatcherID watchDirectory(string path, bool recursive, FileChangesCallback callback);
/** Increments the reference count of the given resource.
*/
void addRef(WatcherID descriptor);
/** Decrements the reference count of the given resource.

Once the reference count reaches zero, all associated resources will be
freed and the resource descriptor gets invalidated.

Returns:
Returns `false` $(I iff) the last reference was removed by this call.
*/
bool releaseRef(WatcherID descriptor);
}
// Helper class to enable fully stack allocated `std.socket.Address` instances.
//
// The class only stores a pointer/length pair that refers to a `sockaddr`
// owned by the caller; the referenced memory is neither copied nor freed here.
final class RefAddress : Address {
	version (Posix) import core.sys.posix.sys.socket : sockaddr, socklen_t;
	version (Windows) import core.sys.windows.winsock2 : sockaddr, socklen_t;

	private sockaddr* m_addr;
	private socklen_t m_addrLen;

	/// Creates an instance that does not reference any address yet.
	this() @safe nothrow {}

	/// Creates an instance referencing the first `addr_len` bytes at `addr`.
	this(sockaddr* addr, socklen_t addr_len)
	@safe nothrow {
		m_addr = addr;
		m_addrLen = addr_len;
	}

	/// Pointer to the referenced socket address.
	override @property sockaddr* name() { return m_addr; }
	/// ditto
	override @property const(sockaddr)* name() const { return m_addr; }
	/// Length in bytes of the referenced socket address.
	override @property socklen_t nameLen() const { return m_addrLen; }

	/// Makes this instance reference the first `addr_len` bytes at `addr`.
	void set(sockaddr* addr, socklen_t addr_len)
	@safe nothrow {
		m_addr = addr;
		m_addrLen = addr_len;
	}

	/// Shrinks the reported address length to `new_len`; growing is not permitted.
	void cap(socklen_t new_len)
	@safe nothrow {
		assert(new_len <= m_addrLen, "Cannot grow size of a RefAddress.");
		m_addrLen = new_len;
	}
}
/// Callback for `EventDriverSockets.connectStream`: receives the socket and the outcome of the connection attempt.
alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus);
/// Callback for incoming stream connections: receives the listening socket, the accepted socket and the remote address.
alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD, scope RefAddress remote_address);
/// Callback for stream socket I/O: receives the socket, the resulting `IOStatus` and a byte count.
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
/// Callback for datagram socket I/O: receives the socket, the resulting `IOStatus`, a byte count and an address.
alias DatagramIOCallback = void delegate(DatagramSocketFD, IOStatus, size_t, scope RefAddress);
/// Callback for `EventDriverDNS.lookupHost`: receives the lookup ID, the `DNSStatus` and the resulting addresses.
alias DNSLookupCallback = void delegate(DNSLookupID, DNSStatus, scope RefAddress[]);
/// Callback for file I/O: receives the file, the resulting `IOStatus` and a byte count.
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
/// Callback for `EventDriverEvents.wait`: receives the ID of the event.
alias EventCallback = void delegate(EventID);
/// Callback for `EventDriverSignals.listen`: receives the listen ID, a `SignalStatus` and an `int` (presumably the signal number - confirm).
alias SignalCallback = void delegate(SignalListenID, SignalStatus, int);
/// Callback for `EventDriverTimers.wait`: receives the ID of the timer.
alias TimerCallback = void delegate(TimerID);
/// Callback for `EventDriverWatchers.watchDirectory`: receives the watcher ID and a description of the change.
alias FileChangesCallback = void delegate(WatcherID, in ref FileChange change);
/// Type-erased initializer/destructor function operating on raw storage, as used by `EventDriverCore.rawUserData`.
@system alias DataInitializer = void function(void*);
/** Return value of `EventDriverCore.processEvents`, describing why the call
returned.

NOTE(review): the member descriptions other than `exited` are inferred from
the member names and the `processEvents` documentation - confirm against a
driver implementation.
*/
enum ExitReason {
timeout, /// presumably: the specified timeout has elapsed
idle, /// presumably: the event queue has been fully emptied
outOfWaiters, /// presumably: no pending callbacks are left (see `waiterCount`)
exited /// `EventDriverCore.exit` was called
}
/** Outcome of a connection attempt, as passed to `ConnectCallback`.

NOTE(review): member meanings are taken from their names only - confirm
against a driver implementation.
*/
enum ConnectStatus {
connected,
refused,
timeout,
bindFailure,
unknownError
}
/** State of a stream socket, as returned by
`EventDriverSockets.getConnectionState`.

NOTE(review): `passiveClose`/`activeClose` presumably distinguish a close
initiated by the peer from one initiated locally - confirm.
*/
enum ConnectionState {
initialized,
connecting,
connected,
passiveClose,
activeClose,
closed
}
/** Options for `EventDriverSockets.listenStream`.

Apart from `defaults`, each member occupies a distinct bit.
*/
enum StreamListenOptions {
defaults = 0, /// No options set
reusePort = 1<<0, /// NOTE(review): presumably enables `SO_REUSEPORT` on the listening socket - confirm
}
/**
Specifies how a file is manipulated on disk.

Passed to `EventDriverFiles.open`.
*/
enum FileOpenMode {
/// The file is opened read-only.
read,
/// The file is opened for read-write random access.
readWrite,
/// The file is truncated if it exists or created otherwise and then opened for read-write access.
createTrunc,
/// The file is opened for appending data to it and created if it does not exist.
append
}
/** Determines how much of a buffer an I/O operation processes before it
finishes.
*/
enum IOMode {
immediate, /// Process only as much as possible without waiting
once, /// Process as much as possible with a single call
all /// Process the full buffer
}
/** Result of an I/O operation, as passed to `IOCallback`,
`DatagramIOCallback` and `FileIOCallback`.
*/
enum IOStatus {
ok, /// The data has been transferred normally
disconnected, /// The connection was closed before all data could be transferred
error, /// An error occurred while transferring the data
wouldBlock /// Returned for `IOMode.immediate` when no data is readily readable/writable
}
/** Result of a DNS lookup, as passed to `DNSLookupCallback`.
*/
enum DNSStatus {
ok,
error
}
/** Specifies the kind of change in a watched directory.

Stored in `FileChange.kind`.
*/
enum FileChangeKind {
/// A file or directory was added
added,
/// A file or directory was deleted
removed,
/// A file or directory was modified
modified
}
/** Status value passed to `SignalCallback`.
*/
enum SignalStatus {
ok,
error
}
/** Describes a single change in a watched directory.

Passed by reference to `FileChangesCallback`.
*/
struct FileChange {
/// The type of change
FileChangeKind kind;
/// Directory containing the changed file
string directory;
/// Determines if the changed entity is a file or a directory.
bool isDirectory;
/// Name of the changed file
// NOTE(review): typed `const(char)[]` rather than `string`, so the data is
// not guaranteed to be immutable - presumably it may refer to a temporary
// buffer and should be copied if it needs to outlive the callback; confirm.
const(char)[] name;
}
/** Strongly typed wrapper around a resource identifier.

Params:
NAME = Name tag of the handle type, exposed as `name`
T = The wrapped type - either a plain value type or another `Handle`
instantiation, which allows layering handle types (e.g.
`StreamSocketFD` wraps `SocketFD`, which wraps `FD`)
invalid_value = The value stored in a default-initialized handle (see
`invalid`); defaults to `T.init`
*/
struct Handle(string NAME, T, T invalid_value = T.init) {
import std.traits : isInstanceOf;
// `BaseType` is the innermost non-`Handle` type of a (possibly nested) handle
static if (isInstanceOf!(.Handle, T)) alias BaseType = T.BaseType;
else alias BaseType = T;
/// The `NAME` tag of this handle type
alias name = NAME;
/// The default-initialized handle, whose `value` is `invalid_value`
enum invalid = Handle.init;
/// The wrapped value
T value = invalid_value;
/// Constructs a handle from a raw `BaseType` value by wrapping it in a `T`.
this(BaseType value) { this.value = T(value); }
/** Explicit cast to another handle type, constructed from this handle's
`value`.

NOTE(review): the specialization pattern `Handle!(V, M)` lists a type and
an `int`, whereas `Handle` is declared with `(string, T, T)` parameters -
confirm whether this overload is ever selected or whether casts between
handle types are resolved through the `BaseType` overload instead.
*/
U opCast(U : Handle!(V, M), V, int M)() {
// TODO: verify that U derives from typeof(this)!
return U(value);
}
/// Explicit cast to any type that implicitly converts to `BaseType`, performed by casting `value`.
U opCast(U : BaseType)()
{
return cast(U)value;
}
// lets a handle implicitly convert to its wrapped `T`
alias value this;
}
/// Root descriptor handle type; wraps a `size_t` and uses `size_t.max` as its invalid value.
alias FD = Handle!("fd", size_t, size_t.max);
/// Generic socket handle, layered on `FD`.
alias SocketFD = Handle!("socket", FD);
/// Stream socket handle, layered on `SocketFD`.
alias StreamSocketFD = Handle!("streamSocket", SocketFD);
/// Listening stream socket handle, layered on `SocketFD`.
alias StreamListenSocketFD = Handle!("streamListen", SocketFD);
/// Datagram socket handle, layered on `SocketFD`.
alias DatagramSocketFD = Handle!("datagramSocket", SocketFD);
/// File handle, layered on `FD`.
alias FileFD = Handle!("file", FD);
/// Cross-thread event handle, layered on `FD`.
alias EventID = Handle!("event", FD);
// The remaining identifiers wrap a plain `size_t` and do not specify an
// invalid value, so their `invalid` member holds `size_t.init`.
/// Timer identifier.
alias TimerID = Handle!("timer", size_t);
/// Directory watcher identifier.
alias WatcherID = Handle!("watcher", size_t);
/// Event wait identifier.
alias EventWaitID = Handle!("eventWait", size_t);
/// Signal listener identifier.
alias SignalListenID = Handle!("signal", size_t);
/// DNS lookup identifier.
alias DNSLookupID = Handle!("dns", size_t);