Skip to content

Commit

Permalink
#822 - Added initializeListener default method
Browse files Browse the repository at this point in the history
  • Loading branch information
vivekmuniyandi committed Sep 21, 2017
1 parent 6f3daa6 commit 2326339
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,20 @@ public ApplyTransformListener() {
"if you see this once/batch, fix your job configuration");
}

@Override
public void initializeListener(QueryBatcher queryBatcher) {
HostAvailabilityListener hostAvailabilityListener = HostAvailabilityListener.getInstance(queryBatcher);
if ( hostAvailabilityListener != null ) {
BatchFailureListener<QueryBatch> retryListener = hostAvailabilityListener.initializeRetryListener(this);
if( retryListener != null ) onFailure(retryListener);
}
}

/**
* The standard BatchListener action called by QueryBatcher.
*/
public void processEvent(QueryBatch batch) {
if ( HostAvailabilityListener.getInstance(batch.getBatcher()) != null ) {
BatchFailureListener<QueryBatch> retryListener = HostAvailabilityListener.getInstance(batch.getBatcher())
.initializeRetryListener(this);
if( retryListener != null ) onFailure(retryListener);
}
initializeListener(batch.getBatcher());
if ( ! (batch.getClient() instanceof DatabaseClientImpl) ) {
throw new IllegalStateException("DatabaseClient must be instanceof DatabaseClientImpl");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,21 @@ public DeleteListener() {
"if you see this once/batch, fix your job configuration");
}

@Override
public void initializeListener(QueryBatcher queryBatcher) {
HostAvailabilityListener hostAvailabilityListener = HostAvailabilityListener.getInstance(queryBatcher);
if ( hostAvailabilityListener != null ) {
BatchFailureListener<QueryBatch> retryListener = hostAvailabilityListener.initializeRetryListener(this);
if ( retryListener != null ) onFailure(retryListener);
}
}

/**
* The standard BatchListener action called by QueryBatcher.
*/
@Override
public void processEvent(QueryBatch batch) {
if ( HostAvailabilityListener.getInstance(batch.getBatcher()) != null ) {
BatchFailureListener<QueryBatch> retryListener = HostAvailabilityListener.getInstance(batch.getBatcher())
.initializeRetryListener(this);
if( retryListener != null ) onFailure(retryListener);
}
initializeListener(batch.getBatcher());
try {
batch.getClient().newDocumentManager().delete( batch.getItems() );
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ protected DocumentPage getDocs(QueryBatch batch) {
}
}

@Override
public void initializeListener(QueryBatcher queryBatcher) {
HostAvailabilityListener hostAvailabilityListener = HostAvailabilityListener.getInstance(queryBatcher);
if ( hostAvailabilityListener != null ) {
BatchFailureListener<QueryBatch> retryListener = hostAvailabilityListener.initializeRetryListener(this);
if ( retryListener != null ) onFailure(retryListener);
}
}

/**
* This is the method QueryBatcher calls for ExportListener to do its
* thing. You should not need to call it.
Expand All @@ -104,11 +113,7 @@ protected DocumentPage getDocs(QueryBatch batch) {
*/
@Override
public void processEvent(QueryBatch batch) {
if ( HostAvailabilityListener.getInstance(batch.getBatcher()) != null ) {
BatchFailureListener<QueryBatch> retryListener = HostAvailabilityListener.getInstance(batch.getBatcher())
.initializeRetryListener(this);
if( retryListener != null ) onFailure(retryListener);
}
initializeListener(batch.getBatcher());
try ( DocumentPage docs = getDocs(batch) ) {
while ( docs.hasNext() ) {
for ( Consumer<DocumentRecord> listener : exportListeners ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,17 @@ public ExportToWriterListener(Writer writer) {
}

@Override
public void processEvent(QueryBatch batch) {
if ( HostAvailabilityListener.getInstance(batch.getBatcher()) != null ) {
BatchFailureListener<QueryBatch> retryListener = HostAvailabilityListener.getInstance(batch.getBatcher())
.initializeRetryListener(this);
if( retryListener != null ) onFailure(retryListener);
public void initializeListener(QueryBatcher queryBatcher) {
HostAvailabilityListener hostAvailabilityListener = HostAvailabilityListener.getInstance(queryBatcher);
if ( hostAvailabilityListener != null ) {
BatchFailureListener<QueryBatch> retryListener = hostAvailabilityListener.initializeRetryListener(this);
if ( retryListener != null ) onFailure(retryListener);
}
}

@Override
public void processEvent(QueryBatch batch) {
initializeListener(batch.getBatcher());
try ( DocumentPage docs = getDocs(batch) ) {
synchronized(writer) {
for ( DocumentRecord doc : docs ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,10 @@ public interface QueryBatchListener extends BatchListener<QueryBatch> {
* @param batch the batch of uris and some metadata about the current status of the job
*/
void processEvent(QueryBatch batch);

/**
* This default method should be implemented by custom listeners that should
* be retried in case of failover.
*/
default void initializeListener(QueryBatcher queryBatcher) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,19 @@ public UrisToWriterListener(Writer writer) {
"if you see this once/batch, fix your job configuration");
}

@Override
public void initializeListener(QueryBatcher queryBatcher) {
HostAvailabilityListener hostAvailabilityListener = HostAvailabilityListener.getInstance(queryBatcher);
if ( hostAvailabilityListener != null ) {
BatchFailureListener<QueryBatch> retryListener = hostAvailabilityListener.initializeRetryListener(this);
if ( retryListener != null ) onFailure(retryListener);
}
}

@Override
public void processEvent(QueryBatch batch) {
try {
if ( HostAvailabilityListener.getInstance(batch.getBatcher()) != null ) {
BatchFailureListener<QueryBatch> retryListener = HostAvailabilityListener.getInstance(batch.getBatcher())
.initializeRetryListener(this);
if( retryListener != null ) onFailure(retryListener);
}
initializeListener(batch.getBatcher());
synchronized(writer) {
for ( String uri : batch.getItems() ) {
try {
Expand Down

0 comments on commit 2326339

Please sign in to comment.