Add ability to use custom basePath #511

Merged: 10 commits, Jun 3, 2021
Changes from all commits
6 changes: 1 addition & 5 deletions README.md
@@ -153,6 +153,7 @@ For example, if you want to use an environment variable to set the `name` parameter

 |Name |Description
 |-----------------------|-------------------------------
+|`SERVER_SERVLET_CONTEXT_PATH` | URI basePath
 |`KAFKA_CLUSTERS_0_NAME` | Cluster name
 |`KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS` |Address where to connect
 |`KAFKA_CLUSTERS_0_ZOOKEEPER` | Zookeeper service address
@@ -163,8 +164,3 @@ For example, if you want to use an environment variable to set the `name` parameter
 |`KAFKA_CLUSTERS_0_READONLY` |Enable read only mode. Default: false
 |`LOGGING_LEVEL_ROOT` | Setting log level (all, debug, info, warn, error, fatal, off). Default: debug
 |`LOGGING_LEVEL_COM_PROVECTUS` |Setting log level (all, debug, info, warn, error, fatal, off). Default: debug
-
-
-
-
-
19 changes: 19 additions & 0 deletions docker/kafka-ui-reverse-proxy.yaml
@@ -0,0 +1,19 @@
+---
+version: '2'
+services:
+  nginx:
+    image: nginx:latest
+    volumes:
+      - ./proxy.conf:/etc/nginx/conf.d/default.conf
+    ports:
+      - 8080:80
+
+  kafka-ui:
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - 8082:8080
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+      SERVER_SERVLET_CONTEXT_PATH: /kafka-ui
9 changes: 9 additions & 0 deletions docker/proxy.conf
@@ -0,0 +1,9 @@
+server {
+    listen 80;
+    server_name localhost;
+
+    location /kafka-ui {
+        # rewrite /kafka-ui/(.*) /$1 break;
+        proxy_pass http://kafka-ui:8080;
+    }
+}
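Together, these two files serve the UI behind nginx under the /kafka-ui prefix: nginx forwards the prefixed request as-is (the rewrite directive is left commented out), and the application strips the prefix itself, since SERVER_SERVLET_CONTEXT_PATH maps to Spring Boot's server.servlet.context-path property via relaxed binding. On the reactive stack that property is not applied automatically by the server, which is why the PR handles it manually in CustomWebFilter and StaticController below. A minimal smoke test against the proxied endpoint, assuming the compose stack above is running locally (the class name and expected status are illustrative, not part of the PR):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical check: request the UI through nginx on the basePath.
public class ProxySmokeTest {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/kafka-ui/"))
        .build();
    HttpResponse<String> response =
        client.send(request, HttpResponse.BodyHandlers.ofString());
    // Expect 200 and the rendered index.html if the basePath wiring works.
    System.out.println(response.statusCode());
  }
}
```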
@@ -22,15 +22,15 @@ public KeyedObjectPool<String, JMXConnector> pool() {
   }
 
   private GenericKeyedObjectPoolConfig poolConfig() {
-    GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
+    final var poolConfig = new GenericKeyedObjectPoolConfig();
     poolConfig.setMaxIdlePerKey(3);
     poolConfig.setMaxTotalPerKey(3);
     return poolConfig;
   }
 
   @Bean
   public MBeanExporter exporter() {
-    final MBeanExporter exporter = new MBeanExporter();
+    final var exporter = new MBeanExporter();
     exporter.setAutodetect(true);
     exporter.setExcludedBeans("pool");
     return exporter;
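For context, the pool configured here is a commons-pool2 keyed pool of JMX connectors, capped at three idle and three total objects per key. A hedged sketch of how a pool with these limits behaves, assuming commons-pool2 2.6+ generics; the factory and key below are illustrative, not from the project:

```java
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;

// Illustrative keyed pool with the same per-key limits as the config above.
public class KeyedPoolSketch {
  public static void main(String[] args) throws Exception {
    var config = new GenericKeyedObjectPoolConfig<StringBuilder>();
    config.setMaxIdlePerKey(3);
    config.setMaxTotalPerKey(3); // at most 3 live objects per key

    var pool = new GenericKeyedObjectPool<>(
        new BaseKeyedPooledObjectFactory<String, StringBuilder>() {
          @Override
          public StringBuilder create(String key) {
            return new StringBuilder(key); // stand-in for opening a JMX connector
          }

          @Override
          public PooledObject<StringBuilder> wrap(StringBuilder value) {
            return new DefaultPooledObject<>(value);
          }
        }, config);

    StringBuilder conn = pool.borrowObject("broker-1"); // created on demand
    pool.returnObject("broker-1", conn);                // back to the idle set
  }
}
```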
@@ -1,21 +1,39 @@
 package com.provectus.kafka.ui.config;
 
+import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.stereotype.Component;
 import org.springframework.web.server.ServerWebExchange;
 import org.springframework.web.server.WebFilter;
 import org.springframework.web.server.WebFilterChain;
 import reactor.core.publisher.Mono;
 
 @Component
+
 public class CustomWebFilter implements WebFilter {
+
+  private final ServerProperties serverProperties;
+
+  public CustomWebFilter(ServerProperties serverProperties) {
+    this.serverProperties = serverProperties;
+  }
+
   @Override
   public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
-    if (exchange.getRequest().getURI().getPath().equals("/")
-        || exchange.getRequest().getURI().getPath().startsWith("/ui")) {
+    String contextPath = serverProperties.getServlet().getContextPath() != null
+        ? serverProperties.getServlet().getContextPath() : "";
+
+    if (exchange.getRequest().getURI().getPath().equals(contextPath + "/")
+        || exchange.getRequest().getURI().getPath().startsWith(contextPath + "/ui")) {
       return chain.filter(
           exchange.mutate().request(exchange.getRequest().mutate().path("/index.html").build())
-              .build());
-    }
+              .build()
+      );
+    } else if (exchange.getRequest().getURI().getPath().startsWith(contextPath)) {
+      return chain.filter(
+          exchange.mutate().request(exchange.getRequest().mutate().contextPath(contextPath).build())
+              .build()
+      );
+    }
 
     return chain.filter(exchange);
   }
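The rewritten filter distinguishes three cases: requests for the context-path root or the /ui routes are rewritten to /index.html, other requests under the context path get the context path set on the mutated request, and everything else passes through. A standalone sketch of that decision table, with an illustrative helper that is not part of the PR:

```java
// Illustrative only: the branch logic of CustomWebFilter as a pure function.
public class BasePathRoutingSketch {
  static String route(String contextPath, String path) {
    if (path.equals(contextPath + "/") || path.startsWith(contextPath + "/ui")) {
      return "rewrite to /index.html";
    } else if (path.startsWith(contextPath)) {
      return "set contextPath, handler sees the remainder";
    }
    return "pass through unchanged";
  }

  public static void main(String[] args) {
    String ctx = "/kafka-ui";
    System.out.println(route(ctx, "/kafka-ui/"));             // rewrite to /index.html
    System.out.println(route(ctx, "/kafka-ui/ui/clusters"));  // rewrite to /index.html
    System.out.println(route(ctx, "/kafka-ui/api/clusters")); // set contextPath
    System.out.println(route(ctx, "/actuator/health"));       // pass through unchanged
  }
}
```

Note that with an empty context path the checks reduce to the previous behavior of matching "/" and "/ui".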
@@ -0,0 +1,55 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.util.ResourceUtil;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.core.io.Resource;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class StaticController {
+  private final ServerProperties serverProperties;
+
+  @Value("classpath:static/index.html")
+  private Resource indexFile;
+  private final AtomicReference<String> renderedIndexFile = new AtomicReference<>();
+
+  @GetMapping(value = "/index.html", produces = { "text/html" })
+  public Mono<ResponseEntity<String>> getIndex() {
+    return Mono.just(ResponseEntity.ok(getRenderedIndexFile()));
+  }
+
+  public String getRenderedIndexFile() {
+    String rendered = renderedIndexFile.get();
+    if (rendered == null) {
+      rendered = buildIndexFile();
+      if (renderedIndexFile.compareAndSet(null, rendered)) {
+        return rendered;
+      } else {
+        return renderedIndexFile.get();
+      }
+    } else {
+      return rendered;
+    }
+  }
+
+  @SneakyThrows
+  private String buildIndexFile() {
+    final String contextPath = serverProperties.getServlet().getContextPath() != null
+        ? serverProperties.getServlet().getContextPath() : "";
+    final String staticPath = contextPath + "/static";
+    return ResourceUtil.readAsString(indexFile)
+        .replace("href=\"./static", "href=\"" + staticPath)
+        .replace("src=\"./static", "src=\"" + staticPath)
+        .replace("window.basePath=\"/\"", "window.basePath=\"" + contextPath + "\"");
+  }
+}
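getRenderedIndexFile() renders the index file lazily and publishes the result once: the first caller computes it, compareAndSet installs it, and any concurrent losers discard their copy and adopt the winner's. A minimal sketch of that publish-once idiom in isolation (class and names are illustrative, not from the PR):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Illustrative reduction of the caching idiom in getRenderedIndexFile():
// compute lazily, publish with compareAndSet, so every thread sees one instance.
public final class OnceCell<T> {
  private final AtomicReference<T> ref = new AtomicReference<>();
  private final Supplier<T> supplier;

  public OnceCell(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  public T get() {
    T value = ref.get();
    if (value == null) {
      value = supplier.get();                 // may run concurrently in several threads
      if (!ref.compareAndSet(null, value)) {  // only the first result is published
        value = ref.get();
      }
    }
    return value;
  }
}
```

The trade-off is that concurrent first requests may render the file more than once, but no locking is needed and exactly one copy is ever served.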
@@ -26,16 +26,16 @@ public ProtobufFileRecordDeserializer(Path protobufSchemaPath, String messageName
   }
 
   @Override
-  public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
+  public Object deserialize(ConsumerRecord<Bytes, Bytes> msg) {
     try {
-      final DynamicMessage message = DynamicMessage.parseFrom(
+      final var message = DynamicMessage.parseFrom(
           protobufSchema.toDescriptor(),
-          new ByteArrayInputStream(record.value().get())
+          new ByteArrayInputStream(msg.value().get())
       );
       byte[] bytes = ProtobufSchemaUtils.toJson(message);
       return parseJson(bytes);
     } catch (Throwable e) {
-      throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
+      throw new RuntimeException("Failed to parse record from topic " + msg.topic(), e);
     }
   }
 
@@ -5,5 +5,5 @@
 
 public interface RecordDeserializer {
 
-  Object deserialize(ConsumerRecord<Bytes, Bytes> record);
+  Object deserialize(ConsumerRecord<Bytes, Bytes> msg);
 }
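The record-to-msg rename that runs through these deserializers is plausibly about the identifier itself: since Java 16 (JEP 395), record is a contextual keyword for declaring record classes. It remains legal as a variable name, but it reads ambiguously and style checkers commonly flag it. For example:

```java
// Java 16+: `record` as a keyword declares a compact data carrier.
record SchemaRef(int id, String type) {}

// Legal but confusing: `record` is still usable as a variable name, e.g.
// ConsumerRecord<Bytes, Bytes> record = ...;
```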
@@ -4,6 +4,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Message;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.SchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
@@ -16,14 +17,17 @@
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 
@@ -99,57 +103,83 @@ private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
     return topicFormatMap.computeIfAbsent(record.topic(), k -> detectFormat(record));
   }
 
-  private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
-    String schemaName = String.format(cluster.getSchemaNameTemplate(), record.topic());
+  private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> msg) {
     if (schemaRegistryClient != null) {
       try {
-        final List<Integer> versions = schemaRegistryClient.getAllVersions(schemaName);
-        if (!versions.isEmpty()) {
-          final Integer version = versions.iterator().next();
-          final String subjectName = String.format(cluster.getSchemaNameTemplate(), record.topic());
-          final Schema schema = schemaRegistryClient.getByVersion(subjectName, version, false);
-          if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
+        final Optional<String> type = getSchemaFromMessage(msg).or(() -> getSchemaBySubject(msg));
+        if (type.isPresent()) {
+          if (type.get().equals(MessageFormat.PROTOBUF.name())) {
            try {
-              protobufDeserializer.deserialize(record.topic(), record.value().get());
+              protobufDeserializer.deserialize(msg.topic(), msg.value().get());
              return MessageFormat.PROTOBUF;
            } catch (Throwable e) {
-              log.info("Failed to get Protobuf schema for topic {}", record.topic(), e);
+              log.info("Failed to get Protobuf schema for topic {}", msg.topic(), e);
            }
-          } else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
+          } else if (type.get().equals(MessageFormat.AVRO.name())) {
            try {
-              avroDeserializer.deserialize(record.topic(), record.value().get());
+              avroDeserializer.deserialize(msg.topic(), msg.value().get());
              return MessageFormat.AVRO;
            } catch (Throwable e) {
-              log.info("Failed to get Avro schema for topic {}", record.topic(), e);
+              log.info("Failed to get Avro schema for topic {}", msg.topic(), e);
            }
-          } else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) {
+          } else if (type.get().equals(MessageFormat.JSON.name())) {
            try {
-              parseJsonRecord(record);
+              parseJsonRecord(msg);
              return MessageFormat.JSON;
            } catch (IOException e) {
-              log.info("Failed to parse json from topic {}", record.topic());
+              log.info("Failed to parse json from topic {}", msg.topic());
            }
          }
        }
-      } catch (RestClientException | IOException e) {
-        log.warn("Failed to get Schema for topic {}", record.topic(), e);
+      } catch (Exception e) {
+        log.warn("Failed to get Schema for topic {}", msg.topic(), e);
      }
    }
 
     try {
-      parseJsonRecord(record);
+      parseJsonRecord(msg);
       return MessageFormat.JSON;
     } catch (IOException e) {
-      log.info("Failed to parse json from topic {}", record.topic());
+      log.info("Failed to parse json from topic {}", msg.topic());
     }
 
     return MessageFormat.STRING;
   }
 
-  private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
-    String topic = record.topic();
-    if (record.value() != null && avroDeserializer != null) {
-      byte[] valueBytes = record.value().get();
+  @SneakyThrows
+  private Optional<String> getSchemaFromMessage(ConsumerRecord<Bytes, Bytes> msg) {
+    Optional<String> result = Optional.empty();
+    final Bytes value = msg.value();
+    if (value != null) {
+      ByteBuffer buffer = ByteBuffer.wrap(value.get());
+      if (buffer.get() == 0) {
+        int id = buffer.getInt();
+        result = Optional.ofNullable(
+            schemaRegistryClient.getSchemaById(id)
+        ).map(ParsedSchema::schemaType);
+      }
+    }
+    return result;
+  }
+
+  @SneakyThrows
+  private Optional<String> getSchemaBySubject(ConsumerRecord<Bytes, Bytes> msg) {
+    String schemaName = String.format(cluster.getSchemaNameTemplate(), msg.topic());
+    final List<Integer> versions = schemaRegistryClient.getAllVersions(schemaName);
+    if (!versions.isEmpty()) {
+      final Integer version = versions.iterator().next();
+      final String subjectName = String.format(cluster.getSchemaNameTemplate(), msg.topic());
+      final Schema schema = schemaRegistryClient.getByVersion(subjectName, version, false);
+      return Optional.ofNullable(schema).map(Schema::getSchemaType);
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> msg) throws IOException {
+    String topic = msg.topic();
+    if (msg.value() != null && avroDeserializer != null) {
+      byte[] valueBytes = msg.value().get();
       GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
       byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
       return parseJson(bytes);
@@ -158,10 +188,10 @@ private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
     }
   }
 
-  private Object parseProtobufRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
-    String topic = record.topic();
-    if (record.value() != null && protobufDeserializer != null) {
-      byte[] valueBytes = record.value().get();
+  private Object parseProtobufRecord(ConsumerRecord<Bytes, Bytes> msg) throws IOException {
+    String topic = msg.topic();
+    if (msg.value() != null && protobufDeserializer != null) {
+      byte[] valueBytes = msg.value().get();
       final Message message = protobufDeserializer.deserialize(topic, valueBytes);
       byte[] bytes = ProtobufSchemaUtils.toJson(message);
       return parseJson(bytes);
@@ -170,8 +200,8 @@ private Object parseProtobufRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
     }
   }
 
-  private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
-    var value = record.value();
+  private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> msg) throws IOException {
+    var value = msg.value();
     if (value == null) {
       return Map.of();
     }
@@ -184,12 +214,12 @@ private Object parseJson(byte[] bytes) throws IOException {
     });
   }
 
-  private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
-    String topic = record.topic();
-    if (record.value() == null) {
+  private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> msg) {
+    String topic = msg.topic();
+    if (msg.value() == null) {
       return Map.of();
    }
-    byte[] valueBytes = record.value().get();
+    byte[] valueBytes = msg.value().get();
     return stringDeserializer.deserialize(topic, valueBytes);
   }
 
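The new detection path first inspects the message bytes themselves: getSchemaFromMessage checks for the Confluent wire format, in which payloads written by schema-registry-aware serializers begin with a zero magic byte followed by a 4-byte big-endian schema id before the actual data, and only falls back to the subject-name lookup in getSchemaBySubject. A standalone sketch of the header check, with an explicit length guard that the PR version leaves to its exception handling (names are illustrative):

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

// Illustrative parser for the Confluent wire-format header:
// byte 0 = magic (0x0), bytes 1..4 = schema id as a big-endian int.
public class WireFormatSketch {
  static OptionalInt schemaId(byte[] payload) {
    if (payload == null || payload.length < 5) {
      return OptionalInt.empty(); // too short to carry the 5-byte header
    }
    ByteBuffer buffer = ByteBuffer.wrap(payload); // big-endian by default
    if (buffer.get() != 0) {
      return OptionalInt.empty(); // no magic byte: not registry-encoded
    }
    return OptionalInt.of(buffer.getInt());
  }

  public static void main(String[] args) {
    byte[] encoded = {0, 0, 0, 0, 42, 1, 2, 3}; // id 42, then payload bytes
    System.out.println(schemaId(encoded));           // OptionalInt[42]
    System.out.println(schemaId(new byte[] {1, 2})); // OptionalInt.empty
  }
}
```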
@@ -9,9 +9,9 @@ public class SimpleRecordDeserializer implements RecordDeserializer {
   private final StringDeserializer stringDeserializer = new StringDeserializer();
 
   @Override
-  public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
-    if (record.value() != null) {
-      return stringDeserializer.deserialize(record.topic(), record.value().get());
+  public Object deserialize(ConsumerRecord<Bytes, Bytes> msg) {
+    if (msg.value() != null) {
+      return stringDeserializer.deserialize(msg.topic(), msg.value().get());
     } else {
       return "empty";
     }
@@ -3,6 +3,10 @@
 import org.apache.commons.lang3.math.NumberUtils;
 
 public class NumberUtil {
+
+  private NumberUtil() {
+  }
+
   public static boolean isNumeric(Object value) {
     return value != null && NumberUtils.isCreatable(value.toString());
   }
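The private constructor added to NumberUtil is the usual way to keep a static utility class from being instantiated. For reference, the expected behavior of isNumeric under commons-lang3 isCreatable semantics (the demo class is illustrative, and the printed values are expectations, not output from the PR):

```java
import com.provectus.kafka.ui.util.NumberUtil;

public class NumberUtilDemo {
  public static void main(String[] args) {
    System.out.println(NumberUtil.isNumeric("3.14"));  // true
    System.out.println(NumberUtil.isNumeric("0x1F"));  // true: hex is creatable
    System.out.println(NumberUtil.isNumeric("abc"));   // false
    System.out.println(NumberUtil.isNumeric(null));    // false: null-guarded
  }
}
```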