forked from couchbaselabs/tuqtng
-
Notifications
You must be signed in to change notification settings - Fork 0
/
create_index.go
143 lines (124 loc) · 3.89 KB
/
create_index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright (c) 2013 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
package xpipeline
import (
"fmt"
"runtime/debug"
"strings"
"github.com/couchbaselabs/clog"
"github.com/couchbaselabs/dparval"
"github.com/couchbaselabs/tuqtng/ast"
"github.com/couchbaselabs/tuqtng/catalog"
"github.com/couchbaselabs/tuqtng/misc"
"github.com/couchbaselabs/tuqtng/network"
"github.com/couchbaselabs/tuqtng/query"
)
// CreateIndex is the pipeline operator that creates an index on a bucket and
// emits a single item describing the index that was created.
type CreateIndex struct {
// itemChannel carries the item produced by Run; Run closes it on exit.
itemChannel dparval.ValueChannel
// supportChannel carries errors sent via SendError; Run closes it on exit.
supportChannel PipelineSupportChannel
// bucket is the bucket on which the index is created.
bucket catalog.Bucket
// name is passed to bucket.CreateIndex for non-primary indexes.
name string
// index_type is lower-cased and converted to catalog.IndexType by Run.
index_type string
// primary selects CreatePrimaryIndex instead of CreateIndex in Run.
primary bool
// on holds the expressions copied into the catalog.IndexKey by Run.
on ast.ExpressionList
// downstreamStopChannel is set by Run from its stopChannel argument and is
// watched by SendItem and SendError while they wait to send.
downstreamStopChannel misc.StopChannel
// query is stored by SetQuery; it is not read anywhere else in this file.
query network.Query
}
// NewCreateIndex returns a CreateIndex operator for the given bucket.
//
// name, index_type, primary and on are stored unchanged and consumed later by
// Run. The stop channel and query are left at their zero values; they are
// filled in by Run and SetQuery respectively.
func NewCreateIndex(bucket catalog.Bucket, name string, index_type string, primary bool, on ast.ExpressionList) *CreateIndex {
	op := new(CreateIndex)

	// Both output channels are created without a buffer.
	op.itemChannel = make(dparval.ValueChannel)
	op.supportChannel = make(PipelineSupportChannel)

	// Description of the index to create.
	op.bucket = bucket
	op.name = name
	op.index_type = index_type
	op.primary = primary
	op.on = on

	return op
}
// SetSource does nothing: the supplied source operator is discarded.
func (this *CreateIndex) SetSource(source Operator) {
	// Nothing to do; source is not stored or used.
}
// GetChannels returns the operator's item channel followed by its support
// channel (the channel SendError writes errors to).
func (this *CreateIndex) GetChannels() (dparval.ValueChannel, PipelineSupportChannel) {
	items, support := this.itemChannel, this.supportChannel
	return items, support
}
// Run creates the index and reports the outcome on the operator's channels.
//
// When this.primary is set it calls bucket.CreatePrimaryIndex; otherwise it
// calls bucket.CreateIndex with the configured name, key and type. An error is
// forwarded via SendError. On success a single item whose "projection"
// attachment holds the index id and name is forwarded via SendItem; if the
// bucket returns neither an error nor an index, only a warning is logged.
// Both output channels are closed when Run returns.
//
// stopChannel is remembered as the downstream stop channel that SendItem and
// SendError watch while they wait to send.
func (this *CreateIndex) Run(stopChannel misc.StopChannel) {
	// Deferred calls run last-in-first-out, so RecoverPanic (registered last)
	// fires while both channels are still open and can report the panic;
	// only afterwards are supportChannel and then itemChannel closed.
	defer close(this.itemChannel)
	defer close(this.supportChannel)
	defer this.RecoverPanic()

	kind := catalog.IndexType(strings.ToLower(this.index_type))

	// Copy the ON expressions element by element into the catalog key.
	keys := make(catalog.IndexKey, len(this.on))
	for i := range this.on {
		keys[i] = this.on[i]
	}

	this.downstreamStopChannel = stopChannel

	var (
		created catalog.Index
		failure query.Error
	)
	if this.primary {
		clog.To(CHANNEL, "create_index (primary) operator starting")
		created, failure = this.bucket.CreatePrimaryIndex()
	} else {
		clog.To(CHANNEL, "create_index (secondary) operator starting")
		created, failure = this.bucket.CreateIndex(this.name, keys, kind)
	}

	switch {
	case failure != nil:
		this.SendError(failure)
	case created == nil:
		clog.Warn("Successfully created index, but index was nil")
	default:
		row := dparval.NewValue(map[string]interface{}{})
		row.SetAttachment("projection", map[string]interface{}{
			"id":   created.Id(),
			"name": created.Name(),
		})
		this.SendItem(row)
	}

	clog.To(CHANNEL, "create_index operator finished")
}
// processItem ignores item and always reports true.
func (this *CreateIndex) processItem(item *dparval.Value) bool {
	const always = true
	return always
}
// afterItems does nothing.
func (this *CreateIndex) afterItems() {
	// Nothing to do.
}
// SendItem blocks until item has been delivered on the item channel or the
// downstream stop channel has been closed.
//
// It returns true once the item is sent and false if the stop channel is
// closed first. A value received on a still-open stop channel is discarded and
// the send is retried.
func (this *CreateIndex) SendItem(item *dparval.Value) bool {
	for {
		select {
		case this.itemChannel <- item:
			return true
		case _, open := <-this.downstreamStopChannel:
			if !open {
				// someone closed the stop channel
				return false
			}
		}
	}
}
// SendError blocks until err has been delivered on the support channel or the
// downstream stop channel has been closed.
//
// It returns true only when err was delivered and err.IsFatal() is false. It
// returns false when the delivered error is fatal, or when the stop channel is
// closed before the error could be delivered. A value received on a still-open
// stop channel is discarded and the send is retried.
func (this *CreateIndex) SendError(err query.Error) bool {
	for {
		select {
		case this.supportChannel <- err:
			return !err.IsFatal()
		case _, open := <-this.downstreamStopChannel:
			if !open {
				// someone closed the stop channel
				return false
			}
		}
	}
}
// RecoverPanic recovers from a panic in the calling goroutine, logs the panic
// value together with a stack trace, and reports an error via SendError.
//
// It must itself be the deferred call (as in Run's `defer this.RecoverPanic()`)
// because recover only has an effect when called directly by a deferred
// function. When no panic is in flight it does nothing.
func (this *CreateIndex) RecoverPanic() {
	cause := recover()
	if cause == nil {
		return
	}

	trace := debug.Stack()
	clog.Error(fmt.Errorf("Query Execution Panic: %v\n%s", cause, trace))

	// NOTE(review): "Exeuction" is a typo for "Execution"; the text is left
	// unchanged here because other code may match on the exact message.
	this.SendError(query.NewError(nil, "Panic In Exeuction Pipeline"))
}
// SetQuery stores q on the operator, replacing any previously stored query.
func (this *CreateIndex) SetQuery(q network.Query) {
	this.query = q
}