Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public static class CacheProperties {
// N-gram tokenization settings for one full-text-search index.
// NOTE(review): instantiated elsewhere as `new NgramProperties(1, 4, true)`,
// so an all-args constructor is presumably Lombok-generated — confirm the
// class-level annotations, which are outside this chunk.
public static class NgramProperties {
// Minimum n-gram length produced when tokenizing indexed text.
int ngramMin = 1;
// Maximum n-gram length produced when tokenizing indexed text.
int ngramMax = 4;
// When true, NgramFilter ranks matches by the position of the first
// matching token (distanceSimilarity); when false it falls back to
// cosine similarity over term frequencies.
boolean distanceScore = true;
}

@Data
Expand All @@ -245,10 +246,10 @@ public static class NgramProperties {
public static class ClusterFtsProperties {
boolean enabled = true;
boolean defaultEnabled = false;
NgramProperties schemas = new NgramProperties(1, 4);
NgramProperties consumers = new NgramProperties(1, 4);
NgramProperties connect = new NgramProperties(1, 4);
NgramProperties acl = new NgramProperties(1, 4);
NgramProperties schemas = new NgramProperties(1, 4, true);
NgramProperties consumers = new NgramProperties(1, 4, true);
NgramProperties connect = new NgramProperties(1, 4, true);
NgramProperties acl = new NgramProperties(1, 4, true);

public boolean use(Boolean request) {
if (enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -137,15 +138,18 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
.operationName("getAllConnectors")
.build();

var maybeComparator = Optional.ofNullable(orderBy).map(this::getConnectorsComparator);

var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
? getConnectorsComparator(orderBy)
: getConnectorsComparator(orderBy).reversed();
? maybeComparator
: maybeComparator.map(Comparator::reversed);

Flux<FullConnectorInfoDTO> connectors = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));

Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
.sort(comparator);
Flux<FullConnectorInfoDTO> sorted = comparator.map(connectors::sort).orElse(connectors);

return Mono.just(ResponseEntity.ok(job))
return Mono.just(ResponseEntity.ok(sorted))
.doOnEach(sig -> audit(context, sig));
}

Expand Down Expand Up @@ -280,9 +284,7 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
FullConnectorInfoDTO::getName,
Comparator.nullsFirst(Comparator.naturalOrder())
);
if (orderBy == null) {
return defaultComparator;
}

return switch (orderBy) {
case CONNECT -> Comparator.comparing(
FullConnectorInfoDTO::getConnect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -244,10 +245,10 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster

List<String> subjectsToRetrieve;
boolean paginate = true;
var schemaComparator = getComparatorForSchema(orderBy);
final Comparator<SubjectWithCompatibilityLevel> comparator =
var schemaComparator = Optional.ofNullable(orderBy).map(this::getComparatorForSchema);
final Optional<Comparator<SubjectWithCompatibilityLevel>> comparator =
sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
? schemaComparator : schemaComparator.reversed();
? schemaComparator : schemaComparator.map(Comparator::reversed);
if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
if (SortOrderDTO.DESC.equals(sortOrder)) {
filteredSubjects.sort(Comparator.nullsFirst(Comparator.reverseOrder()));
Expand All @@ -274,11 +275,13 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster

private List<SubjectWithCompatibilityLevel> paginateSchemas(
List<SubjectWithCompatibilityLevel> subjects,
Comparator<SubjectWithCompatibilityLevel> comparator,
Optional<Comparator<SubjectWithCompatibilityLevel>> comparator,
boolean paginate,
int pageSize,
int subjectToSkip) {
subjects.sort(comparator);

comparator.ifPresent(subjects::sort);

if (paginate) {
return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class AclBindingNgramFilter extends NgramFilter<AclBinding> {
private final List<Tuple2<List<String>, AclBinding>> bindings;

/**
 * Convenience constructor: enabled filter with default n-gram bounds (1-4)
 * and distance-based scoring. The rendered diff left both the stale two-arg
 * NgramProperties call and the new three-arg call in place; only the
 * three-arg form matches the NgramProperties class after distanceScore
 * was added, so the stale line is removed.
 */
public AclBindingNgramFilter(Collection<AclBinding> bindings) {
  this(bindings, true, new ClustersProperties.NgramProperties(1, 4, true));
}

public AclBindingNgramFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class ConsumerGroupFilter extends NgramFilter<ConsumerGroupListing> {
private final List<Tuple2<List<String>, ConsumerGroupListing>> groups;

/**
 * Convenience constructor: enabled filter with default n-gram bounds (1-4)
 * and distance-based scoring. Removes the stale two-arg NgramProperties
 * delegation left over from the diff rendering — that constructor no longer
 * exists now that NgramProperties carries a distanceScore field.
 */
public ConsumerGroupFilter(Collection<ConsumerGroupListing> groups) {
  this(groups, true, new ClustersProperties.NgramProperties(1, 4, true));
}

public ConsumerGroupFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class KafkaConnectNgramFilter extends NgramFilter<FullConnectorInfoDTO> {
private final List<Tuple2<List<String>, FullConnectorInfoDTO>> connectors;

/**
 * Convenience constructor: enabled filter with default n-gram bounds (1-4)
 * and distance-based scoring. Removes the stale two-arg NgramProperties
 * delegation left over from the diff rendering — that constructor no longer
 * exists now that NgramProperties carries a distanceScore field.
 */
public KafkaConnectNgramFilter(Collection<FullConnectorInfoDTO> connectors) {
  this(connectors, true, new ClustersProperties.NgramProperties(1, 4, true));
}

public KafkaConnectNgramFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import io.kafbat.ui.model.InternalTopic;
import io.kafbat.ui.model.InternalTopicConfig;
import io.kafbat.ui.service.index.lucene.IndexedTextField;
import io.kafbat.ui.service.index.lucene.NameDistanceScoringFunction;
import io.kafbat.ui.service.index.lucene.ShortWordAnalyzer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
Expand All @@ -18,11 +21,11 @@
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.BooleanClause;
Expand Down Expand Up @@ -59,11 +62,13 @@ public LuceneTopicsIndex(List<InternalTopic> topics) throws IOException {

private Directory build(List<InternalTopic> topics) {
Directory directory = new ByteBuffersDirectory();

try (IndexWriter directoryWriter = new IndexWriter(directory, new IndexWriterConfig(this.analyzer))) {
for (InternalTopic topic : topics) {
Document doc = new Document();

doc.add(new StringField(FIELD_NAME_RAW, topic.getName(), Field.Store.YES));
doc.add(new TextField(FIELD_NAME, topic.getName(), Field.Store.NO));
doc.add(new IndexedTextField(FIELD_NAME, topic.getName(), Field.Store.YES));
doc.add(new IntPoint(FIELD_PARTITIONS, topic.getPartitionCount()));
doc.add(new IntPoint(FIELD_REPLICATION, topic.getReplicationFactor()));
doc.add(new LongPoint(FIELD_SIZE, topic.getSegmentSize()));
Expand Down Expand Up @@ -117,9 +122,9 @@ public List<InternalTopic> find(String search, Boolean showInternal,
closeLock.readLock().lock();
try {

QueryParser queryParser = new PrefixQueryParser(FIELD_NAME, this.analyzer);
PrefixQueryParser queryParser = new PrefixQueryParser(FIELD_NAME, this.analyzer);
queryParser.setDefaultOperator(QueryParser.Operator.AND);
Query nameQuery = queryParser.parse(search);;
Query nameQuery = queryParser.parse(search);

Query internalFilter = new TermQuery(new Term(FIELD_INTERNAL, "true"));

Expand All @@ -129,6 +134,12 @@ public List<InternalTopic> find(String search, Boolean showInternal,
queryBuilder.add(internalFilter, BooleanClause.Occur.MUST_NOT);
}

BooleanQuery combined = queryBuilder.build();
Query wrapped = new FunctionScoreQuery(
combined,
new NameDistanceScoringFunction(FIELD_NAME, queryParser.getPrefixes())
);

List<SortField> sortFields = new ArrayList<>();
sortFields.add(SortField.FIELD_SCORE);
if (!sortField.equals(FIELD_NAME)) {
Expand All @@ -137,7 +148,7 @@ public List<InternalTopic> find(String search, Boolean showInternal,

Sort sort = new Sort(sortFields.toArray(new SortField[0]));

TopDocs result = this.indexSearcher.search(queryBuilder.build(), count != null ? count : this.maxSize, sort);
TopDocs result = this.indexSearcher.search(wrapped, count != null ? count : this.maxSize, sort);

List<String> topics = new ArrayList<>();
for (ScoreDoc scoreDoc : result.scoreDocs) {
Expand Down
35 changes: 33 additions & 2 deletions api/src/main/java/io/kafbat/ui/service/index/NgramFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -22,9 +25,11 @@
public abstract class NgramFilter<T> {
private final Analyzer analyzer;
private final boolean enabled;
private final boolean distanceScore;

/**
 * Creates a filter backed by a short-word n-gram analyzer.
 *
 * @param properties n-gram length bounds and scoring mode for this index
 * @param enabled    whether n-gram filtering is active at all
 */
public NgramFilter(ClustersProperties.NgramProperties properties, boolean enabled) {
this.enabled = enabled;
// Selects the scoring strategy in find(): first-match-position scoring
// (distanceSimilarity) when true, cosine similarity otherwise.
this.distanceScore = properties.isDistanceScore();
// NOTE(review): the meaning of the trailing 'false' flag is not visible
// in this chunk — confirm against ShortWordNGramAnalyzer's constructor.
this.analyzer = new ShortWordNGramAnalyzer(properties.getNgramMin(), properties.getNgramMax(), false);
}

Expand Down Expand Up @@ -52,15 +57,25 @@ public List<T> find(String search, Comparator<T> comparator) {
try {
List<SearchResult<T>> result = new ArrayList<>();
List<String> queryTokens = tokenizeString(analyzer, search);
Map<String, Integer> queryFreq = termFreq(queryTokens);
Map<String, Integer> queryFreq = Map.of();

if (!distanceScore) {
queryFreq = termFreq(queryTokens);
}

for (Tuple2<List<String>, T> item : getItems()) {
for (String field : item.getT1()) {
List<String> itemTokens = tokenizeString(analyzer, field);
HashSet<String> itemTokensSet = new HashSet<>(itemTokens);
if (itemTokensSet.containsAll(queryTokens)) {
double score = cosineSimilarity(queryFreq, itemTokens);
double score;
if (distanceScore) {
score = distanceSimilarity(queryTokens, itemTokens);
} else {
score = cosineSimilarity(queryFreq, itemTokens);
}
result.add(new SearchResult<>(item.getT2(), score));
break;
}
}
}
Expand All @@ -77,6 +92,22 @@ public List<T> find(String search, Comparator<T> comparator) {
}
}

/**
 * Scores a match by how early any query token first occurs in the item's
 * token list: {@code 1 / (1 + earliestIndex)}, so a match at position 0
 * scores 1.0 and later matches score progressively less. When no query
 * token occurs at all (only possible for an empty query, since callers
 * pre-check containment), the score defaults to 1.0.
 */
private double distanceSimilarity(List<String> queryTokens, List<String> itemTokens) {
  // Earliest first-occurrence index across all query tokens; -1 if none found.
  int earliest = queryTokens.stream()
      .mapToInt(itemTokens::indexOf)
      .filter(idx -> idx >= 0)
      .min()
      .orElse(-1);
  return earliest < 0 ? 1.0 : 1.0 / (1.0 + earliest);
}

private List<T> list(Stream<T> stream, Comparator<T> comparator) {
if (comparator != null) {
return stream.sorted(comparator).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import static org.apache.lucene.search.BoostAttribute.DEFAULT_BOOST;

import io.kafbat.ui.service.index.TopicsIndex.FieldType;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.IntPoint;
Expand All @@ -14,10 +16,11 @@
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TermRangeQuery;

public class PrefixQueryParser extends QueryParser {

private final List<String> prefixes = new ArrayList<>();

/**
 * Creates a parser that rewrites plain string terms on the default field
 * into prefix queries, recording each such prefix for later use
 * (see {@code getPrefixes()}).
 *
 * @param field    default field for unqualified query terms
 * @param analyzer analyzer used to tokenize query text
 */
public PrefixQueryParser(String field, Analyzer analyzer) {
super(field, analyzer);
}
Expand Down Expand Up @@ -60,7 +63,13 @@ protected Query newTermQuery(Term term, float boost) {
.orElse(FieldType.STRING);

Query query = switch (fieldType) {
case STRING -> new PrefixQuery(term);
case STRING -> {
if (Objects.equals(term.field(), field)) {
prefixes.add(term.text());
}

yield new PrefixQuery(term);
}
case INT -> IntPoint.newExactQuery(term.field(), Integer.parseInt(term.text()));
case LONG -> LongPoint.newExactQuery(term.field(), Long.parseLong(term.text()));
case BOOLEAN -> new TermQuery(term);
Expand All @@ -72,4 +81,7 @@ protected Query newTermQuery(Term term, float boost) {
return new BoostQuery(query, boost);
}

/**
 * Returns the prefix terms collected for the default field during parsing.
 * Returns an immutable snapshot rather than the live internal list, so
 * callers cannot mutate the parser's state (the visible caller only reads
 * the prefixes to build a distance-scoring function).
 *
 * @return unmodifiable copy of the collected prefixes
 */
public List<String> getPrefixes() {
  return List.copyOf(prefixes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public SchemasFilter(Collection<String> subjects, boolean enabled, ClustersPrope

@Override
public List<String> find(String search) {
return super.find(search, String::compareTo);
return super.find(search, null);
}

@Override
Expand Down
Loading
Loading