Skip to content

Commit

Permalink
YAR-14287: Refactor: Cleanup codebase (MaterializeInc#78)
Browse files Browse the repository at this point in the history
Co-authored-by: Chuck Larrieu Casias <87353672+chuck-alt-delete@users.noreply.github.com>
  • Loading branch information
2 people authored and Andre Rosa committed Feb 12, 2024
1 parent d31aa21 commit 4a2d74a
Show file tree
Hide file tree
Showing 16 changed files with 223 additions and 280 deletions.
14 changes: 7 additions & 7 deletions examples/blog.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
"_meta": {
"topic": "mz_datagen_blog_users",
"key": "id",
"relationships": [
{
"relationships": [
{
"topic": "mz_datagen_blog_posts",
"parent_field": "id",
"child_field": "user_id",
"records_per": 2
}
]
]
},
"id": "datatype.number(100)",
"name": "internet.userName",
Expand All @@ -24,14 +24,14 @@
"_meta": {
"topic": "mz_datagen_blog_posts",
"key": "id",
"relationships": [
{
"relationships": [
{
"topic": "mz_datagen_blog_comments",
"parent_field": "id",
"child_field": "post_id",
"records_per": 2
}
]
]
},
"id": "datatype.number(1000)",
"user_id": "datatype.number(100)",
Expand All @@ -58,4 +58,4 @@
"views": "datatype.number({\"min\": 100, \"max\": 1000})",
"status": "datatype.number(1)"
}
]
]
12 changes: 6 additions & 6 deletions examples/ecommerce.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
"_meta": {
"topic": "mz_datagen_ecommerce_users",
"key": "id",
"relationships": [
{
"relationships": [
{
"topic": "mz_datagen_ecommerce_purchases",
"parent_field": "id",
"child_field": "user_id",
"records_per": 4
}
]
]
},
"id": "datatype.number(100)",
"name": "internet.userName",
Expand All @@ -23,14 +23,14 @@
"_meta": {
"topic": "mz_datagen_ecommerce_purchases",
"key": "id",
"relationships": [
"relationships": [
{
"topic": "mz_datagen_ecommerce_items",
"parent_field": "item_id",
"child_field": "id",
"records_per": 1
}
]
]
},
"id": "datatype.number(1000)",
"user_id": "datatype.number(100)",
Expand All @@ -47,4 +47,4 @@
"description": "commerce.productDescription",
"material": "commerce.productMaterial"
}
]
]
125 changes: 23 additions & 102 deletions src/dataGenerator.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import alert from 'cli-alerts';
import crypto from 'crypto';
import createTopic from './kafka/createTopic.js';
import schemaRegistryConfig from './kafka/schemaRegistryConfig.js';
import { kafkaProducer, connectKafkaProducer, disconnectKafkaProducer } from './kafka/producer.js';
import {
getAvroEncodedRecord,
registerSchema,
getAvroSchema
} from './schemas/schemaRegistry.js';

import { KafkaProducer } from './kafka/producer.js';
import { generateMegaRecord } from './schemas/generateMegaRecord.js';
import { OutputFormat } from './formats/outputFormat.js';
import { AvroFormat } from './formats/avroFormat.js';
import { JsonFormat } from './formats/jsonFormat.js';

