Skip to content

Commit

Permalink
muxrpc Handled()
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptix committed Feb 4, 2021
1 parent eebe726 commit 15d616d
Show file tree
Hide file tree
Showing 25 changed files with 137 additions and 160 deletions.
10 changes: 7 additions & 3 deletions blobstore/wants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package blobstore

import (
"bytes"
"context"
"crypto/sha256"
"fmt"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/stretchr/testify/require"
refs "go.mindeco.de/ssb-refs"

"go.cryptoscope.co/luigi"
"go.cryptoscope.co/muxrpc/v2"
mmock "go.cryptoscope.co/muxrpc/v2/mock"
"go.cryptoscope.co/ssb"
Expand Down Expand Up @@ -123,8 +123,12 @@ func TestWantManager(t *testing.T) {
wmsg = append(wmsg, ssb.BlobWant{Ref: ref, Dist: dist})
}

var outSlice []interface{}
out := luigi.NewSliceSink(&outSlice)
// var outSlice []interface{}
// out := luigi.NewSliceSink(&outSlice)

var outBuf bytes.Buffer
out := muxrpc.NewTestSink(&outBuf)

ctx := context.Background()
edp := &mmock.FakeEndpoint{
SourceStub: func(ctx context.Context, enc muxrpc.RequestEncoding, method muxrpc.Method, args ...interface{}) (*muxrpc.ByteSource, error) {
Expand Down
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ func (c Client) Tangles(o message.TanglesArgs) (*muxrpc.ByteSource, error) {

type noopHandler struct{ logger log.Logger }

func (noopHandler) Handled(m muxrpc.Method) bool { return false }

func (h noopHandler) HandleConnect(ctx context.Context, edp muxrpc.Endpoint) {}

func (h noopHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
go.cryptoscope.co/librarian v0.2.1-0.20200604160012-d85e03a70e79
go.cryptoscope.co/luigi v0.3.6-0.20200131144242-3256b54e72c8
go.cryptoscope.co/margaret v0.1.7-0.20201027171711-332f00d22dd0
go.cryptoscope.co/muxrpc/v2 v2.0.0-20210201110246-6b708c77b871
go.cryptoscope.co/muxrpc/v2 v2.0.0-20210202162901-fe642d405dc6
go.cryptoscope.co/netwrap v0.1.1
go.cryptoscope.co/secretstream v1.2.2
go.mindeco.de/ssb-gabbygrove v0.1.7-0.20200618115102-169cb68d2398
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ go.cryptoscope.co/muxrpc/v2 v2.0.0-20210201102350-3e60aec3131e h1:OezooYoVUEp1uK
go.cryptoscope.co/muxrpc/v2 v2.0.0-20210201102350-3e60aec3131e/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
go.cryptoscope.co/muxrpc/v2 v2.0.0-20210201110246-6b708c77b871 h1:PBftSpU+RqGhHV1we/PWB//yt7TsYEIaqRBJBnGKOtg=
go.cryptoscope.co/muxrpc/v2 v2.0.0-20210201110246-6b708c77b871/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
go.cryptoscope.co/muxrpc/v2 v2.0.0-20210202162901-fe642d405dc6 h1:p135TwijE3DbmklGygc7++MMRRVlujmjqed8kEOmwLs=
go.cryptoscope.co/muxrpc/v2 v2.0.0-20210202162901-fe642d405dc6/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
go.cryptoscope.co/netwrap v0.1.0/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
go.cryptoscope.co/netwrap v0.1.1 h1:JLzzGKEvrUrkKzu3iM0DhpHmt+L/gYqmpcf1lJMUyFs=
go.cryptoscope.co/netwrap v0.1.1/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
Expand Down
3 changes: 1 addition & 2 deletions network/isserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ func (th testHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {
require.Equal(th.t, th.wantServer, muxrpc.IsServer(e), "server assertion failed")
}

func (th testHandler) HandleCall(ctx context.Context, req *muxrpc.Request, edp muxrpc.Endpoint) {
}
func (th testHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {}

func makeServerHandler(t *testing.T, wantServer bool) func(net.Conn) (muxrpc.Handler, error) {
return func(_ net.Conn) (muxrpc.Handler, error) {
Expand Down
7 changes: 2 additions & 5 deletions plugins/ebt/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (h *MUXRPCHandler) check(err error) {
}
}

func (MUXRPCHandler) Handled(m muxrpc.Method) bool { return m.String() == "ebt.replicate" }

// HandleConnect does nothing. Feature negotiation is done by sbot
func (h *MUXRPCHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

Expand All @@ -63,11 +65,6 @@ func (h *MUXRPCHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
}
}

if req.Method.String() != "ebt.replicate" {
checkAndClose(fmt.Errorf("unknown command: %s", req.Method))
return
}

if req.Type != "duplex" {
checkAndClose(fmt.Errorf("invalid type: %s", req.Type))
return
Expand Down
2 changes: 2 additions & 0 deletions plugins/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type handler struct {
unboxer *private.Manager
}

func (handler) Handled(m muxrpc.Method) bool { return m.String() == "get" }

func (h handler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

func (h handler) HandleCall(ctx context.Context, req *muxrpc.Request) {
Expand Down
46 changes: 26 additions & 20 deletions plugins/gossip/feed_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
package gossip

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"testing"
"time"

"go.cryptoscope.co/muxrpc/v2/codec"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
Expand All @@ -17,6 +21,7 @@ import (
"go.cryptoscope.co/librarian"
"go.cryptoscope.co/margaret"
"go.cryptoscope.co/margaret/multilog"
"go.cryptoscope.co/muxrpc/v2"
"go.cryptoscope.co/ssb"
"go.cryptoscope.co/ssb/internal/asynctesting"
"go.cryptoscope.co/ssb/internal/ctxutils"
Expand Down Expand Up @@ -160,34 +165,35 @@ func TestCreateHistoryStream(t *testing.T) {
r.EqualValues(userFeedLen-1, seqv)

test.Args.ID = keyPair.Id
var sink countSink
sink.info = infoAlice
var buf = new(bytes.Buffer)
var sink = muxrpc.NewTestSink(buf)

fm := NewFeedManager(context.TODO(), rootLog, userFeeds, infoAlice, nil, nil)

err = fm.CreateStreamHistory(ctx, &sink, &test.Args)
err = fm.CreateStreamHistory(ctx, sink, &test.Args)
r.NoError(err)
t.Log("serving")
create(t, test.LiveMessages, "post/live")
time.Sleep(200 * time.Millisecond)

require.Equal(t, sink.cnt, test.TotalReceived)
cnt := len(readAllPackets(buf))
// -1 for the EndErr packet (which isnt a message)
require.Equal(t, cnt-1, test.TotalReceived)
})
}
}

type countSink struct {
cnt int
info log.Logger
}

func (cs *countSink) Pour(ctx context.Context, val interface{}) error {
cs.info.Log("countSink", "got", "cnt", cs.cnt) //, "val", fmt.Sprintf("%s (%T)", val, val))
cs.cnt++
return nil
}

func (cs *countSink) Close() error {
cs.info.Log("countSink", "closed")
return nil
func readAllPackets(r io.Reader) []*codec.Packet {
var pkts []*codec.Packet
cr := codec.NewReader(r)
for {
pkt, err := cr.ReadPacket()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
panic(err)
}
pkts = append(pkts, pkt)
}
return pkts
}
6 changes: 3 additions & 3 deletions plugins/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type LegacyGossip struct {
rootCtx context.Context
}

func (LegacyGossip) Handled(m muxrpc.Method) bool { return m.String() == "createHistoryStream" }

// HandleConnect on this handler triggers legacy createHistoryStream replication.
func (g *LegacyGossip) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {
g.StartLegacyFetching(ctx, e, g.enableLiveStreaming)
Expand Down Expand Up @@ -236,9 +238,7 @@ func (g *LegacyGossip) HandleCall(
}
// don't close stream (feedManager will pass it on to live processing or close it itself)

case "tunnel.ping":
fallthrough

// TODO: move gossip.ping to it's own handler
case "gossip.ping":
snk.SetEncoding(muxrpc.TypeJSON)
ts := []byte(strconv.FormatInt(time.Now().UnixNano()/1000000, 10))
Expand Down
6 changes: 2 additions & 4 deletions plugins/legacyinvites/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ type acceptHandler struct {
service *Service
}

func (acceptHandler) Handled(m muxrpc.Method) bool { return m.String() == "invite.use" }

func (h acceptHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

func (h acceptHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
h.service.mu.Lock()
defer h.service.mu.Unlock()
if req.Method.String() != "invite.use" {
req.CloseWithError(fmt.Errorf("unknown method"))
return
}

// parse passed arguments
var args []struct {
Expand Down
7 changes: 2 additions & 5 deletions plugins/legacyinvites/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,11 @@ type createArguments struct {
Note string `json:"note,omitempty"`
}

func (createHandler) Handled(m muxrpc.Method) bool { return m.String() == "invite.create" }

func (h createHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

func (h createHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
if req.Method.String() != "invite.create" {
req.CloseWithError(fmt.Errorf("unknown method"))
return
}

// parse passed arguments
var args createArguments
if err := json.Unmarshal(req.RawArgs, &args); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions plugins/peerinvites/idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,13 @@ type handler struct {

func (h handler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

func (h handler) HandleCall(ctx context.Context, req *muxrpc.Request, edp muxrpc.Endpoint) {
func (h handler) HandleCall(ctx context.Context, req *muxrpc.Request) {
if len(req.Args()) < 1 {
req.CloseWithError(fmt.Errorf("invalid arguments"))
return
}

guestRef, err := ssb.GetFeedRefFromAddr(edp.Remote())
guestRef, err := ssb.GetFeedRefFromAddr(req.RemoteAddr())
if err != nil {
req.CloseWithError(fmt.Errorf("no guest ref: %w", err))
return
Expand Down
12 changes: 12 additions & 0 deletions plugins/private/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ type handler struct {
mngr *private.Manager
}

func (handler) Handled(m muxrpc.Method) bool {
if len(m) != 2 {
return false
}

if m[0] != "private" {
return false
}

return m[1] == "publish" || m[1] == "read"
}

func (h handler) HandleCall(ctx context.Context, req *muxrpc.Request) {
var closed bool
checkAndClose := func(err error) {
Expand Down
5 changes: 3 additions & 2 deletions plugins/rawread/rxlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type rxLogHandler struct {
root margaret.Log
}

func (g rxLogHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {
}
func (rxLogHandler) Handled(m muxrpc.Method) bool { return m.String() == "createLogStream" }

func (g rxLogHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

func (g rxLogHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
// fmt.Fprintln(os.Stderr, "createLogStream args:", string(req.RawArgs))
Expand Down
5 changes: 3 additions & 2 deletions plugins/rawread/seq.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ type seqStreamHandler struct {
root margaret.Log
}

func (g seqStreamHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {
}
func (seqStreamHandler) Handled(m muxrpc.Method) bool { return m.String() == "createSequenceStream" }

func (g seqStreamHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

func (g seqStreamHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
fmt.Fprintln(os.Stderr, "seqStream args:", string(req.RawArgs))
Expand Down
7 changes: 2 additions & 5 deletions plugins/replicate/upto.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,11 @@ type replicateHandler struct {
users multilog.MultiLog
}

func (replicateHandler) Handled(m muxrpc.Method) bool { return m.String() == "replicate.upto" }

func (g replicateHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

func (g replicateHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
if len(req.Method) < 2 && req.Method[1] != "upto" {
req.CloseWithError(fmt.Errorf("invalid method"))
return
}

src, err := ssb.FeedsWithSequnce(g.users)
if err != nil {
req.CloseWithError(fmt.Errorf("replicate: did not get feed source: %w", err))
Expand Down
2 changes: 2 additions & 0 deletions plugins/status/plug.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func (lt Plugin) Name() string { return "status" }
func (Plugin) Method() muxrpc.Method { return muxrpc.Method{"status"} }
func (lt Plugin) Handler() muxrpc.Handler { return lt }

func (Plugin) Handled(m muxrpc.Method) bool { return m.String() == "status" }

func (g Plugin) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {}

func (g Plugin) HandleCall(ctx context.Context, req *muxrpc.Request) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func MakeEmptyPeer(t testing.TB) (repo.Interface, string) {
return dstRepo, dstPath
}

func PrepareConnectAndServe(t testing.TB, alice, bob repo.Interface) (muxrpc.Packer, muxrpc.Packer, *muxtest.Transcript, func(rpc1, rpc2 muxrpc.Endpoint) func()) {
func PrepareConnectAndServe(t testing.TB, alice, bob repo.Interface) (*muxrpc.Packer, *muxrpc.Packer, *muxtest.Transcript, func(rpc1, rpc2 muxrpc.Endpoint) func()) {
r := require.New(t)
keyAlice, err := repo.DefaultKeyPair(alice)
r.NoError(err, "error opening alice's key pair")
Expand Down
10 changes: 2 additions & 8 deletions plugins/whoami/whoami.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,11 @@ type handler struct {
id *refs.FeedRef
}

func (handler) Handled(m muxrpc.Method) bool { return m.String() == "whoami" }

func (handler) HandleConnect(ctx context.Context, edp muxrpc.Endpoint) {}

func (h handler) HandleCall(ctx context.Context, req *muxrpc.Request) {
// TODO: push manifest check into muxrpc
if req.Type == "" {
req.Type = "async"
}
if req.Method.String() != "whoami" {
req.CloseWithError(fmt.Errorf("wrong method"))
return
}
type ret struct {
ID string `json:"id"`
}
Expand Down

0 comments on commit 15d616d

Please sign in to comment.