Skip to content

Commit db0dc31

Browse files
committed
Reindexing documents in background
1 parent 7c5765b commit db0dc31

File tree

3 files changed

+150
-95
lines changed

3 files changed

+150
-95
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package com.pannous.es.reindex;
2+
3+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
4+
import org.elasticsearch.action.bulk.BulkItemResponse;
5+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
6+
import org.elasticsearch.action.bulk.BulkResponse;
7+
import org.elasticsearch.action.index.IndexRequest;
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.client.Requests;
10+
import org.elasticsearch.common.StopWatch;
11+
import org.elasticsearch.common.logging.ESLogger;
12+
import org.elasticsearch.common.logging.Loggers;
13+
14+
import java.util.ArrayList;
15+
import java.util.Collection;
16+
import java.util.Collections;
17+
import java.util.List;
18+
19+
public class Indexer implements Runnable {
20+
21+
private final ESLogger logger;
22+
private Client client;
23+
private MySearchResponse rsp;
24+
private String newIndex;
25+
private String newType;
26+
private boolean withVersion;
27+
private float waitSeconds;
28+
29+
30+
public Indexer(Client client, MySearchResponse rsp, String newIndex, String newType, boolean withVersion, float waitSeconds) {
31+
32+
logger = Loggers.getLogger(this.getClass());
33+
34+
this.client = client;
35+
this.rsp = rsp;
36+
this.newIndex = newIndex;
37+
this.newType = newType;
38+
this.withVersion = withVersion;
39+
this.waitSeconds = waitSeconds;
40+
}
41+
42+
@Override
43+
public void run() {
44+
45+
boolean flushEnabled = false;
46+
long total = rsp.hits().totalHits();
47+
int collectedResults = 0;
48+
int failed = 0;
49+
while (true) {
50+
if (collectedResults > 0 && waitSeconds > 0) {
51+
try {
52+
Thread.sleep(Math.round(waitSeconds * 1000));
53+
} catch (InterruptedException ex) {
54+
break;
55+
}
56+
}
57+
StopWatch queryWatch = new StopWatch().start();
58+
int currentResults = rsp.doScoll();
59+
if (currentResults == 0)
60+
break;
61+
62+
MySearchHits res = rsp.hits();
63+
if (res == null)
64+
break;
65+
queryWatch.stop();
66+
StopWatch updateWatch = new StopWatch().start();
67+
failed += bulkUpdate(res, newIndex, newType, withVersion).size();
68+
if (flushEnabled)
69+
client.admin().indices().flush(new FlushRequest(newIndex)).actionGet();
70+
71+
updateWatch.stop();
72+
collectedResults += currentResults;
73+
logger.debug("Progress " + collectedResults + "/" + total
74+
+ ". Time of update:" + updateWatch.totalTime().getSeconds() + " query:"
75+
+ queryWatch.totalTime().getSeconds() + " failed:" + failed);
76+
}
77+
String str = "found " + total + ", collected:" + collectedResults
78+
+ ", transfered:" + (float) rsp.bytes() / (1 << 20) + "MB";
79+
if (failed > 0)
80+
logger.warn(failed + " FAILED documents! " + str);
81+
else
82+
logger.info(str);
83+
}
84+
85+
Collection<Integer> bulkUpdate(MySearchHits objects, String indexName,
86+
String newType, boolean withVersion) {
87+
BulkRequestBuilder brb = client.prepareBulk();
88+
for (MySearchHit hit : objects.getHits()) {
89+
if (hit.id() == null || hit.id().isEmpty()) {
90+
logger.warn("Skipped object without id when bulkUpdate:" + hit);
91+
continue;
92+
}
93+
94+
try {
95+
IndexRequest indexReq = Requests.indexRequest(indexName).type(newType).id(hit.id()).source(hit.source());
96+
if (withVersion)
97+
indexReq.version(hit.version());
98+
99+
brb.add(indexReq);
100+
} catch (Exception ex) {
101+
logger.warn("Cannot add object:" + hit + " to bulkIndexing action." + ex.getMessage());
102+
}
103+
}
104+
if (brb.numberOfActions() > 0) {
105+
BulkResponse rsp = brb.execute().actionGet();
106+
if (rsp.hasFailures()) {
107+
List<Integer> list = new ArrayList<Integer>(rsp.items().length);
108+
for (BulkItemResponse br : rsp.items()) {
109+
if (br.isFailed())
110+
list.add(br.itemId());
111+
}
112+
return list;
113+
}
114+
}
115+
return Collections.emptyList();
116+
}
117+
118+
}

