Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockwise: Use hash instead of token for cache #554

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dtls/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func Client(conn *dtls.Conn, opts ...udp.Option) *udpClient.Conn {
v,
cfg.BlockwiseTransferTimeout,
cfg.Errors,
func(token message.Token) (*pool.Message, bool) {
return v.GetObservationRequest(token)
func(hash uint64) (*pool.Message, bool) {
return v.GetObservationRequest(hash)
},
)
}
Expand Down
4 changes: 2 additions & 2 deletions dtls/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func (s *Server) createConn(connection *coapNet.Conn, inactivityMonitor udpClien
v,
s.cfg.BlockwiseTransferTimeout,
s.cfg.Errors,
func(token message.Token) (*pool.Message, bool) {
return v.GetObservationRequest(token)
func(hash uint64) (*pool.Message, bool) {
return v.GetObservationRequest(hash)
},
)
}
Expand Down
4 changes: 4 additions & 0 deletions message/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type OptionID uint16
| 35 | x | x | - | | Proxy-Uri | string | 1-1034 | (none) |
| 39 | x | x | - | | Proxy-Scheme | string | 1-255 | (none) |
| 60 | | | x | | Size1 | uint | 0-4 | (none) |
| 292 | | | | x | Request-Tag | opaque | 0-8 | (none) |
+-----+----+---+---+---+----------------+--------+--------+---------+
C=Critical, U=Unsafe, N=NoCacheKey, R=Repeatable
*/
Expand All @@ -73,6 +74,7 @@ const (
ProxyScheme OptionID = 39
Size1 OptionID = 60
NoResponse OptionID = 258
RequestTag OptionID = 292
)

var optionIDToString = map[OptionID]string{
Expand All @@ -96,6 +98,7 @@ var optionIDToString = map[OptionID]string{
ProxyScheme: "ProxyScheme",
Size1: "Size1",
NoResponse: "NoResponse",
RequestTag: "RequestTag",
}

func (o OptionID) String() string {
Expand Down Expand Up @@ -153,6 +156,7 @@ var CoapOptionDefs = map[OptionID]OptionDef{
ProxyScheme: {ValueFormat: ValueString, MinLen: 1, MaxLen: 255},
Size1: {ValueFormat: ValueUint, MinLen: 0, MaxLen: 4},
NoResponse: {ValueFormat: ValueUint, MinLen: 0, MaxLen: 1},
RequestTag: {ValueFormat: ValueOpaque, MinLen: 0, MaxLen: 8},
}

// MediaType specifies the content format of a message.
Expand Down
109 changes: 75 additions & 34 deletions net/blockwise/blockwise.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"errors"
"fmt"
"hash/crc64"
"io"
"net"
"time"

"github.com/dsnet/golib/memfile"
Expand Down Expand Up @@ -131,14 +133,17 @@ type Client interface {
AcquireMessage(ctx context.Context) *pool.Message
// return back the message to the pool for next use
ReleaseMessage(m *pool.Message)

// The remote address for determining the endpoint pair
RemoteAddr() net.Addr
}

type BlockWise[C Client] struct {
cc C
receivingMessagesCache *cache.Cache[uint64, *messageGuard]
sendingMessagesCache *cache.Cache[uint64, *pool.Message]
errors func(error)
getSentRequestFromOutside func(token message.Token) (*pool.Message, bool)
getSentRequestFromOutside func(hash uint64) (*pool.Message, bool)
expiration time.Duration
}

Expand All @@ -160,10 +165,10 @@ func New[C Client](
cc C,
expiration time.Duration,
errors func(error),
getSentRequestFromOutside func(token message.Token) (*pool.Message, bool),
getSentRequestFromOutside func(hash uint64) (*pool.Message, bool),
) *BlockWise[C] {
if getSentRequestFromOutside == nil {
getSentRequestFromOutside = func(message.Token) (*pool.Message, bool) { return nil, false }
getSentRequestFromOutside = func(uint64) (*pool.Message, bool) { return nil, false }
}
return &BlockWise[C]{
cc: cc,
Expand Down Expand Up @@ -214,11 +219,12 @@ func (b *BlockWise[C]) Do(r *pool.Message, maxSzx SZX, maxMessageSize uint32, do
if !ok {
expire = time.Now().Add(b.expiration)
}
_, loaded := b.sendingMessagesCache.LoadOrStore(r.Token().Hash(), cache.NewElement(r, expire, nil))
matchableHash := generateMatchableHash(r.Options(), b.cc.RemoteAddr(), r.Code())
_, loaded := b.sendingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(r, expire, nil))
if loaded {
return nil, errors.New("invalid token")
}
defer b.sendingMessagesCache.Delete(r.Token().Hash())
defer b.sendingMessagesCache.Delete(matchableHash)
if r.Body() == nil {
return do(r)
}
Expand Down Expand Up @@ -282,9 +288,9 @@ func (b *BlockWise[C]) WriteMessage(request *pool.Message, maxSZX SZX, maxMessag
if err != nil {
return fmt.Errorf("cannot encode start sending message block option(%v,%v,%v): %w", maxSZX, 0, true, err)
}

matchableHash := generateMatchableHash(request.Options(), b.cc.RemoteAddr(), request.Code())
w := newWriteRequestResponse(b.cc, request)
err = b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock)
err = b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock, matchableHash)
if err != nil {
return fmt.Errorf("cannot start writing request: %w", err)
}
Expand Down Expand Up @@ -333,8 +339,8 @@ func wantsToBeReceived(r *pool.Message) bool {
return true
}

func (b *BlockWise[C]) getSendingMessageCode(token uint64) (codes.Code, bool) {
v := b.sendingMessagesCache.Load(token)
func (b *BlockWise[C]) getSendingMessageCode(hash uint64) (codes.Code, bool) {
v := b.sendingMessagesCache.Load(hash)
if v == nil {
return codes.Empty, false
}
Expand All @@ -348,19 +354,20 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa
}
token := r.Token()

matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code())

if len(token) == 0 {
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next)
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next, matchableHash)
if err != nil {
b.sendEntityIncomplete(w, token)
b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err))
}
return
}
tokenStr := token.Hash()

sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(tokenStr)
sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(matchableHash)
if !sendingMessageExist || wantsToBeReceived(r) {
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next)
err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next, matchableHash)
if err != nil {
b.sendEntityIncomplete(w, token)
b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err))
Expand All @@ -369,17 +376,17 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa
}
more, err := b.continueSendingMessage(w, r, maxSZX, maxMessageSize, sendingMessageCode)
if err != nil {
b.sendingMessagesCache.Delete(tokenStr)
b.sendingMessagesCache.Delete(matchableHash)
b.errors(fmt.Errorf("continueSendingMessage(%v): %w", r, err))
return
}
// For codes GET,POST,PUT,DELETE, we want them to wait for pairing response and then delete them when the full response comes in or when timeout occurs.
if !more && sendingMessageCode > codes.DELETE {
b.sendingMessagesCache.Delete(tokenStr)
b.sendingMessagesCache.Delete(matchableHash)
}
}

func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) error {
func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), rxHash uint64) error {
startSendingMessageBlock, err := EncodeBlockOption(maxSZX, 0, true)
if err != nil {
return fmt.Errorf("cannot encode start sending message block option(%v,%v,%v): %w", maxSZX, 0, true, err)
Expand Down Expand Up @@ -411,7 +418,7 @@ func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C]
return errP
}
}
return b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock)
return b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock, rxHash)
}

