Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core-c-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ crate-type = ["cdylib"]
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
crossbeam-utils = "0.8"
futures-util = { version = "0.3", default-features = false }
http = "1.3"
libc = "0.2"
Expand Down
134 changes: 112 additions & 22 deletions core-c-bridge/include/temporal-sdk-core-c-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ typedef struct TemporalCoreRandom TemporalCoreRandom;

typedef struct TemporalCoreRuntime TemporalCoreRuntime;

typedef struct TemporalCoreSlotReserveCompletionCtx TemporalCoreSlotReserveCompletionCtx;

typedef struct TemporalCoreWorker TemporalCoreWorker;

typedef struct TemporalCoreWorkerReplayPusher TemporalCoreWorkerReplayPusher;
Expand Down Expand Up @@ -570,18 +572,17 @@ typedef struct TemporalCoreSlotReserveCtx {
struct TemporalCoreByteArrayRef worker_identity;
struct TemporalCoreByteArrayRef worker_build_id;
bool is_sticky;
void *token_src;
} TemporalCoreSlotReserveCtx;

typedef void (*TemporalCoreCustomReserveSlotCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
void *sender);
typedef void (*TemporalCoreCustomSlotSupplierReserveCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
void *user_data);

typedef void (*TemporalCoreCustomCancelReserveCallback)(void *token_source);
typedef void (*TemporalCoreCustomSlotSupplierCancelReserveCallback)(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
void *user_data);

/**
* Must return C#-tracked id for the permit. A zero value means no permit was reserved.
*/
typedef uintptr_t (*TemporalCoreCustomTryReserveSlotCallback)(const struct TemporalCoreSlotReserveCtx *ctx);
typedef uintptr_t (*TemporalCoreCustomSlotSupplierTryReserveCallback)(const struct TemporalCoreSlotReserveCtx *ctx,
void *user_data);

