Can a runnable demo be added? #56

Open
FungYuu opened this issue Mar 18, 2025 · 11 comments

@FungYuu
Contributor

FungYuu commented Mar 18, 2025

Does FlowGram support running?

Image

@xiamidaxia
Collaborator

We have plans to implement a sandbox runtime with NodeJS, but we haven't started yet, and this requires designing a backend service.

@FungYuu
Contributor Author

FungYuu commented Mar 19, 2025

> We have plans to implement a sandbox runtime with NodeJS, but we haven't started yet, and this requires designing a backend service.

Thank you for your reply. I look forward to more complete documentation.

@linonetwo

I hope the runtime supports pluggable persistence, so that it is easier to understand and does not strictly require external services such as a database or k8s.
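
To make the request concrete, here is a minimal sketch of what pluggable persistence could look like. Every name in it (`RunSnapshot`, `RunStore`, `InMemoryRunStore`, `recordStep`) is hypothetical and not part of FlowGram. The store is synchronous for brevity; a real one backed by a database would more likely return Promises.

```typescript
// Hypothetical shape of a persisted run; none of these names come from FlowGram.
interface RunSnapshot {
  runId: string;
  currentNodeId: string;
  outputs: Record<string, unknown>;
}

// The runtime would depend only on this interface, not on a concrete database.
interface RunStore {
  save(snapshot: RunSnapshot): void;
  load(runId: string): RunSnapshot | undefined;
}

// Default implementation: keeps everything in process memory,
// so no external service is needed to try the runtime out.
class InMemoryRunStore implements RunStore {
  private snapshots: Record<string, RunSnapshot> = {};

  save(snapshot: RunSnapshot): void {
    // Store a shallow copy so later mutations by the caller do not leak in.
    this.snapshots[snapshot.runId] = {
      ...snapshot,
      outputs: { ...snapshot.outputs },
    };
  }

  load(runId: string): RunSnapshot | undefined {
    return this.snapshots[runId];
  }
}

// Records the output of one node. It only talks to the RunStore interface,
// so swapping in a database-backed store would not change this function.
function recordStep(
  store: RunStore,
  runId: string,
  nodeId: string,
  output: unknown
): RunSnapshot {
  const previous = store.load(runId);
  const next: RunSnapshot = {
    runId,
    currentNodeId: nodeId,
    outputs: { ...(previous ? previous.outputs : {}), [nodeId]: output },
  };
  store.save(next);
  return next;
}
```

With this shape, the in-memory store would be the default and a database or other service would only be one more `RunStore` implementation.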

@xiamidaxia
Collaborator

@linonetwo

Very elegant, ByteDance quality, thanks!

@linonetwo

I misread it. It actually means that third-party workflow executors can be plugged in easily...

@SilenceShine

> We have plans to implement a sandbox runtime with NodeJS, but we haven't started yet, and this requires designing a backend service.

Is ByteDance's sandbox also built with NodeJS? Would it be possible to contribute?

@xiamidaxia
Collaborator

ByteDance's is not NodeJS; it uses Go.

@linonetwo

linonetwo commented Apr 4, 2025

mastra does not handle control flow such as branches and loops well. I tried it, and the conversion felt awkward.

BPMN is another option to look at; it can be built declaratively through AST transformation:

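Under the hood these are all essentially state machines anyway, so this could also be built on a state machine library. The advantage is that it can run on both the frontend and the backend (mastra and bpmn-server cannot be test-run directly in the frontend):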

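This kind of node-and-wire programming is traditionally called FBP (flow-based programming), and there are many existing runtimes that can be used directly: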
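
As a rough illustration of the state machine view, the sketch below treats each node as a state and each edge as a transition. It uses plain TypeScript with no library, so it runs in a browser or in NodeJS. The edge field names follow the `sourceNodeID` / `targetNodeID` / `sourcePortID` shape used in this thread. Everything else (`MachineNode`, `MachineEdge`, `runMachine`, the demo nodes) is hypothetical and is not the API of FlowGram, xstate, or any FBP runtime.

```typescript
// Hypothetical types for illustration only.
interface MachineEdge {
  sourceNodeID: string;
  targetNodeID: string;
  sourcePortID?: string; // set on branches leaving a condition node
}

interface MachineNode {
  id: string;
  // Returns an output and, for branching nodes, the port to follow.
  run(input: unknown): { output: unknown; port?: string };
}

// Walk the graph as a state machine: the current node is the state,
// and the chosen outgoing edge is the transition.
function runMachine(
  nodes: MachineNode[],
  edges: MachineEdge[],
  startId: string,
  input: unknown,
  maxSteps = 100
): { visited: string[]; output: unknown } {
  const byId: Record<string, MachineNode | undefined> = {};
  nodes.forEach((n) => {
    byId[n.id] = n;
  });

  const visited: string[] = [];
  let current: string | undefined = startId;
  let value = input;

  while (current !== undefined) {
    // Guard against cycles that never terminate.
    if (visited.length >= maxSteps) throw new Error("step limit exceeded");
    const id: string = current;
    const node = byId[id];
    if (!node) throw new Error(`unknown node: ${id}`);
    visited.push(id);

    const result = node.run(value);
    value = result.output;

    // Match the selected port if the node chose one;
    // otherwise follow the first edge that has no port.
    const next = edges.find(
      (e) => e.sourceNodeID === id && e.sourcePortID === result.port
    );
    current = next ? next.targetNodeID : undefined;
  }
  return { visited, output: value };
}

// Demo graph: start -> cond -> (big | small)
const demoNodes: MachineNode[] = [
  { id: "start", run: (input) => ({ output: input }) },
  {
    id: "cond",
    run: (input) => ({ output: input, port: Number(input) > 10 ? "yes" : "no" }),
  },
  { id: "big", run: (input) => ({ output: `big:${String(input)}` }) },
  { id: "small", run: (input) => ({ output: `small:${String(input)}` }) },
];

const demoEdges: MachineEdge[] = [
  { sourceNodeID: "start", targetNodeID: "cond" },
  { sourceNodeID: "cond", targetNodeID: "big", sourcePortID: "yes" },
  { sourceNodeID: "cond", targetNodeID: "small", sourcePortID: "no" },
];

const demoResult = runMachine(demoNodes, demoEdges, "start", 42);
console.log(demoResult.visited.join(" -> "), demoResult.output);
```

A branch here is just a node that returns a port name, which is why conditional edges map more naturally onto a state machine than onto a chained builder API.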

@didnhdj2

didnhdj2 commented Apr 9, 2025

https://mastra.ai/examples/workflows/creating-a-workflow

How can I integrate Mastra with FlowGram to create an executable workflow?
Are there any existing demo implementations or tutorials for this integration?
Appreciate your team’s work on this!

@linonetwo

linonetwo commented Apr 9, 2025

@didnhdj2 GitHub Copilot can write it in a minute. Here is my result (not tested):

But as I said above, its `if` statement is not well designed, so it is not easy to transpile to. I'm going to use xstate in my project instead.

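/**
 * Workflow builder
 *
 * This module converts a WorkflowJSON graph description into an executor based on @mastra/core.
 * Users can supply custom node builder functions to create their own workflow node types.
 */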
import { Step, Workflow } from "@mastra/core";
import { z } from "zod";
import { WorkflowJSON, WorkflowNodeJSON } from '@flowgram.ai/free-layout-editor';

export type StepBuilder = (node: WorkflowNodeJSON, context: BuilderContext) => Step;
export type NodeBuilders = Record<string, StepBuilder>;

export interface BuilderContext {
  workflow: Workflow;
  stepGraph: Map<string, Step>;
  adjacencyList: Map<string, {targetId: string, sourcePortId?: string}[]>;
  predecessors: Map<string, string[]>;
}

/**
 * Start node builder
 */
function start(node: WorkflowNodeJSON, context: BuilderContext): Step {
  const step = new Step({
    id: node.id,
    execute: async () => ({
      ...node.data.outputs?.properties
    })
  });
  
  // The start node is added to the workflow directly
  context.workflow.step(step);
  
  return step;
}

/**
 * Condition node builder
 */
function condition(node: WorkflowNodeJSON, context: BuilderContext): Step {
  return new Step({
    id: node.id,
    execute: async ({ context: runContext }) => {
      const conditions = node.data.inputsValues?.conditions || [];
      const results: Record<string, boolean> = {};
      
      for (const condition of conditions) {
        const sourceStep = runContext.getStepResult(condition.value.content);
        results[condition.key] = Boolean(sourceStep);
      }
      
      return results;
    }
  });
}

/**
 * LLM node builder
 */
async function executeLLM(params: any) {
  return `LLM response for ${JSON.stringify(params)}`;
}

function llm(node: WorkflowNodeJSON, context: BuilderContext): Step {
  return new Step({
    id: node.id,
    execute: async () => ({
      result: await executeLLM(node.data.inputsValues)
    })
  });
}

/**
 * Loop node builder
 */
function loop(node: WorkflowNodeJSON, context: BuilderContext): Step {
  // Handle the sub-workflow
  const loopWorkflow = new Workflow({
    name: `${node.id}-loop-workflow`,
    triggerSchema: z.object({ inputValue: z.any() })
  });
  
  if (node.blocks && node.edges) {
    // Recursively build the sub-workflow
    const subflowData = { nodes: node.blocks, edges: node.edges };
    const subContext = buildStepGraph(subflowData, nodeBuilders, loopWorkflow);
    connectWorkflowSteps(subContext);
    loopWorkflow.commit();
  }
  
  // Create the step that runs the loop
  return new Step({
    id: node.id,
    execute: async ({ context: runContext }) => {
      const loopTimes = node.data.inputsValues?.loopTimes || 1;
      let result;
      
      for (let i = 0; i < loopTimes; i++) {
        // Run the sub-workflow on each iteration
        result = await loopWorkflow.trigger({ inputValue: i });
      }
      
      return { result };
    }
  });
}

/**
 * End node builder
 */
function end(node: WorkflowNodeJSON, context: BuilderContext): Step {
  return new Step({
    id: node.id,
    execute: async ({ context: runContext }) => {
      // The end node usually aggregates the preceding results
      return {
        result: node.data.outputs?.properties?.result || "Workflow completed"
      };
    }
  });
}

// Default set of node builders
const nodeBuilders: NodeBuilders = {
  start,
  condition,
  llm,
  loop,
  end,
};

/**
 * Build the step graph from the workflow JSON
 */
function buildStepGraph(flowData: WorkflowJSON, builders: NodeBuilders, workflow: Workflow): BuilderContext {
  const context: BuilderContext = {
    workflow,
    stepGraph: new Map<string, Step>(),
    adjacencyList: new Map(),
    predecessors: new Map()
  };

  // Build the adjacency list and the predecessor table
  flowData.edges.forEach(edge => {
    // Adjacency list
    if (!context.adjacencyList.has(edge.sourceNodeID)) {
      context.adjacencyList.set(edge.sourceNodeID, []);
    }
    context.adjacencyList.get(edge.sourceNodeID)!.push({
      targetId: edge.targetNodeID,
      sourcePortId: edge.sourcePortID
    });
    
    // Predecessor table
    if (!context.predecessors.has(edge.targetNodeID)) {
      context.predecessors.set(edge.targetNodeID, []);
    }
    context.predecessors.get(edge.targetNodeID)!.push(edge.sourceNodeID);
  });

  // First, create a step for every node
  flowData.nodes.forEach(node => {
    const builder = builders[node.type];
    if (!builder) {
      console.warn(`Unknown node type: ${node.type}`);
      return;
    }
    
    const step = builder(node, context);
    context.stepGraph.set(node.id, step);
  });

  return context;
}

/**
 * Connect the workflow steps to build the execution path
 */
function connectWorkflowSteps(context: BuilderContext): void {
  const { workflow, stepGraph, adjacencyList, predecessors } = context;
  const processedNodes = new Set<string>();
  
  function processNode(nodeId: string): void {
    if (processedNodes.has(nodeId)) return;
    processedNodes.add(nodeId);
    
    const step = stepGraph.get(nodeId);
    if (!step) return;
    
    const connections = adjacencyList.get(nodeId) || [];
    
    connections.forEach(conn => {
      const targetStep = stepGraph.get(conn.targetId);
      if (!targetStep) return;
      
      if (conn.sourcePortId) {
        // Conditional connection
        workflow
          .after(step)
          .if(async ({ context: runContext }) => {
            const sourceResult = runContext.getStepResult(nodeId);
            if (!sourceResult || sourceResult.status !== 'success') return false;
            return Boolean(sourceResult.output?.[conn.sourcePortId!]);
          })
          .then(targetStep);
      } else {
        // Unconditional connection
        workflow.after(step).then(targetStep);
      }
    });
    
    // Process all successor nodes
    connections.forEach(conn => {
      processNode(conn.targetId);
    });
  }
  
  // Begin processing from the start nodes (those with no predecessors)
  for (const [nodeId, node] of stepGraph.entries()) {
    if (!predecessors.has(nodeId) || predecessors.get(nodeId)!.length === 0) {
      processNode(nodeId);
    }
  }
}

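/**
 * Build the workflow
 *
 * Converts a WorkflowJSON into an executable Workflow object
 *
 * @param flowData the workflow JSON description
 * @param builders the set of node builders
 * @param options workflow configuration options
 * @returns the built workflow
 */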
export function buildWorkflow(
  flowData: WorkflowJSON,
  builders: NodeBuilders = nodeBuilders,
  options: {
    name?: string;
    triggerSchema?: z.ZodType;
  } = {}
): Workflow {
  const workflow = new Workflow({
    name: options.name ?? "dynamic-workflow",
    triggerSchema: options.triggerSchema ?? z.object({
      inputValue: z.any()
    })
  });

  // Create the step graph structure
  const context = buildStepGraph(flowData, builders, workflow);
  
  // Connect the steps
  connectWorkflowSteps(context);
  
  workflow.commit();
  return workflow;
}
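
For readers who want to see the input shape the builder above walks over, here is a small self-contained sketch. The interfaces are local stand-ins that model only the fields the builder reads (`nodes`, `edges`, `id`, `type`, `sourceNodeID`, `targetNodeID`, `sourcePortID`); they are not the real `WorkflowJSON` types. The node IDs, the port IDs (`if_0`, `if_1`), and the helper names are made up for illustration.

```typescript
// Local stand-ins for the WorkflowJSON shapes; only the fields the builder
// reads are modelled here. These are assumptions, not the library's types.
interface SampleEdge {
  sourceNodeID: string;
  targetNodeID: string;
  sourcePortID?: string;
}

interface SampleNode {
  id: string;
  type: string;
  data: Record<string, unknown>;
}

interface SampleFlow {
  nodes: SampleNode[];
  edges: SampleEdge[];
}

// A tiny graph: start -> condition -> (llm -> end | end).
// Node IDs and port IDs are invented for this example.
const sampleFlow: SampleFlow = {
  nodes: [
    { id: "start_0", type: "start", data: {} },
    { id: "condition_0", type: "condition", data: {} },
    { id: "llm_0", type: "llm", data: {} },
    { id: "end_0", type: "end", data: {} },
  ],
  edges: [
    { sourceNodeID: "start_0", targetNodeID: "condition_0" },
    { sourceNodeID: "condition_0", targetNodeID: "llm_0", sourcePortID: "if_0" },
    { sourceNodeID: "condition_0", targetNodeID: "end_0", sourcePortID: "if_1" },
    { sourceNodeID: "llm_0", targetNodeID: "end_0" },
  ],
};

// Same idea as the predecessor table in buildStepGraph: a node with no
// incoming edge is an entry point for processing.
function findEntryNodes(flow: SampleFlow): string[] {
  const hasIncoming: Record<string, boolean> = {};
  flow.edges.forEach((e) => {
    hasIncoming[e.targetNodeID] = true;
  });
  return flow.nodes.filter((n) => !hasIncoming[n.id]).map((n) => n.id);
}

// Edges that carry a source port, which connectWorkflowSteps treats as
// conditional connections.
function conditionalEdges(flow: SampleFlow): SampleEdge[] {
  return flow.edges.filter((e) => Boolean(e.sourcePortID));
}

console.log(findEntryNodes(sampleFlow));
console.log(conditionalEdges(sampleFlow).map((e) => e.sourcePortID));
```

Only `start_0` has no incoming edge here, so it is the single entry node, and the two edges leaving `condition_0` are the ones that would go through the conditional branch of the connection logic.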
