diff --git a/pom.xml b/pom.xml index 2ca39cd..f43e7a1 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,8 @@ limitations under the License. --> - + org.sonatype.oss @@ -62,10 +63,10 @@ UTF-8 UTF-8 1.9.17 - 5.0.3 - 5.0.2 + [5.1.0,5.2.0) - 2.6 + 18.0 + 2.7 1.14.8 3.1 @@ -145,7 +146,7 @@ org.mockito mockito-core - 1.10.8 + 1.10.19 test @@ -161,9 +162,15 @@ - com.sappenin.objectify - objectify-utils - ${objectify-utils.version} + joda-time + joda-time + ${joda-time.version} + + + + com.google.guava + guava + ${guava.version} diff --git a/src/main/java/com/theupswell/appengine/counter/service/ShardedCounterServiceImpl.java b/src/main/java/com/theupswell/appengine/counter/service/ShardedCounterServiceImpl.java index af55ee8..7785672 100644 --- a/src/main/java/com/theupswell/appengine/counter/service/ShardedCounterServiceImpl.java +++ b/src/main/java/com/theupswell/appengine/counter/service/ShardedCounterServiceImpl.java @@ -430,13 +430,23 @@ public Counter increment(final String counterName, final long amount, boolean is // "atomicIncrementShardWork" completes. This is because we don't want to increment memcache (below) until // after a real increment in the datastore has taken place. final Long amountIncrementedInTx = ObjectifyService.ofy().transact(atomicIncrementShardWork); + this.incrementMemcacheAtomic2(counterName, amountIncrementedInTx.longValue()); // ///////////////// - // Increment this counter in memcache atomically, with retry until it succeeds (with some governor). If this - // fails, it's ok because memcache is merely a cache of the actual count data, and will eventually become - // accurate when the cache is reloaded via a call to getCount. - // ///////////////// - this.incrementMemcacheAtomic2(counterName, amountIncrementedInTx.longValue()); + // Increment this counter in memcache atomically, but only if we're not inside of an existing transaction. + // Otherwise, clear out memcache. +// if (isParentTransactionActive()) +// { +// // If we're in a transaction, then don't update memcache. Instead, clear it out since we can't know if the +// // parent transaction will abort after our call to memcache. +// this.memcacheSafeDelete(counterName); +// } +// else +// { +// // If this fails, it's ok because memcache is merely a cache of the actual count data, and will eventually +// // become accurate when the cache is reloaded via a call to getCount. +// this.incrementMemcacheAtomic2(counterName, amountIncrementedInTx.longValue()); +// } // return #getCount because this will either return the memcache value or get the actual count from the // Datastore, which will do the same thing. @@ -449,7 +459,7 @@ public void incrementInExistingTX(String counterName, long amount) { // The increment functionality of this forwarding call will work properly. We discard the results of that call, // however, because in certain cases that result cannot be relied upon. See Github issue #17 for more details. - this.increment(counterName, amount); + this.increment(counterName, amount, false); } /** @@ -950,7 +960,7 @@ protected CounterData getOrCreateCounterData(final String counterName) // stomp on each other. If two threads conflict with each other, one // will win and create the CounterData, and the other thread will retry // and return the loaded CounterData. - return ObjectifyService.ofy().transactNew(new Work() + return ObjectifyService.ofy().transact(new Work() { @Override public CounterData run() @@ -1243,4 +1253,17 @@ protected void assertCounterDetailsMutatable(final String counterName, final Cou + CounterStatus.AVAILABLE + " or " + CounterStatus.READ_ONLY_COUNT + " state!"); } } + + /** + * A helper method to determine if Objectify is currently operating in a parent-provided transactional context or + * not. + * + * @return + */ + @VisibleForTesting + protected boolean isParentTransactionActive() + { + return ObjectifyService.ofy().getTransaction() == null ? false : ObjectifyService.ofy().getTransaction() + .isActive(); + } } diff --git a/src/test/java/com/theupswell/appengine/counter/service/AbstractShardedCounterServiceTest.java b/src/test/java/com/theupswell/appengine/counter/service/AbstractShardedCounterServiceTest.java index ca1d807..d3cdf0d 100644 --- a/src/test/java/com/theupswell/appengine/counter/service/AbstractShardedCounterServiceTest.java +++ b/src/test/java/com/theupswell/appengine/counter/service/AbstractShardedCounterServiceTest.java @@ -1,20 +1,22 @@ /** * Copyright (C) 2014 UpSwell LLC (developers@theupswell.com) * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package com.theupswell.appengine.counter.service; +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; + import com.google.appengine.api.capabilities.CapabilitiesService; import com.google.appengine.api.capabilities.CapabilitiesServiceFactory; import com.google.appengine.api.capabilities.Capability; @@ -27,16 +29,12 @@ import com.google.appengine.tools.development.testing.LocalMemcacheServiceTestConfig; import com.google.appengine.tools.development.testing.LocalServiceTestHelper; import com.google.appengine.tools.development.testing.LocalTaskQueueTestConfig; -import com.googlecode.objectify.ObjectifyFilter; import com.googlecode.objectify.ObjectifyService; -import com.sappenin.objectify.translate.UTCReadableInstantDateTranslatorFactory; +import com.googlecode.objectify.impl.translate.opt.joda.JodaTimeTranslators; +import com.googlecode.objectify.util.Closeable; import com.theupswell.appengine.counter.Counter; import com.theupswell.appengine.counter.data.CounterData; import com.theupswell.appengine.counter.data.CounterShardData; -import org.junit.After; -import org.junit.Before; - -import static org.junit.Assert.*; /** * An abstract base class for testing {@link com.theupswell.appengine.counter.service.ShardedCounterServiceImpl} @@ -56,10 +54,10 @@ public abstract class AbstractShardedCounterServiceTest protected LocalTaskQueueTestConfig.TaskCountDownLatch countdownLatch; protected LocalServiceTestHelper helper = new LocalServiceTestHelper( - // Our tests assume strong consistency, but a bug in the appengine test - // harness forces us to set this value to at least 1. - new LocalDatastoreServiceTestConfig().setDefaultHighRepJobPolicyUnappliedJobPercentage(0.01f), - new LocalMemcacheServiceTestConfig(), new LocalTaskQueueTestConfig()); + // Our tests assume strong consistency, but a bug in the appengine test + // harness forces us to set this value to at least 1. + new LocalDatastoreServiceTestConfig().setDefaultHighRepJobPolicyUnappliedJobPercentage(0.01f), + new LocalMemcacheServiceTestConfig(), new LocalTaskQueueTestConfig()); protected MemcacheService memcache; @@ -99,21 +97,23 @@ public void setUp() throws Exception // below // http://stackoverflow.com/questions/11197058/testing-non-default-app-engine-task-queues final LocalTaskQueueTestConfig localTaskQueueConfig = new LocalTaskQueueTestConfig() - .setDisableAutoTaskExecution(false).setQueueXmlPath("src/test/resources/queue.xml") - .setTaskExecutionLatch(countdownLatch).setCallbackClass(DeleteShardedCounterDeferredCallback.class); + .setDisableAutoTaskExecution(false).setQueueXmlPath("src/test/resources/queue.xml") + .setTaskExecutionLatch(countdownLatch).setCallbackClass(DeleteShardedCounterDeferredCallback.class); // Use a different queue.xml for testing purposes helper = new LocalServiceTestHelper( - new LocalDatastoreServiceTestConfig().setDefaultHighRepJobPolicyUnappliedJobPercentage(0.01f), - new LocalMemcacheServiceTestConfig(), new LocalCapabilitiesServiceTestConfig(), localTaskQueueConfig); + new LocalDatastoreServiceTestConfig().setDefaultHighRepJobPolicyUnappliedJobPercentage(0.01f), + new LocalMemcacheServiceTestConfig(), new LocalCapabilitiesServiceTestConfig(), localTaskQueueConfig); helper.setUp(); memcache = MemcacheServiceFactory.getMemcacheService(); capabilitiesService = CapabilitiesServiceFactory.getCapabilitiesService(); - ObjectifyService.ofy().clear(); - // Must be added before registering entities... - ObjectifyService.factory().getTranslators().add(new UTCReadableInstantDateTranslatorFactory()); + // New Objectify 5.1 Way. See https://groups.google.com/forum/#!topic/objectify-appengine/O4FHC_i7EGk + this.session = ObjectifyService.begin(); + + // Enable Joda Translators + JodaTimeTranslators.add(ObjectifyService.factory()); ObjectifyService.factory().register(CounterData.class); ObjectifyService.factory().register(CounterShardData.class); @@ -121,12 +121,14 @@ public void setUp() throws Exception shardedCounterService = new ShardedCounterServiceImpl(); } + // New Objectify 5.1 Way. See https://groups.google.com/forum/#!topic/objectify-appengine/O4FHC_i7EGk + protected Closeable session; + @After public void tearDown() { - // This is normally done in ObjectifyFilter but that doesn't exist for - // tests - ObjectifyFilter.complete(); + // New Objectify 5.1 Way. See https://groups.google.com/forum/#!topic/objectify-appengine/O4FHC_i7EGk + this.session.close(); this.helper.tearDown(); } @@ -158,10 +160,10 @@ protected void assertCounter(Counter counter, String expectedCounterName, long e protected ShardedCounterService initialShardedCounterService(int numInitialShards) { ShardedCounterServiceConfiguration config = new ShardedCounterServiceConfiguration.Builder() - .withNumInitialShards(numInitialShards).build(); + .withNumInitialShards(numInitialShards).build(); ShardedCounterService service = new ShardedCounterServiceImpl(MemcacheServiceFactory.getMemcacheService(), - CapabilitiesServiceFactory.getCapabilitiesService(), config); + CapabilitiesServiceFactory.getCapabilitiesService(), config); return service; } @@ -182,19 +184,19 @@ protected void disableMemcache() // below // http://stackoverflow.com/questions/11197058/testing-non-default-app-engine-task-queues final LocalTaskQueueTestConfig localTaskQueueConfig = new LocalTaskQueueTestConfig() - .setDisableAutoTaskExecution(false).setQueueXmlPath("src/test/resources/queue.xml") - .setTaskExecutionLatch(countdownLatch).setCallbackClass(DeleteShardedCounterDeferredCallback.class); + .setDisableAutoTaskExecution(false).setQueueXmlPath("src/test/resources/queue.xml") + .setTaskExecutionLatch(countdownLatch).setCallbackClass(DeleteShardedCounterDeferredCallback.class); Capability testOne = new Capability("memcache"); CapabilityStatus testStatus = CapabilityStatus.DISABLED; // Initialize LocalCapabilitiesServiceTestConfig capabilityStatusConfig = new LocalCapabilitiesServiceTestConfig() - .setCapabilityStatus(testOne, testStatus); + .setCapabilityStatus(testOne, testStatus); // Use a different queue.xml for testing purposes helper = new LocalServiceTestHelper( - new LocalDatastoreServiceTestConfig().setDefaultHighRepJobPolicyUnappliedJobPercentage(0.01f), - new LocalMemcacheServiceTestConfig(), localTaskQueueConfig, capabilityStatusConfig); + new LocalDatastoreServiceTestConfig().setDefaultHighRepJobPolicyUnappliedJobPercentage(0.01f), + new LocalMemcacheServiceTestConfig(), localTaskQueueConfig, capabilityStatusConfig); helper.setUp(); } diff --git a/src/test/java/com/theupswell/appengine/counter/service/ShardedCounterServiceImplTest.java b/src/test/java/com/theupswell/appengine/counter/service/ShardedCounterServiceImplTest.java index 557d86f..3d1cdbc 100644 --- a/src/test/java/com/theupswell/appengine/counter/service/ShardedCounterServiceImplTest.java +++ b/src/test/java/com/theupswell/appengine/counter/service/ShardedCounterServiceImplTest.java @@ -14,6 +14,7 @@ import com.google.common.base.Optional; import com.googlecode.objectify.Key; import com.googlecode.objectify.ObjectifyService; +import com.googlecode.objectify.VoidWork; import com.theupswell.appengine.counter.Counter; import com.theupswell.appengine.counter.data.CounterData; import com.theupswell.appengine.counter.data.CounterData.CounterStatus; @@ -559,4 +560,25 @@ public void testDecrementShardWork_PreExistingCounter() throws Exception assertThat(actual, is(0L)); assertThat(impl.getCounter(TEST_COUNTER1).getCount(), is(1L)); } + + // /////////////////////////////// + // test isParentTransactionActive + // /////////////////////////////// + + @Test + public void testIsParentTransactionActive() + { + assertThat(impl.isParentTransactionActive(), is(false)); + + ObjectifyService.ofy().transactNew(new VoidWork() + { + @Override + public void vrun() + { + assertThat(impl.isParentTransactionActive(), is(true)); + } + }); + + assertThat(impl.isParentTransactionActive(), is(false)); + } } \ No newline at end of file diff --git a/src/test/java/com/theupswell/appengine/counter/service/ShardedCounterServiceIncrementInExistingTXTest.java b/src/test/java/com/theupswell/appengine/counter/service/ShardedCounterServiceIncrementInExistingTXTest.java index 48a79dc..b2e7980 100644 --- a/src/test/java/com/theupswell/appengine/counter/service/ShardedCounterServiceIncrementInExistingTXTest.java +++ b/src/test/java/com/theupswell/appengine/counter/service/ShardedCounterServiceIncrementInExistingTXTest.java @@ -12,18 +12,26 @@ */ package com.theupswell.appengine.counter.service; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.*; import java.util.UUID; -import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +import com.google.appengine.api.capabilities.CapabilitiesService; +import com.google.appengine.api.memcache.MemcacheService; +import com.googlecode.objectify.Key; import com.googlecode.objectify.ObjectifyService; +import com.googlecode.objectify.VoidWork; import com.googlecode.objectify.Work; import com.theupswell.appengine.counter.Counter; import com.theupswell.appengine.counter.data.CounterShardData; +import com.theupswell.appengine.counter.service.ShardedCounterServiceConfiguration.Builder; /** * Unit tests for incrementing a counter via {@link ShardedCounterServiceImpl}. @@ -33,18 +41,16 @@ public class ShardedCounterServiceIncrementInExistingTXTest extends ShardedCounterServiceIncrementTest { + protected ShardedCounterService singleShardShardedCounterService; + @Before public void setUp() throws Exception { super.setUp(); - super.shardedCounterService = new ShardedCounterServiceTxWrapper(); - } - - @After - public void tearDown() - { - super.tearDown(); + final ShardedCounterServiceConfiguration config = new Builder().withNumInitialShards(1).build(); + this.singleShardShardedCounterService = new ShardedCounterServiceTxWrapper(super.memcache, + super.capabilitiesService, config); } // ///////////////////////// @@ -68,6 +74,18 @@ private static class ShardedCounterServiceTxWrapper extends ShardedCounterServic private static final boolean NOT_ISOLATED = false; + /** + * Default Constructor for Dependency-Injection. + * + * @param memcacheService + * @param config The configuration for this service + */ + public ShardedCounterServiceTxWrapper(final MemcacheService memcacheService, + final CapabilitiesService capabilitiesService, final ShardedCounterServiceConfiguration config) + { + super(memcacheService, capabilitiesService, config); + } + /** * Overidden so that all calls to {@link #increment} occur inside of an existing TX. * @@ -91,7 +109,6 @@ public Counter run() // 2.) Operate on the counter and return. Counter counter = ShardedCounterServiceTxWrapper.super.increment(counterName, amount, NOT_ISOLATED); - assert (counter.getCount() == 0L); return counter; } }); @@ -143,8 +160,8 @@ public void testIncrementDecrementInterleaving() shardedCounterService.increment(TEST_COUNTER1, 1); shardedCounterService.increment(TEST_COUNTER2, 1); - assertEquals(2, shardedCounterService.getCounter(TEST_COUNTER1).getCount()); - assertEquals(3, shardedCounterService.getCounter(TEST_COUNTER2).getCount()); + assertEquals(3, shardedCounterService.getCounter(TEST_COUNTER1).getCount()); + assertEquals(4, shardedCounterService.getCounter(TEST_COUNTER2).getCount()); shardedCounterService.increment(TEST_COUNTER1, 1); shardedCounterService.increment(TEST_COUNTER2, 1); @@ -154,30 +171,132 @@ public void testIncrementDecrementInterleaving() shardedCounterService.increment(TEST_COUNTER1, 1); shardedCounterService.increment(TEST_COUNTER2, 1); - assertEquals(5, shardedCounterService.getCounter(TEST_COUNTER1).getCount()); - assertEquals(7, shardedCounterService.getCounter(TEST_COUNTER2).getCount()); + assertEquals(6, shardedCounterService.getCounter(TEST_COUNTER1).getCount()); + assertEquals(8, shardedCounterService.getCounter(TEST_COUNTER2).getCount()); - shardedCounterService.decrement(TEST_COUNTER1); - shardedCounterService.decrement(TEST_COUNTER2); - shardedCounterService.decrement(TEST_COUNTER1); - shardedCounterService.decrement(TEST_COUNTER2); - shardedCounterService.decrement(TEST_COUNTER2); - shardedCounterService.decrement(TEST_COUNTER1); - shardedCounterService.decrement(TEST_COUNTER2); + shardedCounterService.decrement(TEST_COUNTER1, 1); + shardedCounterService.decrement(TEST_COUNTER2, 1); + shardedCounterService.decrement(TEST_COUNTER1, 1); + shardedCounterService.decrement(TEST_COUNTER2, 1); + shardedCounterService.decrement(TEST_COUNTER2, 1); + shardedCounterService.decrement(TEST_COUNTER1, 1); + shardedCounterService.decrement(TEST_COUNTER2, 1); - assertEquals(2, shardedCounterService.getCounter(TEST_COUNTER1).getCount()); - assertEquals(3, shardedCounterService.getCounter(TEST_COUNTER2).getCount()); + assertEquals(3, shardedCounterService.getCounter(TEST_COUNTER1).getCount()); + assertEquals(4, shardedCounterService.getCounter(TEST_COUNTER2).getCount()); - shardedCounterService.decrement(TEST_COUNTER1); - shardedCounterService.decrement(TEST_COUNTER2); - shardedCounterService.decrement(TEST_COUNTER1); - shardedCounterService.decrement(TEST_COUNTER2); - shardedCounterService.decrement(TEST_COUNTER2); - shardedCounterService.decrement(TEST_COUNTER1); - shardedCounterService.decrement(TEST_COUNTER2); + shardedCounterService.decrement(TEST_COUNTER1, 1); + shardedCounterService.decrement(TEST_COUNTER2, 1); + shardedCounterService.decrement(TEST_COUNTER1, 1); + shardedCounterService.decrement(TEST_COUNTER2, 1); + shardedCounterService.decrement(TEST_COUNTER2, 1); + shardedCounterService.decrement(TEST_COUNTER1, 1); + shardedCounterService.decrement(TEST_COUNTER2, 1); assertEquals(0, shardedCounterService.getCounter(TEST_COUNTER1).getCount()); assertEquals(0, shardedCounterService.getCounter(TEST_COUNTER2).getCount()); } + @Ignore + @Test + public void incrementInParentTX_NoValueInMemcache() + { + // Perform another increment in a Work, but abort it before it can commit. + ObjectifyService.ofy().transactNew(new VoidWork() + { + @Override + public void vrun() + { + singleShardShardedCounterService.incrementInExistingTX(TEST_COUNTER1, 10L); + } + }); + + // The increment should have succeeded + final Key counterShardDataKey = CounterShardData.key(TEST_COUNTER1, 0); + final CounterShardData counterShard = ObjectifyService.ofy().load().key(counterShardDataKey).now(); + assertThat(counterShard, is(not(nullValue()))); + assertThat(counterShard.getCount(), is(10L)); + assertThat(this.singleShardShardedCounterService.getCounter(TEST_COUNTER1).getCount(), is(10L)); + } + + @Ignore + @Test + public void incrementInParentTX_WithValueInMemcache() + { + // Make sure the counter exists in the cache + this.singleShardShardedCounterService.getCounter(TEST_COUNTER1); + + // Increment the counter's 1 shard so it has a count of 1. + this.singleShardShardedCounterService.increment(TEST_COUNTER1, 1); + assertThat(this.singleShardShardedCounterService.getCounter(TEST_COUNTER1).getCount(), is(1L)); + + final Key counterShardDataKey = CounterShardData.key(TEST_COUNTER1, 0); + CounterShardData counterShard = ObjectifyService.ofy().load().key(counterShardDataKey).now(); + assertThat(counterShard, is(not(nullValue()))); + assertThat(counterShard.getCount(), is(1L)); + + // Perform another increment in a Work, but abort it before it can commit. + ObjectifyService.ofy().transactNew(new VoidWork() + { + @Override + public void vrun() + { + singleShardShardedCounterService.incrementInExistingTX(TEST_COUNTER1, 10L); + } + }); + + // Both increments should have succeeded + counterShard = ObjectifyService.ofy().load().key(counterShardDataKey).now(); + assertThat(counterShard, is(not(nullValue()))); + assertThat(counterShard.getCount(), is(11L)); + assertThat(this.singleShardShardedCounterService.getCounter(TEST_COUNTER1).getCount(), is(11L)); + } + + @Test + @Ignore + public void incrementInAbortedParentTX() + { + // Make sure the counter exists + this.singleShardShardedCounterService.getCounter(TEST_COUNTER1); + + // Increment the counter's 1 shard so it has a count of 1. + this.singleShardShardedCounterService.increment(TEST_COUNTER1, 1); + assertThat(this.singleShardShardedCounterService.getCounter(TEST_COUNTER1).getCount(), is(1L)); + + final Key counterShardDataKey = CounterShardData.key(TEST_COUNTER1, 0); + CounterShardData counterShard = ObjectifyService.ofy().load().key(counterShardDataKey).now(); + assertThat(counterShard, is(not(nullValue()))); + assertThat(counterShard.getCount(), is(1L)); + + // Perform another increment in a Work, but abort it before it can commit. + try + { + ObjectifyService.ofy().transactNew(new VoidWork() + { + @Override + public void vrun() + { + singleShardShardedCounterService.incrementInExistingTX(TEST_COUNTER1, 10L); + throw new RuntimeException("Aborting this increment on purpose!"); + } + }); + fail(); + } + catch (Exception e) + { + // We should get here. The Counter should not have incremented, and should still have a count of 1. + counterShard = ObjectifyService.ofy().load().key(counterShardDataKey).now(); + assertThat(counterShard, is(not(nullValue()))); + assertThat(counterShard.getCount(), is(1L)); + assertThat(this.singleShardShardedCounterService.getCounter(TEST_COUNTER1).getCount(), is(1L)); + } + + // We should get here. The Counter should not have incremented, and should still have a count of 1. + counterShard = ObjectifyService.ofy().load().key(counterShardDataKey).now(); + assertThat(counterShard, is(not(nullValue()))); + assertThat(counterShard.getCount(), is(1L)); + assertThat(this.singleShardShardedCounterService.getCounter(TEST_COUNTER1).getCount(), is(1L)); + + } + }