Skip to content

Commit

Permalink
Adding FlexFec encoder implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
pougetat committed Oct 24, 2023
1 parent e4eb151 commit 0440a7d
Show file tree
Hide file tree
Showing 5 changed files with 424 additions and 0 deletions.
1 change: 1 addition & 0 deletions AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Sean DuBois <sean@siobud.com>
Steffen Vogel <post@steffenvogel.de>
treyhakanson <treyhakanson@gmail.com>
XLPolar <guangjin_pan@163.com>
ypothoma <thomas.pougetabadie@gmail.com>
ziminghua <565209960@qq.com>

# List of contributors not appearing in Git history
Expand Down
111 changes: 111 additions & 0 deletions pkg/flexfec/flexfec_coverage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package flexfec

import (
"github.com/pion/interceptor/pkg/flexfec/util"
"github.com/pion/rtp"
)

// Maximum number of media packets that can be protected by a single FEC packet.
// We are not supporting the possibility of having an FEC packet protect multiple
// SSRC source packets for now.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
const (
MaxMediaPackets uint32 = 110
MaxFecPackets uint32 = MaxMediaPackets
)

// ProtectionCoverage defines the map of RTP packets that individual Fec packets protect.
type ProtectionCoverage struct {
// Array of masks, each mask capable of covering up to maxMediaPkts = 110.
// A mask is represented as a grouping of bytes where each individual bit
// represents the coverage for the media packet at the corresponding index.
packetMasks [MaxFecPackets]util.BitArray
numFecPackets uint32
numMediaPackets uint32
mediaPackets []rtp.Packet
}

// NewCoverage returns a new ProtectionCoverage object. numFecPackets represents the number of
// Fec packets that we will be generating to cover the list of mediaPackets. This allows us to know
// how big the underlying map should be.
func NewCoverage(mediaPackets []rtp.Packet, numFecPackets uint32) *ProtectionCoverage {
numMediaPackets := uint32(len(mediaPackets))

// Basic sanity checks
if numMediaPackets <= 0 || numMediaPackets > MaxMediaPackets {
return nil
}

// We allocate the biggest array of bitmasks that respects the max constraints.
var packetMasks [MaxFecPackets]util.BitArray
for i := 0; i < int(MaxFecPackets); i++ {
packetMasks[i] = util.NewBitArray(MaxMediaPackets)
}

// Generate FEC bit mask where numFecPackets FEC packets are covering numMediaPackets Media packets.
// In the packetMasks array, each FEC packet is represented by a single BitArray, each bit in a given BitArray
// corresponds to a specific Media packet.
// Ex: Row I, Col J is set to 1 -> FEC packet I will protect media packet J.
for fecPacketIndex := uint32(0); fecPacketIndex < numFecPackets; fecPacketIndex++ {
// We use an interleaved method to determine coverage. Given N FEC packets, Media packet X will be
// covered by FEC packet X % N.
for mediaPacketIndex := uint32(0); mediaPacketIndex < numMediaPackets; mediaPacketIndex++ {
coveringFecPktIndex := mediaPacketIndex % numFecPackets
packetMasks[coveringFecPktIndex].SetBit(mediaPacketIndex, 1)
}
}

return &ProtectionCoverage{
packetMasks: packetMasks,
numFecPackets: numFecPackets,
numMediaPackets: numMediaPackets,
mediaPackets: mediaPackets,
}
}

// ResetCoverage clears the underlying map so that we can reuse it for new batches of RTP packets.
func (p *ProtectionCoverage) ResetCoverage() {
for i := uint32(0); i < MaxFecPackets; i++ {
for j := uint32(0); j < MaxMediaPackets; j++ {
p.packetMasks[i].SetBit(j, 0)
}
}
}

// GetCoveredBy returns an iterator over RTP packets that are protected by the specified Fec packet index.
func (p *ProtectionCoverage) GetCoveredBy(fecPacketIndex uint32) *util.MediaPacketIterator {
coverage := make([]uint32, 0, p.numMediaPackets)
for mediaPacketIndex := uint32(0); mediaPacketIndex < p.numMediaPackets; mediaPacketIndex++ {
if p.packetMasks[fecPacketIndex].GetBit(mediaPacketIndex) == 1 {
coverage = append(coverage, mediaPacketIndex)
}
}
return util.NewMediaPacketIterator(p.mediaPackets, coverage)
}

// MarshalBitmasks returns the underlying bitmask that defines which media packets are protected by the
// specified fecPacketIndex.
func (p *ProtectionCoverage) MarshalBitmasks(fecPacketIndex uint32) []byte {
return p.packetMasks[fecPacketIndex].Marshal()
}

