Skip to content

Commit

Permalink
optimize 2
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed Jan 27, 2023
1 parent fd456be commit 32d2520
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 96 deletions.
104 changes: 9 additions & 95 deletions net/blockwise/blockwise.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,73 +305,6 @@ func fitSZX(r *pool.Message, blockType message.OptionID, maxSZX SZX) SZX {
return maxSZX
}

func (b *BlockWise[C]) handleSendingMessage(w *responsewriter.ResponseWriter[C], sendingMessage *pool.Message, maxSZX SZX, maxMessageSize uint32, token []byte, block uint32) (bool, error) {
blockType := message.Block2
sizeType := message.Size2
switch sendingMessage.Code() {
case codes.POST, codes.PUT:
blockType = message.Block1
sizeType = message.Size1
}

szx, num, more, err := DecodeBlockOption(block)
if err != nil {
return false, fmt.Errorf("cannot decode %v option: %w", blockType, err)
}
off := num * szx.Size()
if szx > maxSZX {
szx = maxSZX
}
sendMessage := b.cc.AcquireMessage(sendingMessage.Context())
sendMessage.SetCode(sendingMessage.Code())
sendMessage.ResetOptionsTo(sendingMessage.Options())
sendMessage.SetToken(token)
sendMessage.SetType(sendingMessage.Type())
payloadSize, err := sendingMessage.BodySize()
if err != nil {
return false, payloadSizeError(err)
}
offSeek, err := sendingMessage.Body().Seek(off, io.SeekStart)
if err != nil {
return false, fmt.Errorf("cannot seek in response: %w", err)
}
if off != offSeek {
return false, fmt.Errorf("cannot seek to requested offset(%v != %v)", off, offSeek)
}
buf := make([]byte, 1024)
newBufLen := bufferSize(szx, maxMessageSize)
if int64(len(buf)) < newBufLen {
buf = make([]byte, newBufLen)
}
buf = buf[:newBufLen]

readed, err := io.ReadFull(sendingMessage.Body(), buf)
if errors.Is(err, io.ErrUnexpectedEOF) {
if offSeek+int64(readed) == payloadSize {
err = nil
}
}
if err != nil {
return false, fmt.Errorf("cannot read response: %w", err)
}

buf = buf[:readed]
sendMessage.SetBody(bytes.NewReader(buf))
more = true
if offSeek+int64(readed) == payloadSize {
more = false
}
sendMessage.SetOptionUint32(sizeType, uint32(payloadSize))
num = (offSeek+int64(readed))/szx.Size() - (int64(readed) / szx.Size())
block, err = EncodeBlockOption(szx, num, more)
if err != nil {
return false, fmt.Errorf("cannot encode block option(%v,%v,%v): %w", szx, num, more, err)
}
sendMessage.SetOptionUint32(blockType, block)
w.SetMessage(sendMessage)
return more, nil
}

