Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
weisong44 committed Oct 18, 2018
2 parents 7c777fe + 0c2076c commit c9e8bf7
Show file tree
Hide file tree
Showing 72 changed files with 2,658 additions and 397 deletions.
Expand Up @@ -23,19 +23,29 @@


/**
* The base interface for all user-implemented applications in Samza.
* A {@link SamzaApplication} describes the inputs, outputs, state, configuration and the logic
* for processing data from one or more streaming sources.
* <p>
* The main processing logic of the user application should be implemented in the
* {@link SamzaApplication#describe(ApplicationDescriptor)} method. Sub-classes {@link StreamApplication} and
* {@link TaskApplication} are specific interfaces for applications
* written in high-level DAG and low-level task APIs, respectively.
* This is the base {@link SamzaApplication}. Implement a {@link StreamApplication} to describe the
* processing logic using Samza's High Level API in terms of {@link org.apache.samza.operators.MessageStream}
* operators, or a {@link TaskApplication} to describe it using Samza's Low Level API in terms of per-message
* processing logic.
* <p>
* A {@link SamzaApplication} implementation must have a no-argument constructor, which will be used by the framework
* to create new instances and call {@link #describe(ApplicationDescriptor)}.
* <p>
* Per container context may be managed using {@link org.apache.samza.context.ApplicationContainerContext} and
* set using {@link ApplicationDescriptor#withApplicationContainerContextFactory}. Similarly, per task context
* may be managed using {@link org.apache.samza.context.ApplicationTaskContext} and set using
* {@link ApplicationDescriptor#withApplicationTaskContextFactory}.
*/
@InterfaceStability.Evolving
public interface SamzaApplication<S extends ApplicationDescriptor> {

/**
* Describes the user processing logic via {@link ApplicationDescriptor}.
* Describes the inputs, outputs, state, configuration and processing logic using the provided {@code appDescriptor}.
*
* @param appDesc the {@link ApplicationDescriptor} object to describe user application logic
* @param appDescriptor the {@link ApplicationDescriptor} to use for describing the application.
*/
void describe(S appDesc);
void describe(S appDescriptor);
}
Expand Up @@ -21,56 +21,59 @@
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;


