Skip to content

Commit

Permalink
Use initial import for import_all_collection #177
Browse files Browse the repository at this point in the history
- Initial import with import_all_collection will list of the collection
of available in the database and import data directly from the
collection.
  • Loading branch information
richardwilly98 committed Dec 18, 2013
1 parent 4b37c00 commit d4bb11f
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions src/main/java/org/elasticsearch/river/mongodb/Slurper.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,21 @@ public void run() {
}

BSONTimestamp startTimestamp = null;
if (!definition.isSkipInitialImport() && !definition.isImportAllCollections()) {
if (!definition.isSkipInitialImport()) {
if (!riverHasIndexedFromOplog() && definition.getInitialTimestamp() == null) {
if (!isIndexEmpty()) {
MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.INITIAL_IMPORT_FAILED);
break;
}
startTimestamp = doInitialImport();
if (definition.isImportAllCollections()) {
for(String name : slurpedDb.getCollectionNames()) {
DBCollection collection = slurpedDb.getCollection(name);
startTimestamp = doInitialImport(collection);
}
} else {
DBCollection collection = slurpedDb.getCollection(definition.getMongoCollection());
startTimestamp = doInitialImport(collection);
}
}
} else {
logger.info("Skip initial import from collection {}", definition.getMongoCollection());
Expand Down Expand Up @@ -154,21 +162,21 @@ protected boolean isIndexEmpty() {
* @throws InterruptedException
* if the blocking queue stream is interrupted while waiting
*/
protected BSONTimestamp doInitialImport() throws InterruptedException {
protected BSONTimestamp doInitialImport(DBCollection collection) throws InterruptedException {
// TODO: ensure the index type is empty
DBCollection slurpedCollection = slurpedDb.getCollection(definition.getMongoCollection());
// DBCollection slurpedCollection = slurpedDb.getCollection(definition.getMongoCollection());

logger.info("MongoDBRiver is beginning initial import of " + slurpedCollection.getFullName());
logger.info("MongoDBRiver is beginning initial import of " + collection.getFullName());
BSONTimestamp startTimestamp = getCurrentOplogTimestamp();
DBCursor cursor = null;
try {
if (definition.isDisableIndexRefresh()) {
updateIndexRefresh(definition.getIndexName(), -1L);
}
if (!definition.isMongoGridFS()) {
logger.info("Collection {} - count: {}", definition.getMongoCollection(), slurpedCollection.count());
logger.info("Collection {} - count: {}", definition.getMongoCollection(), collection.count());
long count = 0;
cursor = slurpedCollection.find(definition.getMongoCollectionFilter());
cursor = collection.find(definition.getMongoCollectionFilter());
while (cursor.hasNext()) {
DBObject object = cursor.next();
count++;
Expand Down

0 comments on commit d4bb11f

Please sign in to comment.