Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/dennwc/iters v1.1.0
github.com/frostbyte73/core v0.1.1
github.com/fsnotify/fsnotify v1.9.0
github.com/gammazero/deque v1.0.0
github.com/gammazero/deque v1.1.0
github.com/go-jose/go-jose/v3 v3.0.4
github.com/go-logr/logr v1.4.3
github.com/hashicorp/go-retryablehttp v0.7.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ github.com/frostbyte73/core v0.1.1 h1:ChhJOR7bAKOCPbA+lqDLE2cGKlCG5JXsDvvQr4YaJI
github.com/frostbyte73/core v0.1.1/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/gammazero/deque v1.1.0 h1:OyiyReBbnEG2PP0Bnv1AASLIYvyKqIFN5xfl1t8oGLo=
github.com/gammazero/deque v1.1.0/go.mod h1:JVrR+Bj1NMQbPnYclvDlvSX0nVGReLrQZ0aUMuWLctg=
github.com/go-jose/go-jose/v3 v3.0.4 h1:Wp5HA7bLQcKnf6YYao/4kpRpVMp/yf6+pJKV8WFSaNY=
github.com/go-jose/go-jose/v3 v3.0.4/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
Expand Down
142 changes: 142 additions & 0 deletions signalling/signalfragment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package signalling

import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/livekit/protocol/livekit"
)

func TestSignalFragment(t *testing.T) {
inputMessage := &livekit.Envelope{
ServerMessages: []*livekit.Signalv2ServerMessage{
{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
ConnectResponse: &livekit.ConnectResponse{
SifTrailer: []byte("abcdefghijklmnopqrstuvwxyz0123456789"),
},
},
},
{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
ConnectResponse: &livekit.ConnectResponse{
SifTrailer: []byte("0123456789abcdefghijklmnopqrstuvwxyz0123456789"),
},
},
},
{
Message: &livekit.Signalv2ServerMessage_ConnectResponse{
ConnectResponse: &livekit.ConnectResponse{
SifTrailer: []byte("ABCDEFGHIJKLMNOPQRSTabcdefghijklmnopqrstuvwxyz0123456789"),
},
},
},
},
}

t.Run("no segmentation needed", func(t *testing.T) {
sr := NewSignalSegmenter(SignalSegmenterParams{
MaxFragmentSize: 5_000_000,
})

marshalled, err := proto.Marshal(inputMessage)
require.NoError(t, err)
require.Nil(t, sr.Segment(marshalled))
})

t.Run("segmentation + reassembly", func(t *testing.T) {
maxFragmentSize := 5
sr := NewSignalSegmenter(SignalSegmenterParams{
MaxFragmentSize: maxFragmentSize,
})

marshalled, err := proto.Marshal(inputMessage)
require.NoError(t, err)

expectedNumFragments := (len(marshalled) + maxFragmentSize - 1) / maxFragmentSize

fragments := sr.Segment(marshalled)
require.NotZero(t, len(fragments))
require.Equal(t, uint32(len(marshalled)), fragments[0].TotalSize)

rr := NewSignalReassembler(SignalReassemblerParams{})
var reassembled []byte
for idx, fragment := range fragments {
require.Equal(t, uint32(idx+1), fragment.FragmentNumber)
require.NotZero(t, fragment.FragmentSize)
require.Equal(t, uint32(expectedNumFragments), fragment.NumFragments)
require.Equal(t, fragment.FragmentSize, uint32(len(fragment.Data)))

reassembled = rr.Reassemble(fragment)
}
require.Equal(t, marshalled, reassembled)
})

t.Run("runt", func(t *testing.T) {
maxFragmentSize := 5
sr := NewSignalSegmenter(SignalSegmenterParams{
MaxFragmentSize: maxFragmentSize,
})

marshalled, err := proto.Marshal(inputMessage)
require.NoError(t, err)

fragments := sr.Segment(marshalled)

rr := NewSignalReassembler(SignalReassemblerParams{})
var reassembled []byte
for idx, fragment := range fragments {
// do not send one packet into re-assembly initially, re-assembly should not succeed
if idx == 0 {
continue
}

reassembled = rr.Reassemble(fragment)
}
require.Zero(t, len(reassembled))

// submit 1st fragment and ensure reassembly completes
reassembled = rr.Reassemble(fragments[0])
require.Equal(t, marshalled, reassembled)
})

t.Run("corrupted", func(t *testing.T) {
maxFragmentSize := 5
sr := NewSignalSegmenter(SignalSegmenterParams{
MaxFragmentSize: maxFragmentSize,
})

marshalled, err := proto.Marshal(inputMessage)
require.NoError(t, err)

fragments := sr.Segment(marshalled)

rr := NewSignalReassembler(SignalReassemblerParams{})
var reassembled []byte
for idx, fragment := range fragments {
// corrupt a fragment, re-assembly should fail
if idx == 0 {
fragment.FragmentSize += 1
}

reassembled = rr.Reassemble(fragment)
}
require.Zero(t, len(reassembled))
})
}
154 changes: 154 additions & 0 deletions signalling/signalreassembler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package signalling

import (
"sync"
"time"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"go.uber.org/zap/zapcore"
)

const (
reassemblerTimeout = time.Minute
)

