JavaScript/TypeScript SDK for building flowctl sources, processors, and consumers without Go.
Initial scaffold, Node.js-first:
- Source, Processor, Consumer
- health endpoint
- flowctl control plane registration + heartbeats
- bundled flowctl/v1 protos
- Stellar raw ledger helpers via @stellar/stellar-sdk

npm install @withobsrvr/js-flowctl-sdk
npm install @withobsrvr/js-flowctl-source
npm install @withobsrvr/js-flowctl-processor
npm install @withobsrvr/js-flowctl-consumer
npm install @withobsrvr/js-flowctl-stellar

For local development in this repo:

npm install
npm run build

npm run proto:generate
npm test
npm run test:ci

GitHub Actions CI is included in .github/workflows/ci.yml and runs build + tests on Node 20 and 22.

import { Processor } from '@withobsrvr/js-flowctl-sdk';

const processor = new Processor({
  id: 'example-processor',
  name: 'Example Processor',
  endpoint: ':50051',
  healthPort: 8088,
  inputEventTypes: ['example.input.v1'],
  outputEventTypes: ['example.output.v1'],
  flowctl: {
    enabled: true,
    endpoint: '127.0.0.1:8080'
  }
});

processor.onProcess(async (event) => ({
  id: `${event.id}-processed`,
  type: 'example.output.v1',
  payload: event.payload,
  metadata: {
    ...event.metadata,
    processed: 'true'
  }
}));

await processor.start();

See examples/checkpoint-source/index.ts.
This example shows:
- source-managed state updates
- generated resume tokens
- graceful shutdown via runComponent()

npm install
npm run example:checkpoint-source

See examples/flowctl-e2e/.
This is a full flowctl integration example using:
- a JS source
- a JS processor
- a JS consumer
- a generated process-mode pipeline YAML
Run it with:

npm install
npm run build
npm run example:flowctl-e2e:demo

See examples/basic-processor/index.ts.

npm install
npm run example:processor

See examples/payment-processor/index.ts.
This example:
- accepts stellar.ledger.v1
- decodes stellar.v1.RawLedger protobuf payloads
- parses LedgerCloseMeta XDR with @stellar/stellar-sdk
- extracts Stellar payment operations
- emits stellar.payment_operations.v1 as JSON payloads

npm install
npm run example:payment-processor

This repo now includes package directories for:
- packages/source → @withobsrvr/js-flowctl-source
- packages/processor → @withobsrvr/js-flowctl-processor
- packages/consumer → @withobsrvr/js-flowctl-consumer
- packages/stellar → @withobsrvr/js-flowctl-stellar
These are small focused entrypoints around the main SDK package.

For a Source, register an async generator with onProduce().
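
As a rough illustration of the async-generator shape, the sketch below defines a standalone generator that yields events one at a time. It does not import the SDK: the SketchEvent type, produceSequence(), and collect() are hypothetical names, the payload being bytes is an assumption, and the arguments that onProduce() actually passes to the generator are not described in this README.

```typescript
// Hypothetical event shape, modelled on the fields used in the processor example.
// The SDK's real Event type may differ.
interface SketchEvent {
  id: string;
  type: string;
  payload: Uint8Array; // assumption: payloads are raw bytes
  metadata: Record<string, string>;
}

// A standalone async generator that yields `count` events starting at `start`.
async function* produceSequence(start: number, count: number): AsyncGenerator<SketchEvent> {
  for (let seq = start; seq < start + count; seq++) {
    yield {
      id: `example-${seq}`,
      type: 'example.input.v1',
      payload: new TextEncoder().encode(JSON.stringify({ seq })),
      metadata: { sequence: String(seq) },
    };
  }
}

// Helper that drains any async iterable into an array (handy in tests).
async function collect<T>(gen: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of gen) out.push(item);
  return out;
}
```

A generator like this would presumably be handed to onProduce(); because it yields lazily, the caller controls how fast events are pulled.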

For a Processor, register an async handler with onProcess().
Return:
- Event for one output
- Event[] for fan-out
- null / undefined to drop the input event
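
The three return shapes can be pictured with a small standalone sketch. It is not the SDK's internal code: SketchEvent, HandlerResult, toOutputs(), and splitHandler() are hypothetical names used only to show how a single event, an array, and null/undefined could each map to a list of outputs.

```typescript
// Hypothetical event shape; the SDK's real Event type may differ.
interface SketchEvent {
  id: string;
  type: string;
  payload: unknown;
  metadata?: Record<string, string>;
}

// The three documented return shapes of a process handler.
type HandlerResult = SketchEvent | SketchEvent[] | null | undefined;

// Normalize a handler result into a (possibly empty) list of output events.
function toOutputs(result: HandlerResult): SketchEvent[] {
  if (result === null || result === undefined) return []; // drop the input
  return Array.isArray(result) ? result : [result];       // fan-out or single
}

// Example handler: drops unknown types, fans known inputs out into two events.
function splitHandler(event: SketchEvent): HandlerResult {
  if (event.type !== 'example.input.v1') return null;
  return [
    { ...event, id: `${event.id}-a`, type: 'example.output.v1' },
    { ...event, id: `${event.id}-b`, type: 'example.output.v1' },
  ];
}
```

Returning null for unrecognized inputs keeps filtering and transformation in one handler, at the cost of silently discarding events, so logging dropped types may be worth considering.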

For a Consumer, register an async handler with onConsume().

This repo now includes generated static protobuf bindings in:
- src/generated/flow-proto.js
- src/generated/flow-proto.d.ts

Generate them with:

npm run proto:generate

They are exported as:

import { flowProto } from '@withobsrvr/js-flowctl-sdk';

const payload = flowProto.stellar.v1.RawLedger.encode({...}).finish();

The SDK now uses generated proto message types for message encoding/decoding.
It also exports typed gRPC helpers from src/proto.ts, including:
- createSourceServiceClient()
- createProcessorServiceClient()
- createConsumerServiceClient()
- createControlPlaneServiceClient()

The low-level gRPC service loading still uses @grpc/proto-loader, but the public SDK surface now has typed client helpers instead of any-driven access.

runComponent() starts a component and attaches graceful signal handling for SIGINT and SIGTERM.
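
The general pattern behind this kind of helper can be sketched in plain Node.js. This is not the SDK's implementation: the Stoppable interface and runWithSignals() are hypothetical, and the signal source is injectable only so the sketch can be exercised without sending real signals.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical minimal component shape; the SDK's real interface may differ.
interface Stoppable {
  start(): Promise<void>;
  stop(): Promise<void>;
}

// Start the component, then resolve once SIGINT or SIGTERM has triggered stop().
async function runWithSignals(
  component: Stoppable,
  signals: EventEmitter = process, // injectable for testing
): Promise<void> {
  await component.start();
  await new Promise<void>((resolve, reject) => {
    const shutdown = () => {
      // Remove both listeners so a second signal does not call stop() again.
      signals.off('SIGINT', shutdown);
      signals.off('SIGTERM', shutdown);
      component.stop().then(resolve, reject);
    };
    signals.once('SIGINT', shutdown);
    signals.once('SIGTERM', shutdown);
  });
}
```

In an entrypoint this would be awaited at the top level, so the process exits only after stop() has finished flushing work.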
Exported utilities:
- SourceStateStore
- encodeResumeToken(state)
- decodeResumeToken(token)

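
To make the idea of a resume token concrete, here is one possible encoding: the state serialized as JSON and wrapped in base64url. The token format actually used by encodeResumeToken() and decodeResumeToken() is not documented in this README, so the functions below use different, hypothetical names and should be read as an illustration only.

```typescript
// Hypothetical state shape: a flat, JSON-serializable record.
type SketchState = Record<string, string | number | boolean | null>;

// Serialize state to an opaque, URL-safe token (JSON wrapped in base64url).
function encodeTokenSketch(state: SketchState): string {
  return Buffer.from(JSON.stringify(state), 'utf8').toString('base64url');
}

// Reverse of encodeTokenSketch(); throws if the token is not valid JSON.
function decodeTokenSketch(token: string): SketchState {
  return JSON.parse(Buffer.from(token, 'base64url').toString('utf8')) as SketchState;
}
```

A token like this lets a source hand its position to a caller as a single string and later resume from the decoded state.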
Source handlers now receive a state helper with:
- get()
- set(next)
- patch(next)
- getValue(key)

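
One plausible reading of these four methods is sketched below as an in-memory class. The semantics are assumptions, not the SDK's documented behavior: set() is taken to replace the whole state, patch() to shallow-merge, and get() to return a copy. InMemoryStateSketch is a hypothetical name.

```typescript
type StateRecord = Record<string, unknown>;

// Hypothetical in-memory state helper mirroring the four method names above.
class InMemoryStateSketch<S extends StateRecord> {
  private current: S;

  constructor(initial: S) {
    this.current = { ...initial };
  }

  // Return a shallow copy so callers cannot mutate internal state by accident.
  get(): S {
    return { ...this.current };
  }

  // Replace the whole state (assumed semantics).
  set(next: S): void {
    this.current = { ...next };
  }

  // Shallow-merge the given fields into the current state (assumed semantics).
  patch(next: Partial<S>): void {
    this.current = { ...this.current, ...next };
  }

  // Read a single field by key.
  getValue<K extends keyof S>(key: K): S[K] {
    return this.current[key];
  }
}
```

Under these assumptions a source would typically call patch() after each emitted event to advance its cursor and leave the other fields untouched.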
Root package publishing checks:
npm run proto:generate
npm run build
npm run test
npm run pack:check

All package dry-run checks:

npm run pack:check:all

Files included in the root package are controlled by files in package.json.
GitHub Actions:
- .github/workflows/ci.yml runs build, test, and npm pack --dry-run
- .github/workflows/release.yml provides manual/tag-based npm publish workflow
For npm auth setup, see .npmrc.example.
- Uses @grpc/grpc-js and dynamic proto loading.
- Ships the flowctl/v1 proto files used by current flowctl.
- Payload encoding for domain-specific messages is intentionally left to user code.
- The Stellar payment example currently emits JSON for processed output while consuming protobuf/XDR on input.
- This is a solid MVP, not feature parity with the Go SDK yet.
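
Since payload encoding is left to user code, a JSON payload such as the one the payment example emits could be encoded and decoded along these lines. This is a minimal sketch that assumes event payloads are carried as bytes; PaymentSketch, encodeJsonPayload(), and decodeJsonPayload() are hypothetical names and are not part of the SDK.

```typescript
// Hypothetical payment record, for illustration only.
interface PaymentSketch {
  from: string;
  to: string;
  amount: string; // kept as a string to avoid floating-point rounding
}

// Encode any JSON-serializable value as UTF-8 bytes.
function encodeJsonPayload(value: unknown): Uint8Array {
  return new TextEncoder().encode(JSON.stringify(value));
}

// Decode UTF-8 JSON bytes back into a typed value (no runtime validation).
function decodeJsonPayload<T>(bytes: Uint8Array): T {
  return JSON.parse(new TextDecoder().decode(bytes)) as T;
}
```

The decode side performs only a type assertion, so a consumer that does not trust its upstream would still want to validate the parsed object.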