src/main/java/com/pannous/es/reindex/ReIndexAction.java

Lines changed: 24 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
package com.pannous.es.reindex;
22

3-
import java.io.IOException;
4-
import java.util.ArrayList;
5-
import java.util.Collection;
6-
import java.util.Collections;
7-
import java.util.List;
8-
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
93
import org.elasticsearch.action.bulk.BulkItemResponse;
104
import org.elasticsearch.action.bulk.BulkRequestBuilder;
115
import org.elasticsearch.action.bulk.BulkResponse;
@@ -15,20 +9,23 @@
159
import org.elasticsearch.action.search.SearchType;
1610
import org.elasticsearch.client.Client;
1711
import org.elasticsearch.client.Requests;
18-
import org.elasticsearch.common.StopWatch;
1912
import org.elasticsearch.common.inject.Inject;
2013
import org.elasticsearch.common.settings.Settings;
2114
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2216
import org.elasticsearch.common.xcontent.XContentBuilder;
23-
import org.elasticsearch.rest.BaseRestHandler;
24-
import org.elasticsearch.rest.RestChannel;
25-
import org.elasticsearch.rest.RestController;
26-
import org.elasticsearch.rest.RestRequest;
27-
import org.elasticsearch.rest.XContentRestResponse;
28-
import org.elasticsearch.rest.XContentThrowableRestResponse;
29-
import static org.elasticsearch.rest.RestRequest.Method.*;
30-
import static org.elasticsearch.rest.RestStatus.*;
31-
import static org.elasticsearch.rest.action.support.RestXContentBuilder.*;
17+
import org.elasticsearch.rest.*;
18+
19+
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.List;
24+
25+
import static org.elasticsearch.rest.RestRequest.Method.POST;
26+
import static org.elasticsearch.rest.RestRequest.Method.PUT;
27+
import static org.elasticsearch.rest.RestStatus.OK;
28+
import static org.elasticsearch.rest.action.support.RestXContentBuilder.restContentBuilder;
3229

