forked from cloudflare/go-stream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
operator.go
47 lines (37 loc) · 1.02 KB
/
operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package pg
import "github.com/cloudflare/go-stream/cube"
import "github.com/cloudflare/go-stream/stream"
import "github.com/cloudflare/go-stream/stream/mapper"
import (
"database/sql"
"log"
)
// NewUpsertOp builds a non-parallel stream operator named "DbUpsert" that
// upserts every partition of each incoming cube through an Executor.
//
// dbconnect is handed to sql.Open with the "postgres" driver name and then to
// the driver's own Open to obtain a raw driver connection; the driver must be
// registered elsewhere in the program (it is not imported by this file's
// visible imports). tableName and cd are passed to MakeTable to describe the
// target table.
//
// Every object delivered to the operator must be a
// *cube.TimeRepartitionedCube; any other type makes the type assertion panic.
// After all partitions of an input have been passed to Executor.UpsertCube,
// the returned notifier is signalled with Notify(1).
//
// It returns the operator, the processed notifier and the executor. If the
// database handle or the raw connection cannot be opened, the process is
// terminated via log.Fatal.
func NewUpsertOp(dbconnect string, tableName string, cd cube.CubeDescriber) (stream.Operator, stream.ProcessedNotifier, *Executor) {
	// The *sql.DB is only a means of looking up the registered driver.
	db, err := sql.Open("postgres", dbconnect)
	if err != nil {
		log.Fatal(err)
	}
	drv := db.Driver()

	// Release the pool handle now that the driver has been obtained; leaving it
	// open leaks the handle (and its background goroutine) on every call. The
	// raw connection opened below is independent of this pool.
	if cerr := db.Close(); cerr != nil {
		log.Println("pg: closing driver lookup handle:", cerr)
	}

	conn, err := drv.Open(dbconnect)
	if err != nil {
		log.Fatal(err)
	}

	table := MakeTable(tableName, cd)
	exec := NewExecutor(table, conn)
	//exec.CreateBaseTable()

	// NOTE(review): the meaning of the argument 2 is defined by
	// stream.NewNonBlockingProcessedNotifier — confirm against that package.
	ready := stream.NewNonBlockingProcessedNotifier(2)

	f := func(input stream.Object, out mapper.Outputer) {
		in := input.(*cube.TimeRepartitionedCube)

		visitor := func(part cube.Partition, c cube.Cuber) {
			exec.UpsertCube(part, c)
		}
		in.VisitPartitions(visitor)

		// Signal once per processed input, after all partitions were visited.
		ready.Notify(1)
	}

	exit := func() {
		log.Println("Db Upser Exit: ")
	}

	op := mapper.NewOpExitor(f, exit, "DbUpsert")
	op.Parallel = false
	return op, ready, exec
}