func (b *BlockWise[C]) createSendingMessage(sendingMessage *pool.Message, maxSZX SZX, maxMessageSize uint32, block uint32) (sendMessage *pool.Message, more bool, err error) {
Expand Down Expand Up @@ -504,7 +511,8 @@ func (b *BlockWise[C]) continueSendingMessage(w *responsewriter.ResponseWriter[C
}
var sendMessage *pool.Message
var more bool
b.sendingMessagesCache.LoadWithFunc(r.Token().Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mpenate-ellenbytech Thank you for contribution :)

This is an issue: the hash for the response will not match the request hash because r.Code() is different, and r.Options(), such as the URI, are not part of the response. Therefore, you cannot pair it correctly for block1.

For sending a big payload (Block1):
You can use RemoteAddress and MessageID, which need to be registered for each new request of type Confirmation/NonConfirmation and response. If MessageID is not supported, then only the remote address can be used to verify block1.

For receiving a big payload (Block2):
When you want to locate the sent request, this will not work because the hash will again be different. Therefore, you need to pair it with the request using MessageID. However, you need to set a new MessageID for each exchange (request and response).

BTW: For blockwise transfer via TCP, the token must be used as implemented now.

Therefore, create blockwiseTCP.go (original code) and blockwiseUDP.go, where blockwiseUDP.go will utilize MessageIDs.

b.sendingMessagesCache.LoadWithFunc(matchableHash, func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
sendMessage, more, err = b.createSendingMessage(value.Data(), maxSZX, maxMessageSize, block)
if err != nil {
err = fmt.Errorf("cannot create sending message: %w", err)
Expand All @@ -529,7 +537,7 @@ func isObserveResponse(msg *pool.Message) bool {
return msg.Code() >= codes.Created
}

func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], maxSZX SZX, maxMessageSize uint32, block uint32) error {
func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], maxSZX SZX, maxMessageSize uint32, block uint32, rxHash uint64) error {
payloadSize, err := w.Message().BodySize()
if err != nil {
return payloadSizeError(err)
Expand All @@ -552,16 +560,16 @@ func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C],
if !ok {
expire = time.Now().Add(b.expiration)
}
el, loaded := b.sendingMessagesCache.LoadOrStore(sendingMessage.Token().Hash(), cache.NewElement(originalSendingMessage, expire, nil))
el, loaded := b.sendingMessagesCache.LoadOrStore(rxHash, cache.NewElement(originalSendingMessage, expire, nil))
if loaded {
defer b.cc.ReleaseMessage(originalSendingMessage)
return fmt.Errorf("cannot add message (%v) to sending message cache: message(%v) with token(%v) already exist", originalSendingMessage, el.Data(), sendingMessage.Token())
}
return nil
}

func (b *BlockWise[C]) getSentRequest(token message.Token) *pool.Message {
data, ok := b.sendingMessagesCache.LoadWithFunc(token.Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
func (b *BlockWise[C]) getSentRequest(hash uint64) *pool.Message {
data, ok := b.sendingMessagesCache.LoadWithFunc(hash, func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
if value == nil {
return nil
}
Expand All @@ -576,7 +584,7 @@ func (b *BlockWise[C]) getSentRequest(token message.Token) *pool.Message {
if ok {
return data.Data()
}
globalRequest, ok := b.getSentRequestFromOutside(token)
globalRequest, ok := b.getSentRequestFromOutside(hash)
if ok {
return globalRequest
}
Expand All @@ -595,7 +603,8 @@ func (b *BlockWise[C]) handleObserveResponse(sentRequest *pool.Message) (message
validUntil := time.Now().Add(b.expiration) // context of observation can be expired.
bwSentRequest := b.cloneMessage(sentRequest)
bwSentRequest.SetToken(token)
_, loaded := b.sendingMessagesCache.LoadOrStore(token.Hash(), cache.NewElement(bwSentRequest, validUntil, nil))
matchableHash := generateMatchableHash(sentRequest.Options(), b.cc.RemoteAddr(), sentRequest.Code())
_, loaded := b.sendingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(bwSentRequest, validUntil, nil))
if loaded {
return nil, time.Time{}, errors.New("cannot process message: message with token already exist")
}
Expand Down Expand Up @@ -674,7 +683,7 @@ func copyToPayloadFromOffset(r *pool.Message, payloadFile *memfile.File, offset
return payloadSize, nil
}

func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Message, tokenStr uint64, validUntil time.Time) (*pool.Message, func(), error) {
func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Message, matchableHash uint64, validUntil time.Time) (*pool.Message, func(), error) {
cannotLockError := func(err error) error {
return fmt.Errorf("processReceivedMessage: cannot lock message: %w", err)
}
Expand Down Expand Up @@ -708,11 +717,11 @@ func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Messag
return nil, nil, cannotLockError(errA)
}
appendToClose(mg)
element, loaded := b.receivingMessagesCache.LoadOrStore(tokenStr, cache.NewElement(mg, validUntil, func(d *messageGuard) {
element, loaded := b.receivingMessagesCache.LoadOrStore(matchableHash, cache.NewElement(mg, validUntil, func(d *messageGuard) {
if d == nil {
return
}
b.sendingMessagesCache.Delete(tokenStr)
b.sendingMessagesCache.Delete(matchableHash)
}))
// request was already stored in cache, silently
if loaded {
Expand All @@ -732,6 +741,38 @@ func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Messag
return mg.Message, closeFn, nil
}

/*
RFC9175 1.1:
Two request messages are said to be "matchable" if they occur between
the same endpoint pair, have the same code, and have the same set of
options, with the exception that elective NoCacheKey options and
options involved in block-wise transfer (Block1, Block2, and Request-
Tag) need not be the same. Two blockwise request operations are said
to be matchable if their request messages are matchable.

This function concatenates the IDs and values of relevant options, the string representation of the remote address,
and the code of the message to generate a hash that can be used to match requests.
*/
func generateMatchableHash(options message.Options, remoteAddr net.Addr, code codes.Code) uint64 {
input := make([]byte, 0, 512)

for _, opt := range options {
switch opt.ID {
// Skip Blockwise Options and NoCacheKey Options
case message.Block1, message.Block2, message.Size1, message.Size2, message.RequestTag:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this hash is used for, but you may want to exclude RequestTag here.

Yes, the text says it is not part of the matchable set, but later on it says (roughly) that even if two requests are matchable, if they have different request tags, they should not be matched.

When I wrote RFC9175, we had the choice of wording while not changing the actual mechanism: We went for easier for those implementing the client side (where Request-Tag is excluded for the definition of being matchable, and then if two requests are matchable even if you don't want them to be, you add a Request-Tag and there is an extra rule on it for the server), or easier for those implementing the server side (when Request-Tag would not have been special, but in all the client sections we would have said "if the request are matchable except for possibly differing Request-Tag values"). We went for the former -- I'm not sure it was the right decision, especially if generateMatchableHash is used on the server side. (I can't tell easily without having more context of go-coap, was just pointed to this issue by a colleague).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the NoCacheKey options could more generally be recognzied from their option properties (lower bits of the number) rather than enumerating them (which will not be exhaustive).

continue
}
input = append(input, byte(opt.ID))
input = append(input, opt.Value...)
}

input = append(input, []byte(remoteAddr.Network())...)
input = append(input, []byte(remoteAddr.String())...)
input = append(input, byte(code))

return crc64.Checksum(input, crc64.MakeTable(crc64.ISO))
}

//nolint:gocyclo,gocognit
func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSzx SZX, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), blockType message.OptionID, sizeType message.OptionID) error {
token := r.Token()
Expand All @@ -755,7 +796,8 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C
if err != nil {
return fmt.Errorf("cannot decode block option: %w", err)
}
sentRequest := b.getSentRequest(token)
matchableHash := generateMatchableHash(r.Options(), w.Conn().RemoteAddr(), r.Code())
sentRequest := b.getSentRequest(matchableHash)
if sentRequest != nil {
defer b.cc.ReleaseMessage(sentRequest)
}
Expand All @@ -770,9 +812,8 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C
}
}

tokenStr := token.Hash()
var cachedReceivedMessageGuard *messageGuard
if e := b.receivingMessagesCache.Load(tokenStr); e != nil {
if e := b.receivingMessagesCache.Load(matchableHash); e != nil {
cachedReceivedMessageGuard = e.Data()
}
if cachedReceivedMessageGuard == nil {
Expand All @@ -783,15 +824,15 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C
return nil
}
}
cachedReceivedMessage, closeCachedReceivedMessage, err := b.getCachedReceivedMessage(cachedReceivedMessageGuard, r, tokenStr, validUntil)
cachedReceivedMessage, closeCachedReceivedMessage, err := b.getCachedReceivedMessage(cachedReceivedMessageGuard, r, matchableHash, validUntil)
if err != nil {
return err
}
defer closeCachedReceivedMessage()

defer func(err *error) {
if *err != nil {
b.receivingMessagesCache.Delete(tokenStr)
b.receivingMessagesCache.Delete(matchableHash)
}
}(&err)
payloadFile, payloadSize, err := b.getPayloadFromCachedReceivedMessage(r, cachedReceivedMessage)
Expand All @@ -805,12 +846,12 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C
return fmt.Errorf("cannot copy data to payload: %w", err)
}
if !more {
b.receivingMessagesCache.Delete(tokenStr)
b.receivingMessagesCache.Delete(matchableHash)
cachedReceivedMessage.Remove(blockType)
cachedReceivedMessage.Remove(sizeType)
cachedReceivedMessage.SetType(r.Type())
if !bytes.Equal(cachedReceivedMessage.Token(), token) {
b.sendingMessagesCache.Delete(tokenStr)
b.sendingMessagesCache.Delete(matchableHash)
}
_, errS := cachedReceivedMessage.Body().Seek(0, io.SeekStart)
if errS != nil {
Expand Down
5 changes: 5 additions & 0 deletions net/blockwise/blockwise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io"
"net"
"testing"
"time"

Expand Down Expand Up @@ -54,6 +55,10 @@ type testClient struct {
p *pool.Pool
}

func (c *testClient) RemoteAddr() net.Addr {
return &net.IPAddr{IP: net.IPv4(127, 0, 0, 1)}
}

func newTestClient() *testClient {
return &testClient{
p: pool.New(100, 1024),
Expand Down
4 changes: 2 additions & 2 deletions net/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func (c *Client[C]) Observe(ctx context.Context, path string, observeFunc func(r
return c.DoObserve(req, observeFunc)
}

func (c *Client[C]) GetObservationRequest(token message.Token) (*pool.Message, bool) {
return c.observationHandler.GetObservationRequest(token)
func (c *Client[C]) GetObservationRequest(hash uint64) (*pool.Message, bool) {
return c.observationHandler.GetObservationRequest(hash)
}

// NewPostRequest creates post request.
Expand Down
Loading