Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ export class ClientResourceExhausted extends TransportError {
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Slow, false, true, true);
}

export class ClientCancelled extends TransportError {
export class ClientCancelled extends TransportError { // TODO: "Call cancelled" error appears also when connection string is wrong - would be right to avoid such dead lock retrying
static status = StatusCode.CLIENT_CANCELED;
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.No, false, false, false);
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Fast, false, true, false);
}

const TRANSPORT_ERROR_CODES = new Map([
Expand Down
2 changes: 1 addition & 1 deletion src/retries_obsoleted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const RETRYABLE_ERRORS_FAST = [
];
const RETRYABLE_ERRORS_SLOW = [errors.Overloaded, errors.ClientResourceExhausted];

class RetryStrategy {
export class RetryStrategy {
// private logger: Logger;
constructor(
public methodName = 'UnknownClass::UnknownMethod',
Expand Down
20 changes: 17 additions & 3 deletions src/table/table-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import * as grpc from "@grpc/grpc-js";
import EventEmitter from "events";
import {ICreateSessionResult, SessionEvent, TableService} from "./table-session-pool";
import {Endpoint} from "../discovery";
import {retryable} from "../retries_obsoleted";
import {retryable, RetryParameters, RetryStrategy} from "../retries_obsoleted";
import {MissingStatus, MissingValue, SchemeError, YdbError} from "../errors";
import {ResponseMetadataKeys} from "../constants";
import {pessimizable} from "../utils";
Expand Down Expand Up @@ -171,15 +171,21 @@ export class PrepareQuerySettings extends OperationParamsSettings {
}

export class ExecuteQuerySettings extends OperationParamsSettings {
keepInCache: boolean = false;
keepInCache?: boolean = false;
collectStats?: Ydb.Table.QueryStatsCollection.Mode;
onResponseMetadata?: (metadata: grpc.Metadata) => void;
idempotent: boolean = false;

withKeepInCache(keepInCache: boolean) {
this.keepInCache = keepInCache;
return this;
}

withIdempotent(idempotent: boolean) {
this.idempotent = idempotent;
return this;
}

withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) {
this.collectStats = collectStats;
return this;
Expand Down Expand Up @@ -258,6 +264,8 @@ export class ExecuteScanQuerySettings {
}
}

let executeQueryRetryer: RetryStrategy;

export class TableSession extends EventEmitter implements ICreateSessionResult {
private beingDeleted = false;
private free = true;
Expand Down Expand Up @@ -518,7 +526,13 @@ export class TableSession extends EventEmitter implements ICreateSessionResult {
if (keepInCache) {
request.queryCachePolicy = {keepInCache};
}
const response = await this.api.executeDataQuery(request);

if (!executeQueryRetryer) executeQueryRetryer = new RetryStrategy('TableSession:executeQuery', new RetryParameters(), this.logger);

const response =
settings?.idempotent
? await executeQueryRetryer.retry(() => this.api.executeDataQuery(request))
: await this.api.executeDataQuery(request);
const payload = getOperationPayload(this.processResponseMetadata(request, response, settings?.onResponseMetadata));
return ExecuteQueryResult.decode(payload);
}
Expand Down
27 changes: 20 additions & 7 deletions src/utils/test/create-table.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Column, TableDescription, TableSession} from "../../table";
import {withRetries} from "../../retries_obsoleted";
import {AUTO_TX, Column, ExecuteQuerySettings, TableDescription, TableSession} from "../../table";
// import {withRetries} from "../../retries_obsoleted";
import {Types} from "../../types";
import {Row} from "./row";

Expand Down Expand Up @@ -29,10 +29,23 @@ DECLARE $data AS List<Struct<id: Uint64, title: Utf8>>;
REPLACE INTO ${TABLE}
SELECT * FROM AS_TABLE($data);`;

await withRetries(async () => {
const preparedQuery = await session.prepareQuery(query);
await session.executeQuery(preparedQuery, {
// Now we can specify that the operation should be repeated in case of an error by specifying that it is idempotent

// Old code:

// await withRetries(async () => {
// const preparedQuery = await session.prepareQuery(query);
// await session.executeQuery(preparedQuery, {
// '$data': Row.asTypedCollection(rows),
// });
// });

// New code variant:

const preparedQuery = await session.prepareQuery(query);
await session.executeQuery(preparedQuery, {
'$data': Row.asTypedCollection(rows),
});
});
},
AUTO_TX,
new ExecuteQuerySettings().withIdempotent(true));
}