Skip to content

Commit

Permalink
[Re-version] Second round of Versioning from legacy to OpenSearch ver…
Browse files Browse the repository at this point in the history
…sion support (#30)

Removes Legacy version numbering in favor of OpenSearch version 2.x+.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize committed Nov 28, 2022
1 parent e1a87fb commit 5efa40e
Show file tree
Hide file tree
Showing 50 changed files with 215 additions and 1,635 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,8 @@ public void testBasicIndexWithExtractedRouting() throws Exception {
conf.set(ConfigurationOptions.ES_MAPPING_ROUTING, "number");

RestUtils.touch(index);
if (clusterInfo.getMajorVersion().onOrAfter(OpenSearchMajorVersion.V_7_X)) {
conf.set(ConfigurationOptions.OPENSEARCH_RESOURCE, index);
RestUtils.putMapping(index, type, StringUtils.toUTF("{\"_routing\": {\"required\":true}}"));
} else {
conf.set(ConfigurationOptions.OPENSEARCH_RESOURCE, target);
RestUtils.putMapping(index, type, StringUtils.toUTF("{\""+ type + "\":{\"_routing\": {\"required\":true}}}"));
}

conf.set(ConfigurationOptions.OPENSEARCH_RESOURCE, index);
RestUtils.putMapping(index, type, StringUtils.toUTF("{\"_routing\": {\"required\":true}}"));

runJob(conf);
}
Expand All @@ -251,13 +245,8 @@ public void testBasicIndexWithConstantRouting() throws Exception {
conf.set(ConfigurationOptions.ES_MAPPING_ROUTING, "<foobar/>");

RestUtils.touch(index);
if (clusterInfo.getMajorVersion().onOrAfter(OpenSearchMajorVersion.V_7_X)) {
conf.set(ConfigurationOptions.OPENSEARCH_RESOURCE, index);
RestUtils.putMapping(index, type, StringUtils.toUTF("{\"_routing\": {\"required\":true}}"));
} else {
conf.set(ConfigurationOptions.OPENSEARCH_RESOURCE, target);
RestUtils.putMapping(index, type, StringUtils.toUTF("{\""+ type + "\":{\"_routing\": {\"required\":true}}}"));
}
conf.set(ConfigurationOptions.OPENSEARCH_RESOURCE, index);
RestUtils.putMapping(index, type, StringUtils.toUTF("{\"_routing\": {\"required\":true}}"));

runJob(conf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testQueryBuilder() throws Exception {
sets.setInternalVersion(version);
Resource read = new Resource(settings, true);
SearchRequestBuilder qb =
new SearchRequestBuilder(version, settings.getReadMetadata() && settings.getReadMetadataVersion())
new SearchRequestBuilder(settings.getReadMetadata() && settings.getReadMetadataVersion())
.resource(read)
.query(QueryUtils.parseQuery(settings))
.scroll(settings.getScrollKeepAlive())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,35 +290,20 @@ public static void validateSettingsForReading(Settings settings) {
}

public static void validateSettingsForWriting(Settings settings) {
OpenSearchMajorVersion version = settings.getInternalVersionOrThrow();

// Things that were removed in 6.x and forward
if (version.onOrAfter(OpenSearchMajorVersion.V_6_X)) {
// File Scripts
if (StringUtils.hasText(settings.getUpdateScriptFile())) {
throw new OpenSearchHadoopIllegalArgumentException("Cannot use file scripts on ES 6.x and above. Please use " +
"stored scripts with [" + ConfigurationOptions.ES_UPDATE_SCRIPT_STORED + "] instead.");
}

// Timestamp and TTL in index/updates
if (StringUtils.hasText(settings.getMappingTimestamp())) {
throw new OpenSearchHadoopIllegalArgumentException("Cannot use timestamps on index/update requests in ES 6.x " +
"and above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TIMESTAMP + "] setting.");
}
if (StringUtils.hasText(settings.getMappingTtl())) {
throw new OpenSearchHadoopIllegalArgumentException("Cannot use TTL on index/update requests in ES 6.x and " +
"above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TTL + "] setting.");
}
} else {
if (StringUtils.hasText(settings.getMappingTtl())) {
LOG.warn("Setting [" + ConfigurationOptions.ES_MAPPING_TTL + "] is deprecated! Support for [ttl] on " +
"indexing and update requests has been removed in ES 6.x and above!");
}
if (StringUtils.hasText(settings.getMappingTimestamp())) {
LOG.warn("Setting [" + ConfigurationOptions.ES_MAPPING_TIMESTAMP + "] is deprecated! Support for " +
"[timestamp] on indexing and update requests has been removed in ES 6.x and above!");
}
// File Scripts
if (StringUtils.hasText(settings.getUpdateScriptFile())) {
throw new OpenSearchHadoopIllegalArgumentException("Cannot use file scripts on ES 6.x and above. Please use " +
"stored scripts with [" + ConfigurationOptions.ES_UPDATE_SCRIPT_STORED + "] instead.");
}

// Timestamp and TTL in index/updates
if (StringUtils.hasText(settings.getMappingTimestamp())) {
throw new OpenSearchHadoopIllegalArgumentException("Cannot use timestamps on index/update requests in ES 6.x " +
"and above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TIMESTAMP + "] setting.");
}
if (StringUtils.hasText(settings.getMappingTtl())) {
throw new OpenSearchHadoopIllegalArgumentException("Cannot use TTL on index/update requests in ES 6.x and " +
"above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TTL + "] setting.");
}
}

Expand Down
36 changes: 6 additions & 30 deletions mr/src/main/java/org/opensearch/hadoop/rest/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,36 +92,12 @@ public Resource(Settings settings, boolean read) {
boolean typeExists = slash >= 0;

OpenSearchMajorVersion opensearchMajorVersion = settings.getInternalVersionOrThrow();
if (opensearchMajorVersion.after(OpenSearchMajorVersion.V_7_X)) {
// Types can no longer be specified at all! Index names only!
if (typeExists) {
throw new OpenSearchHadoopIllegalArgumentException(String.format(
"Detected type name in resource [%s]. Remove type name to continue.",
resource
));
}
}
if (opensearchMajorVersion.onOrBefore(OpenSearchMajorVersion.V_7_X)) {
// Type can be specified, but a warning will be returned. An ES 7.X cluster will accept types if include_type_name is true,
// which we will set in the case of a type existing.
// This is onOrBefore because we want to print the deprecation log no matter what version of ES they're running on.
if (typeExists) {
LOG.warn(String.format(
"Detected type name in resource [%s]. Type names are deprecated and will be removed in a later release.",
resource
));
}
}
if (opensearchMajorVersion.onOrBefore(OpenSearchMajorVersion.V_6_X)) {
// Type is required for writing via the bulk API, but not for reading. No type on a read resource means to read all types.
// This is important even if we're on a 6.x cluster that enforces a single type per index. 6.x STILL supports opening old 5.x
// indices in order to ease the upgrade process!!!!
if (!read && !typeExists) {
throw new OpenSearchHadoopIllegalArgumentException(String.format(
"No type found; Types are required when writing in ES versions 6 and below. Expected [index]/[type], but got [%s]",
resource
));
}
// Types can no longer be specified at all! Index names only!
if (typeExists) {
throw new OpenSearchHadoopIllegalArgumentException(String.format(
"Detected type name in resource [%s]. Remove type name to continue.",
resource
));
}

// Parse out the type if it exists and is valid.
Expand Down
69 changes: 11 additions & 58 deletions mr/src/main/java/org/opensearch/hadoop/rest/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import static org.opensearch.hadoop.rest.Request.Method.HEAD;
import static org.opensearch.hadoop.rest.Request.Method.POST;
import static org.opensearch.hadoop.rest.Request.Method.PUT;
import static org.opensearch.hadoop.util.OpenSearchMajorVersion.V_2_X;

public class RestClient implements Closeable, StatsAware {

Expand Down Expand Up @@ -331,16 +330,12 @@ public MappingSet getMappings(Resource indexResource) {
}

public MappingSet getMappings(String query, boolean includeTypeName) {
// If the version is not at least 7, then the property isn't guaranteed to exist. If it is, then defer to the flag.
boolean requestTypeNameInResponse = clusterInfo.getMajorVersion().onOrAfter(OpenSearchMajorVersion.V_7_X) && includeTypeName;
// Response will always have the type name in it if node version is before 7, and if it is not, defer to the flag.
boolean typeNameInResponse = clusterInfo.getMajorVersion().before(OpenSearchMajorVersion.V_7_X) || includeTypeName;
if (requestTypeNameInResponse) {
if (includeTypeName) {
query = query + "?include_type_name=true";
}
Map<String, Object> result = get(query, null);
if (result != null && !result.isEmpty()) {
return FieldParser.parseMappings(result, typeNameInResponse);
return FieldParser.parseMappings(result, includeTypeName);
}
return null;
}
Expand All @@ -360,14 +355,8 @@ public Map<String, Object> sampleForFields(Resource resource, Collection<String>
// remove trailing ,
sb.setLength(sb.length() - 1);
sb.append("],\n\"query\":{");
sb.append("\"bool\": { \"must\":[");

if (clusterInfo.getMajorVersion().onOrAfter(OpenSearchMajorVersion.V_2_X)) {
sb.append("\"bool\": { \"must\":[");
}
else {
sb.append("\"constant_score\":{ \"filter\": { \"and\":[");

}
for (String field: fields) {
sb.append(String.format(Locale.ROOT, "\n{ \"exists\":{ \"field\":\"%s\"} },", field));
}
Expand Down Expand Up @@ -497,12 +486,7 @@ public InputStream scroll(String scrollId) {
// NB: dynamically get the stats since the transport can change
long start = network.transportStats().netTotalTime;
try {
BytesArray body;
if (clusterInfo.getMajorVersion().onOrAfter(OpenSearchMajorVersion.V_2_X)) {
body = new BytesArray("{\"scroll_id\":\"" + scrollId + "\"}");
} else {
body = new BytesArray(scrollId);
}
BytesArray body = new BytesArray("{\"scroll_id\":\"" + scrollId + "\"}");
// use post instead of get to avoid some weird encoding issues (caused by the long URL)
// do not retry the request on another node, because that can lead to ES returning an error or
// less data being returned than requested. See: https://github.com/elastic/elasticsearch-hadoop/issues/1302
Expand All @@ -520,12 +504,7 @@ public boolean delete(String indexOrType) {
return (res.status() == HttpStatus.OK ? true : false);
}
public boolean deleteScroll(String scrollId) {
BytesArray body;
if (clusterInfo.getMajorVersion().onOrAfter(OpenSearchMajorVersion.V_2_X)) {
body = new BytesArray(("{\"scroll_id\":[\"" + scrollId + "\"]}").getBytes(StringUtils.UTF_8));
} else {
body = new BytesArray(scrollId.getBytes(StringUtils.UTF_8));
}
BytesArray body = new BytesArray(("{\"scroll_id\":[\"" + scrollId + "\"]}").getBytes(StringUtils.UTF_8));
Request req = new SimpleRequest(DELETE, null, "_search/scroll", body);
Response res = executeNotFoundAllowed(req);
return (res.status() == HttpStatus.OK ? true : false);
Expand Down Expand Up @@ -588,11 +567,8 @@ public long countIndexShard(String index, String shardId, QueryBuilder query) {
public long count(String index, String type, String shardId, QueryBuilder query) {
StringBuilder uri = new StringBuilder(index); // Only use index for counting in 7.X and up.
uri.append("/_search?size=0");
// Option added in the 6.x line. This must be set to true or else in 7.X and 6/7 mixed clusters
// will return lower bounded count values instead of an accurate count.
if (clusterInfo.getMajorVersion().onOrAfter(OpenSearchMajorVersion.V_6_X)) {
uri.append("&track_total_hits=true");
}
// always track total hits to get accurate count
uri.append("&track_total_hits=true");
if (StringUtils.hasLength(shardId)) {
uri.append("&preference=_shards:");
uri.append(shardId);
Expand Down Expand Up @@ -647,12 +623,7 @@ public boolean isAlias(String query) {
public void putMapping(String index, String type, byte[] bytes) {
// create index first (if needed) - it might return 403/404
touch(index);

if (clusterInfo.getMajorVersion().after(OpenSearchMajorVersion.V_6_X)) {
execute(PUT, index + "/_mapping", new BytesArray(bytes));
} else {
execute(PUT, index + "/_mapping/" + type, new BytesArray(bytes));
}
execute(PUT, index + "/_mapping", new BytesArray(bytes));
}

public EsToken createNewApiToken(String tokenName) {
Expand Down Expand Up @@ -727,42 +698,24 @@ public ClusterInfo mainInfo() {
Response response = execute(GET, "", true);
Map<String, Object> result = parseContent(response.body(), null);
if (result == null) {
throw new OpenSearchHadoopIllegalStateException("Unable to retrieve opensearch main cluster info.");
throw new OpenSearchHadoopIllegalStateException("Unable to retrieve OpenSearch main cluster info.");
}
String clusterName = result.get("cluster_name").toString();
String clusterUUID = (String)result.get("cluster_uuid");
@SuppressWarnings("unchecked")
Map<String, String> versionBody = (Map<String, String>) result.get("version");
if (versionBody == null || !StringUtils.hasText(versionBody.get("number"))) {
throw new OpenSearchHadoopIllegalStateException("Unable to retrieve elasticsearch version.");
throw new OpenSearchHadoopIllegalStateException("Unable to retrieve OpenSearch version.");
}
String versionNumber = versionBody.get("number");
OpenSearchMajorVersion major = OpenSearchMajorVersion.parse(versionNumber);
if (false && major.before(OpenSearchMajorVersion.V_6_X)) {
// todo remove this in versioning update
throw new OpenSearchHadoopIllegalStateException("Invalid major version [" + major + "]. " +
"Version is lower than minimum required version [" + OpenSearchMajorVersion.V_6_X + "].");
} else if (major.onOrAfter(V_2_X)) {
String tagline = result.get("tagline").toString();
if (ELASTICSEARCH_TAGLINE.equals(tagline) == false) {
LOG.warn("Could not verify server is OpenSearch! Invalid main action response body format [tag].");
}
if (false && major.onOrAfter(V_2_X)) {
// todo remove this in versioning update
String buildFlavor = versionBody.get("build_flavor");
if (ELASTICSEARCH_BUILD_FLAVOR.equals(buildFlavor) == false) {
LOG.warn("Could not verify server is Elasticsearch! Invalid main action response body format [build_flavor].");
}
}
}
return new ClusterInfo(new ClusterName(clusterName, clusterUUID), OpenSearchMajorVersion.parse(versionNumber));
}

/**
* @deprecated use RestClient#mainInfo() instead.
*/
@Deprecated
public OpenSearchMajorVersion remoteEsVersion() {
public OpenSearchMajorVersion remoteOpenSearchVersion() {
return mainInfo().getMajorVersion();
}

Expand Down
10 changes: 1 addition & 9 deletions mr/src/main/java/org/opensearch/hadoop/rest/RestRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,6 @@ Scroll scroll(String query, BytesArray body, ScrollReader reader) throws IOExcep
Scroll scrollResult = reader.read(scroll);
if (scrollResult == null) {
log.info(String.format("No scroll for query [%s/%s], likely because the index is frozen", query, body));
} else if (settings.getInternalVersionOrThrow().onOrBefore(OpenSearchMajorVersion.V_2_X)) {
// On ES 2.X and before, a scroll response does not contain any hits to start with.
// Another request will be needed.
scrollResult = new Scroll(scrollResult.getScrollId(), scrollResult.getTotalHits(), false);
}
return scrollResult;
} finally {
Expand Down Expand Up @@ -428,11 +424,7 @@ public void delete() {
// delete each retrieved batch, keep routing in mind:
String baseFormat = "{\"delete\":{\"_id\":\"%s\"}}\n";
String routedFormat;
if (client.clusterInfo.getMajorVersion().onOrAfter(OpenSearchMajorVersion.V_7_X)) {
routedFormat = "{\"delete\":{\"_id\":\"%s\", \"routing\":\"%s\"}}\n";
} else {
routedFormat = "{\"delete\":{\"_id\":\"%s\", \"_routing\":\"%s\"}}\n";
}
routedFormat = "{\"delete\":{\"_id\":\"%s\", \"routing\":\"%s\"}}\n";

boolean hasData = false;
while (sq.hasNext()) {
Expand Down
3 changes: 1 addition & 2 deletions mr/src/main/java/org/opensearch/hadoop/rest/RestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.opensearch.hadoop.cfg.FieldPresenceValidation;
import org.opensearch.hadoop.cfg.Settings;
import org.opensearch.hadoop.rest.query.BoolQueryBuilder;
import org.opensearch.hadoop.rest.query.ConstantScoreQueryBuilder;
import org.opensearch.hadoop.rest.query.QueryBuilder;
import org.opensearch.hadoop.rest.query.QueryUtils;
import org.opensearch.hadoop.rest.query.RawQueryBuilder;
Expand Down Expand Up @@ -449,7 +448,7 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio
boolean includeVersion = settings.getReadMetadata() && settings.getReadMetadataVersion();
Resource read = new Resource(settings, true);
SearchRequestBuilder requestBuilder =
new SearchRequestBuilder(clusterInfo.getMajorVersion(), includeVersion)
new SearchRequestBuilder(includeVersion)
.resource(read)
// Overwrite the index name from the resource to be that of the concrete index in the partition definition
.indices(partition.getIndex())
Expand Down

0 comments on commit 5efa40e

Please sign in to comment.