3330
/**
3431
* Refeeds all the documents which matches the type and the (optional) query.
@@ -37,7 +34,8 @@
3734
*/
3835
public class ReIndexAction extends BaseRestHandler {
3936

40-
@Inject public ReIndexAction(Settings settings, Client client, RestController controller) {
37+
@Inject
38+
public ReIndexAction(Settings settings, Client client, RestController controller) {
4139
super(settings, client);
4240

4341
if (controller != null) {
@@ -47,7 +45,8 @@ public class ReIndexAction extends BaseRestHandler {
4745
}
4846
}
4947

50-
@Override public void handleRequest(RestRequest request, RestChannel channel) {
48+
@Override
49+
public void handleRequest(RestRequest request, RestChannel channel) {
5150
handleRequest(request, channel, null, false);
5251
}
5352

@@ -111,7 +110,7 @@ public void handleRequest(RestRequest request, RestChannel channel, String newTy
111110
}
112111

113112
public SearchRequestBuilder createScrollSearch(String oldIndexName, String oldType, String filter,
114-
int hitsPerPage, boolean withVersion, int keepTimeInMinutes) {
113+
int hitsPerPage, boolean withVersion, int keepTimeInMinutes) {
115114
SearchRequestBuilder srb = client.prepareSearch(oldIndexName).
116115
setTypes(oldType).
117116
setVersion(withVersion).
@@ -124,80 +123,14 @@ public SearchRequestBuilder createScrollSearch(String oldIndexName, String oldTy
124123
return srb;
125124
}
126125

127-
public int reindex(MySearchResponse rsp, String newIndex, String newType, boolean withVersion,
128-
float waitSeconds) {
129-
boolean flushEnabled = false;
130-
long total = rsp.hits().totalHits();
131-
int collectedResults = 0;
132-
int failed = 0;
133-
while (true) {
134-
if (collectedResults > 0 && waitSeconds > 0) {
135-
try {
136-
Thread.sleep(Math.round(waitSeconds * 1000));
137-
} catch (InterruptedException ex) {
138-
break;
139-
}
140-
}
141-
StopWatch queryWatch = new StopWatch().start();
142-
int currentResults = rsp.doScoll();
143-
if (currentResults == 0)
144-
break;
145-
146-
MySearchHits res = callback(rsp.hits());
147-
if (res == null)
148-
break;
149-
queryWatch.stop();
150-
StopWatch updateWatch = new StopWatch().start();
151-
failed += bulkUpdate(res, newIndex, newType, withVersion).size();
152-
if (flushEnabled)
153-
client.admin().indices().flush(new FlushRequest(newIndex)).actionGet();
154-
155-
updateWatch.stop();
156-
collectedResults += currentResults;
157-
logger.debug("Progress " + collectedResults + "/" + total
158-
+ ". Time of update:" + updateWatch.totalTime().getSeconds() + " query:"
159-
+ queryWatch.totalTime().getSeconds() + " failed:" + failed);
160-
}
161-
String str = "found " + total + ", collected:" + collectedResults
162-
+ ", transfered:" + (float) rsp.bytes() / (1 << 20) + "MB";
163-
if (failed > 0)
164-
logger.warn(failed + " FAILED documents! " + str);
165-
else
166-
logger.info(str);
167-
return collectedResults;
168-
}
169-
170-
Collection<Integer> bulkUpdate(MySearchHits objects, String indexName,
171-
String newType, boolean withVersion) {
172-
BulkRequestBuilder brb = client.prepareBulk();
173-
for (MySearchHit hit : objects.getHits()) {
174-
if (hit.id() == null || hit.id().isEmpty()) {
175-
logger.warn("Skipped object without id when bulkUpdate:" + hit);
176-
continue;
177-
}
126+
public Thread reindex(MySearchResponse rsp, String newIndex, String newType, boolean withVersion,
127+
float waitSeconds) {
178128

179-
try {
180-
IndexRequest indexReq = Requests.indexRequest(indexName).type(newType).id(hit.id()).source(hit.source());
181-
if (withVersion)
182-
indexReq.version(hit.version());
129+
Indexer indexer = new Indexer(client, rsp, newIndex, newType, withVersion, waitSeconds);
130+
Thread indexerThread = EsExecutors.daemonThreadFactory(settings, this.getClass().getCanonicalName()).newThread(indexer);
131+
indexerThread.start();
183132

184-
brb.add(indexReq);
185-
} catch (Exception ex) {
186-
logger.warn("Cannot add object:" + hit + " to bulkIndexing action." + ex.getMessage());
187-
}
188-
}
189-
if (brb.numberOfActions() > 0) {
190-
BulkResponse rsp = brb.execute().actionGet();
191-
if (rsp.hasFailures()) {
192-
List<Integer> list = new ArrayList<Integer>(rsp.items().length);
193-
for (BulkItemResponse br : rsp.items()) {
194-
if (br.isFailed())
195-
list.add(br.itemId());
196-
}
197-
return list;
198-
}
199-
}
200-
return Collections.emptyList();
133+
return indexerThread;
201134
}
202135

203136
protected MySearchHits callback(MySearchHits hits) {

src/test/java/com/pannous/es/reindex/ReIndexActionTester.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,10 @@ protected abstract MySearchResponse scrollSearch(String searchIndex, String sear
5858
refresh("oldtweets");
5959
assertThat(count("oldtweets"), equalTo(2L));
6060

61-
int res = action.reindex(scrollSearch("oldtweets", "tweet", ""), "tweets", "tweet", false, 0);
62-
assertThat(res, equalTo(2));
61+
Thread reindex = action.reindex(scrollSearch("oldtweets", "tweet", ""), "tweets", "tweet", false, 0);
62+
while(reindex.getState() != Thread.State.TERMINATED) {
63+
64+
}
6365
refresh("tweets");
6466
assertThat(count("tweets"), equalTo(2L));
6567

@@ -76,8 +78,10 @@ protected abstract MySearchResponse scrollSearch(String searchIndex, String sear
7678
add("oldtweets", "tweet", "{ \"name\" : \"peter test\", \"count\" : 2}");
7779
refresh("oldtweets");
7880
assertThat(count("oldtweets"), equalTo(2L));
79-
int res = action.reindex(scrollSearch("oldtweets", "tweet", "{ \"term\": { \"count\" : 2} }"), "tweets", "tweet", false, 0);
80-
assertThat(res, equalTo(1));
81+
Thread reindex = action.reindex(scrollSearch("oldtweets", "tweet", "{ \"term\": { \"count\" : 2} }"), "tweets", "tweet", false, 0);
82+
while(reindex.getState() != Thread.State.TERMINATED) {
83+
84+
}
8185
refresh("tweets");
8286
assertThat(count("tweets"), equalTo(1L));
8387
SearchResponse sr = client.prepareSearch("tweets").execute().actionGet();

0 commit comments

Comments
 (0)