Skip to content

Commit

Permalink
Enable Support for Status of Pipeline and IngestionPipeline in Elasti…
Browse files Browse the repository at this point in the history
…c Search (#16023)

* - Add Ingestion Pipeline supports search

* - Add Pipeline Status in Repository

* - Add PipelineStatus update in the search index when an entity's pipeline status is updated

* - Reindex Pipeline Usage Entities

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
Co-authored-by: Karan Hotchandani <33024356+karanh37@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 30, 2024
1 parent d305479 commit 5954ecf
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public IngestionPipelineRepository(OpenMetadataApplicationConfig config) {
Entity.getCollectionDAO().ingestionPipelineDAO(),
PATCH_FIELDS,
UPDATE_FIELDS);

this.supportsSearch = true;
this.openMetadataApplicationConfig = config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.openmetadata.service.Entity.DASHBOARD;
import static org.openmetadata.service.Entity.DASHBOARD_DATA_MODEL;
import static org.openmetadata.service.Entity.MLMODEL;
import static org.openmetadata.service.Entity.PIPELINE;
import static org.openmetadata.service.Entity.SEARCH_INDEX;
import static org.openmetadata.service.Entity.TABLE;
import static org.openmetadata.service.Entity.TOPIC;
Expand Down Expand Up @@ -140,38 +141,32 @@ private void addLineageToSearch(
Entity.getSearchRepository().getIndexMapping(toEntity.getType());
String destinationIndexName =
destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias());
Map<String, Object> relationshipDetails = new HashMap<>();
Map<String, Object> relationshipDetails =
buildRelationshipDetailsMap(fromEntity, toEntity, lineageDetails);
Pair<String, String> from = new ImmutablePair<>("_id", fromEntity.getId().toString());
Pair<String, String> to = new ImmutablePair<>("_id", toEntity.getId().toString());
processLineageData(fromEntity, toEntity, lineageDetails, relationshipDetails);
searchClient.updateLineage(sourceIndexName, from, relationshipDetails);
searchClient.updateLineage(destinationIndexName, to, relationshipDetails);
}

