Skip to content
Branch: master
Find file History
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Type Name Latest commit message Commit time
Failed to load latest commit information.



This module provides an integration of nuxeo-stream with Nuxeo. It adds 2 services:

  • a Kafka configuration service: to register Kafka and Zookeeper access and consumer producer properties.
  • a Stream service: to define LogManager configuration, initialize Log and start StreamProcessor.

Kafka Configurations

You can register one or multiple Kafka configurations using this Nuxeo extension point:

<?xml version="1.0"?>
<component name="my.project.kafka.contrib">
  <extension target="" point="kafkaConfig">
    <kafkaConfig name="default" zkServers="localhost:2181" topicPrefix="nuxeo-">      
        <property name="bootstrap.servers">localhost:9092</property>
        <property name="bootstrap.servers">localhost:9092</property>
        <property name="">65000</property>
        <property name="">60000</property>
        <property name="">20000</property>
        <property name="">1000</property>
        <property name="max.poll.records">50</property>

This Kafka configuration named default can be used in the Log configuration below.

Make sure you have read the nuxeo-stream README to setup properly Kafka.

Stream Service

This service enable to define Log configurations and to register stream processors.

The Log configuration

There are 2 types of Log configurations:

  • Chronicle: limited for single node: all producers and consumers must be on the same node.
  • Kafka: required for distributed producers and consumers.

You can define a Log configuration with the following Nuxeo extension point:

<?xml version="1.0"?>
<component name="">
  <extension target="" point="logConfig">
    <!-- Chronicle impl, default storage under default directory, default retention -->
    <logConfig name="default" />
    <!-- Chronicle impl, storage in /tmp/imp, a week of retention -->
    <logConfig name="import" type="chronicle">
      <option name="directory">imp</option>
      <option name="basePath">/tmp</option>
      <option name="retention">7d</option>
    <!-- Chronicle impl, default storage and retention,
         create a Log named myStream with 7 partitions if it does not exist. -->
    <logConfig name="custom">
      <log name="myStream" size="7" />
    <!-- Kafka impl, referencing the default Kafka config -->
    <logConfig name="work" type="kafka">
      <option name="kafkaConfig">default</option>
    <!-- Kafka impl,
         create a Log named pubSub if it does not exist. -->
    <logConfig name="nuxeo" type="kafka">
      <option name="kafkaConfig">default</option>
      <log name="pubSub" size="1" />


The default Log type is Chronicle.

The default retention for Chronicle is 4 days, this can be changed using the nuxeo.conf option:

The retention value is expressed as a string like: 12h or 7d, respectively for 12 hours and 7 days.

The default storage for Chronicle is: ${}/data/stream, this path can be changed using the nuxeo.conf option:

Using Log from Nuxeo

The Nuxeo Stream service enables to get and share access to LogManager:

  StreamService service = Framework.getService(StreamService.class);
  LogManager manager = service.getLogManager("custom");
  // write a record to myStream, the log exists because it has been initialized by the service 
  LogAppender<Record> appender = manager.getAppender("myStream");
  appender.append(key, Record.of(key, value.getBytes()));

  // read
  try (LogTailer<Record> tailer = manager.createTailer("myGroup", "myStream")) {
      LogRecord<Record> logRecord =;
      assertEquals(key, logRecord.message().key);
  // don't close the manager, this is done by the service

Stream processing

It is possible to register stream processors, this way they are initialized and started with Nuxeo.

The extension point refer to a class that returns the topology of computations, the settings are configurable in the contribution. Also the retry policy can be set to a specific computation or for all using default as policy name, the delay between retries, exponentially backing off to the maxDelay and multiplying successive delays by a 2 factor the maxRetries sets the max number of retries to perform. If continueOnFailure is true then the computation will checkpoint the record in error and continue processing new record. The default policy when unspecified is no retry and abort on failure (continueOnFailure is false).

<?xml version="1.0"?>
<component name="">
  <extension target="" point="streamProcessor">    
  <streamProcessor name="myStreamProcessor" logConfig="default" defaultConcurrency="4" defaultPartitions="12"
      <stream name="output" partitions="1" />
      <computation name="myComputation" concurrency="8" />
      <policy name="myComputation" continueOnFailure="false" maxRetries="3" delay="500ms" maxDelay="10s" />

Following Project QA Status

Build Status

About Nuxeo

Nuxeo dramatically improves how content-based applications are built, managed and deployed, making customers more agile, innovative and successful. Nuxeo provides a next generation, enterprise ready platform for building traditional and cutting-edge content oriented applications. Combining a powerful application development environment with SaaS-based tools and a modular architecture, the Nuxeo Platform and Products provide clear business value to some of the most recognizable brands including Verizon, Electronic Arts, Sharp, FICO, the U.S. Navy, and Boeing. Nuxeo is headquartered in New York and Paris. More information is available at

You can’t perform that action at this time.