Skip to content

Commit

Permalink
Enable updating collector endpoint while Emitter is running (close #508)
Browse files Browse the repository at this point in the history
PR #509
  • Loading branch information
matus-tomlein committed Mar 9, 2022
1 parent 41405a5 commit f3cfe35
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ public void testUpdatingEmitterSettings() throws InterruptedException {

assertTrue(emitter.getEmitterStatus());
emitter.setHttpMethod(POST);
assertEquals("https://com.acme/i", emitter.getEmitterUri());
emitter.setEmitterUri("com/foo");
assertEquals("https://com.acme/i", emitter.getEmitterUri());
assertEquals("https://com.acme/com.snowplowanalytics.snowplow/tp2", emitter.getEmitterUri());
emitter.setEmitterUri("com.foo");
assertEquals("https://com.foo/com.snowplowanalytics.snowplow/tp2", emitter.getEmitterUri());
emitter.setBufferOption(DefaultGroup);
assertEquals(HeavyGroup, emitter.getBufferOption());

Expand Down Expand Up @@ -377,6 +377,18 @@ public void testPauseAndResumeEmittingEvents() throws InterruptedException {
emitter.flush();
}

public void testUpdatesNetworkConnectionWhileRunning() throws InterruptedException {
Emitter emitter = new Emitter(getContext(), "com.acme", new Emitter.EmitterBuilder()
.eventStore(new MockEventStore()));

emitter.flush();
Thread.sleep(100);
assertTrue(emitter.getEmitterStatus()); // is running
emitter.setEmitterUri("new.uri"); // update while running
assertTrue(emitter.getEmitterStatus()); // is running
assertTrue(emitter.getEmitterUri().contains("new.uri"));
}

// Emitter Builder

public Emitter getEmitter(NetworkConnection networkConnection, BufferOption option) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import okhttp3.OkHttpClient;

Expand Down Expand Up @@ -74,7 +75,7 @@ public class Emitter {
private OkHttpClient client;

private boolean isCustomNetworkConnection;
private NetworkConnection networkConnection;
private final AtomicReference<NetworkConnection> networkConnection = new AtomicReference<NetworkConnection>();
private EventStore eventStore;
private int emptyCount;

Expand Down Expand Up @@ -323,16 +324,16 @@ public Emitter(@NonNull Context context, @NonNull String collectorUri, @Nullable
endpoint = protocol + endpoint;
}
this.uri = endpoint;
this.networkConnection = new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(endpoint)
setNetworkConnection(new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(endpoint)
.method(builder.httpMethod)
.tls(builder.tlsVersions)
.emitTimeout(builder.emitTimeout)
.customPostPath(builder.customPostPath)
.client(builder.client)
.build();
.build());
} else {
isCustomNetworkConnection = true;
this.networkConnection = builder.networkConnection;
setNetworkConnection(builder.networkConnection);
}

if (builder.threadPoolSize > 2) {
Expand All @@ -357,7 +358,7 @@ public void add(final @NonNull Payload payload) {
eventStore.add(payload);
if (isRunning.compareAndSet(false, true)) {
try {
attemptEmit();
attemptEmit(getNetworkConnection());
} catch (Throwable t) {
isRunning.set(false);
Logger.e(TAG, "Received error during emission process: %s", t);
Expand All @@ -374,7 +375,7 @@ public void flush() {
Executor.execute(TAG, () -> {
if (isRunning.compareAndSet(false, true)) {
try {
attemptEmit();
attemptEmit(getNetworkConnection());
} catch (Throwable t) {
isRunning.set(false);
Logger.e(TAG, "Received error during emission process: %s", t);
Expand Down Expand Up @@ -444,7 +445,7 @@ public boolean shutdown(long timeout) {
* + Otherwise will attempt to emit again
*/
@SuppressWarnings("all")
private void attemptEmit() {
private void attemptEmit(NetworkConnection networkConnection) {
if (isEmittingPaused.get()) {
Logger.d(TAG, "Emitter paused.");
isRunning.compareAndSet(true, false);
Expand All @@ -468,13 +469,13 @@ private void attemptEmit() {
} catch (InterruptedException e) {
Logger.e(TAG, "Emitter thread sleep interrupted: " + e.toString());
}
attemptEmit();
attemptEmit(getNetworkConnection()); // at this point we update network connection since it might be outdated after sleep
return;
}
emptyCount = 0;

List<EmitterEvent> events = eventStore.getEmittableEvents(sendLimit);
List<Request> requests = buildRequests(events);
List<Request> requests = buildRequests(events, networkConnection.getHttpMethod());
List<RequestResult> results = networkConnection.sendRequests(requests);

Logger.v(TAG, "Processing emitter results.");
Expand Down Expand Up @@ -507,12 +508,12 @@ private void attemptEmit() {

if (failureCount > 0 && successCount == 0) {
if (Util.isOnline(this.context)) {
Logger.e(TAG, "Ensure collector path is valid: %s", getEmitterUri());
Logger.e(TAG, "Ensure collector path is valid: %s", networkConnection.getUri());
}
Logger.e(TAG, "Emitter loop stopping: failures.");
isRunning.compareAndSet(true, false);
} else {
attemptEmit();
attemptEmit(getNetworkConnection()); // refresh network connection for next emit
}
}

Expand All @@ -524,19 +525,19 @@ private void attemptEmit() {
*
* @param events a list of EmittableEvents pulled
* from the database.
* @param httpMethod HTTP method to use (passed in order to ensure consistency within attemptEmit)
* @return a list of ready to send requests
*/
@NonNull
protected List<Request> buildRequests(@NonNull List<EmitterEvent> events) {
protected List<Request> buildRequests(@NonNull List<EmitterEvent> events, HttpMethod httpMethod) {
List<Request> requests = new ArrayList<>();
String sendingTime = Util.getTimestamp();
HttpMethod httpMethod = networkConnection.getHttpMethod();

if (httpMethod == GET) {
for (EmitterEvent event : events) {
Payload payload = event.payload;
addSendingTimeToPayload(payload, sendingTime);
boolean isOversize = isOversize(payload);
boolean isOversize = isOversize(payload, httpMethod);
Request request = new Request(payload, event.eventId, isOversize);
requests.add(request);
}
Expand All @@ -551,11 +552,11 @@ protected List<Request> buildRequests(@NonNull List<EmitterEvent> events) {
Long eventId = event.eventId;
addSendingTimeToPayload(payload, sendingTime);

if (isOversize(payload)) {
if (isOversize(payload, httpMethod)) {
Request request = new Request(payload, eventId, true);
requests.add(request);

} else if (isOversize(payload, postPayloadMaps)) {
} else if (isOversize(payload, postPayloadMaps, httpMethod)) {
Request request = new Request(postPayloadMaps, reqEventIds);
requests.add(request);

Expand Down Expand Up @@ -586,20 +587,22 @@ protected List<Request> buildRequests(@NonNull List<EmitterEvent> events) {
/**
* Calculate if the payload exceeds the maximum amount of bytes allowed on configuration.
* @param payload to send.
* @param httpMethod HTTP method to use (passed in order to ensure consistency within attemptEmit)
* @return weather the payload exceeds the maximum size allowed.
*/
private boolean isOversize(@NonNull Payload payload) {
return isOversize(payload, new ArrayList<>());
private boolean isOversize(@NonNull Payload payload, HttpMethod httpMethod) {
return isOversize(payload, new ArrayList<>(), httpMethod);
}

/**
* Calculate if the payload bundle exceeds the maximum amount of bytes allowed on configuration.
* @param payload to add om the payload bundle.
* @param previousPaylods already in the payload bundle.
* @param httpMethod HTTP method to use (passed in order to ensure consistency within attemptEmit)
* @return weather the payload bundle exceeds the maximum size allowed.
*/
private boolean isOversize(@NonNull Payload payload, @NonNull List<Payload> previousPaylods) {
long byteLimit = networkConnection.getHttpMethod() == GET ? byteLimitGet : byteLimitPost;
private boolean isOversize(@NonNull Payload payload, @NonNull List<Payload> previousPaylods, HttpMethod httpMethod) {
long byteLimit = httpMethod == GET ? byteLimitGet : byteLimitPost;
return isOversize(payload, byteLimit, previousPaylods);
}

Expand Down Expand Up @@ -682,15 +685,15 @@ public void setSendLimit(int sendLimit) {
* @param method the HttpMethod
*/
public void setHttpMethod(@NonNull HttpMethod method) {
if (!isCustomNetworkConnection && !isRunning.get()) {
if (!isCustomNetworkConnection) {
this.httpMethod = method;
this.networkConnection = new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
setNetworkConnection(new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
.method(httpMethod)
.tls(tlsVersions)
.emitTimeout(emitTimeout)
.customPostPath(customPostPath)
.client(client)
.build();
.build());
}
}

Expand All @@ -700,15 +703,15 @@ public void setHttpMethod(@NonNull HttpMethod method) {
* @param security the Protocol
*/
public void setRequestSecurity(@NonNull Protocol security) {
if (!isCustomNetworkConnection && !isRunning.get()) {
if (!isCustomNetworkConnection) {
this.requestSecurity = security;
this.networkConnection = new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
setNetworkConnection(new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
.method(httpMethod)
.tls(tlsVersions)
.emitTimeout(emitTimeout)
.customPostPath(customPostPath)
.client(client)
.build();
.build());
}
}

Expand All @@ -718,15 +721,15 @@ public void setRequestSecurity(@NonNull Protocol security) {
* @param uri new Emitter URI
*/
public void setEmitterUri(@NonNull String uri) {
if (!isCustomNetworkConnection && !isRunning.get()) {
if (!isCustomNetworkConnection) {
this.uri = uri;
this.networkConnection = new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
setNetworkConnection(new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
.method(httpMethod)
.tls(tlsVersions)
.emitTimeout(emitTimeout)
.customPostPath(customPostPath)
.client(client)
.build();
.build());
}
}

Expand All @@ -736,15 +739,15 @@ public void setEmitterUri(@NonNull String uri) {
* @param customPostPath new Emitter custom Post path
*/
public void setCustomPostPath(@Nullable String customPostPath) {
if (!isCustomNetworkConnection && !isRunning.get()) {
if (!isCustomNetworkConnection) {
this.customPostPath = customPostPath;
this.networkConnection = new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
setNetworkConnection(new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
.method(httpMethod)
.tls(tlsVersions)
.emitTimeout(emitTimeout)
.customPostPath(customPostPath)
.client(client)
.build();
.build());
}
}

Expand All @@ -754,15 +757,15 @@ public void setCustomPostPath(@Nullable String customPostPath) {
* @param emitTimeout new Emitter timeout
*/
public void setEmitTimeout(int emitTimeout) {
if (!isCustomNetworkConnection && !isRunning.get()) {
if (!isCustomNetworkConnection) {
this.emitTimeout = emitTimeout;
this.networkConnection = new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
setNetworkConnection(new OkHttpNetworkConnection.OkHttpNetworkConnectionBuilder(uri)
.method(httpMethod)
.tls(tlsVersions)
.emitTimeout(emitTimeout)
.customPostPath(customPostPath)
.client(client)
.build();
.build());
}
}

Expand All @@ -771,7 +774,7 @@ public void setEmitTimeout(int emitTimeout) {
*/
@NonNull
public String getEmitterUri() {
return networkConnection.getUri().toString();
return getNetworkConnection().getUri().toString();
}

/**
Expand Down Expand Up @@ -891,6 +894,10 @@ public int getEmitTimeout() {
*/
@Nullable
public NetworkConnection getNetworkConnection() {
return this.networkConnection;
return this.networkConnection.get();
}

private void setNetworkConnection(@NonNull NetworkConnection networkConnection) {
this.networkConnection.set(networkConnection);
}
}

0 comments on commit f3cfe35

Please sign in to comment.