-
Notifications
You must be signed in to change notification settings - Fork 110
/
merging.go
154 lines (142 loc) · 4.41 KB
/
merging.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package pointcloud
import (
"context"
"errors"
"fmt"
"image/color"
"sync"
"sync/atomic"
"time"
"github.com/edaniels/golog"
"github.com/golang/geo/r3"
"github.com/lucasb-eyer/go-colorful"
"go.opencensus.io/trace"
"go.viam.com/utils"
"go.viam.com/rdk/spatialmath"
)
// CloudAndOffsetFunc is a function that returns a PointCloud with a pose that represents an offset to be applied to every point.
type CloudAndOffsetFunc func(context context.Context) (PointCloud, spatialmath.Pose, error)
// ApplyOffset takes a point cloud and an offset pose and applies the offset to each of the points in the source point cloud.
func ApplyOffset(ctx context.Context, srcpc PointCloud, pose spatialmath.Pose, logger golog.Logger) (PointCloud, error) {
// create the function that return the pointcloud and the transform to the destination frame
cloudFunc := func(context context.Context) (PointCloud, spatialmath.Pose, error) {
return srcpc, pose, nil
}
srcFunc := []CloudAndOffsetFunc{cloudFunc}
return MergePointClouds(ctx, srcFunc, logger) // MergePointClouds can also be used on one point cloud.
}
// MergePointClouds takes a slice of points clouds with optional offsets and adds all their points to one point cloud.
func MergePointClouds(ctx context.Context, cloudFuncs []CloudAndOffsetFunc, logger golog.Logger) (PointCloud, error) {
if len(cloudFuncs) == 0 {
return nil, errors.New("no point clouds to merge")
}
finalPoints := make(chan []PointAndData, 50)
activeReaders := int32(len(cloudFuncs))
for i, pcSrc := range cloudFuncs {
iCopy := i
pcSrcCopy := pcSrc
utils.PanicCapturingGo(func() {
_, span := trace.StartSpan(ctx, "pointcloud::MergePointClouds::Cloud"+fmt.Sprint(iCopy))
defer span.End()
defer func() {
atomic.AddInt32(&activeReaders, -1)
}()
pc, offset, err := pcSrcCopy(ctx)
if err != nil {
panic(err) // TODO(erh) is there something better to do?
}
var wg sync.WaitGroup
const numLoops = 8
for loop := 0; loop < numLoops; loop++ {
wg.Add(1)
f := func(loop int) {
defer wg.Done()
const batchSize = 500
batch := make([]PointAndData, 0, batchSize)
savedDualQuat := spatialmath.NewZeroPose()
pc.Iterate(numLoops, loop, func(p r3.Vector, d Data) bool {
if offset != nil {
spatialmath.ResetPoseDQTranslation(savedDualQuat, p)
newPose := spatialmath.Compose(offset, savedDualQuat)
p = newPose.Point()
}
batch = append(batch, PointAndData{P: p, D: d})
if len(batch) > batchSize {
finalPoints <- batch
batch = make([]PointAndData, 0, batchSize)
}
return true
})
finalPoints <- batch
}
loopCopy := loop
utils.PanicCapturingGo(func() { f(loopCopy) })
}
wg.Wait()
})
}
var pcTo PointCloud
var err error
dataLastTime := false // if there was data in the channel in the previous loop, continue reading.
for dataLastTime || atomic.LoadInt32(&activeReaders) > 0 {
select {
case ps := <-finalPoints:
for _, p := range ps {
if pcTo == nil {
if p.D == nil {
pcTo = NewAppendOnlyOnlyPointsPointCloud(len(cloudFuncs) * 640 * 800)
} else {
pcTo = NewWithPrealloc(len(cloudFuncs) * 640 * 800)
}
}
myErr := pcTo.Set(p.P, p.D)
if myErr != nil {
err = myErr
}
}
dataLastTime = true
case <-time.After(5 * time.Millisecond):
dataLastTime = false
continue
}
}
// one last read to flush out any potential last data- sometimes there is still data in finalPoints.
lastBatches := len(finalPoints)
for i := 0; i < lastBatches; i++ {
lastPoints := <-finalPoints
if len(lastPoints) == 0 {
continue
}
for _, p := range lastPoints {
myErr := pcTo.Set(p.P, p.D)
if myErr != nil {
err = myErr
}
}
}
if err != nil {
return nil, err
}
return pcTo, nil
}
// MergePointCloudsWithColor creates a union of point clouds from the slice of point clouds, giving
// each element of the slice a unique color.
func MergePointCloudsWithColor(clusters []PointCloud) (PointCloud, error) {
var err error
palette := colorful.FastWarmPalette(len(clusters))
colorSegmentation := New()
for i, cluster := range clusters {
col, ok := color.NRGBAModel.Convert(palette[i]).(color.NRGBA)
if !ok {
panic("impossible")
}
cluster.Iterate(0, 0, func(v r3.Vector, d Data) bool {
err = colorSegmentation.Set(v, NewColoredData(col))
return err == nil
})
if err != nil {
return nil, err
}
}
return colorSegmentation, nil
}