typedef enum TemporalCoreSlotInfo_Tag {
WorkflowSlotInfo,
Expand Down Expand Up @@ -621,32 +622,87 @@ typedef struct TemporalCoreSlotInfo {
typedef struct TemporalCoreSlotMarkUsedCtx {
struct TemporalCoreSlotInfo slot_info;
/**
* C# id for the slot permit.
* Lang-issued permit ID.
*/
uintptr_t slot_permit;
} TemporalCoreSlotMarkUsedCtx;

typedef void (*TemporalCoreCustomMarkSlotUsedCallback)(const struct TemporalCoreSlotMarkUsedCtx *ctx);
typedef void (*TemporalCoreCustomSlotSupplierMarkUsedCallback)(const struct TemporalCoreSlotMarkUsedCtx *ctx,
void *user_data);

typedef struct TemporalCoreSlotReleaseCtx {
const struct TemporalCoreSlotInfo *slot_info;
/**
* C# id for the slot permit.
* Lang-issued permit ID.
*/
uintptr_t slot_permit;
} TemporalCoreSlotReleaseCtx;

typedef void (*TemporalCoreCustomReleaseSlotCallback)(const struct TemporalCoreSlotReleaseCtx *ctx);
typedef void (*TemporalCoreCustomSlotSupplierReleaseCallback)(const struct TemporalCoreSlotReleaseCtx *ctx,
void *user_data);

typedef void (*TemporalCoreCustomSlotImplFreeCallback)(const struct TemporalCoreCustomSlotSupplierCallbacks *userimpl);
typedef bool (*TemporalCoreCustomSlotSupplierAvailableSlotsCallback)(uintptr_t *available_slots,
void *user_data);

typedef void (*TemporalCoreCustomSlotSupplierFreeCallback)(const struct TemporalCoreCustomSlotSupplierCallbacks *userimpl);

typedef struct TemporalCoreCustomSlotSupplierCallbacks {
TemporalCoreCustomReserveSlotCallback reserve;
TemporalCoreCustomCancelReserveCallback cancel_reserve;
TemporalCoreCustomTryReserveSlotCallback try_reserve;
TemporalCoreCustomMarkSlotUsedCallback mark_used;
TemporalCoreCustomReleaseSlotCallback release;
TemporalCoreCustomSlotImplFreeCallback free;
/**
* Called to initiate asynchronous slot reservation. `ctx` contains information about
* reservation request. The pointer is only valid for the duration of the function call; the
* implementation should copy the data out of it for later use, and return as soon as possible.
*
* When slot is reserved, the implementation should call [`temporal_core_complete_async_reserve`]
* with the same `completion_ctx` as passed to this function. Reservation cannot be cancelled
* by Lang, but it can be cancelled by Core through [`cancel_reserve`](Self::cancel_reserve)
* callback. If reservation was cancelled, [`temporal_core_complete_async_cancel_reserve`]
* should be called instead.
*
* Slot reservation cannot error. The implementation should recover from errors and keep trying
* to reserve a slot until it eventually succeeds, or until reservation is cancelled by Core.
*/
TemporalCoreCustomSlotSupplierReserveCallback reserve;
/**
* Called to cancel slot reservation. `completion_ctx` specifies which reservation is being
* cancelled; the matching [`reserve`](Self::reserve) call was made with the same `completion_ctx`.
* After cancellation, the implementation should call [`temporal_core_complete_async_cancel_reserve`]
* with the same `completion_ctx`. Calling [`temporal_core_complete_async_reserve`] is not
* needed after cancellation.
*/
TemporalCoreCustomSlotSupplierCancelReserveCallback cancel_reserve;
/**
* Called to try an immediate slot reservation. The callback should return 0 if immediate
* reservation is not currently possible, or permit ID if reservation was successful. Permit ID
* is arbitrary, but must be unique among live reservations as it's later used for [`mark_used`](Self::mark_used)
* and [`release`](Self::release) callbacks.
*/
TemporalCoreCustomSlotSupplierTryReserveCallback try_reserve;
/**
* Called after successful reservation to mark slot as used. See [`SlotSupplier`](temporal_sdk_core_api::worker::SlotSupplier)
* trait for details.
*/
TemporalCoreCustomSlotSupplierMarkUsedCallback mark_used;
/**
* Called to free a previously reserved slot.
*/
TemporalCoreCustomSlotSupplierReleaseCallback release;
/**
* Called to retrieve the number of available slots if known. If the implementation knows how
* many slots are available at the moment, it should set the value behind the `available_slots`
* pointer and return true. If that number is unknown, it should return false.
*
* This function pointer can be set to null. It will be treated as if the number of available
* slots is never known.
*/
TemporalCoreCustomSlotSupplierAvailableSlotsCallback available_slots;
/**
* Called when the slot supplier is being dropped. All resources should be freed.
*/
TemporalCoreCustomSlotSupplierFreeCallback free;
/**
* Passed as an extra argument to the callbacks.
*/
void *user_data;
} TemporalCoreCustomSlotSupplierCallbacks;

typedef struct TemporalCoreCustomSlotSupplierCallbacksImpl {
Expand Down Expand Up @@ -984,10 +1040,44 @@ struct TemporalCoreWorkerReplayPushResult temporal_core_worker_replay_push(struc
struct TemporalCoreByteArrayRef workflow_id,
struct TemporalCoreByteArrayRef history);

void temporal_core_complete_async_reserve(void *sender, uintptr_t permit_id);
/**
* Completes asynchronous slot reservation started by a call to [`CustomSlotSupplierCallbacks::reserve`].
*
* `completion_ctx` must be the same as the one passed to the matching [`reserve`](CustomSlotSupplierCallbacks::reserve)
* call. `permit_id` is arbitrary, but must be unique among live reservations as it's later used
* for [`mark_used`](CustomSlotSupplierCallbacks::mark_used) and [`release`](CustomSlotSupplierCallbacks::release)
* callbacks.
*
* This function returns true if the reservation was completed successfully, or false if the
* reservation was cancelled before completion. If this function returns false, the implementation
* should call [`temporal_core_complete_async_cancel_reserve`] with the same `completion_ctx`.
*
* **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling
* either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`]
* with the same `completion_ctx` will cause **memory corruption!**
*/
bool temporal_core_complete_async_reserve(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx,
uintptr_t permit_id);

void temporal_core_set_reserve_cancel_target(struct TemporalCoreSlotReserveCtx *ctx,
void *token_ptr);
/**
* Completes cancellation of asynchronous slot reservation.
*
* Cancellation can only be initiated by Core. It's done by calling [`CustomSlotSupplierCallbacks::cancel_reserve`]
* after an earlier call to [`CustomSlotSupplierCallbacks::reserve`].
*
* `completion_ctx` must be the same as the one passed to the matching [`cancel_reserve`](CustomSlotSupplierCallbacks::cancel_reserve)
* call.
*
* This function returns true on successful cancellation, or false if cancellation was not
* requested for the given `completion_ctx`. A false value indicates there's likely a logic bug in
* the implementation where it doesn't correctly wait for [`cancel_reserve`](CustomSlotSupplierCallbacks::cancel_reserve)
* callback to be called.
*
* **Caution:** if this function returns true, `completion_ctx` gets freed. Afterwards, calling
* either [`temporal_core_complete_async_reserve`] or [`temporal_core_complete_async_cancel_reserve`]
* with the same `completion_ctx` will cause **memory corruption!**
*/
bool temporal_core_complete_async_cancel_reserve(const struct TemporalCoreSlotReserveCompletionCtx *completion_ctx);

#ifdef __cplusplus
} // extern "C"
Expand Down
Loading
Loading