private void processLineageData(
EntityReference fromEntity,
EntityReference toEntity,
LineageDetails lineageDetails,
Map<String, Object> relationshipDetails) {
Map<String, Object> fromDetails = new HashMap<>();
Map<String, Object> toDetails = new HashMap<>();
fromDetails.put("id", fromEntity.getId().toString());
fromDetails.put("type", fromEntity.getType());
fromDetails.put("fqn", fromEntity.getFullyQualifiedName());
toDetails.put("id", toEntity.getId().toString());
toDetails.put("type", toEntity.getType());
toDetails.put("fqn", toEntity.getFullyQualifiedName());
public static Map<String, Object> buildEntityRefMap(EntityReference entityRef) {
Map<String, Object> details = new HashMap<>();
details.put("id", entityRef.getId().toString());
details.put("type", entityRef.getType());
details.put("fqn", entityRef.getFullyQualifiedName());
return details;
}

public static Map<String, Object> buildRelationshipDetailsMap(
EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
Map<String, Object> relationshipDetails = new HashMap<>();
relationshipDetails.put(
"doc_id", fromEntity.getId().toString() + "-" + toEntity.getId().toString());
relationshipDetails.put("fromEntity", fromDetails);
relationshipDetails.put("toEntity", toDetails);
relationshipDetails.put("fromEntity", buildEntityRefMap(fromEntity));
relationshipDetails.put("toEntity", buildEntityRefMap(toEntity));
if (lineageDetails != null) {
relationshipDetails.put(
"pipeline",
JsonUtils.getMap(
CommonUtil.nullOrEmpty(lineageDetails.getPipeline())
? null
: lineageDetails.getPipeline()));
// Add Pipeline Details
addPipelineDetails(relationshipDetails, lineageDetails.getPipeline());
relationshipDetails.put(
"description",
CommonUtil.nullOrEmpty(lineageDetails.getDescription())
Expand All @@ -193,6 +188,25 @@ private void processLineageData(
"source",
CommonUtil.nullOrEmpty(lineageDetails.getSource()) ? null : lineageDetails.getSource());
}
return relationshipDetails;
}

public static void addPipelineDetails(
Map<String, Object> relationshipDetails, EntityReference pipelineRef) {
if (CommonUtil.nullOrEmpty(pipelineRef)) {
relationshipDetails.put(PIPELINE, JsonUtils.getMap(null));
} else {
Map<String, Object> pipelineMap;
if (pipelineRef.getType().equals(PIPELINE)) {
pipelineMap =
JsonUtils.getMap(
Entity.getEntity(pipelineRef, "pipelineStatus,tags,owner", Include.ALL));
} else {
pipelineMap = JsonUtils.getMap(Entity.getEntity(pipelineRef, "tags,owner", Include.ALL));
}
relationshipDetails.put("pipelineEntityType", pipelineRef.getType());
relationshipDetails.put(PIPELINE, pipelineMap);
}
}

private String validateLineageDetails(
Expand Down Expand Up @@ -220,13 +234,13 @@ public final String exportCsv(
boolean deleted,
String entityType)
throws IOException {
CsvDocumentation DOCUMENTATION = getCsvDocumentation("lineage");
List<CsvHeader> HEADERS = DOCUMENTATION.getHeaders();
Map lineageMap =
CsvDocumentation documentation = getCsvDocumentation("lineage");
List<CsvHeader> headers = documentation.getHeaders();
Map<String, Object> lineageMap =
Entity.getSearchRepository()
.searchLineageForExport(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType);
CsvFile csvFile = new CsvFile().withHeaders(HEADERS);
CsvFile csvFile = new CsvFile().withHeaders(headers);

addRecords(csvFile, lineageMap);
return CsvUtil.formatCsv(csvFile);
Expand Down Expand Up @@ -261,7 +275,7 @@ private String processColumnLineage(HashMap lineageMap) {
return "";
}

protected void addRecords(CsvFile csvFile, Map lineageMap) {
protected void addRecords(CsvFile csvFile, Map<String, Object> lineageMap) {
if (lineageMap.get("edges") != null && lineageMap.get("edges") instanceof Collection<?>) {
Collection collection = (Collection<HashMap>) lineageMap.get("edges");
HashSet<HashMap> edges = new HashSet<HashMap>(collection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,18 @@ public RestUtil.PutResponse<?> addPipelineStatus(
ChangeDescription change =
addPipelineStatusChangeDescription(
pipeline.getVersion(), pipelineStatus, storedPipelineStatus);
pipeline.setPipelineStatus(pipelineStatus);
pipeline.setChangeDescription(change);

// Update ES Indexes and usage of this pipeline index
searchRepository.updateEntity(pipeline);
searchRepository
.getSearchClient()
.reindexAcrossIndices("lineage.pipeline.fullyQualifiedName", pipeline.getEntityReference());

return new RestUtil.PutResponse<>(
Response.Status.OK,
pipeline
.withPipelineStatus(pipelineStatus)
.withUpdatedAt(System.currentTimeMillis())
.withChangeDescription(change),
pipeline.withPipelineStatus(pipelineStatus).withUpdatedAt(System.currentTimeMillis()),
ENTITY_UPDATED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.net.ssl.SSLContext;
Expand All @@ -16,6 +18,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.service.exception.CustomExceptionMessage;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.util.SSLUtil;
Expand All @@ -24,6 +27,7 @@
import os.org.opensearch.client.RequestOptions;

public interface SearchClient {
ExecutorService asyncExecutor = Executors.newFixedThreadPool(1);

String UPDATE = "update";

Expand Down Expand Up @@ -75,6 +79,10 @@ public interface SearchClient {

Response search(SearchRequest request) throws IOException;

  /**
   * Returns the executor used for asynchronous search-index work.
   *
   * <p>Backed by the interface-level {@code asyncExecutor}, a fixed pool of one thread
   * shared by all implementations — so async tasks submitted here run serially.
   */
  default ExecutorService getAsyncExecutor() {
    return asyncExecutor;
  }

SearchResultListMapper listWithOffset(
String filter,
int limit,
Expand Down Expand Up @@ -118,6 +126,9 @@ Map<String, Object> searchLineageInternal(

void updateEntity(String indexName, String docId, Map<String, Object> doc, String scriptTxt);

  /* Takes an EntityReference, searches for occurrences of that entity across ES, and updates them by reindexing the data from the database into ES */
void reindexAcrossIndices(String matchingKey, EntityReference sourceRef);

void deleteByScript(String indexName, String scriptTxt, Map<String, Object> params);

void deleteEntity(String indexName, String docId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,12 @@ public String getScriptWithParams(EntityInterface entity, Map<String, Object> fi
fieldAddParams.put(fieldChange.getName(), doc.get("votes"));
scriptTxt.append("ctx._source.votes = params.votes;");
}
if (fieldChange.getName().equalsIgnoreCase("pipelineStatus")) {
scriptTxt.append(
"if (ctx._source.containsKey('pipelineStatus')) { ctx._source.pipelineStatus = params.newPipelineStatus; } else { ctx._source['pipelineStatus'] = params.newPipelineStatus;}");
Map<String, Object> doc = JsonUtils.getMap(entity);
fieldAddParams.put("newPipelineStatus", doc.get("pipelineStatus"));
}
}
return scriptTxt.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package org.openmetadata.service.search;

import java.util.List;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class SearchRequest {
private final String query;
private final int from;
private int from;
private final int size;
private final String queryFilter;
private final String postFilter;
Expand All @@ -16,7 +20,6 @@ public class SearchRequest {
private final String fieldName;
private final String sortOrder;
private final List<String> includeSourceFields;

private final boolean getHierarchy;

public SearchRequest(ElasticSearchRequestBuilder builder) {
Expand All @@ -36,64 +39,6 @@ public SearchRequest(ElasticSearchRequestBuilder builder) {
this.getHierarchy = builder.getHierarchy;
}

// Getters for the attributes

public String getQuery() {
return query;
}

public int getFrom() {
return from;
}

public int getSize() {
return size;
}

public String getQueryFilter() {
return queryFilter;
}

public String getPostFilter() {
return postFilter;
}

public boolean fetchSource() {
return fetchSource;
}

public boolean trackTotalHits() {
return trackTotalHits;
}

public String getSortFieldParam() {
return sortFieldParam;
}

public boolean deleted() {
return deleted;
}

public String getIndex() {
return index;
}

public String getFieldName() {
return fieldName;
}

public String getSortOrder() {
return sortOrder;
}

public List<String> getIncludeSourceFields() {
return includeSourceFields;
}

public boolean getHierarchy() {
return getHierarchy;
}

// Builder class for ElasticSearchRequest

public static class ElasticSearchRequestBuilder {
Expand Down
Loading

0 comments on commit 5954ecf

Please sign in to comment.