Skip to content

Commit

Permalink
feat(transform): Add runID to transform. Publish some events.
Browse files Browse the repository at this point in the history
  • Loading branch information
dustmop committed Jan 26, 2021
1 parent 1a0ef6f commit 5ceac77
Show file tree
Hide file tree
Showing 13 changed files with 95 additions and 35 deletions.
1 change: 1 addition & 0 deletions cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (o *ApplyOptions) Run() error {
Refstr: o.Refs.Ref(),
Transform: &tf,
ScriptOutput: o.Out,
Wait: true,
}
res := lib.ApplyResult{}
if err = o.TransformMethods.Apply(&params, &res); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (b *bus) Publish(ctx context.Context, typ Type, payload interface{}) error
return b.publish(ctx, typ, "", payload)
}

// Publish sends an event with a given sessionID to the bus
// PublishID sends an event with a given sessionID to the bus
func (b *bus) PublishID(ctx context.Context, typ Type, sessionID string, payload interface{}) error {
return b.publish(ctx, typ, sessionID, payload)
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func (b *bus) SubscribeID(handler Handler, sessionID string) {
b.idSubs[sessionID] = append(b.idSubs[sessionID], handler)
}

// SubscribeAll requets all events from the bus
// SubscribeAll requests all events from the bus
func (b *bus) SubscribeAll(handler Handler) {
b.lk.Lock()
defer b.lk.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions event/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package event

const (
// ETTransformStart signals the start a transform execution
ETTransformStart = Type("transform:Start")
// ETTransformComplete signals the successful completion of a transform execution
ETTransformComplete = Type("transform:Complete")
// ETTransformFailure signals the failure of a transform execution
ETTransformFailure = Type("transform:Failure")
)
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/gofrs/flock v0.7.1 // indirect
github.com/google/flatbuffers v1.12.1-0.20200706154056-969d0f7a6317
github.com/google/go-cmp v0.5.3
github.com/google/uuid v1.1.1
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ipfs v0.6.0
Expand All @@ -38,9 +39,8 @@ require (
github.com/multiformats/go-multihash v0.0.14
github.com/olekukonko/tablewriter v0.0.4
github.com/pkg/errors v0.9.1
github.com/qri-io/dataset v0.2.1-0.20201201155506-9b4fc79ffde8
github.com/qri-io/dag v0.2.2-0.20201208212257-ae00241c4b48
github.com/qri-io/dataset v0.2.1-0.20201124144731-82162a0f76e6
github.com/qri-io/dataset v0.2.1-0.20210119191645-a8a3aa840464
github.com/qri-io/deepdiff v0.2.1-0.20200807143746-d02d9f531f5b
github.com/qri-io/didmod v0.0.0-20201123165422-8b2e224c993a
github.com/qri-io/doggos v0.1.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,10 @@ github.com/qri-io/dataset v0.2.1-0.20201124144731-82162a0f76e6 h1:a9CYZQ+DCzwqg8
github.com/qri-io/dataset v0.2.1-0.20201124144731-82162a0f76e6/go.mod h1:HtwGskdCECbOON0iVQHEEm6fykwDqharlqabc1ssj3Y=
github.com/qri-io/dataset v0.2.1-0.20201201155506-9b4fc79ffde8 h1:/9pbWabRT9BbjFp1AjdAsXKT2NQp+mGmyvnTylPyEHY=
github.com/qri-io/dataset v0.2.1-0.20201201155506-9b4fc79ffde8/go.mod h1:HtwGskdCECbOON0iVQHEEm6fykwDqharlqabc1ssj3Y=
github.com/qri-io/dataset v0.2.1-0.20210118153421-4b6176e58a47 h1:8VYob4qYMMqvaGuJ54Js+8VrUOF8O8TrOJ/aJOeihbA=
github.com/qri-io/dataset v0.2.1-0.20210118153421-4b6176e58a47/go.mod h1:HtwGskdCECbOON0iVQHEEm6fykwDqharlqabc1ssj3Y=
github.com/qri-io/dataset v0.2.1-0.20210119191645-a8a3aa840464 h1:EzaPWYsCUEJnx6cniZpeF5g3JZJ2yOhsICjRVP9z7Yk=
github.com/qri-io/dataset v0.2.1-0.20210119191645-a8a3aa840464/go.mod h1:HtwGskdCECbOON0iVQHEEm6fykwDqharlqabc1ssj3Y=
github.com/qri-io/deepdiff v0.2.1-0.20200807143746-d02d9f531f5b h1:T8qEIv+qLi5mVWvSS329wJ+HbN7cfMwCWjRVzh/+upo=
github.com/qri-io/deepdiff v0.2.1-0.20200807143746-d02d9f531f5b/go.mod h1:NrL/b7YvexgpGb4HEO3Rlx5RrMLDfxuKDf/XDAq5ac0=
github.com/qri-io/didmod v0.0.0-20201123165422-8b2e224c993a h1:40BIa59lae2xZ7iieb3UU4/X57jZsWZ6QgqwdjDQhig=
Expand Down
12 changes: 11 additions & 1 deletion lib/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/qri-io/qri/dscache/build"
"github.com/qri-io/qri/dsref"
qrierr "github.com/qri-io/qri/errors"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/fsi"
"github.com/qri-io/qri/fsi/linkfile"
"github.com/qri-io/qri/repo"
Expand Down Expand Up @@ -622,8 +623,17 @@ func (m *DatasetMethods) Save(p *SaveParams, res *dataset.Dataset) error {
// string and control how transform functions
loader := NewParseResolveLoadFunc("", m.inst.defaultResolver(), m.inst)

// allocate an ID for the transform, for now just log the events it produces
runID := transform.NewRunID()
m.inst.bus.SubscribeID(func(ctx context.Context, e event.Event) error {
when := time.Unix(e.Timestamp/1000000000, e.Timestamp%1000000000)
log.Infof("[%s] event %s: %s", when, e.Type, e.Payload)
return nil
}, runID)

// apply the transform
err := transform.Apply(ctx, ds, loader, str, scriptOut, secrets)
shouldWait := true
err := transform.Apply(ctx, ds, loader, runID, m.inst.bus, shouldWait, str, scriptOut, secrets)
if err != nil {
return err
}
Expand Down
32 changes: 21 additions & 11 deletions lib/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/qri-io/dataset"
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/transform"
)

Expand All @@ -30,12 +32,10 @@ func NewTransformMethods(inst *Instance) *TransformMethods {
type ApplyParams struct {
Refstr string
Transform *dataset.Transform
Source string
Secrets map[string]string
Wait bool

// TODO(dustmop): add `Wait bool`, if false, run the save asynchronously
// and return events on the bus that provide the progress of the save operation

Source string
ScriptOutput io.Writer
}

Expand All @@ -49,9 +49,8 @@ func (p *ApplyParams) Valid() error {

// ApplyResult is the result of an apply command
type ApplyResult struct {
Data *dataset.Dataset
// TODO(dustmop): Make Apply asynchronous, running the transform in a go-routine.
// Return a channel that will send progress on the execution.
Data *dataset.Dataset
RunID string `json:"runID"`
}

// Apply runs a transform script
Expand Down Expand Up @@ -87,15 +86,26 @@ func (m *TransformMethods) Apply(p *ApplyParams, res *ApplyResult) error {
str := m.inst.node.LocalStreams
loader := NewParseResolveLoadFunc("", m.inst.defaultResolver(), m.inst)

// allocate an ID for the transform, for now just log the events it produces
runID := transform.NewRunID()
m.inst.bus.SubscribeID(func(ctx context.Context, e event.Event) error {
when := time.Unix(e.Timestamp/1000000000, e.Timestamp%1000000000)
log.Infof("[%s] event %s: %s", when, e.Type, e.Payload)
return nil
}, runID)

scriptOut := p.ScriptOutput
err = transform.Apply(ctx, ds, loader, str, scriptOut, p.Secrets)
err = transform.Apply(ctx, ds, loader, runID, m.inst.bus, p.Wait, str, scriptOut, p.Secrets)
if err != nil {
return err
}

if err = base.InlineJSONBody(ds); err != nil && err != base.ErrNoBodyToInline {
return err
if p.Wait {
if err = base.InlineJSONBody(ds); err != nil && err != base.ErrNoBodyToInline {
return err
}
res.Data = ds
}
res.Data = ds
res.RunID = runID
return nil
}
1 change: 1 addition & 0 deletions lib/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestApplyTransform(t *testing.T) {
Transform: &dataset.Transform{
ScriptPath: "testdata/cities_2/add_city.star",
},
Wait: true,
})
if err != nil {
t.Error(err)
Expand Down
6 changes: 4 additions & 2 deletions lib/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ func (h *wsHandler) WSConnectionHandler(w http.ResponseWriter, r *http.Request)
func (h *wsHandler) wsMessageHandler(_ context.Context, e event.Event) error {
ctx := context.Background()
evt := map[string]interface{}{
"type": string(e.Type),
"data": e.Payload,
"type": string(e.Type),
"ts": e.Timestamp,
"sessionID": e.SessionID,
"data": e.Payload,
}

log.Debugf("sending event %q to %d websocket conns", e.Type, len(h.conns))
Expand Down
25 changes: 21 additions & 4 deletions transform/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,41 @@ import (
"fmt"
"io"

"github.com/google/uuid"
golog "github.com/ipfs/go-log"
"github.com/qri-io/dataset"
"github.com/qri-io/ioes"
"github.com/qri-io/qri/dsref"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/transform/startf"
)

// TODO(dustmop): Tests. Especially once the `apply` command exists.
var log = golog.Logger("transform")

// Apply applies the transform script to order to modify the changing dataset
func Apply(
ctx context.Context,
ds *dataset.Dataset,
loader dsref.ParseResolveLoad,
runID string,
pub event.Publisher,
wait bool,
str ioes.IOStreams,
scriptOut io.Writer,
secrets map[string]string,
) (err error) {
) error {
var (
target = ds
head *dataset.Dataset
err error
)

if target.Transform == nil || target.Transform.ScriptFile() == nil {
return errors.New("apply requires a transform component with a script file")
}
if runID == "" {
return errors.New("apply requires a runID")
}

if ds.Name != "" {
head, err = loader(ctx, fmt.Sprintf("%s/%s", ds.Peername, ds.Name))
Expand All @@ -54,11 +64,18 @@ func Apply(
startf.AddDatasetLoader(loader),
}

pub.PublishID(ctx, event.ETTransformStart, runID, "")

if err = startf.ExecScript(ctx, target, head, opts...); err != nil {
pub.PublishID(ctx, event.ETTransformFailure, runID, "")
return err
}

str.PrintErr("✅ transform complete\n")

pub.PublishID(ctx, event.ETTransformComplete, runID, "")
return nil
}

// NewRunID creates a run identifier
func NewRunID() string {
return uuid.New().String()
}
5 changes: 2 additions & 3 deletions transform/startf/entry_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package startf

import (
"fmt"
"log"
"testing"

"github.com/qri-io/dataset"
Expand Down Expand Up @@ -37,7 +36,7 @@ func TestEntryReaderSimpleList(t *testing.T) {
for i, e := range expect {
ent, err := r.ReadEntry()
if err != nil {
log.Fatal(err)
t.Fatal(err)
}

if e.index != ent.Index {
Expand Down Expand Up @@ -77,7 +76,7 @@ func TestEntryReaderSimpleDict(t *testing.T) {
for i, e := range expect {
ent, err := r.ReadEntry()
if err != nil {
log.Fatal(err)
t.Fatal(err)
}

if e.index != ent.Index {
Expand Down
23 changes: 15 additions & 8 deletions transform/startf/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"

golog "github.com/ipfs/go-log"
"github.com/qri-io/dataset"
"github.com/qri-io/qfs"
"github.com/qri-io/qri/dsref"
Expand All @@ -22,6 +23,8 @@ import (
"go.starlark.net/starlark"
)

var log = golog.Logger("startf")

// Version is the version of qri that this transform was run with
var Version = version.Version

Expand Down Expand Up @@ -283,7 +286,6 @@ type specialFunc func(t *transform, thread *starlark.Thread, ctx *skyctx.Context
func callDownloadFunc(t *transform, thread *starlark.Thread, ctx *skyctx.Context) (result starlark.Value, err error) {
httpGuard.EnableNtwk()
defer httpGuard.DisableNtwk()
t.print("📡 running download...\n")

var download *starlark.Function
if download, err = t.globalFunc("download"); err != nil {
Expand All @@ -304,7 +306,6 @@ func callTransformFunc(t *transform, thread *starlark.Thread, ctx *skyctx.Contex
}
return err
}
t.print("🤖 running transform...\n")

d := skyds.NewDataset(t.prev, t.checkFunc)
d.SetMutable(t.next)
Expand All @@ -314,14 +315,10 @@ func callTransformFunc(t *transform, thread *starlark.Thread, ctx *skyctx.Contex
return nil
}

// print writes output only if a node is specified
func (t *transform) print(msg string) {
t.stderr.Write([]byte(msg))
}

func (t *transform) locals() starlark.StringDict {
return starlark.StringDict{
"load_dataset": starlark.NewBuiltin("load_dataset", t.LoadDataset),
"print": starlark.NewBuiltin("print", t.print),
}
}

Expand Down Expand Up @@ -369,6 +366,16 @@ func (t *transform) LoadDataset(thread *starlark.Thread, _ *starlark.Builtin, ar
return skyds.NewDataset(ds, nil).Methods(), nil
}

func (t *transform) print(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var message starlark.String
if err := starlark.UnpackArgs("print", args, kwargs, "message", &message); err != nil {
return starlark.None, err
}
t.stderr.Write([]byte(message))
io.WriteString(t.stderr, "\n")
return starlark.None, nil
}

// MutatedComponentsFunc returns a function for checking if a field has been
// modified. it's a kind of data structure mutual exclusion lock
// TODO (b5) - this should be refactored & expanded
Expand Down Expand Up @@ -398,7 +405,7 @@ func MutatedComponentsFunc(dsp *dataset.Dataset) func(path ...string) error {
return fmt.Errorf(`transform script and user-supplied dataset are both trying to set:
%s
please adjust either the transform script or remove the supplied '%s'`, path[0], path[0])
please adjust either the transform script or remove the supplied %q`, path[0], path[0])
}
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions transform/startf/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func TestExecScript(t *testing.T) {
if err != nil {
t.Fatal(err)
}
expect := `🤖 running transform...
hello world!`
expect := "hello world!\n"
if string(output) != expect {
t.Errorf("stderr mismatch. expected: '%s', got: '%s'", expect, string(output))
}
Expand Down

0 comments on commit 5ceac77

Please sign in to comment.