27 changes: 27 additions & 0 deletions README.md
@@ -35,6 +35,33 @@ $ bin/pulsar-admin sinks delete --name aws-s3-test
"Deleted successfully"
```

### Sink configuration
AWS S3 configuration such as `accessKeyId`, `secretAccessKey`, `awsregion`, and `bucketName` needs to be specified in [the configuration yaml](./s3/config/pulsar-s3-io.yaml).

When `logLevel: "debug"` is set, the sink prints debug logs.
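
A minimal configuration might look like the following sketch. The credential key names (`accessKeyId`, `secretAccessKey`) are assumed to match the fields in `AWSS3Config`, and all values are placeholders:
```
configs:
  accessKeyId: "YOUR_ACCESS_KEY_ID"          # assumed key name, placeholder value
  secretAccessKey: "YOUR_SECRET_ACCESS_KEY"  # assumed key name, placeholder value
  awsregion: "us-east-2"
  bucketName: "bucket-name"
  partSize: 5242880                # multipart upload part size in bytes
  s3ObjectRolloverMinutes: 10      # interval for the time-based S3 object rollover
  logLevel: "debug"                # only "debug" is currently recognized
```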

Pulsar messages under the same ledger ID are grouped into a single S3 object. The S3 object name follows the convention of the input topic name as a prefix followed by the ledger ID, and all messages under the same ledger ID are written to this object.
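
For illustration, here is a minimal sketch of how the sink derives the object name in its write path, based on the `getLedgerId` and `getFilename` helpers in `AWSS3Sink` (variable names are illustrative):
```
// sketch: deriving the S3 object name for an incoming record
long ledgerId = AWSS3Sink.getLedgerId(record.getRecordSequence().get());
String objectName = filePrefix + Long.toString(ledgerId); // see getFilename(); filePrefix defaults to ""
// every record whose sequence id maps to this ledger id is appended to the same object
```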

Since the sink uses the latest Pulsar message's ledger ID to detect a ledger rollover, a time-based S3 object rollover is also required to write the last ledger's messages into S3 in case messages on a topic stop permanently. `s3ObjectRolloverMinutes` in the config must be greater than `managedLedgerMaxLedgerRolloverTimeMinutes` set in Pulsar's broker.conf.

Because the ledger ID is used to identify an S3 object, the sink currently supports only a single input topic.

How Pulsar manages ledgers is configurable. Here are the default settings (from broker.conf):
```
# Max number of entries to append to a ledger before triggering a rollover
# A ledger rollover is triggered on these conditions
# * Either the max rollover time has been reached
# * or max entries have been written to the ledger and at least min-time
# has passed
managedLedgerMaxEntriesPerLedger=50000

# Minimum time between ledger rollover for a topic
managedLedgerMinLedgerRolloverTimeMinutes=10

# Maximum time before forcing a ledger rollover for a topic
managedLedgerMaxLedgerRolloverTimeMinutes=240
```

### Topic schema registry
It is mandatory that a schema is enforced on the input topics. The sink fails with a fatal error when creating the Parquet format if it receives messages with different schemas.
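
As one way to attach a schema up front (the schema file and topic names below are placeholders), the schema can be uploaded with `pulsar-admin` before starting the sink:
```
$ bin/pulsar-admin schemas upload --filename ./my-schema.json persistent://public/default/my-input-topic
```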

2 changes: 2 additions & 0 deletions s3/config/pulsar-s3-io.yaml
@@ -4,3 +4,5 @@ configs:
awsregion: "us-east-2"
bucketName: "bucket-name"
partSize: 5242880
s3ObjectRolloverMinutes: 10
logLevel: "debug"
32 changes: 26 additions & 6 deletions s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Config.java
@@ -65,13 +65,33 @@ public void setSecretAccessKey(String secretKey) {
this.secretAccessKey = secretKey;
}

// support trigger types are ledger, time based, size based
private String triggerType = "ledger";
public String getTriggerType() {
return this.triggerType;
// timer interval for the S3 object rollover, in minutes
private int s3ObjectRolloverMinutes = 10;
public int getS3ObjectRolloverMinutes() {
return this.s3ObjectRolloverMinutes;
}
public void setS3ObjectRolloverMinutes(int s3ObjectRolloverMinutes) {
if (s3ObjectRolloverMinutes>0) {
this.s3ObjectRolloverMinutes = s3ObjectRolloverMinutes;
}
}

private boolean isDebug = false;
public boolean debugLoglevel() {
return isDebug;
}

// currently only the debug level is supported
private String logLevel = "";
public String getLogLevel() {
return this.logLevel;
}
public void setTriggerType(String triggerType) {
this.triggerType = triggerType;
/**
* @param logLevel the logLevel to set
*/
public void setLogLevel(String logLevel) {
this.logLevel = logLevel;
this.isDebug = this.logLevel.equalsIgnoreCase("debug");
}

@FieldDoc(
58 changes: 32 additions & 26 deletions s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Sink.java
@@ -13,6 +13,7 @@
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.s3.AmazonS3;
import com.google.common.collect.Lists;
@@ -40,9 +41,8 @@
import org.slf4j.LoggerFactory;

/**
* A Simple Redis sink, which stores the key/value records from Pulsar in redis.
* Note that records from Pulsar with null keys or values will be ignored.
* This class expects records from Pulsar to have a key and value that are stored as bytes or a string.
* This is an AWS S3 sink that receives JSON messages and stores them
* in Apache Parquet format in AWS S3.
*/
@Connector(
name = "aws-s3",
@@ -54,15 +54,13 @@ public class AWSS3Sink implements Sink<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(AWSS3Sink.class);


private AWSS3Config s3Config;
private String bucketName;
private String filePrefix = "";

private int fileSizeBytes = 10 * 1024 * 1024;

private List<Record<byte[]>> incomingList;
private ScheduledExecutorService flushExecutor;
private ScheduledExecutorService s3RolloverExecutor;

private SchemaInfo schemaInfo;
private org.apache.avro.Schema avroSchema;
@@ -73,35 +71,36 @@ public class AWSS3Sink implements Sink<byte[]> {

private long lastRecordEpoch = 0;

private Duration timeTriggerDuration = Duration.ofHours(1);
private long s3ObjectRolloverMinutes = 10;
private long MILLIS_IN_MINUTE = 60 * 1000;

/**
* Write a message to Sink
* @param inputRecordContext Context of input record from the source
* @param record record to write to sink
* @throws Exception
*/
* Write a message to Sink
*
* @param inputRecordContext Context of input record from the source
* @param record record to write to sink
* @throws Exception
*/
@Override
public void write(Record<byte[]> record) throws Exception {
synchronized (this) {
Optional<Long> eventTimeOptional = record.getEventTime();
if (eventTimeOptional.isPresent()) {
this.lastRecordEpoch = eventTimeOptional.get();
}
this.lastRecordEpoch = Util.getNowMilli();
Long ledgerId = getLedgerId(record.getRecordSequence().get());
LOG.info("ledgerID {} event time {}", ledgerId, this.lastRecordEpoch);
if (this.s3Config.debugLoglevel()) {
LOG.info("ledgerID {} event time {}", ledgerId, this.lastRecordEpoch);
}
// Optional<Message<byte[]>> msgOption = record.getMessage(); //.get();
// LOG.error("message option isPresent {}", msgOption.isPresent());

this.filename = getFilename(this.filePrefix, ledgerId);
this.recordWriter.write(record, this.filename);
}

}

@Override
public void close() throws IOException {
LOG.info("s3 sink stopped...");
s3RolloverExecutor.shutdown();
}

/**
Expand All @@ -122,12 +121,13 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
}
LOG.info("filePrefix is " + this.filePrefix);

incomingList = Lists.newArrayList();

flushExecutor = Executors.newScheduledThreadPool(1);

S3Storage storage = new S3Storage(this.s3Config, "");
this.recordWriter = RecordWriterProvider.createParquetRecordWriter(s3Config, storage);

this.s3ObjectRolloverMinutes = s3Config.getS3ObjectRolloverMinutes();
LOG.info("s3 object rollover interval {} minutes", this.s3ObjectRolloverMinutes);
s3RolloverExecutor = Executors.newScheduledThreadPool(1);
s3RolloverExecutor.scheduleAtFixedRate(() -> triggerS3ObjectRollover(), 0, s3Config.getS3ObjectRolloverMinutes(), TimeUnit.MINUTES);
}

public static long getLedgerId(long sequenceId) {
@@ -149,7 +149,13 @@ private static String getFilename(String prefix, Long ledger) {
return prefix + Long.toString(ledger);
}

private boolean isTimeTriggered() {
return Util.isOver(Instant.ofEpochMilli(lastRecordEpoch), timeTriggerDuration);
private void triggerS3ObjectRollover() {
if (this.lastRecordEpoch == 0) {
return;
}

if (MILLIS_IN_MINUTE * s3ObjectRolloverMinutes < (Util.getNowMilli() - this.lastRecordEpoch)) {
this.recordWriter.commit();
}
}
}
25 changes: 17 additions & 8 deletions s3/src/main/java/com/kesque/pulsar/sink/s3/Util.java
@@ -3,20 +3,21 @@
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.Calendar;

public class Util {

public static String ensureValidBucketName(String bucketName) {
String formatted = bucketName.replaceAll("\\s+","_");
public static String ensureValidBucketName(String bucketName) {
String formatted = bucketName.replaceAll("\\s+", "_");
int length = bucketName.length();
if(length >= 62)
if (length >= 62)
length = 62;
formatted = formatted.substring(0,length);
formatted = formatted.replace(".","d");
formatted = formatted.substring(0, length);
formatted = formatted.replace(".", "d");
formatted = formatted.toLowerCase();
if(formatted.endsWith("-"))
formatted = formatted.substring(0,length - 1);
if (formatted.endsWith("-"))
formatted = formatted.substring(0, length - 1);

return formatted;
}

@@ -32,11 +33,19 @@ public static String getMinuteTimestamp(long epoch) {

/**
* Check if the current time is over the duration limit since the start.
*
* @param start
* @param limit
* @return
*/
public static boolean isOver(Instant start, Duration limit) {
return Duration.between(start, Instant.now()).compareTo(limit) > 0;
}

/**
* Get the current time in milliseconds.
*/
public static long getNowMilli() {
return Calendar.getInstance().getTimeInMillis();
}
}
@@ -21,9 +21,8 @@ public interface RecordWriter extends Closeable {
void close();

/**
* Flush writer's data and commit the records in Kafka. Optionally, this operation might also
* Flush writer's data and commit the records in Pulsar. Optionally, this operation might also
* close the writer.
*/
void commit();

}
@@ -73,15 +73,19 @@ public void write(Record<byte[]> record, String file) {
String convJson = new String(data); // StandardCharsets.UTF_8);
JsonNode datum = JsonUtil.parse(convJson);
this.avroSchema = JsonUtil.inferSchema(JsonUtil.parse(convJson), "schemafromjson");
log.info(avroSchema.toString());
if (this.config.debugLoglevel()) {
log.info(avroSchema.toString());
}

GenericData.Record convertedRecord = (org.apache.avro.generic.GenericData.Record) JsonUtil.convertToAvro(GenericData.get(), datum, avroSchema);
writeParquet(convertedRecord, file);
this.currentRecord = record;
}

private synchronized void writeParquet(GenericData.Record record, String file) {
log.info("currentFile is {} file name is {}", this.currentFile, file);
if (this.config.debugLoglevel()) {
log.info("currentFile is {} file name is {}", this.currentFile, file);
}
String lastFile = this.currentFile; // save a copy because currentFile can be replaced in the main thread
Record<byte[]> lastRecord = this.currentRecord; // ditto save a copy
if (Strings.isNotBlank(lastFile) && !file.equals(lastFile)) {
@@ -152,8 +156,39 @@ public void close() {
}

@Override
public void commit() {
public synchronized void commit() {
log.info("ParquetRecordWriter commit()");
if (Strings.isBlank(this.currentFile)) {
return;
}
// TODO: reduce the synchronized block by only protecting these two variables
String lastFile = this.currentFile; // save a copy because currentFile can be replaced in the main thread
Record<byte[]> lastRecord = this.currentRecord; // ditto save a copy

ParquetWriter<GenericData.Record> writer = writerMap.get(lastFile);
if (writer == null) {
log.error("fatal error - failed to find parquet writer to match file {}", lastFile);
return;
}
S3ParquetOutputFile s3ParquetOutputFile = s3ParquetOutputFileMap.get(lastFile);
if (s3ParquetOutputFile == null) {
log.error("fatal error - failed to find s3ParquetOutputFile to match file {}", lastFile);
return;
}

// when a new file and parquet writer is required
s3ParquetOutputFile.s3out.setCommit();
try {
writer.close();
} catch (IOException e) {
log.error("close parquet writer exception {}", e.getMessage());
e.printStackTrace();
}
writerMap.remove(lastFile);
s3ParquetOutputFileMap.remove(lastFile);
log.info("cumulative ack all pulsar messages and write to existing parquet writer, map size {}", writerMap.size());
lastRecord.ack(); // depends on cumulative ack

this.currentFile = "";
}

}