Skip to content
This repository has been archived by the owner on May 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3 from patb23/master
Browse files Browse the repository at this point in the history
* Changed the dependency from com.netflix to io.reactivex. Also handling size based rollover by checking content length against the offset

* Changed the dependency from 'com.netflix' to 'io.reactivex'. Also handling size based rollover by checking content length against the offset
  • Loading branch information
patb23 authored and valpackett committed Jun 7, 2016
1 parent 8606158 commit 5d48702
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 72 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ repositories {
}

dependencies {
compile group: 'com.netflix.rxjava', name: 'rxjava-core', version: '0.9.0'
compile group: 'com.google.http-client', name: 'google-http-client', version: '1.15.0-rc'
compile 'io.reactivex:rxjava:1.1.5'
compile group: 'com.google.http-client', name: 'google-http-client', version: '1.15.0-rc'
}

uploadArchives {
Expand Down
143 changes: 73 additions & 70 deletions src/main/java/rx/HttpTail.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package rx;

import rx.Observable;
import rx.util.functions.Func1;
import rx.Observer;
import rx.Subscriber;
import java.io.IOException;
import java.io.InputStream;
import java.util.Timer;
Expand All @@ -17,80 +18,82 @@
import com.google.api.client.http.apache.ApacheHttpTransport;
import com.google.api.client.util.ExponentialBackOff;

public class HttpTail {
public class HttpTail {

public static final class TailResult {
private final InputStream body;
private final long offset;
public static final class TailResult {
private final InputStream body;
private final long offset;

public InputStream getBody() { return body; }
public long getOffset() { return offset; }
public InputStream getBody() { return body; }
public long getOffset() { return offset; }

public TailResult(InputStream aBody, long anOffset) {
body = aBody;
offset = anOffset;
public TailResult(InputStream aBody, long anOffset) {
body = aBody;
offset = anOffset;
}
}
}

private final GenericUrl url;
private final HttpRequestFactory requestFactory;
private final long startingOffset;
private final long delay;

public HttpTail(String aUrl, long aStartingOffset, long aDelay) {
url = new GenericUrl(aUrl);
HttpTransport transport = new ApacheHttpTransport();
requestFactory = transport.createRequestFactory();
startingOffset = aStartingOffset;
delay = aDelay;
}
private final GenericUrl url;
private final HttpRequestFactory requestFactory;
private final long startingOffset;
private final long delay;

public HttpTail(String aUrl, long aStartingOffset, long aDelay) {
url = new GenericUrl(aUrl);
HttpTransport transport = new ApacheHttpTransport();
requestFactory = transport.createRequestFactory();
startingOffset = aStartingOffset;
delay = aDelay;
}

public Observable createObservable() {

public Observable createObservable() {
return Observable.create(new Func1<Observer<TailResult>, Subscription>() {
public Subscription call(Observer<TailResult> observer) {
final Observer fObserver = observer;
final Timer timer = new Timer();

final TimerTask task = new TimerTask() {
private long offset = startingOffset;

long getLength() throws IOException {
HttpRequest request = requestFactory.buildHeadRequest(url);
request.setUnsuccessfulResponseHandler(new HttpBackOffUnsuccessfulResponseHandler(new ExponentialBackOff()));
HttpResponse response = request.execute();
return response.getHeaders().getContentLength();
}

HttpResponse getContent(long currentLength) throws IOException {
HttpRequest request = requestFactory.buildGetRequest(url);
request.setUnsuccessfulResponseHandler(new HttpBackOffUnsuccessfulResponseHandler(new ExponentialBackOff()));
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.setRange(String.format("bytes=%d-%d", offset, currentLength));
request.setHeaders(requestHeaders);
return request.execute();
}

public void run() {
try {
long currentLength = getLength();
if (currentLength != offset) {
HttpResponse response = getContent(currentLength);
offset += response.getHeaders().getContentLength();
fObserver.onNext(new TailResult(response.getContent(), offset));
return Observable.create(new Observable.OnSubscribe<TailResult>() {
@Override
public void call(Subscriber<? super TailResult> subscriber) {
final Observer fObserver = subscriber;
final Timer timer = new Timer();
final TimerTask task = new TimerTask() {
@Override
public void run() {
try {
long currentLength = getLength();
if (currentLength < offset) //if size based trigger rolls over
offset = startingOffset;

if (currentLength != offset) {
HttpResponse response = getContent(currentLength);
offset += response.getHeaders().getContentLength();
fObserver.onNext(new TailResult(response.getContent(), offset));
}
} catch (IOException e) {
}
} catch (IOException e) { }
}
};

timer.schedule(task, 0, delay);

return new Subscription() {
public void unsubscribe() {
task.cancel();
timer.cancel();
}
};
}
});

}

private long offset = startingOffset;

long getLength() throws IOException {
HttpRequest request = requestFactory.buildHeadRequest(url);
request.setUnsuccessfulResponseHandler(new HttpBackOffUnsuccessfulResponseHandler(new ExponentialBackOff()));
HttpResponse response = request.execute();
return response.getHeaders().getContentLength();
}

HttpResponse getContent(long currentLength) throws IOException {
HttpRequest request = requestFactory.buildGetRequest(url);
request.setUnsuccessfulResponseHandler(new HttpBackOffUnsuccessfulResponseHandler(new ExponentialBackOff()));
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.setRange(String.format("bytes=%d-%d", offset, currentLength));
request.setHeaders(requestHeaders);
return request.execute();
}

};
timer.schedule(task, 0, delay);
}

});
}

}
}

0 comments on commit 5d48702

Please sign in to comment.