Skip to content

Commit

Permalink
Dev Server rerun UI (#1272)
Browse files Browse the repository at this point in the history
  • Loading branch information
goodoldneon committed Apr 18, 2024
1 parent dbb714c commit 1284994
Show file tree
Hide file tree
Showing 18 changed files with 284 additions and 76 deletions.
66 changes: 66 additions & 0 deletions pkg/coreapi/generated/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/coreapi/gql.schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ type FunctionRun {
history: [RunHistoryItem!]!
historyItemOutput(id: ULID!): String
eventID: ID!
cron: String
}

enum HistoryType {
Expand Down
1 change: 1 addition & 0 deletions pkg/coreapi/graph/models/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func MakeFunctionRun(f *cqrs.FunctionRun) *FunctionRun {
EventID: f.EventID.String(),
BatchID: f.BatchID,
Status: &status,
Cron: f.Cron,
}
if len(f.Output) > 0 {
str := string(f.Output)
Expand Down
1 change: 1 addition & 0 deletions pkg/coreapi/graph/models/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions pkg/coreapi/graph/resolvers/function_run.resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/google/uuid"
"github.com/inngest/inngest/pkg/coreapi/graph/models"
"github.com/inngest/inngest/pkg/cqrs"
"github.com/inngest/inngest/pkg/enums"
"github.com/inngest/inngest/pkg/event"
"github.com/inngest/inngest/pkg/execution"
Expand Down Expand Up @@ -280,18 +279,17 @@ func (r *mutationResolver) Rerun(
}

evt, err := r.Data.GetEventByInternalID(ctx, run.EventID)
if run.Cron != nil && err == sql.ErrNoRows {
// Create a dummy event since we don't store cron events. We can delete
// this dummy when we start storing cron events
evt = &cqrs.Event{}
} else if err != nil {
if err != nil {
return zero, fmt.Errorf("failed to get run event: %w", err)
}

identifier, err := r.Executor.Schedule(ctx, execution.ScheduleRequest{
Function: *fn,
Events: []event.TrackedEvent{
event.NewOSSTrackedEvent(evt.Event()),
// We need NewOSSTrackedEventWithID to ensure that the tracked event
// has the same ID as the original event. Calling NewOSSTrackedEvent
// will result in the creation of a new ID
event.NewOSSTrackedEventWithID(evt.Event(), evt.InternalID()),
},
OriginalRunID: &run.RunID,
})
Expand Down
48 changes: 12 additions & 36 deletions pkg/coreapi/graph/resolvers/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,44 +73,20 @@ func (r *queryResolver) Stream(ctx context.Context, q models.StreamQuery) ([]*mo
CreatedAt: time.UnixMilli(i.EventTS),
Runs: []*models.FunctionRun{},
}
if len(fnsByID[i.ID]) > 0 {
items[n].Runs = fnsByID[i.ID]
}
}

// Query all function runs received, and filter by crons.
fns, err = r.Data.GetFunctionRunsTimebound(ctx, tb, q.Limit)
if err != nil {
return nil, err
}
for _, i := range fns {
if i.Cron == nil && i.OriginalRunID == nil {
// These are children of events.
continue
}

var trigger string
if i.Cron != nil {
trigger = *i.Cron
} else if i.OriginalRunID != nil {
trigger = "Cron rerun"
runs := fnsByID[i.ID]
if len(runs) > 0 {
// If any of the runs is a cron, then the stream item is a cron
for _, run := range runs {
if run.Cron != nil {
items[n].Trigger = *run.Cron
items[n].Type = models.StreamTypeCron
break
}
}

items[n].Runs = runs
}

runs := []*models.FunctionRun{models.MakeFunctionRun(i)}
_, err := r.Data.GetFunctionByInternalUUID(ctx, uuid.UUID{}, uuid.MustParse(runs[0].FunctionID))
if err == sql.ErrNoRows {
// Skip run since its function doesn't exist. This can happen when
// deleting a function or changing its ID.
runs = []*models.FunctionRun{}
}

items = append(items, &models.StreamItem{
ID: i.RunID.String(),
Trigger: trigger,
Type: models.StreamTypeCron,
CreatedAt: i.RunStartedAt,
Runs: runs,
})
}

sort.Slice(items, func(i, j int) bool {
Expand Down
1 change: 1 addition & 0 deletions pkg/devserver/devserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func start(ctx context.Context, opts StartOpts) error {
runner.WithTracker(t),
runner.WithRateLimiter(rl),
runner.WithBatchManager(batcher),
runner.WithPublisher(pb),
)

// The devserver embeds the event API.
Expand Down
7 changes: 7 additions & 0 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ func NewOSSTrackedEvent(e Event) TrackedEvent {
}
}

func NewOSSTrackedEventWithID(e Event, id ulid.ULID) TrackedEvent {
return ossTrackedEvent{
Id: id,
Event: e,
}
}

func NewOSSTrackedEventFromString(data string) (*ossTrackedEvent, error) {
evt := &ossTrackedEvent{}
if err := json.Unmarshal([]byte(data), evt); err != nil {
Expand Down
34 changes: 31 additions & 3 deletions pkg/execution/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -106,6 +107,12 @@ func WithTracker(t *Tracker) func(s *svc) {
}
}

func WithPublisher(p pubsub.Publisher) func(s *svc) {
return func(s *svc) {
s.publisher = p
}
}

func NewService(c config.Config, opts ...Opt) Runner {
svc := &svc{config: c}
for _, o := range opts {
Expand All @@ -119,7 +126,8 @@ type svc struct {
cqrs cqrs.Manager
// pubsub allows us to subscribe to new events, and re-publish events
// if there are errors.
pubsub pubsub.PublishSubscriber
pubsub pubsub.PublishSubscriber
publisher pubsub.Publisher
// executor handles execution of functions.
executor execution.Executor
// data provides the required loading capabilities to trigger functions
Expand Down Expand Up @@ -252,13 +260,33 @@ func (s *svc) InitializeCrons(ctx context.Context) error {
))
defer span.End()

err := s.initialize(ctx, fn, event.NewOSSTrackedEvent(event.Event{
trackedEvent := event.NewOSSTrackedEvent(event.Event{
Data: map[string]any{
"cron": cron,
},
ID: time.Now().UTC().Format(time.RFC3339),
Name: event.FnCronName,
}))
})

byt, err := json.Marshal(trackedEvent)
if err == nil {
err := s.publisher.Publish(
ctx,
s.config.EventStream.Service.TopicName(),
pubsub.Message{
Name: event.EventReceivedName,
Data: string(byt),
Timestamp: time.Now(),
},
)
if err != nil {
logger.From(ctx).Error().Err(err).Msg("error publishing cron event")
}
} else {
logger.From(ctx).Error().Err(err).Msg("error marshaling cron event")
}

err = s.initialize(ctx, fn, trackedEvent)
if err != nil {
logger.From(ctx).Error().Err(err).Msg("error initializing scheduled function")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/history_drivers/memory_writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (w *writer) writeWorkflowStart(
run := w.store.Data[item.RunID]
run.Run.AccountID = item.AccountID
run.Run.BatchID = item.BatchID
run.Run.Cron = item.Cron
run.Run.EventID = item.EventID
run.Run.ID = item.RunID
run.Run.OriginalRunID = item.OriginalRunID
Expand Down

0 comments on commit 1284994

Please sign in to comment.