Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/sdk/base-workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { cre, type Environment } from "@cre/sdk/cre";
import { type HandlerEntry } from "@cre/sdk/workflow";
import type { ConfigHandlerParams } from "./utils/config";

/**
* Abstract base class for all CRE workflows
* Provides common functionality and patterns
*/
export abstract class Workflow<TConfig = unknown> {
constructor(private readonly configHandlerParams?: ConfigHandlerParams) {}

/**
* Override this method to define your workflow handlers
*/
protected abstract initHandlers(env: Environment<TConfig>): HandlerEntry[];

/**
* Main workflow initialization - called by the runner
*/
public initWorkflow = (env: Environment<TConfig>) => {
return this.initHandlers(env);
};

/**
* Creates and runs the workflow runner
*/
public async run(): Promise<void> {
try {
const runner = await cre.newRunner<TConfig>(this.configHandlerParams);
await runner.run(this.initWorkflow);
} catch (error) {
console.log("Workflow error:", JSON.stringify(error, null, 2));
throw error;
}
}
}
97 changes: 97 additions & 0 deletions src/sdk/utils/values/consensus-hooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { cre } from "@cre/sdk/cre";
import {
type ConsenusAggregator,
getAggregatedValue,
} from "@cre/sdk/utils/values/consensus";
import { type SupportedValueTypes, val } from "@cre/sdk/utils/values/value";

// ===== TYPE HELPERS FOR BETTER TYPE SAFETY =====

// Map value types to their expected input types
type ValueTypeInput = {
string: string;
float64: number;
int64: number | bigint | string;
bigint: bigint;
bool: boolean;
bytes: Uint8Array | ArrayBuffer;
time: Date | number | string;
list: Array<unknown>;
mapValue: Record<string, unknown>;
decimal: string;
from: unknown;
};

// ===== CORE CONSENSUS WRAPPER =====

/**
* Core consensus wrapper with strong typing
* Ensures the function return type matches the value type input requirements
*/
export const useConsensus = <
TValueType extends keyof ValueTypeInput & SupportedValueTypes,
TArgs extends readonly any[],
TReturn extends ValueTypeInput[TValueType]
>(
fn: (...args: TArgs) => Promise<TReturn>,
valueType: TValueType,
aggregationType: ConsenusAggregator
) => {
return async (...args: TArgs): Promise<any> => {
return cre.runInNodeMode(async () => {
const result = await fn(...args);
return getAggregatedValue(
(val as any)[valueType](result),
aggregationType
);
});
};
};

// ===== TYPED CONVENIENCE WRAPPERS =====

/**
* Median consensus for numerical data
* Automatically infers correct return type based on value type
*/
export const useMedianConsensus = <TArgs extends readonly any[]>(
fn: (...args: TArgs) => Promise<number>,
valueType: "float64" | "int64" = "float64"
) => useConsensus(fn, valueType, "median");

/**
* Identical consensus - all nodes must agree exactly
* Supports any value type with proper typing
*/
export const useIdenticalConsensus = <
TValueType extends keyof ValueTypeInput & SupportedValueTypes,
TArgs extends readonly any[],
TReturn extends ValueTypeInput[TValueType]
>(
fn: (...args: TArgs) => Promise<TReturn>,
valueType: TValueType
) => useConsensus(fn, valueType, "identical");

/**
* Common prefix consensus for strings and bytes
*/
export const useCommonPrefixConsensus = <
TValueType extends ("string" | "bytes") & keyof ValueTypeInput,
TArgs extends readonly any[],
TReturn extends ValueTypeInput[TValueType]
>(
fn: (...args: TArgs) => Promise<TReturn>,
valueType: TValueType
) => useConsensus(fn, valueType, "commonPrefix");

/**
* Common suffix consensus for strings and bytes
*/
export const useCommonSuffixConsensus = <
TValueType extends ("string" | "bytes") & keyof ValueTypeInput,
TArgs extends readonly any[],
TReturn extends ValueTypeInput[TValueType]
>(
fn: (...args: TArgs) => Promise<TReturn>,
valueType: TValueType
) => useConsensus(fn, valueType, "commonSuffix");
44 changes: 21 additions & 23 deletions src/sdk/utils/values/consensus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import {
SimpleConsensusInputsSchema,
} from "@cre/generated/sdk/v1alpha/sdk_pb";

export { AggregationType } from "@cre/generated/sdk/v1alpha/sdk_pb";

