Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions src/buffer/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ import {
DEFAULT_BUFFER_SIZE,
DEFAULT_MAX_BUFFER_SIZE,
} from "./index";
import {
isInteger,
timestampToMicros,
timestampToNanos,
TimestampUnit,
} from "../utils";
import { isInteger, TimestampUnit } from "../utils";

// Default maximum length for table and column names.
const DEFAULT_MAX_NAME_LENGTH = 127;
Expand Down Expand Up @@ -269,6 +264,12 @@ abstract class SenderBufferBase implements SenderBuffer {
return this;
}

protected abstract writeTimestamp(
timestamp: number | bigint,
unit: TimestampUnit,
designated: boolean,
): void;

/**
* Writes a timestamp column with its value into the buffer. <br>
* Use it to insert into TIMESTAMP columns.
Expand All @@ -284,15 +285,18 @@ abstract class SenderBufferBase implements SenderBuffer {
unit: TimestampUnit = "us",
): SenderBuffer {
if (typeof value !== "bigint" && !Number.isInteger(value)) {
throw new Error(`Value must be an integer or BigInt, received ${value}`);
throw new Error(
`Timestamp value must be an integer or BigInt, received ${value}`,
);
}
this.writeColumn(name, value, () => {
const valueMicros = timestampToMicros(BigInt(value), unit);
const valueStr = valueMicros.toString();
this.checkCapacity([valueStr], 1);
this.write(valueStr);
this.write("t");
});
if (unit == "ns" && typeof value !== "bigint") {
throw new Error(
`Timestamp value must be a BigInt if it is set in nanoseconds`,
);
}
this.writeColumn(name, value, () =>
this.writeTimestamp(value, unit, false),
);
return this;
}

Expand All @@ -313,11 +317,14 @@ abstract class SenderBufferBase implements SenderBuffer {
`Designated timestamp must be an integer or BigInt, received ${timestamp}`,
);
}
const timestampNanos = timestampToNanos(BigInt(timestamp), unit);
const timestampStr = timestampNanos.toString();
this.checkCapacity([timestampStr], 2);
if (unit == "ns" && typeof timestamp !== "bigint") {
throw new Error(
`Designated timestamp must be a BigInt if it is set in nanoseconds`,
);
}
this.checkCapacity([], 1);
this.write(" ");
this.write(timestampStr);
this.writeTimestamp(timestamp, unit, true);
this.write("\n");
this.startNewRow();
}
Expand Down
18 changes: 18 additions & 0 deletions src/buffer/bufferv1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { SenderOptions } from "../options";
import { SenderBuffer } from "./index";
import { SenderBufferBase } from "./base";
import { timestampToMicros, timestampToNanos, TimestampUnit } from "../utils";

/**
* Buffer implementation for protocol version 1. <br>
Expand Down Expand Up @@ -39,6 +40,23 @@ class SenderBufferV1 extends SenderBufferBase {
return this;
}

protected writeTimestamp(
timestamp: number | bigint,
unit: TimestampUnit = "us",
designated: boolean,
): void {
const biValue = BigInt(timestamp);
const timestampValue = designated
? timestampToNanos(biValue, unit)
: timestampToMicros(biValue, unit);
const timestampStr = timestampValue.toString();
this.checkCapacity([timestampStr], 2);
this.write(timestampStr);
if (!designated) {
this.write("t");
}
}

/**
* Array columns are not supported in protocol v1.
*
Expand Down
30 changes: 29 additions & 1 deletion src/buffer/bufferv2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
import { SenderOptions } from "../options";
import { SenderBuffer } from "./index";
import { SenderBufferBase } from "./base";
import { ArrayPrimitive, getDimensions, validateArray } from "../utils";
import {
ArrayPrimitive,
getDimensions,
timestampToMicros,
TimestampUnit,
validateArray,
} from "../utils";

// Column type constants for protocol v2.
const COLUMN_TYPE_DOUBLE: number = 10;
Expand Down Expand Up @@ -53,6 +59,28 @@ class SenderBufferV2 extends SenderBufferBase {
return this;
}

protected writeTimestamp(
value: number | bigint,
unit: TimestampUnit = "us",
): void {
let biValue: bigint;
let suffix: string;
switch (unit) {
case "ns":
biValue = BigInt(value);
suffix = "n";
break;
default:
biValue = timestampToMicros(BigInt(value), unit);
suffix = "t";
}

const timestampStr = biValue.toString();
this.checkCapacity([timestampStr], 2);
this.write(timestampStr);
this.write(suffix);
}

/**
* Write an array column with its values into the buffer using v2 format.
*
Expand Down
2 changes: 1 addition & 1 deletion src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class SenderOptions {
* If TCP transport is used, the protocol version will default to 1.
* In case of HTTP transport the <i>/settings</i> endpoint of the database is used to find the protocol versions
* supported by the server, and the highest will be selected.
* When calling the <i>/settings</i> endpoint the timeout and TLs options are used from the <i>options</i> object.
* When calling the <i>/settings</i> endpoint the timeout and TLS options are used from the <i>options</i> object.
* @param {SenderOptions} options SenderOptions instance needs resolving protocol version
*/
static async resolveAuto(options: SenderOptions) {
Expand Down
2 changes: 1 addition & 1 deletion test/logging.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
vi,
} from "vitest";

import { Logger } from "../src/logging";
import { Logger } from "../src";

describe("Default logging suite", function () {
const error = vi.spyOn(console, "error").mockImplementation(() => {});
Expand Down
2 changes: 1 addition & 1 deletion test/options.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { describe, it, expect, beforeAll, afterAll } from "vitest";
import { Agent } from "undici";

import { SenderOptions } from "../src/options";
import { SenderOptions } from "../src";
import { MockHttp } from "./util/mockhttp";
import { readFileSync } from "fs";

Expand Down
Loading