Skip to content

Commit

Permalink
Fix issue #91
Browse files Browse the repository at this point in the history
  • Loading branch information
richardwilly98 committed Jun 24, 2013
1 parent ebb9b1e commit c22fa8f
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 11 deletions.
1 change: 1 addition & 0 deletions resources/issues/91/01_create-river.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
curl -XPUT "http://localhost:9200/_river/river91/_meta" -d @mongodb-river-with-gridfs.json
4 changes: 4 additions & 0 deletions resources/issues/91/02_test-issue-91.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
%MONGO_HOME%\bin\mongofiles --host localhost:27018 --db mydb91 --collection mycollec91 --type applicaton/pdf put test-document.pdf
%MONGO_HOME%\bin\mongo < test-issue-91.js
pause
curl -XGET localhost:9200/mydb91/_search?q=metadata.titleDoc:test91
12 changes: 12 additions & 0 deletions resources/issues/91/mongodb-river-with-gridfs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"type": "mongodb",
"mongodb": {
"db": "mydb91",
"collection": "fs",
gridfs: "true"
},
"index": {
"name": "mydb91",
"type": "type91"
}
}
Binary file added resources/issues/91/test-document.pdf
Binary file not shown.
3 changes: 3 additions & 0 deletions resources/issues/91/test-issue-91.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use mydb91
var doc = db.fs.files.findOne()
db.fs.files.update({"_id": doc._id}, {$set: {"metadata.titleDoc":"test91"}})
37 changes: 29 additions & 8 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -1116,11 +1116,6 @@ private void processOplogEntry(final DBObject entry)
}

object = MongoDBHelper.applyExcludeFields(object, excludeFields);
// if (excludeFields != null) {
// for (String excludeField : excludeFields) {
// object.removeField(excludeField);
// }
// }

// Initial support for sharded collection -
// https://jira.mongodb.org/browse/SERVER-4333
Expand All @@ -1143,11 +1138,14 @@ private void processOplogEntry(final DBObject entry)
logger.trace("oplog processing item {}", entry);
}

String objectId = getObjectIdFromOplogEntry(entry);
if (mongoGridFS
&& namespace.endsWith(GRIDFS_FILES_SUFFIX)
&& (OPLOG_INSERT_OPERATION.equals(operation) || OPLOG_UPDATE_OPERATION
.equals(operation))) {
String objectId = object.get(MONGODB_ID_FIELD).toString();
if (objectId == null) {
throw new NullPointerException(MONGODB_ID_FIELD);
}
GridFS grid = new GridFS(mongo.getDB(mongoDb), mongoCollection);
GridFSDBFile file = grid.findOne(new ObjectId(objectId));
if (file != null) {
Expand All @@ -1160,11 +1158,14 @@ private void processOplogEntry(final DBObject entry)
}

if (object instanceof GridFSDBFile) {
logger.info("Add attachment: {}", object.get(MONGODB_ID_FIELD));
if (objectId == null) {
throw new NullPointerException(MONGODB_ID_FIELD);
}
logger.info("Add attachment: {}", objectId);
HashMap<String, Object> data = new HashMap<String, Object>();
data.put(IS_MONGODB_ATTACHMENT, true);
data.put(MONGODB_ATTACHMENT, object);
data.put(MONGODB_ID_FIELD, object.get(MONGODB_ID_FIELD));
data.put(MONGODB_ID_FIELD, objectId);
addToStream(operation, oplogTimestamp, data);
} else {
if (OPLOG_UPDATE_OPERATION.equals(operation)) {
Expand All @@ -1177,6 +1178,26 @@ private void processOplogEntry(final DBObject entry)
}
}

/*
* Extract "_id" from "o" if it fails try to extract from "o2"
*/
private String getObjectIdFromOplogEntry(DBObject entry) {
if (entry.containsField(OPLOG_OBJECT)) {
DBObject object = (DBObject) entry.get(OPLOG_OBJECT);
if (object.containsField(MONGODB_ID_FIELD)) {
return object.get(MONGODB_ID_FIELD).toString();
}
}
if (entry.containsField(OPLOG_UPDATE)) {
DBObject object = (DBObject) entry.get(OPLOG_UPDATE);
if (object.containsField(MONGODB_ID_FIELD)) {
return object.get(MONGODB_ID_FIELD).toString();
}
}
logger.trace("Oplog entry {}", entry);
return null;
}

private DBObject getIndexFilter(final BSONTimestamp timestampOverride) {
BSONTimestamp time = timestampOverride == null ? getLastTimestamp(mongoOplogNamespace)
: timestampOverride;
Expand Down
6 changes: 3 additions & 3 deletions src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ log4j.appender.file.Append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %-5p %c %x - %m%n

log4j.logger.test.elasticsearch=DEBUG
log4j.logger.test.elasticsearch=TRACE
#, out, file
log4j.logger.org.elasticsearch.river.mongodb=DEBUG
log4j.logger.org.elasticsearch.river.mongodb=TRACE
#, out, file
log4j.logger.org.elasticsearch.test.elasticsearch.plugin.river.mongodb=DEBUG
log4j.logger.org.elasticsearch.test.elasticsearch.plugin.river.mongodb=TRACE
#, out, file

0 comments on commit c22fa8f

Please sign in to comment.