/
aggregate_func_local.go
127 lines (106 loc) · 2.77 KB
/
aggregate_func_local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package executor
import (
"fmt"
"io"
"os"
"runtime/pprof"
"time"
"github.com/vmihailenco/msgpack"
"github.com/xitongsys/guery/eplan"
"github.com/xitongsys/guery/logger"
"github.com/xitongsys/guery/metadata"
"github.com/xitongsys/guery/pb"
"github.com/xitongsys/guery/row"
"github.com/xitongsys/guery/util"
)
// SetInstructionAggregateFuncLocal decodes the msgpack-encoded plan node carried
// by instruction into an EPlanAggregateFuncLocalNode and records the instruction,
// the decoded node, and the node's single input and output locations on the
// executor.
//
// If decoding fails the error is returned and none of the executor fields are
// modified.
func (self *Executor) SetInstructionAggregateFuncLocal(instruction *pb.Instruction) (err error) {
	node := new(eplan.EPlanAggregateFuncLocalNode)
	err = msgpack.Unmarshal(instruction.EncodedEPlanNodeBytes, node)
	if err != nil {
		return err
	}

	self.Instruction, self.EPlanNode = instruction, node
	self.InputLocations = []*pb.Location{&node.Input}
	self.OutputLocations = []*pb.Location{&node.Output}
	return nil
}
// RunAggregateFuncLocal runs the local aggregate-function stage of the
// executor's current plan node.
//
// It reads the input metadata from Readers[0], writes the node's own metadata
// to Writers[0], and then consumes row groups from the reader until io.EOF.
// For every group key the first row seen is remembered, and each row group is
// fed to CalAggregateFuncLocal so that the per-function results are
// accumulated by key. At EOF, every remembered row gets one value appended per
// aggregate function and is written to the output buffer.
//
// Output rows are emitted while ranging over a map, so their order is not
// deterministic.
//
// A CPU profile of the run is written to a timestamped file in the working
// directory. Profiling is best-effort: if the file cannot be created or the
// profiler cannot be started, the failure is logged and the stage still runs.
//
// On return, any non-nil error is recorded through AddLogInfo and the executor
// is cleared.
func (self *Executor) RunAggregateFuncLocal() (err error) {
	fname := fmt.Sprintf("executor_%v_aggregatefunclocal_%v_cpu.pprof", self.Name, time.Now().Format("20060102150405"))
	if f, perr := os.Create(fname); perr != nil {
		// Never hand a nil file to the profiler; just run without a profile.
		logger.Infof("RunAggregateFuncLocal: cpu profiling disabled: %v", perr)
	} else if perr = pprof.StartCPUProfile(f); perr != nil {
		// Start fails when a profile is already running; do not stop a
		// profile this call did not start.
		f.Close()
		logger.Infof("RunAggregateFuncLocal: cpu profiling disabled: %v", perr)
	} else {
		// Registered first so it runs last, after flush and cleanup.
		defer func() {
			pprof.StopCPUProfile()
			f.Close()
		}()
	}

	defer func() {
		if err != nil {
			self.AddLogInfo(err, pb.LogLevel_ERR)
		}
		self.Clear()
	}()

	reader, writer := self.Readers[0], self.Writers[0]
	enode := self.EPlanNode.(*eplan.EPlanAggregateFuncLocalNode)

	// Read the upstream metadata.
	md := &metadata.Metadata{}
	if err = util.ReadObject(reader, md); err != nil {
		return err
	}

	// Forward this node's metadata downstream.
	if err = util.WriteObject(writer, enode.Metadata); err != nil {
		return err
	}

	rbReader, rbWriter := row.NewRowsBuffer(md, reader, nil), row.NewRowsBuffer(enode.Metadata, nil, writer)
	// Deferred last, so buffered rows are flushed before the executor is cleared.
	// NOTE(review): anything returned by Flush and WriteRow is discarded here —
	// confirm whether they report errors that should be propagated.
	defer func() {
		rbWriter.Flush()
	}()

	// Initialise the aggregate node.
	if err = enode.Init(enode.Metadata); err != nil {
		return err
	}

	// res[i] holds the accumulated results of FuncNodes[i], indexed by group key.
	res := make([]map[string]interface{}, len(enode.FuncNodes))
	for i := range res {
		res[i] = map[string]interface{}{}
	}

	// keys maps each group key to the first row observed for it.
	keys := map[string]*row.Row{}

	var rg *row.RowsGroup
	for {
		rg, err = rbReader.Read()
		if err == io.EOF {
			// End of input is the normal exit: emit one row per group key.
			err = nil
			for key, r := range keys {
				for i := range res {
					r.AppendVals(res[i][key])
				}
				rbWriter.WriteRow(r)
			}
			break
		}
		if err != nil {
			break
		}

		for i := 0; i < rg.GetRowsNumber(); i++ {
			key := rg.GetKeyString(i)
			if _, ok := keys[key]; !ok {
				keys[key] = rg.GetRow(i)
			}
		}

		if err = self.CalAggregateFuncLocal(enode, rg, &res); err != nil {
			break
		}
	}

	logger.Infof("RunAggregateFuncLocal finished")
	return err
}
// CalAggregateFuncLocal evaluates every function node of enode against the
// row group rg and merges each node's result map into (*res)[i], where i is
// the node's index in enode.FuncNodes. Entries already present under the same
// map key are overwritten.
//
// It stops at the first failure and returns either the error produced by the
// node's Result call or, when Result yields something other than a
// map[string]interface{}, an error describing the unexpected type. Results
// merged for earlier nodes are left in place.
func (self *Executor) CalAggregateFuncLocal(enode *eplan.EPlanAggregateFuncLocalNode, rg *row.RowsGroup, res *[]map[string]interface{}) error {
	var err error
	var resci interface{}
	for i, item := range enode.FuncNodes {
		if resci, err = item.Result(rg); err != nil {
			return err
		}

		// Checked assertion: a nil or unexpected result becomes an error
		// instead of a panic.
		resc, ok := resci.(map[string]interface{})
		if !ok {
			return fmt.Errorf("aggregate func %d: unexpected result type %T", i, resci)
		}

		for k, v := range resc {
			(*res)[i][k] = v
		}
	}
	return nil
}