Apache Avro output support #42

Merged 1 commit on Dec 13, 2023
99 changes: 84 additions & 15 deletions README.md
@@ -2,6 +2,8 @@

Data generator tool for developers and QA engineers.

[TOC]

## Building

Building standalone application:
@@ -96,6 +98,7 @@ output {
}

transformers: ["json-prettify"]
validators = ["json", "missing-vars"]
}
```

@@ -129,9 +132,72 @@ output {
}
}
transformers = ["json-minify"]
validators = ["json", "missing-vars"]
}
```

- **decode-input-as-key-value**: true/false - decode the input template as a key/value JSON document of the form shown below:

```json
{
"key": ...
"value": {...}
}
```
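For illustration, a concrete key/value template (mirroring the `examples/kafka-avro/input.template.json` added in this PR) could look like:

```json
{
  "key": { "id": {{id}}, "orgId": 1 },
  "value": {
    "username": "{{name}}",
    "age": {{age}}
  }
}
```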

#### Kafka AVRO output

```properties
output {
writer {
type = kafka-avro-output

topic = ${?KAFKA_TOPIC}
topic = "logs-avro"

bootstrap-servers = ${?KAFKA_BOOTSTRAP_SERVERS}
bootstrap-servers = "localhost:9095"

batch-size = 1000

headers {
key = value
}

decode-input-as-key-value = true

producer-config {
compression-type = gzip
in-flight-requests = 1
linger-ms = 15
max-batch-size-bytes = 1024
max-request-size-bytes = 512
}

avro-config {
schema-registry-url = "http://localhost:8081"
key-schema = "/path/to/file/key.avsc"
value-schema = "/path/to/file/value.avsc"
auto-register-schemas = false
registry-client-max-cache-size = 1000
}
}
transformers = ["json-minify"]
validators = ["json", "missing-vars"]
}
```

- **key-schema** - path to the key schema file. Optional.
- **value-schema** - path to the value schema file. Optional.
- **auto-register-schemas** - whether to register the schemas in the schema registry.

How the schema resolver works:

- Read the schema from the configured file.
- When a file isn't provided, gen4s looks up the schema subject in the schema registry (`<topic_name>-key` or `<topic_name>-value`); see the sketch below.
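That resolution order might be sketched in Scala like this. This is illustrative only: `resolveSchema` and its parameters are hypothetical names, not the project's actual API; the Confluent `CachedSchemaRegistryClient` is used here for the registry lookup.

```scala
import java.io.File

import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Hypothetical helper mirroring the documented fallback: file first, then a registry subject lookup.
def resolveSchema(
  schemaFile: Option[File],
  topic: String,
  suffix: String, // "key" or "value"
  registryUrl: String
): Schema =
  schemaFile match {
    case Some(file) =>
      new Schema.Parser().parse(file) // 1. read the schema from the provided .avsc file
    case None =>
      // 2. fall back to the schema registry, e.g. subject "logs-avro-key" or "logs-avro-value"
      val client  = new CachedSchemaRegistryClient(registryUrl, 1000)
      val subject = s"$topic-$suffix"
      new Schema.Parser().parse(client.getLatestSchemaMetadata(subject).getSchema)
  }
```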



#### File System output

```properties
@@ -142,9 +208,12 @@
filename-pattern = "my-cool-logs-%s.txt"
}
transformers = ["json-prettify"]
validators = ["json", "missing-vars"]
}
```



#### Http output

```properties
@@ -161,21 +230,21 @@
stop-on-error = true
}
transformers = ["json-minify"]
validators = ["json", "missing-vars"]
}
```



#### Transformers

**json-minify** - transforms generated JSON into _compact_-printed JSON (removes all newlines and spaces).

**json-prettify** - transforms generated JSON into _pretty_-printed JSON.

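For example (illustrative values), `json-prettify` renders a generated record as:

```json
{
  "username": "user-abc-123",
  "age": 42
}
```

while `json-minify` emits the same record as a single compact line:

```json
{"username":"user-abc-123","age":42}
```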


Big thanks to the [fabricator](https://github.com/azakordonets/fabricator) random data generator project!


## Schema definition and data generators

### Static value generator

@@ -207,71 +276,71 @@ This sampler can be used like template constant (static value).



### Int number generator.

```json
{ "variable": "my-int", "type": "int", "min": 10, "max": 1000 }
```



### Double number generator.

```json
{ "variable": "test-double", "type": "double", "min": 10.5, "max": 15.5, "scale": 6 }
```



### Boolean generator.

```json
{ "variable": "test-bool", "type": "boolean"}
```



### String generator.

```json
{ "variable": "test-string", "type": "string", "len": 10}
```



### String pattern generator.

```json
{ "variable": "test-string-pattern", "type": "pattern", "pattern": "hello-???-###"} // hello-abc-123
```



### Java UUID field generator.

```json
{ "variable": "test-uuid", "type": "uuid" }
```



### IP address generator

```json
{ "variable": "test-ip", "type": "ip", "ipv6": false }
```



### Enumeration generator.

```json
{ "variable": "test-enum", "type": "enum", "oneOf": ["hello", "world"] }
```



### Env var generator.

```json
{ "variable": "test-var", "type": "env-var", "name": "ORG_ID" }
@@ -295,7 +364,7 @@ OR any env var with `G4S_` prefix, for example `G4S_QA_USERNAME`



### DateTime generator

```json
{ "variable": "test-date", "type": "date", "format": "MM/dd/yyyy", "shiftDays": -10 }
@@ -313,7 +382,7 @@ OR any env var with `G4S_` prefix, for example `G4S_QA_USERNAME`



### List generator.

```json
{ "variable": "test-array", "type": "list", "len": 3, "generator": { "variable": "_", "type": "ip" } }
8 changes: 6 additions & 2 deletions build.sbt
@@ -9,7 +9,9 @@ ThisBuild / scalafmtOnCompile := true

ThisBuild / scalacOptions += "-Wunused:all"

ThisBuild / resolvers ++= Resolver.sonatypeOssRepos("snapshots")
ThisBuild / resolvers += "confluent" at "https://packages.confluent.io/maven/"
ThisBuild / resolvers += "jitpack" at "https://jitpack.io"

lazy val core = project
.in(file("core"))
@@ -44,7 +46,8 @@ lazy val generators = project
lazy val outputs = project
.in(file("outputs"))
.settings(
name         := "gen4s-outputs",
scalaVersion := Scala3,
libraryDependencies ++= List.concat(
Dependencies.Cats,
Dependencies.CatsEffect,
@@ -57,6 +60,7 @@
Dependencies.Enumeratum,
Dependencies.Refined,
Dependencies.Logback,
Dependencies.AvroConverter,
Dependencies.CatsEffectTest,
Dependencies.TestContainers,
Dependencies.ScalaTest
43 changes: 43 additions & 0 deletions examples/kafka-avro/config.conf
@@ -0,0 +1,43 @@
input {
schema = "input.schema.json"
template = "input.template.json"
decode-new-line-as-template = false
}

output {
writer {
type = kafka-avro-output

topic = ${?KAFKA_TOPIC}
topic = "logs"

bootstrap-servers = ${?KAFKA_BOOTSTRAP_SERVERS}
bootstrap-servers = "localhost:9092"

batch-size = 1000

headers {
key = value
}

decode-input-as-key-value = true

producer-config {
compression-type = gzip
in-flight-requests = 1
linger-ms = 15
max-batch-size-bytes = 1024
max-request-size-bytes = 512
}

avro-config {
schema-registry-url = "http://localhost:8081"
key-schema = "./examples/kafka-avro/person-key.avsc"
value-schema = "./examples/kafka-avro/person-value.avsc"
auto-register-schemas = true
registry-client-max-cache-size = 1000
}
}
transformers = ["json-minify"]
validators = ["json", "missing-vars"]
}
7 changes: 7 additions & 0 deletions examples/kafka-avro/input.schema.json
@@ -0,0 +1,7 @@
{
"generators": [
{ "variable": "name", "type": "pattern", "pattern": "user-???-###"},
{ "variable": "id", "type": "int", "min": 10, "max": 1000 },
{ "variable": "age", "type": "int", "min": 1, "max": 60 }
]
}
8 changes: 8 additions & 0 deletions examples/kafka-avro/input.template.json
@@ -0,0 +1,8 @@
{
"key": { "id": {{id}}, "orgId": 1},
"value": {
"username": "{{name}}",
"age": {{age}}
}
}

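With the schema and template above, one possible rendered message (illustrative values within the configured ranges) would be:

```json
{
  "key": { "id": 123, "orgId": 1 },
  "value": {
    "username": "user-xyz-042",
    "age": 27
  }
}
```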
9 changes: 9 additions & 0 deletions examples/kafka-avro/person-key.avsc
@@ -0,0 +1,9 @@
{
"type":"record",
"name":"PersonKey",
"namespace":"io.gen4s",
"fields":[
{"name":"id","type":"int"},
{"name":"orgId","type":"int"}
]
}
9 changes: 9 additions & 0 deletions examples/kafka-avro/person-value.avsc
@@ -0,0 +1,9 @@
{
"type":"record",
"name":"Person",
"namespace":"io.gen4s",
"fields":[
{"name":"username","type":"string"},
{"name":"age","type":["null","int"], "default": null}
]
}
2 changes: 1 addition & 1 deletion examples/kafka/config.conf
@@ -12,7 +12,7 @@ output {
topic = "logs"

bootstrap-servers = ${?KAFKA_BOOTSTRAP_SERVERS}
bootstrap-servers = "localhost:9092"

batch-size = 100

22 changes: 22 additions & 0 deletions outputs/src/main/scala/io/gen4s/outputs/Output.scala
@@ -1,5 +1,6 @@
package io.gen4s.outputs

import java.io.File
import java.nio.file.{Path, Paths}

import org.apache.commons.io.FilenameUtils
@@ -26,6 +27,27 @@ case class KafkaOutput(
def kafkaProducerConfig: KafkaProducerConfig = producerConfig.getOrElse(KafkaProducerConfig.default)
}

case class AvroConfig(
schemaRegistryUrl: String,
keySchema: Option[File] = None,
valueSchema: Option[File] = None,
autoRegisterSchemas: Boolean = false,
registryClientMaxCacheSize: Int = 1000
)

case class KafkaAvroOutput(
topic: Topic,
bootstrapServers: BootstrapServers,
avroConfig: AvroConfig,
decodeInputAsKeyValue: Boolean = false,
headers: Map[String, String] = Map.empty,
batchSize: PosInt = PosInt.unsafeFrom(1000),
producerConfig: Option[KafkaProducerConfig] = None)
extends Output {

def kafkaProducerConfig: KafkaProducerConfig = producerConfig.getOrElse(KafkaProducerConfig.default)
}

sealed abstract class HttpMethods(override val entryName: String) extends EnumEntry

object HttpMethods extends enumeratum.Enum[HttpMethods] {