forked from hyperledger-labs/go-perun
-
Notifications
You must be signed in to change notification settings - Fork 2
/
localbus.go
109 lines (93 loc) · 2.97 KB
/
localbus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright 2020 - See NOTICE file for copyright holders.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wire
import (
"context"
"sync"
"github.com/pkg/errors"
"perun.network/go-perun/log"
"perun.network/go-perun/wallet"
)
// localBusReceiver is the per-address entry stored in LocalBus.recvs.
// ensureRecv creates it as a placeholder; SubscribeClient later fills it in.
type localBusReceiver struct {
// recv is the subscribed consumer. It is nil until SubscribeClient sets it.
recv Consumer
// exists is closed by SubscribeClient once recv has been set. Publish waits
// on it before handing an envelope to recv.
exists chan struct{}
}
// Compile-time assertion that *LocalBus satisfies the Bus interface.
var _ Bus = (*LocalBus)(nil)
// LocalBus is a bus that only sends messages within the same process.
type LocalBus struct {
// mutex guards recvs.
mutex sync.RWMutex
// recvs holds one receiver entry per address key. An entry may be a
// placeholder that has no subscribed consumer yet.
recvs map[wallet.AddrKey]*localBusReceiver
}
// NewLocalBus returns an empty local bus. A local bus delivers messages only
// to receivers that live in the same process.
func NewLocalBus() *LocalBus {
	bus := new(LocalBus)
	bus.recvs = make(map[wallet.AddrKey]*localBusReceiver)
	return bus
}
// Publish implements wire.Bus.Publish. It blocks until a consumer is
// subscribed for the envelope's recipient and has been handed the envelope, or
// until the context is done, in which case the wrapped context error is
// returned.
func (h *LocalBus) Publish(ctx context.Context, e *Envelope) error {
	target := h.ensureRecv(e.Recipient)

	// Wait for either a subscription to show up or the caller to give up.
	select {
	case <-ctx.Done():
		return errors.Wrap(ctx.Err(), "publishing message")
	case <-target.exists:
	}

	// The exists channel is closed only after recv has been set.
	target.recv.Put(e)
	return nil
}
// SubscribeClient implements wire.Bus.SubscribeClient. There can only be one
// subscription per receiver address: if a consumer is already subscribed for
// the address, an error is returned and the existing subscription is left
// untouched.
// When the Consumer closes, its subscription is removed.
func (h *LocalBus) SubscribeClient(c Consumer, receiver Address) error {
	key := wallet.Key(receiver)

	// Look up (or create) the receiver entry and attach the consumer within a
	// single critical section, so that concurrent subscribers for the same
	// address cannot both write recv or both close the exists channel, and so
	// that the entry cannot be removed from the map in between.
	h.mutex.Lock()
	recv, ok := h.recvs[key]
	if !ok {
		recv = &localBusReceiver{exists: make(chan struct{})}
		h.recvs[key] = recv
	}
	select {
	case <-recv.exists:
		// Already closed, hence already subscribed. Closing it again would
		// panic.
		h.mutex.Unlock()
		return errors.New("receiver address already has a subscription")
	default:
	}
	recv.recv = c
	close(recv.exists)
	h.mutex.Unlock()

	// The callback acquires h.mutex itself, so it is registered only after the
	// lock has been released, in case OnCloseAlways runs it right away.
	c.OnCloseAlways(func() {
		h.mutex.Lock()
		defer h.mutex.Unlock()
		delete(h.recvs, key)
		log.WithField("id", receiver).Debug("Client unsubscribed.")
	})

	log.WithField("id", receiver).Debug("Client subscribed.")
	return nil
}
// ensureRecv returns the receiver entry for address a from the bus' receiver
// map, inserting one first if none is present. A freshly inserted entry is
// only a placeholder (nil consumer, open exists channel) until a subscription
// appears.
func (h *LocalBus) ensureRecv(a Address) *localBusReceiver {
	key := wallet.Key(a)

	// Try under the read lock first, hoping the entry is already there.
	h.mutex.RLock()
	entry, found := h.recvs[key]
	h.mutex.RUnlock()
	if found {
		return entry
	}

	// Inserting requires an exclusive lock.
	h.mutex.Lock()
	defer h.mutex.Unlock()

	// Check again: another goroutine may have inserted the entry between the
	// RUnlock() above and this Lock().
	if entry, found = h.recvs[key]; found {
		return entry
	}

	// Still missing, so insert a placeholder and hand it out.
	placeholder := &localBusReceiver{exists: make(chan struct{})}
	h.recvs[key] = placeholder
	return placeholder
}