/**
* Describes and initializes the transforms for processing message streams and generating results in high-level API.
* A {@link StreamApplication} describes the inputs, outputs, state, configuration and the processing logic
* in Samza's High Level API.
* <p>
* A typical {@link StreamApplication} implementation consists of the following stages:
* <ol>
* <li>Configuring the inputs, outputs and state (tables) using the appropriate
* {@link org.apache.samza.system.descriptors.SystemDescriptor}s,
* {@link org.apache.samza.system.descriptors.InputDescriptor}s,
* {@link org.apache.samza.system.descriptors.OutputDescriptor}s and
* {@link org.apache.samza.table.descriptors.TableDescriptor}s
* <li>Obtaining the corresponding
* {@link org.apache.samza.operators.MessageStream}s,
* {@link org.apache.samza.operators.OutputStream}s and
* {@link org.apache.samza.table.Table}s from the provided {@link StreamApplicationDescriptor}.
* <li>Defining the processing logic using operators and functions on the streams and tables thus obtained.
* E.g., {@link org.apache.samza.operators.MessageStream#filter(org.apache.samza.operators.functions.FilterFunction)}
* </ol>
* <p>
* The following example removes page views older than 1 hour from the input stream:
* The following example {@link StreamApplication} removes page views older than 1 hour from the input stream:
* <pre>{@code
* public class PageViewFilter implements StreamApplication {
* public void describe(StreamAppDescriptor appDesc) {
* KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
* public void describe(StreamApplicationDescriptor appDescriptor) {
* KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
* KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
* trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
*
* trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
* KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
* trackingSystem.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
* trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
*
* MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
* OutputStream<PageViewEvent> recentPageViewEvents = appDesc.getOutputStream(outputStreamDescriptor);
* MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
* OutputStream<PageViewEvent> recentPageViewEvents = appDescriptor.getOutputStream(outputStreamDescriptor);
*
* pageViewEvents
* .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
* .sendTo(recentPageViewEvents);
* }
* }
* }</pre>
*<p>
* The example above can be run using an ApplicationRunner:
* <pre>{@code
* public static void main(String[] args) {
* CommandLine cmdLine = new CommandLine();
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
* PageViewFilter app = new PageViewFilter();
* ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
* runner.run();
* runner.waitForFinish();
* }
* }</pre>
*
* <p>
* All operator function implementations used in a {@link StreamApplication} must be {@link java.io.Serializable}. Any
* context required within an operator function may be managed by implementing the
* {@link org.apache.samza.operators.functions.InitableFunction#init} and
* {@link org.apache.samza.operators.functions.ClosableFunction#close} methods in the function implementation.
* <p>
* Functions may implement the {@link org.apache.samza.operators.functions.ScheduledFunction} interface
* to schedule and receive periodic callbacks from the Samza framework.
* <p>
* Implementation Notes: Currently {@link StreamApplication}s are wrapped in a {@link org.apache.samza.task.StreamTask}
* during execution. The execution planner will generate a serialized DAG which will be deserialized in each
* {@link org.apache.samza.task.StreamTask} instance used for processing incoming messages. Execution is synchronous
* and thread-safe within each {@link org.apache.samza.task.StreamTask}.
*
* <p>
* A {@link StreamApplication} implementation must have a proper fully-qualified class name and a default constructor
* with no parameters to ensure successful instantiation in both local and remote environments.
* Functions implemented for transforms in StreamApplications (e.g., {@link org.apache.samza.operators.functions.MapFunction}
* and {@link org.apache.samza.operators.functions.FilterFunction}) are initable and closable. They are initialized
* before messages are delivered to them and closed after their execution when the {@link org.apache.samza.task.StreamTask}
* instance is closed. See {@link org.apache.samza.operators.functions.InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
* Function implementations are required to be {@link java.io.Serializable}.
* and thread-safe within each {@link org.apache.samza.task.StreamTask}. Multiple tasks may process their
* messages concurrently depending on the job parallelism configuration.
*/
@InterfaceStability.Evolving
public interface StreamApplication extends SamzaApplication<StreamApplicationDescriptor> {
Expand Down
Expand Up @@ -23,64 +23,49 @@


/**
* Describes and initializes the transforms for processing message streams and generating results in low-level API. Your
* application is expected to implement this interface.
* A {@link TaskApplication} describes the inputs, outputs, state, configuration and the processing logic
* in Samza's Low Level API.
* <p>
* A typical {@link TaskApplication} implementation consists of the following stages:
* <ol>
* <li>Configuring the inputs, outputs and state (tables) using the appropriate
* {@link org.apache.samza.system.descriptors.SystemDescriptor}s,
* {@link org.apache.samza.system.descriptors.StreamDescriptor}s and
* {@link org.apache.samza.table.descriptors.TableDescriptor}s
* <li>Adding these descriptors to the provided {@link TaskApplicationDescriptor}.
* <li>Defining the processing logic by implementing a {@link org.apache.samza.task.StreamTask} or
* {@link org.apache.samza.task.AsyncStreamTask} that operates on each
* {@link org.apache.samza.system.IncomingMessageEnvelope} one at a time.
* <li>Setting a {@link org.apache.samza.task.TaskFactory} using
* {@link TaskApplicationDescriptor#setTaskFactory(org.apache.samza.task.TaskFactory)} that creates instances of the
* task above. The {@link org.apache.samza.task.TaskFactory} implementation must be {@link java.io.Serializable}.
* </ol>
* <p>
* The following example removes page views older than 1 hour from the input stream:
* The following example {@link TaskApplication} removes page views older than 1 hour from the input stream:
* <pre>{@code
* public class PageViewFilter implements TaskApplication {
* public void describe(TaskAppDescriptor appDesc) {
* KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor(PageViewTask.SYSTEM);
* public void describe(TaskApplicationDescriptor appDescriptor) {
* KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
* KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
* trackingSystem.getInputDescriptor(PageViewTask.TASK_INPUT, new JsonSerdeV2<>(PageViewEvent.class));
*
* trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
* KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
* trackingSystem.getOutputDescriptor(PageViewTask.TASK_OUTPUT, new JsonSerdeV2<>(PageViewEvent.class));
* trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
*
* appDesc.addInputStream(inputStreamDescriptor);
* appDesc.addOutputStream(outputStreamDescriptor);
* appDesc.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
* appDescriptor.addInputStream(inputStreamDescriptor);
* appDescriptor.addOutputStream(outputStreamDescriptor);
* appDescriptor.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
* }
* }
*
* public class PageViewTask implements StreamTask {
* final static String TASK_INPUT = "pageViewEvents";
* final static String TASK_OUTPUT = "recentPageViewEvents";
* final static String SYSTEM = "kafka";
*
* public void process(IncomingMessageEnvelope message, MessageCollector collector,
* TaskCoordinator coordinator) {
* public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
* PageViewEvent m = (PageViewEvent) message.getValue();
* if (m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) {
* collector.send(new OutgoingMessageEnvelope(new SystemStream(SYSTEM, TASK_OUTPUT),
* message.getKey(), message.getKey(), m));
* collector.send(new OutgoingMessageEnvelope(
* new SystemStream("tracking", "recentPageViewEvent"), message.getKey(), message.getKey(), m));
* }
* }
* }
* }</pre>
*
*<p>
* The example above can be run using an ApplicationRunner:
* <pre>{@code
* public static void main(String[] args) {
* CommandLine cmdLine = new CommandLine();
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
* PageViewFilter app = new PageViewFilter();
* ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
* runner.run();
* runner.waitForFinish();
* }
* }</pre>
*
* <p>
* Implementation Notes: {@link TaskApplication} allows users to instantiate {@link org.apache.samza.task.StreamTask} or
* {@link org.apache.samza.task.AsyncStreamTask} when describing the processing logic. A new {@link TaskApplicationDescriptor}
* instance will be created and described by the user-defined {@link TaskApplication} when planning the execution.
* {@link org.apache.samza.task.TaskFactory} is required to be serializable.
*
* <p>
* The user-implemented {@link TaskApplication} class must be a class with proper fully-qualified class name and
* a default constructor with no parameters to ensure successful instantiation in both local and remote environments.
*/
@InterfaceStability.Evolving
public interface TaskApplication extends SamzaApplication<TaskApplicationDescriptor> {
Expand Down
Expand Up @@ -29,32 +29,42 @@


/**
* The interface class to describe the configuration, input and output streams, and processing logic in a
* An {@link ApplicationDescriptor} contains the description of inputs, outputs, state, configuration and the
* processing logic for a {@link org.apache.samza.application.SamzaApplication}.
* <p>
* This is the base {@link ApplicationDescriptor} and provides functionality common to all
* {@link org.apache.samza.application.SamzaApplication}s.
* {@link org.apache.samza.application.StreamApplication#describe} will provide access to a
* {@link StreamApplicationDescriptor} with additional functionality for describing High Level API applications.
* Similarly, {@link org.apache.samza.application.TaskApplication#describe} will provide access to a
* {@link TaskApplicationDescriptor} with additional functionality for describing Low Level API applications.
* <p>
* Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for
* applications written in high-level {@link org.apache.samza.application.StreamApplication} and low-level
* {@link org.apache.samza.application.TaskApplication} APIs, respectively.
*
* @param <S> sub-class of user application descriptor.
* Use the {@link ApplicationDescriptor} to set the container scope context factory using
* {@link ApplicationDescriptor#withApplicationContainerContextFactory}, and task scope context factory using
* {@link ApplicationDescriptor#withApplicationTaskContextFactory}. Please note that the terms {@code container}
* and {@code task} here refer to the units of physical and logical parallelism, not the programming API.
*/
@InterfaceStability.Evolving
public interface ApplicationDescriptor<S extends ApplicationDescriptor> {

/**
* Get the {@link Config} of the application
* @return config of the application
* Get the configuration for the application.
* @return config for the application
*/
Config getConfig();

/**
* Sets the default SystemDescriptor to use for the application. This is equivalent to setting
* {@code job.default.system} and its properties in configuration.
* Sets the {@link SystemDescriptor} for the default system for the application.
* <p>
* The default system is used by the framework for creating any internal (e.g., coordinator, changelog, checkpoint)
* streams. In an {@link org.apache.samza.application.StreamApplication}, it is also used for creating any
* intermediate streams; e.g., those created by the {@link org.apache.samza.operators.MessageStream#partitionBy} and
* {@link org.apache.samza.operators.MessageStream#broadcast} operators.
* <p>
* If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
*
* @param defaultSystemDescriptor the default system descriptor to use
* @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
* @param defaultSystemDescriptor the {@link SystemDescriptor} for the default system for the application
* @return this {@link ApplicationDescriptor}
*/
S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);

Expand All @@ -64,10 +74,11 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
* context can be accessed through the {@link org.apache.samza.context.Context}.
* <p>
* Setting this is optional.
* <p>
* The provided {@code factory} instance must be {@link java.io.Serializable}.
*
* @param factory the {@link ApplicationContainerContextFactory} for this application
* @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
* {@link ApplicationContainerContextFactory}
* @return this {@link ApplicationDescriptor}
*/
S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory);

Expand All @@ -77,31 +88,37 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
* accessed through the {@link org.apache.samza.context.Context}.
* <p>
* Setting this is optional.
* <p>
* The provided {@code factory} instance must be {@link java.io.Serializable}.
*
* @param factory the {@link ApplicationTaskContextFactory} for this application
* @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
* {@link ApplicationTaskContextFactory}
* @return this {@link ApplicationDescriptor}
*/
S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory);

/**
* Sets the {@link ProcessorLifecycleListenerFactory} for this application.
*
* <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
* <p>
* Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
* plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
* the application.
* <p>
* The provided {@code listenerFactory} instance must be {@link java.io.Serializable}.
*
* @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
* with callback methods before and after the start/stop of each StreamProcessor in the application
* @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
* @return this {@link ApplicationDescriptor}
*/
S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);

/**
* Sets a set of customized {@link MetricsReporterFactory}s in the application
* Sets the {@link org.apache.samza.metrics.MetricsReporterFactory}s for creating the
* {@link org.apache.samza.metrics.MetricsReporter}s to use for the application.
* <p>
* The provided {@link MetricsReporterFactory} instances must be {@link java.io.Serializable}.
*
* @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
* @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories}
* @param reporterFactories a map of {@link org.apache.samza.metrics.MetricsReporter} names to their factories.
* @return this {@link ApplicationDescriptor}
*/
S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);

Expand Down

0 comments on commit c9e8bf7

Please sign in to comment.