@@ -13,7 +13,7 @@ import (
13
13
cmds "github.com/ipfs/go-ipfs/commands"
14
14
core "github.com/ipfs/go-ipfs/core"
15
15
16
- floodsub "gx/ipfs/QmWiLbk7eE1jGePDAuS26E2A9bMK3e3PMH3dcSeRY3MEBR /floodsub"
16
+ floodsub "gx/ipfs/QmRJs5veT3gnuYpLAagC3NbzixbkgwjSdUXTKfh3hMo6XM /floodsub"
17
17
pstore "gx/ipfs/QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo/go-libp2p-peerstore"
18
18
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
19
19
cid "gx/ipfs/QmcEcrBAMrwMyhSjXt4yfyPpzgSuV8HLHavnfmiKCSRqZU/go-cid"
@@ -77,7 +77,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
77
77
}
78
78
79
79
topic := req .Arguments ()[0 ]
80
- msgs , err := n .Floodsub .Subscribe (req . Context (), topic )
80
+ sub , err := n .Floodsub .Subscribe (topic )
81
81
if err != nil {
82
82
res .SetError (err , cmds .ErrNormal )
83
83
return
@@ -86,19 +86,19 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
86
86
out := make (chan interface {})
87
87
res .SetOutput ((<- chan interface {})(out ))
88
88
89
- ctx := req .Context ()
90
89
go func () {
90
+ defer sub .Cancel ()
91
91
defer close (out )
92
92
for {
93
- select {
94
- case msg , ok := <- msgs :
95
- if ! ok {
96
- return
97
- }
98
- out <- msg
99
- case <- ctx .Done ():
100
- n .Floodsub .Unsub (topic )
93
+ msg , err := sub .Next (req .Context ())
94
+ if err == io .EOF || err == context .Canceled {
95
+ break
96
+ } else if err != nil {
97
+ res .SetError (err , cmds .ErrNormal )
98
+ return
101
99
}
100
+
101
+ out <- msg
102
102
}
103
103
}()
104
104
0 commit comments