/
mpmc-queue.el
110 lines (93 loc) · 4.1 KB
/
mpmc-queue.el
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
;;; mpmc-queue.el --- a multiple-producer-multiple-consumer queue -*- lexical-binding: t; -*-
;; Copyright (C) 2018 Sho Mizoe
;; Author: Sho Mizoe <sho.mizoe@gmail.com>
;; URL: https://github.com/smizoe/mpmc-queue
;; Version: 0.1.1
;; Keywords: lisp, async
;; Package-Requires: ((emacs "26.0") (queue "0.2.0"))
;; This program is free software; you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with this program. If not, see <http://www.gnu.org/licenses/>.
;;; Commentary:
;; a wrapper for queue.el, which makes it a mpmc queue
;;; Code:
(require 'queue)
(cl-defstruct
(mpmc-queue
(:constructor nil)
(:constructor mpmc-queue--create (&key
(internal-queue (make-queue))
(mutex (make-mutex))
&aux
(non-empty-condition (make-condition-variable mutex))
)
)
(:copier nil)
)
internal-queue
mutex
non-empty-condition
)
(defmacro mpmc-queue--with-mutex (mpmcq &rest body)
"While taking the mutex of mpmc-queue MPMCQ, evaluate BODY."
`(with-mutex (mpmc-queue-mutex ,mpmcq) ,@body)
)
(defun mpmc-queue-get (mpmcq &optional non-blocking)
"Get the first element from mpmc-queue MPMCQ.
If NON-BLOCKING is t, return nil immediately if the queue is empty.
Otherwise block until an element is available."
(mpmc-queue--with-mutex mpmcq
(unless (and non-blocking (queue-empty (mpmc-queue-internal-queue mpmcq)))
(while (queue-empty (mpmc-queue-internal-queue mpmcq))
(condition-wait (mpmc-queue-non-empty-condition mpmcq))
)
(cl-assert (not (queue-empty (mpmc-queue-internal-queue mpmcq)))
nil
"the queue was empty but mpmc-queue-get exited stopped waiting for an element addition.")
(queue-dequeue (mpmc-queue-internal-queue mpmcq))
)
)
)
(defun mpmc-queue-peek (mpmcq &optional non-blocking)
"Return the value at the head of MPMCQ.
If NON-BLOCKING is t and the queue is empty, return nil immediately.
Otherwise block until an element is available."
(mpmc-queue--with-mutex mpmcq
(unless (and non-blocking (queue-empty (mpmc-queue-internal-queue mpmcq)))
(while (queue-empty (mpmc-queue-internal-queue mpmcq))
(condition-wait (mpmc-queue-non-empty-condition mpmcq))
)
(cl-assert (not (queue-empty (mpmc-queue-internal-queue mpmcq)))
nil
"the queue was empty but mpmc-queue-peek exited stopped waiting for an element addition.")
(queue-first (mpmc-queue-internal-queue mpmcq))
)
)
)
(defun mpmc-queue-put (mpmcq elem)
"Given MPMCQ, append ELEM to it."
(mpmc-queue--with-mutex mpmcq
(queue-enqueue
(mpmc-queue-internal-queue mpmcq)
elem
)
(condition-notify (mpmc-queue-non-empty-condition mpmcq))
)
)
(defun mpmc-queue-empty-p (mpmcq)
"Check if MPMCQ is empty and return t if it's empty."
(mpmc-queue--with-mutex mpmcq
(let ((internal-queue (mpmc-queue-internal-queue mpmcq)))
(queue-empty internal-queue)
)
)
)
(provide 'mpmc-queue)
;;; mpmc-queue.el ends here