Skip to content

Commit

Permalink
Fix for issue #79
Browse files Browse the repository at this point in the history
  • Loading branch information
richardwilly98 committed Jul 15, 2013
1 parent fc08f60 commit 3d90733
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 5 deletions.
1 change: 1 addition & 0 deletions resources/issues/79/01_create-river.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
curl -XPUT "http://localhost:9200/_river/river79/_meta" -d @mongodb-river-simple.json
5 changes: 5 additions & 0 deletions resources/issues/79/02_test-issue-79.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
%MONGO_HOME%\bin\mongo < test-issue-79-1.js
pause
%MONGO_HOME%\bin\mongo < test-issue-79-2.js
pause
curl -XGET localhost:9200/mydb79/_search?q=name:richard
14 changes: 14 additions & 0 deletions resources/issues/79/mongodb-river-simple.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "mongodb",
"mongodb": {
"db": "mydb79",
"collection": "mycollec79",
"options": {
"drop_collection": true
}
},
"index": {
"name": "mydb79",
"type": "type79"
}
}
7 changes: 7 additions & 0 deletions resources/issues/79/test-issue-79-1.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use mydb79
var o = {
'name': 'issue79'
}

db.mycollec79.save(o)
db.mycollec79.drop()
5 changes: 5 additions & 0 deletions resources/issues/79/test-issue-79-2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use mydb79
var o = {
'name': 'richard'
}
db.mycollec79.save(o)
17 changes: 12 additions & 5 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
import org.bson.types.ObjectId;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -444,8 +446,8 @@ public void start() {
// TODO: include plugin version from pom.properties file.
// http://stackoverflow.com/questions/5270611/read-maven-properties-file-inside-jar-war-file
logger.info(
"starting mongodb stream. options: secondaryreadpreference [{}], throttlesize [{}], gridfs [{}], filter [{}], db [{}], collection [{}], script [{}], indexing to [{}]/[{}]",
mongoSecondaryReadPreference, throttleSize, mongoGridFS,
"starting mongodb stream. options: secondaryreadpreference [{}], drop_collection [{}], throttlesize [{}], gridfs [{}], filter [{}], db [{}], collection [{}], script [{}], indexing to [{}]/[{}]",
mongoSecondaryReadPreference, dropCollection, throttleSize, mongoGridFS,
mongoFilter, mongoDb, mongoCollection, script, indexName,
typeName);
try {
Expand Down Expand Up @@ -858,9 +860,14 @@ private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk,
logger.info("Drop collection request [{}], [{}]",
index, type);
bulk.request().requests().clear();
client.admin().indices()
.prepareDeleteMapping(index).setType(type)
.execute().actionGet();
client.admin().indices().prepareRefresh(index).execute().actionGet();
ImmutableMap<String, MappingMetaData> mappings = client.admin().cluster().prepareState().execute().actionGet().getState().getMetaData().index(index).mappings();
if (mappings.containsKey(type)) {
DeleteMappingRequest deleteMappingRequest = new DeleteMappingRequest(index);
deleteMappingRequest.type(type);
client.admin().indices().deleteMapping(deleteMappingRequest);
}

deletedDocuments = 0;
updatedDocuments = 0;
insertedDocuments = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package test.elasticsearch.plugin.river.mongodb.simple;

import static org.elasticsearch.client.Requests.countRequest;
import static org.elasticsearch.common.io.Streams.copyToStringFromClasspath;
import static org.elasticsearch.index.query.QueryBuilders.fieldQuery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.count.CountResponse;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -121,4 +124,58 @@ public void testDropCollection() throws Throwable {
}
}

@Test
public void testDropCollectionIssue79() throws Throwable {
logger.debug("Start testDropCollectionIssue79");
try {
String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
mongoCollection.insert(dbObject);
Thread.sleep(wait);

assertThat(
getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()))
.actionGet().isExists(), equalTo(true));
assertThat(
getNode().client().admin().indices()
.prepareTypesExists(getIndex())
.setTypes(getDatabase()).execute().actionGet()
.isExists(), equalTo(true));
String collectionName = mongoCollection.getName();
mongoCollection.drop();
Thread.sleep(wait);
assertThat(mongoDB.collectionExists(collectionName), equalTo(false));
Thread.sleep(wait);
refreshIndex();
assertThat(
getNode().client().admin().indices()
.prepareTypesExists(getIndex())
.setTypes(getDatabase()).execute().actionGet()
.isExists(), equalTo(!dropCollectionOption));
dbObject = (DBObject) JSON.parse(mongoDocument);
String value = String.valueOf(System.currentTimeMillis());
dbObject.put("attribute1", value);
mongoCollection.insert(dbObject);
Thread.sleep(wait);
assertThat(
getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()))
.actionGet().isExists(), equalTo(true));
assertThat(
getNode().client().admin().indices()
.prepareTypesExists(getIndex())
.setTypes(getDatabase()).execute().actionGet()
.isExists(), equalTo(true));
CountResponse countResponse = getNode()
.client()
.count(countRequest(getIndex())
.query(fieldQuery("attribute1", value))).actionGet();
assertThat(countResponse.getCount(), equalTo(1L));
} catch (Throwable t) {
logger.error("testDropCollectionIssue79 failed.", t);
t.printStackTrace();
throw t;
}
}
}

0 comments on commit 3d90733

Please sign in to comment.