forked from google/gopacket
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lcmdefrag.go
144 lines (120 loc) · 3.64 KB
/
lcmdefrag.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright 2018 Google, Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style license
// that can be found in the LICENSE file in the root of the source
// tree.
// Package lcmdefrag contains a defragmenter for LCM messages.
package lcmdefrag
import (
"fmt"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
)
const (
// Packages are cleaned up/removed after no input was received for this
// amount of seconds.
timeout time.Duration = 3 * time.Second
)
type lcmPacket struct {
lastPacket time.Time
done bool
recFrags uint16
totalFrags uint16
frags map[uint16]*layers.LCM
}
// LCMDefragmenter supports defragmentation of LCM messages.
//
// References
// https://lcm-proj.github.io/
// https://github.com/lcm-proj/lcm
type LCMDefragmenter struct {
packets map[uint32]*lcmPacket
}
func newLCMPacket(totalFrags uint16) *lcmPacket {
return &lcmPacket{
done: false,
recFrags: 0,
totalFrags: totalFrags,
frags: make(map[uint16]*layers.LCM),
}
}
// NewLCMDefragmenter returns a new LCMDefragmenter.
func NewLCMDefragmenter() *LCMDefragmenter {
return &LCMDefragmenter{
packets: make(map[uint32]*lcmPacket),
}
}
func (lp *lcmPacket) append(in *layers.LCM) {
lp.frags[in.FragmentNumber] = in
lp.recFrags++
lp.lastPacket = time.Now()
}
func (lp *lcmPacket) assemble() (out *layers.LCM, err error) {
var blob []byte
//Extract packets
for i := uint16(0); i < lp.totalFrags; i++ {
fragment, ok := lp.frags[i]
if !ok {
err = fmt.Errorf("Tried to defragment incomplete packet. Waiting "+
"for more potential (unordered) packets... %d", i)
return
}
// For the very first packet, we also want the header.
if i == 0 {
blob = append(blob, fragment.LayerContents()...)
}
// Append the data for each packet.
blob = append(blob, fragment.Payload()...)
}
packet := gopacket.NewPacket(blob, layers.LayerTypeLCM, gopacket.NoCopy)
lcmHdrLayer := packet.Layer(layers.LayerTypeLCM)
out, ok := lcmHdrLayer.(*layers.LCM)
if !ok {
err = fmt.Errorf("Error while decoding the defragmented packet. " +
"Erasing/dropping packet.")
}
lp.done = true
return
}
func (ld *LCMDefragmenter) cleanUp() {
for key, packet := range ld.packets {
if packet.done || time.Now().Sub(packet.lastPacket) > timeout {
delete(ld.packets, key)
}
}
}
// Defrag takes a reference to an LCM packet and processes it.
// In case the packet does not need to be defragmented, it immediately returns
// the as in passed reference. In case in was the last missing fragment, out
// will be the defragmented packet. If in was a fragment, but we are awaiting
// more, out will be set to nil.
// In the case that in was nil, we will just run the internal cleanup of the
// defragmenter that times out packages.
// If an error was encountered during defragmentation, out will also be nil,
// while err will contain further information on the failure.
func (ld *LCMDefragmenter) Defrag(in *layers.LCM) (out *layers.LCM, err error) {
// Timeout old packages and erase error prone ones.
ld.cleanUp()
// For running cleanup only
if in == nil {
return
}
// Quick check if this is acutally a single packet. In that case, just
// return it quickly.
if !in.Fragmented {
out = in
return
}
// Do we need to start a new fragments obj?
if _, ok := ld.packets[in.SequenceNumber]; !ok {
ld.packets[in.SequenceNumber] = newLCMPacket(in.TotalFragments)
}
// Append the packet
ld.packets[in.SequenceNumber].append(in)
// Check if this is the last package of that series
if ld.packets[in.SequenceNumber].recFrags == in.TotalFragments {
out, err = ld.packets[in.SequenceNumber].assemble()
}
return
}