Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
Merge pull request #59 from amihaiemil/58
Browse files Browse the repository at this point in the history
Refactored ElasticSearchRepository
  • Loading branch information
amihaiemil committed Oct 6, 2016
2 parents 120fa04 + 046ce44 commit 7642f6b
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 240 deletions.
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
<artifactId>phantomjsdriver</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.jcabi</groupId>
<artifactId>jcabi-http</artifactId>
<version>1.16</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand All @@ -71,6 +76,12 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-servlet-webserver</artifactId>
<version>1.9.64</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
Expand Down
190 changes: 53 additions & 137 deletions src/main/java/com/amihaiemil/charles/ElasticSearchRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,29 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
package com.amihaiemil.charles;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import javax.json.Json;
import javax.json.JsonObject;

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.jcabi.http.Request;
import com.jcabi.http.Response;
import com.jcabi.http.request.ApacheRequest;
import com.jcabi.http.response.JsonResponse;
import com.jcabi.http.response.RestResponse;

/**
* Elasticsearch repository.
* Documents are put into an elastic search index using the _bulk API.
* Documents are put into an elastic search index making a HTTP POST to
* the _bulk API.<br><br>
*
* Use this class when you have your own ES instance setup.
*
* @author Mihai Andronache (amihaiemil@gmail.com)
*
Expand All @@ -70,23 +62,18 @@ public final class ElasticSearchRepository implements Repository {
*/
private static final String ES_INDEX_PATTERN =
"^(http:\\/\\/|https:\\/\\/)([a-zA-Z0-9._-]+)(:[0-9]{1,5})?\\/[a-zA-Z0-9-_.]+$";

/**
* Index information.
*/
private String indexInfo;

/**
* HTTP client.
*/
private CloseableHttpClient httpClient;
/**
* Request made to ES.
*/
private Request post;

/**
* Ctor.
* @param index ES index address.
*/
public ElasticSearchRepository(String index) {
this(index, HttpClientBuilder.create().build());
this(index, null, null);
}

/**
Expand All @@ -103,57 +90,21 @@ public ElasticSearchRepository(
+ "(http|https)://domain[:port]/indexname"
);
}
int portColonIndex = index.indexOf(':', 7);
int port = -1;
if(portColonIndex != -1) {
port = Integer.valueOf(
index.substring(
portColonIndex+1,
index.indexOf('/', portColonIndex)
)
);
}
String scheme = "http";
String domain;
int idxAfterScheme = 7;
if(index.startsWith("https://")) {
scheme = "https";
idxAfterScheme = 8;
if(username != null && password != null) {
String wCredentials;
if(index.startsWith("http://")) {
wCredentials = "http://" + username + ":" + password
+ "@" + index.substring(7);
} else {
wCredentials = "https://" + username + ":" + password
+ "@" + index.substring(8);
}
this.post = new ApacheRequest(wCredentials + "/_bulk?pretty")
.header("content-type", "application/json");
} else {
this.post = new ApacheRequest(index + "/_bulk?pretty")
.header("content-type", "application/json");
}
if(portColonIndex != -1) {
domain = index.substring(idxAfterScheme, portColonIndex);
} else {
domain = index.substring(idxAfterScheme, index.indexOf('/', 8));
}

CredentialsProvider credsProvider = new BasicCredentialsProvider();
credsProvider.setCredentials(
new AuthScope(new HttpHost(domain, port, scheme)),
new UsernamePasswordCredentials(username, password)
);
this.httpClient = HttpClients.custom()
.setDefaultCredentialsProvider(credsProvider).build();
this.indexInfo = index;
}


/**
* Ctor
* @param index Index address
* @param httpClient HTTP client.
*/
public ElasticSearchRepository(
String index,
CloseableHttpClient httpClient
) {
if(!this.isIndexUrlValid(index)) {
throw new IllegalArgumentException(
"Wrong ES index url pattern! Expected "
+ "(http|https)://domain[:port]/indexname"
);
}
this.indexInfo = index;
this.httpClient = httpClient;
}

