Hello, may I know when this program will be upgraded for Elasticsearch 7.x and Flink 1.10.0? #1

Open
liningwonder opened this issue May 28, 2020 · 0 comments

liningwonder commented May 28, 2020

It looks like the updated `configure` method could be something like this:

    public void configure(Configuration configuration) {
        HttpHost[] httpHosts = new HttpHost[hosts.size()];
        for (int i = 0; i < hosts.size(); i++) {
            httpHosts[i] = new HttpHost(hosts.get(i), port, schema);
        }
        client = new RestHighLevelClient(RestClient.builder(httpHosts));

        //Listener
        BulkProcessor.Listener listener = new BulkProcessor.Listener(){
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                if (bulkResponse.hasFailures()) {
                    for (BulkItemResponse itemResp : bulkResponse.getItems()) {
                        if (itemResp.isFailed()) {
                            LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
                        }
                    }
                    hasFailure.set(true);
                }
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                LOG.error("Bulk request failed: " + throwable.getMessage(), throwable);
                failureThrowable.compareAndSet(null, throwable);
                hasFailure.set(true);
            }
        };
        //Builder
        BulkProcessor.Builder bulkProcessorBuilder =
                BulkProcessor.builder((request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener);
        //Config
        if (userConfig.containsKey(Constant.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            bulkProcessorBuilder.setBulkActions(stringToInt(userConfig.get(Constant.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)));
        }
        if (userConfig.containsKey(Constant.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            // Read the configured value stored under the key
            bulkProcessorBuilder.setBulkSize(new ByteSizeValue(stringToInt(
                    userConfig.get(Constant.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)), ByteSizeUnit.MB));
        }
        if (userConfig.containsKey(Constant.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(stringToInt(userConfig.get(Constant.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))));
        }
        //BulkProcessor
        bulkProcessor = bulkProcessorBuilder.build();
        requestIndexer = new BulkProcessorIndexer(bulkProcessor);
    }
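
The optional bulk settings in the snippet are meant to be looked up in `userConfig` by key and then parsed as integers. Here is a minimal, standalone sketch of that lookup. It assumes `userConfig` is a `Map<String, String>` and that `stringToInt` is a thin wrapper around `Integer.parseInt`; neither is shown in the snippet, so both are guesses, and the key strings and the `readInt` helper are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

class BulkConfigSketch {
    // Hypothetical key names; the real Constant.* values are not shown in the snippet.
    static final String BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    static final String BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";

    // Assumed behaviour of stringToInt: parse a decimal integer, fail loudly on bad input.
    static int stringToInt(String value) {
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Not an integer: '" + value + "'", e);
        }
    }

    // Look up the value stored under the key, then parse it; empty if the key is absent.
    static OptionalInt readInt(Map<String, String> userConfig, String key) {
        String value = userConfig.get(key);
        if (value == null) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(stringToInt(value));
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put(BULK_FLUSH_MAX_ACTIONS, "1000");

        // Present key: the stored value is parsed.
        System.out.println(readInt(userConfig, BULK_FLUSH_MAX_ACTIONS));
        // Absent key: nothing to apply, so the builder default would stay in place.
        System.out.println(readInt(userConfig, BULK_FLUSH_MAX_SIZE_MB));
    }
}
```

Passing the key string itself to `stringToInt` instead of the value stored under it would throw here, which is why the lookup and the parse are kept together in one helper.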