YAR-14287: Refactor recordsize logic (MaterializeInc#108)
* Abstract the record size logic

* Fix Kafka Dry run

* Make the recordSizePayload column nullable
bobbyiliev authored and Andre Rosa committed Feb 12, 2024
1 parent 562b9f2 commit 3211051
Showing 7 changed files with 43 additions and 33 deletions.
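The common thread in the diffs below: record-size handling moves out of the per-destination generators (Kafka, Postgres, webhook) and into generateMegaRecord, which now stamps a payload onto every record itself. The recordSize() helper in src/utils/recordSize.js is imported but never shown in this commit; purely for orientation, here is a minimal sketch of what such a helper might look like, assuming it pads a filler string out to the value of the -rs flag (the body below is an assumption, not code from this repository):

// Hypothetical sketch of src/utils/recordSize.ts (not part of this commit).
// Assumes global.recordSize holds the numeric value of the -rs CLI flag.
export default async function recordSize(): Promise<string> {
    const size: number = (global as any).recordSize ?? 0;
    return 'x'.repeat(size); // filler payload of the requested length
}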
.github/workflows/integration.yaml (2 changes: 1 addition & 1 deletion)

@@ -52,7 +52,7 @@ jobs:
        run: docker exec datagen datagen -s /tests/products.sql -f postgres -n 3

      - name: Produce data to Postgres with multiple tables
-       run: docker exec datagen datagen -s /tests/schema2.sql -f postgres -n 3
+       run: docker exec datagen datagen -s /tests/schema2.sql -f postgres -n 3 -rs 1000

      - name: Docker Compose Down
        run: docker compose down -v
src/kafkaDataGenerator.ts (11 changes: 1 addition & 10 deletions)

@@ -1,5 +1,4 @@
import alert from 'cli-alerts';
-import recordSize from './utils/recordSize.js';
import { KafkaProducer } from './kafka/producer.js';
import { generateMegaRecord } from './schemas/generateMegaRecord.js';
import { OutputFormat } from './formats/outputFormat.js';

@@ -20,11 +19,6 @@ export default async function kafkaDataGenerator({
    initialSchema: string;
}): Promise<void> {

-    let payload: string;
-    if (global.recordSize) {
-        payload = await recordSize();
-    }
-
    let producer: KafkaProducer | null = null;
    if (global.dryRun !== true) {
        let outputFormat: OutputFormat;

@@ -55,16 +49,13 @@ export default async function kafkaDataGenerator({
                key = record[megaRecord[topic].key];
            }

-            if (global.recordSize) {
-                record.recordSizePayload = payload;
-            }
-
            if (global.dryRun) {
                alert({
                    type: `success`,
                    name: `Dry run: Skipping record production...`,
                    msg: `\n Topic: ${topic} \n Record key: ${key} \n Payload: ${JSON.stringify(record)}`
                });
+                continue;
            }

            await producer?.send(key, record, topic);
src/postgres/createTables.ts (15 changes: 13 additions & 2 deletions)

@@ -54,10 +54,21 @@ export default async function createTables(schema: any, initialSchemaPath: string
    const queries = initialSchema.split(';');

    for (const query of queries) {
+        let extendedQuery = query.trim();
+        // Add ; to the end of the query if it's missing
+        if (!extendedQuery.endsWith(';')) {
+            extendedQuery += ';';
+        }
+        // If the global option is enabled, add the recordSizePayload column to the table creation query
+        if (global.recordSize && extendedQuery.toLowerCase().startsWith('create table')) {
+            extendedQuery = extendedQuery.replace(/\);/g, ', recordSizePayload TEXT NULL);');
+        }
+
        try {
-            if (query.trim()) {
-                const correctedSql = query.replace(/`/g, '"').replace(/COMMENT '.*'/g, '').replace(/datetime/g, 'timestamp');
+            if (extendedQuery) {
+                const correctedSql = extendedQuery.replace(/`/g, '"').replace(/COMMENT '.*'/g, '').replace(/datetime/g, 'timestamp');
                await client.query(correctedSql);
+                console.log(correctedSql);
            }
        } catch (error) {
            alert({
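To make the rewrite above concrete, here is a standalone sketch that runs the same transformations over a sample statement (the input SQL is hypothetical; the regexes are copied verbatim from the hunk, and the global.recordSize guard is assumed true here):

// Standalone TypeScript sketch of the createTables.ts rewrite above.
let extendedQuery = 'CREATE TABLE products (id int, name text, created datetime)'.trim();
// Add ; to the end of the query if it's missing
if (!extendedQuery.endsWith(';')) {
    extendedQuery += ';';
}
// With -rs set, append the nullable payload column before the closing ");"
if (extendedQuery.toLowerCase().startsWith('create table')) {
    extendedQuery = extendedQuery.replace(/\);/g, ', recordSizePayload TEXT NULL);');
}
const correctedSql = extendedQuery
    .replace(/`/g, '"')
    .replace(/COMMENT '.*'/g, '')
    .replace(/datetime/g, 'timestamp');
console.log(correctedSql);
// => CREATE TABLE products (id int, name text, created timestamp, recordSizePayload TEXT NULL);

Declaring the column NULL presumably keeps inserts that omit recordSizePayload from failing on tables created with -rs, which is the third item in the commit message.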
src/postgresDataGenerator.ts (23 changes: 12 additions & 11 deletions)

@@ -17,7 +17,6 @@ export default async function postgresDataGenerator({
    iterations: number;
    initialSchema: string;
}): Promise<void> {
-
    // Database client setup
    let client = null;
    if (global.dryRun) {

@@ -49,38 +48,40 @@
                name: `Creating tables...`,
                msg: ``
            });
-            client && await createTables(schema, initialSchema);
+            client && (await createTables(schema, initialSchema));
        }
    }

    for (const table in megaRecord) {
        for await (const record of megaRecord[table].records) {
-            console.log(`\n Table: ${table} \n Record: ${JSON.stringify(record)}`);
+            console.log(
+                `\n Table: ${table} \n Record: ${JSON.stringify(record)}`
+            );

            let key = null;
            if (record[megaRecord[table].key]) {
                key = record[megaRecord[table].key];
            }

-            if (global.recordSize) {
-                record.recordSizePayload = payload;
-            }
-
            if (global.dryRun) {
                alert({
                    type: `success`,
                    name: `Dry run: Skipping record production...`,
-                    msg: `\n Table: ${table} \n Record key: ${key} \n Payload: ${JSON.stringify(record)}`
+                    msg: `\n Table: ${table} \n Record key: ${key} \n Payload: ${JSON.stringify(
+                        record
+                    )}`
                });
            }

            // Insert record into Postgres
            if (!global.dryRun) {
                try {
                    const values = Object.values(record);
-                    const placeholders = values.map((_, index) => `$${index + 1}`).join(', ');
+                    const placeholders = values
+                        .map((_, index) => `$${index + 1}`)
+                        .join(', ');
                    const query = `INSERT INTO ${table} VALUES (${placeholders})`;
-                    client && await client.query(query, values);
+                    client && (await client.query(query, values));
                } catch (err) {
                    console.error(err);
                }

@@ -91,5 +92,5 @@ export default async function postgresDataGenerator({
        await sleep(global.wait);
    }

-    client && await client.end();
+    client && (await client.end());
}
src/schemas/generateMegaRecord.ts (11 changes: 11 additions & 0 deletions)

@@ -1,5 +1,6 @@
import { faker } from '@faker-js/faker';
import alert from 'cli-alerts';
+import recordSize from '../utils/recordSize.js';

export async function generateRandomRecord(fakerRecord: any, generatedRecord: any = {}) {
    // helper function to generate a record from json schema with faker data

@@ -137,5 +138,15 @@ export async function generateMegaRecord(schema: any) {
                existingRecord = await generateRandomRecord(fakerRecord, existingRecord);
            }
        }
+
+    if (global.recordSize) {
+        for (const topic in megaRecord) {
+            let payload: string = await recordSize();
+            for (let record of megaRecord[topic].records) {
+                record["recordSizePayload"] = payload;
+            }
+        }
+    }
+
    return megaRecord;
}
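Since generateMegaRecord now owns payload injection, the destination generators need no record-size code at all, which is why src/kafkaDataGenerator.ts above and src/webhookDataGenerator.ts below lose their local payload code entirely. A hedged caller-side sketch (the schema value and the shape of the global flag are assumptions; only the topic/records loop comes from this diff):

// Hypothetical caller-side view after this commit.
import { generateMegaRecord } from './schemas/generateMegaRecord.js';

declare const schema: any; // whatever parsed schema the CLI passes in

(global as any).recordSize = 100; // as set by the -rs flag
const megaRecord = await generateMegaRecord(schema);
for (const topic in megaRecord) {
    for (const record of megaRecord[topic].records) {
        // every record in a topic shares the same generated payload
        console.assert(typeof record.recordSizePayload === 'string');
    }
}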
src/webhookDataGenerator.ts (9 changes: 0 additions & 9 deletions)

@@ -2,7 +2,6 @@ import alert from 'cli-alerts';
import { generateMegaRecord } from './schemas/generateMegaRecord.js';
import { OutputFormat } from './formats/outputFormat.js';
import sleep from './utils/sleep.js';
-import recordSize from './utils/recordSize.js';
import asyncGenerator from './utils/asyncGenerator.js';
import webhookConfig from './webhook/webhookConfig.js';

@@ -36,11 +35,6 @@ export default async function webhookDataGenerator({
        client = await webhookConfig();
    }

-    let payload: string;
-    if (global.recordSize) {
-        payload = await recordSize();
-    }
-
    for await (const iteration of asyncGenerator(iterations)) {
        global.iterationIndex = iteration;
        const megaRecord = await generateMegaRecord(schema);

@@ -58,9 +52,6 @@
    const handler = async (megaRecord: any, iteration: number) => {
        for (const endpoint in megaRecord) {
            for await (const record of megaRecord[endpoint].records) {
-                if (global.recordSize) {
-                    record.recordSizePayload = payload;
-                }

                if (global.dryRun) {
                    alert({
tests/datagen.test.ts (5 changes: 5 additions & 0 deletions)

@@ -77,6 +77,11 @@ describe('Test record size', () => {
        const output = datagen(`-s ${schema} -n 2 -rs 100`);
        expect(output).toContain('recordSizePayload');
    });
+    test('should contain the recordSizePayload if record size is set with Postgres destinations', () => {
+        const schema = './tests/products.sql';
+        const output = datagen(`-s ${schema} -f postgres -n 2 -rs 100`);
+        expect(output).toContain('recordSizePayload');
+    });
});

describe('Test sql output', () => {