async function* asyncGenerator(number: number) {
let i = 0;
Expand Down Expand Up @@ -37,56 +32,6 @@ function sleep(s: number) {
return new Promise(resolve => setTimeout(resolve, s));
}

/**
 * Ensures the Kafka topic exists before any records are produced to it.
 * On --dry-run, only prints a notice and returns without touching Kafka.
 * On creation failure, alerts the user and exits (code 0) so they can
 * create the topic manually.
 */
async function prepareTopic(topic: string) {
    // Dry run: report and bail out without contacting the broker.
    if (global.dryRun) {
        alert({
            type: `success`,
            name: `Dry run: Skipping topic creation...`,
            msg: ``
        });
        return;
    }

    alert({
        type: `success`,
        name: `Creating Kafka topics...`,
        msg: ``
    });

    try {
        // createTopic only creates the topic when it does not already exist
        // (it checks admin.listTopics() first — see src/kafka/createTopic.ts).
        await createTopic(topic);
        alert({
            type: `success`,
            name: `Created topic ${topic}`,
            msg: ``
        });
    } catch (error) {
        alert({
            type: `error`,
            name: `Error creating Kafka topic, try creating it manually...`,
            msg: `\n ${error.message}`
        });
        process.exit(0);
    }
}

/**
 * Derives an Avro schema for `topic` from its first generated record,
 * registers that schema with the schema registry, and caches
 * { schemaId, schema } under `avroSchemas[topic]`.
 *
 * @param megaRecord  map of topic -> { records, ... } produced by generateMegaRecord
 * @param topic       topic whose schema should be registered
 * @param registry    schema-registry client handle
 * @param avroSchemas cache object, mutated in place
 * @returns the (mutated) avroSchemas cache, for reassignment by the caller
 */
async function prepareSchema(megaRecord: any, topic: any, registry: any, avroSchemas: any) {
    alert({
        type: `success`,
        name: `Registering Avro schema...`,
        msg: ``
    });
    // Infer the schema from a sample record — the first one for this topic.
    let avroSchema = await getAvroSchema(
        topic,
        megaRecord[topic].records[0]
    );
    let schemaId = await registerSchema(avroSchema, registry);
    avroSchemas[topic] = {};
    avroSchemas[topic]['schemaId'] = schemaId;
    avroSchemas[topic]['schema'] = avroSchema;
    return avroSchemas;
}

export default async function dataGenerator({
format,
schema,
Expand All @@ -103,47 +48,33 @@ export default async function dataGenerator({
payload = crypto.randomBytes(global.recordSize).toString('hex');
}

let registry;
let producer;
let avroSchemas = {};
if(global.dryRun !== true){
producer = await connectKafkaProducer();
let producer: KafkaProducer | null = null;
if (global.dryRun !== true) {
let outputFormat: OutputFormat;
if (format === 'avro') {
outputFormat = await AvroFormat.create();
} else if (format === 'json') {
outputFormat = new JsonFormat();
}

producer = await KafkaProducer.create(outputFormat);
}

for await (const iteration of asyncGenerator(number)) {
global.iterationIndex = iteration;
let megaRecord = await generateMegaRecord(schema);

if (iteration == 0) {
if (format == 'avro') {
if (global.dryRun) {
alert({
type: `success`,
name: `Dry run: Skipping schema registration...`,
msg: ``
});
} else {
registry = await schemaRegistryConfig();
}
}
for (const topic in megaRecord) {
await prepareTopic(topic);
if (format == 'avro' && global.dryRun !== true) {
avroSchemas = await prepareSchema(
megaRecord,
topic,
registry,
avroSchemas
);
}
await producer?.prepare(topic, megaRecord);
}
}

for (const topic in megaRecord) {
for await (const record of megaRecord[topic].records) {
let encodedRecord = null;
let recordKey = null;
let key = null;
if (record[megaRecord[topic].key]) {
recordKey = record[megaRecord[topic].key];
key = record[megaRecord[topic].key];
}

if (global.recordSize) {
Expand All @@ -154,26 +85,16 @@ export default async function dataGenerator({
alert({
type: `success`,
name: `Dry run: Skipping record production...`,
msg: `\n Topic: ${topic} \n Record key: ${recordKey} \n Payload: ${JSON.stringify(
record
)}`
msg: `\n Topic: ${topic} \n Record key: ${key} \n Payload: ${JSON.stringify(record)}`
});
} else {
if (format == 'avro') {
encodedRecord = await getAvroEncodedRecord(
record,
registry,
avroSchemas[topic]['schemaId']
);
}
await kafkaProducer(producer, recordKey, record, encodedRecord, topic);
}

await producer?.send(key, record, topic);
}
}

await sleep(global.wait);
}
if (global.dryRun !== true) {
await disconnectKafkaProducer(producer);
}

await producer?.close();
};
69 changes: 69 additions & 0 deletions src/formats/avroFormat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import alert from "cli-alerts";
import { SchemaRegistry, SchemaType } from "@kafkajs/confluent-schema-registry";

import { Env } from "../utils/env.js";
import { OutputFormat } from "./outputFormat";

/**
 * OutputFormat implementation that registers Avro schemas with a Confluent
 * schema registry and encodes records against the registered schema id.
 */
export class AvroFormat implements OutputFormat {
    // Per-topic cache populated by register():
    // { [topic]: { schemaId: <registry id>, schema: <avro schema object> } }
    private schemas: any = {};
    private registry: SchemaRegistry;

    /**
     * Builds an AvroFormat from environment configuration.
     * SCHEMA_REGISTRY_URL is required; basic auth is attached only when
     * BOTH SCHEMA_REGISTRY_USERNAME and SCHEMA_REGISTRY_PASSWORD are set.
     */
    static async create(): Promise<AvroFormat> {
        const url = Env.required("SCHEMA_REGISTRY_URL");
        const username = Env.optional("SCHEMA_REGISTRY_USERNAME", null);
        const password = Env.optional("SCHEMA_REGISTRY_PASSWORD", null);

        // Typed up front so no string-indexed assignment is needed below.
        const configuration: { host: string; auth?: { username: string; password: string } } = {
            host: url
        };

        if (password && username) {
            configuration.auth = {
                username: username,
                password: password
            };
        }

        const registry = new SchemaRegistry(configuration);
        return new AvroFormat(registry);
    }

    constructor(registry: SchemaRegistry) {
        this.registry = registry;
    }

    /**
     * Registers `schema` under the `<schema.name>-value` subject and caches
     * the returned schema id for later encode() calls on `topic`.
     * Exits the process (code 1) on failure — generation cannot proceed
     * without a registered schema.
     */
    async register(schema: any, topic: string): Promise<void> {
        const options = { subject: `${schema["name"]}-value` };
        try {
            const resp = await this.registry.register(
                {
                    type: SchemaType.AVRO,
                    schema: JSON.stringify(schema)
                },
                options
            );

            alert({
                type: `success`,
                name: `Schema registered!`,
                msg: `Subject: ${options.subject}, ID: ${resp.id}`
            });

            this.schemas[topic] = {
                'schemaId': resp.id,
                'schema': schema
            };
        } catch (error) {
            alert({
                type: `error`,
                name: `Failed to register schema.`,
                msg: `${error}`
            });

            process.exit(1);
        }
    }

    /**
     * Avro-encodes `record` using the schema id previously registered for
     * `topic`. Throws a descriptive error (instead of an opaque TypeError)
     * if register() has not been called for the topic.
     */
    async encode(record: any, topic: string): Promise<Buffer> {
        const entry = this.schemas[topic];
        if (!entry) {
            throw new Error(`No Avro schema registered for topic '${topic}'; call register() first.`);
        }
        const encodedRecord = await this.registry.encode(entry.schemaId, record);
        return encodedRecord;
    }
}
13 changes: 13 additions & 0 deletions src/formats/jsonFormat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { OutputFormat } from "./outputFormat";

/**
 * OutputFormat implementation that serializes records as plain JSON.
 * JSON output involves no schema registry, so register() is a no-op.
 */
export class JsonFormat implements OutputFormat {

    /** No schema registration is needed for JSON output. */
    async register(schema: any, topic: string): Promise<void> {
        // Intentionally empty: nothing to register.
    }

    /** Serializes `record` to its UTF-8 JSON byte representation. */
    async encode(record: any, _: string): Promise<Buffer> {
        const serialized = JSON.stringify(record);
        return Buffer.from(serialized);
    }
}
5 changes: 5 additions & 0 deletions src/formats/outputFormat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
 * Strategy interface for serializing generated records before production.
 * Implemented by AvroFormat (schema-registry backed) and JsonFormat
 * (plain JSON; registration is a no-op).
 */
export interface OutputFormat {
    /** Performs any per-topic schema setup required before encoding. */
    register(schema: any, topic: string): Promise<void>;

    /** Serializes `record` destined for `topic` into ready-to-produce bytes. */
    encode(record: any, topic: string): Promise<Buffer>;
}
13 changes: 4 additions & 9 deletions src/kafka/cleanKafka.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import kafkaConfig from './kafkaConfig.js';
import axios from 'axios';
import dotenv from 'dotenv';
import alert from 'cli-alerts';
import { Env } from '../utils/env.js';

async function deleteSchemaSubjects(topics: any): Promise<void> {
dotenv.config();
if (!process.env.SCHEMA_REGISTRY_URL) {
console.error("Please set SCHEMA_REGISTRY_URL");
process.exit();
}
const schemaRegistryUrl = Env.required("SCHEMA_REGISTRY_URL");

for await (const topic of topics) {
let url = `${process.env.SCHEMA_REGISTRY_URL}/subjects/${topic}-value?permanent=false`;
let url = `${schemaRegistryUrl}/subjects/${topic}-value?permanent=false`;
await axios.delete(
url,
{
Expand All @@ -33,7 +30,6 @@ async function deleteSchemaSubjects(topics: any): Promise<void> {
}

export default async function cleanKafka(format: string, topics: any): Promise<void> {

if (global.dryRun) {
console.log("This is a dry run, so no resources will be deleted")
return
Expand Down Expand Up @@ -66,5 +62,4 @@ export default async function cleanKafka(format: string, topics: any): Promise<v
} else {
await deleteSchemaSubjects(topics);
}

};
1 change: 0 additions & 1 deletion src/kafka/createTopic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ export default async function createTopic(topic: string): Promise<void> {
const topics = await admin.listTopics();

if (!topics.includes(topic)) {

let replicationFactor = await getReplicationFactor(admin);

let topicConfigs = [
Expand Down
Loading

0 comments on commit 4a2d74a

Please sign in to comment.