Skip to content

Commit

Permalink
chore: fix tests for updated Schema Registry client internals (conflu…
Browse files Browse the repository at this point in the history
…entinc#9982) (confluentinc#10003)

Co-authored-by: Victoria Xia <victoria.xia@confluent.io>
  • Loading branch information
2 people authored and lucasbru committed Jul 11, 2023
1 parent 731d485 commit ca17671
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 17 deletions.
Expand Up @@ -28,17 +28,16 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Message;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
Expand Down Expand Up @@ -1228,9 +1227,7 @@ private static Serializer<Object> avroSerializer() {
final Map<String, String> props = new HashMap<>();
props.put("schema.registry.url", "localhost:9092");

final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class);
when(schemaRegistryClient.ticker()).thenReturn(Ticker.systemTicker());
return new KafkaAvroSerializer(schemaRegistryClient, props);
return new KafkaAvroSerializer(new MockSchemaRegistryClient(), props);
}

private static Message protobufRecord() {
Expand All @@ -1250,18 +1247,14 @@ private static Serializer<Message> protobufSerializer() {
final Map<String, String> props = new HashMap<>();
props.put("schema.registry.url", "localhost:9092");

final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class);
when(schemaRegistryClient.ticker()).thenReturn(Ticker.systemTicker());
return new KafkaProtobufSerializer<>(schemaRegistryClient, props);
return new KafkaProtobufSerializer<>(new MockSchemaRegistryClient(), props);
}

private static Serializer<Object> jsonSrSerializer() {
final Map<String, String> props = new HashMap<>();
props.put("schema.registry.url", "localhost:9092");

final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class);
when(schemaRegistryClient.ticker()).thenReturn(Ticker.systemTicker());
return new KafkaJsonSchemaSerializer<>(schemaRegistryClient, props);
return new KafkaJsonSchemaSerializer<>(new MockSchemaRegistryClient(), props);
}
}

Expand Down
Expand Up @@ -3,14 +3,11 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.serde.connect.ConnectKsqlSchemaTranslator;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -85,8 +82,8 @@ public class KsqlJsonSchemaDeserializerTest {
public void before() throws Exception {
schema = (new JsonSchemaTranslator()).fromConnectSchema(ORDER_SCHEMA.schema());

schemaRegistryClient = mock(SchemaRegistryClient.class);
when(schemaRegistryClient.getSchemaBySubjectAndId(anyString(), anyInt())).thenReturn(schema);
schemaRegistryClient = new MockSchemaRegistryClient();
schemaRegistryClient.register(SOME_TOPIC, schema);

final KsqlJsonSerdeFactory jsonSerdeFactory =
new KsqlJsonSerdeFactory(new JsonSchemaProperties(ImmutableMap.of()));
Expand Down

0 comments on commit ca17671

Please sign in to comment.