Skip to content

Commit

Permalink
Merge pull request ReactiveX#211 from benjchristensen/issue-192
Browse files Browse the repository at this point in the history
Remove use of JSR 305 and dependency on com.google.code.findbugs
  • Loading branch information
benjchristensen committed Mar 27, 2013
2 parents e5f5b8b + 41a7cb7 commit 669f8a7
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 13 deletions.
1 change: 0 additions & 1 deletion rxjava-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ targetCompatibility = JavaVersion.VERSION_1_6

dependencies {
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
provided 'junit:junit:4.10'
provided 'org.mockito:mockito-core:1.8.5'
}
Expand Down
13 changes: 7 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperationZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import org.junit.Test;
import org.mockito.InOrder;

Expand Down Expand Up @@ -68,7 +65,9 @@ public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> zip(Observabl
return a;
}

@ThreadSafe
/*
* ThreadSafe
*/
private static class ZipObserver<R, T> implements Observer<T> {
final Observable<T> w;
final Aggregator<R> a;
Expand Down Expand Up @@ -110,9 +109,10 @@ public void onNext(T args) {
/**
* Receive notifications from each of the Observables we are reducing and execute the zipFunction whenever we have received events from all Observables.
*
* This class is thread-safe.
*
* @param <T>
*/
@ThreadSafe
private static class Aggregator<T> implements Func1<Observer<T>, Subscription> {

private volatile SynchronizedObserver<T> observer;
Expand All @@ -132,10 +132,11 @@ public Aggregator(FuncN<T> zipFunction) {

/**
* Receive notification of a Observer starting (meaning we should require it for aggregation)
*
* Thread Safety => Invoke ONLY from the static factory methods at top of this class which are always an atomic execution by a single thread.
*
* @param w
*/
@GuardedBy("Invoked ONLY from the static factory methods at top of this class which are always an atomic execution by a single thread.")
private void addObserver(ZipObserver<T, ?> w) {
// initialize this ZipObserver
observers.add(w);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.concurrent.ThreadSafe;

import rx.Subscription;

/**
Expand All @@ -32,7 +30,6 @@
* <li>handle both synchronous and asynchronous subscribe() execution flows</li>
* </ul>
*/
@ThreadSafe
public final class AtomicObservableSubscription implements Subscription {

private AtomicReference<Subscription> actualSubscription = new AtomicReference<Subscription>();
Expand Down
3 changes: 0 additions & 3 deletions rxjava-core/src/main/java/rx/util/SynchronizedObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.concurrent.ThreadSafe;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand All @@ -50,7 +48,6 @@
*
* @param <T>
*/
@ThreadSafe
public final class SynchronizedObserver<T> implements Observer<T> {

/**
Expand Down

0 comments on commit 669f8a7

Please sign in to comment.