Skip to content

Commit

Permalink
Merge pull request ReactiveX#567 from akarnokd/TimestampWithScheduler
Browse files Browse the repository at this point in the history
Operation Timestamp with Scheduler
  • Loading branch information
benjchristensen committed Dec 5, 2013
2 parents 9e7d56d + 14dd0b1 commit 0026ea4
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 0 deletions.
13 changes: 13 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2208,6 +2208,19 @@ public Observable<Timestamped<T>> timestamp() {
return create(OperationTimestamp.timestamp(this));
}

/**
* Wraps each item emitted by a source Observable in a {@link Timestamped}
* object with timestamps provided by the given Scheduler.
*
* @param scheduler the {@link Scheduler} to use as a time source.
* @return an Observable that emits timestamped items from the source
* Observable with timestamps provided by the given Scheduler
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229003.aspx'>MSDN: Observable.Timestamp</a>
*/
public Observable<Timestamped<T>> timestamp(Scheduler scheduler) {
return create(OperationTimestamp.timestamp(this, scheduler));
}

/**
* Converts a {@link Future} into an Observable.
* <p>
Expand Down
12 changes: 12 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationTimestamp.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Scheduler;
import rx.util.Timestamped;
import rx.util.functions.Func1;

Expand Down Expand Up @@ -44,4 +45,15 @@ public Timestamped<T> call(T value) {
}
});
}
/**
* Timestamp the source elements based on the timing provided by the scheduler.
*/
public static <T> OnSubscribeFunc<Timestamped<T>> timestamp(Observable<? extends T> source, final Scheduler scheduler) {
return OperationMap.map(source, new Func1<T, Timestamped<T>>() {
@Override
public Timestamped<T> call(T value) {
return new Timestamped<T>(scheduler.now(), value);
}
});
}
}
86 changes: 86 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationTimestampTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import static org.mockito.Mockito.*;
import org.mockito.MockitoAnnotations;
import rx.Observable;
import rx.Observer;
import rx.concurrency.TestScheduler;
import rx.subjects.PublishSubject;
import rx.util.Timestamped;

public class OperationTimestampTest {
@Mock
Observer<Object> observer;
@Before
public void before() {
MockitoAnnotations.initMocks(this);
}
@Test
public void timestampWithScheduler() {
TestScheduler scheduler = new TestScheduler();

PublishSubject<Integer> source = PublishSubject.create();
Observable<Timestamped<Integer>> m = source.timestamp(scheduler);
m.subscribe(observer);

source.onNext(1);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
source.onNext(2);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
source.onNext(3);


InOrder inOrder = inOrder(observer);

inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 1));
inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(100, 2));
inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(200, 3));

verify(observer, never()).onError(any(Throwable.class));
verify(observer, never()).onCompleted();
}
@Test
public void timestampWithScheduler2() {
TestScheduler scheduler = new TestScheduler();

PublishSubject<Integer> source = PublishSubject.create();
Observable<Timestamped<Integer>> m = source.timestamp(scheduler);
m.subscribe(observer);

source.onNext(1);
source.onNext(2);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
source.onNext(3);


InOrder inOrder = inOrder(observer);

inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 1));
inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(0, 2));
inOrder.verify(observer, times(1)).onNext(new Timestamped<Integer>(200, 3));

verify(observer, never()).onError(any(Throwable.class));
verify(observer, never()).onCompleted();
}
}

0 comments on commit 0026ea4

Please sign in to comment.