Commit

Fix grouping logic failure when number of items matches the group size (#163)

* Modify the grouping logic so it no longer fails when the number of items exactly matches the group size
* Filter out empty lists
* Change constants to be package-local
apoi committed Oct 15, 2017
1 parent 512a3c0 commit 4968b17
Showing 3 changed files with 139 additions and 11 deletions.
@@ -60,7 +60,7 @@ public NetworkRequestStatusStoreCore(@NonNull final ContentResolver contentResol

     @NonNull
     @Override
-    protected Observable<List<CoreOperation>> groupOperations(@NonNull final Observable<CoreOperation> source) {
+    protected <R> Observable<List<R>> groupOperations(@NonNull Observable<R> source) {
         // NetworkRequestStatus updates should not be grouped to ensure fast processing.
         return source.map(Collections::singletonList);
     }
@@ -76,9 +76,9 @@ public abstract class ContentProviderStoreCoreBase<U> {

     private final String TAG = getClass().getSimpleName();

-    private static final int DEFAULT_GROUPING_TIMEOUT_MS = 100;
+    static final int DEFAULT_GROUPING_TIMEOUT_MS = 100;

-    private static final int DEFAULT_GROUP_MAX_SIZE = 30;
+    static final int DEFAULT_GROUP_MAX_SIZE = 30;

     @NonNull
     private final ContentResolver contentResolver;
@@ -170,17 +170,18 @@ public static Handler createHandler(@NonNull final String name) {
     /**
      * Implements grouping logic for batching the content provider operations. The default
      * logic buffers the operations with debounced timer while applying a hard limit for the
-     * number of operations. The data is serialized into a binder transaction. An attempt
-     * to pass a too large batch of operations will result in a failed binder transaction.
+     * number of operations.
+     *
+     * The data is serialized into a binder transaction. An attempt to pass here a too large
+     * batch of operations will result in a failed binder transaction.
      */
     @NonNull
-    protected Observable<List<CoreOperation>> groupOperations(@NonNull final Observable<CoreOperation> source) {
+    protected <R> Observable<List<R>> groupOperations(@NonNull final Observable<R> source) {
         return source.publish(stream -> stream.buffer(
-                Observable.amb(
-                        stream.debounce(groupingTimeout, TimeUnit.MILLISECONDS),
-                        stream.skip(groupMaxSize - 1))
-                        .first() // Complete observable after the first reached trigger
-                        .repeatWhen(observable -> observable))); // Resubscribe immediately for the next buffer
+                Observable.merge(
+                        stream.window(groupMaxSize).skip(1),
+                        stream.debounce(groupingTimeout, TimeUnit.MILLISECONDS))))
+                .filter(list -> !list.isEmpty());
     }

     @NonNull
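To make the new batching behaviour easier to follow, here is a minimal standalone sketch of the same buffer-with-boundary pattern (assuming RxJava 1.x; the GroupingSketch class, the 3-item limit, and the 100 ms quiet period are illustrative stand-ins for groupMaxSize and groupingTimeout, not part of the reark sources). A buffer closes whenever a full window of items has been seen or the stream has been idle for the debounce period, and empty buffers are filtered out.

import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;

// Minimal sketch of the buffer-with-boundary grouping pattern used above.
// Class name and constants are hypothetical stand-ins, not part of the reark sources.
public class GroupingSketch {

    private static final int GROUP_MAX_SIZE = 3;          // stand-in for groupMaxSize
    private static final int GROUPING_TIMEOUT_MS = 100;   // stand-in for groupingTimeout

    public static void main(String[] args) {
        Observable<Integer> source = Observable.range(1, 7);

        List<List<Integer>> groups = source
                .publish(stream -> stream.buffer(
                        // Close a buffer when a full window of items has been seen...
                        Observable.merge(
                                stream.window(GROUP_MAX_SIZE).skip(1),
                                // ...or when the stream has been quiet for the timeout.
                                stream.debounce(GROUPING_TIMEOUT_MS, TimeUnit.MILLISECONDS))))
                // Boundary and completion events can produce empty buffers; drop them.
                .filter(list -> !list.isEmpty())
                .toList()
                .toBlocking()
                .single();

        // Prints the emitted groups, e.g. [[1, 2, 3], [4, 5, 6], [7]].
        System.out.println(groups);
    }
}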
@@ -0,0 +1,127 @@
package io.reark.reark.data.stores.cores;

import android.content.ContentResolver;
import android.content.ContentValues;
import android.database.ContentObserver;
import android.database.Cursor;
import android.net.Uri;
import android.support.annotation.NonNull;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.concurrent.TimeUnit;

import rx.Emitter.BackpressureMode;
import rx.Observable;

import static io.reark.reark.data.stores.cores.ContentProviderStoreCoreBase.DEFAULT_GROUPING_TIMEOUT_MS;
import static io.reark.reark.data.stores.cores.ContentProviderStoreCoreBase.DEFAULT_GROUP_MAX_SIZE;

public class ContentProviderStoreCoreBaseTest {

    private ContentProviderStoreCoreBase<Integer> contentStoreCore;

    @Before
    public void setup() {
        contentStoreCore = new NullContentStore();
    }

    private static Observable<Integer> createSource(int numItems) {
        return Observable.fromEmitter(publisher -> {
            for (int i = 1; i <= numItems; i++) {
                publisher.onNext(i);
            }
        }, BackpressureMode.BUFFER);
    }

    @Test
    public void groupOperations_WithNoElements_DoesNotEmit() {
        contentStoreCore.groupOperations(Observable.never())
                .test()
                .awaitTerminalEvent(2 * DEFAULT_GROUPING_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .assertNotCompleted()
                .assertNoValues();
    }

    @Test
    public void groupOperations_WithOneElement_EmitsOneGroup() {
        contentStoreCore.groupOperations(createSource(1))
                .test()
                .awaitTerminalEvent(2 * DEFAULT_GROUPING_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .assertNotCompleted()
                .assertValueCount(1);
    }

    @Test
    public void groupOperations_WithGroupMaxSizeElements_EmitsOneGroup() {
        contentStoreCore.groupOperations(createSource(DEFAULT_GROUP_MAX_SIZE))
                .test()
                .awaitTerminalEvent(2 * DEFAULT_GROUPING_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .assertNotCompleted()
                .assertValueCount(1);
    }

    @Test
    public void groupOperations_WithOneOverGroupMaxSizeElements_EmitsTwoGroups() {
        contentStoreCore.groupOperations(createSource(DEFAULT_GROUP_MAX_SIZE + 1))
                .test()
                .awaitTerminalEvent(2 * DEFAULT_GROUPING_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .assertNotCompleted()
                .assertValueCount(2);
    }

    @Test
    public void groupOperations_WithThreeTimesGroupMaxSizeElements_EmitsThreeGroups() {
        contentStoreCore.groupOperations(createSource(3 * DEFAULT_GROUP_MAX_SIZE))
                .test()
                .awaitTerminalEvent(2 * DEFAULT_GROUPING_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .assertNotCompleted()
                .assertValueCount(3);
    }

    @SuppressWarnings({"ReturnOfNull", "ConstantConditions", "ZeroLengthArrayAllocation"})
    private static class NullContentStore extends ContentProviderStoreCoreBase<Integer> {

        NullContentStore() {
            super(Mockito.mock(ContentResolver.class));
        }

        @NonNull
        @Override
        protected String getAuthority() {
            return null;
        }

        @NonNull
        @Override
        protected ContentObserver getContentObserver() {
            return null;
        }

        @NonNull
        @Override
        protected Uri getContentUri() {
            return null;
        }

        @NonNull
        @Override
        protected String[] getProjection() {
            return new String[0];
        }

        @NonNull
        @Override
        protected Integer read(@NonNull Cursor cursor) {
            return null;
        }

        @NonNull
        @Override
        protected ContentValues getContentValuesForItem(@NonNull Integer item) {
            return null;
        }
    }
}
