/
computation.go
75 lines (66 loc) · 2.06 KB
/
computation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package computations
import (
log "github.com/Sirupsen/logrus"
"net/rpc/jsonrpc"
"sync"
"github.com/natefinch/pie"
"github.com/nsaje/dagger/dagger"
)
// ComputationImplementation represents a specific computation implementation.
//
// It is the set of operations a computation must provide so that it can be
// wrapped by ComputationPlugin and served over RPC by StartPlugin.
type ComputationImplementation interface {
// GetInfo receives the computation definition string and returns the
// plugin info for it, or an error.
GetInfo(definition string) (dagger.ComputationPluginInfo, error)
// SubmitRecord hands a single record to the computation and returns the
// records produced as a result, or an error.
SubmitRecord(t *dagger.Record) ([]*dagger.Record, error)
// GetState returns a dump of the computation's state as raw bytes.
GetState() ([]byte, error)
// SetState seeds the computation's state from raw bytes.
// NOTE(review): presumably the same format GetState produces — confirm
// with the implementations.
SetState([]byte) error
}
// StartPlugin wraps impl in a ComputationPlugin, registers it with a pie
// provider under the RPC name "Computation" and then serves requests using
// the JSON-RPC server codec, brokering data between the RPC caller and the
// computation.
//
// A registration failure is reported through log.Fatalf.
// NOTE(review): ServeCodec presumably blocks for the lifetime of the plugin —
// confirm against the pie documentation.
func StartPlugin(impl ComputationImplementation) {
	// The zero value of the mutex field is ready to use, so only impl is set.
	plugin := &ComputationPlugin{impl: impl}

	server := pie.NewProvider()
	err := server.RegisterName("Computation", plugin)
	if err != nil {
		log.Fatalf("failed to register computation Plugin: %s", err)
	}

	server.ServeCodec(jsonrpc.NewServerCodec)
}
// ComputationPlugin handles RPC calls from the main dagger process.
//
// It is the receiver registered under the RPC name "Computation" by
// StartPlugin; each of its methods forwards to the wrapped implementation
// while holding mx.
type ComputationPlugin struct {
// impl is the computation that actually serves each request.
impl ComputationImplementation
// mx guards calls into impl: GetInfo and GetState hold the read lock,
// SubmitRecord and SetState hold the write lock.
mx sync.RWMutex
}
// GetInfo is the RPC entry point that asks the wrapped computation for its
// plugin info (the inputs to this computation) given a definition string.
//
// The call runs under the read lock. Whatever the implementation returns is
// copied into response, and the implementation's error is returned unchanged.
func (p *ComputationPlugin) GetInfo(definition string, response *dagger.ComputationPluginInfo) error {
	p.mx.RLock()
	defer p.mx.RUnlock()

	pluginInfo, infoErr := p.impl.GetInfo(definition)
	// The reply is filled in unconditionally, even when infoErr is non-nil.
	*response = pluginInfo

	return infoErr
}
// SubmitRecord is the RPC entry point that submits record t into processing.
//
// The call runs under the write lock. response is reset to its zero value
// before the implementation is invoked; afterwards its Records field is set
// to the records the implementation produced. The implementation's error is
// returned unchanged.
func (p *ComputationPlugin) SubmitRecord(t *dagger.Record, response *dagger.ComputationPluginResponse) error {
	p.mx.Lock()
	defer p.mx.Unlock()

	// Start from a clean reply so nothing stale is carried over.
	*response = dagger.ComputationPluginResponse{}

	produced, submitErr := p.impl.SubmitRecord(t)
	response.Records = produced

	return submitErr
}
// GetState is the RPC entry point that returns the dump of the computation's
// state to dagger.
//
// The empty struct argument is ignored. The call runs under the read lock;
// the bytes returned by the implementation are stored in response and the
// implementation's error is returned unchanged.
func (p *ComputationPlugin) GetState(_ struct{}, response *[]byte) error {
	p.mx.RLock()
	defer p.mx.RUnlock()

	snapshot, stateErr := p.impl.GetState()
	// The reply is filled in unconditionally, even when stateErr is non-nil.
	*response = snapshot

	return stateErr
}
// SetState is the RPC entry point that seeds the state of the computation.
//
// The call runs under the write lock and forwards state to the wrapped
// implementation. On success response is set to "ok" and nil is returned.
// On failure the implementation's error is returned and response is left
// untouched, so a caller never observes "ok" alongside a non-nil error.
func (p *ComputationPlugin) SetState(state []byte, response *string) error {
	p.mx.Lock()
	defer p.mx.Unlock()

	if err := p.impl.SetState(state); err != nil {
		return err
	}

	// Only acknowledge once the implementation has accepted the state.
	*response = "ok"
	return nil
}