/**
Expand All @@ -171,26 +122,20 @@ public ElasticSearchRepository(
*/
@Override
public void export(List<WebPage> pages) throws DataExportException {
String uri = indexInfo + "/_bulk?pretty";
try {
List<JsonObject> docs = new ArrayList<JsonObject>();
for(WebPage page : pages){
docs.add(this.preparePage(page));
}
LOG.info("Sending " + docs.size() + " to the elasticsearch index: " + indexInfo);
LOG.info("Sending " + pages.size() + " to the elasticsearch index");
JsonObject jsonResponse = this.sendToIndex(
new EsBulkContent(docs).structure(),
uri
new EsBulkContent(pages).structure()
);
if(jsonResponse.getBoolean("errors", Boolean.TRUE)) {
LOG.error(
"There were errors during indexing to "
+ indexInfo +
"There were errors during indexing to " +
". Whole JSON response: " +
jsonResponse.toString()
);
throw new DataExportException("Errors when calling the _bulk api.");
}
LOG.info("Bulk indexing of the " + docs.size() + " documents, finished in " + jsonResponse.getInt("took") + " miliseconds!");
LOG.info("Bulk indexing of the " + pages.size() + " documents, finished in " + jsonResponse.getInt("took") + " miliseconds!");
} catch (IOException e) {
LOG.error(e.getMessage(), e);
throw new DataExportException(e.getMessage());
Expand All @@ -200,62 +145,33 @@ public void export(List<WebPage> pages) throws DataExportException {
/**
* POSTs the given json string to an elasticsearch index.
* @param jsonStructure Json structure to index
* @param uri REST endpoint.
* @return JSON response body.
* @throws IOException if something goes wrong.
*/
private JsonObject sendToIndex(String jsonStructure, String uri)
private JsonObject sendToIndex(String jsonStructure)
throws IOException {
HttpPost request = new HttpPost(uri);
request.addHeader("content-type", "application/json");
request.setEntity(new StringEntity(jsonStructure, ContentType.APPLICATION_JSON));

CloseableHttpResponse response = null;
try {
response = httpClient.execute(request);
int statuscode = response.getStatusLine().getStatusCode();
JsonObject jsonResp = Json.createReader(
response.getEntity().getContent()
).readObject();
if(statuscode != HttpStatus.SC_OK) {
LOG.warn(
"Http status response from elastic search index: " +
statuscode +
". Whole JSON response: " +
jsonResp

this.post = this.post
.method(Request.POST).body().set(jsonStructure).back();
Response resp = post.fetch();
int status = resp.as(RestResponse.class).status();
JsonObject json = resp.as(JsonResponse.class).json().readObject();
if(status != HttpStatus.SC_OK) {
LOG.warn(
"Http status response from elastic search index: " +
status +
". Whole JSON response: " +
json
);
if(status == HttpStatus.SC_INTERNAL_SERVER_ERROR) {
LOG.error(
"500 SERVER ERROR from elasticsearch /_bulk api. Whole JSON response " +
json.toString()
);
if(statuscode == HttpStatus.SC_INTERNAL_SERVER_ERROR) {
LOG.error(
"500 SERVER ERROR from elasticsearch /_bulk api. Whole JSON response " +
jsonResp.toString()
);
throw new IOException("500 SERVER ERROR from elasticsearch /_bulk api!");
}
throw new IOException("500 SERVER ERROR from elasticsearch /_bulk api!");
}
return jsonResp;
} finally {
IOUtils.closeQuietly(httpClient);
IOUtils.closeQuietly(response);
}
}

/**
* Converts the WebPage to a Json (with the URL is id) for the ES index.
* @param page WebPage to index.
* @return JSON which contains the id + json-formatted page
* @throws IOException In case there are problems when parsing the webpage
*/
private JsonObject preparePage(WebPage page) throws IOException {
JsonWebPage jsonPage = new JsonWebPage(page);
try {
JsonObject parsed = jsonPage.toJsonObject();
return Json.createObjectBuilder()
.add("id", page.getUrl())
.add("category", parsed.getString("category"))
.add("page", parsed).build();
} catch (JsonProcessingException e) {
throw new IOException (e);
}
return json;
}

/**
Expand Down
55 changes: 35 additions & 20 deletions src/main/java/com/amihaiemil/charles/EsBulkContent.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE

package com.amihaiemil.charles;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import javax.json.Json;
import javax.json.JsonObject;

import com.fasterxml.jackson.core.JsonProcessingException;

/**
* Index documents in bulk.
* @author Mihai Andronache (amihaiemil@gmail.com)
Expand All @@ -39,36 +43,32 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
final class EsBulkContent {

/**
* JSON docs to be indexed.
* WebPages that go to the ES _bulk API,
*/
private List<JsonObject> docs;

EsBulkContent(List<JsonObject> docs) {
if(docs == null || docs.size() == 0) {
throw new IllegalArgumentException("There must be at least 1 document!");
}

this.docs = new ArrayList<JsonObject>();
for(JsonObject doc : docs) {
this.docs.add(doc);
}
}

private List<WebPage> pages;

/**
* Size of the documents list.
* Ctor.
* @param index Index where the pages will be stored.
* @param pages Given web pages.
*/
public int size() {
return this.docs.size();
public EsBulkContent(List<WebPage> pages) {
if(pages == null || pages.size() == 0) {
throw new IllegalArgumentException("There must be at least 1 page!");
}
this.pages = pages;
}

/**
* Pepare the json structure for bulk indexing.
* @param docs The json documents to be indexed.
* @return The json structure as a String.
* @throws IOException If something goes wrong while parsing.
*/
public String structure() {
public String structure() throws IOException {
StringBuilder sb = new StringBuilder();
for(JsonObject doc : docs) {
for(WebPage page : pages) {
JsonObject doc = this.preparePage(page);
String id = doc.getString("id", "");
String action_and_meta_data;
if(id.isEmpty()) {
Expand All @@ -83,4 +83,19 @@ public String structure() {
return sb.toString();
}

/**
* Converts the WebPage to a Json (with the URL as id) for the ES index.
* @param page WebPage to index.
* @return JSON which contains the id + json-formatted page
* @throws IOException In case there are problems when parsing the webpage
*/
private JsonObject preparePage(WebPage page) throws IOException {
JsonWebPage jsonPage = new JsonWebPage(page);
JsonObject parsed = jsonPage.toJsonObject();
return Json.createObjectBuilder()
.add("id", page.getUrl())
.add("category", parsed.getString("category"))
.add("page", parsed).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void validatesIndexUrl() {
* @return JsonObject search results
* @throws Exception If something goes wrong.
*/
public JsonObject search(String query, String indexInfo, boolean auth) throws Exception {
private JsonObject search(String query, String indexInfo, boolean auth) throws Exception {
HttpGet request = new HttpGet(indexInfo + "/_search?q=" + query);
request.addHeader("content-type", "application/json");

Expand Down Expand Up @@ -239,7 +239,7 @@ public JsonObject search(String query, String indexInfo, boolean auth) throws Ex
* @param url URL of the page.
* @return WebPage
*/
public WebPage webPage(String url) {
private WebPage webPage(String url) {
WebPage page = new SnapshotWebPage();
page.setUrl(url);
page.setLinks(new LinkedHashSet<Link>());
Expand Down
Loading

0 comments on commit 7642f6b

Please sign in to comment.