Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion cmd/workflow/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,33 @@ func (h *handler) Execute(ctx context.Context) error {
return err
}

exists, existingStatus, err := adapter.CheckWorkflowExists(
h.inputs.WorkflowOwner,
h.inputs.WorkflowName,
h.inputs.WorkflowTag,
h.workflowArtifact.WorkflowID,
)
if err != nil {
return fmt.Errorf("failed to check if workflow exists: %w", err)
}
h.existingWorkflowStatus = existingStatus
if exists {
if err := confirmWorkflowOverwrite(h.inputs.WorkflowName, h.inputs.SkipConfirmation); err != nil {
return err
}
}

ui.Line()
ui.Dim("Uploading files...")
if err := h.uploadArtifacts(); err != nil {
return fmt.Errorf("failed to upload workflow: %w", err)
}

return adapter.Upsert()
err = adapter.Upsert()
if err == nil {
warnIfPausedWorkflowUpdate(h.existingWorkflowStatus)
}
return err
}

// prepareArtifacts handles compile/fetch, artifact preparation, and hashing.
Expand Down Expand Up @@ -333,3 +353,26 @@ func (h *handler) displayWorkflowDetails() {
ui.Dim(fmt.Sprintf("Owner Address: %s", h.inputs.WorkflowOwner))
ui.Line()
}

func confirmWorkflowOverwrite(workflowName string, skipConfirmation bool) error {
ui.Warning(fmt.Sprintf("Workflow %s already exists", workflowName))
ui.Dim("This will update the existing workflow.")

if !skipConfirmation {
confirm, err := ui.Confirm("Are you sure you want to overwrite the workflow?")
if err != nil {
return err
}
if !confirm {
return errors.New("deployment cancelled by user")
}
}

return nil
}