const consensusAggregators = [
"median",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these exist as generated constants from the protos?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Protos exposes these as ENUM with numeric values which is not perfect :S but yeah it's called AggregationType I believe.

"identical",
"commonPrefix",
"commonSuffix",
] as const;
export type ConsenusAggregator = (typeof consensusAggregators)[number];

export const consensusDescriptorMedian = create(ConsensusDescriptorSchema, {
descriptor: {
case: "aggregation",
Expand Down Expand Up @@ -45,6 +55,13 @@ export const consensusDescriptorCommonSuffix = create(
}
);

const consensusAggregatorsMap = {
median: consensusDescriptorMedian,
identical: consensusDescriptorIdentical,
commonPrefix: consensusDescriptorCommonPrefix,
commonSuffix: consensusDescriptorCommonSuffix,
} as const;

export const createConsensusDescriptorAggregation = (
aggregation: AggregationType
) =>
Expand Down Expand Up @@ -126,28 +143,9 @@ export const observationError = (message: string): ObservationErrorCase => ({
*/
export const getAggregatedValue = (
value: Value,
consensus: "median" | "identical" | "commonPrefix" | "commonSuffix"
) => {
let aggregation: ConsensusDescriptor;
switch (consensus) {
case "median":
aggregation = consensusDescriptorMedian;
break;
case "identical":
aggregation = consensusDescriptorIdentical;
break;
case "commonPrefix":
aggregation = consensusDescriptorCommonPrefix;
break;
case "commonSuffix":
aggregation = consensusDescriptorCommonSuffix;
break;
default:
throw new Error(`Unknown consensus type: ${consensus}`);
}

return create(SimpleConsensusInputsSchema, {
consensus: ConsenusAggregator
) =>
create(SimpleConsensusInputsSchema, {
observation: observationValue(value),
descriptors: aggregation,
descriptors: consensusAggregatorsMap[consensus],
});
};
14 changes: 13 additions & 1 deletion src/sdk/utils/values/value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ const wrapInternal = (v: unknown): Value => {
throw new Error("unsupported object instance");
};

export type SupportedValueTypes =
| "string"
| "bool"
| "bytes"
| "int64"
| "float64"
| "bigint"
| "time"
| "list"
| "map"
| "decimal";

export const val = {
string: (s: string): Value =>
create(ValueSchema, { value: { case: "stringValue", value: s } }),
Expand Down Expand Up @@ -230,7 +242,7 @@ export const val = {
});
},
from: (v: unknown): Value => wrapInternal(v),
};
} as const;

export type { Value };

Expand Down
53 changes: 53 additions & 0 deletions src/workflows/http-fetch/http-fetch-hook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { z } from "zod";
import { cre, type Environment } from "@cre/sdk/cre";
import { useMedianConsensus } from "@cre/sdk/utils/values/consensus-hooks";

// Config struct defines the parameters that can be passed to the workflow
const configSchema = z.object({
schedule: z.string(),
apiUrl: z.string(),
});

type Config = z.infer<typeof configSchema>;

// Wrap with consensus logic using function call
const fetchMathResult = useMedianConsensus(async (config: Config) => {
const response = await cre.utils.fetch({
url: config.apiUrl,
});
return Number.parseFloat(response.body.trim());
}, "float64");

// This is your handler which will perform the desired action
const onCronTrigger = async (env: Environment<Config>) => {
const aggregatedValue = await fetchMathResult(env.config);
cre.sendResponseValue(cre.utils.val.mapValue({ Result: aggregatedValue }));
};

// InitWorkflow is the required entry point for a CRE workflow
// The runner calls this function to initialize the workflow and register its handlers
const initWorkflow = (env: Environment<Config>) => {
const cron = new cre.capabilities.CronCapability();

return [
cre.handler(
// Use the schedule from our config file
cron.trigger({ schedule: env.config?.schedule }),
onCronTrigger
),
];
};

// main is the entry point for the workflow
export async function main() {
try {
const runner = await cre.newRunner<Config>({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think workflows are starting to look a lot better and really take shape.

configSchema,
});
await runner.run(initWorkflow);
} catch (error) {
console.log("error", JSON.stringify(error, null, 2));
}
}

main();
6 changes: 5 additions & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
// Some stricter flags (disabled by default)
"noUnusedLocals": false,
"noUnusedParameters": false,
"noPropertyAccessFromIndexSignature": false
"noPropertyAccessFromIndexSignature": false,

// Simplify working with consensus
"experimentalDecorators": true,
"emitDecoratorMetadata": true
},
"include": ["src/**/*", "scripts/generate-sdks.ts"],
"exclude": ["ai/**/*"]
Expand Down
Loading