Fix some sonar issues #2110

Closed · wants to merge 7 commits
@@ -31,23 +31,6 @@ public RetryingKafkaConnectClient(KafkaConnectCluster config) {
super(new RetryingApiClient(config));
}

private static Retry conflictCodeRetry() {
return Retry
.fixedDelay(MAX_RETRIES, RETRIES_DELAY)
.filter(e -> e instanceof WebClientResponseException.Conflict)
.onRetryExhaustedThrow((spec, signal) ->
new KafkaConnectConflictReponseException(
(WebClientResponseException.Conflict) signal.failure()));
}

private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
}

private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
@@ -80,6 +63,23 @@ public RetryingApiClient(KafkaConnectCluster config) {
setPassword(config.getPassword());
}

private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
}

private static Retry conflictCodeRetry() {
return Retry
.fixedDelay(MAX_RETRIES, RETRIES_DELAY)
.filter(e -> e instanceof WebClientResponseException.Conflict)
.onRetryExhaustedThrow((spec, signal) ->
new KafkaConnectConflictReponseException(
(WebClientResponseException.Conflict) signal.failure()));
}

@Override
public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
MultiValueMap<String, String> queryParams, Object body,
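No behavior changes in this file: the retry helpers were private static methods used only by the inner RetryingApiClient, so they move into it. This matches Sonar's guidance that private methods called only from an inner class should live in that class (likely rule java:S3398; the rule ID is an assumption). A minimal, self-contained sketch of the pattern, with hypothetical names:

public class Outer {

  // Non-compliant shape (flagged): a private static helper declared here at
  // the outer level while only Inner calls it. Sonar suggests moving it down.
  // private static int twice(int x) { return 2 * x; }

  static class Inner {
    // Compliant: the helper now lives in the only class that uses it.
    private static int twice(int x) {
      return 2 * x;
    }

    int doubled(int x) {
      return twice(x);
    }
  }

  public static void main(String[] args) {
    System.out.println(new Inner().doubled(21)); // prints 42
  }
}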
@@ -57,8 +57,8 @@ public KeyedObjectPool<JmxConnectionInfo, JMXConnector> pool() {
return pool;
}

private GenericKeyedObjectPoolConfig poolConfig() {
final var poolConfig = new GenericKeyedObjectPoolConfig();
private GenericKeyedObjectPoolConfig<JMXConnector> poolConfig() {
final var poolConfig = new GenericKeyedObjectPoolConfig<JMXConnector>();
poolConfig.setMaxIdlePerKey(3);
poolConfig.setMaxTotalPerKey(3);
return poolConfig;
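This hunk only parameterizes a raw GenericKeyedObjectPoolConfig, which Sonar flags as a raw-type use (likely java:S3740). A short sketch of the compliant form, assuming commons-pool2 with its generified config classes:

import javax.management.remote.JMXConnector;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;

public class PoolConfigSketch {
  public static void main(String[] args) {
    // Raw form compiles, but the config's element type goes unchecked:
    // GenericKeyedObjectPoolConfig raw = new GenericKeyedObjectPoolConfig();

    // Parameterized form ties this config to JMXConnector pools at compile time.
    GenericKeyedObjectPoolConfig<JMXConnector> cfg = new GenericKeyedObjectPoolConfig<>();
    cfg.setMaxIdlePerKey(3);
    cfg.setMaxTotalPerKey(3);
    System.out.println(cfg.getMaxTotalPerKey()); // 3
  }
}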
@@ -118,7 +118,8 @@ private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();

// we use empty polls counting to verify that partition was fully read
for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < 3; ) {
int emptyPolls = 0;
while (recordsToSend.size() < desiredMsgsToPoll && emptyPolls < 3) {
var polledRecords = poll(sink, consumer, POLL_TIMEOUT);
log.debug("{} records polled from {}", polledRecords.count(), tp);

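The original for-loop had an empty update clause and did all its state changes in the body, which Sonar flags (likely java:S1994: a for-loop's increment clause should update its counter). The while-loop states the same exit conditions more honestly. A compilable sketch of the shape, with a stubbed poll:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class EmptyPollLoopSketch {
  public static void main(String[] args) {
    List<Integer> recordsToSend = new ArrayList<>();
    int desiredMsgsToPoll = 10;
    Random rnd = new Random();

    // while form: both exit conditions are visible in one place, and
    // emptyPolls is plainly updated in the body, not in a vestigial clause.
    int emptyPolls = 0;
    while (recordsToSend.size() < desiredMsgsToPoll && emptyPolls < 3) {
      int polled = rnd.nextInt(4); // stand-in for consumer.poll(...).count()
      if (polled == 0) {
        emptyPolls++;              // empty poll: partition may be fully read
      } else {
        for (int i = 0; i < polled; i++) {
          recordsToSend.add(i);
        }
      }
    }
    System.out.println("collected " + recordsToSend.size() + " records");
  }
}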
@@ -16,7 +16,7 @@
@Slf4j
public class MessageFilters {

private static GroovyScriptEngineImpl GROOVY_ENGINE;
private static GroovyScriptEngineImpl groovyEngine;

private MessageFilters() {
}
@@ -76,11 +76,11 @@ private static Object parseToJsonOrReturnNull(JsonSlurper parser, @Nullable Stri

private static synchronized GroovyScriptEngineImpl getGroovyEngine() {
// it is pretty heavy object, so initializing it on-demand
if (GROOVY_ENGINE == null) {
GROOVY_ENGINE = (GroovyScriptEngineImpl)
if (groovyEngine == null) {
groovyEngine = (GroovyScriptEngineImpl)
new ScriptEngineManager().getEngineByName("groovy");
}
return GROOVY_ENGINE;
return groovyEngine;
}

private static CompiledScript compileScript(String script) {
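Only the name changes here: UPPER_SNAKE_CASE is conventionally reserved for static final constants, so the mutable static cache becomes groovyEngine (likely Sonar rule java:S3008); the synchronized lazy initialization is untouched. A self-contained sketch of the idiom:

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

public final class GroovyEngineHolder {
  // Mutable static cache: camelCase, not GROOVY_ENGINE, since it is not final.
  private static ScriptEngine groovyEngine;

  private GroovyEngineHolder() {
  }

  // The engine is heavyweight, so it is created on first use; synchronized
  // keeps the check-then-assign race-free across threads. (Returns null if
  // Groovy is not on the classpath.)
  public static synchronized ScriptEngine getGroovyEngine() {
    if (groovyEngine == null) {
      groovyEngine = new ScriptEngineManager().getEngineByName("groovy");
    }
    return groovyEngine;
  }
}

An initialization-on-demand holder class would achieve the same laziness without synchronizing every call, but the simple synchronized getter matches what the PR keeps.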
@@ -0,0 +1,7 @@
package com.provectus.kafka.ui.exception;

public class SchemaConverterException extends RuntimeException {
public SchemaConverterException(String message) {
super(message);
}
}
@@ -0,0 +1,7 @@
package com.provectus.kafka.ui.exception;

public class SerDeException extends RuntimeException {
public SerDeException(String message, Throwable cause) {
super(message, cause);
}
}
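These two new exception types exist so the code below can stop throwing bare RuntimeException, which Sonar discourages (generic exceptions; likely java:S112). A small usage sketch reusing the SerDeException defined just above, assuming it is on the classpath:

import com.provectus.kafka.ui.exception.SerDeException;

public class SerDeExceptionUsage {
  public static void main(String[] args) {
    try {
      Integer.parseInt("not-a-number"); // stand-in for a failing deserializer
    } catch (Exception e) {
      // A dedicated subtype keeps the cause chain and lets callers catch
      // serialization failures without also catching every RuntimeException.
      throw new SerDeException("Can't init deserializer", e);
    }
  }
}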
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.serde;

import com.provectus.kafka.ui.exception.SerDeException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.serde.schemaregistry.SchemaRegistryAwareRecordSerDe;
import com.provectus.kafka.ui.service.ClustersStorage;
@@ -42,7 +43,7 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) {
return new SimpleRecordSerDe();
}
} catch (Throwable e) {
throw new RuntimeException("Can't init deserializer", e);
throw new SerDeException("Can't init deserializer", e);
}
}

@@ -3,6 +3,7 @@
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import com.provectus.kafka.ui.exception.SerDeException;
import com.provectus.kafka.ui.model.MessageSchemaDTO;
import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
@@ -18,7 +19,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@@ -95,7 +95,7 @@ public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
try {
builder.key(parse(msg.key().get(), descriptor));
builder.keyFormat(MessageFormat.PROTOBUF);
} catch (Throwable e) {
} catch (Exception e) {
log.debug("Failed to deserialize key as protobuf, falling back to string formatter", e);
builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.key().get()));
builder.keyFormat(FALLBACK_FORMATTER.getFormat());
@@ -107,7 +107,7 @@ public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
try {
builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
builder.valueFormat(MessageFormat.PROTOBUF);
} catch (Throwable e) {
} catch (Exception e) {
log.debug("Failed to deserialize value as protobuf, falling back to string formatter", e);
builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.value().get()));
builder.keyFormat(FALLBACK_FORMATTER.getFormat());
@@ -153,8 +153,8 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
try {
JsonFormat.parser().merge(key, builder);
keyPayload = builder.build().toByteArray();
} catch (Throwable e) {
throw new RuntimeException("Failed to merge record key for topic " + topic, e);
} catch (Exception e) {
throw new SerDeException("Failed to merge record key for topic " + topic, e);
}
}
}
@@ -164,8 +164,8 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
try {
JsonFormat.parser().merge(data, builder);
valuePayload = builder.build().toByteArray();
} catch (Throwable e) {
throw new RuntimeException("Failed to merge record value for topic " + topic, e);
} catch (Exception e) {
throw new SerDeException("Failed to merge record value for topic " + topic, e);
}
}

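Throughout these serde files, catch (Throwable e) becomes catch (Exception e). Sonar flags catching Throwable (likely java:S1181) because it also swallows Errors such as OutOfMemoryError or StackOverflowError, which the JVM generally cannot recover from. A minimal illustration:

public class CatchExceptionNotThrowable {
  public static void main(String[] args) {
    try {
      Integer.parseInt("oops");
    } catch (Exception e) {
      // Compliant: only recoverable failures are handled here.
      System.out.println("fallback path: " + e.getMessage());
    }
    // catch (Throwable t) { ... } would additionally trap JVM Errors
    // (OutOfMemoryError, StackOverflowError), hiding unrecoverable states.
  }
}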
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.serde.schemaregistry;

import com.provectus.kafka.ui.exception.SerDeException;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
@@ -39,8 +40,8 @@ protected Serializer<Object> createSerializer(SchemaRegistryClient client) {
protected Object read(String value, ParsedSchema schema) {
try {
return AvroSchemaUtils.toObject(value, (AvroSchema) schema);
} catch (Throwable e) {
throw new RuntimeException("Failed to serialize record for topic " + topic, e);
} catch (Exception e) {
throw new SerDeException("Failed to serialize record for topic " + topic, e);
}

}
@@ -3,6 +3,7 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import com.provectus.kafka.ui.exception.SerDeException;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
@@ -43,8 +44,8 @@ protected Message read(String value, ParsedSchema schema) {
try {
JsonFormat.parser().merge(value, builder);
return builder.build();
} catch (Throwable e) {
throw new RuntimeException("Failed to serialize record for topic " + topic, e);
} catch (Exception e) {
throw new SerDeException("Failed to serialize record for topic " + topic, e);
}
}

@@ -5,6 +5,7 @@
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;

import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.exception.SerDeException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.MessageSchemaDTO;
@@ -110,8 +111,8 @@ public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
fillDeserializedKvBuilder(msg, false, builder);
}
return builder.build();
} catch (Throwable e) {
throw new RuntimeException("Failed to parse record from topic " + msg.topic(), e);
} catch (Exception e) {
throw new SerDeException("Failed to parse record from topic " + msg.topic(), e);
}
}

@@ -284,7 +285,7 @@ private <T> Optional<T> wrapClientCall(Callable<T> call) {
if (restClientException.getStatus() == 404) {
return Optional.empty();
} else {
throw new RuntimeException("Error calling SchemaRegistryClient", restClientException);
throw new SerDeException("Error calling SchemaRegistryClient", restClientException);
}
}
}
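Context for the last hunk: wrapClientCall maps a registry 404 to Optional.empty() (the schema simply is not registered) and now wraps every other failure in the new typed exception instead of RuntimeException. A self-contained sketch of the pattern; HttpStatusException is a hypothetical stand-in for the registry client's RestClientException, and the real code throws SerDeException:

import java.util.Optional;
import java.util.concurrent.Callable;

public class NotFoundAsEmptySketch {
  static class HttpStatusException extends RuntimeException {
    final int status;

    HttpStatusException(int status) {
      this.status = status;
    }
  }

  // 404 is a normal outcome ("no such schema"), so it becomes an empty
  // Optional; anything else is rethrown as a typed, descriptive exception.
  static <T> Optional<T> wrapClientCall(Callable<T> call) {
    try {
      return Optional.ofNullable(call.call());
    } catch (HttpStatusException e) {
      if (e.status == 404) {
        return Optional.empty();
      }
      throw new IllegalStateException("Error calling SchemaRegistryClient", e);
    } catch (Exception e) {
      throw new IllegalStateException("Error calling SchemaRegistryClient", e);
    }
  }

  public static void main(String[] args) {
    // Prints "Optional.empty": the 404 is absorbed, not propagated.
    System.out.println(wrapClientCall(() -> {
      throw new HttpStatusException(404);
    }));
  }
}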
@@ -12,7 +12,6 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;

@Service
@@ -64,7 +64,10 @@ private Mono<List<InternalConsumerGroup>> getConsumerGroups(
});
}

@Deprecated // need to migrate to pagination
/**Deprecated method.
* @deprecated (need to migrate to pagination)
**/
@Deprecated(forRemoval = true)
public Mono<List<InternalConsumerGroup>> getAllConsumerGroups(KafkaCluster cluster) {
return adminClientService.get(cluster)
.flatMap(ac -> describeConsumerGroups(ac, null)
@@ -103,12 +106,6 @@ public Mono<List<InternalConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster
}));
}

@Value
public static class ConsumerGroupsPage {
List<InternalConsumerGroup> consumerGroups;
int totalPages;
}

public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
KafkaCluster cluster,
int page,
@@ -216,4 +213,10 @@ public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
return new KafkaConsumer<>(props);
}

@Value
public static class ConsumerGroupsPage {
List<InternalConsumerGroup> consumerGroups;
int totalPages;
}

}
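Sonar wants deprecated elements to carry both the @Deprecated annotation and a @deprecated Javadoc tag (likely java:S1123), which is what the new comment block supplies; forRemoval = true additionally strengthens the compiler warning at call sites. The same file also moves the ConsumerGroupsPage value class below the methods. A sketch of the compliant deprecation shape, with hypothetical wording:

public class DeprecationSketch {

  /**
   * Lists every consumer group in one shot.
   *
   * @deprecated unbounded listing does not scale; migrate to the paginated API.
   */
  @Deprecated(forRemoval = true)
  public void getAllConsumerGroups() {
    // ...
  }
}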
@@ -48,6 +48,7 @@
@Service
@RequiredArgsConstructor
@Slf4j
@SuppressWarnings("squid:S2589") //False positive. Generated CreateTopicMessageDTO.getPartition() can return null
public class MessagesService {
private final AdminClientService adminClientService;
private final DeserializationService deserializationService;
@@ -127,7 +128,7 @@ private Mono<RecordMetadata> sendMessageImpl(KafkaCluster cluster,
}
});
return Mono.fromFuture(cf);
} catch (Throwable e) {
} catch (Exception e) {
return Mono.error(e);
}
}
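The class-level suppression exists because Sonar's java:S2589 ("gratuitous boolean expressions") cannot see that the generated CreateTopicMessageDTO.getPartition() is nullable, so it would report the null check as always-true/false. A sketch of the false-positive shape, with hypothetical names:

public class NullablePartitionSketch {
  // Stand-in for the generated DTO whose getter can legitimately return null.
  static Integer getPartition() {
    return Math.random() < 0.5 ? null : 3;
  }

  public static void main(String[] args) {
    Integer partition = getPartition();
    // S2589 may claim this condition is gratuitous if it assumes the getter
    // never returns null; here (as in the generated DTO) the check is real.
    int resolved = (partition == null) ? 0 : partition;
    System.out.println("partition " + resolved);
  }
}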
@@ -403,11 +403,11 @@ private <T> Flux<T> failoverAble(Flux<T> request, FailoverFlux<T> failoverMethod

private abstract static class Failover<E> {
private final InternalSchemaRegistry schemaRegistry;
private final Supplier<E> failover;
private final Supplier<E> failoverSupplier;

private Failover(InternalSchemaRegistry schemaRegistry, Supplier<E> failover) {
private Failover(InternalSchemaRegistry schemaRegistry, Supplier<E> failoverSupplier) {
this.schemaRegistry = Objects.requireNonNull(schemaRegistry);
this.failover = Objects.requireNonNull(failover);
this.failoverSupplier = Objects.requireNonNull(failoverSupplier);
}

abstract E error(Throwable error);
@@ -418,7 +418,7 @@ public E failover(Throwable error) {
&& schemaRegistry.isFailoverAvailable()) {
var uri = ((WebClientRequestException) error).getUri();
schemaRegistry.markAsUnavailable(String.format("%s://%s", uri.getScheme(), uri.getAuthority()));
return failover.get();
return failoverSupplier.get();
}
return error(error);
}
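Renaming the field from failover to failoverSupplier removes the collision with the failover(Throwable) method on the same class, which Sonar flags (fields and methods sharing a name; likely in java:S1845 territory) and which made "return failover.get()" easy to misread as recursion. A trimmed sketch of the disambiguated class:

import java.util.Objects;
import java.util.function.Supplier;

abstract class FailoverSketch<E> {
  private final Supplier<E> failoverSupplier; // was "failover", same as the method

  FailoverSketch(Supplier<E> failoverSupplier) {
    this.failoverSupplier = Objects.requireNonNull(failoverSupplier);
  }

  abstract E error(Throwable error);

  E failover(Throwable error) {
    // Reads unambiguously now: a supplier call, not a recursive failover(...).
    return failoverSupplier.get();
  }
}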
Expand Down
Loading