Skip to content

Commit

Permalink
feat(run): run package defines state of a transform run
Browse files Browse the repository at this point in the history
run.State is a passable, cachable data structure that represents the state of a run.
We build one up as events are emitted & can use them for storage upon completion.
  • Loading branch information
b5 committed Feb 17, 2021
1 parent e5579a0 commit 8e69e5e
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 0 deletions.
4 changes: 4 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (b *bus) publish(ctx context.Context, typ Type, sessionID string, payload i
Payload: payload,
}

if b.closed {
return e, ErrBusClosed
}

// TODO(dustmop): Add instrumentation, perhaps to ctx, to make logging / tracing
// a single event easier to do.

Expand Down
168 changes: 168 additions & 0 deletions transform/run/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Package run defines metadata about transform script execution
package run

import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/qri-io/qri/event"
)

// NewID creates a run identifier
func NewID() string {
return uuid.New().String()
}

// Status enumerates all possible execution states of a transform script or
// step within a script, in relation to the current time.
// Scripts & steps that have completed are broken into categories based on exit
// state
type Status string

const (
// RSWaiting indicates a script/step that has yet to start
RSWaiting = Status("waiting")
// RSRunning indicates a script/step is currently executing
RSRunning = Status("running")
// RSSucceeded indicates a script/step has completed without error
RSSucceeded = Status("succeeded")
// RSFailed indicates a script/step completed & exited when an unexpected error
// occured
RSFailed = Status("failed")
// RSUnchanged indicates a script completed but no changes were found
// since the last version of the script succeeded
RSUnchanged = Status("unchanged")
// RSSkipped indicates a script/step was not executed
RSSkipped = Status("skipped")
)

// State is a passable, cachable data structure that describes the execution of
// a transform. State structs can act as a sink of transform events, collapsing
// the state transition of multiple transform events into a single structure
type State struct {
ID string `json:"id"`
Number int `json:"number"`
Status Status `json:"status"`
Message string `json:"message"`
StartTime *time.Time `json:"startTime"`
StopTime *time.Time `json:"stopTime"`
Duration int `json:"duration"`
Steps []*StepState `json:"steps"`
}

// NewState is a simple constructor to remind package consumers that state
// structs must be initialized with an identifier to act as a sink of transform
// events
func NewState(id string) *State {
return &State{
ID: id,
}
}

// AddTransformEvent alters state based on a given event
func (rs *State) AddTransformEvent(e event.Event) error {
if rs.ID != e.SessionID {
// silently ignore session ID mismatch
return nil
}

switch e.Type {
case event.ETTransformStart:
rs.Status = RSRunning
rs.StartTime = toTimePointer(e.Timestamp)
return nil
case event.ETTransformStop:
rs.StopTime = toTimePointer(e.Timestamp)
if tl, ok := e.Payload.(event.TransformLifecycle); ok {
rs.Status = Status(tl.Status)
}
if rs.StartTime != nil && rs.StopTime != nil {
rs.Duration = int(rs.StopTime.Sub(*rs.StartTime))
}
return nil
case event.ETTransformStepStart:
s, err := NewStepStateFromEvent(e)
if err != nil {
return err
}
s.Status = RSRunning
s.StartTime = toTimePointer(e.Timestamp)
rs.Steps = append(rs.Steps, s)
return nil
case event.ETTransformStepStop:
step, err := rs.lastStep()
if err != nil {
return err
}
step.StopTime = toTimePointer(e.Timestamp)
if tsl, ok := e.Payload.(event.TransformStepLifecycle); ok {
step.Status = Status(tsl.Status)
} else {
step.Status = RSFailed
}
if step.StartTime != nil && step.StopTime != nil {
step.Duration = int(step.StopTime.Sub(*step.StartTime))
}
return nil
case event.ETTransformStepSkip:
s, err := NewStepStateFromEvent(e)
if err != nil {
return err
}
s.Status = RSSkipped
rs.Steps = append(rs.Steps, s)
return nil
case event.ETTransformPrint,
event.ETTransformError,
event.ETTransformDatasetPreview:
return rs.appendStepOutputLog(e)
}
return fmt.Errorf("unexpected event type: %q", e.Type)
}

func (rs *State) lastStep() (*StepState, error) {
if len(rs.Steps) > 0 {
return rs.Steps[len(rs.Steps)-1], nil
}
return nil, fmt.Errorf("expected step to exist")
}

func (rs *State) appendStepOutputLog(e event.Event) error {
step, err := rs.lastStep()
if err != nil {
return err
}

step.Output = append(step.Output, e)
return nil
}

// StepState describes the execution of a transform step
type StepState struct {
Name string `json:"name"`
Category string `json:"category"`
Status Status `json:"status"`
StartTime *time.Time `json:"startTime"`
StopTime *time.Time `json:"stopTime"`
Duration int `json:"duration"`
Output []event.Event `json:"output"`
}