func warnIfPausedWorkflowUpdate(status *uint8) {
if status != nil && *status == workflowStatusPaused {
ui.Warning("Your workflow is paused and has been updated")
}
}
207 changes: 166 additions & 41 deletions cmd/workflow/deploy/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,33 +575,45 @@ func TestExecute_PrivateRegistry(t *testing.T) {
defer wasmServer.Close()

gqlServer := newAssertGQLServer(t, func(t *testing.T, req deployMockGraphQLRequest) (int, map[string]any) {
assert.Contains(t, req.Query, "mutation UpsertOffchainWorkflow")
rawRequest, ok := req.Variables["request"].(map[string]any)
require.True(t, ok)
rawWorkflow, ok := rawRequest["workflow"].(map[string]any)
require.True(t, ok)
assert.Equal(t, "test_workflow", rawWorkflow["workflowName"])
assert.Equal(t, "test-don", rawWorkflow["donFamily"])
assert.Equal(t, wasmServer.URL+"/binary.wasm", rawWorkflow["binaryUrl"])
assert.Equal(t, "WORKFLOW_STATUS_ACTIVE", rawWorkflow["status"])

return http.StatusOK, map[string]any{
"data": map[string]any{
"upsertOffchainWorkflow": map[string]any{
"workflow": map[string]any{
"workflowId": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"owner": chainsim.TestAddress,
"createdAt": "2025-01-01T00:00:00Z",
"status": "WORKFLOW_STATUS_ACTIVE",
"workflowName": "test_workflow",
"binaryUrl": wasmServer.URL + "/binary.wasm",
"configUrl": "",
"tag": "test_workflow",
"attributes": "",
"donFamily": "test-don",
switch {
case req.Query != "" && containsQuery(req.Query, "query GetOffchainWorkflowByName"):
rawRequest, ok := req.Variables["request"].(map[string]any)
require.True(t, ok)
assert.Equal(t, "test_workflow", rawRequest["workflowName"])
return http.StatusOK, map[string]any{
"errors": []map[string]string{{"message": "workflow not found"}},
}
case req.Query != "" && containsQuery(req.Query, "mutation UpsertOffchainWorkflow"):
rawRequest, ok := req.Variables["request"].(map[string]any)
require.True(t, ok)
rawWorkflow, ok := rawRequest["workflow"].(map[string]any)
require.True(t, ok)
assert.Equal(t, "test_workflow", rawWorkflow["workflowName"])
assert.Equal(t, "test-don", rawWorkflow["donFamily"])
assert.Equal(t, wasmServer.URL+"/binary.wasm", rawWorkflow["binaryUrl"])
assert.Equal(t, "WORKFLOW_STATUS_ACTIVE", rawWorkflow["status"])

return http.StatusOK, map[string]any{
"data": map[string]any{
"upsertOffchainWorkflow": map[string]any{
"workflow": map[string]any{
"workflowId": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"owner": chainsim.TestAddress,
"createdAt": "2025-01-01T00:00:00Z",
"status": "WORKFLOW_STATUS_ACTIVE",
"workflowName": "test_workflow",
"binaryUrl": wasmServer.URL + "/binary.wasm",
"configUrl": "",
"tag": "test_workflow",
"attributes": "",
"donFamily": "test-don",
},
},
},
},
}
default:
t.Fatalf("unexpected GraphQL operation: %s", req.Query)
return 0, nil
}
})
defer gqlServer.Close()
Expand All @@ -612,14 +624,125 @@ func TestExecute_PrivateRegistry(t *testing.T) {
assert.NotEmpty(t, h.workflowArtifact.WorkflowID)
})

t.Run("surfaces GraphQL errors from private execute path", func(t *testing.T) {
t.Run("continues when private workflow lookup returns not found", func(t *testing.T) {
wasmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte("workflow wasm payload"))
}))
defer wasmServer.Close()

gqlServer := newAssertGQLServer(t, func(t *testing.T, req deployMockGraphQLRequest) (int, map[string]any) {
if containsQuery(req.Query, "query GetOffchainWorkflowByName") {
return http.StatusOK, map[string]any{
"errors": []map[string]string{{"message": "workflow not found"}},
}
}
if containsQuery(req.Query, "mutation UpsertOffchainWorkflow") {
return http.StatusOK, map[string]any{
"data": map[string]any{
"upsertOffchainWorkflow": map[string]any{
"workflow": map[string]any{
"workflowId": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"owner": chainsim.TestAddress,
"createdAt": "2025-01-01T00:00:00Z",
"status": "WORKFLOW_STATUS_ACTIVE",
"workflowName": "test_workflow",
"binaryUrl": wasmServer.URL + "/binary.wasm",
"configUrl": "",
"tag": "test_workflow",
"attributes": "",
"donFamily": "test-don",
},
},
},
}
}
t.Fatalf("unexpected GraphQL operation: %s", req.Query)
return 0, nil
})
defer gqlServer.Close()

h := newPrivateRegistryExecuteHandler(t, wasmServer.URL+"/binary.wasm", gqlServer.URL)
require.NoError(t, h.ValidateInputs())
require.NoError(t, h.Execute(context.Background()))
})

t.Run("prompts overwrite path can proceed with skip confirmation", func(t *testing.T) {
wasmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte("workflow wasm payload"))
}))
defer wasmServer.Close()