type reassembly struct {
packetId uint32
startedAt time.Time
fragments []*livekit.Fragment
isCorrupted bool
tqi *utils.TimeoutQueueItem[*reassembly]
}

func (r *reassembly) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}

e.AddUint32("packetId", r.packetId)
e.AddTime("startAt", r.startedAt)
e.AddDuration("age", time.Since(r.startedAt))

expectedNumberOfFragments := len(r.fragments)
expectedTotalSize := uint32(0)
availableSize := uint32(0)
var availableFragments []uint32
for _, fragment := range r.fragments {
if fragment == nil {
continue
}

expectedTotalSize = fragment.TotalSize
availableSize += fragment.FragmentSize
availableFragments = append(availableFragments, fragment.FragmentNumber)
}
e.AddInt("expectedNumberOfFragments", expectedNumberOfFragments)
e.AddUint32("expectedTotalSize", expectedTotalSize)
e.AddUint32("availableSize", availableSize)
e.AddArray("availableFragments", logger.Uint32Slice(availableFragments))

e.AddBool("isCorrupted", r.isCorrupted)
return nil
}

// ------------------------------------------------

type SignalReassemblerParams struct {
Logger logger.Logger
}

type SignalReassembler struct {
params SignalReassemblerParams

lock sync.Mutex
reassemblies map[uint32]*reassembly

timeoutQueue utils.TimeoutQueue[*reassembly]
}

func NewSignalReassembler(params SignalReassemblerParams) *SignalReassembler {
return &SignalReassembler{
params: params,
reassemblies: make(map[uint32]*reassembly),
}
}

func (s *SignalReassembler) Reassemble(fragment *livekit.Fragment) []byte {
s.lock.Lock()
defer s.lock.Unlock()

re, ok := s.reassemblies[fragment.PacketId]
if !ok {
re = &reassembly{
packetId: fragment.PacketId,
startedAt: time.Now(),
fragments: make([]*livekit.Fragment, fragment.NumFragments),
}
re.tqi = &utils.TimeoutQueueItem[*reassembly]{Value: re}

s.reassemblies[fragment.PacketId] = re
}
if int(fragment.FragmentNumber) <= len(re.fragments) {
if int(fragment.FragmentSize) != len(fragment.Data) {
re.isCorrupted = true // runt packet, data size of blob does not match fragment size
} else {
re.fragments[fragment.FragmentNumber-1] = fragment
}
} else {
re.isCorrupted = true
}

if re.isCorrupted {
return nil
}

// try to reassemble
expectedTotalSize := uint32(0)
totalSize := 0
for _, fr := range re.fragments {
if fr == nil {
return nil // not received all fragments of packet yet
}

expectedTotalSize = fr.TotalSize // can read this from any fragment of packet
totalSize += len(fr.Data)
}
if expectedTotalSize != 0 && uint32(totalSize) != expectedTotalSize {
re.isCorrupted = true
return nil
}

data := make([]byte, 0, expectedTotalSize)
for _, fr := range re.fragments {
data = append(data, fr.Data...)
}
delete(s.reassemblies, re.packetId) // fully re-assembled, can be deleted from cache
return data
}

func (s *SignalReassembler) Prune() {
for it := s.timeoutQueue.IterateRemoveAfter(reassemblerTimeout); it.Next(); {
re := it.Item().Value
s.params.Logger.Infow("pruning stale reassembly packet", "reassembly", re)

s.lock.Lock()
delete(s.reassemblies, re.packetId)
s.lock.Unlock()
}
}
81 changes: 81 additions & 0 deletions signalling/signalsegmenter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package signalling

import (
"math/rand"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"go.uber.org/atomic"
)

const (
defaultMaxFragmentSize = 8192
)

type SignalSegmenterParams struct {
Logger logger.Logger
MaxFragmentSize int
FirstPacketId uint32 // should be used for testing only
}

type SignalSegmenter struct {
params SignalSegmenterParams

packetId atomic.Uint32
}

func NewSignalSegmenter(params SignalSegmenterParams) *SignalSegmenter {
s := &SignalSegmenter{
params: params,
}
if s.params.MaxFragmentSize == 0 {
s.params.MaxFragmentSize = defaultMaxFragmentSize
}
s.packetId.Store(params.FirstPacketId)
if s.packetId.Load() == 0 {
s.packetId.Store(uint32(rand.Intn(1<<8) + 1))
}
return s
}

func (s *SignalSegmenter) Segment(data []byte) []*livekit.Fragment {
if len(data) <= s.params.MaxFragmentSize {
return nil
}

var fragments []*livekit.Fragment
numFragments := uint32((len(data) + s.params.MaxFragmentSize - 1) / s.params.MaxFragmentSize)
fragmentNumber := uint32(1)
consumed := 0
packetId := s.packetId.Inc()
for len(data[consumed:]) != 0 {
fragmentSize := min(len(data[consumed:]), s.params.MaxFragmentSize)
fragment := &livekit.Fragment{
PacketId: packetId,
FragmentNumber: fragmentNumber,
NumFragments: numFragments,
FragmentSize: uint32(fragmentSize),
TotalSize: uint32(len(data)),
Data: data[consumed : consumed+fragmentSize],
}
fragments = append(fragments, fragment)
fragmentNumber++
consumed += fragmentSize
}

return fragments
}
Loading