Skip to content
This repository has been archived by the owner on Jan 19, 2024. It is now read-only.

Commit

Permalink
feat: add ability to use mongoose query for synchronize (thanks @andfk)
Browse files Browse the repository at this point in the history
* Adds ability to use mongoose query for synchronize

* Adds query instance docs example and tests

* Fix typo

* CI prettier fix
  • Loading branch information
andfk authored and nodkz committed Feb 12, 2018
1 parent eca1729 commit c801945
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 6 deletions.
12 changes: 11 additions & 1 deletion README.md
Expand Up @@ -331,7 +331,7 @@ Book
});
```

`esSynchronise` use same parameters as [find](http://mongoosejs.com/docs/api.html#model_Model.find) method.
`esSynchronise` use same parameters as [find](http://mongoosejs.com/docs/api.html#model_Model.find) method or alternatively you can pass a mongoose query instance in order to use any specific methods like `.populate()`.
It allows to synchronize a subset of documents, modifying the default projection...

```javascript
Expand All @@ -342,6 +342,16 @@ Book
});
```

```javascript
// using a mongoose query instance, populating the author `ref`
const query = Book.find({author: 'Arthur C. Clarke'}).populate('author')
Book
.esSynchronize(query, '+resume')
.then(function () {
console.log('end.');
});
```

### Filtered Indexing

You can specify a filter function to index a model to Elasticsearch based on some specific conditions. If document satisfies conditions it will be added to the elastic index. If not, it will be removed from index.
Expand Down
18 changes: 13 additions & 5 deletions lib/index.js
Expand Up @@ -328,11 +328,19 @@ function synchronize(conditions, projection, options, callback) {
const esOptions = model.esOptions();
const batch =
esOptions.bulk && esOptions.bulk.batch ? esOptions.bulk.batch : 50;
const stream = model
.find(conditions || {}, projection, options)
.lean()
.batchSize(batch)
.stream();
let stream;
if (conditions instanceof mongoose.Query) {
stream = conditions
.lean()
.batchSize(batch)
.stream();
} else {
stream = model
.find(conditions || {}, projection, options)
.lean()
.batchSize(batch)
.stream();
}
const bulker = esOptions.bulker || new Bulker(esOptions.client);
let streamClosed = false;

Expand Down
92 changes: 92 additions & 0 deletions test/es2/esSynchronise.js
Expand Up @@ -587,6 +587,98 @@ describe('esSynchronise', () => {
});
});

it('should index the database using a mongoose query instance', () => {
const users = [];

const UserSchema = new mongoose.Schema({
name: String,
age: Number,
});

const UserModel = mongoose.model('User', UserSchema);

return UserModel.remove({})
.exec()
.then(() => {
for (let i = 0; i < 100; i++) {
users.push({
_id: mongoose.Types.ObjectId(),
name: `Bob${i}`,
age: i,
});
}
return UserModel.collection.insertMany(users);
})
.then(() => {
const UserPluginSchema = new mongoose.Schema({
name: String,
age: Number,
});

UserPluginSchema.plugin(plugin, { index: 'users', type: 'user' });

const UserPluginModel = mongoose.model(
'UserPlugin',
UserPluginSchema,
'users'
);

return utils
.deleteModelIndexes(UserPluginModel)
.then(() => {
return UserPluginModel.esCreateMapping();
})
.then(() => {
return UserPluginModel;
});
})
.then(UserPluginModel => {
let docSent = 0;
let sent = 0;
let error = 0;

UserPluginModel.on('es-bulk-error', () => {
error++;
});

UserPluginModel.on('es-bulk-sent', () => {
sent++;
});

UserPluginModel.on('es-bulk-data', () => {
docSent++;
});

const query = UserPluginModel.find({ age: { $gte: 90 } });
return new utils.Promise((resolve, reject) => {
UserPluginModel.esSynchronize(query, err => {
if (err) {
reject(err);
return;
}
expect(error).to.be.equal(0);
expect(docSent).to.be.equal(10);
expect(sent).to.be.equal(1);
resolve(UserPluginModel);
});
});
})
.then(UserPluginModel => {
return UserPluginModel.esSearch({ match_all: {} }).then(result => {
expect(result.hits.total).to.eql(10);
const ids = result.hits.hits.map(hit => {
return hit._id;
});
const expected = users.slice(-10).map(user => {
return user._id.toString();
});
ids.sort();
expected.sort();
expect(ids).to.eql(expected);
});
});
});

it('should index filtering', () => {
const users = [];

Expand Down
105 changes: 105 additions & 0 deletions test/es5/esSynchronise.js
Expand Up @@ -587,6 +587,111 @@ describe('esSynchronise', () => {
});
});

it('should index the database using a mongoose query instance', () => {
const users = [];

// beware: indexing a document require two entry in the buffer
// 10 doc in buffer = buffer.length = 20
const bulkSize = 20;

const UserSchema = new mongoose.Schema({
name: String,
age: Number,
});

const UserModel = mongoose.model('User', UserSchema);

return UserModel.remove({})
.exec()
.then(() => {
for (let i = 0; i < 100; i++) {
users.push({
_id: mongoose.Types.ObjectId(),
name: `Bob${i}`,
age: i,
});
}
return UserModel.collection.insertMany(users);
})
.then(() => {
const UserPluginSchema = new mongoose.Schema({
name: String,
age: Number,
});

UserPluginSchema.plugin(plugin, {
index: 'users',
type: 'user',
bulk: { size: bulkSize },
});

const UserPluginModel = mongoose.model(
'UserPlugin',
UserPluginSchema,
'users'
);

return utils
.deleteModelIndexes(UserPluginModel)
.then(() => {
return UserPluginModel.esCreateMapping();
})
.then(() => {
return UserPluginModel;
});
})
.then(UserPluginModel => {
let docSent = 0;
let sent = 0;
let error = 0;

UserPluginModel.on('es-bulk-error', () => {
error++;
});

UserPluginModel.on('es-bulk-sent', () => {
sent++;
});

UserPluginModel.on('es-bulk-data', () => {
docSent++;
});

const query = UserPluginModel.find();
return new utils.Promise((resolve, reject) => {
UserPluginModel.esSynchronize(query, err => {
if (err) {
reject(err);
return;
}
expect(error).to.be.equal(0);
expect(docSent).to.be.equal(users.length);
expect(sent).to.be.equal(Math.ceil(2 * users.length / bulkSize));
resolve(UserPluginModel);
});
});
})
.then(UserPluginModel => {
return utils.Promise.all(
users.map(user => {
return new utils.Promise((resolve, reject) => {
UserPluginModel.esSearch({ match: { _id: user._id.toString() } })
.then(result => {
expect(result.hits.total).to.eql(1);
const hit = result.hits.hits[0];
expect(hit._source.name).to.be.equal(user.name);
expect(hit._source.age).to.be.equal(user.age);
resolve();
})
.catch(err => {
reject(err);
});
});
})
);
});
});

it('should index filtering', () => {
const users = [];

Expand Down

0 comments on commit c801945

Please sign in to comment.