gqlServer := newMockGQLServer(t, map[string]any{
"errors": []map[string]string{{"message": "unauthorized"}},
gqlServer := newAssertGQLServer(t, func(t *testing.T, req deployMockGraphQLRequest) (int, map[string]any) {
if containsQuery(req.Query, "query GetOffchainWorkflowByName") {
return http.StatusOK, map[string]any{
"data": map[string]any{
"getOffchainWorkflowByName": map[string]any{
"workflow": map[string]any{
"workflowId": "existing-wf-id",
"owner": chainsim.TestAddress,
"createdAt": "2025-01-01T00:00:00Z",
"status": "WORKFLOW_STATUS_ACTIVE",
"workflowName": "test_workflow",
"binaryUrl": "https://example.com/old.wasm",
"configUrl": "",
"tag": "test_workflow",
"attributes": "",
"donFamily": "test-don",
},
},
},
}
}
if containsQuery(req.Query, "mutation UpsertOffchainWorkflow") {
return http.StatusOK, map[string]any{
"data": map[string]any{
"upsertOffchainWorkflow": map[string]any{
"workflow": map[string]any{
"workflowId": "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"owner": chainsim.TestAddress,
"createdAt": "2025-01-01T00:00:00Z",
"status": "WORKFLOW_STATUS_ACTIVE",
"workflowName": "test_workflow",
"binaryUrl": wasmServer.URL + "/binary.wasm",
"configUrl": "",
"tag": "test_workflow",
"attributes": "",
"donFamily": "test-don",
},
},
},
}
}
t.Fatalf("unexpected GraphQL operation: %s", req.Query)
return 0, nil
})
defer gqlServer.Close()

h := newPrivateRegistryExecuteHandler(t, wasmServer.URL+"/binary.wasm", gqlServer.URL)
h.inputs.SkipConfirmation = true
require.NoError(t, h.ValidateInputs())
require.NoError(t, h.Execute(context.Background()))
})

t.Run("surfaces GraphQL errors from private execute upsert path", func(t *testing.T) {
wasmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte("workflow wasm payload"))
}))
defer wasmServer.Close()

gqlServer := newAssertGQLServer(t, func(t *testing.T, req deployMockGraphQLRequest) (int, map[string]any) {
if containsQuery(req.Query, "query GetOffchainWorkflowByName") {
return http.StatusOK, map[string]any{
"errors": []map[string]string{{"message": "workflow not found"}},
}
}
if containsQuery(req.Query, "mutation UpsertOffchainWorkflow") {
return http.StatusOK, map[string]any{
"errors": []map[string]string{{"message": "unauthorized"}},
}
}
t.Fatalf("unexpected GraphQL operation: %s", req.Query)
return 0, nil
})
defer gqlServer.Close()

Expand All @@ -631,22 +754,28 @@ func TestExecute_PrivateRegistry(t *testing.T) {
assert.Contains(t, err.Error(), "unauthorized")
})

t.Run("surfaces transport errors from private execute path", func(t *testing.T) {
t.Run("surfaces transport errors from private existence check", func(t *testing.T) {
wasmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte("workflow wasm payload"))
}))
defer wasmServer.Close()

gqlServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "server exploded", http.StatusInternalServerError)
}))
gqlServer := newAssertGQLServer(t, func(t *testing.T, req deployMockGraphQLRequest) (int, map[string]any) {
if containsQuery(req.Query, "query GetOffchainWorkflowByName") {
return http.StatusInternalServerError, map[string]any{
"errors": []map[string]string{{"message": "server exploded"}},
}
}
t.Fatalf("unexpected GraphQL operation: %s", req.Query)
return 0, nil
})
defer gqlServer.Close()

h := newPrivateRegistryExecuteHandler(t, wasmServer.URL+"/binary.wasm", gqlServer.URL)
require.NoError(t, h.ValidateInputs())
err := h.Execute(context.Background())
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to register workflow in private registry")
assert.Contains(t, err.Error(), "failed to check if workflow exists")
})
}

Expand All @@ -655,14 +784,6 @@ type deployMockGraphQLRequest struct {
Variables map[string]interface{} `json:"variables"`
}

func newMockGQLServer(t *testing.T, response map[string]any) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(response)
}))
}

func newAssertGQLServer(
t *testing.T,
handler func(t *testing.T, req deployMockGraphQLRequest) (status int, response map[string]any),
Expand All @@ -681,6 +802,10 @@ func newAssertGQLServer(
}))
}

func containsQuery(query, operation string) bool {
return query != "" && strings.Contains(query, operation)
}

func newPrivateRegistryExecuteHandler(t *testing.T, wasmURL, gqlURL string) *handler {
t.Helper()
simulatedEnvironment := chainsim.NewSimulatedEnvironment(t)
Expand Down
Loading
Loading