Skip to content

Commit

Permalink
Fix #2834: Merge the change events 'delete' and 'add' as 'update' for…
Browse files Browse the repository at this point in the history
… Activity Feed (#3006)
  • Loading branch information
vivekratnavel committed Feb 27, 2022
1 parent 83c9b75 commit 68d09a8
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 78 deletions.
7 changes: 7 additions & 0 deletions catalog-rest-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@
<version>2.17.1</version>
</dependency>

<!-- Diff util to compute diffs in plain text -->
<dependency>
<groupId>io.github.java-diff-utils</groupId>
<artifactId>java-diff-utils</artifactId>
<version>4.11</version>
</dependency>

<!--test dependencies-->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.FeedRepository;
import org.openmetadata.catalog.resources.feeds.MessageParser;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ChangeEvent;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.type.FieldChange;
import org.openmetadata.catalog.util.ChangeEventParser;
import org.openmetadata.catalog.util.EntityInterface;
import org.openmetadata.catalog.util.JsonUtils;
import org.openmetadata.catalog.util.RestUtil;
Expand Down Expand Up @@ -71,7 +72,7 @@ public Void process(ContainerRequestContext requestContext, ContainerResponseCon

// Add a new thread to the entity for every change event
// for the event to appear in activity feeds
List<Thread> threads = getThreads(responseContext);
List<Thread> threads = getThreads(responseContext, changeEvent);
if (threads != null) {
for (var thread : threads) {
feedDao.create(thread);
Expand Down Expand Up @@ -170,76 +171,30 @@ private static ChangeEvent copyChangeEvent(ChangeEvent changeEvent) {
.withCurrentVersion(changeEvent.getCurrentVersion());
}

private enum CHANGE_TYPE {
UPDATE,
ADD,
DELETE
}

private List<Thread> getThreads(ContainerResponseContext responseContext) {
private List<Thread> getThreads(ContainerResponseContext responseContext, ChangeEvent changeEvent) {
Object entity = responseContext.getEntity();
if (entity == null) {
return null; // Response has no entity to produce change event from
}

var entityInterface = Entity.getEntityInterface(entity);
if (entityInterface.getChangeDescription() == null) {
// entityInterface can be null in case of Tags
// TODO: remove this null check when entityInterface should never be null
if (entityInterface == null || entityInterface.getChangeDescription() == null) {
return null;
}
List<FieldChange> fieldsUpdated = entityInterface.getChangeDescription().getFieldsUpdated();
List<Thread> threads = new ArrayList<>(getThreads(fieldsUpdated, entity, CHANGE_TYPE.UPDATE));

List<FieldChange> fieldsAdded = entityInterface.getChangeDescription().getFieldsAdded();
threads.addAll(getThreads(fieldsAdded, entity, CHANGE_TYPE.ADD));

List<FieldChange> fieldsDeleted = entityInterface.getChangeDescription().getFieldsDeleted();
threads.addAll(getThreads(fieldsDeleted, entity, CHANGE_TYPE.DELETE));
return threads;
return getThreads(entity, entityInterface.getChangeDescription(), changeEvent);
}

private List<Thread> getThreads(List<FieldChange> fields, Object entity, CHANGE_TYPE changeType) {
private List<Thread> getThreads(Object entity, ChangeDescription changeDescription, ChangeEvent changeEvent) {
List<Thread> threads = new ArrayList<>();
var entityInterface = Entity.getEntityInterface(entity);
EntityReference entityReference = Entity.getEntityReference(entity);
String entityType = entityReference.getType();
String entityFQN = entityReference.getName();
for (var field : fields) {
// if field name has dots, then it is an array field
String fieldName = field.getName();
String arrayFieldName = null;
String arrayFieldValue = null;
String newFieldValue = field.getNewValue() != null ? field.getNewValue().toString() : StringUtils.EMPTY;
if (fieldName.contains(".")) {
String[] fieldNameParts = fieldName.split("\\.");
// For array type, it should have 3 ex: columns.comment.description
fieldName = fieldNameParts[0];
if (fieldNameParts.length == 3) {
arrayFieldName = fieldNameParts[1];
arrayFieldValue = fieldNameParts[2];
}
}

MessageParser.EntityLink link =
new MessageParser.EntityLink(entityType, entityFQN, fieldName, arrayFieldName, arrayFieldValue);

// Create an automated post
String message = null;
switch (changeType) {
case ADD:
message =
String.format("Added %s: `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
break;
case UPDATE:
message =
String.format("Updated %s to `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
break;
case DELETE:
message = String.format("Deleted %s", arrayFieldValue != null ? arrayFieldValue : fieldName);
break;
default:
break;
}
Map<EntityLink, String> messages = ChangeEventParser.getFormattedMessages(changeDescription, entity, changeEvent);

// Create an automated thread
for (var link : messages.keySet()) {
threads.add(
new Thread()
.withId(UUID.randomUUID())
Expand All @@ -248,7 +203,7 @@ private List<Thread> getThreads(List<FieldChange> fields, Object entity, CHANGE_
.withAbout(link.getLinkString())
.withUpdatedBy(entityInterface.getUpdatedBy())
.withUpdatedAt(System.currentTimeMillis())
.withMessage(message));
.withMessage(messages.get(link)));
}

return threads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
public final class MessageParser {
Expand All @@ -30,6 +31,8 @@ private MessageParser() {}
// Pattern to match the following markdown entity links:
// <#E/{entityType}/{entityFQN}> -- <#E/table/bigquery_gcp.shopify.raw_product_catalog>
// <#E/{entityType}/{entityFQN}/{fieldName}> -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/description>
// <#E/{entityType}/{entityFQN}/{fieldName}/{arrayFieldName}>
// -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/columns/comment>
// <#E/{entityType}/{entityFQN}/{fieldName}/{arrayFieldName}/{arrayFieldValue}>
// -- <#E/table/bigquery_gcp.shopify.raw_product_catalog/columns/comment/description>
private static final Pattern ENTITY_LINK_PATTERN =
Expand Down Expand Up @@ -82,11 +85,11 @@ public EntityLink(
this.linkType = LinkType.ENTITY_ARRAY_FIELD;
this.fullyQualifiedFieldType = String.format("%s.%s.member", entityType, fieldName);
this.fullyQualifiedFieldValue = String.format("%s.%s.%s", entityFqn, arrayFieldName, arrayFieldValue);
} else if (arrayFieldName != null) {
this.linkType = LinkType.ENTITY_ARRAY_FIELD;
this.fullyQualifiedFieldType = String.format("%s.%s.member", entityType, fieldName);
this.fullyQualifiedFieldValue = String.format("%s.%s", entityFqn, arrayFieldName);
} else if (fieldName != null) {
if (arrayFieldName != null) {
// Only array field name is not supported
throw new IllegalArgumentException(invalidEntityLink());
}
this.fullyQualifiedFieldType = String.format("%s.%s", entityType, fieldName);
this.fullyQualifiedFieldValue = String.format("%s.%s", entityFqn, fieldName);

Expand All @@ -105,7 +108,10 @@ public String getLinkString() {
builder.append("/").append(fieldName);
}
if (linkType == LinkType.ENTITY_ARRAY_FIELD) {
builder.append("/").append(arrayFieldName).append("/").append(arrayFieldValue);
builder.append("/").append(arrayFieldName);
if (StringUtils.isNotEmpty(arrayFieldValue)) {
builder.append("/").append(arrayFieldValue);
}
}
builder.append(">");
return builder.toString();
Expand Down

0 comments on commit 68d09a8

Please sign in to comment.