-
Notifications
You must be signed in to change notification settings - Fork 28
/
pcqueue.go
155 lines (141 loc) · 4.5 KB
/
pcqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// A producer/consumer queue is a concurrent bounded buffer supporting
// multiple concurrent producers and consumers, with timeouts. The queue can be
// closed from either end, by the producer and/or the consumer. When closed,
// the contents are discarded, and subsequent operations return an error.
//
// Note: the main reason to use a producer/consumer queue instead of a channel
// is to allow the consumer to close the channel. This queue can be used for
// many-to-many communication with multiple producers and/or multiple consumers.
// Any of the producers and any of the consumers are allowed to close the
// queue.
package pcqueue
import (
"errors"
"sync"
)
var (
ErrQueueIsClosed = errors.New("queue is closed")
ErrCancelled = errors.New("operation was canceled")
ErrTryAgain = errors.New("operation failed, try again")
)
// T is a producer/consumer queue. It fulfills the same purpose as a Go
// channel, the main advantage is that the Put() operation does not panic, even
// after the queue is closed. The main disadvantage is that the T can't
// be used in a select operation.
type T struct {
// The mutex R/W mode depends only on whether the immediate struct fields
// are being read or modified. It isn't related to whether the channel
// operations are mutating. For example, the Put() method takes a read lock
// because it reads the contents and isClosed fields. It mutates the
// contents channel, but that doesn't matter.
mutex sync.RWMutex
contents chan interface{} // GUARDED_BY(mutex)
isClosed bool // GUARDED_BY(mutex), true iff <closed> is closed.
closed chan struct{}
}
// New(size) returns a producer/consumer queue with maximum
// <size> elements.
func New(maxSize int) *T {
return &T{
contents: make(chan interface{}, maxSize),
closed: make(chan struct{})}
}
// Put(item, cancel) adds an item to the queue, or returns an error if the queue
// is closed or the operation is cancelled. The <cancel> channel may be nil, in
// which case the operation can't be cancelled.
func (q *T) Put(item interface{}, cancel <-chan struct{}) error {
contents := q.putChannel()
select {
case contents <- item:
case <-q.closed:
return ErrQueueIsClosed
case <-cancel:
return ErrCancelled
}
return nil
}
// Get(cancel) returns the next item from the queue, or an error if
// the queue is closed or the operation is cancelled.
func (q *T) Get(cancel <-chan struct{}) (interface{}, error) {
contents := q.getChannel()
select {
case v := <-contents:
return v, nil
case <-q.closed:
return q.drain()
case <-cancel:
return nil, ErrCancelled
}
}
// TryPut attempts to add an item to the queue. If the queue is full,
// ErrTryAgain is returned immediately, without blocking. If the queue is
// closed, ErrQueueIsClosed is returned.
func (q *T) TryPut(item interface{}) error {
contents := q.putChannel()
select {
case contents <- item:
return nil
default:
}
q.mutex.RLock()
defer q.mutex.RUnlock()
if q.isClosed {
return ErrQueueIsClosed
}
return ErrTryAgain
}
// Close() closes the queue, without discarding the contents. All Put*() operations
// currently running may, or may not, add their values to the queue. All Put*()
// operations that happen-after the Close() will fail.
func (q *T) Close() {
q.mutex.Lock()
if !q.isClosed {
q.isClosed = true
close(q.closed)
}
q.mutex.Unlock()
}
// Shutdown() closes the queue and discards all contents. Any concurrent Get()
// and Put() operations might exchange values, but all operations that
// happen-after the Shutdown() will fail.
func (q *T) Shutdown() {
q.mutex.Lock()
if !q.isClosed {
q.isClosed = true
close(q.closed)
}
q.contents = nil
q.mutex.Unlock()
}
// putChannel() returns a channel for inserting new values. Returns nil if
// the queue has been closed.
func (q *T) putChannel() chan interface{} {
q.mutex.RLock()
defer q.mutex.RUnlock()
if q.isClosed {
return nil
}
return q.contents
}
// getChannel() returns the <contents> channel.
func (q *T) getChannel() chan interface{} {
q.mutex.RLock()
defer q.mutex.RUnlock()
return q.contents
}
// drain() returns any queued elements. Once the queue is empty, all subsequent
// values are discarded.
func (q *T) drain() (interface{}, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
select {
case v := <-q.contents:
return v, nil
default:
q.contents = nil
return nil, ErrQueueIsClosed
}
}