// NewStepStateFromEvent constructs StepState from an event
func NewStepStateFromEvent(e event.Event) (*StepState, error) {
if tsl, ok := e.Payload.(event.TransformStepLifecycle); ok {
return &StepState{
Name: tsl.Name,
Category: tsl.Category,
Status: Status(tsl.Status),
}, nil
}
return nil, fmt.Errorf("run step event data must be a transform step lifecycle struct")
}

func toTimePointer(unixnano int64) *time.Time {
// TODO (b5) - we're dropping nanosecond precision here :/
t := time.Unix(unixnano, 0)
return &t
}
116 changes: 116 additions & 0 deletions transform/run/run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package run

import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/qri-io/qri/event"
)

func TestStateAddTransformEvent(t *testing.T) {
runID := NewID()
states := []struct {
e event.Event
r *State
}{
{
event.Event{Type: event.ETTransformStart, Timestamp: 1609460600090, SessionID: runID, Payload: event.TransformLifecycle{StepCount: 4, Status: "running"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning},
},
{
event.Event{Type: event.ETTransformStepStart, Timestamp: 1609460700090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "setup"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{
{Name: "setup", StartTime: toTimePointer(1609460700090), Status: RSRunning},
}},
},
// {
// event.Event{ Type: event.ETVersionPulled, Timestamp: 1609460800090, SessionID: runID, Payload: {"refstring": "rico/presidents@QmFoo", "remote": "https://registy.qri.cloud" }},
// &State{},
// },
{
event.Event{Type: event.ETTransformStepStop, Timestamp: 1609460900090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "setup", Status: "succeeded"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{
{Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded},
}},
},
{
event.Event{Type: event.ETTransformStepStart, Timestamp: 1609461000090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "download"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{
{Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded},
{Name: "download", StartTime: toTimePointer(1609461000090), Status: RSRunning},
}},
},
{
event.Event{Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{
{Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded},
{Name: "download", StartTime: toTimePointer(1609461000090), Status: RSRunning, Output: []event.Event{
{Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}},
}},
}},
},
// {
// event.Event{ Type: event.ETHttpRequestStart, Timestamp: 1609461200090, SessionID: runID, Payload: {"id": runID, "downloadSize": 230409, "method": "Gevent.ET", "url": "https://registy.qri.cloud" }},
// &State{},
// {
// {
// event.Event{ Type: event.ETHttpRequestStop, Timestamp: 1609461300090, SessionID: runID, Payload: {"size": 230409, "method": "Gevent.ET", "url": "https://registy.qri.cloud" }},
// &State{},
// },
{
event.Event{Type: event.ETTransformStepStop, Timestamp: 1609461400090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "download", Status: "succeeded"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{
{Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded},
{Name: "download", StartTime: toTimePointer(1609461000090), StopTime: toTimePointer(1609461400090), Duration: 400000000000000, Status: RSSucceeded, Output: []event.Event{
{Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}},
}},
}},
},
{
event.Event{Type: event.ETTransformStepStart, Timestamp: 1609461500090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "transform"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{
{Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded},
{Name: "download", StartTime: toTimePointer(1609461000090), StopTime: toTimePointer(1609461400090), Duration: 400000000000000, Status: RSSucceeded, Output: []event.Event{
{Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}},
}},
{Name: "transform", StartTime: toTimePointer(1609461500090), Status: RSRunning},
}},
},
{
event.Event{Type: event.ETTransformStepStop, Timestamp: 1609461600090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "transform", Status: "succeeded"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{
{Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded},
{Name: "download", StartTime: toTimePointer(1609461000090), StopTime: toTimePointer(1609461400090), Duration: 400000000000000, Status: RSSucceeded, Output: []event.Event{
{Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}},
}},
{Name: "transform", StartTime: toTimePointer(1609461500090), StopTime: toTimePointer(1609461600090), Duration: 100000000000000, Status: RSSucceeded},
}},
},
{
event.Event{Type: event.ETTransformStop, Timestamp: 1609461900090, SessionID: runID, Payload: event.TransformLifecycle{Status: "failed"}},
&State{ID: runID, StartTime: toTimePointer(1609460600090), StopTime: toTimePointer(1609461900090), Duration: 1300000000000000, Status: RSFailed, Steps: []*StepState{
{Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded},
{Name: "download", StartTime: toTimePointer(1609461000090), StopTime: toTimePointer(1609461400090), Duration: 400000000000000, Status: RSSucceeded, Output: []event.Event{
{Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}},
}},
{Name: "transform", StartTime: toTimePointer(1609461500090), StopTime: toTimePointer(1609461600090), Duration: 100000000000000, Status: RSSucceeded},
}},
},
}

for i, s := range states {
t.Run(fmt.Sprintf("after_event_%d", i), func(t *testing.T) {
got := NewState(runID)
for j := 0; j <= i; j++ {
if err := got.AddTransformEvent(states[j].e); err != nil {
t.Fatal(err)
}
}

if diff := cmp.Diff(s.r, got); diff != "" {
t.Errorf("result mismatch. (-want +got):\n%s", diff)
}
})
}
}

0 comments on commit 8e69e5e

Please sign in to comment.