Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ UPDATE glossary_term_entity
SET json = JSON_REMOVE(json, '$.relatedTerms')
WHERE JSON_EXTRACT(json, '$.relatedTerms') IS NOT NULL;

-- entity_extension version snapshots: handled by Java migration
-- migrateGlossaryTermVersionRelatedTermsToTermRelation (transforms in place to preserve history).

-- Backfill conceptMappings for existing glossary terms
UPDATE glossary_term_entity
SET json = JSON_SET(COALESCE(json, '{}'), '$.conceptMappings', JSON_ARRAY())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ UPDATE glossary_term_entity
SET json = (json::jsonb - 'relatedTerms')::json
WHERE jsonb_exists(json::jsonb, 'relatedTerms');

-- entity_extension version snapshots: handled by Java migration
-- migrateGlossaryTermVersionRelatedTermsToTermRelation (transforms in place to preserve history).

-- Backfill conceptMappings for existing glossary terms
UPDATE glossary_term_entity
SET json = jsonb_set(COALESCE(json::jsonb, '{}'::jsonb), '{conceptMappings}', '[]'::jsonb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@ public void runDataMigration() {
+ "Webhook authentication may not work correctly until re-saved.",
e);
}
try {
MigrationUtil.migrateGlossaryTermVersionRelatedTermsToTermRelation(handle);
} catch (Exception e) {
LOG.error("v1130 glossaryTerm version relatedTerms transform failed; re-run to retry.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@ public void runDataMigration() {
+ "Webhook authentication may not work correctly until re-saved.",
e);
}
try {
MigrationUtil.migrateGlossaryTermVersionRelatedTermsToTermRelation(handle);
} catch (Exception e) {
LOG.error("v1130 glossaryTerm version relatedTerms transform failed; re-run to retry.", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.openmetadata.service.migration.utils.v1130;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.utils.JsonUtils;
Expand Down Expand Up @@ -129,4 +131,184 @@ public static void migrateWebhookSecretKeyToAuthType(Handle handle) {

LOG.info("Migrated {} event subscriptions with secretKey to authType", migratedCount);
}

// MySQL: one page of glossaryTerm version snapshots whose first top-level relatedTerms element
// has an "id" key. Rows are read with keyset pagination: strictly after the (:id, :extension)
// cursor, ordered by (id, extension), at most :pageSize rows.
// NOTE(review): the filter only inspects the top-level relatedTerms array, so a snapshot whose
// only legacy data sits inside changeDescription would not be selected — confirm this is intended.
private static final String SELECT_GLOSSARY_VERSIONS_MYSQL =
"SELECT id, extension, json FROM entity_extension "
+ "WHERE extension LIKE 'glossaryTerm.version.%' "
+ "AND JSON_CONTAINS_PATH(json, 'one', '$.relatedTerms[0].id') "
+ "AND (id > :id OR (id = :id AND extension > :extension)) "
+ "ORDER BY id, extension LIMIT :pageSize";

// Postgres counterpart of the select above: same filter, cursor and ordering. The json column is
// returned as text (json::text AS json).
private static final String SELECT_GLOSSARY_VERSIONS_POSTGRES =
"SELECT id, extension, json::text AS json FROM entity_extension "
+ "WHERE extension LIKE 'glossaryTerm.version.%' "
+ "AND jsonb_exists((json::jsonb)->'relatedTerms'->0, 'id') "
+ "AND (id > :id OR (id = :id AND extension > :extension)) "
+ "ORDER BY id, extension LIMIT :pageSize";

// MySQL: overwrites the json of the single row identified by (id, extension).
private static final String UPDATE_VERSION_JSON_MYSQL =
"UPDATE entity_extension SET json = :json WHERE id = :id AND extension = :extension";

// Postgres: same update, with the bound :json value cast to jsonb.
private static final String UPDATE_VERSION_JSON_POSTGRES =
"UPDATE entity_extension SET json = :json::jsonb WHERE id = :id AND extension = :extension";

// Value bound to :pageSize, i.e. the maximum number of rows fetched per select.
private static final int VERSION_RELATED_TERMS_PAGE_SIZE = 500;
// JSON field names looked up in each snapshot.
private static final String RELATED_TERMS = "relatedTerms";
private static final String CHANGE_DESCRIPTION = "changeDescription";

/**
* Wraps legacy {@code EntityReference[]} relatedTerms as {@code TermRelation[]} in
* glossaryTerm version snapshots — both top-level and inside changeDescription diff strings.
* Version reads bypass entity_relationship rehydration, so a strip would lose history. Idempotent.
*/
Comment thread
sonika-shah marked this conversation as resolved.
public static void migrateGlossaryTermVersionRelatedTermsToTermRelation(Handle handle) {
LOG.info("v1130: transforming legacy relatedTerms in glossaryTerm version snapshots");
boolean isMySQL = Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL());
String selectSql = isMySQL ? SELECT_GLOSSARY_VERSIONS_MYSQL : SELECT_GLOSSARY_VERSIONS_POSTGRES;
String updateSql = isMySQL ? UPDATE_VERSION_JSON_MYSQL : UPDATE_VERSION_JSON_POSTGRES;

String cursorId = "";
String cursorExtension = "";
long totalTransformed = 0;
long totalSkipped = 0;
int pageNumber = 0;
boolean morePages = true;

while (morePages) {
List<Map<String, Object>> rows =
handle
.createQuery(selectSql)
.bind("id", cursorId)
.bind("extension", cursorExtension)
.bind("pageSize", VERSION_RELATED_TERMS_PAGE_SIZE)
.mapToMap()
.list();

if (rows.isEmpty()) {
break;
}
pageNumber++;
morePages = rows.size() == VERSION_RELATED_TERMS_PAGE_SIZE;

PreparedBatch batch = handle.prepareBatch(updateSql);
int batchedUpdates = 0;
for (Map<String, Object> row : rows) {
String id = String.valueOf(row.get("id"));
String extension = String.valueOf(row.get("extension"));
String jsonStr = String.valueOf(row.get("json"));

cursorId = id;
cursorExtension = extension;

try {
ObjectNode root = (ObjectNode) JsonUtils.readTree(jsonStr);
if (transformSnapshot(root)) {
batch.bind("id", id).bind("extension", extension).bind("json", root.toString()).add();
batchedUpdates++;
}
} catch (Exception e) {
totalSkipped++;
LOG.warn(
"Skipping malformed glossaryTerm version snapshot id={} extension={}: {}",
id,
extension,
e.getMessage());
}
}

if (batchedUpdates > 0) {
batch.execute();
totalTransformed += batchedUpdates;
}

LOG.info(
"v1130 relatedTerms transform: page={} transformed={} skipped={} cursor=({},{})",
pageNumber,
totalTransformed,
totalSkipped,
cursorId,
cursorExtension);
}

LOG.info(
"v1130 relatedTerms transform done: pages={} transformed={} skipped={}",
pageNumber,
totalTransformed,
totalSkipped);
}

/**
 * Rewrites one version snapshot in place: the top-level relatedTerms array first, then the
 * relatedTerms entries in each changeDescription bucket.
 *
 * @param root parsed snapshot JSON; mutated when legacy items are found
 * @return {@code true} if any part of {@code root} was modified
 */
private static boolean transformSnapshot(ObjectNode root) {
  ArrayNode rewrappedTerms = wrapLegacyRelatedTerms(root.get(RELATED_TERMS));
  boolean modified = rewrappedTerms != null;
  if (modified) {
    root.set(RELATED_TERMS, rewrappedTerms);
  }

  if (!(root.get(CHANGE_DESCRIPTION) instanceof ObjectNode diff)) {
    return modified;
  }

  // Each bucket is evaluated into its own local so none is skipped by short-circuiting.
  boolean addedRewritten = rewriteChangeDescriptionEntries(diff, "fieldsAdded", "newValue");
  boolean deletedRewritten = rewriteChangeDescriptionEntries(diff, "fieldsDeleted", "oldValue");
  boolean updatedNewRewritten = rewriteChangeDescriptionEntries(diff, "fieldsUpdated", "newValue");
  boolean updatedOldRewritten = rewriteChangeDescriptionEntries(diff, "fieldsUpdated", "oldValue");
  return modified
      || addedRewritten
      || deletedRewritten
      || updatedNewRewritten
      || updatedOldRewritten;
}

/**
 * Builds a copy of {@code array} in which every element that is not already a wrapped term
 * relation becomes {@code {"term": <element>, "relationType": "relatedTo"}}. Element order is
 * kept and already-wrapped elements are carried over unchanged.
 *
 * @param array candidate relatedTerms node; may be {@code null} or a non-array node
 * @return the rewritten array, or {@code null} when the input is missing, not an array, empty,
 *     or contains nothing that needed wrapping
 */
private static ArrayNode wrapLegacyRelatedTerms(JsonNode array) {
  boolean hasElements = array != null && array.isArray() && !array.isEmpty();
  if (!hasElements) {
    return null;
  }

  ArrayNode result = JsonUtils.getObjectMapper().createArrayNode();
  int wrappedCount = 0;
  for (JsonNode element : array) {
    if (isWrappedTermRelation(element)) {
      result.add(element);
      continue;
    }
    ObjectNode relation = JsonUtils.getObjectMapper().createObjectNode();
    relation.set("term", element);
    relation.put("relationType", "relatedTo");
    result.add(relation);
    wrappedCount++;
  }
  return wrappedCount > 0 ? result : null;
}

/** Returns {@code true} when {@code item} is a JSON object that has a {@code term} field. */
private static boolean isWrappedTermRelation(JsonNode item) {
  if (item == null || !item.isObject()) {
    return false;
  }
  return item.has("term");
}

/**
 * Rewrites legacy relatedTerms items stored as JSON text inside one changeDescription bucket.
 *
 * <p>Only entries whose {@code name} is {@code relatedTerms} and whose {@code valueField} is a
 * non-empty text node are considered. The text is parsed, its legacy items are wrapped, and the
 * re-serialized array is written back into the same field.
 *
 * @param changeDescription the snapshot's changeDescription object; its entries are mutated
 * @param bucket name of the array to scan (e.g. {@code fieldsAdded})
 * @param valueField name of the entry field holding the serialized value (e.g. {@code newValue})
 * @return {@code true} if at least one entry was rewritten
 */
private static boolean rewriteChangeDescriptionEntries(
    ObjectNode changeDescription, String bucket, String valueField) {
  JsonNode fieldChanges = changeDescription.get(bucket);
  boolean rewroteAny = false;
  if (fieldChanges != null && fieldChanges.isArray()) {
    for (JsonNode fieldChange : fieldChanges) {
      boolean isRelatedTermsChange =
          fieldChange instanceof ObjectNode candidate
              && candidate.get("name") != null
              && RELATED_TERMS.equals(candidate.get("name").asText());
      if (!isRelatedTermsChange) {
        continue;
      }
      ObjectNode change = (ObjectNode) fieldChange;
      JsonNode rawValue = change.get(valueField);
      boolean hasSerializedValue =
          rawValue != null && rawValue.isTextual() && !rawValue.asText().isEmpty();
      if (!hasSerializedValue) {
        continue;
      }
      try {
        ArrayNode rewrapped = wrapLegacyRelatedTerms(JsonUtils.readTree(rawValue.asText()));
        if (rewrapped != null) {
          change.put(valueField, rewrapped.toString());
          rewroteAny = true;
        }
      } catch (Exception ignored) {
        // Best effort: a value that fails to parse or wrap is left exactly as it was.
      }
    }
  }
  return rewroteAny;
}
}
Loading