Skip to content

Commit

Permalink
Add ability for BatchPushSource to notify errors asynchronously (apac…
Browse files Browse the repository at this point in the history
…he#7865)

Co-authored-by: Jerry Peng <jerryp@splunk.com>
  • Loading branch information
jerrypeng and Jerry Peng committed Aug 24, 2020
1 parent 38df475 commit 63998af
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.io.batchdatagenerator;

import io.codearte.jfairy.Fairy;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.BatchPushSource;
import org.apache.pulsar.io.core.SourceContext;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@Slf4j
public class BatchDataGeneratorPushSource extends BatchPushSource<Person> implements Runnable {

private Fairy fairy;
private SourceContext sourceContext;
private int maxRecordsPerCycle = 10;

private ExecutorService executor = Executors.newSingleThreadExecutor();

@Override
public void close() {
executor.shutdownNow();
}

@Override
public void open(Map config, SourceContext context) throws Exception {
this.fairy = Fairy.create();
this.sourceContext = context;
}

@Override
public void discover(Consumer taskEater) throws Exception {
log.info("Generating one task for each instance");
for (int i = 0; i < sourceContext.getNumInstances(); ++i) {
taskEater.accept(String.format("something-%d", System.currentTimeMillis()).getBytes());
}
}

@Override
public void prepare(byte[] instanceSplit) throws Exception {
log.info("Instance " + sourceContext.getInstanceId() + " got a new discovered task {}", new String(instanceSplit));
executor.submit(this);
}

@Override
public void run() {
try {
for (int i = 0; i < maxRecordsPerCycle; i++) {
Thread.sleep(50);
Record<Person> record = () -> new Person(fairy.person());
consume(record);
}
// this task is completed
consume(null);
} catch (Exception e) {
notifyError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ public Object getValue() {
}
}

private static class ErrorNotifierRecord implements Record {
private Exception e;
public ErrorNotifierRecord(Exception e) {
this.e = e;
}
@Override
public Object getValue() {
return null;
}

public Exception getException() {
return e;
}
}

private LinkedBlockingQueue<Record<T>> queue;
private static final int DEFAULT_QUEUE_LENGTH = 1000;
private final NullRecord nullRecord = new NullRecord();
Expand All @@ -48,6 +63,9 @@ public BatchPushSource() {
@Override
public Record<T> readNext() throws Exception {
Record<T> record = queue.take();
if (record instanceof ErrorNotifierRecord) {
throw ((ErrorNotifierRecord) record).getException();
}
if (record instanceof NullRecord) {
return null;
} else {
Expand Down Expand Up @@ -80,4 +98,12 @@ public void consume(Record<T> record) {
public int getQueueLength() {
return DEFAULT_QUEUE_LENGTH;
}

/**
* Allows the source to notify errors asynchronously
* @param ex
*/
public void notifyError(Exception ex) {
consume(new ErrorNotifierRecord(ex));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.core;

import org.apache.pulsar.io.core.BatchPushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.function.Consumer;

public class BatchPushSourceTest {

BatchPushSource testBatchSource = new BatchPushSource() {
@Override
public void open(Map config, SourceContext context) throws Exception {

}

@Override
public void discover(Consumer taskEater) throws Exception {

}

@Override
public void prepare(byte[] task) throws Exception {

}

@Override
public void close() throws Exception {

}
};

@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
public void testNotifyErrors() throws Exception {
testBatchSource.notifyError(new RuntimeException("test exception"));
testBatchSource.readNext();
}
}

0 comments on commit 63998af

Please sign in to comment.