This repository was archived by the owner on May 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 55
Add event sampling. Name threads #43
Closed
Closed
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
aeb88c2
bump eventsource dep
e1ac5f8
Implement bounded resource consumption.
8b0d540
Address PR comment.
7f0fa1d
Adjust some Gradle things including adding the sonatype repo.
2d8df5e
Add comment to gradle file
be5cc25
Enhance log statement.
236075c
Address PR comments.
57f4905
Minor tweaks. Address PR comment.
426cd52
Add some logging.
ba7e423
Add better checking for newer snapshot version.
3466fa9
Move start wait param into builder. Restore original constructor sign…
b7d87be
Revert accidentally added file.
63662d2
Add event sampling. Name threads.
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| package com.launchdarkly.client; | ||
|
|
||
| import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
| import com.google.gson.Gson; | ||
| import org.apache.http.HttpStatus; | ||
| import org.apache.http.client.methods.CloseableHttpResponse; | ||
|
|
@@ -14,22 +15,34 @@ | |
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Random; | ||
| import java.util.concurrent.*; | ||
|
|
||
| class EventProcessor implements Closeable { | ||
| private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory()); | ||
| ThreadFactory threadFactory = new ThreadFactoryBuilder() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there an extra linefeed there?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This thread factory doesn't actually need to be a class member. The only reason it's assigned here is that we're not instantiating the |
||
| .setDaemon(true) | ||
| .setNameFormat("LaunchDarkly-EventProcessor-%d") | ||
| .build(); | ||
| private final ScheduledExecutorService scheduler = | ||
| Executors.newSingleThreadScheduledExecutor(threadFactory); | ||
| private final Random random = new Random(); | ||
| private final BlockingQueue<Event> queue; | ||
| private final String apiKey; | ||
| private final LDConfig config; | ||
| private final Consumer consumer; | ||
|
|
||
| EventProcessor(String apiKey, LDConfig config) { | ||
| this.apiKey = apiKey; | ||
| this.queue = new ArrayBlockingQueue<>(config.capacity); | ||
| this.consumer = new Consumer(config); | ||
| this.config = config; | ||
| this.scheduler.scheduleAtFixedRate(consumer, 0, config.flushInterval, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| boolean sendEvent(Event e) { | ||
| if (config.samplingInterval > 0 && random.nextInt(config.samplingInterval) != 0) { | ||
| return true; | ||
| } | ||
| return queue.offer(e); | ||
| } | ||
|
|
||
|
|
@@ -43,18 +56,8 @@ public void flush() { | |
| this.consumer.flush(); | ||
| } | ||
|
|
||
| static class DaemonThreadFactory implements ThreadFactory { | ||
| public Thread newThread(Runnable r) { | ||
| Thread thread = new Thread(r); | ||
| thread.setDaemon(true); | ||
| return thread; | ||
| } | ||
| } | ||
|
|
||
| class Consumer implements Runnable { | ||
| private final Logger logger = LoggerFactory.getLogger(Consumer.class); | ||
|
|
||
|
|
||
| private final CloseableHttpClient client; | ||
| private final LDConfig config; | ||
|
|
||
|
|
@@ -78,6 +81,7 @@ public void flush() { | |
| } | ||
|
|
||
| private void postEvents(List<Event> events) { | ||
| logger.debug("Posting " + events.size() + " event(s).."); | ||
| CloseableHttpResponse response = null; | ||
| Gson gson = new Gson(); | ||
| String json = gson.toJson(events); | ||
|
|
@@ -95,24 +99,21 @@ private void postEvents(List<Event> events) { | |
| if (status >= 300) { | ||
| if (status == HttpStatus.SC_UNAUTHORIZED) { | ||
| logger.error("Invalid API key"); | ||
| } | ||
| else { | ||
| } else { | ||
| logger.error("Unexpected status code: " + status); | ||
| } | ||
| } | ||
| else { | ||
| logger.debug("Successfully processed events"); | ||
| } else { | ||
| logger.debug("Successfully posted " + events.size() + " event(s)."); | ||
| } | ||
| } catch (IOException e) { | ||
| logger.error("Unhandled exception in LaunchDarkly client", e); | ||
| logger.error("Unhandled exception in LaunchDarkly client attempting to connect to URI: " + config.eventsURI, e); | ||
| } finally { | ||
| try { | ||
| if (response != null) response.close(); | ||
| } catch (IOException e) { | ||
| logger.error("Unhandled exception in LaunchDarkly client", e); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this do exactly? Never encountered this before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From here: https://discuss.gradle.org/t/how-to-get-gradle-to-download-newer-snapshots-to-gradle-cache-when-using-an-ivy-repository/7344
In short it goes with the addition on lines 8-12. We're instructing gradle to always go look in a repo for the latest version of this dep. When this is pinned to a stable 1.0.0 it won't be needed, but is very handy for local development and potentially build/deploy systems that that might have older snapshots cached.
I am not a gradle expert, so this might not be the best way to do it.