Skip to content

Conversation

@absorbb
Copy link
Contributor

@absorbb absorbb commented Apr 8, 2025

rotor: don't do excessive event copy if functions pipeline consists of just 1 function
rotor: added support for 'fastFunctions' option that disables copying of events between functions chain steps
rotor: bulker-destination: switched to undici
rotor: small tweaks

…f just 1 function

rotor: added support for 'fastFunctions' option that disables copying of events between functions chain steps
rotor: bulker-destination: switched to undici
rotor: small tweaks
Copy link

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI code reviewer found 16 issues

if (err.name !== DropRetryErrorName && err.event) {
const errEvent = err.event;
if (err.name !== DropRetryErrorName) {
const errEvent = err.event || event;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavior change in error handling may lead to unexpected processing. The original condition only continued with events if err.event was truthy, but now fallback to original event occurs. This could propagate events that might have been intentionally dropped in error cases.

result = undefined
return result
} catch (err) {
throw err;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed retry policy annotation on errors

const chainRes = await runChain(chain, iterator, user, ctx);
checkError(chainRes);
return chainRes.result;
return runChain(chain, iterator, user, ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The execution log and error tracking have been completely removed

}
if (isDropResult(result)) {
result = undefined
return result
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Result handling has changed from {result, execLog} object to direct return of result

dispatcher: undiciAgent,
});
if (res.statusCode != 200) {
throw new HTTPError(`HTTP Error: ${res.statusCode}`, res.statusCode, await res.body.text());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error message is missing statusText, which was present in the original code. This reduces debugging information when errors occur

);
if (!res.ok) {
throw new HTTPError(`HTTP Error: ${res.status} ${res.statusText}`, res.status, await res.text());
const res = await request(`${bulkerEndpoint}/post/${destinationId}?tableName=${table}`, {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original fetch call included a { log: false } option that's been removed in the new implementation, which might affect logging behavior

error: err,
}];
}
if (!isDropResult(result)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In runSingle, when result is falsy but not a drop result, the original event is lost unlike in runChain

error: err,
}];
}
if (!isDropResult(result)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runSingle doesn't handle array results correctly. It just assigns events = result rather than unpacking arrays like runChain does

if (!isDropResult(result)) {
events = result;
}
return {events, execLog};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runSingle function doesn't ensure events is always an array, unlike runChain which always returns an array for events

@absorbb absorbb marked this pull request as ready for review April 21, 2025 07:58
@absorbb absorbb merged commit 48a7df4 into newjitsu Apr 21, 2025
5 checks passed
Copy link

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mrge found 14 issues across 15 files. View them in mrge.io

let execLog = [];
let events = [];
let result = undefined;
try {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runSingle function doesn't respect the fastFunctions option that runChain uses, creating inconsistent behavior

// };
try {
result = await f.f(deepCopy(event), ctx);
result = await f.f(fastFunctions ? event : deepCopy(event), ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enabling fastFunctions option could lead to unexpected side effects if functions modify event objects

const chainRes = await runChain(chain, iterator, user, ctx);
checkError(chainRes);
return chainRes.result;
return runChain(chain, iterator, user, ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed return structure could break consumers expecting {result, execLog} format

result = undefined
return result
} catch (err) {
throw err;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loss of detailed error information from execLog

options: {
...data,
...((workspace.featuresEnabled ?? []).includes("nofetchlogs") ? { fetchLogLevel: "debug" } : {}),
...((workspace.featuresEnabled ?? []).includes("fastFunctions") ? { fastFunctions: true } : {}),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optionsHash is calculated using only the original data object and doesn't include the newly added fastFunctions flag

const chainRes = await runChain(chain, iterator, user, ctx);
checkError(chainRes);
return chainRes.result;
return runChain(chain, iterator, user, ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error handling flow has fundamentally changed, removing the intermediary execution log.

error: err,
}];
}
if (!isDropResult(result)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing handling for falsy non-drop results in runSingle

}];
}
if (!isDropResult(result)) {
events = result;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Direct assignment to events without ensuring it's an array

// };
try {
result = await f.f(deepCopy(event), ctx);
result = await f.f(fastFunctions ? event : deepCopy(event), ctx);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential side effects with fastFunctions optimization

if (err.name !== DropRetryErrorName && err.event) {
const errEvent = err.event;
if (err.name !== DropRetryErrorName) {
const errEvent = err.event || event;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Falling back to the original event may bypass pipeline transformations

pull bot pushed a commit to erickirt/jitsu that referenced this pull request Oct 13, 2025
* rotor: don't do excessive event copy if functions pipeline consists of just 1 function
* rotor: added support for 'fastFunctions' option that disables copying of events between functions chain steps
* rotor: bulker-destination: switched to undici
* rotor: small tweaks
* rotor: undiciAgent set maxRequestsPerClient to 5000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants