Skip to content

Commit

Permalink
More ES refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>
  • Loading branch information
harshavamsi authored and nknize committed Dec 1, 2022
1 parent 73e74a9 commit f4968e6
Show file tree
Hide file tree
Showing 17 changed files with 233 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,7 @@ class OpenSearchFixturePlugin implements Plugin<Project> {
File scriptsDir = new File(project.buildDir, 'scripts')
scriptsDir.mkdirs()
File script = null
if (majorVersion <= 2) {
scriptsDir.mkdirs()
script = new File(scriptsDir, "increment.groovy").setText("ctx._source.counter+=1", 'UTF-8')
} else if (majorVersion == 5) {
scriptsDir.mkdirs()
script = new File(scriptsDir, "increment.painless").setText("ctx._source.counter = ctx._source.getOrDefault('counter', 0) + 1", 'UTF-8')
}
script = new File(scriptsDir, "increment.painless").setText("ctx._source.counter = ctx._source.getOrDefault('counter', 0) + 1", 'UTF-8')
if (script != null) {
integTestCluster.extraConfigFile("script", script)
}
Expand Down
173 changes: 115 additions & 58 deletions mr/src/main/java/org/opensearch/hadoop/rest/InitializationUtils.java

Large diffs are not rendered by default.

78 changes: 42 additions & 36 deletions mr/src/main/java/org/opensearch/hadoop/rest/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
Expand Down Expand Up @@ -111,7 +111,6 @@ public class RestClient implements Closeable, StatsAware {
mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, false);
}


private final Stats stats = new Stats();

public enum Health {
Expand All @@ -131,13 +130,13 @@ public RestClient(Settings settings) {

if (ConfigurationOptions.ES_BATCH_WRITE_RETRY_POLICY_SIMPLE.equals(retryPolicyName)) {
retryPolicyName = SimpleHttpRetryPolicy.class.getName();
}
else if (ConfigurationOptions.ES_BATCH_WRITE_RETRY_POLICY_NONE.equals(retryPolicyName)) {
} else if (ConfigurationOptions.ES_BATCH_WRITE_RETRY_POLICY_NONE.equals(retryPolicyName)) {
retryPolicyName = NoHttpRetryPolicy.class.getName();
}

this.retryPolicy = ObjectUtils.instantiate(retryPolicyName, settings);
// Assume that the opensearch major version is the latest if the version is not already present in the settings
// Assume that the opensearch major version is the latest if the version is not
// already present in the settings
this.clusterInfo = settings.getClusterInfoOrUnnamedLatest();
this.errorExtractor = new ErrorExtractor();
}
Expand Down Expand Up @@ -233,13 +232,17 @@ public int getResponseCode() {
}

/**
* Executes a single bulk operation against the provided resource, using the passed data as the request body.
* This method will retry bulk requests if the entire bulk request fails, but will not retry singular
* Executes a single bulk operation against the provided resource, using the
* passed data as the request body.
* This method will retry bulk requests if the entire bulk request fails, but
* will not retry singular
* document failures.
*
* @param resource target of the bulk request.
* @param data bulk request body. This body will be cleared of entries on any successful bulk request.
* @return a BulkActionResponse object that will detail if there were failing documents that should be retried.
* @param data bulk request body. This body will be cleared of entries on
* any successful bulk request.
* @return a BulkActionResponse object that will detail if there were failing
* documents that should be retried.
*/
public BulkActionResponse bulk(Resource resource, TrackingBytesArray data) {
// NB: dynamically get the stats since the transport can change
Expand Down Expand Up @@ -278,16 +281,15 @@ Iterator<Map> parseBulkActionResponse(Response response) {

public String postDocument(Resource resource, BytesArray document) throws IOException {
// If untyped, the type() method returns '_doc'
Request request = new SimpleRequest(Method.POST, null, resource.index() + "/" + resource.type(), null, document);
Request request = new SimpleRequest(Method.POST, null, resource.index() + "/" + resource.type(), null,
document);
Response response = execute(request, true);
Object id = parseContent(response.body(), "_id");
if (id == null || !StringUtils.hasText(id.toString())) {
throw new OpenSearchHadoopInvalidRequest(
String.format("Could not determine successful write operation. Request[%s > %s] Response[%s]",
request.method(), request.path(),
IOUtils.asString(response.body())
)
);
IOUtils.asString(response.body())));
}
return id.toString();
}
Expand All @@ -309,12 +311,10 @@ public List<List<Map<String, Object>>> targetShards(String index, String routing
Response res = executeNotFoundAllowed(req);
if (res.status() == HttpStatus.OK) {
shardsJson = parseContent(res.body(), "shards");
}
else {
} else {
shardsJson = Collections.emptyList();
}
}
else {
} else {
shardsJson = get(target, "shards");
}

Expand All @@ -325,7 +325,9 @@ public MappingSet getMappings(Resource indexResource) {
if (indexResource.isTyped()) {
return getMappings(indexResource.index() + "/_mapping/" + indexResource.type(), true);
} else {
return getMappings(indexResource.index() + "/_mapping" + (indexReadMissingAsEmpty ? "?ignore_unavailable=true" : ""), false);
return getMappings(
indexResource.index() + "/_mapping" + (indexReadMissingAsEmpty ? "?ignore_unavailable=true" : ""),
false);
}
}

Expand Down Expand Up @@ -371,7 +373,8 @@ public Map<String, Object> sampleForFields(Resource resource, Collection<String>
endpoint = resource.index() + "/" + resource.type();
}

Map<String, List<Map<String, Object>>> hits = parseContent(execute(GET, endpoint + "/_search", new BytesArray(sb.toString())).body(), "hits");
Map<String, List<Map<String, Object>>> hits = parseContent(
execute(GET, endpoint + "/_search", new BytesArray(sb.toString())).body(), "hits");
List<Map<String, Object>> docs = hits.get("hits");
if (docs == null || docs.isEmpty()) {
return Collections.emptyMap();
Expand Down Expand Up @@ -446,12 +449,12 @@ protected Response execute(Request request, boolean checkStatus, boolean retry)
protected Response executeNotFoundAllowed(Request req) {
Response res = execute(req, false);
switch (res.status()) {
case HttpStatus.OK:
break;
case HttpStatus.NOT_FOUND:
break;
default:
checkResponse(req, res);
case HttpStatus.OK:
break;
case HttpStatus.NOT_FOUND:
break;
default:
checkResponse(req, res);
}

return res;
Expand All @@ -460,10 +463,11 @@ protected Response executeNotFoundAllowed(Request req) {
private void checkResponse(Request request, Response response) {
if (response.hasFailed()) {
// check error first
String msg = null;
String msg = null;
// try to parse the answer
try {
OpenSearchHadoopException ex = errorExtractor.extractError(this.<Map> parseContent(response.body(), null));
OpenSearchHadoopException ex = errorExtractor
.extractError(this.<Map>parseContent(response.body(), null));
msg = (ex != null) ? ex.toString() : null;
if (response.isClientError()) {
msg = msg + "\n" + request.body();
Expand Down Expand Up @@ -503,6 +507,7 @@ public boolean delete(String indexOrType) {
Response res = executeNotFoundAllowed(req);
return (res.status() == HttpStatus.OK ? true : false);
}

public boolean deleteScroll(String scrollId) {
BytesArray body = new BytesArray(("{\"scroll_id\":[\"" + scrollId + "\"]}").getBytes(StringUtils.UTF_8));
Request req = new SimpleRequest(DELETE, null, "_search/scroll", body);
Expand Down Expand Up @@ -565,7 +570,7 @@ public long countIndexShard(String index, String shardId, QueryBuilder query) {
}

public long count(String index, String type, String shardId, QueryBuilder query) {
StringBuilder uri = new StringBuilder(index); // Only use index for counting in 7.X and up.
StringBuilder uri = new StringBuilder(index);
uri.append("/_search?size=0");
// always track total hits to get accurate count
uri.append("&track_total_hits=true");
Expand All @@ -587,7 +592,8 @@ public long count(String index, String type, String shardId, QueryBuilder query)
Number count = (Number) totalObject.get("value");
if (count != null) {
if (!"eq".equals(relation)) {
throw new OpenSearchHadoopParsingException("Count operation returned non-exact count of [" + relation + "][" + count + "]");
throw new OpenSearchHadoopParsingException(
"Count operation returned non-exact count of [" + relation + "][" + count + "]");
}
finalCount = count.longValue();
} else {
Expand Down Expand Up @@ -659,8 +665,7 @@ public EsToken createNewApiToken(String tokenName) {
content.get("api_key").toString(),
expirationTime,
remoteInfo.getClusterName().getName(),
remoteInfo.getMajorVersion()
);
remoteInfo.getMajorVersion());
}

public boolean cancelToken(EsToken tokenToCancel) {
Expand All @@ -670,14 +675,14 @@ public boolean cancelToken(EsToken tokenToCancel) {
}
String serviceForToken = tokenToCancel.getClusterName();
if (!StringUtils.hasText(serviceForToken)) {
throw new OpenSearchHadoopIllegalArgumentException("Attempting to cancel access token that has no service name");
throw new OpenSearchHadoopIllegalArgumentException(
"Attempting to cancel access token that has no service name");
}
if (!serviceForToken.equals(remoteInfo.getClusterName().getName())) {
throw new OpenSearchHadoopIllegalArgumentException(String.format(
"Attempting to cancel access token for a cluster named [%s] through a differently named cluster [%s]",
serviceForToken,
remoteInfo.getClusterName().getName()
));
remoteInfo.getClusterName().getName()));
}
FastByteArrayOutputStream out = new FastByteArrayOutputStream(256);
JacksonJsonGenerator generator = new JacksonJsonGenerator(out);
Expand All @@ -701,7 +706,7 @@ public ClusterInfo mainInfo() {
throw new OpenSearchHadoopIllegalStateException("Unable to retrieve OpenSearch main cluster info.");
}
String clusterName = result.get("cluster_name").toString();
String clusterUUID = (String)result.get("cluster_uuid");
String clusterUUID = (String) result.get("cluster_uuid");
@SuppressWarnings("unchecked")
Map<String, String> versionBody = (Map<String, String>) result.get("version");
if (versionBody == null || !StringUtils.hasText(versionBody.get("number"))) {
Expand Down Expand Up @@ -729,7 +734,8 @@ public Health getHealth(String index) {
sb.append(index);
String status = get(sb.toString(), "status");
if (status == null) {
throw new OpenSearchHadoopIllegalStateException("Could not determine index health, returned status was null. Bailing out...");
throw new OpenSearchHadoopIllegalStateException(
"Could not determine index health, returned status was null. Bailing out...");
}
return Health.valueOf(status.toUpperCase());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,6 @@ protected void writeObjectHeader(List<Object> list) {
commaMightBeNeeded = id(list, commaMightBeNeeded);
commaMightBeNeeded = addExtractorOrDynamicValue(list, getMetadataExtractorOrFallback(MetadataExtractor.Metadata.PARENT, parentExtractor), requestParameterNames.parent, commaMightBeNeeded);
commaMightBeNeeded = addExtractorOrDynamicValueAsFieldWriter(list, getMetadataExtractorOrFallback(MetadataExtractor.Metadata.ROUTING, routingExtractor), requestParameterNames.routing, commaMightBeNeeded);
commaMightBeNeeded = addExtractorOrDynamicValue(list, getMetadataExtractorOrFallback(MetadataExtractor.Metadata.TTL, ttlExtractor), "\"_ttl\":", commaMightBeNeeded);
commaMightBeNeeded = addExtractorOrDynamicValue(list, getMetadataExtractorOrFallback(MetadataExtractor.Metadata.TIMESTAMP, timestampExtractor), "\"_timestamp\":", commaMightBeNeeded);

// version & version_type fields
Object versionField = getMetadataExtractorOrFallback(MetadataExtractor.Metadata.VERSION, versionExtractor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
Expand Down Expand Up @@ -51,9 +51,9 @@ public static class StaticFieldExtractor implements FieldExtractor {
public Object field(Object target) {
return field;
}

protected Object field() {
return field;
return field;
}

public void setField(Object field) {
Expand All @@ -65,17 +65,21 @@ public boolean needsInit() {
return needsInit;
}
}

public void setSettings(Settings settings) {
this.settings = settings;
this.version = settings.getInternalVersionOrThrow();
this.settings = settings;
this.version = settings.getInternalVersionOrThrow();
}

/**
* A special field extractor meant to be used for metadata fields that are supported in
* some versions of Elasticsearch, but not others. In the case that a metadata field is
* unsupported for the configured version of Elasticsearch, this extractor which throws
* exceptions for using unsupported metadata tags is returned instead of the regular one.
* A special field extractor meant to be used for metadata fields that are
* supported in
* some versions of Elasticsearch, but not others. In the case that a metadata
* field is
* unsupported for the configured version of Elasticsearch, this extractor which
* throws
* exceptions for using unsupported metadata tags is returned instead of the
* regular one.
*/
private static class UnsupportedMetadataFieldExtractor extends StaticFieldExtractor {
private Metadata unsupportedMetadata;
Expand Down Expand Up @@ -103,18 +107,19 @@ public void reset() {
public FieldExtractor get(Metadata metadata) {
FieldExtractor fieldExtractor = pool.get(metadata);

if (fieldExtractor == null || (fieldExtractor instanceof StaticFieldExtractor && ((StaticFieldExtractor)fieldExtractor).needsInit())) {
if (fieldExtractor == null || (fieldExtractor instanceof StaticFieldExtractor
&& ((StaticFieldExtractor) fieldExtractor).needsInit())) {
Object value = getValue(metadata);
if (value == null) {
return null;
}

if (fieldExtractor == null) {
fieldExtractor = _createExtractorFor(metadata);
}
if(fieldExtractor instanceof StaticFieldExtractor && ((StaticFieldExtractor)fieldExtractor).needsInit()) {
((StaticFieldExtractor)fieldExtractor).setField(value);

if (fieldExtractor instanceof StaticFieldExtractor && ((StaticFieldExtractor) fieldExtractor).needsInit()) {
((StaticFieldExtractor) fieldExtractor).setField(value);
}
pool.put(metadata, fieldExtractor);
}
Expand All @@ -133,13 +138,13 @@ private FieldExtractor _createExtractorFor(Metadata metadata) {
}
return createExtractorFor(metadata);
}

protected FieldExtractor createExtractorFor(Metadata metadata) {
return new StaticFieldExtractor();
return new StaticFieldExtractor();
}

public abstract Object getValue(Metadata metadata);

@Override
public void setObject(Object entity) {
this.entity = entity;
Expand Down

0 comments on commit f4968e6

Please sign in to comment.