Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Message Search Functionality #288

Merged
merged 16 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ settings.xml
kafka.properties*
kafka.truststore.jks*
kafka.keystore.jks*
/.vs
67 changes: 65 additions & 2 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import kafdrop.config.MessageFormatConfiguration.MessageFormatProperties;
import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties;
import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties;
import kafdrop.form.SearchMessageForm;
import kafdrop.model.MessageVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;

@Tag(name = "message-controller", description = "Message Controller")
Expand Down Expand Up @@ -193,10 +195,71 @@ public String viewMessageForm(@PathVariable("name") String topicName,
return "message-inspector";
}

/**
* Human friendly view of searching messages.
*
* @param topicName Name of topic
* @param searchMessageForm Message form for submitting requests to search messages.
* @param errors
* @param model
* @return View for seeing messages in a partition.
*/
@GetMapping("/topic/{name:.+}/search-messages")
public String searchMessageForm(@PathVariable("name") String topicName,
@Valid @ModelAttribute("searchMessageForm") SearchMessageForm searchMessageForm,
BindingResult errors,
Model model) {
final MessageFormat defaultFormat = messageFormatProperties.getFormat();
final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat();

if (searchMessageForm.isEmpty()) {
final SearchMessageForm defaultForm = new SearchMessageForm();

defaultForm.setSearchText("");
defaultForm.setFormat(defaultFormat);
defaultForm.setKeyFormat(defaultFormat);
defaultForm.setMaximumCount(100);
defaultForm.setStartTimestamp(new Date(0));
model.addAttribute("searchMessageForm", defaultForm);
}

final TopicVO topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

model.addAttribute("topic", topic);
model.addAttribute("defaultFormat", defaultFormat);
model.addAttribute("messageFormats", MessageFormat.values());
model.addAttribute("defaultKeyFormat", defaultKeyFormat);
model.addAttribute("keyFormats", KeyFormat.values());
model.addAttribute("descFiles", protobufProperties.getDescFilesList());

if (!searchMessageForm.isEmpty() && !errors.hasErrors()) {

final var deserializers = new Deserializers(
getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName(),
protobufProperties.getParseAnyProto()),
getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName(),
protobufProperties.getParseAnyProto())
);

var searchResults = kafkaMonitor.searchMessages(
topicName,
searchMessageForm.getSearchText(),
searchMessageForm.getMaximumCount(),
searchMessageForm.getStartTimestamp(),
deserializers);

model.addAttribute("messages", searchResults.getMessages());
model.addAttribute("details", searchResults.getCompletionDetails());
}

return "search-message";
}

/**
* Returns the selected nessagr format based on the
* form submission
* Returns the selected message format based on the form submission
*
* @param format String representation of format name
* @return
Expand Down
106 changes: 106 additions & 0 deletions src/main/java/kafdrop/form/SearchMessageForm.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package kafdrop.form;

import com.fasterxml.jackson.annotation.JsonIgnore;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import kafdrop.util.MessageFormat;
import org.springframework.format.annotation.DateTimeFormat;

import java.util.Date;

public class SearchMessageForm {

@NotBlank
private String searchText;

@NotNull
@Min(1)
@Max(1000)
private Integer maximumCount;

private MessageFormat format;

private MessageFormat keyFormat;

private String descFile;

private String msgTypeName;

@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
private Date startTimestamp;

public SearchMessageForm(String searchText, MessageFormat format) {
this.searchText = searchText;
this.format = format;
}

public Date getStartTimestamp() {
return startTimestamp;
}

public void setStartTimestamp(Date startTimestamp) {
this.startTimestamp = startTimestamp;
}

public SearchMessageForm(String searchText) {
this(searchText, MessageFormat.DEFAULT);
}

public SearchMessageForm() {
}

@JsonIgnore
public boolean isEmpty() {
return searchText == null || searchText.isEmpty();
}

public String getSearchText() {
return searchText;
}

public void setSearchText(String searchText) {
this.searchText = searchText;
}

public Integer getMaximumCount() {
return maximumCount;
}

public void setMaximumCount(Integer maximumCount) {
this.maximumCount = maximumCount;
}

public MessageFormat getKeyFormat() {
return keyFormat;
}

public void setKeyFormat(MessageFormat keyFormat) {
this.keyFormat = keyFormat;
}

public MessageFormat getFormat() {
return format;
}

public void setFormat(MessageFormat format) {
this.format = format;
}

public String getDescFile() {
return descFile;
}

public void setDescFile(String descFile) {
this.descFile = descFile;
}

public String getMsgTypeName() {
return msgTypeName;
}

public void setMsgTypeName(String msgTypeName) {
this.msgTypeName = msgTypeName;
}
}
43 changes: 43 additions & 0 deletions src/main/java/kafdrop/model/SearchResultsVO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2021 Kafdrop contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/

package kafdrop.model;

import java.util.List;

public final class SearchResultsVO {
private List<MessageVO> messages;

private String completionDetails;

public List<MessageVO> getMessages() {
return messages;
}

public String getCompletionDetails() {
return completionDetails;
}

public void setCompletionDetails(String completionDetails) {
this.completionDetails = completionDetails;
}

public void setMessages(List<MessageVO> messages) {
this.messages = messages;
}
}