Skip to content

Commit

Permalink
[bugfix] allows bulk inserts on aliases (#1286)
Browse files Browse the repository at this point in the history
## What does this PR do ?

This PR fixes the case where a bulk operation would fail on an elasticsearch index alias.

### How should this be manually tested?

```bash
#!/bin/bash
  
set -x
  
es_container=kuz_elasticsearch_1
  
es=$(docker inspect --format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$es_container"):9200
kuzzle="localhost:7512"
  
curl -XPOST $kuzzle/index/_create
curl -XPUT $kuzzle/index/collection
  
curl -d '{"actions": {"add": {"index": "index", "alias": "alias"}}}' $es/_aliases
  
curl -H "Content-type: application/json" -d '{
    "bulkData": [
      {"index": {"_type": "collection", "_index": "alias"}},
      {"foo": "bar"}
    ]
}' $kuzzle/_bulk
```
  • Loading branch information
benoitvidis authored and scottinet committed Apr 17, 2019
1 parent d3c0d8e commit a95a819
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
19 changes: 15 additions & 4 deletions lib/services/elasticsearch.js
Expand Up @@ -665,11 +665,22 @@ class ElasticSearch extends Service {
updatedAt: dateNow
};

const esCache = {};

return this.client.indices.getMapping()
.then(mappings => {
// Keep a local cache of existents index/collections on elasticsearch
const reducer = (indexes, index) => Object.assign(indexes, { [index]: Object.keys(mappings[index].mappings) });
const esCache = Object.keys(mappings).reduce(reducer, {});
.then(raw => {
for (const index of Object.keys(raw)) {
esCache[index] = Object.keys(raw[index].mappings);
}

return this.client.cat.aliases({
format: 'json'
});
})
.then(aliases => {
for (const entry of aliases) {
esCache[entry.alias] = esCache[entry.index];
}

// set missing index & type if possible and add metadata
let lastAction; // NOSONAR
Expand Down
1 change: 1 addition & 0 deletions test/mocks/services/elasticsearchClient.mock.js
Expand Up @@ -25,6 +25,7 @@ class ElasticsearchClientMock {
this.scroll = sinon.stub().resolves();

this.cat = {
aliases: sinon.stub().resolves(),
indices: sinon.stub().resolves()
};

Expand Down
44 changes: 29 additions & 15 deletions test/services/implementations/elasticsearch.test.js
Expand Up @@ -970,20 +970,22 @@ describe('Test: ElasticSearch service', () => {
const
refreshIndexSpy = sinon.spy(elasticsearch, 'refreshIndexIfNeeded');

elasticsearch.kuzzle.indexCache.exists.resolves(true);
elasticsearch.client.bulk.resolves({});
const getMappingResult = {
[index]: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([
{alias: 'alias', index}
]);

request.input.body = {
bulkData: [
{index: {_id: 1, _type: collection, _index: index}},
{firstName: 'foo'},
{index: {_id: 2, _type: collection, _index: index}},
{firstName: 'bar'},
{update: {_id: 1, _type: collection, _index: index}},
{update: {_id: 1, _type: collection, _index: 'alias'}},
{doc: {firstName: 'foobar'}},
{delete: {_id: 2, _type: collection, _index: index}}
]
Expand All @@ -997,18 +999,20 @@ describe('Test: ElasticSearch service', () => {
});

it('should add metadata to documents', () => {
elasticsearch.kuzzle.indexCache.exists.resolves(true);
elasticsearch.client.bulk.resolves({});
const getMappingResult = {
[index]: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([
{alias: 'alias', index}
]);

request.input.body = {
bulkData: [
{index: {_id: 1, _type: collection, _index: index}},
{firstName: 'foo'},
{index: {_id: 2, _type: collection, _index: index}},
{index: {_id: 2, _type: collection, _index: 'alias'}},
{firstName: 'bar'},
{create: {_id: 3, _type: collection, _index: index}},
{firstName: 'gordon'},
Expand Down Expand Up @@ -1055,6 +1059,7 @@ describe('Test: ElasticSearch service', () => {
[index]: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([]);

request.input.body = {
bulkData: []
Expand Down Expand Up @@ -1084,7 +1089,6 @@ describe('Test: ElasticSearch service', () => {
});

it('should raise a "Partial Error" response for bulk data import with some errors', () => {
elasticsearch.kuzzle.indexCache.exists.resolves(true);
elasticsearch.client.bulk.resolves({
errors: true,
items: [
Expand All @@ -1096,14 +1100,17 @@ describe('Test: ElasticSearch service', () => {
[index]: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([
{alias: 'alias', index}
]);

request.input.body = {
bulkData: [
{index: {_id: 1, _type: collection, _index: index}},
{firstName: 'foo'},
{index: {_id: 2, _type: collection, _index: index}},
{firstName: 'bar'},
{update: {_id: 12, _type: collection, _index: index}},
{update: {_id: 12, _type: collection, _index: 'alias'}},
{doc: {firstName: 'foobar'}},
{update: {_id: 212, _type: collection, _index: index}},
{doc: {firstName: 'foobar'}}
Expand All @@ -1120,7 +1127,6 @@ describe('Test: ElasticSearch service', () => {
});

it('should override the type with the collection if one has been specified in the request', () => {
elasticsearch.kuzzle.indexCache.exists.resolves(true);
elasticsearch.client.bulk.resolves({
items: [
{index: {_id: 1, _index: index, _type: collection}},
Expand All @@ -1134,14 +1140,17 @@ describe('Test: ElasticSearch service', () => {
indexAlt: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([
{alias: 'alias', index}
]);

request.input.body = {
bulkData: [
{index: {_id: 1, _index: index}},
{firstName: 'foo'},
{index: {_id: 2, _index: 'indexAlt'}},
{firstName: 'bar'},
{update: {_id: 1, _index: index}},
{update: {_id: 1, _index: 'alias'}},
{doc: {firstName: 'foobar'}},
{delete: {_id: 2, _index: 'indexAlt'}}
]
Expand All @@ -1156,7 +1165,7 @@ describe('Test: ElasticSearch service', () => {
{firstName: 'foo'},
{index: {_id: 2, _index: 'indexAlt', _type: collection}},
{firstName: 'bar'},
{update: {_id: 1, _index: index, _type: collection}},
{update: {_id: 1, _index: 'alias', _type: collection}},
{doc: {firstName: 'foobar'}},
{delete: {_id: 2, _index: 'indexAlt', _type: collection}}
]);
Expand All @@ -1171,6 +1180,7 @@ describe('Test: ElasticSearch service', () => {
[index]: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([]);

request.input.body = {
bulkData: [
Expand All @@ -1190,7 +1200,6 @@ describe('Test: ElasticSearch service', () => {
});

it('should return a rejected promise if bulk data try to write into internal index', () => {
elasticsearch.kuzzle.indexCache.exists.resolves(true);
request.input.body = {
bulkData: [
{index: {_id: 1, _index: index}},
Expand All @@ -1207,27 +1216,26 @@ describe('Test: ElasticSearch service', () => {
[kuzzle.internalEngine.index]: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([]);

elasticsearch.client.bulk.resolves({});

return should(elasticsearch.import(request)).be.rejectedWith(BadRequestError);
});

it('should return a rejected promise if body contains no bulkData parameter', () => {
elasticsearch.kuzzle.indexCache.exists.resolves(true);
request.input.body.bulkData = null;
return should(elasticsearch.import(request)).be.rejectedWith(BadRequestError);
});

it('should return a rejected promise if no type has been provided, locally or globally', () => {
elasticsearch.kuzzle.indexCache.exists.resolves(true);
request.input.resource.collection = null;

request.input.body = {
bulkData: [
{index: {_id: 1, _type: collection, _index: index}},
{firstName: 'foo'},
{index: {_id: 2, _type: collection, _index: index}},
{index: {_id: 2, _type: collection, _index: 'alias'}},
{firstName: 'bar'},
{update: {_id: 1, _index: index}},
{doc: {firstName: 'foobar'}},
Expand All @@ -1240,12 +1248,14 @@ describe('Test: ElasticSearch service', () => {
[index]: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([
{alias: 'alias', index}
]);

return should(elasticsearch.import(request)).be.rejectedWith(BadRequestError);
});

it('should return a rejected promise if no index has been provided, locally or globally', () => {
elasticsearch.kuzzle.indexCache.exists.resolves(true);
request.input.resource.index = null;

request.input.body = {
Expand All @@ -1265,12 +1275,16 @@ describe('Test: ElasticSearch service', () => {
[index]: { mappings: { [collection]: {} } }
};
elasticsearch.client.indices.getMapping.resolves(getMappingResult);
elasticsearch.client.cat.aliases.resolves([
{alias: 'alias', index}
]);

return should(elasticsearch.import(request)).be.rejected();
return should(elasticsearch.import(request)).be.rejectedWith(BadRequestError);
});

it('should rejected if index and/or collection don\'t exist', () => {
elasticsearch.client.indices.getMapping.resolves({});
elasticsearch.client.cat.aliases.resolves([]);
request.input.resource.index = null;

request.input.body = {
Expand Down

0 comments on commit a95a819

Please sign in to comment.