// ExtractMask1 returns the first section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask1(fecPacketIndex uint32) uint16 {
return uint16(p.packetMasks[fecPacketIndex].GetBitValue(0, 14))
}

// ExtractMask2 returns the second section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask2(fecPacketIndex uint32) uint32 {
return uint32(p.packetMasks[fecPacketIndex].GetBitValue(15, 45))
}

// ExtractMask3 returns the third section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask3(fecPacketIndex uint32) uint64 {
return p.packetMasks[fecPacketIndex].GetBitValue(46, 109)
}
184 changes: 184 additions & 0 deletions pkg/flexfec/flexfec_encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package flexfec implements FlexFEC to recover missing RTP packets due to packet loss.
// https://datatracker.ietf.org/doc/html/rfc8627
package flexfec

import (
"encoding/binary"

"github.com/pion/interceptor/pkg/flexfec/util"
"github.com/pion/rtp"
)

const (
// BaseRTPHeaderSize represents the minium RTP packet header size in bytes.
BaseRTPHeaderSize = 12
// BaseFecHeaderSize represents the minium FEC payload's header size including the
// required first mask.
BaseFecHeaderSize = 12
)

// FlexEncoder implements the Fec encoding mechanism for the "Flex" variant of FlexFec.
type FlexEncoder struct {
baseSN uint16
payloadType uint8
ssrc uint32
coverage *ProtectionCoverage
}

// NewFlexEncoder returns a new FlexFecEncer.
func NewFlexEncoder(baseSN uint16, payloadType uint8, ssrc uint32) *FlexEncoder {
return &FlexEncoder{
baseSN: baseSN,
payloadType: payloadType,
ssrc: ssrc,
coverage: nil,
}
}

// EncodeFec returns a list of generated RTP packets with FEC payloads that protect the specified mediaPackets.
// This method does not account for missing RTP packets in the mediaPackets array nor does it account for
// them being passed out of order.
func (flex *FlexEncoder) EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint32) []rtp.Packet {
// Start by defining which FEC packets cover which media packets
if flex.coverage == nil {
flex.coverage = NewCoverage(mediaPackets, numFecPackets)
} else {
flex.coverage.ResetCoverage()
}

if flex.coverage == nil {
return nil
}

// Generate FEC payloads
fecPackets := make([]rtp.Packet, numFecPackets)
for fecPacketIndex := uint32(0); fecPacketIndex < numFecPackets; fecPacketIndex++ {
fecPackets[fecPacketIndex] = flex.encodeFlexFecPacket(fecPacketIndex)
}

return fecPackets
}

func (flex *FlexEncoder) encodeFlexFecPacket(fecPacketIndex uint32) rtp.Packet {
mediaPacketsIt := flex.coverage.GetCoveredBy(fecPacketIndex)
flexFecHeader := flex.encodeFlexFecHeader(
mediaPacketsIt,
flex.coverage.ExtractMask1(fecPacketIndex),
flex.coverage.ExtractMask2(fecPacketIndex),
flex.coverage.ExtractMask3(fecPacketIndex),
)
flexFecRepairPayload := flex.encodeFlexFecRepairPayload(mediaPacketsIt.Reset())

return rtp.Packet{
Header: rtp.Header{
Version: 2,
Padding: false,
Extension: false,
Marker: false,
PayloadType: flex.payloadType,
SequenceNumber: flex.baseSN,
Timestamp: 54243243,
SSRC: flex.ssrc,
CSRC: []uint32{},
},
Payload: append(flexFecHeader, flexFecRepairPayload...),
}
}

