From 7beab58bbb0927be7945557411ee4a0210ff7b3c Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Fri, 14 Oct 2016 02:00:55 +0200 Subject: [PATCH 1/7] first prototype --- pubsub.go | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++ shell.go | 24 ++++- shell_test.go | 40 +++++++++ 3 files changed, 299 insertions(+), 1 deletion(-) create mode 100644 pubsub.go diff --git a/pubsub.go b/pubsub.go new file mode 100644 index 000000000..469acf9a7 --- /dev/null +++ b/pubsub.go @@ -0,0 +1,236 @@ +package shell + +import ( + "encoding/base64" + "encoding/json" + "sync" +) + +type b64String string + +func (bs *b64String) UnmarshalJSON(in []byte) error { + var b64 string + + err := json.Unmarshal(in, &b64) + if err != nil { + return err + } + + bsStr, err := base64.StdEncoding.DecodeString(b64) + + *bs = b64String(bsStr) + return err +} + +func (bs *b64String) Marshal() (string, error) { + jsonBytes, err := json.Marshal( + base64.StdEncoding.EncodeToString( + []byte(*bs))) + + return string(jsonBytes), err +} + +/// + +type PubSubRecord struct { + From string `json:"from"` + Data b64String `json:"data"` + SeqNo b64String `json:"seqno"` + TopicIDs []string `json:"topicIDs"` +} + +/// + +type Subscription struct { + topic string + ch chan *PubSubRecord +} + +func newSubscription(topic string, ch chan *PubSubRecord) *Subscription { + return &Subscription{ + topic: topic, + ch: ch, + } +} + +func (s *Subscription) Next() *PubSubRecord { + return <-s.ch +} + +func (s *Subscription) Topic() string { + return s.topic +} + +/// + +type subscriptionHandler struct { + topic string + resp *Response + + readers map[chan *PubSubRecord]struct{} + + stop chan struct{} + add, drop chan chan *PubSubRecord + + failReason error +} + +func newSubscriptionHandler(topic string, resp *Response) *subscriptionHandler { + sh := &subscriptionHandler{ + // the topic that is being handled + topic: topic, + // stop shuts down the subscription handler. + stop: make(chan struct{}), + // readers is the set of listeners + readers: make(map[chan *PubSubRecord]struct{}), + //add is the channel in which you add more listeners + add: make(chan chan *PubSubRecord), + //drop is the channel to which you send channels + drop: make(chan chan *PubSubRecord), + resp: resp, + } + + go sh.work() + + return sh +} + +func (sh *subscriptionHandler) work() { + readOne := func(ch chan *PubSubRecord, errCh chan error) { + d := json.NewDecoder(sh.resp.Output) + if sh.resp.Error != nil { + errCh <- sh.resp.Error + return + } + + r := PubSubRecord{} + err := d.Decode(&r) + if err != nil { + errCh <- err + return + } + + ch <- &r + } + + ch := make(chan *PubSubRecord) + errCh := make(chan error) + + go readOne(ch, errCh) + +L: + for { + select { + // remove a rdCh from pool + case ch := <-sh.drop: + delete(sh.readers, ch) + + if len(sh.readers) == 0 { + break L + } + + // add a rdCh to pool + case ch := <-sh.add: + sh.readers[ch] = struct{}{} + + case r := <-ch: + for rdCh := range sh.readers { + rdCh <- r + } + + go readOne(ch, errCh) + + case err := <-errCh: + sh.failReason = err + break L + + case <-sh.stop: + break L + } + } + + for rdCh := range sh.readers { + delete(sh.readers, rdCh) + close(rdCh) + } + + sh.resp.Output.Close() + sh = nil +} + +func (sh *subscriptionHandler) Stop() { + sh.stop <- struct{}{} +} + +func (sh *subscriptionHandler) Sub() *Subscription { + ch := make(chan *PubSubRecord) + + sh.add <- ch + + return newSubscription(sh.topic, ch) +} + +func (sh *subscriptionHandler) Drop(s *Subscription) { + sh.drop <- s.ch +} + +func (sh *subscriptionHandler) Error() error { + return sh.failReason +} + +/// + +type subscriptionManager struct { + sync.Mutex + + s *Shell + subs map[string]*subscriptionHandler +} + +func newSubscriptionManager(s *Shell) *subscriptionManager { + return &subscriptionManager{ + s: s, + subs: make(map[string]*subscriptionHandler), + } +} + +func (sm *subscriptionManager) Sub(topic string) (*Subscription, error) { + // lock + sm.Lock() + defer sm.Unlock() + + // check if already subscribed + sh := sm.subs[topic] + if sh == nil { // if not, do so! + // connect + req := sm.s.newRequest("pubsub/sub", topic) + resp, err := req.Send(sm.s.httpcli) + if err != nil { + return nil, err + } + + // pass connection to handler and add handler to manager + sh = newSubscriptionHandler(topic, resp) + sm.subs[topic] = sh + } + + // success + return sh.Sub(), nil +} + +func (sm *subscriptionManager) Drop(s *Subscription) { + sm.Lock() + defer sm.Unlock() + + sh := sm.subs[s.topic] + if sh != nil { + sh.Drop(s) + } +} + +func (sm *subscriptionManager) dropHandler(sh *subscriptionHandler) { + sm.Lock() + defer sm.Unlock() + + delete(sm.subs, sh.topic) +} diff --git a/shell.go b/shell.go index 291dd393f..3840c8e10 100644 --- a/shell.go +++ b/shell.go @@ -22,6 +22,8 @@ import ( type Shell struct { url string httpcli *gohttp.Client + + sm *subscriptionManager } func NewShell(url string) *Shell { @@ -31,7 +33,10 @@ func NewShell(url string) *Shell { }, } - return NewShellWithClient(url, c) + s := NewShellWithClient(url, c) + s.sm = newSubscriptionManager(s) + + return s } func NewShellWithClient(url string, c *gohttp.Client) *Shell { @@ -689,6 +694,23 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) { return out.Hash, nil } +func (s *Shell) PubSubSubscribe(topic string) (*Subscription, error) { + return s.sm.Sub(topic) +} + +func (s *Shell) PubSubCancelSubscription(sub *Subscription) { + s.sm.Drop(sub) +} + +func (s *Shell) PubSubPublish(topic, data string) error { + resp, err := s.newRequest("pubsub/pub", topic, data).Send(s.httpcli) + if err != nil { + return err + } + + return resp.Error +} + func (s *Shell) DiagNet(format string) ([]byte, error) { var result = new(bytes.Buffer) diff --git a/shell_test.go b/shell_test.go index 1e246dd5a..35bdd817e 100644 --- a/shell_test.go +++ b/shell_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "testing" + "time" "github.com/cheekybits/is" ) @@ -116,3 +117,42 @@ func TestResolvePath(t *testing.T) { is.Nil(err) is.Equal(childHash, "QmZTR5bcpQD7cFgTorqxZDYaew1Wqgfbd2ud9QqGPAkK2V") } + +func TestPubSub(t *testing.T) { + is := is.New(t) + s := NewShell(shellUrl) + + var ( + sub *Subscription + err error + wait1 <-chan time.Time + wait = make(chan struct{}) + ) + + go func() { + wait1 = time.After(time.Second) + wait <- struct{}{} + t.Log("subscribing...") + sub, err = s.PubSubSubscribe("test") + is.Nil(err) + t.Log("sub: done") + + wait <- struct{}{} + }() + + <-wait + <-wait1 + + t.Log("publishing...") + is.Nil(s.PubSubPublish("test", "Hello World!")) + t.Log("pub: done") + + <-wait + + t.Log("next()...") + r := sub.Next() + t.Log("next: done. ") + + is.NotNil(r) + is.Equal(r.Data, "Hello World!") +} From 5a7ec200ece8d172282bf2003f42d7465b71a32c Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Fri, 14 Oct 2016 12:57:12 +0200 Subject: [PATCH 2/7] pubsub: remove subscription management; naive implementation --- pubsub.go | 31 ++++++++++++++++++++----------- shell.go | 16 ++++++++++------ shell_test.go | 29 +++++++++++++++++++++++++---- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/pubsub.go b/pubsub.go index 469acf9a7..ab6261967 100644 --- a/pubsub.go +++ b/pubsub.go @@ -3,7 +3,7 @@ package shell import ( "encoding/base64" "encoding/json" - "sync" + // "sync" ) type b64String string @@ -42,27 +42,35 @@ type PubSubRecord struct { /// type Subscription struct { - topic string - ch chan *PubSubRecord + resp *Response } -func newSubscription(topic string, ch chan *PubSubRecord) *Subscription { +func newSubscription(resp *Response) *Subscription { return &Subscription{ - topic: topic, - ch: ch, + resp: resp, } } -func (s *Subscription) Next() *PubSubRecord { - return <-s.ch +func (s *Subscription) Next() (*PubSubRecord, error) { + if s.resp.Error != nil { + return nil, s.resp.Error + } + + d := json.NewDecoder(s.resp.Output) + + r := &PubSubRecord{} + err := d.Decode(r) + + return r, err } -func (s *Subscription) Topic() string { - return s.topic +func (s *Subscription) Cancel() error { + return s.resp.Output.Close() } /// +/* type subscriptionHandler struct { topic string resp *Response @@ -154,7 +162,7 @@ L: close(rdCh) } - sh.resp.Output.Close() + //sh.resp.Output.Close() sh = nil } @@ -234,3 +242,4 @@ func (sm *subscriptionManager) dropHandler(sh *subscriptionHandler) { delete(sm.subs, sh.topic) } +*/ diff --git a/shell.go b/shell.go index 3840c8e10..1a5a9d195 100644 --- a/shell.go +++ b/shell.go @@ -23,7 +23,7 @@ type Shell struct { url string httpcli *gohttp.Client - sm *subscriptionManager + //sm *subscriptionManager } func NewShell(url string) *Shell { @@ -34,7 +34,7 @@ func NewShell(url string) *Shell { } s := NewShellWithClient(url, c) - s.sm = newSubscriptionManager(s) + //s.sm = newSubscriptionManager(s) return s } @@ -695,11 +695,15 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) { } func (s *Shell) PubSubSubscribe(topic string) (*Subscription, error) { - return s.sm.Sub(topic) -} + // connect + req := s.newRequest("pubsub/sub", topic) + + resp, err := req.Send(s.httpcli) + if err != nil { + return nil, err + } -func (s *Shell) PubSubCancelSubscription(sub *Subscription) { - s.sm.Drop(sub) + return newSubscription(resp), nil } func (s *Shell) PubSubPublish(topic, data string) error { diff --git a/shell_test.go b/shell_test.go index 35bdd817e..b6bdc8e1c 100644 --- a/shell_test.go +++ b/shell_test.go @@ -123,6 +123,8 @@ func TestPubSub(t *testing.T) { s := NewShell(shellUrl) var ( + topic = "test" + sub *Subscription err error wait1 <-chan time.Time @@ -130,11 +132,12 @@ func TestPubSub(t *testing.T) { ) go func() { - wait1 = time.After(time.Second) + wait1 = time.After(20 * time.Millisecond) // workaround for go-ipfs#3304 wait <- struct{}{} t.Log("subscribing...") - sub, err = s.PubSubSubscribe("test") + sub, err = s.PubSubSubscribe(topic) is.Nil(err) + is.NotNil(sub) t.Log("sub: done") wait <- struct{}{} @@ -144,15 +147,33 @@ func TestPubSub(t *testing.T) { <-wait1 t.Log("publishing...") - is.Nil(s.PubSubPublish("test", "Hello World!")) + is.Nil(s.PubSubPublish(topic, "Hello World!")) t.Log("pub: done") <-wait t.Log("next()...") - r := sub.Next() + r, err := sub.Next() t.Log("next: done. ") is.NotNil(r) + is.Nil(err) is.Equal(r.Data, "Hello World!") + + sub2, err := s.PubSubSubscribe(topic) + is.Nil(err) + is.NotNil(sub2) + + is.Nil(s.PubSubPublish(topic, "Hallo Welt!")) + + r, err = sub2.Next() // duplicate subscription error + is.NotNil(err) + is.Nil(r) + + r, err = sub.Next() + is.NotNil(r) + is.Nil(err) + is.Equal(r.Data, "Hallo Welt!") + + is.Nil(sub.Cancel()) } From 1ce3c0c5064af9994e218a660b8386ed9d3d52d5 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Sat, 15 Oct 2016 05:18:37 +0200 Subject: [PATCH 3/7] still prototyping --- pubsub.go | 34 +++++++++++++++++++++------------- shell.go | 12 +++++------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/pubsub.go b/pubsub.go index ab6261967..de40a1075 100644 --- a/pubsub.go +++ b/pubsub.go @@ -39,19 +39,23 @@ type PubSubRecord struct { TopicIDs []string `json:"topicIDs"` } +func (r PubSubRecord) DataString() string { + return string(r.Data) +} + /// -type Subscription struct { +type PubSubSubscription struct { resp *Response } -func newSubscription(resp *Response) *Subscription { - return &Subscription{ +func newPubSubSubscription(resp *Response) *PubSubSubscription { + return &PubSubSubscription{ resp: resp, } } -func (s *Subscription) Next() (*PubSubRecord, error) { +func (s *PubSubSubscription) Next() (*PubSubRecord, error) { if s.resp.Error != nil { return nil, s.resp.Error } @@ -64,7 +68,11 @@ func (s *Subscription) Next() (*PubSubRecord, error) { return r, err } -func (s *Subscription) Cancel() error { +func (s *PubSubSubscription) Cancel() error { + if s.resp.Output == nil { + return nil + } + return s.resp.Output.Close() } @@ -83,7 +91,7 @@ type subscriptionHandler struct { failReason error } -func newSubscriptionHandler(topic string, resp *Response) *subscriptionHandler { +func newPubSubSubscriptionHandler(topic string, resp *Response) *subscriptionHandler { sh := &subscriptionHandler{ // the topic that is being handled topic: topic, @@ -170,15 +178,15 @@ func (sh *subscriptionHandler) Stop() { sh.stop <- struct{}{} } -func (sh *subscriptionHandler) Sub() *Subscription { +func (sh *subscriptionHandler) Sub() *PubSubSubscription { ch := make(chan *PubSubRecord) sh.add <- ch - return newSubscription(sh.topic, ch) + return newPubSubSubscription(sh.topic, ch) } -func (sh *subscriptionHandler) Drop(s *Subscription) { +func (sh *subscriptionHandler) Drop(s *PubSubSubscription) { sh.drop <- s.ch } @@ -195,14 +203,14 @@ type subscriptionManager struct { subs map[string]*subscriptionHandler } -func newSubscriptionManager(s *Shell) *subscriptionManager { +func newPubSubSubscriptionManager(s *Shell) *subscriptionManager { return &subscriptionManager{ s: s, subs: make(map[string]*subscriptionHandler), } } -func (sm *subscriptionManager) Sub(topic string) (*Subscription, error) { +func (sm *subscriptionManager) Sub(topic string) (*PubSubSubscription, error) { // lock sm.Lock() defer sm.Unlock() @@ -218,7 +226,7 @@ func (sm *subscriptionManager) Sub(topic string) (*Subscription, error) { } // pass connection to handler and add handler to manager - sh = newSubscriptionHandler(topic, resp) + sh = newPubSubSubscriptionHandler(topic, resp) sm.subs[topic] = sh } @@ -226,7 +234,7 @@ func (sm *subscriptionManager) Sub(topic string) (*Subscription, error) { return sh.Sub(), nil } -func (sm *subscriptionManager) Drop(s *Subscription) { +func (sm *subscriptionManager) Drop(s *PubSubSubscription) { sm.Lock() defer sm.Unlock() diff --git a/shell.go b/shell.go index 1a5a9d195..f47fb2e2d 100644 --- a/shell.go +++ b/shell.go @@ -22,8 +22,6 @@ import ( type Shell struct { url string httpcli *gohttp.Client - - //sm *subscriptionManager } func NewShell(url string) *Shell { @@ -34,7 +32,7 @@ func NewShell(url string) *Shell { } s := NewShellWithClient(url, c) - //s.sm = newSubscriptionManager(s) + //s.sm = newPubSubSubscriptionManager(s) return s } @@ -694,7 +692,7 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) { return out.Hash, nil } -func (s *Shell) PubSubSubscribe(topic string) (*Subscription, error) { +func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) { // connect req := s.newRequest("pubsub/sub", topic) @@ -703,16 +701,16 @@ func (s *Shell) PubSubSubscribe(topic string) (*Subscription, error) { return nil, err } - return newSubscription(resp), nil + return newPubSubSubscription(resp), nil } func (s *Shell) PubSubPublish(topic, data string) error { - resp, err := s.newRequest("pubsub/pub", topic, data).Send(s.httpcli) + _, err := s.newRequest("pubsub/pub", topic, data).Send(s.httpcli) if err != nil { return err } - return resp.Error + return nil } func (s *Shell) DiagNet(format string) ([]byte, error) { From 35dd31f67deb870305f798cb25f21680b714181e Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Mon, 28 Nov 2016 14:42:28 +0100 Subject: [PATCH 4/7] reflect recent server-side pubsub changes, e.g. flushing --- pubsub.go | 190 ++++---------------------------------------------- shell.go | 5 +- shell_test.go | 36 ++++------ 3 files changed, 26 insertions(+), 205 deletions(-) diff --git a/pubsub.go b/pubsub.go index de40a1075..4fa001e31 100644 --- a/pubsub.go +++ b/pubsub.go @@ -3,11 +3,13 @@ package shell import ( "encoding/base64" "encoding/json" - // "sync" ) +// floodsub uses base64 encoding for just about everything. +// To Decode the base64 while also decoding the JSON the type b64String was added. type b64String string +// UnmarshalJSON implements the json.Unmarshaler interface. func (bs *b64String) UnmarshalJSON(in []byte) error { var b64 string @@ -32,6 +34,7 @@ func (bs *b64String) Marshal() (string, error) { /// +// PubSubRecord is a record received via PubSub. type PubSubRecord struct { From string `json:"from"` Data b64String `json:"data"` @@ -39,22 +42,28 @@ type PubSubRecord struct { TopicIDs []string `json:"topicIDs"` } +// DataString returns the string representation of the data field. func (r PubSubRecord) DataString() string { return string(r.Data) } /// +// PubSubSubscription allow you to receive pubsub records that where published on the network. type PubSubSubscription struct { resp *Response } func newPubSubSubscription(resp *Response) *PubSubSubscription { - return &PubSubSubscription{ + sub := &PubSubSubscription{ resp: resp, } + + sub.Next() // skip empty element used for flushing + return sub } +// Next waits for the next record and returns that. func (s *PubSubSubscription) Next() (*PubSubRecord, error) { if s.resp.Error != nil { return nil, s.resp.Error @@ -68,6 +77,7 @@ func (s *PubSubSubscription) Next() (*PubSubRecord, error) { return r, err } +// Cancel cancels the given subscription. func (s *PubSubSubscription) Cancel() error { if s.resp.Output == nil { return nil @@ -75,179 +85,3 @@ func (s *PubSubSubscription) Cancel() error { return s.resp.Output.Close() } - -/// - -/* -type subscriptionHandler struct { - topic string - resp *Response - - readers map[chan *PubSubRecord]struct{} - - stop chan struct{} - add, drop chan chan *PubSubRecord - - failReason error -} - -func newPubSubSubscriptionHandler(topic string, resp *Response) *subscriptionHandler { - sh := &subscriptionHandler{ - // the topic that is being handled - topic: topic, - // stop shuts down the subscription handler. - stop: make(chan struct{}), - // readers is the set of listeners - readers: make(map[chan *PubSubRecord]struct{}), - //add is the channel in which you add more listeners - add: make(chan chan *PubSubRecord), - //drop is the channel to which you send channels - drop: make(chan chan *PubSubRecord), - resp: resp, - } - - go sh.work() - - return sh -} - -func (sh *subscriptionHandler) work() { - readOne := func(ch chan *PubSubRecord, errCh chan error) { - d := json.NewDecoder(sh.resp.Output) - if sh.resp.Error != nil { - errCh <- sh.resp.Error - return - } - - r := PubSubRecord{} - err := d.Decode(&r) - if err != nil { - errCh <- err - return - } - - ch <- &r - } - - ch := make(chan *PubSubRecord) - errCh := make(chan error) - - go readOne(ch, errCh) - -L: - for { - select { - // remove a rdCh from pool - case ch := <-sh.drop: - delete(sh.readers, ch) - - if len(sh.readers) == 0 { - break L - } - - // add a rdCh to pool - case ch := <-sh.add: - sh.readers[ch] = struct{}{} - - case r := <-ch: - for rdCh := range sh.readers { - rdCh <- r - } - - go readOne(ch, errCh) - - case err := <-errCh: - sh.failReason = err - break L - - case <-sh.stop: - break L - } - } - - for rdCh := range sh.readers { - delete(sh.readers, rdCh) - close(rdCh) - } - - //sh.resp.Output.Close() - sh = nil -} - -func (sh *subscriptionHandler) Stop() { - sh.stop <- struct{}{} -} - -func (sh *subscriptionHandler) Sub() *PubSubSubscription { - ch := make(chan *PubSubRecord) - - sh.add <- ch - - return newPubSubSubscription(sh.topic, ch) -} - -func (sh *subscriptionHandler) Drop(s *PubSubSubscription) { - sh.drop <- s.ch -} - -func (sh *subscriptionHandler) Error() error { - return sh.failReason -} - -/// - -type subscriptionManager struct { - sync.Mutex - - s *Shell - subs map[string]*subscriptionHandler -} - -func newPubSubSubscriptionManager(s *Shell) *subscriptionManager { - return &subscriptionManager{ - s: s, - subs: make(map[string]*subscriptionHandler), - } -} - -func (sm *subscriptionManager) Sub(topic string) (*PubSubSubscription, error) { - // lock - sm.Lock() - defer sm.Unlock() - - // check if already subscribed - sh := sm.subs[topic] - if sh == nil { // if not, do so! - // connect - req := sm.s.newRequest("pubsub/sub", topic) - resp, err := req.Send(sm.s.httpcli) - if err != nil { - return nil, err - } - - // pass connection to handler and add handler to manager - sh = newPubSubSubscriptionHandler(topic, resp) - sm.subs[topic] = sh - } - - // success - return sh.Sub(), nil -} - -func (sm *subscriptionManager) Drop(s *PubSubSubscription) { - sm.Lock() - defer sm.Unlock() - - sh := sm.subs[s.topic] - if sh != nil { - sh.Drop(s) - } -} - -func (sm *subscriptionManager) dropHandler(sh *subscriptionHandler) { - sm.Lock() - defer sm.Unlock() - - delete(sm.subs, sh.topic) -} -*/ diff --git a/shell.go b/shell.go index f47fb2e2d..b91eac2ae 100644 --- a/shell.go +++ b/shell.go @@ -31,10 +31,7 @@ func NewShell(url string) *Shell { }, } - s := NewShellWithClient(url, c) - //s.sm = newPubSubSubscriptionManager(s) - - return s + return NewShellWithClient(url, c) } func NewShellWithClient(url string, c *gohttp.Client) *Shell { diff --git a/shell_test.go b/shell_test.go index b6bdc8e1c..2db24a98c 100644 --- a/shell_test.go +++ b/shell_test.go @@ -125,40 +125,29 @@ func TestPubSub(t *testing.T) { var ( topic = "test" - sub *Subscription - err error - wait1 <-chan time.Time - wait = make(chan struct{}) + sub *PubSubSubscription + err error ) - go func() { - wait1 = time.After(20 * time.Millisecond) // workaround for go-ipfs#3304 - wait <- struct{}{} - t.Log("subscribing...") - sub, err = s.PubSubSubscribe(topic) - is.Nil(err) - is.NotNil(sub) - t.Log("sub: done") - - wait <- struct{}{} - }() + t.Log("subscribing...") + sub, err = s.PubSubSubscribe(topic) + is.Nil(err) + is.NotNil(sub) + t.Log("sub: done") - <-wait - <-wait1 + time.Sleep(10 * time.Millisecond) t.Log("publishing...") is.Nil(s.PubSubPublish(topic, "Hello World!")) t.Log("pub: done") - <-wait - t.Log("next()...") r, err := sub.Next() t.Log("next: done. ") is.NotNil(r) is.Nil(err) - is.Equal(r.Data, "Hello World!") + is.Equal(r.DataString(), "Hello World!") sub2, err := s.PubSubSubscribe(topic) is.Nil(err) @@ -166,9 +155,10 @@ func TestPubSub(t *testing.T) { is.Nil(s.PubSubPublish(topic, "Hallo Welt!")) - r, err = sub2.Next() // duplicate subscription error - is.NotNil(err) - is.Nil(r) + r, err = sub2.Next() + is.Nil(err) + is.NotNil(r) + is.Equal(r.Data, "Hallo Welt!") r, err = sub.Next() is.NotNil(r) From c47982ff24166125506ed3113ec69716742b8b8d Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Mon, 28 Nov 2016 15:18:08 +0100 Subject: [PATCH 5/7] first check error, then value in tests --- shell_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shell_test.go b/shell_test.go index 2db24a98c..73a1b017c 100644 --- a/shell_test.go +++ b/shell_test.go @@ -145,8 +145,8 @@ func TestPubSub(t *testing.T) { r, err := sub.Next() t.Log("next: done. ") - is.NotNil(r) is.Nil(err) + is.NotNil(r) is.Equal(r.DataString(), "Hello World!") sub2, err := s.PubSubSubscribe(topic) From 8d0e84fc1207fc527f33e2e40bbc15b429a180ec Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Wed, 30 Nov 2016 16:08:50 +0100 Subject: [PATCH 6/7] make PubSubRecord an interface --- pubsub.go | 62 ++++++++++++++++++++++++--------------------------- shell_test.go | 6 ++--- 2 files changed, 32 insertions(+), 36 deletions(-) diff --git a/pubsub.go b/pubsub.go index 4fa001e31..37d40d47c 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1,50 +1,46 @@ package shell import ( - "encoding/base64" + "encoding/binary" "encoding/json" -) -// floodsub uses base64 encoding for just about everything. -// To Decode the base64 while also decoding the JSON the type b64String was added. -type b64String string + "github.com/libp2p/go-floodsub" + "github.com/libp2p/go-libp2p-peer" +) -// UnmarshalJSON implements the json.Unmarshaler interface. -func (bs *b64String) UnmarshalJSON(in []byte) error { - var b64 string +// PubSubRecord is a record received via PubSub. +type PubSubRecord interface { + // From returns the peer ID of the node that published this record + From() peer.ID - err := json.Unmarshal(in, &b64) - if err != nil { - return err - } + // Data returns the data field + Data() []byte - bsStr, err := base64.StdEncoding.DecodeString(b64) + // SeqNo is the sequence number of this record + SeqNo() int64 - *bs = b64String(bsStr) - return err + //TopicIDs is the list of topics this record belongs to + TopicIDs() []string } -func (bs *b64String) Marshal() (string, error) { - jsonBytes, err := json.Marshal( - base64.StdEncoding.EncodeToString( - []byte(*bs))) +type floodsubRecord struct { + msg *floodsub.Message +} - return string(jsonBytes), err +func (r floodsubRecord) From() peer.ID { + return r.msg.GetFrom() } -/// +func (r floodsubRecord) Data() []byte { + return r.msg.GetData() +} -// PubSubRecord is a record received via PubSub. -type PubSubRecord struct { - From string `json:"from"` - Data b64String `json:"data"` - SeqNo b64String `json:"seqno"` - TopicIDs []string `json:"topicIDs"` +func (r floodsubRecord) SeqNo() int64 { + return int64(binary.BigEndian.Uint64(r.msg.GetSeqno())) } -// DataString returns the string representation of the data field. -func (r PubSubRecord) DataString() string { - return string(r.Data) +func (r floodsubRecord) TopicIDs() []string { + return r.msg.GetTopicIDs() } /// @@ -64,17 +60,17 @@ func newPubSubSubscription(resp *Response) *PubSubSubscription { } // Next waits for the next record and returns that. -func (s *PubSubSubscription) Next() (*PubSubRecord, error) { +func (s *PubSubSubscription) Next() (PubSubRecord, error) { if s.resp.Error != nil { return nil, s.resp.Error } d := json.NewDecoder(s.resp.Output) - r := &PubSubRecord{} + r := &floodsub.Message{} err := d.Decode(r) - return r, err + return floodsubRecord{msg: r}, err } // Cancel cancels the given subscription. diff --git a/shell_test.go b/shell_test.go index 73a1b017c..74ec854ec 100644 --- a/shell_test.go +++ b/shell_test.go @@ -147,7 +147,7 @@ func TestPubSub(t *testing.T) { is.Nil(err) is.NotNil(r) - is.Equal(r.DataString(), "Hello World!") + is.Equal(r.Data(), "Hello World!") sub2, err := s.PubSubSubscribe(topic) is.Nil(err) @@ -158,12 +158,12 @@ func TestPubSub(t *testing.T) { r, err = sub2.Next() is.Nil(err) is.NotNil(r) - is.Equal(r.Data, "Hallo Welt!") + is.Equal(r.Data(), "Hallo Welt!") r, err = sub.Next() is.NotNil(r) is.Nil(err) - is.Equal(r.Data, "Hallo Welt!") + is.Equal(r.Data(), "Hallo Welt!") is.Nil(sub.Cancel()) } From 50bf08725adee628ef534557bfab438ba3b6d417 Mon Sep 17 00:00:00 2001 From: Jan Winkelmann Date: Thu, 1 Dec 2016 16:57:46 +0100 Subject: [PATCH 7/7] enable pubsub for travis --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 6528f584e..40119f00f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,7 +16,7 @@ services: before_install: - docker pull ipfs/go-ipfs:master - mkdir /tmp/ipfs && chmod 0777 /tmp/ipfs -- docker run -d -v /tmp/ipfs:/data/ipfs -p 8080:8080 -p 4001:4001 -p 5001:5001 ipfs/go-ipfs:master +- docker run -d -v /tmp/ipfs:/data/ipfs -p 8080:8080 -p 4001:4001 -p 5001:5001 ipfs/go-ipfs:master --enable-pubsub-experiment install: - go get -t -v ./...