-
Notifications
You must be signed in to change notification settings - Fork 27
/
multi_decoder.go
114 lines (97 loc) · 1.91 KB
/
multi_decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package encoding
import (
"container/heap"
"time"
)
// MultiDecoder is a ReadableDecoder that can additionally be pointed at a new
// set of underlying decoders via Reset.
type MultiDecoder interface {
ReadableDecoder
// Reset replaces the decoders this MultiDecoder reads from with decs.
Reset(decs []Decoder)
}
// decState is the per-decoder bookkeeping kept by multiDecoder; it currently
// wraps only the Decoder itself.
type decState struct {
dec Decoder
}
// multiDecoder merges the output of several Decoders, always yielding the
// pending entry with the earliest timestamp by keeping one pending entry per
// decoder in a min-heap.
type multiDecoder struct {
// decs holds the decoders being merged; heapEntry.decIdx indexes into it.
decs []decState
// currEntry is the entry most recently popped by Next and reported by Current.
currEntry heapEntry
// heap holds the next pending entry of each decoder that still has data.
heap minHeap
// err is set when an underlying decoder reports an error; once it is non-nil,
// Next returns false.
err error
}
// NewMultiDecoder allocates an empty multiDecoder. It has no decoders attached
// until Reset is called, so Next returns false on the fresh value.
func NewMultiDecoder() *multiDecoder {
	return new(multiDecoder)
}
// Next advances to the pending entry with the earliest timestamp and makes it
// available through Current. It returns false when an error has already been
// recorded or when no pending entries remain.
//
// After taking an entry, the decoder that produced it is advanced and its next
// value (if any) is queued. If that decoder is exhausted and reports an error,
// the error is recorded; this call still returns true because the entry just
// taken is valid, and the following call returns false.
func (m *multiDecoder) Next() bool {
	if m.err != nil || m.heap.Len() == 0 {
		return false
	}

	m.currEntry = heap.Pop(&m.heap).(heapEntry)
	idx := m.currEntry.decIdx
	src := m.decs[idx].dec

	if !src.Next() {
		// The source is drained; remember why if it stopped on an error.
		if src.Err() != nil {
			m.err = src.Err()
		}
		return true
	}

	// Queue the source's next value so it competes with the other decoders.
	ts, val := src.Current()
	heap.Push(&m.heap, heapEntry{t: ts, v: val, decIdx: idx})
	return true
}
// Current reports the timestamp and value of the entry selected by the most
// recent call to Next. Before any call to Next it reports the zero values of
// the stored entry.
func (m *multiDecoder) Current() (time.Time, float64) {
	entry := m.currEntry
	return entry.t, entry.v
}
// Err returns the error recorded from an underlying decoder, or nil if none
// has been reported. Next and Reset store decoder failures in m.err and Next
// stops iterating once it is set, so callers should check Err after Next
// returns false to tell a clean end of data from a failure.
func (m *multiDecoder) Err() error {
	return m.err
}
// Reset discards the current decoders and pending entries and starts merging
// decs instead. Any previously recorded error is cleared. Each decoder is
// advanced once so its first value can be queued; a decoder that yields
// nothing is left out of the queue, and if it reports an error that error is
// recorded. The backing slices are reused between calls.
func (m *multiDecoder) Reset(decs []Decoder) {
	m.err = nil

	// Zero the old slots before truncating so stale decoders are not kept
	// reachable through the reused backing array.
	for i := 0; i < len(m.decs); i++ {
		m.decs[i] = decState{}
	}
	m.decs = m.decs[:0]
	for i := 0; i < len(decs); i++ {
		m.decs = append(m.decs, decState{dec: decs[i]})
	}

	m.heap.vals = m.heap.vals[:0]
	for idx := range m.decs {
		src := m.decs[idx].dec
		if !src.Next() {
			if src.Err() != nil {
				m.err = src.Err()
			}
			continue
		}
		ts, val := src.Current()
		m.heap.vals = append(m.heap.vals, heapEntry{t: ts, v: val, decIdx: idx})
	}

	// Entries were appended in decoder order; establish the heap ordering.
	heap.Init(&m.heap)
}
// minHeap is the container/heap backing store used by multiDecoder. Its Less
// method orders entries by timestamp, so the root is the earliest entry.
type minHeap struct {
// vals is the heap-ordered slice of pending entries.
vals []heapEntry
}
// heapEntry is one pending data point held in minHeap, tagged with the decoder
// it came from.
type heapEntry struct {
// t and v are the timestamp and value as returned by the decoder's Current.
t time.Time
v float64
// decIdx is the index into multiDecoder.decs of the decoder that produced
// this entry.
decIdx int
}
// Push appends x, which must be a heapEntry, to the end of the backing slice.
// It is the heap.Interface hook called by heap.Push, which then restores the
// heap ordering; it panics if x has any other dynamic type.
func (h *minHeap) Push(x interface{}) {
	entry := x.(heapEntry)
	h.vals = append(h.vals, entry)
}
// Pop removes and returns the last element of the backing slice. It is the
// heap.Interface hook called by heap.Pop, which has already swapped the
// minimum entry into that final position.
func (h *minHeap) Pop() interface{} {
	n := len(h.vals)
	tail := h.vals[n-1]
	h.vals = h.vals[:n-1]
	return tail
}
// Len reports how many entries the heap holds. A nil receiver is treated as an
// empty heap.
func (h *minHeap) Len() int {
	if h != nil {
		return len(h.vals)
	}
	return 0
}
// Less reports whether the entry at index i has a timestamp strictly before
// the entry at index j. Entries with equal timestamps are not ordered relative
// to each other.
func (h *minHeap) Less(i, j int) bool {
	left, right := h.vals[i].t, h.vals[j].t
	return left.Before(right)
}
// Swap exchanges the entries at indices i and j of the backing slice.
func (h *minHeap) Swap(i, j int) {
	tmp := h.vals[i]
	h.vals[i] = h.vals[j]
	h.vals[j] = tmp
}