func (b *BlockWise[C]) sendEntityIncomplete(w *responsewriter.ResponseWriter[C], token message.Token) {
sendMessage := b.cc.AcquireMessage(w.Message().Context())
sendMessage.SetCode(codes.RequestEntityIncomplete)
Expand Down Expand Up @@ -510,7 +443,7 @@ func (b *BlockWise[C]) createSendingMessage(sendingMessage *pool.Message, maxSZX
off := num * szx.Size()
if blockType == message.Block1 {
// For block1, we need to skip the already sent bytes.
off += newBufLen /*for previous read*/
off += newBufLen
}
offSeek, err := sendingMessage.Body().Seek(off, io.SeekStart)
if err != nil {
Expand Down Expand Up @@ -545,7 +478,7 @@ func (b *BlockWise[C]) createSendingMessage(sendingMessage *pool.Message, maxSZX
more = false
}
sendMessage.SetOptionUint32(sizeType, uint32(payloadSize))
num = (offSeek) / szx.Size() /*- (int64(readed) / szx.Size())*/
num = (offSeek) / szx.Size()
block, err = EncodeBlockOption(szx, num, more)
if err != nil {
b.cc.ReleaseMessage(sendMessage)
Expand All @@ -566,20 +499,6 @@ func (b *BlockWise[C]) continueSendingMessage(w *responsewriter.ResponseWriter[C
if err != nil {
return false, fmt.Errorf("cannot get %v option: %w", blockType, err)
}
/*
if blockType == message.Block1 {
// returned blockNumber just acknowledges position we need to set block to the next block.
szx, blockNumber, more, errB := DecodeBlockOption(block)
if errB != nil {
return false, fmt.Errorf("cannot decode %v(%v) option: %w", blockType, block, errB)
}
blockNumber++
block, errB = EncodeBlockOption(szx, blockNumber, more)
if errB != nil {
return false, fmt.Errorf("cannot encode %v(%v, %v, %v) option: %w", blockType, szx, blockNumber, more, errB)
}
}
*/
var sendMessage *pool.Message
var more bool
b.sendingMessagesCache.LoadWithFunc(r.Token().Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] {
Expand Down Expand Up @@ -619,29 +538,24 @@ func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C],
if payloadSize < maxSZX.Size() {
return nil
}
sendingMessage := b.cc.AcquireMessage(w.Message().Context())
sendingMessage.ResetOptionsTo(w.Message().Options())
sendingMessage.SetBody(w.Message().Body())
sendingMessage.SetCode(w.Message().Code())
sendingMessage.SetToken(w.Message().Token())
sendingMessage.SetType(w.Message().Type())

_, err = b.handleSendingMessage(w, sendingMessage, maxSZX, maxMessageSize, sendingMessage.Token(), block)
sendingMessage, _, err := b.createSendingMessage(w.Message(), maxSZX, maxMessageSize, block)
if err != nil {
return fmt.Errorf("handleSendingMessage: %w", err)
return fmt.Errorf("handleSendingMessage: cannot create sending message: %w", err)
}
originalSendingMessage := w.Swap(sendingMessage)
if isObserveResponse(w.Message()) {
b.cc.ReleaseMessage(originalSendingMessage)
// https://tools.ietf.org/html/rfc7959#section-2.6 - we don't need store it because client will be get values via GET.
return nil
}
expire, ok := sendingMessage.Context().Deadline()
if !ok {
expire = time.Now().Add(b.expiration)
}

el, loaded := b.sendingMessagesCache.LoadOrStore(sendingMessage.Token().Hash(), cache.NewElement(sendingMessage, expire, nil))
el, loaded := b.sendingMessagesCache.LoadOrStore(sendingMessage.Token().Hash(), cache.NewElement(originalSendingMessage, expire, nil))
if loaded {
return fmt.Errorf("cannot add message (%v) to sending message cache: message(%v) with token(%v) already exist", sendingMessage, el.Data(), sendingMessage.Token())
defer b.cc.ReleaseMessage(originalSendingMessage)
return fmt.Errorf("cannot add message (%v) to sending message cache: message(%v) with token(%v) already exist", originalSendingMessage, el.Data(), sendingMessage.Token())
}
return nil
}
Expand Down
1 change: 0 additions & 1 deletion net/blockwise/blockwise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func TestBlockWiseDo(t *testing.T) {
payload: memfile.New(make([]byte, 17)),
},
},

{
name: "SZX16-SZX1024",
args: args{
Expand Down
7 changes: 7 additions & 0 deletions net/responsewriter/responseWriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ func (r *ResponseWriter[C]) Message() *pool.Message {
return r.response
}

// Swap message in response without releasing.
func (r *ResponseWriter[C]) Swap(m *pool.Message) *pool.Message {
tmp := r.response
r.response = m
return tmp
}

// CConn peer connection.
func (r *ResponseWriter[C]) Conn() C {
return r.cc
Expand Down

0 comments on commit 32d2520

Please sign in to comment.