From e453b32ae1faeb42927b641ad060d0293d087d4d Mon Sep 17 00:00:00 2001 From: Marcus Krassmann Date: Mon, 20 Dec 2021 00:16:20 +0100 Subject: [PATCH] fixed many sonar lint findings --- .mvn/wrapper/MavenWrapperDownloader.java | 36 +++++++------------ src/main/java/kafdrop/Kafdrop.java | 5 ++- .../kafdrop/config/CorsConfiguration.java | 2 ++ .../config/HealthCheckConfiguration.java | 4 +-- .../config/InterceptorConfiguration.java | 3 +- .../kafdrop/config/KafkaConfiguration.java | 2 +- .../ProtobufDescriptorConfiguration.java | 12 ++----- .../kafdrop/config/SwaggerConfiguration.java | 10 +++--- .../kafdrop/controller/AclController.java | 4 +-- .../controller/BasicErrorController.java | 6 +++- .../kafdrop/controller/BrokerController.java | 4 +-- .../kafdrop/controller/ClusterController.java | 4 +-- .../controller/ConsumerController.java | 2 +- .../kafdrop/controller/MessageController.java | 10 +++--- .../kafdrop/controller/TopicController.java | 13 +++---- src/main/java/kafdrop/model/AclVO.java | 15 ++++++++ .../service/KafkaHighLevelConsumer.java | 4 --- .../kafdrop/service/KafkaMonitorImpl.java | 30 ++++++++-------- .../kafdrop/util/AvroMessageDeserializer.java | 6 ++-- src/main/java/kafdrop/util/ByteUtils.java | 3 ++ .../util/MsgPackMessageDeserializer.java | 3 +- .../util/ProtobufMessageDeserializer.java | 23 +++++------- ...obufSchemaRegistryMessageDeserializer.java | 8 ++--- src/test/java/kafdrop/KafdropIT.java | 6 +++- 24 files changed, 105 insertions(+), 110 deletions(-) diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java index c32394f1..a49ede0e 100644 --- a/.mvn/wrapper/MavenWrapperDownloader.java +++ b/.mvn/wrapper/MavenWrapperDownloader.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + import java.net.*; import java.io.*; import java.nio.channels.*; @@ -25,7 +26,7 @@ public class MavenWrapperDownloader { * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. */ private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" - + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; /** * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to * @@ -54,33 +55,22 @@ public static void main(String args[]) { // wrapperUrl parameter. File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); String url = DEFAULT_DOWNLOAD_URL; - if(mavenWrapperPropertyFile.exists()) { - FileInputStream mavenWrapperPropertyFileInputStream = null; - try { + if (mavenWrapperPropertyFile.exists()) { + try (FileInputStream mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile)) { - mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); Properties mavenWrapperProperties = new Properties(); mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); } catch (IOException e) { System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); - } finally { - try { - if(mavenWrapperPropertyFileInputStream != null) { - mavenWrapperPropertyFileInputStream.close(); - } - } catch (IOException e) { - // Ignore ...
- } } } System.out.println("- Downloading from: " + url); File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); - if(!outputFile.getParentFile().exists()) { - if(!outputFile.getParentFile().mkdirs()) { - System.out.println( - "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); - } + if (!outputFile.getParentFile().exists() && !outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); } System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); try { @@ -106,12 +96,12 @@ protected PasswordAuthentication getPasswordAuthentication() { }); } URL website = new URL(urlString); - ReadableByteChannel rbc; - rbc = Channels.newChannel(website.openStream()); - FileOutputStream fos = new FileOutputStream(destination); - fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); - fos.close(); - rbc.close(); + try ( + ReadableByteChannel rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination) + ) { + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + } } } diff --git a/src/main/java/kafdrop/Kafdrop.java b/src/main/java/kafdrop/Kafdrop.java index cda1aca6..32c32403 100644 --- a/src/main/java/kafdrop/Kafdrop.java +++ b/src/main/java/kafdrop/Kafdrop.java @@ -43,7 +43,7 @@ @SpringBootApplication public class Kafdrop { - private final static Logger LOG = LoggerFactory.getLogger(Kafdrop.class); + private static final Logger LOG = LoggerFactory.getLogger(Kafdrop.class); public static void main(String[] args) { createApplicationBuilder() @@ -99,8 +99,7 @@ public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) { try { System.setProperty("logging.dir", new File(loggingFile).getParent()); } catch (Exception ex) { - System.err.println("Unable to set up logging.dir from logging.file " + loggingFile + ": " + - 
Throwables.getStackTraceAsString(ex)); + LOG.error("Unable to set up logging.dir from logging.file {}", loggingFile, ex); } } if (environment.containsProperty("debug") && diff --git a/src/main/java/kafdrop/config/CorsConfiguration.java b/src/main/java/kafdrop/config/CorsConfiguration.java index e4abeab8..a7523956 100644 --- a/src/main/java/kafdrop/config/CorsConfiguration.java +++ b/src/main/java/kafdrop/config/CorsConfiguration.java @@ -69,6 +69,7 @@ public Filter corsFilter() { return new Filter() { @Override public void init(FilterConfig filterConfig) { + // nothing to init } @Override @@ -91,6 +92,7 @@ public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) @Override public void destroy() { + // nothing to destroy } }; } diff --git a/src/main/java/kafdrop/config/HealthCheckConfiguration.java b/src/main/java/kafdrop/config/HealthCheckConfiguration.java index 9c8dcc16..efea1a95 100644 --- a/src/main/java/kafdrop/config/HealthCheckConfiguration.java +++ b/src/main/java/kafdrop/config/HealthCheckConfiguration.java @@ -38,7 +38,7 @@ public HealthCheck(HealthEndpoint healthEndpoint) { } @ManagedAttribute - public Map getHealth() { + public Map<String, Object> getHealth() { final var health = (Health) healthEndpoint.health(); final var healthMap = new LinkedHashMap(); healthMap.put("status", getStatus(health)); @@ -46,7 +46,7 @@ public Map getHealth() { return healthMap; } - private Map getDetails(Map details) { + private Map<String, Object> getDetails(Map<String, Object> details) { return details.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> { diff --git a/src/main/java/kafdrop/config/InterceptorConfiguration.java b/src/main/java/kafdrop/config/InterceptorConfiguration.java index 2eed5707..dbda06e9 100644 --- a/src/main/java/kafdrop/config/InterceptorConfiguration.java +++ b/src/main/java/kafdrop/config/InterceptorConfiguration.java @@ -22,7 +22,6 @@ import org.springframework.stereotype.*; import org.springframework.web.servlet.*; import
org.springframework.web.servlet.config.annotation.*; -import org.springframework.web.servlet.handler.*; import javax.servlet.http.*; @@ -39,7 +38,7 @@ public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new ProfileHandlerInterceptor()); } - public class ProfileHandlerInterceptor extends HandlerInterceptorAdapter { + public class ProfileHandlerInterceptor implements AsyncHandlerInterceptor { @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) { final var activeProfiles = environment.getActiveProfiles(); diff --git a/src/main/java/kafdrop/config/KafkaConfiguration.java b/src/main/java/kafdrop/config/KafkaConfiguration.java index e866cf81..6763885c 100644 --- a/src/main/java/kafdrop/config/KafkaConfiguration.java +++ b/src/main/java/kafdrop/config/KafkaConfiguration.java @@ -17,7 +17,7 @@ public final class KafkaConfiguration { private static final Logger LOG = LoggerFactory.getLogger(KafkaConfiguration.class); private String brokerConnect; - private Boolean isSecured = false; + private boolean isSecured = false; private String saslMechanism; private String securityProtocol; private String truststoreFile; diff --git a/src/main/java/kafdrop/config/ProtobufDescriptorConfiguration.java b/src/main/java/kafdrop/config/ProtobufDescriptorConfiguration.java index 87f54caf..e6660587 100644 --- a/src/main/java/kafdrop/config/ProtobufDescriptorConfiguration.java +++ b/src/main/java/kafdrop/config/ProtobufDescriptorConfiguration.java @@ -7,6 +7,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,17 +44,10 @@ public List getDescFilesList() { File path = new File(directory); // apply filter for listing only .desc file - FilenameFilter filter = new FilenameFilter() { - - @Override - public boolean accept(File dir, String name) { - return 
name.endsWith(".desc"); - } - - }; + FilenameFilter filter = (dir, name) -> name.endsWith(".desc"); pathnames = path.list(filter); - return Arrays.asList(pathnames); + return Arrays.asList(Objects.requireNonNull(pathnames)); } } } diff --git a/src/main/java/kafdrop/config/SwaggerConfiguration.java b/src/main/java/kafdrop/config/SwaggerConfiguration.java index 01babc9e..59fb36aa 100644 --- a/src/main/java/kafdrop/config/SwaggerConfiguration.java +++ b/src/main/java/kafdrop/config/SwaggerConfiguration.java @@ -18,6 +18,7 @@ package kafdrop.config; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.condition.*; @@ -33,6 +34,7 @@ import java.lang.reflect.Field; import java.util.List; +import java.util.Objects; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -60,7 +62,7 @@ public Docket swagger() { /** * Swagger Predicate for only selecting JSON endpoints. */ - public final class JsonRequestHandlerPredicate implements Predicate { + public static final class JsonRequestHandlerPredicate implements Predicate { @Override public boolean test(RequestHandler input) { return input.produces().contains(MediaType.APPLICATION_JSON); @@ -70,7 +72,7 @@ public boolean test(RequestHandler input) { /** * Swagger Predicate for ignoring {@code /actuator} endpoints. 
*/ - public final class IgnoreDebugPathPredicate implements Predicate { + public static final class IgnoreDebugPathPredicate implements Predicate { @Override public boolean test(String input) { return !input.startsWith("/actuator"); @@ -85,7 +87,7 @@ public static BeanPostProcessor springfoxHandlerProviderBeanPostProcessor() { return new BeanPostProcessor() { @Override - public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + public Object postProcessAfterInitialization(@NotNull Object bean, @NotNull String beanName) throws BeansException { if (bean instanceof WebMvcRequestHandlerProvider || bean instanceof WebFluxRequestHandlerProvider) { customizeSpringfoxHandlerMappings(getHandlerMappings(bean)); } @@ -104,7 +106,7 @@ private void customizeSpringfoxHand private List getHandlerMappings(Object bean) { try { Field field = ReflectionUtils.findField(bean.getClass(), "handlerMappings"); - field.setAccessible(true); + Objects.requireNonNull(field).setAccessible(true); return (List) field.get(bean); } catch (IllegalArgumentException | IllegalAccessException e) { throw new IllegalStateException(e); diff --git a/src/main/java/kafdrop/controller/AclController.java b/src/main/java/kafdrop/controller/AclController.java index d1edfeb8..f6abe927 100644 --- a/src/main/java/kafdrop/controller/AclController.java +++ b/src/main/java/kafdrop/controller/AclController.java @@ -26,8 +26,8 @@ import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import java.util.List; @@ -52,7 +52,7 @@ public String acls(Model model) { @ApiResponses(value = { @ApiResponse(code = 200, message = "Success", response = String.class, 
responseContainer = "List") }) - @RequestMapping(path = "/acl", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + @GetMapping(path = "/acl", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody List getAllTopics() { return kafkaMonitor.getAcls(); } diff --git a/src/main/java/kafdrop/controller/BasicErrorController.java b/src/main/java/kafdrop/controller/BasicErrorController.java index 8b8664ee..5b2249fa 100644 --- a/src/main/java/kafdrop/controller/BasicErrorController.java +++ b/src/main/java/kafdrop/controller/BasicErrorController.java @@ -1,5 +1,7 @@ package kafdrop.controller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.web.servlet.error.*; import org.springframework.boot.web.error.ErrorAttributeOptions; import org.springframework.boot.web.servlet.error.*; @@ -12,6 +14,8 @@ @Controller public final class BasicErrorController extends AbstractErrorController { + private static final Logger LOG = LoggerFactory.getLogger(BasicErrorController.class); + public BasicErrorController(ErrorAttributes errorAttributes) { super(errorAttributes); } @@ -19,7 +23,7 @@ public BasicErrorController(ErrorAttributes errorAttributes) { @RequestMapping("/error") public ModelAndView handleError(HttpServletRequest request) { final var error = getErrorAttributes(request, ErrorAttributeOptions.of(ErrorAttributeOptions.Include.STACK_TRACE)); - System.out.println("errorAtts: " + error); + LOG.info("errorAtts: {}", error); final var model = Map.of("error", error); return new ModelAndView("error", model); } diff --git a/src/main/java/kafdrop/controller/BrokerController.java b/src/main/java/kafdrop/controller/BrokerController.java index 4eb35e7f..d8df2291 100644 --- a/src/main/java/kafdrop/controller/BrokerController.java +++ b/src/main/java/kafdrop/controller/BrokerController.java @@ -49,7 +49,7 @@ public String brokerDetails(@PathVariable("id") int brokerId, Model model) { @ApiResponse(code = 
200, message = "Success", response = BrokerVO.class), @ApiResponse(code = 404, message = "Invalid Broker ID") }) - @RequestMapping(path = "/broker/{id}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + @GetMapping(path = "/broker/{id}", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody BrokerVO brokerDetailsJson(@PathVariable("id") int brokerId) { return kafkaMonitor.getBroker(brokerId).orElseThrow(() -> new BrokerNotFoundException("No such broker " + brokerId)); } @@ -58,7 +58,7 @@ public String brokerDetails(@PathVariable("id") int brokerId, Model model) { @ApiResponses(value = { @ApiResponse(code = 200, message = "Success", response = BrokerVO.class) }) - @RequestMapping(path = "/broker", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + @GetMapping(path = "/broker", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody List brokerDetailsJson() { return kafkaMonitor.getBrokers(); } diff --git a/src/main/java/kafdrop/controller/ClusterController.java b/src/main/java/kafdrop/controller/ClusterController.java index efb6b40b..852b1b99 100644 --- a/src/main/java/kafdrop/controller/ClusterController.java +++ b/src/main/java/kafdrop/controller/ClusterController.java @@ -30,7 +30,6 @@ import org.springframework.ui.*; import org.springframework.web.bind.annotation.*; -import java.time.*; import java.util.*; import java.util.stream.*; @@ -93,7 +92,7 @@ public String clusterInfo(Model model, @ApiResponses(value = { @ApiResponse(code = 200, message = "Success", response = ClusterInfoVO.class) }) - @RequestMapping(path = "/", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + @GetMapping(path = "/", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody ClusterInfoVO getCluster() { final var vo = new ClusterInfoVO(); vo.brokers = kafkaMonitor.getBrokers(); @@ -112,6 +111,7 @@ public String brokerNotFound(Model model) { @ResponseStatus(HttpStatus.OK) 
@RequestMapping("/health_check") public void healthCheck() { + // only http code shall be checked } /** diff --git a/src/main/java/kafdrop/controller/ConsumerController.java b/src/main/java/kafdrop/controller/ConsumerController.java index 96895123..4996f44c 100644 --- a/src/main/java/kafdrop/controller/ConsumerController.java +++ b/src/main/java/kafdrop/controller/ConsumerController.java @@ -51,7 +51,7 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode @ApiResponse(code = 200, message = "Success", response = ConsumerVO.class), @ApiResponse(code = 404, message = "Invalid consumer group") }) - @RequestMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + @GetMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException { final var topicVos = kafkaMonitor.getTopics(); final var consumer = kafkaMonitor.getConsumers(topicVos) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 5272a6dc..949f47e0 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -36,8 +36,6 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ModelAttribute; import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; @@ -210,7 +208,7 @@ private MessageFormat getSelectedMessageFormat(String format) { @ApiResponse(code = 200, message = "Success", response = List.class), @ApiResponse(code = 404, message = "Invalid topic name") }) 
- @RequestMapping(method = RequestMethod.GET, value = "/topic/{name:.+}/messages", produces = MediaType.APPLICATION_JSON_VALUE) + @GetMapping(value = "/topic/{name:.+}/messages", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody List getPartitionOrMessages( @PathVariable("name") String topicName, @@ -264,10 +262,10 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form // filter the input file name final var descFileName = descFile.replace(".desc", "") - .replaceAll("\\.", "") - .replaceAll("/", ""); + .replace(".", "") + .replace("/", ""); final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + ".desc"; - deserializer = new ProtobufMessageDeserializer(topicName, fullDescFile, msgTypeName); + deserializer = new ProtobufMessageDeserializer(fullDescFile, msgTypeName); } else if (format == MessageFormat.PROTOBUF) { final var schemaRegistryUrl = schemaRegistryProperties.getConnect(); final var schemaRegistryAuth = schemaRegistryProperties.getAuth(); diff --git a/src/main/java/kafdrop/controller/TopicController.java b/src/main/java/kafdrop/controller/TopicController.java index 8867cdd6..7fc6fc6b 100644 --- a/src/main/java/kafdrop/controller/TopicController.java +++ b/src/main/java/kafdrop/controller/TopicController.java @@ -21,8 +21,6 @@ import io.swagger.annotations.*; import kafdrop.model.*; import kafdrop.service.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.*; import org.springframework.stereotype.*; @@ -37,7 +35,6 @@ @Controller @RequestMapping("/topic") public final class TopicController { - private static final Logger LOG = LoggerFactory.getLogger(TopicController.class); private final KafkaMonitor kafkaMonitor; private final boolean topicDeleteEnabled; private final boolean topicCreateEnabled; @@ -60,7 +57,7 @@ public String topicDetails(@PathVariable("name") String topicName, 
Model model) return "topic-detail"; } - @RequestMapping(value = "/{name:.+}/delete", method = RequestMethod.POST) + @PostMapping(value = "/{name:.+}/delete") public String deleteTopic(@PathVariable("name") String topicName, Model model) { if (!topicDeleteEnabled) { model.addAttribute("deleteErrorMessage", "Not configured to be deleted."); @@ -93,7 +90,7 @@ public String createTopicPage(Model model) { @ApiResponse(code = 200, message = "Success", response = TopicVO.class), @ApiResponse(code = 404, message = "Invalid topic name") }) - @RequestMapping(path = "/{name:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + @GetMapping(path = "/{name:.+}", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody TopicVO getTopic(@PathVariable("name") String topicName) { return kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); @@ -103,7 +100,7 @@ public String createTopicPage(Model model) { @ApiResponses(value = { @ApiResponse(code = 200, message = "Success", response = String.class, responseContainer = "List") }) - @RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody List getAllTopics() { return kafkaMonitor.getTopics(); } @@ -113,7 +110,7 @@ public String createTopicPage(Model model) { @ApiResponse(code = 200, message = "Success", response = String.class, responseContainer = "List"), @ApiResponse(code = 404, message = "Invalid topic name") }) - @RequestMapping(path = "/{name:.+}/consumers", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) + @GetMapping(path = "/{name:.+}/consumers", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody List getConsumers(@PathVariable("name") String topicName) { final var topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); @@ -128,7 +125,7 @@ public String 
createTopicPage(Model model) { @ApiResponses(value = { @ApiResponse(code = 200, message = "Success", response = String.class) }) - @RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.POST) + @PostMapping(produces = MediaType.APPLICATION_JSON_VALUE) public String createTopic(CreateTopicVO createTopicVO, Model model) { model.addAttribute("topicCreateEnabled", topicCreateEnabled); model.addAttribute("topicName", createTopicVO.getName()); diff --git a/src/main/java/kafdrop/model/AclVO.java b/src/main/java/kafdrop/model/AclVO.java index 5caa3e6a..a1333576 100644 --- a/src/main/java/kafdrop/model/AclVO.java +++ b/src/main/java/kafdrop/model/AclVO.java @@ -18,6 +18,8 @@ package kafdrop.model; +import java.util.Objects; + public final class AclVO implements Comparable{ private final String name; private final String resourceType; @@ -68,4 +70,17 @@ public String getPermissionType() { @Override public int compareTo(AclVO that) { return this.name.compareTo(that.name) ; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AclVO aclVO = (AclVO) o; + return name.equals(aclVO.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } } diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index c714fa17..b3ddc6e9 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -3,11 +3,9 @@ import kafdrop.config.*; import kafdrop.model.*; import kafdrop.util.*; -import org.apache.kafka.clients.*; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.Node; import org.apache.kafka.common.*; -import org.apache.kafka.common.config.*; import org.apache.kafka.common.serialization.*; import org.slf4j.*; import org.springframework.stereotype.*; @@ -125,7 +123,6 @@ 
synchronized List> getLatestRecords(TopicPartitio rec.offset(), rec.timestamp(), rec.timestampType(), - 0L, rec.serializedKeySize(), rec.serializedValueSize(), deserialize(deserializers.getKeyDeserializer(),rec.key()), @@ -185,7 +182,6 @@ synchronized List> getLatestRecords(String topic, rec.offset(), rec.timestamp(), rec.timestampType(), - 0L, rec.serializedKeySize(), rec.serializedValueSize(), deserialize(deserializers.getKeyDeserializer(), rec.key()), diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index a71ce99a..79038e9b 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -144,14 +144,14 @@ public List getMessages(String topic, int count, final var records = highLevelConsumer.getLatestRecords(topic, count, deserializers); if (records != null) { final var messageVos = new ArrayList(); - for (var record : records) { + for (var rec : records) { final var messageVo = new MessageVO(); - messageVo.setPartition(record.partition()); - messageVo.setOffset(record.offset()); - messageVo.setKey(record.key()); - messageVo.setMessage(record.value()); - messageVo.setHeaders(headersToMap(record.headers())); - messageVo.setTimestamp(new Date(record.timestamp())); + messageVo.setPartition(rec.partition()); + messageVo.setOffset(rec.offset()); + messageVo.setKey(rec.key()); + messageVo.setMessage(rec.value()); + messageVo.setHeaders(headersToMap(rec.headers())); + messageVo.setTimestamp(new Date(rec.timestamp())); messageVos.add(messageVo); } return messageVos; @@ -166,14 +166,14 @@ public List getMessages(TopicPartition topicPartition, long offset, i final var records = highLevelConsumer.getLatestRecords(topicPartition, offset, count, deserializers); if (records != null) { final var messageVos = new ArrayList(); - for (var record : records) { + for (var rec : records) { final var messageVo = new MessageVO(); 
messageVo.setPartition(topicPartition.partition()); - messageVo.setOffset(record.offset()); - messageVo.setKey(record.key()); - messageVo.setMessage(record.value()); - messageVo.setHeaders(headersToMap(record.headers())); - messageVo.setTimestamp(new Date(record.timestamp())); + messageVo.setOffset(rec.offset()); + messageVo.setKey(rec.key()); + messageVo.setMessage(rec.value()); + messageVo.setHeaders(headersToMap(rec.headers())); + messageVo.setTimestamp(new Date(rec.timestamp())); messageVos.add(messageVo); } return messageVos; @@ -243,8 +243,8 @@ private static List convert(List consumerGroup final var partition = topicPartitionOffset.getKey().partition(); final var offset = topicPartitionOffset.getValue().offset(); groupTopicPartitionOffsetMap - .computeIfAbsent(groupId, __ -> new TreeMap<>()) - .computeIfAbsent(topic, __ -> new TreeMap<>()) + .computeIfAbsent(groupId, unused -> new TreeMap<>()) + .computeIfAbsent(topic, unused -> new TreeMap<>()) .put(partition, offset); } } diff --git a/src/main/java/kafdrop/util/AvroMessageDeserializer.java b/src/main/java/kafdrop/util/AvroMessageDeserializer.java index 2555bfa2..b01ea072 100644 --- a/src/main/java/kafdrop/util/AvroMessageDeserializer.java +++ b/src/main/java/kafdrop/util/AvroMessageDeserializer.java @@ -24,10 +24,10 @@ public String deserializeMessage(ByteBuffer buffer) { private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { final var config = new HashMap(); - config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); if (schemaRegistryAuth != null) { - config.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); - config.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); + config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + 
config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); } final var kafkaAvroDeserializer = new KafkaAvroDeserializer(); kafkaAvroDeserializer.configure(config, false); diff --git a/src/main/java/kafdrop/util/ByteUtils.java b/src/main/java/kafdrop/util/ByteUtils.java index 881b9d11..71f58062 100644 --- a/src/main/java/kafdrop/util/ByteUtils.java +++ b/src/main/java/kafdrop/util/ByteUtils.java @@ -4,6 +4,9 @@ import java.nio.charset.*; final class ByteUtils { + private ByteUtils() { + // no instance allowed, static utility class + } static String readString(ByteBuffer buffer) { return new String(readBytes(buffer), StandardCharsets.UTF_8); } diff --git a/src/main/java/kafdrop/util/MsgPackMessageDeserializer.java b/src/main/java/kafdrop/util/MsgPackMessageDeserializer.java index 3097510d..e3ce01e1 100644 --- a/src/main/java/kafdrop/util/MsgPackMessageDeserializer.java +++ b/src/main/java/kafdrop/util/MsgPackMessageDeserializer.java @@ -15,8 +15,7 @@ public class MsgPackMessageDeserializer implements MessageDeserializer { @Override public String deserializeMessage(ByteBuffer buffer) { - MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(buffer); - try { + try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(buffer)) { return unpacker.unpackValue().toJson(); } catch (IOException e) { final String errorMsg = "Unable to unpack msgpack message"; diff --git a/src/main/java/kafdrop/util/ProtobufMessageDeserializer.java b/src/main/java/kafdrop/util/ProtobufMessageDeserializer.java index 8eb9554d..c87d074e 100644 --- a/src/main/java/kafdrop/util/ProtobufMessageDeserializer.java +++ b/src/main/java/kafdrop/util/ProtobufMessageDeserializer.java @@ -1,27 +1,21 @@ package kafdrop.util; -import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.nio.file.Paths; import 
java.util.ArrayList; import java.util.List; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.util.CollectionUtils; import com.google.protobuf.CodedInputStream; import com.google.protobuf.DescriptorProtos.FileDescriptorProto; import com.google.protobuf.DescriptorProtos.FileDescriptorSet; import com.google.protobuf.Descriptors; -import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Descriptors.FileDescriptor; import com.google.protobuf.DynamicMessage; @@ -30,14 +24,12 @@ public class ProtobufMessageDeserializer implements MessageDeserializer { - private String topic; - private String fullDescFile; - private String msgTypeName; + private final String fullDescFile; + private final String msgTypeName; private static final Logger LOG = LoggerFactory.getLogger(ProtobufMessageDeserializer.class); - public ProtobufMessageDeserializer(String topic, String fullDescFile, String msgTypeName) { - this.topic = topic; + public ProtobufMessageDeserializer(String fullDescFile, String msgTypeName) { this.fullDescFile = fullDescFile; this.msgTypeName = msgTypeName; } @@ -45,13 +37,14 @@ public ProtobufMessageDeserializer(String topic, String fullDescFile, String msg @Override public String deserializeMessage(ByteBuffer buffer) { - try (InputStream input = new FileInputStream(new File(fullDescFile))) { + try (InputStream input = new FileInputStream(fullDescFile)) { FileDescriptorSet set = FileDescriptorSet.parseFrom(input); List descs = new ArrayList<>(); for (FileDescriptorProto ffdp : set.getFileList()) { - FileDescriptor fd = Descriptors.FileDescriptor.buildFrom(ffdp, - (FileDescriptor[]) descs.toArray(new FileDescriptor[descs.size()])); + FileDescriptor fd = Descriptors.FileDescriptor.buildFrom( + ffdp, + descs.toArray(new FileDescriptor[0])); descs.add(fd); } @@ -68,7 +61,7 @@ 
public String deserializeMessage(ByteBuffer buffer) { JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder().add(descriptors).build(); Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry); - return printer.print(message).replaceAll("\n", ""); // must remove line break so it defaults to collapse mode + return printer.print(message).replace("\n", ""); // must remove line break so it defaults to collapse mode } catch (FileNotFoundException e) { final String errorMsg = "Couldn't open descriptor file: " + fullDescFile; LOG.error(errorMsg, e); diff --git a/src/main/java/kafdrop/util/ProtobufSchemaRegistryMessageDeserializer.java b/src/main/java/kafdrop/util/ProtobufSchemaRegistryMessageDeserializer.java index 1a495d90..a8b1449b 100644 --- a/src/main/java/kafdrop/util/ProtobufSchemaRegistryMessageDeserializer.java +++ b/src/main/java/kafdrop/util/ProtobufSchemaRegistryMessageDeserializer.java @@ -2,7 +2,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; public class ProtobufSchemaRegistryMessageDeserializer implements MessageDeserializer { @@ -24,10 +24,10 @@ public String deserializeMessage(ByteBuffer buffer) { private static KafkaProtobufDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { final var config = new HashMap(); - config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); if (schemaRegistryAuth != null) { - config.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); - config.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); + config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, 
"USER_INFO"); + config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); } final var kafkaAvroDeserializer = new KafkaProtobufDeserializer<>(); kafkaAvroDeserializer.configure(config, false); diff --git a/src/test/java/kafdrop/KafdropIT.java b/src/test/java/kafdrop/KafdropIT.java index 6ee8f766..73304628 100644 --- a/src/test/java/kafdrop/KafdropIT.java +++ b/src/test/java/kafdrop/KafdropIT.java @@ -2,7 +2,11 @@ import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertTrue; + class KafdropIT extends AbstractIntegrationTest { @Test - void contextTest(){} + void contextTest(){ + assertTrue(Initializer.kafka.isRunning()); + } }