Fixing bugs related to avro model processing (#582)
raphaeldelio committed Feb 10, 2024
1 parent e3bd3ec commit 6d90cb3
Showing 10 changed files with 190 additions and 54 deletions.
@@ -48,4 +48,11 @@ public static MessageReference toChannelMessage(String channelName, String messa
public static MessageReference toSchema(String schemaName) {
return new MessageReference("#/components/schemas/" + schemaName);
}

public static String extractRefName(String ref) {
if (ref.contains("/")) {
return ref.substring(ref.lastIndexOf('/') + 1);
}
return ref;
}
}
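
Usage sketch for the new helper (illustrative only, not part of the diff): extractRefName returns everything after the last '/' of a component reference, or the input unchanged when it contains no slash.

String name = MessageReference.extractRefName("#/components/schemas/org.apache.avro.Schema");
// name == "org.apache.avro.Schema"
String plain = MessageReference.extractRefName("ExamplePayloadAvroDto");
// plain == "ExamplePayloadAvroDto" (no '/' to strip)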
@@ -49,7 +49,7 @@ public AsyncAPI getAsyncAPI() {

/**
* Does the 'heavy work' of building the AsyncAPI documents once. Stores the resulting
* AsyncAPI document or alternativly a catched exception/error in the instance variable asyncAPIResult.
* AsyncAPI document or alternatively a caught exception/error in the instance variable asyncAPIResult.
*/
protected synchronized void initAsyncAPI() {
if (this.asyncAPIResult != null) {
@@ -90,7 +90,7 @@ protected synchronized void initAsyncAPI() {
}
this.asyncAPIResult = new AsyncAPIResult(asyncAPI, null);

log.debug("AsyncAPI document was build");
log.debug("AsyncAPI document was built");
} catch (Throwable t) {
log.debug("Failed to build AsyncAPI document", t);
this.asyncAPIResult = new AsyncAPIResult(null, t);
@@ -77,8 +77,8 @@ public String registerSchema(Class<?> type) {
log.debug("Registering schema for {}", type.getSimpleName());

Map<String, Schema> schemas = new LinkedHashMap<>(runWithFqnSetting((unused) -> converter.readAll(type)));
String schemaName = getSchemaName(type, schemas);

String schemaName = getSchemaName(type, schemas);
preProcessSchemas(schemas, schemaName, type);
this.schemas.putAll(schemas);
schemas.values().forEach(this::postProcessSchema);
@@ -170,6 +170,12 @@ private <R> R runWithFqnSetting(Function<Void, R> callable) {
}

private void postProcessSchema(Schema schema) {
schemaPostProcessors.forEach(processor -> processor.process(schema, schemas));
for (SchemasPostProcessor processor : schemaPostProcessors) {
processor.process(schema, schemas);

if (!schemas.containsValue(schema)) {
break;
}
}
}
}
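
Post-processors may now remove the schema they are given from the definitions map (the Avro post-processor does this for org.apache.avro.* entries), and the containsValue check stops the chain so later processors, such as the example generator, no longer run on a schema that is gone. A minimal sketch of a processor that triggers the early exit (hypothetical class, shown only to illustrate the contract):

import io.swagger.v3.oas.models.media.Schema;
import java.util.Map;

class RemovingPostProcessor implements SchemasPostProcessor {
    @Override
    public void process(Schema schema, Map<String, Schema> definitions) {
        // dropping the schema currently being processed makes
        // DefaultComponentsService skip the remaining post-processors for it
        definitions.values().remove(schema);
    }
}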
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.stavshamir.springwolf.schemas.postprocessor;

import io.github.stavshamir.springwolf.asyncapi.v3.model.channel.message.MessageReference;
import io.swagger.v3.oas.models.media.Schema;
import org.springframework.util.StringUtils;

@@ -22,10 +23,17 @@ public class AvroSchemaPostProcessor implements SchemasPostProcessor {
@Override
public void process(Schema schema, Map<String, Schema> definitions) {
removeAvroSchemas(definitions);
removeAvroProperties(schema);
removeAvroProperties(schema, definitions);
}

private void removeAvroProperties(Schema schema) {
private void removeAvroProperties(Schema schema, Map<String, Schema> definitions) {
if (schema.get$ref() != null) {
String schemaName = MessageReference.extractRefName(schema.get$ref());
if (definitions.containsKey(schemaName)) {
removeAvroProperties(definitions.get(schemaName), definitions);
}
}

Map<String, Schema> properties = schema.getProperties();
if (properties != null) {
Schema schemaPropertySchema = properties.getOrDefault(SCHEMA_PROPERTY, null);
@@ -37,6 +45,8 @@ private void removeAvroProperties(Schema schema) {
properties.remove(SPECIFIC_DATA_PROPERTY);
}
}

properties.forEach((key, value) -> removeAvroProperties(value, definitions));
}
}
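
In effect, removeAvroProperties now follows $ref properties into the definitions map and recurses through nested properties, so Avro's internal schema/specificData fields are stripped from referenced schemas as well, not only from the top-level payload. A trimmed-down sketch of that behaviour, mirroring the new avroSchemasAreRemovedInRefsTest below (assumes the processor's no-args constructor and the same imports as the test):

var avroSchema = new io.swagger.v3.oas.models.media.Schema();
avroSchema.set$ref("#/components/schemas/org.apache.avro.Schema");
var avroSpecificData = new io.swagger.v3.oas.models.media.Schema();
avroSpecificData.set$ref("#/components/schemas/org.apache.avro.specific.SpecificData");

// the referenced schema still carries Avro's internal fields
var referenced = new io.swagger.v3.oas.models.media.Schema();
referenced.setProperties(new HashMap<>(
        Map.of("foo", new StringSchema(), "schema", avroSchema, "specificData", avroSpecificData)));

// the processed payload only points at it via $ref
var refProperty = new io.swagger.v3.oas.models.media.Schema();
refProperty.set$ref("#/components/schemas/referenced");
var payload = new io.swagger.v3.oas.models.media.Schema();
payload.setProperties(new HashMap<>(Map.of("ref", refProperty)));

var definitions = new HashMap<String, io.swagger.v3.oas.models.media.Schema>();
definitions.put("payload", payload);
definitions.put("referenced", referenced);

new AvroSchemaPostProcessor().process(payload, definitions);
// afterwards "referenced" keeps only its "foo" property, even though it was
// reachable from the processed payload only through a $ref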

@@ -35,13 +35,19 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

class DefaultSchemasServiceTest {
private final SchemasPostProcessor schemasPostProcessor = Mockito.mock(SchemasPostProcessor.class);
private final SchemasPostProcessor schemasPostProcessor2 = Mockito.mock(SchemasPostProcessor.class);
private final ComponentsService componentsService = new DefaultComponentsService(
List.of(),
List.of(new ExampleGeneratorPostProcessor(new ExampleJsonGenerator()), schemasPostProcessor),
List.of(
new ExampleGeneratorPostProcessor(new ExampleJsonGenerator()),
schemasPostProcessor,
schemasPostProcessor2),
new SwaggerSchemaUtil(),
new SpringwolfConfigProperties());

@@ -165,6 +171,22 @@ void postProcessorsAreCalled() {
componentsService.registerSchema(FooWithEnum.class);

verify(schemasPostProcessor).process(any(), any());
verify(schemasPostProcessor2).process(any(), any());
}

@Test
void postProcessorIsSkippedWhenSchemaWasRemoved() {
doAnswer(invocationOnMock -> {
Map<String, io.swagger.v3.oas.models.media.Schema> schemas = invocationOnMock.getArgument(1);
schemas.clear();
return null;
})
.when(schemasPostProcessor)
.process(any(), any());

componentsService.registerSchema(FooWithEnum.class);

verifyNoInteractions(schemasPostProcessor2);
}

private String jsonResource(String path) throws IOException {
@@ -26,6 +26,7 @@ void avroSchemasAreRemovedTest() {
Map.of("foo", new StringSchema(), "schema", avroSchema, "specificData", avroSpecificData)));

var definitions = new HashMap<String, io.swagger.v3.oas.models.media.Schema>();
definitions.put("schema", schema);
definitions.put("customClassRefUnusedInThisTest", new StringSchema());
definitions.put("org.apache.avro.Schema", new io.swagger.v3.oas.models.media.Schema());
definitions.put("org.apache.avro.ConversionJava.lang.Object", new io.swagger.v3.oas.models.media.Schema());
@@ -35,6 +36,48 @@

// then
assertThat(schema.getProperties()).isEqualTo(Map.of("foo", new StringSchema()));
assertThat(definitions).isEqualTo(Map.of("customClassRefUnusedInThisTest", new StringSchema()));
assertThat(definitions)
.isEqualTo(Map.of("schema", schema, "customClassRefUnusedInThisTest", new StringSchema()));
}

@Test
void avroSchemasAreRemovedInRefsTest() {
// given
var avroSchema = new io.swagger.v3.oas.models.media.Schema();
avroSchema.set$ref("#/components/schemas/org.apache.avro.Schema");

var avroSpecificData = new io.swagger.v3.oas.models.media.Schema();
avroSpecificData.set$ref("#/components/schemas/org.apache.avro.specific.SpecificData");

var refSchema = new io.swagger.v3.oas.models.media.Schema();
refSchema.setProperties(new HashMap<>(
Map.of("foo", new StringSchema(), "schema", avroSchema, "specificData", avroSpecificData)));

var refProperty = new io.swagger.v3.oas.models.media.Schema();
refProperty.set$ref("#/components/schemas/refSchema");
var schema = new io.swagger.v3.oas.models.media.Schema();
schema.setProperties(new HashMap<>(Map.of("ref", refProperty)));

var definitions = new HashMap<String, io.swagger.v3.oas.models.media.Schema>();
definitions.put("schema", schema);
definitions.put("refSchema", refSchema);
definitions.put("customClassRefUnusedInThisTest", new StringSchema());
definitions.put("org.apache.avro.Schema", new io.swagger.v3.oas.models.media.Schema());
definitions.put("org.apache.avro.ConversionJava.lang.Object", new io.swagger.v3.oas.models.media.Schema());

// when
processor.process(schema, definitions);

// then
assertThat(schema.getProperties()).isEqualTo(Map.of("ref", refProperty));
assertThat(refSchema.getProperties()).isEqualTo(Map.of("foo", new StringSchema()));
assertThat(definitions)
.isEqualTo(Map.of(
"schema",
schema,
"refSchema",
refSchema,
"customClassRefUnusedInThisTest",
new StringSchema()));
}
}
@@ -1,26 +1,38 @@
{
"type": "record",
"name": "ExamplePayloadAvroDto",
"namespace": "io.github.stavshamir.springwolf.example.kafka.dto.avro",
"fields": [{
"name": "someString",
"type": ["null", "string"],
"default": null
}, {
"name": "someLong",
"type": ["null", "int"],
"default": null
}, {
"name": "someEnum",
"type": {
"type": "enum",
"name": "ExampleEnum",
"symbols": [
"FOO1",
"FOO2",
"FOO3"
]
"type": "record",
"name": "AnotherPayloadAvroDto",
"fields": [
{
"name": "someEnum",
"type": {
"type": "enum",
"name": "ExampleEnum",
"symbols": [
"FOO1",
"FOO2",
"FOO3"
]
},
"default": "FOO1"
},
"default": "FOO1"
}]
{
"name": "ExamplePayloadAvroDto",
"type": {
"type": "record",
"name": "ExamplePayloadAvroDto",
"fields": [
{
"name": "someString",
"type": ["null", "string"],
"default": null
},
{
"name": "someLong",
"type": "long"
}
]
}
}
]
}
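
Reading the added lines, the example Avro schema appears to be restructured: AnotherPayloadAvroDto is now the top-level record and embeds ExamplePayloadAvroDto (someString, someLong) as a nested record field, which is what exercises the new nested-reference handling. Assuming Avro's code generation produces the usual all-args constructors, the payload is then built as in the updated integration test further down:

ExamplePayloadAvroDto payload = new ExamplePayloadAvroDto("foo", 5L);
AnotherPayloadAvroDto anotherPayload = new AnotherPayloadAvroDto(ExampleEnum.FOO1, payload);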
@@ -4,7 +4,7 @@
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncListener;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation;
import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding;
import io.github.stavshamir.springwolf.example.kafka.dto.avro.ExamplePayloadAvroDto;
import io.github.stavshamir.springwolf.example.kafka.dto.avro.AnotherPayloadAvroDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
@@ -31,7 +31,7 @@ public class AvroConsumer {
description =
"Requires a running kafka-schema-registry. See docker-compose.yml to start it"))
@KafkaAsyncOperationBinding
public void receiveExampleAvroPayload(ExamplePayloadAvroDto payloads) {
public void receiveExampleAvroPayload(AnotherPayloadAvroDto payloads) {
log.info("Received new message in avro-topic: {}", payloads.toString());
}
}
@@ -5,6 +5,7 @@
import io.github.stavshamir.springwolf.example.kafka.consumers.AvroConsumer;
import io.github.stavshamir.springwolf.example.kafka.consumers.ExampleConsumer;
import io.github.stavshamir.springwolf.example.kafka.consumers.ProtobufConsumer;
import io.github.stavshamir.springwolf.example.kafka.dto.avro.AnotherPayloadAvroDto;
import io.github.stavshamir.springwolf.example.kafka.dto.avro.ExampleEnum;
import io.github.stavshamir.springwolf.example.kafka.dto.avro.ExamplePayloadAvroDto;
import io.github.stavshamir.springwolf.example.kafka.dto.proto.ExamplePayloadProtobufDto;
@@ -103,13 +104,14 @@ void producerCanUseSpringwolfConfigurationToSendMessage() {
disabledReason = "because it requires a running kafka-schema-registry instance (docker image= ~1GB).")
void producerCanUseSpringwolfConfigurationToSendAvroMessage() {
// given
ExamplePayloadAvroDto payload = new ExamplePayloadAvroDto("foo", 5, ExampleEnum.FOO1);
ExamplePayloadAvroDto payload = new ExamplePayloadAvroDto("foo", 5L);
AnotherPayloadAvroDto anotherPayload = new AnotherPayloadAvroDto(ExampleEnum.FOO1, payload);

// when
springwolfKafkaProducer.send("avro-topic", "key", Map.of(), payload);
springwolfKafkaProducer.send("avro-topic", "key", Map.of(), anotherPayload);

// then
verify(avroConsumer, timeout(10000)).receiveExampleAvroPayload(payload);
verify(avroConsumer, timeout(10000)).receiveExampleAvroPayload(anotherPayload);
}

@Test