From 1c636211b3acdd8993a9d55fb830c06ed65e5d08 Mon Sep 17 00:00:00 2001
From: Ming Luo
Date: Thu, 6 Aug 2020 12:24:20 -0400
Subject: [PATCH] s3 object time based rollover

---
 README.md                                          | 27 +++++++++
 s3/config/pulsar-s3-io.yaml                        |  2 +
 .../kesque/pulsar/sink/s3/AWSS3Config.java         | 32 ++++++++--
 .../com/kesque/pulsar/sink/s3/AWSS3Sink.java       | 58 ++++++++++---------
 .../java/com/kesque/pulsar/sink/s3/Util.java       | 25 +++++---
 .../pulsar/sink/s3/format/RecordWriter.java        |  3 +-
 .../format/parquet/ParquetRecordWriter.java        | 43 ++++++++++++--
 7 files changed, 144 insertions(+), 46 deletions(-)

diff --git a/README.md b/README.md
index 55649a6..9760d20 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,33 @@ $ bin/pulsar-admin sinks delete --name aws-s3-test
 "Deleted successfully"
 ```
 
+### Sink configuration
+AWS S3 settings such as accessKeyId, secretAccessKey, region, and bucketName must be specified in [the configuration yaml](./s3/config/pulsar-s3-io.yaml).
+
+When `logLevel: "debug"` is set, the sink prints debug logs.
+
+Pulsar messages under the same ledger ID are grouped into a single S3 object file. The S3 object file is named with the input topic as the prefix, followed by the ledger ID; all messages under that ledger ID are written to the same file.
+
+Since the sink uses the latest Pulsar message's ledger ID to detect a ledger rollover, a time-based S3 object rollover is also required to write the last ledger's messages to S3 in case messages on a topic stop permanently. `s3ObjectRolloverMinutes` in the config must be greater than `managedLedgerMaxLedgerRolloverTimeMinutes` in the Pulsar broker.conf.
+
+Because the ledger ID is used to identify an S3 object, the sink currently supports only a single input topic.
+
+How Pulsar manages ledgers is configurable. Here are the default settings (from broker.conf):
+```
+# Max number of entries to append to a ledger before triggering a rollover
+# A ledger rollover is triggered on these conditions
+# * Either the max rollover time has been reached
+# * or max entries have been written to the ledger and at least min-time
+# has passed
+managedLedgerMaxEntriesPerLedger=50000
+
+# Minimum time between ledger rollover for a topic
+managedLedgerMinLedgerRolloverTimeMinutes=10
+
+# Maximum time before forcing a ledger rollover for a topic
+managedLedgerMaxLedgerRolloverTimeMinutes=240
+```
+
 ### Topic schema registry
 A schema must be enforced on the input topics. The sink fails with a fatal error when creating the Parquet format if it receives messages with different schemas.
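A note on the ledger-based naming described above: Pulsar IO derives a record's sequence ID from the message ID, and this sink recovers the ledger ID from it. The sketch below is illustrative only; it assumes Pulsar's convention of packing the IDs as `sequenceId = (ledgerId << 28) | entryId`, which matches how `getLedgerId` and `getFilename` are used in the AWSS3Sink diff further down.

```java
// Hypothetical, self-contained sketch of the ledger-to-S3-object naming.
public class LedgerFileNaming {

    // Recover the ledger ID by dropping the 28 entry-ID bits.
    public static long getLedgerId(long sequenceId) {
        return sequenceId >>> 28;
    }

    // S3 object name: input topic prefix followed by the ledger ID.
    public static String getFilename(String topicPrefix, long ledgerId) {
        return topicPrefix + Long.toString(ledgerId);
    }

    public static void main(String[] args) {
        long sequenceId = (1234L << 28) | 56L; // ledger 1234, entry 56
        System.out.println(getFilename("my-topic-", getLedgerId(sequenceId)));
        // prints: my-topic-1234
    }
}
```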
diff --git a/s3/config/pulsar-s3-io.yaml b/s3/config/pulsar-s3-io.yaml
index b9535a4..1ed12ef 100644
--- a/s3/config/pulsar-s3-io.yaml
+++ b/s3/config/pulsar-s3-io.yaml
@@ -4,3 +4,5 @@ configs:
   awsregion: "us-east-2"
   bucketName: "bucket-name"
   partSize: 5242880
+  s3ObjectRolloverMinutes: 10
+  logLevel: "debug"
diff --git a/s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Config.java b/s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Config.java
index a107b7d..e8226f6 100644
--- a/s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Config.java
+++ b/s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Config.java
@@ -65,13 +65,33 @@ public void setSecretAccessKey(String secretKey) {
         this.secretAccessKey = secretKey;
     }
 
-    // support trigger types are ledger, time based, size based
-    private String triggerType = "ledger";
-    public String getTriggerType() {
-        return this.triggerType;
+    // the timer interval for S3 object rollover, in minutes
+    private int s3ObjectRolloverMinutes = 10;
+    public int getS3ObjectRolloverMinutes() {
+        return this.s3ObjectRolloverMinutes;
+    }
+    public void setS3ObjectRolloverMinutes(int s3ObjectRolloverMinutes) {
+        // ignore non-positive values and keep the default
+        if (s3ObjectRolloverMinutes > 0) {
+            this.s3ObjectRolloverMinutes = s3ObjectRolloverMinutes;
+        }
+    }
+
+    private boolean isDebug = false;
+    public boolean debugLoglevel() {
+        return isDebug;
+    }
+
+    // currently only the debug level is supported
+    private String logLevel = "";
+    public String getLogLevel() {
+        return this.logLevel;
     }
-    public void setTriggerType(String triggerType) {
-        this.triggerType = triggerType;
+    /**
+     * @param logLevel the logLevel to set
+     */
+    public void setLogLevel(String logLevel) {
+        this.logLevel = logLevel;
+        this.isDebug = this.logLevel.equalsIgnoreCase("debug");
     }
 
     @FieldDoc(
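For context, the new fields are bound from the sink's `Map<String, Object>` config. A minimal sketch of that binding, assuming a Jackson-based loader (the repo's actual `AWSS3Config` loading code may differ, and `ConfigLoadSketch` is a hypothetical name):

```java
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConfigLoadSketch {
    public static AWSS3Config load(Map<String, Object> map) {
        // convertValue goes through the setters, so setS3ObjectRolloverMinutes()
        // rejects non-positive values and setLogLevel() derives the debug flag
        return new ObjectMapper().convertValue(map, AWSS3Config.class);
    }

    public static void main(String[] args) {
        AWSS3Config config = load(Map.of("s3ObjectRolloverMinutes", 10, "logLevel", "debug"));
        System.out.println(config.debugLoglevel()); // true
    }
}
```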
diff --git a/s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Sink.java b/s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Sink.java
index 38c596e..9acd243 100644
--- a/s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Sink.java
+++ b/s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Sink.java
@@ -13,6 +13,7 @@
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.services.s3.AmazonS3;
 import com.google.common.collect.Lists;
@@ -40,9 +41,8 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * A Simple Redis sink, which stores the key/value records from Pulsar in redis.
- * Note that records from Pulsar with null keys or values will be ignored.
- * This class expects records from Pulsar to have a key and value that are stored as bytes or a string.
+ * An AWS S3 sink that receives JSON messages and stores them
+ * in Apache Parquet format in AWS S3.
  */
 @Connector(
     name = "aws-s3",
@@ -54,15 +54,13 @@ public class AWSS3Sink implements Sink<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AWSS3Sink.class);
-
     private AWSS3Config s3Config;
     private String bucketName;
     private String filePrefix = "";
-
+
     private int fileSizeBytes = 10 * 1024 * 1024;
-    private List<Record<byte[]>> incomingList;
-    private ScheduledExecutorService flushExecutor;
+    private ScheduledExecutorService s3RolloverExecutor;
 
     private SchemaInfo schemaInfo;
     private org.apache.avro.Schema avroSchema;
@@ -73,35 +71,36 @@ public class AWSS3Sink implements Sink<byte[]> {
 
     private long lastRecordEpoch = 0;
-    private Duration timeTriggerDuration = Duration.ofHours(1);
+    private long s3ObjectRolloverMinutes = 10;
+    private static final long MILLIS_IN_MINUTE = 60 * 1000;
 
     /**
-     * Write a message to Sink
-     * @param inputRecordContext Context of input record from the source
-     * @param record record to write to sink
-     * @throws Exception
-     */
+     * Write a message to Sink
+     *
+     * @param inputRecordContext Context of input record from the source
+     * @param record record to write to sink
+     * @throws Exception
+     */
     @Override
     public void write(Record<byte[]> record) throws Exception {
         synchronized (this) {
-            Optional<Long> eventTimeOptional = record.getEventTime();
-            if (eventTimeOptional.isPresent()) {
-                this.lastRecordEpoch = eventTimeOptional.get();
-            }
+            // track arrival time, not event time, so the idle rollover also
+            // works for messages that carry no event time
+            this.lastRecordEpoch = Util.getNowMilli();
 
             Long ledgerId = getLedgerId(record.getRecordSequence().get());
-            LOG.info("ledgerID {} event time {}", ledgerId, this.lastRecordEpoch);
+            if (this.s3Config.debugLoglevel()) {
+                LOG.info("ledgerID {} record time {}", ledgerId, this.lastRecordEpoch);
+            }
 
             // Optional<Message<byte[]>> msgOption = record.getMessage(); //.get();
             // LOG.error("message option isPresent {}", msgOption.isPresent());
 
             this.filename = getFilename(this.filePrefix, ledgerId);
             this.recordWriter.write(record, this.filename);
         }
-
     }
 
     @Override
     public void close() throws IOException {
         LOG.info("s3 sink stopped...");
+        s3RolloverExecutor.shutdown();
     }
 
     /**
@@ -122,12 +121,13 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
         }
         LOG.info("filePrefix is " + this.filePrefix);
 
-        incomingList = Lists.newArrayList();
-
-        flushExecutor = Executors.newScheduledThreadPool(1);
-
         S3Storage storage = new S3Storage(this.s3Config, "");
         this.recordWriter = RecordWriterProvider.createParquetRecordWriter(s3Config, storage);
+
+        this.s3ObjectRolloverMinutes = s3Config.getS3ObjectRolloverMinutes();
+        LOG.info("s3 object rollover interval {} minutes", this.s3ObjectRolloverMinutes);
+        s3RolloverExecutor = Executors.newScheduledThreadPool(1);
+        s3RolloverExecutor.scheduleAtFixedRate(this::triggerS3ObjectRollover, 0, this.s3ObjectRolloverMinutes, TimeUnit.MINUTES);
     }
 
     public static long getLedgerId(long sequenceId) {
@@ -149,7 +149,13 @@ private static String getFilename(String prefix, Long ledger) {
         return prefix + Long.toString(ledger);
     }
 
-    private boolean isTimeTriggered() {
-        return Util.isOver(Instant.ofEpochMilli(lastRecordEpoch), timeTriggerDuration);
+    private void triggerS3ObjectRollover() {
+        // nothing has been written yet
+        if (this.lastRecordEpoch == 0) {
+            return;
+        }
+        // commit the open S3 object once no record has arrived for a full interval
+        if (MILLIS_IN_MINUTE * s3ObjectRolloverMinutes < (Util.getNowMilli() - this.lastRecordEpoch)) {
+            this.recordWriter.commit();
+        }
     }
 }
\ No newline at end of file
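The scheduled task above implements a common idle-detection pattern: commit the open S3 object once no record has arrived for a full rollover interval. A standalone sketch of just that pattern (names are illustrative, not from this repo):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IdleRolloverDemo {
    private static final long MILLIS_IN_MINUTE = 60 * 1000;

    private volatile long lastRecordEpoch = 0; // updated on every write
    private final long rolloverMinutes = 10;
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    public void start() {
        // same cadence as the sink: one check per rollover interval
        executor.scheduleAtFixedRate(this::rolloverIfIdle, 0, rolloverMinutes, TimeUnit.MINUTES);
    }

    public void onWrite() {
        lastRecordEpoch = System.currentTimeMillis();
    }

    private void rolloverIfIdle() {
        if (lastRecordEpoch == 0) {
            return; // nothing written yet
        }
        if (System.currentTimeMillis() - lastRecordEpoch > rolloverMinutes * MILLIS_IN_MINUTE) {
            System.out.println("idle past the rollover interval, committing the open S3 object");
            // recordWriter.commit() runs here in the real sink
        }
    }
}
```

Because the check itself only runs once per interval, the last ledger's messages reach S3 at most roughly two intervals after the final message arrives.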
diff --git a/s3/src/main/java/com/kesque/pulsar/sink/s3/Util.java b/s3/src/main/java/com/kesque/pulsar/sink/s3/Util.java
index 1275473..36b221e 100644
--- a/s3/src/main/java/com/kesque/pulsar/sink/s3/Util.java
+++ b/s3/src/main/java/com/kesque/pulsar/sink/s3/Util.java
@@ -3,20 +3,21 @@
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Calendar;
 
 public class Util {
 
-    public static String ensureValidBucketName(String bucketName) {
-        String formatted = bucketName.replaceAll("\\s+","_");
-        int length = bucketName.length();
-        if(length >= 62)
+    public static String ensureValidBucketName(String bucketName) {
+        String formatted = bucketName.replaceAll("\\s+", "_");
+        // measure the collapsed string, not the raw input, to avoid an
+        // out-of-bounds substring when whitespace runs are shortened
+        int length = formatted.length();
+        if (length >= 62)
             length = 62;
-        formatted = formatted.substring(0,length);
-        formatted = formatted.replace(".","d");
+        formatted = formatted.substring(0, length);
+        formatted = formatted.replace(".", "d");
         formatted = formatted.toLowerCase();
-        if(formatted.endsWith("-"))
-            formatted = formatted.substring(0,length - 1);
-
+        if (formatted.endsWith("-"))
+            formatted = formatted.substring(0, length - 1);
+
         return formatted;
     }
 
@@ -32,6 +33,7 @@ public static String getMinuteTimestamp(long epoch) {
 
     /**
      * Check if the current time is over the duration limit since the start.
+     *
      * @param start
      * @param limit
      * @return
@@ -39,4 +41,11 @@ public static String getMinuteTimestamp(long epoch) {
     public static boolean isOver(Instant start, Duration limit) {
         return Duration.between(start, Instant.now()).compareTo(limit) > 0;
     }
+
+    /**
+     * Get the current time in milliseconds.
+     */
+    public static long getNowMilli() {
+        return Calendar.getInstance().getTimeInMillis();
+    }
 }
\ No newline at end of file
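With the `formatted.length()` fix above, `ensureValidBucketName` collapses whitespace to underscores, truncates to 62 characters, replaces dots, and lowercases. A hypothetical usage:

```java
public class BucketNameExample {
    public static void main(String[] args) {
        // whitespace runs become "_", "." becomes "d", result is lowercased
        System.out.println(Util.ensureValidBucketName("My Pulsar.Sink Bucket"));
        // prints: my_pulsardsink_bucket
    }
}
```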
      */
     void commit();
-
 }
\ No newline at end of file
diff --git a/s3/src/main/java/com/kesque/pulsar/sink/s3/format/parquet/ParquetRecordWriter.java b/s3/src/main/java/com/kesque/pulsar/sink/s3/format/parquet/ParquetRecordWriter.java
index a0f8ac2..de977a1 100644
--- a/s3/src/main/java/com/kesque/pulsar/sink/s3/format/parquet/ParquetRecordWriter.java
+++ b/s3/src/main/java/com/kesque/pulsar/sink/s3/format/parquet/ParquetRecordWriter.java
@@ -73,7 +73,9 @@ public void write(Record<byte[]> record, String file) {
         String convJson = new String(data); // StandardCharsets.UTF_8
         JsonNode datum = JsonUtil.parse(convJson);
         this.avroSchema = JsonUtil.inferSchema(JsonUtil.parse(convJson), "schemafromjson");
-        log.info(avroSchema.toString());
+        if (this.config.debugLoglevel()) {
+            log.info(avroSchema.toString());
+        }
 
         GenericData.Record convertedRecord = (org.apache.avro.generic.GenericData.Record) JsonUtil.convertToAvro(GenericData.get(), datum, avroSchema);
         writeParquet(convertedRecord, file);
@@ -81,7 +83,9 @@ public void write(Record<byte[]> record, String file) {
     }
 
     private synchronized void writeParquet(GenericData.Record record, String file) {
-        log.info("currentFile is {} file name is {}", this.currentFile, file);
+        if (this.config.debugLoglevel()) {
+            log.info("currentFile is {} file name is {}", this.currentFile, file);
+        }
         String lastFile = this.currentFile; // save a copy because currentFile can be replaced in the main thread
         Record<byte[]> lastRecord = this.currentRecord; // ditto, save a copy
         if (Strings.isNotBlank(lastFile) && !file.equals(lastFile)) {
@@ -152,8 +156,39 @@ public void close() {
     }
 
     @Override
-    public void commit() {
+    public synchronized void commit() {
         log.info("ParquetRecordWriter commit()");
+        if (Strings.isBlank(this.currentFile)) {
+            return;
+        }
+        // TODO: shrink the synchronized block to protect only these two variables
+        String lastFile = this.currentFile; // save a copy because currentFile can be replaced in the main thread
+        Record<byte[]> lastRecord = this.currentRecord; // ditto, save a copy
+
+        ParquetWriter<GenericData.Record> writer = writerMap.get(lastFile);
+        if (writer == null) {
+            log.error("fatal error - failed to find parquet writer to match file {}", lastFile);
+            return;
+        }
+        S3ParquetOutputFile s3ParquetOutputFile = s3ParquetOutputFileMap.get(lastFile);
+        if (s3ParquetOutputFile == null) {
+            log.error("fatal error - failed to find s3ParquetOutputFile to match file {}", lastFile);
+            return;
+        }
+
+        // mark the output file committed, then close the writer to flush
+        // the Parquet footer and complete the S3 upload
+        s3ParquetOutputFile.s3out.setCommit();
+        try {
+            writer.close();
+        } catch (IOException e) {
+            log.error("close parquet writer exception", e);
+        }
+        writerMap.remove(lastFile);
+        s3ParquetOutputFileMap.remove(lastFile);
+        log.info("cumulatively ack all pulsar messages written to this file, map size {}", writerMap.size());
+        lastRecord.ack(); // relies on cumulative acknowledgment
+
+        this.currentFile = "";
     }
-
 }
\ No newline at end of file
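To make the write path concrete end to end: `write()` parses the JSON payload, infers an Avro schema per message, and appends the converted record through a Parquet writer; `commit()` closes that writer so the Parquet footer is flushed and the S3 upload completes. The sketch below mirrors these steps against a local file instead of S3, assuming the Kite SDK `JsonUtil` helpers this sink already calls and `parquet-avro` on the classpath; the S3-specific plumbing (`S3ParquetOutputFile`, multipart upload) is omitted.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.kitesdk.data.spi.JsonUtil;
import com.fasterxml.jackson.databind.JsonNode;

public class JsonToParquetSketch {
    public static void main(String[] args) throws Exception {
        String json = "{\"name\": \"pulsar\", \"count\": 42}";

        // same steps as ParquetRecordWriter.write(): parse, infer, convert
        JsonNode datum = JsonUtil.parse(json);
        Schema avroSchema = JsonUtil.inferSchema(datum, "schemafromjson");
        GenericData.Record record =
                (GenericData.Record) JsonUtil.convertToAvro(GenericData.get(), datum, avroSchema);

        // the sink writes through S3ParquetOutputFile; a local path behaves the same
        try (ParquetWriter<GenericData.Record> writer =
                AvroParquetWriter.<GenericData.Record>builder(new Path("/tmp/example.parquet"))
                        .withSchema(avroSchema)
                        .build()) {
            writer.write(record);
        } // close() flushes the footer -- the step commit() triggers for S3
    }
}
```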