
Jetstream to HDFS

kevinthfang edited this page Jul 28, 2015 · 2 revisions

Overview

This project was conceived to enable users to collect, transform, and store Jetstream events in HDFS. Events are collected in Kafka queues and then forwarded to HdfsBatchProcessor, which transforms batches of events and writes them to HDFS. The application is built from two framework-provided implementations, a batch inbound channel and a batch processor, namely AbstractBatchInboundChannel and HdfsBatchProcessor.

Design

UML

HdfsBatchProcessor is a specialized Jetstream batch processor that transforms the data it receives through the batch interface and stores it in HDFS. Data from different partitions is loaded into different files in HDFS. The processor provides several extension points that let the user control the folder structure in HDFS and the format of the data written there, including an injectable transformer that converts Jetstream events to other formats.

HdfsBatchProcessor can be linked to a batch inbound channel, which is the source of the data loaded into HDFS. InboundKafkaChannel is one such batch inbound channel; it consumes data from Kafka. For details, see the InboundKafkaChannel documentation.

Capabilities

  • No Data Loss. HdfsBatchProcessor consumes data from the batch source with at-least-once delivery semantics. It commits the offset of the upstream channel only after it has successfully committed the data to an HDFS file. If an exception occurs, the offset is not committed, so the unprocessed events can be replayed.
  • Batch Listeners. Custom processing logic can be injected into the batch pipeline through the framework-provided BatchListener interface. For example, a specialized batch listener can collect stats during data loading and write them to a store. Jetstream provides several default listeners for recording stats, such as BaseStatsRecorder and EventTsBasedStatsRecorder.
  • Extensible event serialization. HdfsBatchProcessor provides a plugin mechanism for injecting a transformer that converts the Jetstream event to the destination data format. Several default event writers are provided: TextEventWriter, SequenceEventWriter, and the Avro writers (GenericAvroEventWriter and SpecificAvroEventWriter). They write Jetstream events as text, sequence files, and Avro, respectively.
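The following minimal Java sketch shows the at-least-once ordering from the first bullet. Everything in it is hypothetical. `BatchSource` stands in for a batch inbound channel such as a Kafka partition, and `FileSink` stands in for an HDFS file. Neither reflects Jetstream's actual interfaces. The sketch illustrates the ordering: data is written first, and the upstream offset is committed only after the write succeeds. A failed write therefore leaves the offset unchanged, and the same events are read again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a batch inbound channel (e.g. one Kafka partition).
class BatchSource {
    final List<String> events;
    long committedOffset = 0; // next offset to read after a restart

    BatchSource(List<String> events) { this.events = events; }

    // Returns up to maxSize events starting at the last committed offset.
    List<String> nextBatch(int maxSize) {
        int from = (int) committedOffset;
        int to = Math.min(events.size(), from + maxSize);
        return new ArrayList<>(events.subList(from, to));
    }

    void commit(long newOffset) { committedOffset = newOffset; }
}

// Hypothetical stand-in for an HDFS file writer; can be told to fail once.
class FileSink {
    final List<String> written = new ArrayList<>();
    boolean failNextWrite = false;

    void writeAndClose(List<String> batch) {
        if (failNextWrite) {
            failNextWrite = false;
            throw new IllegalStateException("simulated HDFS failure");
        }
        written.addAll(batch);
    }
}

class AtLeastOnceLoader {
    // Processes one batch; returns true only if it was written AND committed.
    static boolean process(BatchSource source, FileSink sink, int maxSize) {
        List<String> batch = source.nextBatch(maxSize);
        if (batch.isEmpty()) {
            return false;
        }
        try {
            sink.writeAndClose(batch);  // 1. write the data first
        } catch (RuntimeException e) {
            return false;               // offset NOT committed: the batch is replayed
        }
        source.commit(source.committedOffset + batch.size()); // 2. commit after success
        return true;
    }
}
```

The trade-off of this design is duplicate data. If the process dies after the write succeeds but before the offset is committed, the batch is replayed and written a second time. This is the usual cost of at-least-once delivery, so downstream readers should tolerate repeated events.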

FolderResolver determines the key part of the folder structure for the files generated by HdfsBatchProcessor. Each file path follows this convention:

<rootFolder>/<resolvedFolder>/<eventType>/<destFileName>
  • rootFolder is the root HDFS folder under which the events are stored. E.g. /user/batch1/behaviour_events/
  • resolvedFolder is a subfolder under the root folder, generated dynamically by the FolderResolver. For example, a resolved folder name might correspond to a timestamp such as yyyy/MM/dd/HH.
  • eventType is the type of the Jetstream event. It is carried in the event under the reserved key "js_ev_type" and is specified as part of the event configuration.
  • destFileName is the file name. It must follow the convention {prefix}topic-partition-startOffset-endOffset{suffix}, where "prefix" and "suffix" can be any string meaningful to your use case.
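The path convention above can be expressed as a small helper. `HdfsPathBuilder` and its methods are hypothetical names used only for illustration; they are not part of Jetstream. The sketch assumes an absolute rootFolder. It joins the four segments and builds the file name from the documented pattern.

```java
// Hypothetical helper illustrating the documented path layout; not Jetstream's API.
class HdfsPathBuilder {
    // Builds {prefix}topic-partition-startOffset-endOffset{suffix}.
    static String destFileName(String prefix, String topic, int partition,
                               long startOffset, long endOffset, String suffix) {
        return prefix + topic + "-" + partition + "-" + startOffset + "-" + endOffset + suffix;
    }

    // Joins <rootFolder>/<resolvedFolder>/<eventType>/<destFileName>.
    // Leading/trailing slashes on each (non-null) segment are ignored, and
    // empty segments are skipped, so an empty eventType adds no subfolder.
    static String fullPath(String rootFolder, String resolvedFolder,
                           String eventType, String destFileName) {
        StringBuilder sb = new StringBuilder();
        for (String part : new String[] {rootFolder, resolvedFolder, eventType, destFileName}) {
            String trimmed = stripSlashes(part);
            if (trimmed.isEmpty()) {
                continue;
            }
            sb.append('/').append(trimmed);
        }
        return sb.toString();
    }

    private static String stripSlashes(String s) {
        int start = 0;
        int end = s.length();
        while (start < end && s.charAt(start) == '/') start++;
        while (end > start && s.charAt(end - 1) == '/') end--;
        return s.substring(start, end);
    }
}
```

Consider prefix `events_`, topic `behaviour`, partition 3, offsets 1000 to 1999, and suffix `.txt`. These produce the file name `events_behaviour-3-1000-1999.txt`. Because the offsets are part of the name, each file can be traced back to the Kafka offset range it contains.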

Configuration

Spring Xml Specification for Hdfs client config

<bean id="yourHdfsClientConfig"
    class="com.ebay.jetstream.event.processor.hdfs.HdfsClientConfig">
    <!-- [Required] Url string of your HDFS name node. -->
    <property name="hdfsUrl" value="hdfs://your.hdfs.host:port" />
    <!-- [Required] The user name that has access to HDFS. -->
    <property name="user" value="user1" />
    <!-- [Optional] Hadoop Configuration properties. -->
    <property name="hadoopProperties">
        <props>
            <prop key="dfs.datanode.socket.write.timeout">60000</prop>
            <prop key="dfs.replication">3</prop>
            <prop key="dfs.support.append">true</prop>
        </props>
    </property>
</bean>

Spring Xml Specification for Hdfs client

<bean id="yourHdfsClient" class="com.ebay.jetstream.event.processor.hdfs.HdfsClient">
    <property name="config" ref="yourHdfsClientConfig" />
</bean>

Spring Xml Specification for HDFS processor

<bean id="yourHdfsBatchProcessorConfig"
    class="com.ebay.jetstream.event.processor.hdfs.HdfsBatchProcessorConfig">
    <!-- [Required] The output folder in HDFS. -->
    <property name="outputFolder" value="/your/output" />
    <!-- [Required] The working folder in HDFS, which stores the tmp files. -->
    <property name="workingFolder" value="/your/working" />
    <!-- [Optional] The folder in HDFS that stores the error files. Required only when logErrorEvents is true. -->
    <property name="errorFolder" value="/your/error" />
    <!-- [Optional] How long to wait, in milliseconds, before retrying when HDFS cannot be accessed. Default is 60000. -->
    <property name="waitForFsAvaliableInMs" value="60000" />
    <!-- [Optional] Whether error events should be saved. Default is true. -->
    <property name="logErrorEvents" value="true" />
    <!-- [Optional] The suffix of the error file. Default is '.error'. -->
    <property name="errorFileSuffix" value=".error" />
</bean>
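The config above keeps temporary files in a working folder, separate from the output folder. This page does not describe how files move from one folder to the other. A common pattern, assumed here, is to write into the working folder and then rename the finished file into the output folder. Readers of the output folder then never see partial files. The sketch below shows that pattern with `java.nio.file` on a local filesystem as a stand-in for HDFS. `WorkingFolderCommit.commitFile` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical write-then-rename sketch; local files stand in for HDFS.
class WorkingFolderCommit {
    // Writes lines to workingFolder/<fileName>.tmp, then moves the finished
    // file to outputFolder/<fileName>. Returns the final path.
    static Path commitFile(Path workingFolder, Path outputFolder,
                           String fileName, List<String> lines) throws IOException {
        Files.createDirectories(workingFolder);
        Files.createDirectories(outputFolder);
        Path tmp = workingFolder.resolve(fileName + ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8); // partial data stays in the working folder
        Path dest = outputFolder.resolve(fileName);
        // ATOMIC_MOVE requires both folders to be on the same filesystem;
        // on HDFS, FileSystem.rename plays this role.
        return Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE);
    }
}
```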

Spring Xml Specification for FolderResolver

<bean id="yourFolderResolver"
    class="com.ebay.jetstream.event.processor.hdfs.resolver.EventTimestampFolderResolver">
    <!-- [Optional] The key of the timestamp property in the Jetstream event. If it is not specified, the controller uses the system time at which the event is passed to the processor. -->
    <property name="timestampKey" value="timestamp" />
    <!-- [Optional] The time interval covered by each folder, in milliseconds. Default is 3600000 (1 hour). -->
    <property name="folderIntervalInMs" value="3600000" />
    <!-- [Optional] Controls when to move to the next folder. If the ratio of events from the next time slot exceeds this value, the controller tells the processor to move to the next folder. Default is 0.2. -->
    <property name="moveToNextRatio" value="0.2" />
    <!-- [Optional] The format of the folder path, as a Java date format. Default is 'yyyyMMdd/HH_mm'. -->
    <property name="folderPathFormat" value="yyyyMMdd/HH_mm" />
    <!-- [Optional] When there are no previous folders, the controller samples the events to calculate the current time slot. The controller takes one sample every N events. Default is 5. -->
    <property name="eventSampleFactor" value="5" />
</bean>

Jetstream provides two built-in FolderResolvers. SystemTimeFolderResolver decides folders based on the system time at which the events are written to HDFS. EventTimestampFolderResolver decides folders based on a timestamp property in the JetstreamEvent. You can also implement your own FolderResolver and configure it in the Spring context.
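The following is a minimal sketch of how a timestamp-based resolver could map events to folders using folderIntervalInMs, folderPathFormat and moveToNextRatio. This page does not specify the exact algorithm, so the sketch rests on three assumptions. First, each timestamp is floored to the start of its folderIntervalInMs slot. Second, the slot is formatted in UTC. Third, moveToNextRatio is compared against the fraction of a batch's events that fall in a later slot than the current folder. eventSampleFactor is not modelled. `TimeSlotFolders` and its methods are hypothetical names, not EventTimestampFolderResolver's code.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

// Hypothetical sketch of timestamp-based folder resolution; not Jetstream's code.
class TimeSlotFolders {
    // Start (ms since epoch) of the time slot that contains ts.
    static long slotStart(long ts, long folderIntervalInMs) {
        return Math.floorDiv(ts, folderIntervalInMs) * folderIntervalInMs;
    }

    // Folder name for the slot containing ts, formatted with folderPathFormat.
    static String folderFor(long ts, long folderIntervalInMs, String folderPathFormat) {
        SimpleDateFormat fmt = new SimpleDateFormat(folderPathFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: folders use UTC
        return fmt.format(new Date(slotStart(ts, folderIntervalInMs)));
    }

    // True when the share of events belonging to a later slot than the
    // current folder strictly exceeds moveToNextRatio.
    static boolean shouldMoveToNext(List<Long> eventTimestamps, long currentSlotStart,
                                    long folderIntervalInMs, double moveToNextRatio) {
        if (eventTimestamps.isEmpty()) {
            return false;
        }
        long later = 0;
        for (long ts : eventTimestamps) {
            if (slotStart(ts, folderIntervalInMs) > currentSlotStart) {
                later++;
            }
        }
        return (double) later / eventTimestamps.size() > moveToNextRatio;
    }
}
```

With the defaults (3600000 ms and `yyyyMMdd/HH_mm`), an event at 2015-07-28T10:30:00Z lands in folder `20150728/10_00`. Using a ratio instead of switching on the first late event keeps a few early or out-of-order events from prematurely closing the current folder.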

Spring Xml Specification for event writer

TextEventWriter

TextEventWriter saves the data as text files. Here is an example of using it.

<bean id="yourEventWriter"
    class="com.ebay.jetstream.event.processor.hdfs.writer.TextEventWriter">
    <property name="transformer" ref="yourEventTransformer"></property>
</bean>

You need to specify an EventTransformer implementation that transforms a Jetstream event into a String.
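The following sketch shows one such transformer, which turns an event into a single tab-separated line. It assumes a hypothetical `StringEventTransformer` interface and models the event as a `Map<String, Object>`. The real Jetstream `EventTransformer` interface and `JetstreamEvent` type may have different signatures, so treat this as the shape of the logic rather than drop-in code.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical transformer shape; the real Jetstream EventTransformer may differ.
interface StringEventTransformer {
    String transform(Map<String, Object> event);
}

// Emits the configured fields, tab-separated, in a fixed order.
// Missing fields become empty strings; tabs and line breaks inside values
// are replaced with spaces so one event always maps to exactly one line.
class TsvEventTransformer implements StringEventTransformer {
    private final List<String> fields;

    TsvEventTransformer(List<String> fields) { this.fields = fields; }

    @Override
    public String transform(Map<String, Object> event) {
        StringJoiner line = new StringJoiner("\t");
        for (String field : fields) {
            Object value = event.get(field);
            String text = (value == null) ? "" : value.toString();
            line.add(text.replace('\t', ' ').replace('\n', ' ').replace('\r', ' '));
        }
        return line.toString();
    }
}
```

Replacing tabs and line breaks inside values keeps exactly one event per line, which line-oriented readers of text files generally expect.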

SequenceEventWriter

SequenceEventWriter loads the data into sequence files. Here is an example of using it.

<bean id="yourEventWriter"
    class="com.ebay.jetstream.event.processor.hdfs.writer.SequenceEventWriter">
    <!-- [Required] Link to the HdfsClient. The configuration of the HdfsClient is used to create the sequence file. -->
    <property name="hdfs" ref="yourHdfsClient"></property>
    <!-- [Optional] Full class name of your key class. Default is org.apache.hadoop.io.NullWritable -->
    <property name="keyClassName" value="your.key.class"></property>
    <!-- [Optional] Full class name of your value class. Default is org.apache.hadoop.io.Text -->
    <property name="valueClassName" value="your.value.class"></property>
    <!-- [Optional] Link to the transformer that transforms the JetstreamEvent into an instance of keyClassName. Default is a transformer that always returns NullWritable. -->
    <property name="keyTransformer" ref="yourKeyTransformer"></property>
    <!-- [Optional] Link to the transformer that transforms the JetstreamEvent into an instance of valueClassName. Default is a JsonEventTransformer. -->
    <property name="valueTransformer" ref="yourValueTransformer"></property>
    <!-- [Optional] Compression type of the sequence file. Can be 'NONE', 'RECORD' or 'BLOCK'. Default is 'NONE' -->
    <property name="compressionType" value="NONE"></property>
</bean>

GenericAvroEventWriter

GenericAvroEventWriter loads the data into Avro files using Avro's generic API.

<bean id="yourEventWriter"
    class="com.ebay.jetstream.event.processor.hdfs.writer.GenericAvroEventWriter">
    <!-- [Required] Link to the transformer that transforms the JetstreamEvent into an Avro GenericRecord. -->
    <property name="transformer" ref="yourEventTransformer"></property>
    <!-- [Optional] Content of the Avro schema. One of 'schemaContent' or 'schemaLocation' should be specified. -->
    <property name="schemaContent" value="your schema content"></property>
    <!-- [Optional] Location of the Avro schema file in the classpath. If 'schemaContent' is specified, schemaLocation is ignored. -->
    <property name="schemaLocation" value="your.schema.location"></property>
    <!-- [Optional] Codec used to create the Avro writer. Default is deflate. -->
    <property name="codec" value="deflate"></property>
    <!-- [Optional] Deflate compression level. Only used when the codec is deflate. -->
    <property name="codecLevel" value="-1"></property>
</bean>

SpecificAvroEventWriter

SpecificAvroEventWriter loads the data into Avro files using Avro's specific API.

<bean id="yourEventWriter"
    class="com.ebay.jetstream.event.processor.hdfs.writer.SpecificAvroEventWriter">
    <!-- [Required] Full class name of your specific Avro class, e.g. the class generated from your .avsc schema. -->
    <property name="className" value="your.specific.avro.class"></property>
    <!-- [Required] Link to the transformer that transforms the JetstreamEvent into the class you specified. -->
    <property name="transformer" ref="yourEventTransformer"></property>
    <!-- [Optional] Content of the Avro schema. -->
    <property name="schemaContent" value="your schema content"></property>
    <!-- [Optional] Location of the Avro schema file in the classpath. If 'schemaContent' is specified, schemaLocation is ignored. -->
    <property name="schemaLocation" value="your.schema.location"></property>
    <!-- [Optional] Codec used to create the Avro writer. Default is deflate. -->
    <property name="codec" value="deflate"></property>
    <!-- [Optional] Deflate compression level. Only used when the codec is deflate. -->
    <property name="codecLevel" value="-1"></property>
</bean>

If neither 'schemaContent' nor 'schemaLocation' is specified, the writer tries to call the getClassSchema() method of your Avro class.
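The schema lookup order described above can be sketched with plain reflection. `SchemaSource.resolve` is a hypothetical helper, not Jetstream's implementation. `DemoRecord` is a stand-in for an Avro-generated class. Real generated classes expose a static `getClassSchema()` that returns an `org.apache.avro.Schema`. This sketch converts that result to its JSON text with `toString()`, so it runs without Avro on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

// Stand-in for an Avro-generated class (hypothetical). Generated classes
// declare `public static org.apache.avro.Schema getClassSchema()`.
class DemoRecord {
    public static String getClassSchema() {
        return "{\"type\":\"record\",\"name\":\"DemoRecord\",\"fields\":[]}";
    }
}

// Hypothetical sketch of the lookup order; returns the schema as JSON text.
class SchemaSource {
    static String resolve(String schemaContent, String schemaLocation, Class<?> avroClass)
            throws Exception {
        // 1. Inline content wins; schemaLocation is then ignored.
        if (schemaContent != null && !schemaContent.isEmpty()) {
            return schemaContent;
        }
        // 2. Schema file on the classpath.
        if (schemaLocation != null && !schemaLocation.isEmpty()) {
            try (InputStream in = SchemaSource.class.getClassLoader()
                    .getResourceAsStream(schemaLocation)) {
                if (in == null) {
                    throw new IOException("schema not found on classpath: " + schemaLocation);
                }
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            }
        }
        // 3. Fall back to the class's static getClassSchema() method.
        Method m = avroClass.getMethod("getClassSchema");
        return String.valueOf(m.invoke(null));
    }
}
```

With Avro available, the returned text would typically be handed to `org.apache.avro.Schema.Parser#parse`.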

Spring Xml Specification for HdfsBatchProcessor

<bean id="yourHdfsBatchProcessor"
    class="com.ebay.jetstream.event.processor.hdfs.HdfsBatchProcessor">
    <property name="config" ref="yourHdfsBatchProcessorConfig"></property>
    <property name="hdfs" ref="yourHdfsClient"></property>
    <!-- [Optional] The EventWriter used when the eventType of a Jetstream event is not found in eventWriters. This property is required when eventWriters is empty; in that case, all events are loaded into one folder, with no sub folders per eventType. -->
    <property name="defaultEventWriter" ref="yourEventWriter"></property>
    <!-- [Optional] Jetstream lets you define different EventWriters for different eventTypes. The eventWriters property is a map whose key is the eventType name and whose value is a reference to your EventWriter bean. Jetstream creates a sub folder for each eventType defined here. -->
    <property name="eventWriters">
        <map>
            <entry key="type1" value-ref="yourEventWriter1" />
            <entry key="type2" value-ref="yourEventWriter2" />
        </map>
    </property>
    <!-- [Optional] An EventWriter used to generate the error files. Default is a SequenceEventWriterFactory with the default configuration, which dumps the error events in JSON format to sequence files in the error folder. -->
    <property name="errorEventWriter" ref="yourErrorEventWriter"></property>
    <!-- [Optional] BatchListeners. You can add zero or more listeners here. -->
    <property name="listeners">
        <list>
            <ref bean="yourListener1" />
            <ref bean="yourListener2" />
        </list>
    </property>
</bean>
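The rules in the defaultEventWriter and eventWriters comments can be sketched as a small lookup. `WriterSelector` is a hypothetical class, and the writer type is left generic (`W`). In the Spring config above, each writer would be an EventWriter bean. The page does not say what happens when an eventType is unknown and no default writer is configured, so this sketch simply throws in that case.

```java
import java.util.Map;

// Hypothetical sketch of eventType -> EventWriter selection; not Jetstream's code.
class WriterSelector<W> {
    private final Map<String, W> eventWriters; // eventType -> writer
    private final W defaultEventWriter;        // may be null only if eventWriters is non-empty

    WriterSelector(Map<String, W> eventWriters, W defaultEventWriter) {
        // Mirrors the rule: defaultEventWriter is required when eventWriters is empty.
        if (eventWriters.isEmpty() && defaultEventWriter == null) {
            throw new IllegalArgumentException(
                    "defaultEventWriter is required when eventWriters is empty");
        }
        this.eventWriters = eventWriters;
        this.defaultEventWriter = defaultEventWriter;
    }

    // Returns the writer configured for eventType, falling back to the default.
    W writerFor(String eventType) {
        W writer = (eventType == null) ? null : eventWriters.get(eventType);
        if (writer != null) {
            return writer;
        }
        if (defaultEventWriter == null) {
            // Behaviour here is not documented; this sketch fails fast.
            throw new IllegalStateException("no EventWriter for eventType '" + eventType + "'");
        }
        return defaultEventWriter;
    }

    // Sub folders are created only for eventTypes listed in eventWriters.
    boolean hasOwnSubfolder(String eventType) {
        return eventType != null && eventWriters.containsKey(eventType);
    }
}
```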