-
Notifications
You must be signed in to change notification settings - Fork 2
/
buffer.lisp
113 lines (99 loc) · 3.99 KB
/
buffer.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
(defpackage grip.ext.buffer
(:use :cl)
(:import-from :grip.logger
:base-journal
:send-message)
(:import-from :grip.message
:base-message
:batch-message
:message-batch
:merge-messages)
(:import-from :local-time :now)
(:import-from :local-time-duration
:duration
:duration-as
:timestamp-difference)
(:import-from :chanl
:bounded-channel
:send
:recv
:thread-alive-p
:task-thread
:pcall
:select
:thread-name
:current-thread
:kill)
(:export :buffered-journal
:send-message
:close-journal))
(in-package :grip.ext.buffer)
(defclass buffered-journal (base-journal)
((wrapped-journal :reader buffer-journal :initarg :journal :type (or null base-journal) :initform nil)
(size :accessor buffer-size :initarg :size :type integer :initform 100)
(interval :accessor buffer-interval :initarg :interval :type duration :initform (duration :sec 10))
(worker-send :accessor buffer-message-worker :type (or null bt:thread) :initform nil)
(worker-timer :accessor buffer-timer-worker :type (or null bt:thread) :initform nil)
(buffer-chan :accessor buffer-chan :type (or null bounded-channel) :initform nil)
(timer-chan :accessor buffer-timer :type bounded-channel :initform (make-instance 'bounded-channel :size 1))
(signal-chan :accessor buffer-signal :type bounded-channel :initform (make-instance 'bounded-channel :size 1)))
(:documentation "The buffered journal wraps another journal
implementation and buffers the underlying implementation to send
messages on a fixed interval or after a certain volume of
messages."))
(defmethod send-message ((journal buffered-journal) (message base-message))
(setup-journal journal)
(send (buffer-chan journal) message))
(defmethod initialize-instance :after ((journal buffered-journal) &key)
(unless (buffer-journal journal)
(signal 'error "must specify journal to buffer"))
(setf (buffer-chan journal) (make-instance 'bounded-channel :size (buffer-size journal)))
(setf (buffer-timer-worker journal) (start-timer journal))
(setf (buffer-message-worker journal) (start-worker journal)))
(defmethod setup-journal ((journal buffered-journal))
(let ((timer-thread (buffer-timer-worker journal))
(message-thread (buffer-message-worker journal)))
(unless (and message-thread (thread-alive-p message-thread))
(setf (buffer-message-worker journal) (start-worker journal)))
(unless (and timer-thread (thread-alive-p timer-thread))
(setf (buffer-timer-worker journal) (start-timer journal)))))
(defmethod start-worker ((journal buffered-journal))
(task-thread
(pcall
(lambda ()
(let ((buf (make-instance 'batch-message)))
(loop
(select
((recv (buffer-chan journal) message)
(merge-messages buf message))
((recv (buffer-signal journal) res)
(send-message (buffer-journal journal) buf)
(setf buf (make-instance 'batch-message))
(return res))
((recv (buffer-timer journal) res)
(send-message (buffer-journal journal) buf)
(setf buf (make-instance 'batch-message)) res))
(when (>= (length (message-batch buf)) (buffer-size journal))
(send-message (buffer-journal journal) buf)
(setf buf (make-instance 'batch-message))))))
:name "grip.ext.buffered.worker")))
(defmethod start-timer ((journal buffered-journal))
(task-thread
(pcall
(lambda ()
(loop
(sleep (duration-as (buffer-interval journal) :sec))
(send (buffer-timer journal) t)))
:name "grip.ext.buffered.timer")))
(defmethod close-journal ((journal buffered-journal))
(send (buffer-signal journal) t)
(with-accessors ((timer-thread buffer-timer-worker)
(message-thread buffer-message-worker))
journal
(when (and timer-thread (thread-alive-p timer-thread))
(kill timer-thread)
(setf timer-thread nil))
(when (and message-thread (thread-alive-p message-thread))
(sleep 0.5) ;; this is just a kindness to allow the last flush to go through
(kill message-thread)
(setf message-thread nil))))