Skip to content

Commit

Permalink
Fix #2837: Add a Feed Count API with entityLink filter support (#2860)
Browse files Browse the repository at this point in the history
  • Loading branch information
vivekratnavel committed Feb 19, 2022
1 parent f080481 commit 28807f8
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.SecurityContext;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.catalog.CatalogApplicationConfig;
Expand Down Expand Up @@ -53,6 +54,7 @@ public void init(CatalogApplicationConfig config, Jdbi jdbi) {

public Void process(ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
String method = requestContext.getMethod();
SecurityContext securityContext = requestContext.getSecurityContext();
try {
ChangeEvent changeEvent = getChangeEvent(method, responseContext);
if (changeEvent != null) {
Expand All @@ -75,7 +77,7 @@ public Void process(ContainerRequestContext requestContext, ContainerResponseCon
List<Thread> threads = getThreads(responseContext);
if (threads != null) {
for (var thread : threads) {
feedDao.create(thread);
feedDao.create(thread, securityContext);
}
}
}
Expand Down Expand Up @@ -225,12 +227,11 @@ private List<Thread> getThreads(List<FieldChange> fields, Object entity, CHANGE_
switch (changeType) {
case ADD:
message =
String.format("Added %s: `*%s*`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
String.format("Added %s: `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
break;
case UPDATE:
message =
String.format(
"Updated %s to `*%s*`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
String.format("Updated %s to `%s`", arrayFieldValue != null ? arrayFieldValue : fieldName, newFieldValue);
break;
case DELETE:
message = String.format("Deleted %s", arrayFieldValue != null ? arrayFieldValue : fieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.openmetadata.catalog.jdbi3.AirflowPipelineRepository.AirflowPipelineEntityInterface;
import org.openmetadata.catalog.jdbi3.BotsRepository.BotsEntityInterface;
import org.openmetadata.catalog.jdbi3.ChartRepository.ChartEntityInterface;
import org.openmetadata.catalog.jdbi3.CollectionDAO.FieldRelationshipDAO.FromFieldMapper;
import org.openmetadata.catalog.jdbi3.CollectionDAO.TagDAO.TagLabelMapper;
import org.openmetadata.catalog.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper;
import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardEntityInterface;
Expand Down Expand Up @@ -516,6 +517,33 @@ interface FeedDAO {

@SqlUpdate("UPDATE thread_entity SET json = :json where id = :id")
void update(@Bind("id") String id, @Bind("json") String json);

@SqlQuery(
"SELECT entityLink, COUNT(*) count FROM field_relationship fr INNER JOIN thread_entity te ON fr.fromFQN=te.id "
+ "WHERE fr.toFQN LIKE CONCAT(:fqnPrefix, '%') AND fr.toType like concat(:toType, '%') AND fr.fromType = :fromType "
+ "AND fr.relation = :relation AND te.resolved= :isResolved "
+ "GROUP BY entityLink")
@RegisterRowMapper(CountFieldMapper.class)
List<List<String>> listCountByEntityLink(
@Bind("fqnPrefix") String fqnPrefix,
@Bind("fromType") String fromType,
@Bind("toType") String toType,
@Bind("relation") int relation,
@Bind("isResolved") boolean isResolved);

@SqlQuery(
"SELECT entityLink, COUNT(*) count FROM thread_entity WHERE (id IN (<threadIds>)) "
+ "AND resolved= :isResolved GROUP BY entityLink")
@RegisterRowMapper(CountFieldMapper.class)
List<List<String>> listCountByThreads(
@BindList("threadIds") List<String> threadIds, @Bind("isResolved") boolean isResolved);

class CountFieldMapper implements RowMapper<List<String>> {
@Override
public List<String> map(ResultSet rs, StatementContext ctx) throws SQLException {
return Arrays.asList(rs.getString("entityLink"), rs.getString("count"));
}
}
}

interface FieldRelationshipDAO {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.core.SecurityContext;
import org.apache.commons.lang3.StringUtils;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.feed.EntityLinkThreadCount;
import org.openmetadata.catalog.api.feed.ThreadCount;
import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.resources.feeds.FeedUtil;
import org.openmetadata.catalog.resources.feeds.MessageParser;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
import org.openmetadata.catalog.security.SecurityUtil;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.Include;
import org.openmetadata.catalog.type.Post;
Expand All @@ -44,10 +50,13 @@ public FeedRepository(CollectionDAO dao) {
}

@Transaction
public Thread create(Thread thread) throws IOException, ParseException {
// Validate user creating thread
public Thread create(Thread thread, SecurityContext securityContext) throws IOException, ParseException {
String fromUser = thread.getPosts().get(0).getFrom();
dao.userDAO().findEntityByName(fromUser);

if (SecurityUtil.isSecurityEnabled(securityContext)) {
// Validate user creating thread if security is enabled
dao.userDAO().findEntityByName(fromUser);
}

// Validate about data entity is valid
EntityLink about = EntityLink.parse(thread.getAbout());
Expand Down Expand Up @@ -136,6 +145,46 @@ public Thread addPostToThread(String id, Post post) throws IOException {
return thread;
}

@Transaction
public ThreadCount getThreadsCount(String link, boolean isResolved) throws IOException {
ThreadCount threadCount = new ThreadCount();
List<List<String>> result;
List<EntityLinkThreadCount> entityLinkThreadCounts = new ArrayList<>();
AtomicInteger totalCount = new AtomicInteger(0);
if (link == null) {
// Get thread count of all entities
result =
dao.feedDAO()
.listCountByEntityLink(
StringUtils.EMPTY, "thread", StringUtils.EMPTY, Relationship.IS_ABOUT.ordinal(), isResolved);
} else {
EntityLink entityLink = EntityLink.parse(link);
EntityReference reference = EntityUtil.validateEntityLink(entityLink);
if (reference.getType().equals(Entity.USER)) {
List<String> threadIds = new ArrayList<>(getUserThreadIds(reference));
result = dao.feedDAO().listCountByThreads(threadIds, isResolved);
} else {
result =
dao.feedDAO()
.listCountByEntityLink(
entityLink.getFullyQualifiedFieldValue(),
"thread",
entityLink.getFullyQualifiedFieldType(),
Relationship.IS_ABOUT.ordinal(),
isResolved);
}
}
result.forEach(
l -> {
int count = Integer.parseInt(l.get(1));
entityLinkThreadCounts.add(new EntityLinkThreadCount().withEntityLink(l.get(0)).withCount(count));
totalCount.addAndGet(count);
});
threadCount.withTotalCount(totalCount.get());
threadCount.withCounts(entityLinkThreadCounts);
return threadCount;
}

@Transaction
public List<Thread> listThreads(String link) throws IOException {
if (link == null) {
Expand All @@ -157,23 +206,7 @@ public List<Thread> listThreads(String link) throws IOException {
// TODO remove hardcoding of thread
// For a user entitylink get created or replied relationships to the thread
if (reference.getType().equals(Entity.USER)) {
// TODO: Add user mentioned threads as well
threadIds.addAll(
dao.relationshipDAO()
.findTo(
reference.getName(),
reference.getType(),
Relationship.CREATED.ordinal(),
"thread",
toBoolean(Include.NON_DELETED)));
threadIds.addAll(
dao.relationshipDAO()
.findTo(
reference.getName(),
reference.getType(),
Relationship.REPLIED_TO.ordinal(),
"thread",
toBoolean(Include.NON_DELETED)));
threadIds.addAll(getUserThreadIds(reference));
} else {
// Only data assets are added as about
result =
Expand All @@ -196,7 +229,29 @@ public List<Thread> listThreads(String link) throws IOException {
}
}
// sort the list by thread updated timestamp before returning
threads.sort(Comparator.comparing(Thread::getUpdatedAt));
threads.sort(Comparator.comparing(Thread::getUpdatedAt, Comparator.reverseOrder()));
return threads;
}

private List<String> getUserThreadIds(EntityReference user) {
List<String> threadIds = new ArrayList<>();
// TODO: Add user mentioned threads as well
threadIds.addAll(
dao.relationshipDAO()
.findTo(
user.getName(),
user.getType(),
Relationship.CREATED.ordinal(),
"thread",
toBoolean(Include.NON_DELETED)));
threadIds.addAll(
dao.relationshipDAO()
.findTo(
user.getName(),
user.getType(),
Relationship.REPLIED_TO.ordinal(),
"thread",
toBoolean(Include.NON_DELETED)));
return threadIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.UUID;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -38,6 +39,7 @@
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.catalog.api.feed.CreateThread;
import org.openmetadata.catalog.api.feed.ThreadCount;
import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.jdbi3.CollectionDAO;
import org.openmetadata.catalog.jdbi3.FeedRepository;
Expand Down Expand Up @@ -96,7 +98,7 @@ public ThreadList list(
@Context UriInfo uriInfo,
@Parameter(
description = "Filter threads by entity link",
schema = @Schema(type = "string", example = "<E#/{entityType}/{entityFQN}>"))
schema = @Schema(type = "string", example = "<E#/{entityType}/{entityFQN}/{fieldName}>"))
@QueryParam("entityLink")
String entityLink)
throws IOException {
Expand All @@ -120,6 +122,33 @@ public Thread get(@Context UriInfo uriInfo, @PathParam("id") String id) throws I
return addHref(uriInfo, dao.get(id));
}

@GET
@Path("/count")
@Operation(
summary = "count of threads",
tags = "feeds",
description = "Get a count of threads, optionally filtered by `entityLink` for each of the entities.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Count of threads",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = ThreadCount.class)))
})
public ThreadCount getThreadCount(
@Context UriInfo uriInfo,
@Parameter(
description = "Filter threads by entity link",
schema = @Schema(type = "string", example = "<E#/{entityType}/{entityFQN}/{fieldName}>"))
@QueryParam("entityLink")
String entityLink,
@Parameter(description = "Filter threads by whether it is active or resolved", schema = @Schema(type = "boolean"))
@DefaultValue("false")
@QueryParam("isResolved")
Boolean isResolved)
throws IOException {
return dao.getThreadsCount(entityLink, isResolved);
}

@POST
@Operation(
summary = "Create a thread",
Expand All @@ -136,7 +165,7 @@ public Response create(@Context UriInfo uriInfo, @Context SecurityContext securi
throws IOException, ParseException {
Thread thread = getThread(securityContext, create);
FeedUtil.addPost(thread, new Post().withMessage(create.getMessage()).withFrom(create.getFrom()));
addHref(uriInfo, dao.create(thread));
addHref(uriInfo, dao.create(thread, securityContext));
return Response.created(thread.getHref()).entity(thread).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,14 @@ public static Invocation.Builder addHeaders(WebTarget target, Map<String, String
}
return target.request();
}

/**
* Returns true if authentication is enabled.
*
* @param securityContext security context
* @return true if jwt filter based authentication is enabled, false otherwise
*/
public static boolean isSecurityEnabled(SecurityContext securityContext) {
return !securityContext.getAuthenticationScheme().equals(SecurityContext.BASIC_AUTH);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"$id": "https://open-metadata.org/schema/api/feed/threadCount.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Count of threads related to an entity",
"description": "This schema defines the type for reporting the count of threads related to an entity.",
"type": "object",
"javaType": "org.openmetadata.catalog.api.feed.ThreadCount",
"definitions": {
"entityLinkThreadCount": {
"description": "Type used to return thread count per entity link.",
"type": "object",
"javaType": "org.openmetadata.catalog.api.feed.EntityLinkThreadCount",
"properties": {
"count": {
"description": "Count of threads for the given entity link.",
"type": "integer",
"minimum": 0
},
"entityLink": {
"$ref": "../../type/basic.json#/definitions/entityLink"
}
},
"required": ["count", "entityLink"],
"additionalProperties": false
}
},
"properties": {
"totalCount": {
"description": "Total count of all the threads.",
"type": "integer",
"minimum": 0
},
"counts": {
"description": "",
"type": "array",
"items": {
"$ref": "#/definitions/entityLinkThreadCount"
}
}
},
"required": ["counts", "totalCount"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
"format": "date-time"
},
"entityLink": {
"description": "Link to an entity or field within an entity using this format `<#E/{entities}/{entityType}/{field}/{fieldValue}`.",
"description": "Link to an entity or field within an entity using this format `<#E/{entities}/{entityType}/{field}/{arrayFieldName}/{arrayFieldValue}`.",
"type": "string",
"pattern": "^<#E/\\S+/\\S+>$"
"pattern": "^<#E\/\\S+\/\\S+>$"
},
"sqlQuery": {
"description": "SQL query statement. Example - 'select * from orders'.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.data.CreateTable;
import org.openmetadata.catalog.api.feed.CreateThread;
import org.openmetadata.catalog.api.feed.EntityLinkThreadCount;
import org.openmetadata.catalog.api.feed.ThreadCount;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.feed.Thread;
import org.openmetadata.catalog.entity.teams.Team;
Expand Down Expand Up @@ -196,6 +198,10 @@ void post_validThreadAndList_200(TestInfo test) throws IOException {
listThreads(TABLE_COLUMN_LINK, userAuthHeaders).getData().size()); // About TABLE Column Description
assertEquals(++totalThreadCount, listThreads(null, userAuthHeaders).getData().size()); // Overall threads
}

// Test the /api/v1/feed/count API
assertEquals(userThreadCount, listThreadsCount(USER_LINK, userAuthHeaders).getTotalCount());
assertEquals(tableThreadCount, getThreadCount(TABLE_LINK, userAuthHeaders));
}

@Test
Expand Down Expand Up @@ -304,4 +310,18 @@ public static ThreadList listThreads(String entityLink, Map<String, String> auth
target = entityLink != null ? target.queryParam("entityLink", entityLink) : target;
return TestUtils.get(target, ThreadList.class, authHeaders);
}

public static ThreadCount listThreadsCount(String entityLink, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = getResource("feed/count");
target = entityLink != null ? target.queryParam("entityLink", entityLink) : target;
return TestUtils.get(target, ThreadCount.class, authHeaders);
}

private int getThreadCount(String entityLink, Map<String, String> authHeaders) throws HttpResponseException {
List<EntityLinkThreadCount> linkThreadCount = listThreadsCount(entityLink, authHeaders).getCounts();
EntityLinkThreadCount threadCount =
linkThreadCount.stream().filter(l -> l.getEntityLink().equals(entityLink)).findFirst().orElseThrow();
return (int) threadCount.getCount();
}
}

0 comments on commit 28807f8

Please sign in to comment.