diff --git a/tmc-plugin/src/fi/helsinki/cs/tmc/spyware/EventRateLimiter.java b/tmc-plugin/src/fi/helsinki/cs/tmc/spyware/EventRateLimiter.java deleted file mode 100644 index d01230ef..00000000 --- a/tmc-plugin/src/fi/helsinki/cs/tmc/spyware/EventRateLimiter.java +++ /dev/null @@ -1,106 +0,0 @@ -package fi.helsinki.cs.tmc.spyware; - -import fi.helsinki.cs.tmc.utilities.LazyHashMap; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.Callable; - -/** - * Buffers the latest version of an event and sends it at most with a certain interval. - * - *

- * An event rate limiter forwards events to another receiver. - * When an event with a key {@code K} comes in, - * a cooldown is activated. No other event with key {@code K} will be sent while - * the cooldown is active. When the cooldown for {@code K} expires, - * the most recent {@code K} received during the cooldown period is sent, - * if any. - * - *

- * Separating the concern of rate limiting here allows event sources to - * fire as many events as they want, as long as they are fine with some events - * being discarded. - */ -@Deprecated // Decided against using it for now. Might will use later, so won't delete yet (20120513). -public class EventRateLimiter implements EventReceiver { - public static final long DEFAULT_COOLDOWN = 30*1000; - - private class EventKeyRecord { - private long cooldownLength = DEFAULT_COOLDOWN; - private LoggableEvent newestUnsent = null; - private Timer cooldownTimer = null; - - public synchronized void receive(LoggableEvent ev) { - if (cooldownTimer == null) { - assert newestUnsent == null; - nextReceiver.receiveEvent(ev); - - cooldownTimer = new Timer("EventRateLimiter cooldown", true); - cooldownTimer.schedule(new TimerTask() { - @Override - public void run() { - forwardFromTimer(); - } - }, cooldownLength); - } else { - newestUnsent = ev; - } - } - - public synchronized void setCooldown(long cooldownLength) { - this.cooldownLength = cooldownLength; - } - - public synchronized void close() { //TODO: unit test - if (cooldownTimer != null) { - cooldownTimer.cancel(); - } - if (newestUnsent != null) { - nextReceiver.receiveEvent(newestUnsent); - newestUnsent = null; - } - } - - private synchronized void forwardFromTimer() { - if (newestUnsent != null) { - nextReceiver.receiveEvent(newestUnsent); - newestUnsent = null; - } - - cooldownTimer = null; - } - } - - private final EventReceiver nextReceiver; - - private final LazyHashMap recordsByKey = new LazyHashMap(new Callable() { - @Override - public EventKeyRecord call() throws Exception { - return new EventKeyRecord(); - } - }); - - public EventRateLimiter(EventReceiver nextReceiver) { - this.nextReceiver = nextReceiver; - } - - public synchronized void setCooldownForEventKey(String eventKey, long delayMillis) { - recordsByKey.get(eventKey).setCooldown(delayMillis); - } - - @Override - public synchronized void receiveEvent(LoggableEvent event) { - recordsByKey.get(event.getKey()).receive(event); - } - - /** - * Flush and close. - */ - @Override - public void close() { - for (EventKeyRecord rec : recordsByKey.values()) { - rec.close(); - } - recordsByKey.clear(); - } -} diff --git a/tmc-plugin/src/fi/helsinki/cs/tmc/spyware/EventSendBuffer.java b/tmc-plugin/src/fi/helsinki/cs/tmc/spyware/EventSendBuffer.java index 6ece39a4..81abe666 100644 --- a/tmc-plugin/src/fi/helsinki/cs/tmc/spyware/EventSendBuffer.java +++ b/tmc-plugin/src/fi/helsinki/cs/tmc/spyware/EventSendBuffer.java @@ -1,5 +1,7 @@ package fi.helsinki.cs.tmc.spyware; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.collect.Iterables; import fi.helsinki.cs.tmc.data.Course; import fi.helsinki.cs.tmc.model.CourseDb; @@ -33,6 +35,7 @@ public class EventSendBuffer implements EventReceiver { public static final int DEFAULT_MAX_EVENTS = 64 * 1024; public static final int DEFAULT_AUTOSEND_THREHSOLD = DEFAULT_MAX_EVENTS / 2; public static final int DEFAULT_AUTOSEND_COOLDOWN = 30*1000; + public static final int DEFAULT_MAX_EVENTS_PER_SEND = 500; private Random random = new Random(); private SpywareSettings settings; @@ -45,6 +48,7 @@ public class EventSendBuffer implements EventReceiver { private int eventsToRemoveAfterSend = 0; private int maxEvents = DEFAULT_MAX_EVENTS; private int autosendThreshold = DEFAULT_AUTOSEND_THREHSOLD; + private int maxEventsPerSend = DEFAULT_MAX_EVENTS_PER_SEND; // Servers have POST size limits private Cooldown autosendCooldown; @@ -70,17 +74,17 @@ public EventSendBuffer(SpywareSettings settings, ServerAccess serverAccess, Cour } public void setSendingInterval(long interval) { + checkArgument(interval >= 0); this.sendingTask.setInterval(interval); } public void setSavingInterval(long interval) { + checkArgument(interval >= 0); this.savingTask.setInterval(interval); } public void setMaxEvents(int newMaxEvents) { - if (newMaxEvents <= 0) { - throw new IllegalArgumentException(); - } + checkArgument(newMaxEvents > 0); synchronized (sendQueue) { if (newMaxEvents < maxEvents) { @@ -107,7 +111,17 @@ public void setAutosendThreshold(int autosendThreshold) { } public void setAutosendCooldown(long durationMillis) { - this.autosendCooldown.setDurationMillis(durationMillis); + checkArgument(durationMillis > 0); + synchronized (sendQueue) { + this.autosendCooldown.setDurationMillis(durationMillis); + } + } + + public void setMaxEventsPerSend(int maxEventsPerSend) { + checkArgument(maxEventsPerSend > 0); + synchronized (sendQueue) { + this.maxEventsPerSend = maxEventsPerSend; + } } public void sendNow() { @@ -174,9 +188,6 @@ public void close() { private SingletonTask sendingTask = new SingletonTask(new Runnable() { - // Sending too many at once may go over the server's POST size limit. - private static final int MAX_EVENTS_PER_SEND = 500; - @Override public void run() { boolean shouldSendMore; @@ -198,7 +209,9 @@ public void run() { log.log(Level.INFO, "Sending {0} events to {1}", new Object[] { eventsToSend.size(), url }); - doSend(eventsToSend, url); + if (!tryToSend(eventsToSend, url)) { + shouldSendMore = false; + } } while (shouldSendMore); } @@ -207,7 +220,7 @@ private ArrayList copyEventsToSendFromQueue() { ArrayList eventsToSend = new ArrayList(sendQueue.size()); Iterator i = sendQueue.iterator(); - while (i.hasNext() && eventsToSend.size() < MAX_EVENTS_PER_SEND) { + while (i.hasNext() && eventsToSend.size() < maxEventsPerSend) { eventsToSend.add(i.next()); } @@ -235,7 +248,7 @@ private String pickDestinationUrl() { return url; } - private void doSend(final ArrayList eventsToSend, final String url) { + private boolean tryToSend(final ArrayList eventsToSend, final String url) { CancellableCallable task = serverAccess.getSendEventLogJob(url, eventsToSend); Future future = BgTask.start("Sending stats", task); @@ -245,7 +258,7 @@ private void doSend(final ArrayList eventsToSend, final String ur future.cancel(true); } catch (ExecutionException ex) { log.log(Level.INFO, "Sending failed", ex); - return; + return false; } log.log(Level.INFO, "Sent {0} events successfully to {1}", new Object[] { eventsToSend.size(), url }); @@ -256,6 +269,7 @@ private void doSend(final ArrayList eventsToSend, final String ur // then we may end up sending duplicate events later. // This will hopefully be very rare. savingTask.start(); + return true; } private void removeSentEventsFromQueue() { diff --git a/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/data/serialization/CourseListParserTest.java b/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/data/serialization/CourseListParserTest.java index 2239bdc9..4ee9c0ae 100644 --- a/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/data/serialization/CourseListParserTest.java +++ b/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/data/serialization/CourseListParserTest.java @@ -6,19 +6,20 @@ import fi.helsinki.cs.tmc.data.CourseListUtils; import fi.helsinki.cs.tmc.data.Exercise; import fi.helsinki.cs.tmc.data.ExerciseListUtils; +import java.util.Date; import java.util.GregorianCalendar; import java.util.List; import org.junit.Before; public class CourseListParserTest { - + private CourseListParser parser; - + @Before public void setUp() { parser = new CourseListParser(); } - + @Test public void itShouldParseJsonCourseLists() { String exercisesJson = @@ -33,30 +34,33 @@ public void itShouldParseJsonCourseLists() { "checksum: \"123abc\"" + "}]"; String json = "{api_version: 1, courses: [{\"name\": \"TheCourse\",\"exercises\": " + exercisesJson + "}]}"; - + List result = parser.parseFromJson(json); - + Course course = CourseListUtils.getCourseByName(result, "TheCourse"); assertEquals("TheCourse", course.getName()); - + Exercise exercise = ExerciseListUtils.getExerciseByName(course.getExercises(), "TheExercise"); - + assertEquals("TheCourse", exercise.getCourseName()); - + GregorianCalendar cal = new GregorianCalendar(); cal.setTime(exercise.getDeadline()); assertEquals(2015, cal.get(GregorianCalendar.YEAR)); assertEquals(1, cal.get(GregorianCalendar.HOUR_OF_DAY)); assertEquals(30, cal.get(GregorianCalendar.MINUTE)); - + assertEquals("http://example.com/courses/123/exercises/1.zip", exercise.getDownloadUrl()); assertEquals("http://example.com/courses/123/exercises/1/submissions", exercise.getReturnUrl()); assertTrue(exercise.isAttempted()); assertFalse(exercise.isCompleted()); + + exercise.setDeadline(new Date(new Date().getTime() + 60 * 60 * 1000)); assertTrue(exercise.isReturnable()); + assertEquals("123abc", exercise.getChecksum()); } - + @Test public void itShouldParseAnEmptyJsonArrayAsAnEmptyCourseList() { List empty = parser.parseFromJson("{api_version: 1, courses: []}"); @@ -67,12 +71,12 @@ public void itShouldParseAnEmptyJsonArrayAsAnEmptyCourseList() { public void itShouldThrowAnNullPointerExceptionIfTheInputIsEmpty() throws Exception { parser.parseFromJson(null); } - + @Test(expected = IllegalArgumentException.class) public void itShouldThrowAnIllegalArgumentExceptionIfTheInputIsEmpty() throws Exception { parser.parseFromJson(" "); } - + @Test public void itShouldParseANullDeadlineAsNull() { String exercisesJson = @@ -86,9 +90,9 @@ public void itShouldParseANullDeadlineAsNull() { "returnable: true" + "}]"; String json = "{api_version: 1, courses: [{\"name\": \"TheCourse\",\"exercises\": " + exercisesJson + "}]}"; - + List result = parser.parseFromJson(json); - + assertNull(result.get(0).getExercises().get(0).getDeadline()); } } diff --git a/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/spyware/EventRateLimiterTest.java b/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/spyware/EventRateLimiterTest.java deleted file mode 100644 index 5c447981..00000000 --- a/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/spyware/EventRateLimiterTest.java +++ /dev/null @@ -1,91 +0,0 @@ -package fi.helsinki.cs.tmc.spyware; - -import org.junit.Before; -import org.junit.Test; - -public class EventRateLimiterTest extends EventForwardedTestBase { - private final long reasonableTime = 300; - - private EventRateLimiter limiter; - - @Before - public void setUp() { - limiter = new EventRateLimiter(receiver); - } - - @Override - protected EventReceiver getSystemUnderTest() { - return limiter; - } - - @Test - public void testDeliversOnlyLatestEventWhenSpammed() { - limiter.setCooldownForEventKey("course|exercise|one", reasonableTime); - limiter.setCooldownForEventKey("course|exercise|two", reasonableTime * 2); - - sendEvents(30, "one"); - sendEvents(20, "two"); - sleep(reasonableTime * 1.1); - sendEvents(20, "two"); - sleep(reasonableTime * 2.1); - - assertReceivedExactly(0, 30, 29, 30+20+19); - } - - @Test - public void testWorksWithSparseEvents() { - limiter.setCooldownForEventKey("course|exercise|one", reasonableTime); - - sendEvents(2, "one"); - sleep(reasonableTime * 1.1); - sendEvents(1, "one"); - sleep(reasonableTime * 1.1); - sendEvents(1, "one"); - sleep(reasonableTime * 1.1); - sendEvents(1, "one"); - sleep(reasonableTime * 1.1); - - assertReceivedExactly(0, 1, 2, 3, 4); - } - - @Test - public void testFlushesOnClose() { - limiter.setCooldownForEventKey("course|exercise|one", reasonableTime); - - sendEvents(3, "one"); - sleep(reasonableTime * 1.1); - sendEvents(3, "one"); - - limiter.close(); - - assertReceivedExactly(0, 2, 3, 5); - - sleep(reasonableTime * 1.1); - - assertReceivedExactly(0, 2, 3, 5); - } - - @Test - public void testCloseWithNothingPending() { - limiter.setCooldownForEventKey("course|exercise|one", reasonableTime); - - sendEvents(3, "one"); - sleep(reasonableTime * 1.1); - - limiter.close(); - - assertReceivedExactly(0, 2); - - sleep(reasonableTime * 1.1); - - assertReceivedExactly(0, 2); - } - - private void sleep(double time) { - try { - Thread.sleep((long)time); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } -} diff --git a/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/spyware/EventSendBufferTest.java b/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/spyware/EventSendBufferTest.java index 1a0dc411..30a1453d 100644 --- a/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/spyware/EventSendBufferTest.java +++ b/tmc-plugin/test/unit/src/fi/helsinki/cs/tmc/spyware/EventSendBufferTest.java @@ -11,6 +11,7 @@ import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,8 +43,8 @@ public class EventSendBufferTest { private long sendDuration; private Semaphore sendStartSemaphore; // released when a send starts private Exception sendException; - private volatile int sendOperationsStarted; - private volatile int sendOperationsFinished; + private AtomicInteger sendOperationsStarted; + private AtomicInteger sendOperationsFinished; @Captor private ArgumentCaptor savedEvents; @@ -64,18 +65,18 @@ public void setUp() throws IOException { sendDuration = 0; sendStartSemaphore = new Semaphore(0); sendException = null; - sendOperationsStarted = 0; - sendOperationsFinished = 0; + sendOperationsStarted = new AtomicInteger(0); + sendOperationsFinished = new AtomicInteger(0); when(serverAccess.getSendEventLogJob(spywareServerUrl.capture(), sentEvents.capture())).thenReturn(new CancellableCallable() { @Override public Object call() throws Exception { - sendOperationsStarted++; + sendOperationsStarted.incrementAndGet(); sendStartSemaphore.release(); Thread.sleep(sendDuration); if (sendException != null) { throw sendException; } - sendOperationsFinished++; + sendOperationsFinished.incrementAndGet(); return null; } @@ -165,7 +166,7 @@ public void autosendsPeriodically() throws InterruptedException { sender.receiveEvent(ev2); Thread.sleep(250); - assertTrue(sendOperationsFinished >= 2); + assertTrue(sendOperationsFinished.get() >= 2); } @Test @@ -192,12 +193,12 @@ public void autosendsWhenNumberOfEventsGoesOverThreshold() throws TimeoutExcepti sender.receiveEvent(ev1); sender.receiveEvent(ev2); Thread.sleep(50); - assertEquals(0, sendOperationsFinished); + assertEquals(0, sendOperationsFinished.get()); sender.receiveEvent(ev3); sender.waitUntilCurrentSendingFinished(1000); - assertEquals(1, sendOperationsFinished); + assertEquals(1, sendOperationsFinished.get()); LoggableEvent[] expecteds = new LoggableEvent[] { ev1, ev2, ev3 }; assertArrayEquals(expecteds, sentEvents.getValue().toArray(new LoggableEvent[0])); } @@ -214,12 +215,26 @@ public void autosendingWhenOverThresholdHasACooldown() throws TimeoutException, sender.receiveEvent(ev4); sender.waitUntilCurrentSendingFinished(1000); - assertEquals(1, sendOperationsStarted); + assertEquals(1, sendOperationsStarted.get()); LoggableEvent[] expecteds = new LoggableEvent[] { ev1, ev2, ev3 }; assertArrayEquals(expecteds, sentEvents.getValue().toArray(new LoggableEvent[0])); } - @Test + @Test // Issue #125 + public void retryLoopRespectAutosendIntervalOnFailureEvenIfThereIsMoreToSend() throws TimeoutException, InterruptedException { + sendException = new RuntimeException("Sending failed"); + sender.setMaxEventsPerSend(2); + sender.receiveEvent(ev1); + sender.receiveEvent(ev2); + sender.receiveEvent(ev3); + sender.sendNow(); + sender.waitUntilCurrentSendingFinished(1000); + + assertEquals(1, sendOperationsStarted.get()); + assertEquals(0, sendOperationsFinished.get()); + } + + @Test // FIXME: this test appears to be flaky public void discardsOldestEventsOnOverflow() throws TimeoutException, InterruptedException { sender.setMaxEvents(3); @@ -251,7 +266,7 @@ public void sendsEventsReceivedDuringSendingInSubsequentSend() throws TimeoutExc sender.sendNow(); sender.waitUntilCurrentSendingFinished(1000); - assertEquals(2, sendOperationsFinished); + assertEquals(2, sendOperationsFinished.get()); assertEquals(2, sentEvents.getAllValues().size()); assertArrayEquals(new LoggableEvent[] { ev1 }, sentEvents.getAllValues().get(0).toArray(new LoggableEvent[0])); @@ -286,7 +301,7 @@ public void toleratesOverflowDuringSending() throws TimeoutException, Interrupte sender.sendNow(); sender.waitUntilCurrentSendingFinished(1000); - assertEquals(3, sendOperationsFinished); + assertEquals(3, sendOperationsFinished.get()); assertEquals(3, sentEvents.getAllValues().size()); assertArrayEquals(new LoggableEvent[] { ev1 }, sentEvents.getAllValues().get(0).toArray(new LoggableEvent[0])); @@ -320,7 +335,7 @@ public void retainsEventsForNextSendIfSendingFails() throws TimeoutException, In sender.waitUntilCurrentSendingFinished(1000); assertEquals(2, sendStartSemaphore.availablePermits()); - assertEquals(1, sendOperationsFinished); + assertEquals(1, sendOperationsFinished.get()); assertArrayEquals(new LoggableEvent[] { ev1 }, sentEvents.getValue().toArray(new LoggableEvent[0])); Thread.sleep(100); // Wait for save