func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketIterator, mask1 uint16, optionalMask2 uint32, optionalMask3 uint64) []byte {
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|0|0|P|X| CC |M| PT recovery | length recovery |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| TS recovery |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SN base_i |k| Mask [0-14] |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|k| Mask [15-45] (optional) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Mask [46-109] (optional) |
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ... next SN base and Mask for CSRC_i in CSRC list ... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
: Repair "Payload" follows FEC Header :
: :
*/

// Get header size - This depends on the size of the bitmask.
headerSize := BaseFecHeaderSize
if optionalMask2 > 0 {
headerSize += 4
}
if optionalMask3 > 0 {
headerSize += 8
}

// Allocate a the FlexFec header
flexFecHeader := make([]byte, headerSize)

// XOR the relevant fields for the header
// TO DO - CHECK TO SEE IF THE MARSHALTO() call works with this.
tmpMediaPacketBuf := make([]byte, headerSize)
for mediaPackets.HasNext() {
mediaPacket := mediaPackets.Next()
n, err := mediaPacket.MarshalTo(tmpMediaPacketBuf)

if n == 0 || err != nil {
return nil
}

// XOR the first 2 bytes of the header: V, P, X, CC, M, PT fields
flexFecHeader[0] ^= tmpMediaPacketBuf[0]
flexFecHeader[1] ^= tmpMediaPacketBuf[1]

// XOR the length recovery field
lengthRecoveryVal := uint16(mediaPacket.MarshalSize() - BaseRTPHeaderSize)
flexFecHeader[2] ^= uint8(lengthRecoveryVal >> 8)
flexFecHeader[3] ^= uint8(lengthRecoveryVal)

// XOR the 5th to 8th bytes of the header: the timestamp field
flexFecHeader[4] ^= flexFecHeader[4]
flexFecHeader[5] ^= flexFecHeader[5]
flexFecHeader[6] ^= flexFecHeader[6]
flexFecHeader[7] ^= flexFecHeader[7]
}

// Write the bitmasks to the header
binary.BigEndian.PutUint16(flexFecHeader[10:12], mask1)

if optionalMask2 > 0 {
binary.BigEndian.PutUint32(flexFecHeader[12:16], optionalMask2)
flexFecHeader[10] |= 0b10000000
}
if optionalMask3 > 0 {
binary.BigEndian.PutUint64(flexFecHeader[16:24], optionalMask3)
flexFecHeader[12] |= 0b10000000
}
return flexFecHeader
}

func (flex *FlexEncoder) encodeFlexFecRepairPayload(mediaPackets *util.MediaPacketIterator) []byte {
flexFecPayload := make([]byte, len(mediaPackets.First().Payload))

for mediaPackets.HasNext() {
mediaPacketPayload := mediaPackets.Next().Payload

if len(flexFecPayload) < len(mediaPacketPayload) {
// Expected FEC packet payload is bigger that what we can currently store,
// we need to resize.
flexFecPayloadTmp := make([]byte, len(mediaPacketPayload))
copy(flexFecPayloadTmp, flexFecPayload)
flexFecPayload = flexFecPayloadTmp
}
for byteIndex := 0; byteIndex < len(mediaPacketPayload); byteIndex++ {
flexFecPayload[byteIndex] ^= mediaPacketPayload[byteIndex]
}
}
return flexFecPayload
}
78 changes: 78 additions & 0 deletions pkg/flexfec/util/bitarray.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package util implements utilities to better support Fec decoding / encoding.
package util

// BitArray provides support for bitmask manipulations.
type BitArray struct {
bytes []byte
}

// NewBitArray returns a new BitArray. It takes sizeBits as parameter which represents
// the size of the underlying bitmask.
func NewBitArray(sizeBits uint32) BitArray {
var sizeBytes uint32
if sizeBits%8 == 0 {
sizeBytes = sizeBits / 8
} else {
sizeBytes = sizeBits/8 + 1
}

return BitArray{
bytes: make([]byte, sizeBytes),
}
}

// SetBit sets a bit to the specified bit value on the bitmask.
func (b *BitArray) SetBit(bitIndex uint32, bitValue uint32) {
byteIndex := bitIndex / 8
bitOffset := uint(bitIndex % 8)

// Set the specific bit to 1 using bitwise OR
if bitValue == 1 {
b.bytes[byteIndex] |= 1 << bitOffset
} else {
b.bytes[byteIndex] |= 0 << bitOffset
}
}

// GetBit returns the bit value at a specified index of the bitmask.
func (b *BitArray) GetBit(bitIndex uint32) uint8 {
return b.bytes[bitIndex/8]
}

// Marshal returns the underlying bitmask.
func (b *BitArray) Marshal() []byte {
return b.bytes
}

// GetBitValue returns a subsection of the bitmask.
func (b *BitArray) GetBitValue(i int, j int) uint64 {
if i < 0 || i >= len(b.bytes)*8 || j < 0 || j >= len(b.bytes)*8 || i > j {
return 0
}

startByte := i / 8
startBit := i % 8
endByte := j / 8

// Create a slice containing the bytes to extract
subArray := b.bytes[startByte : endByte+1]

// Initialize the result value
var result uint64

// Loop through the bytes and concatenate the bits
for idx, b := range subArray {
if idx == 0 {
b <<= uint(startBit)
}
result |= uint64(b)
}

// Mask the bits that are not part of the desired range
result &= (1<<uint(j-i+1) - 1)

return result
}
Loading

0 comments on commit 0440a7d

Please sign in to comment.