Skip to content

Commit

Permalink
feat(message): Modify message API
Browse files Browse the repository at this point in the history
Convert GraphSyncRequest and GraphSyncResponse to structs, track by value, pass directly into add
methods on GraphSyncMessage
  • Loading branch information
hannahhoward committed Apr 19, 2019
1 parent 69ff295 commit 5caaec2
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 223 deletions.
5 changes: 2 additions & 3 deletions docs/go-graphsync.puml
Expand Up @@ -91,7 +91,7 @@ package "go-graphsync" {

package messagequeue {
class MessageQueue {
AddRequest(GraphSyncRequestID, selector []byte, GraphSyncPriority)
AddRequest(GraphSyncRequest)
Cancel(GraphSyncRequestID)
Startup()
Shutdown()
Expand All @@ -109,8 +109,7 @@ package "go-graphsync" {
Connected(p peer.ID)
Disconnected(p peer.ID)
ConnectedPeers() []peer.ID
SendRequest(peer.ID, GraphSyncRequestID, selector []byte, priority)
CancelRequest(peer.ID, GraphSyncRequestID)
SendRequest(peer.ID, GraphSyncRequest)
}

object "Package Public Functions" as goPeerManagerPF {
Expand Down
143 changes: 72 additions & 71 deletions message/message.go
Expand Up @@ -88,23 +88,6 @@ func IsTerminalResponseCode(status GraphSyncResponseStatusCode) bool {
return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

// GraphSyncRequest is an interface for accessing data on request contained in a
// GraphSyncMessage.
type GraphSyncRequest interface {
Selector() []byte
Priority() GraphSyncPriority
ID() GraphSyncRequestID
IsCancel() bool
}

// GraphSyncResponse is an interface for accessing data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse interface {
RequestID() GraphSyncRequestID
Status() GraphSyncResponseStatusCode
Extra() []byte
}

// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
Expand All @@ -114,16 +97,9 @@ type GraphSyncMessage interface {

Blocks() []blocks.Block

AddRequest(id GraphSyncRequestID,
selector []byte,
priority GraphSyncPriority)

Cancel(id GraphSyncRequestID)
AddRequest(graphSyncRequest GraphSyncRequest)

AddResponse(
requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
extra []byte)
AddResponse(graphSyncResponse GraphSyncResponse)

AddBlock(blocks.Block)

Expand All @@ -140,22 +116,26 @@ type Exportable interface {
ToNet(w io.Writer) error
}

type graphSyncRequest struct {
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
selector []byte
priority GraphSyncPriority
id GraphSyncRequestID
isCancel bool
}

type graphSyncResponse struct {
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
requestID GraphSyncRequestID
status GraphSyncResponseStatusCode
extra []byte
}

type graphSyncMessage struct {
requests map[GraphSyncRequestID]*graphSyncRequest
responses map[GraphSyncRequestID]*graphSyncResponse
requests map[GraphSyncRequestID]GraphSyncRequest
responses map[GraphSyncRequestID]GraphSyncResponse
blocks map[cid.Cid]blocks.Block
}

Expand All @@ -166,20 +146,55 @@ func New() GraphSyncMessage {

func newMsg() *graphSyncMessage {
return &graphSyncMessage{
requests: make(map[GraphSyncRequestID]*graphSyncRequest),
responses: make(map[GraphSyncRequestID]*graphSyncResponse),
requests: make(map[GraphSyncRequestID]GraphSyncRequest),
responses: make(map[GraphSyncRequestID]GraphSyncResponse),
blocks: make(map[cid.Cid]blocks.Block),
}
}

// NewRequest builds a new Graphsync request
func NewRequest(id GraphSyncRequestID,
selector []byte,
priority GraphSyncPriority) GraphSyncRequest {
return newRequest(id, selector, priority, false)
}

// CancelRequest request generates a request to cancel an in progress request
func CancelRequest(id GraphSyncRequestID) GraphSyncRequest {
return newRequest(id, nil, 0, true)
}

func newRequest(id GraphSyncRequestID,
selector []byte,
priority GraphSyncPriority,
isCancel bool) GraphSyncRequest {
return GraphSyncRequest{
id: id,
selector: selector,
priority: priority,
isCancel: isCancel,
}
}

// NewResponse builds a new Graphsync response
func NewResponse(requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
extra []byte) GraphSyncResponse {
return GraphSyncResponse{
requestID: requestID,
status: status,
extra: extra,
}
}

func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
gsm := newMsg()
for _, req := range pbm.Requests {
gsm.addRequest(GraphSyncRequestID(req.Id), req.Selector, GraphSyncPriority(req.Priority), req.Cancel)
gsm.AddRequest(newRequest(GraphSyncRequestID(req.Id), req.Selector, GraphSyncPriority(req.Priority), req.Cancel))
}

for _, res := range pbm.Responses {
gsm.AddResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.Extra)
gsm.AddResponse(NewResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.Extra))
}

for _, b := range pbm.GetData() {
Expand Down Expand Up @@ -232,38 +247,12 @@ func (gsm *graphSyncMessage) Blocks() []blocks.Block {
return bs
}

func (gsm *graphSyncMessage) Cancel(id GraphSyncRequestID) {
delete(gsm.requests, id)
gsm.addRequest(id, nil, 0, true)
}

func (gsm *graphSyncMessage) AddRequest(id GraphSyncRequestID,
selector []byte,
priority GraphSyncPriority,
) {
gsm.addRequest(id, selector, priority, false)
}

func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID,
selector []byte,
priority GraphSyncPriority,
isCancel bool) {
gsm.requests[id] = &graphSyncRequest{
id: id,
selector: selector,
priority: priority,
isCancel: isCancel,
}
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
gsm.requests[graphSyncRequest.id] = graphSyncRequest
}

func (gsm *graphSyncMessage) AddResponse(requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
extra []byte) {
gsm.responses[requestID] = &graphSyncResponse{
requestID: requestID,
status: status,
extra: extra,
}
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
}

func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
Expand Down Expand Up @@ -339,11 +328,23 @@ func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
}
}

func (gsr *graphSyncRequest) ID() GraphSyncRequestID { return gsr.id }
func (gsr *graphSyncRequest) Selector() []byte { return gsr.selector }
func (gsr *graphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }
func (gsr *graphSyncRequest) IsCancel() bool { return gsr.isCancel }
// ID Returns the request ID for this Request
func (gsr GraphSyncRequest) ID() GraphSyncRequestID { return gsr.id }

// Selector returns the byte representation of the selector for this request
func (gsr GraphSyncRequest) Selector() []byte { return gsr.selector }

// Priority returns the priority of this request
func (gsr GraphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }

// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

// RequestID returns the request ID for this response
func (gsr GraphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }

// Status returns the status for a response
func (gsr GraphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }

func (gsr *graphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }
func (gsr *graphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
func (gsr *graphSyncResponse) Extra() []byte { return gsr.extra }
// Extra returns any metadata on a response
func (gsr GraphSyncResponse) Extra() []byte { return gsr.extra }
12 changes: 6 additions & 6 deletions message/message_test.go
Expand Up @@ -18,7 +18,7 @@ func TestAppendingRequests(t *testing.T) {
priority := GraphSyncPriority(rand.Int31())

gsm := New()
gsm.AddRequest(id, selector, priority)
gsm.AddRequest(NewRequest(id, selector, priority))
requests := gsm.Requests()
if len(requests) != 1 {
t.Fatal("Did not add request to message")
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestAppendingResponses(t *testing.T) {
status := RequestAcknowledged

gsm := New()
gsm.AddResponse(requestID, status, extra)
gsm.AddResponse(NewResponse(requestID, status, extra))
responses := gsm.Responses()
if len(responses) != 1 {
t.Fatal("Did not add response to message")
Expand Down Expand Up @@ -135,9 +135,9 @@ func TestRequestCancel(t *testing.T) {
priority := GraphSyncPriority(rand.Int31())

gsm := New()
gsm.AddRequest(id, selector, priority)
gsm.AddRequest(NewRequest(id, selector, priority))

gsm.Cancel(id)
gsm.AddRequest(CancelRequest(id))

requests := gsm.Requests()
if len(requests) != 1 {
Expand All @@ -158,8 +158,8 @@ func TestToNetFromNetEquivalency(t *testing.T) {
status := RequestAcknowledged

gsm := New()
gsm.AddRequest(id, selector, priority)
gsm.AddResponse(id, status, extra)
gsm.AddRequest(NewRequest(id, selector, priority))
gsm.AddResponse(NewResponse(id, status, extra))

gsm.AddBlock(blocks.NewBlock([]byte("W")))
gsm.AddBlock(blocks.NewBlock([]byte("E")))
Expand Down
26 changes: 8 additions & 18 deletions messagequeue/messagequeue.go
Expand Up @@ -52,34 +52,24 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
}

// AddRequest adds an outgoing request to the message queue.
func (mq *MessageQueue) AddRequest(
id gsmsg.GraphSyncRequestID,
selector []byte,
priority gsmsg.GraphSyncPriority) {
func (mq *MessageQueue) AddRequest(graphSyncRequest gsmsg.GraphSyncRequest) {

if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) {
nextMessage.AddRequest(id, selector, priority)
nextMessage.AddRequest(graphSyncRequest)
}, nil) {
mq.signalWork()
}
}

// Cancel adds a cancel message for the give request id to the queue.
func (mq *MessageQueue) Cancel(id gsmsg.GraphSyncRequestID) {

if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) {
nextMessage.Cancel(id)
}, nil) {
mq.signalWork()
}
}

// AddBlocks adds the given blocks to the next message and returns a channel
// that sends a notification when the blocks are read. If ignored by the consumer
// AddResponses adds the given blocks and responses to the next message and
// returns a channel that sends a notification when sending initiates. If ignored by the consumer
// sending will not block.
func (mq *MessageQueue) AddBlocks(blks []blocks.Block) <-chan struct{} {
func (mq *MessageQueue) AddResponses(responses []gsmsg.GraphSyncResponse, blks []blocks.Block) <-chan struct{} {
notificationChannel := make(chan struct{}, 1)
if mq.mutateNextMessage(func(nextMessage gsmsg.GraphSyncMessage) {
for _, response := range responses {
nextMessage.AddResponse(response)
}
for _, block := range blks {
nextMessage.AddBlock(block)
}
Expand Down
29 changes: 20 additions & 9 deletions messagequeue/messagequeue_test.go
Expand Up @@ -69,7 +69,7 @@ func TestStartupAndShutdown(t *testing.T) {
selector := testutil.RandomBytes(100)

waitGroup.Add(1)
messageQueue.AddRequest(id, selector, priority)
messageQueue.AddRequest(gsmsg.NewRequest(id, selector, priority))

select {
case <-ctx.Done():
Expand Down Expand Up @@ -105,20 +105,25 @@ func TestProcessingNotification(t *testing.T) {
waitGroup.Add(1)
blks := testutil.GenerateBlocksOfSize(3, 128)

blocksProcessing := messageQueue.AddBlocks(blks)
newMessage := gsmsg.New()
responseID := gsmsg.GraphSyncRequestID(rand.Int31())
extra := testutil.RandomBytes(100)
status := gsmsg.RequestCompletedFull
newMessage.AddResponse(gsmsg.NewResponse(responseID, status, extra))
processing := messageQueue.AddResponses(newMessage.Responses(), blks)
select {
case <-blocksProcessing:
t.Fatal("Blocks should not be processing but already received notification")
case <-processing:
t.Fatal("Message should not be processing but already received notification")
default:
}

// wait for send attempt
messageQueue.Startup()
waitGroup.Wait()
select {
case <-blocksProcessing:
case <-processing:
case <-ctx.Done():
t.Fatal("blocks should have been processed but were not")
t.Fatal("Message should have been processed but were not")
}

select {
Expand All @@ -131,6 +136,12 @@ func TestProcessingNotification(t *testing.T) {
t.Fatal("sent incorrect block")
}
}
firstResponse := message.Responses()[0]
if responseID != firstResponse.RequestID() ||
status != firstResponse.Status() ||
!reflect.DeepEqual(firstResponse.Extra(), extra) {
t.Fatal("Send incorrect response")
}
}
}

Expand All @@ -154,7 +165,7 @@ func TestDedupingMessages(t *testing.T) {
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)

messageQueue.AddRequest(id, selector, priority)
messageQueue.AddRequest(gsmsg.NewRequest(id, selector, priority))
// wait for send attempt
waitGroup.Wait()
id2 := gsmsg.GraphSyncRequestID(rand.Int31())
Expand All @@ -163,8 +174,8 @@ func TestDedupingMessages(t *testing.T) {
id3 := gsmsg.GraphSyncRequestID(rand.Int31())
priority3 := gsmsg.GraphSyncPriority(rand.Int31())
selector3 := testutil.RandomBytes(100)
messageQueue.AddRequest(id2, selector2, priority2)
messageQueue.AddRequest(id3, selector3, priority3)
messageQueue.AddRequest(gsmsg.NewRequest(id2, selector2, priority2))
messageQueue.AddRequest(gsmsg.NewRequest(id3, selector3, priority3))

select {
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions network/libp2p_impl_test.go
Expand Up @@ -69,8 +69,8 @@ func TestMessageSendAndReceive(t *testing.T) {
status := gsmsg.RequestAcknowledged

sent := gsmsg.New()
sent.AddRequest(id, selector, priority)
sent.AddResponse(id, status, extra)
sent.AddRequest(gsmsg.NewRequest(id, selector, priority))
sent.AddResponse(gsmsg.NewResponse(id, status, extra))

err = gsnet1.ConnectTo(ctx, host2.ID())
if err != nil {
Expand Down

0 comments on commit 5caaec2

Please sign in to comment.