Skip to content

Commit

Permalink
fix(dispatch): Dispatch for transform. Transformer instead of Service
Browse files Browse the repository at this point in the history
  • Loading branch information
dustmop committed Mar 11, 2021
1 parent 75dbb30 commit 24562c5
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 112 deletions.
17 changes: 9 additions & 8 deletions api/transform.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package api

import (
"encoding/json"
"net/http"

"github.com/qri-io/qri/api/util"
Expand All @@ -10,29 +9,31 @@ import (

// TransformHandlers connects HTTP requests to the TransformMethods subsystem
type TransformHandlers struct {
*lib.TransformMethods
inst *lib.Instance
}

// NewTransformHandlers constructs a TrasnformHandlers struct
func NewTransformHandlers(inst *lib.Instance) TransformHandlers {
return TransformHandlers{TransformMethods: lib.NewTransformMethods(inst)}
return TransformHandlers{inst: inst}
}

// ApplyHandler is an HTTP handler function for executing a transform script
func (h TransformHandlers) ApplyHandler(prefix string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
p := lib.ApplyParams{}
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
method := "transform.apply"
p := h.inst.NewInputParam(method)

if err := lib.UnmarshalParams(r, p); err != nil {
util.WriteErrResponse(w, http.StatusBadRequest, err)
return
}

res, err := h.TransformMethods.Apply(r.Context(), &p)
res, _, err := h.inst.Dispatch(r.Context(), method, p)
if err != nil {
util.WriteErrResponse(w, http.StatusBadRequest, err)
util.RespondWithError(w, err)
return
}

util.WriteResponse(w, res)
return
}
}
15 changes: 8 additions & 7 deletions cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,16 @@ the --apply flag on the save command to commit results from transforms.`,
type ApplyOptions struct {
ioes.IOStreams

Instance *lib.Instance

Refs *RefSelect
FilePath string
Secrets []string

TransformMethods *lib.TransformMethods
}

// Complete adds any missing configuration that can only be added just before calling Run
func (o *ApplyOptions) Complete(f Factory, args []string) (err error) {
if o.TransformMethods, err = f.TransformMethods(); err != nil {
return err
}
o.Instance = f.Instance()
if o.Refs, err = GetCurrentRefSelect(f, args, -1, nil); err != nil {
// This error will be handled during validation
if err != repo.ErrEmptyRef {
Expand All @@ -88,6 +86,9 @@ func (o *ApplyOptions) Run() error {
return errors.New("only transform scripts are supported by --file")
}

ctx := context.TODO()
inst := o.Instance

tf := dataset.Transform{
ScriptPath: o.FilePath,
}
Expand All @@ -99,14 +100,14 @@ func (o *ApplyOptions) Run() error {
}
}

ctx := context.TODO()
params := lib.ApplyParams{
Refstr: o.Refs.Ref(),
Transform: &tf,
ScriptOutput: o.Out,
Wait: true,
}
res, err := o.TransformMethods.Apply(ctx, &params)

res, err := inst.Transform().Apply(ctx, &params)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion cmd/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type Factory interface {
SearchMethods() (*lib.SearchMethods, error)
SQLMethods() (*lib.SQLMethods, error)
RenderMethods() (*lib.RenderMethods, error)
TransformMethods() (*lib.TransformMethods, error)
}

// StandardRepoPath returns qri paths based on the QRI_PATH environment
Expand Down
5 changes: 0 additions & 5 deletions cmd/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,3 @@ func (t TestFactory) SQLMethods() (*lib.SQLMethods, error) {
func (t TestFactory) RenderMethods() (*lib.RenderMethods, error) {
return lib.NewRenderMethods(t.inst), nil
}

// TransformMethods generates a lib.TransformMethods from internal state
func (t TestFactory) TransformMethods() (*lib.TransformMethods, error) {
return lib.NewTransformMethods(t.inst), nil
}
8 changes: 0 additions & 8 deletions cmd/qri.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,6 @@ func (o *QriOptions) DatasetMethods() (*lib.DatasetMethods, error) {
return lib.NewDatasetMethods(o.inst), nil
}

// TransformMethods generates a lib.TransformMethods from internal state
func (o *QriOptions) TransformMethods() (*lib.TransformMethods, error) {
if err := o.Init(); err != nil {
return nil, err
}
return lib.NewTransformMethods(o.inst), nil
}

// RemoteMethods generates a lib.RemoteMethods from internal state
func (o *QriOptions) RemoteMethods() (*lib.RemoteMethods, error) {
if err := o.Init(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions lib/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,12 +876,11 @@ func (m *DatasetMethods) Save(ctx context.Context, p *SaveParams) (*dataset.Data
ds.Transform = prevTransformDataset.Transform
}

str := m.inst.node.LocalStreams
scriptOut := p.ScriptOutput
secrets := p.Secrets
// allocate an ID for the transform, subscribe to print output & build up
// runState
runID := transform.NewRunID()
runID := run.NewID()
runState = run.NewState(runID)
// create a loader so transforms can call `load_dataset`
// TODO(b5) - add a ResolverMode save parameter and call m.inst.resolverForMode
Expand All @@ -905,7 +904,8 @@ func (m *DatasetMethods) Save(ctx context.Context, p *SaveParams) (*dataset.Data

// apply the transform
shouldWait := true
err := m.inst.transform.Apply(ctx, ds, loader, runID, m.inst.bus, shouldWait, str, scriptOut, secrets)
transformer := transform.NewTransformer(m.inst.appCtx, loader, m.inst.bus)
err := transformer.Apply(ctx, ds, runID, shouldWait, scriptOut, secrets)
if err != nil {
log.Errorw("transform run error", "err", err.Error())
runState.Message = err.Error()
Expand Down
5 changes: 3 additions & 2 deletions lib/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,11 @@ type callable struct {
// RegisterMethods iterates the methods provided by the lib API, and makes them visible to dispatch
func (inst *Instance) RegisterMethods() {
reg := make(map[string]callable)
inst.registerOne("fsi", inst.Filesys(), fsiImpl{}, reg)
inst.registerOne("access", inst.Access(), accessImpl{}, reg)
inst.registerOne("dataset", inst.Dataset(), datasetImpl{}, reg)
inst.registerOne("config", inst.Config(), configImpl{}, reg)
inst.registerOne("dataset", inst.Dataset(), datasetImpl{}, reg)
inst.registerOne("fsi", inst.Filesys(), fsiImpl{}, reg)
inst.registerOne("transform", inst.Transform(), transformImpl{}, reg)
inst.regMethods = &regMethodSet{reg: reg}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/fsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (m *FSIMethods) EnsureRef(ctx context.Context, p *LinkParams) (*dsref.Versi
return nil, dispatchReturnError(got, err)
}

// Implementations for FSI methods follow.g
// Implementations for FSI methods follow
// TODO(dustmop): Perhaps consider moving these methods to /lib/impl/*.go

// fsiImpl holds the method implementations for FSI
Expand Down
31 changes: 15 additions & 16 deletions lib/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/buildrepo"
"github.com/qri-io/qri/stats"
"github.com/qri-io/qri/transform"
)

var (
Expand Down Expand Up @@ -372,15 +371,15 @@ func NewInstance(ctx context.Context, repoPath string, opts ...Option) (qri *Ins
repoPath: repoPath,
cfg: cfg,

qfs: o.qfs,
repo: o.repo,
node: o.node,
streams: o.Streams,
registry: o.regclient,
logbook: o.logbook,
profiles: o.profiles,
bus: event.NewBus(ctx),
transform: transform.NewService(ctx),
qfs: o.qfs,
repo: o.repo,
node: o.node,
streams: o.Streams,
registry: o.regclient,
logbook: o.logbook,
profiles: o.profiles,
bus: event.NewBus(ctx),
appCtx: ctx,
}
qri = inst

Expand Down Expand Up @@ -670,11 +669,11 @@ func NewInstanceFromConfigAndNodeAndBus(ctx context.Context, cfg *config.Config,
cancel: cancel,
doneCh: make(chan struct{}),

cfg: cfg,
node: node,
dscache: dc,
logbook: r.Logbook(),
transform: transform.NewService(ctx),
cfg: cfg,
node: node,
dscache: dc,
logbook: r.Logbook(),
appCtx: ctx,
}
inst.RegisterMethods()

Expand Down Expand Up @@ -723,11 +722,11 @@ type Instance struct {
remoteClient remote.Client
registry *regclient.Client
stats *stats.Service
transform *transform.Service
logbook *logbook.Book
dscache *dscache.Dscache
bus event.Bus
watcher *watchfs.FilesysWatcher
appCtx context.Context

profiles profile.Store
keystore key.Store
Expand Down
7 changes: 6 additions & 1 deletion lib/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (s *scope) Context() context.Context {
return s.ctx
}

// AppContext returns the context of the top-level application, to enable graceful shutdowns
func (s *scope) AppContext() context.Context {
return s.inst.appCtx
}

// FSISubsystem returns a reference to the FSI subsystem
// TODO(dustmop): This subsystem contains global data, we should move that data out and
// into scope
Expand Down Expand Up @@ -109,7 +114,7 @@ func (s *scope) LoadDataset(ctx context.Context, ref dsref.Ref, source string) (
return s.inst.LoadDataset(ctx, ref, source)
}

// Loader returns the default dataset ref loader
// ParseResolveFunc returns a function that can parse a ref, then resolve and load it
func (s *scope) ParseResolveFunc() dsref.ParseResolveLoad {
return NewParseResolveLoadFunc("", s.inst.defaultResolver(), s.inst)
}
Expand Down
3 changes: 1 addition & 2 deletions lib/test_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ func (tr *testRunner) MustGet(t *testing.T, refstr string) *dataset.Dataset {
}

func (tr *testRunner) ApplyWithParams(ctx context.Context, p *ApplyParams) (*dataset.Dataset, error) {
m := NewTransformMethods(tr.Instance)
res, err := m.Apply(ctx, p)
res, err := tr.Instance.Transform().Apply(ctx, p)
if err != nil {
return nil, err
}
Expand Down
60 changes: 32 additions & 28 deletions lib/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@ import (
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/transform"
"github.com/qri-io/qri/transform/run"
)

// TransformMethods encapsulates business logic for transforms
// TransformMethods groups together methods for transforms
type TransformMethods struct {
inst *Instance
d dispatcher
}

// CoreRequestsName implements the Requests interface
func (TransformMethods) CoreRequestsName() string { return "apply" }
// Name returns the name of this method gropu
func (m *TransformMethods) Name() string {
return "transform"
}

// NewTransformMethods creates a TransformMethods pointer from a qri instance
func NewTransformMethods(inst *Instance) *TransformMethods {
return &TransformMethods{
inst: inst,
}
// Transform returns the TransformMethods that Instance has registered
func (inst *Instance) Transform() *TransformMethods {
return &TransformMethods{d: inst}
}

// ApplyParams are parameters for the apply command
Expand Down Expand Up @@ -55,24 +56,26 @@ type ApplyResult struct {

// Apply runs a transform script
func (m *TransformMethods) Apply(ctx context.Context, p *ApplyParams) (*ApplyResult, error) {
err := p.Validate()
if err != nil {
return nil, err
got, _, err := m.d.Dispatch(ctx, dispatchMethodName(m, "apply"), p)
if res, ok := got.(*ApplyResult); ok {
return res, err
}
return nil, dispatchReturnError(got, err)
}

res := &ApplyResult{}
// Implementations for transform methods follow

if m.inst.http != nil {
err = m.inst.http.Call(ctx, AEApply, p, res)
if err != nil {
return nil, err
}
return res, nil
}
// transformImpl holds the method implementations for transforms
type transformImpl struct{}

// Apply runs a transform script
func (transformImpl) Apply(scp scope, p *ApplyParams) (*ApplyResult, error) {
ctx := scp.Context()

var err error
ref := dsref.Ref{}
if p.Refstr != "" {
ref, _, err = m.inst.ParseAndResolveRefWithWorkingDir(ctx, p.Refstr, "")
ref, _, err = scp.ParseAndResolveRefWithWorkingDir(ctx, p.Refstr, "")
if err != nil {
return nil, err
}
Expand All @@ -85,15 +88,12 @@ func (m *TransformMethods) Apply(ctx context.Context, p *ApplyParams) (*ApplyRes
}
if p.Transform != nil {
ds.Transform = p.Transform
ds.Transform.OpenScriptFile(ctx, m.inst.repo.Filesystem())
ds.Transform.OpenScriptFile(ctx, scp.Filesystem())
}

str := m.inst.node.LocalStreams
loader := NewParseResolveLoadFunc("", m.inst.defaultResolver(), m.inst)

// allocate an ID for the transform, for now just log the events it produces
runID := transform.NewRunID()
m.inst.bus.SubscribeID(func(ctx context.Context, e event.Event) error {
runID := run.NewID()
scp.Bus().SubscribeID(func(ctx context.Context, e event.Event) error {
go func() {
log.Debugw("apply transform event", "type", e.Type, "payload", e.Payload)
if e.Type == event.ETTransformPrint {
Expand All @@ -109,11 +109,15 @@ func (m *TransformMethods) Apply(ctx context.Context, p *ApplyParams) (*ApplyRes
}, runID)

scriptOut := p.ScriptOutput
err = m.inst.transform.Apply(ctx, ds, loader, runID, m.inst.bus, p.Wait, str, scriptOut, p.Secrets)
loader := scp.ParseResolveFunc()

transformer := transform.NewTransformer(scp.AppContext(), loader, scp.Bus())
err = transformer.Apply(ctx, ds, runID, p.Wait, scriptOut, p.Secrets)
if err != nil {
return nil, err
}

res := &ApplyResult{}
if p.Wait {
if err = base.InlineJSONBody(ds); err != nil && err != base.ErrNoBodyToInline {
return nil, err
Expand Down

0 comments on commit 24562c5

Please sign in to comment.