Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch - pushResults and pushErrors doesn't push the data to downstream #45

Closed
patilgirish opened this issue Dec 1, 2016 · 4 comments
Assignees
Labels

Comments

@patilgirish
Copy link

In elasticsearch bulk operation pushResults and pushErrors have not effect as the data is not being pushed to the stream .

  1. options is stated as pushResults but internally code is checking for pushResult
    if (!self.options.pushResult && !self.options.pushErrors) return;

  2. results data is not being pushed to the stream , so cannot be caught in the downstream

.pipe(etl.elastic.index(esClient,index,'type', {
    concurrency : 5,
    pushResult : true,
    pushErrors : true
  }))
.pipe(etl.map(function (d) {
    console.log(d);
    return d
  }))

this doesn't work.

@ZJONSSON could you please take a look ?

@ZJONSSON
Copy link
Owner

ZJONSSON commented Dec 1, 2016

Thanks @girishgoudapatil. What elasticsearch client version / server version are you using? Are you using etl.collect above to assemble bulks above the etl.elastic.index? Can you verify that the data is actually uploaded to ElasticSearch?

Here is the corresponding test:
https://github.com/ZJONSSON/node-etl/blob/master/test/elastic-test.js#L31-L43

 it('pipes data into elasticsearch',function() {
    var i = 0;
    var upsert = etl.elastic.index(client,'test','test',{pushResult:true});

    return data.stream()
      .pipe(etl.map(function(d) {
        d._id = i++;
        return d;
      }))
      .pipe(etl.collect(100))
      .pipe(upsert)
      .promise()
      .then(function(d) {
        assert.equal(d[0].items.length,3);
        assert.deepEqual(d[0].items[0].body,data.data[0]);
      });
  });

https://circleci.com/gh/ZJONSSON/node-etl/138

  elastic bulk insert
    ✓ pipes data into elasticsearch (746ms)
    ✓ retrieves data back from elasticsearch (2036ms)
    ✓ streams results with elastic.find
    ✓ streams results with elastic.scroll (608ms)

@ZJONSSON ZJONSSON self-assigned this Dec 1, 2016
@ZJONSSON ZJONSSON added the bug label Dec 1, 2016
@patilgirish
Copy link
Author

@ZJONSSON thanks for the quick response.

I am using elasticsearch@11.0.1 module , below code returns empty dataset for me in the resolve.

  .pipe(etl.collect(500))
  .pipe(etl.elastic.index(esClient,index,'test', {
    concurrency : 5,
    pushResult : true,
    pushErrors : true
  }))
  .promise()
  .then(function (data) {
    console.log(data);
    return upload(tmpFilename, s3key);
  })

actually pushing everything in one go under resolve would be memory intensive if you are processing millions of records

@ZJONSSON
Copy link
Owner

ZJONSSON commented Dec 1, 2016

Ah I spotted the culprit. pushErrors currently supersedes pushResult (see here) - meaning that if you elect pushErrors then pushResults is ignored (you receive nothing downstream unless records errored). I think this should probably be the other way around to eliminate confusion - will post a patch.

Your code should work if you simply remove pushErrors from the options in your original example. (pushResults pushes both successes and errors). Let me know if that works

@ZJONSSON
Copy link
Owner

ZJONSSON commented Dec 2, 2016

See #46 - please reopen if the issue persists
(published on npm as etl@0.3.32)

@ZJONSSON ZJONSSON closed this as completed Dec 2, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants