Too many thread pools created #842

Closed
medusar opened this issue Jun 27, 2016 · 11 comments

@medusar

medusar commented Jun 27, 2016

Hi, I have recently run into a problem: after using JdbcImporter for a while, too many threads are created (about 2000 in 10 hours) and none of them is ever destroyed. Analyzing with jstack, I found that the threads are all named like 'pool-{threadpool-num}-thread-1'. Checking the code, I think the problem is here in JdbcImporter:

private void execute() throws ExecutionException, InterruptedException {
    logger.debug("executing (queue={})", getQueue().size());
    executor = new MetricSimplePipelineExecutor<SettingsPipelineRequest, Pipeline<SettingsPipelineRequest>>()
            .setQueue(getQueue())
            .setConcurrency(settings.getAsInt("concurrency", 1))
            .setPipelineProvider(pipelineProvider())
            .prepare()
            .execute()
            .waitFor();
    logger.debug("execution completed");
}

Each time the execute method is invoked, a new thread pool is created, and that pool is never shut down. Over time, more and more thread pools pile up, and since concurrency is 1, each pool holds one thread, so the thread count keeps growing as well.
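To make the leak concrete, here is a minimal, self-contained sketch of the same pattern using plain java.util.concurrent rather than MetricSimplePipelineExecutor (whose internals are not shown in this thread, so this is an assumption about what prepare() does, based on the later comment about SimplePipelineExecutor.prepare). The class and method names (LeakDemo, leakyExecute, liveWorkers) are hypothetical. Each "run" creates a fresh single-thread pool and never shuts it down; the pool's idle worker stays alive, and the JDK's default thread factory names it pool-N-thread-1, which matches the jstack output above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LeakDemo {
    // Pools created by leakyExecute(); kept only so the demo can count and clean them up.
    static final List<ExecutorService> created = new ArrayList<>();

    // Mimics the reported pattern: every run builds a new pool with
    // concurrency = 1 and never shuts it down.
    static String leakyExecute() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        created.add(pool);
        // Run one task on the pool's only worker and return that worker's name.
        return pool.submit(() -> Thread.currentThread().getName()).get();
    }

    // Sum of worker threads still held by every pool created so far.
    // Fixed-pool core threads do not time out, so idle workers keep counting.
    static int liveWorkers() {
        int total = 0;
        for (ExecutorService pool : created) {
            total += ((ThreadPoolExecutor) pool).getPoolSize();
        }
        return total;
    }

    // Cleanup that the leaky code never does.
    static void cleanUp() throws InterruptedException {
        for (ExecutorService pool : created) {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        created.clear();
    }

    public static void main(String[] args) throws Exception {
        for (int run = 0; run < 5; run++) {
            System.out.println("run " + run + " executed on " + leakyExecute());
        }
        System.out.println("idle workers still alive: " + liveWorkers());
        cleanUp();
    }
}
```

Without cleanUp(), the non-daemon workers would also keep the JVM from exiting, which is one way to read the remark that the importer needs a JVM exit in concurrency mode.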

@jprante
Owner

jprante commented Jun 27, 2016

Thanks for the report.

Try not to use .setConcurrency(settings.getAsInt("concurrency", 1))

JDBCImporter is not ready to be reused in an embedded environment; in concurrency mode it requires a JVM exit (which is undocumented).

It will be fixed in the next release.

@medusar
Author

medusar commented Jun 27, 2016

Wow, maybe I have found the problem; thanks for your reply anyway.
There are two problems here. The first is the one I mentioned above; the second is in the GcMonitor. When I set the parameter "monitor.gc.enabled" to false, it worked properly and no more excess threads were created, but with the default value (true), there are again too many threads. My version is 2.1.1.2.

@medusar
Author

medusar commented Jun 28, 2016

I have changed two places to solve the problem.

The first one is in JdbcImporter:

private void execute() throws ExecutionException, InterruptedException {
    logger.debug("executing (queue={})", getQueue().size());

    // shut down the thread pool created by the previous call before creating a new one
    if (executor != null) {
        try {
            executor.shutdown();
            logger.info("shutdown executor:{}", executor);
        } catch (Exception e) {
            // log the exception through the logger instead of printStackTrace()
            logger.error("error shutting down executor:{}", executor, e);
        }
    }

    executor = new MetricSimplePipelineExecutor<SettingsPipelineRequest, Pipeline<SettingsPipelineRequest>>()
            .setQueue(getQueue())
            .setConcurrency(settings.getAsInt("concurrency", 1))
            .setPipelineProvider(pipelineProvider())
            .prepare()
            .execute()
            .waitFor();
    logger.debug("execution completed");
}

I added a call to shutdown() to shut down the thread pool created by the previous call.
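The same shutdown-before-recreate idea can be sketched with a plain ExecutorService. This is an illustration under assumptions, not the importer's code: ShutdownPreviousDemo, execute(int) and liveWorkers() are hypothetical, and the 5-second grace period is an arbitrary choice. Waiting with awaitTermination() (and falling back to shutdownNow()) makes sure the previous pool's worker is actually gone before the next run starts, so at most one pool is alive at any time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ShutdownPreviousDemo {
    private ExecutorService executor; // pool from the previous run, if any
    final List<ExecutorService> history = new ArrayList<>(); // demo bookkeeping only

    void execute(int concurrency) throws Exception {
        if (executor != null) {
            executor.shutdown(); // stop accepting work; already queued tasks still finish
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // interrupt anything still running
            }
        }
        executor = Executors.newFixedThreadPool(concurrency);
        history.add(executor);
        // One unit of (placeholder) import work, waited on like waitFor().
        executor.submit(() -> { }).get();
    }

    // Worker threads still held by every pool this object ever created.
    int liveWorkers() {
        int total = 0;
        for (ExecutorService pool : history) {
            total += ((ThreadPoolExecutor) pool).getPoolSize();
        }
        return total;
    }

    // The last pool must still be shut down when the importer itself stops.
    void close() throws InterruptedException {
        if (executor != null) {
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        ShutdownPreviousDemo importer = new ShutdownPreviousDemo();
        for (int run = 0; run < 5; run++) {
            importer.execute(1);
        }
        System.out.println("pools created: " + importer.history.size()
                + ", workers still alive: " + importer.liveWorkers());
        importer.close();
    }
}
```

A new pool is still created on every run, so thread names keep counting up; only the number of live threads stays bounded.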

The second one is in StandardSink:

private Ingest createIngest(Settings settings) {
    Settings.Builder settingsBuilder = Settings.settingsBuilder()
            .put("cluster.name", settings.get("elasticsearch.cluster.name", settings.get("elasticsearch.cluster", "elasticsearch")))
            .putArray("host", settings.getAsArray("elasticsearch.host"))
            .put("port", settings.getAsInt("elasticsearch.port", 9300))
            .put("sniff", settings.getAsBoolean("elasticsearch.sniff", false))
            .put("autodiscover", settings.getAsBoolean("elasticsearch.autodiscover", false))
            .put("name", "importer") // prevents lookup of names.txt, we don't have it
            .put("client.transport.ignore_cluster_name", false) // ignore cluster name setting
            .put("client.transport.ping_timeout", settings.getAsTime("elasticsearch.timeout", TimeValue.timeValueSeconds(5))) //  ping timeout
            .put("client.transport.nodes_sampler_interval", settings.getAsTime("elasticsearch.timeout", TimeValue.timeValueSeconds(5))); // for sniff sampling
    // optional found.no transport plugin
    if (settings.get("transport.type") != null) {
        settingsBuilder.put("transport.type", settings.get("transport.type"));
    }
    // copy found.no transport settings
    Settings foundTransportSettings = settings.getAsSettings("transport.found");
    if (foundTransportSettings != null) {
        Map<String, String> foundTransportSettingsMap = foundTransportSettings.getAsMap();
        for (Map.Entry<String, String> entry : foundTransportSettingsMap.entrySet()) {
            settingsBuilder.put("transport.found." + entry.getKey(), entry.getValue());
        }
    }

    // make GcMonitor controllable through a parameter; disabled by default
    settingsBuilder.put("monitor.gc.enabled", settings.getAsBoolean("monitor.gc.enabled", false));

    return ClientBuilder.builder()
            .put(settingsBuilder.build())
            .put(ClientBuilder.MAX_ACTIONS_PER_REQUEST, settings.getAsInt("max_bulk_actions", 10000))
            .put(ClientBuilder.MAX_CONCURRENT_REQUESTS, settings.getAsInt("max_concurrent_bulk_requests",
                    Runtime.getRuntime().availableProcessors() * 2))
            .put(ClientBuilder.MAX_VOLUME_PER_REQUEST, settings.getAsBytesSize("max_bulk_volume", ByteSizeValue.parseBytesSizeValue("10m", "")))
            .put(ClientBuilder.FLUSH_INTERVAL, settings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5)))
            .setMetric(sinkMetric)
            .toBulkTransportClient();
}

I have added this line:

settingsBuilder.put("monitor.gc.enabled", settings.getAsBoolean("monitor.gc.enabled", false));

This line lets me control whether the GcMonitor is switched on. The monitor is scheduled each time the execute method is invoked in JdbcImporter, so a new thread pool is created every time.
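GcMonitor's internals are not shown in this thread, so the following is only a sketch of how a periodic monitor could avoid creating a pool per run. All names (OnceScheduledMonitor, start, sample) are hypothetical, and the JMX-based GC sampler is a stand-in for whatever GcMonitor actually checks. The monitor may be started from every execute() call, but it creates its scheduler at most once, and only when enabled; the boolean plays the role of monitor.gc.enabled.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OnceScheduledMonitor implements AutoCloseable {
    private final boolean enabled;               // analogous to "monitor.gc.enabled"
    private ScheduledExecutorService scheduler;  // created lazily, at most once
    private volatile long lastGcCount;           // last sampled total GC count
    final AtomicInteger schedulersCreated = new AtomicInteger(); // demo bookkeeping

    OnceScheduledMonitor(boolean enabled) {
        this.enabled = enabled;
    }

    // Safe to call on every scheduled run: only the first call creates a scheduler.
    synchronized void start() {
        if (!enabled || scheduler != null) {
            return;
        }
        scheduler = Executors.newSingleThreadScheduledExecutor();
        schedulersCreated.incrementAndGet();
        scheduler.scheduleAtFixedRate(this::sample, 0, 1, TimeUnit.SECONDS);
    }

    // Stand-in check: sum the collection counts reported by the JVM's collectors.
    private void sample() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            total += Math.max(0, gc.getCollectionCount()); // -1 means "undefined"
        }
        lastGcCount = total;
    }

    @Override
    public synchronized void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    public static void main(String[] args) {
        try (OnceScheduledMonitor monitor = new OnceScheduledMonitor(true)) {
            for (int run = 0; run < 10; run++) {
                monitor.start(); // called once per scheduled import run
            }
            System.out.println("schedulers created: " + monitor.schedulersCreated.get());
        }
    }
}
```

With this shape, disabling the monitor is no longer needed just to stop the thread growth; it stays a pure on/off switch.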

So my problem is that too many thread pools are created and none of them is destroyed.
My solution:

  1. Shut down the previous thread pool before creating a new one in JdbcImporter.execute()
  2. Disable the GcMonitor, so that its thread pool is not created

The second fix is not good practice, because I think this is a bug that needs to be fixed.

@medusar
Author

medusar commented Jun 28, 2016

And my config file:

"schedule" : "0 0/1 * * * ? *",

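Reading this as a Quartz-style cron expression (an assumption; the fields would be seconds, minutes, hours, day of month, month, day of week, year), the schedule fires at second 0 of every minute. If it ran uninterrupted for the roughly 10 hours mentioned in the first comment, a rough estimate is:

$$
10\ \text{h} \times 60\ \frac{\text{runs}}{\text{h}} = 600\ \text{runs},
\qquad
\frac{\approx 2000\ \text{threads}}{600\ \text{runs}} \approx 3.3\ \text{threads per run}
$$

So more than one thread seems to have leaked per run, which fits the report that both JdbcImporter.execute() and the GcMonitor created pools; this is an estimate from the numbers in the thread, not a measurement.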
@backingwu

@medusar Was this problem solved after you changed the code?

@medusar
Author

medusar commented Jun 29, 2016

@backingwu Yes, it's solved, using exactly the method I described above.

@wrxri

wrxri commented Jul 6, 2016

When is the next release going to be ready?

@jprante
Owner

jprante commented Jul 7, 2016

Fixed in 2.3.3.1 release.

@liangcz

liangcz commented Aug 11, 2016

This problem happens in the method SimplePipelineExecutor.prepare: the property executorService is null, so a new fixed thread pool is created every time. It also seems not to be fixed in the 2.3.4.0 release.
I solved the problem this way:

private void execute() throws ExecutionException, InterruptedException {
    logger.debug("executing (queue={})", getQueue().size());
    if (executorService == null) {
        executorService = Executors.newFixedThreadPool(settings.getAsInt("concurrency", 1));
    }
    executor = new MetricSimplePipelineExecutor<SettingsPipelineRequest, Pipeline<SettingsPipelineRequest>>()
            .setQueue(getQueue())
            .setConcurrency(settings.getAsInt("concurrency", 1))
            .setPipelineProvider(pipelineProvider())
            .setExecutorService(executorService)
            .prepare()
            .execute()
            .waitFor();
    logger.debug("execution completed");
}

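To do this, I added a setExecutorService method to MetricSimplePipelineExecutor and SimplePipelineExecutor, added an executorService property to the JDBCImporter class, and initialize executorService in execute() as shown above.
This way, we don't need to shut down the executor after each run, because the same thread pool is reused.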
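The shared-pool approach can be sketched outside the importer as follows. SharedPoolImporter and runPipeline are hypothetical stand-ins: runPipeline plays the role of a pipeline executor that receives a pool through something like the proposed setExecutorService(...), borrows it, and never shuts it down. One caveat worth making explicit: the pool's threads are non-daemon (the JDK's default thread factory), so the shared pool still needs to be shut down once, when the importer itself is closed, which is what close() does here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedPoolImporter implements AutoCloseable {
    private final int concurrency;
    private ExecutorService executorService; // shared across runs, created on first use
    final AtomicInteger poolsCreated = new AtomicInteger(); // demo bookkeeping

    SharedPoolImporter(int concurrency) {
        this.concurrency = concurrency;
    }

    // Stand-in for a pipeline executor that is handed a pool instead of creating one:
    // it submits one placeholder worker per concurrency slot and waits for all of them.
    static void runPipeline(ExecutorService pool, int concurrency) throws Exception {
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            futures.add(pool.submit(() -> { /* drain part of the queue */ }));
        }
        for (Future<?> f : futures) {
            f.get(); // equivalent of waitFor()
        }
    }

    synchronized void execute() throws Exception {
        if (executorService == null) {
            executorService = Executors.newFixedThreadPool(concurrency);
            poolsCreated.incrementAndGet();
        }
        runPipeline(executorService, concurrency);
    }

    // Shut the shared pool down exactly once, when the importer stops for good.
    @Override
    public synchronized void close() throws InterruptedException {
        if (executorService != null) {
            executorService.shutdown();
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
            executorService = null;
        }
    }

    public static void main(String[] args) throws Exception {
        try (SharedPoolImporter importer = new SharedPoolImporter(2)) {
            for (int run = 0; run < 10; run++) {
                importer.execute();
            }
            System.out.println("pools created: " + importer.poolsCreated.get());
        }
    }
}
```

Compared with shutting down and recreating the pool on every run, reuse keeps thread names stable and avoids thread start-up cost per scheduled run; the trade-off is that the pool's lifetime now has to be tied to the importer's own lifecycle.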

@jprante
Owner

jprante commented Aug 27, 2016

@liangcz thanks

@medusar
Author

medusar commented Aug 7, 2019

Let me close this.

medusar closed this as completed Aug 7, 2019

5 participants