Skip to content

Commit

Permalink
fix(s3): Wrap eventing poll loop in try/catch and emit error metric (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ajordens committed Sep 18, 2017
1 parent 68da42a commit b17bf1c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.awsobjectmapper.AmazonObjectMapperConfigurer;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.clouddriver.aws.bastion.BastionConfig;
import com.netflix.spinnaker.clouddriver.aws.security.AmazonClientProvider;
import com.netflix.spinnaker.front50.model.EventingS3ObjectKeyLoader;
Expand Down Expand Up @@ -120,13 +121,15 @@ public TemporarySQSQueue temporaryQueueSupport(Optional<ApplicationInfoManager>
public ObjectKeyLoader eventingS3ObjectKeyLoader(ObjectMapper objectMapper,
S3Properties s3Properties,
S3StorageService s3StorageService,
TemporarySQSQueue temporaryQueueSupport) {
TemporarySQSQueue temporaryQueueSupport,
Registry registry) {
return new EventingS3ObjectKeyLoader(
Executors.newFixedThreadPool(1),
objectMapper,
s3Properties,
temporaryQueueSupport,
s3StorageService,
registry,
true
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.front50.config.S3Properties;
import com.netflix.spinnaker.front50.model.events.S3Event;
import com.netflix.spinnaker.front50.model.events.S3EventWrapper;
Expand Down Expand Up @@ -62,8 +63,9 @@ public class EventingS3ObjectKeyLoader implements ObjectKeyLoader, Runnable {
private static final Executor executor = Executors.newFixedThreadPool(5);

private final ObjectMapper objectMapper;
private final S3StorageService s3StorageService;
private final TemporarySQSQueue temporarySQSQueue;
private final S3StorageService s3StorageService;
private final Registry registry;

private final Cache<KeyWithObjectType, Long> objectKeysByLastModifiedCache;
private final LoadingCache<ObjectType, Map<String, Long>> objectKeysByObjectTypeCache;
Expand All @@ -77,10 +79,12 @@ public EventingS3ObjectKeyLoader(ExecutorService executionService,
S3Properties s3Properties,
TemporarySQSQueue temporarySQSQueue,
S3StorageService s3StorageService,
Registry registry,
boolean scheduleImmediately) {
this.objectMapper = objectMapper;
this.temporarySQSQueue = temporarySQSQueue;
this.s3StorageService = s3StorageService;
this.registry = registry;

this.objectKeysByLastModifiedCache = CacheBuilder
.newBuilder()
Expand Down Expand Up @@ -173,19 +177,24 @@ public Map<String, Long> listObjectKeys(ObjectType objectType) {
@Override
public void run() {
while (pollForMessages) {
List<Message> messages = temporarySQSQueue.fetchMessages();

if (messages.isEmpty()) {
continue;
}
try {
List<Message> messages = temporarySQSQueue.fetchMessages();

messages.forEach(message -> {
S3Event s3Event = unmarshall(objectMapper, message.getBody());
if (s3Event != null) {
tick(s3Event);
if (messages.isEmpty()) {
continue;
}
temporarySQSQueue.markMessageAsHandled(message.getReceiptHandle());
});

messages.forEach(message -> {
S3Event s3Event = unmarshall(objectMapper, message.getBody());
if (s3Event != null) {
tick(s3Event);
}
temporarySQSQueue.markMessageAsHandled(message.getReceiptHandle());
});
} catch (Exception e) {
log.error("Failed to poll for messages", e);
registry.counter("s3.eventing.pollErrors").increment();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.spinnaker.front50.model

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.front50.config.S3Properties
import com.netflix.spinnaker.front50.model.events.S3Event
import org.springframework.scheduling.TaskScheduler
Expand All @@ -34,6 +35,7 @@ class EventingS3ObjectKeyLoaderSpec extends Specification {
)
def temporarySQSQueue = Mock(TemporarySQSQueue)
def s3StorageService = Mock(S3StorageService)
def registry = Mock(Registry)

@Subject
def objectKeyLoader = new EventingS3ObjectKeyLoader(
Expand All @@ -42,6 +44,7 @@ class EventingS3ObjectKeyLoaderSpec extends Specification {
s3Properties,
temporarySQSQueue,
s3StorageService,
registry,
false
)

Expand Down

0 comments on commit b17bf1c

Please sign in to comment.