# Flow API

The Java Flow API has four major building blocks:

> - Publisher
> - Subscriber
> - Processor
> - Subscription
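
As a rough sketch of how the four pieces fit together: a _Processor_ is both a _Subscriber_ and a _Publisher_, and a _Subscription_ is the link through which a subscriber requests items. The class names `MapProcessor` and `MapProcessorDemo` below are hypothetical and not part of the JDK. Extending `SubmissionPublisher` to get the publishing half follows the pattern shown in its Javadoc, but treat this as one possible arrangement rather than the only one.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical processor: receives items of type T, republishes mapper(item) of type R.
class MapProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {

    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription subscription;

    MapProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);          // Subscriber side: ask upstream for the first item.
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item));       // Publisher side: forward the transformed item downstream.
        subscription.request(1);          // Ask upstream for one more.
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);    // Propagate the failure downstream.
    }

    @Override
    public void onComplete() {
        close();                          // Propagate completion downstream.
    }
}

class MapProcessorDemo {
    public static void main(String[] args) {
        MapProcessor<String, Integer> lengths = new MapProcessor<>(String::length);
        // consume() attaches a simple subscriber and returns a future that completes on onComplete.
        CompletableFuture<Void> done = lengths.consume(n -> System.out.println("length " + n));

        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(lengths);
            List.of("a", "bb", "ccc").forEach(source::submit);
        } // Closing the source signals onComplete to the processor.

        done.join();                      // Wait until the downstream consumer has seen everything.
    }
}
```

Here the publisher is `source`, the processor sits in the middle, and the subscriber is the consumer attached with `consume()`; each stage talks to the one before it through its own subscription.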

## Publisher

__Publisher__ is a functional interface. A _Publisher_ usually publishes items asynchronously with the help of an _Executor_.
To create our own publisher, we implement its single method, _subscribe()_.
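
A minimal sketch of a hand-written publisher, assuming a fixed list of items and synchronous delivery on the caller's thread (no _Executor_ involved, to keep the example short). The names `ListPublisher` and `ListPublisherDemo` are hypothetical. The sketch only illustrates what _subscribe()_ has to do: hand the subscriber a _Subscription_ and emit items in response to `request(n)`.

```java
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher: emits a fixed list synchronously, honoring request(n).
class ListPublisher<T> implements Flow.Publisher<T> {

    private final List<T> items;

    ListPublisher(List<T> items) {
        this.items = List.copyOf(items);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;             // Index of the next item to deliver.
            private long demand = 0;          // Items requested but not yet delivered.
            private boolean emitting = false; // True while the delivery loop is running.
            private boolean done = false;     // True after completion, error, or cancel.

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // Add to the outstanding demand, capping at Long.MAX_VALUE on overflow.
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    return; // Re-entrant call from onNext(); the outer loop picks up the demand.
                }
                emitting = true;
                while (!done && demand > 0 && next < items.size()) {
                    demand--;
                    subscriber.onNext(items.get(next++));
                }
                emitting = false;
                if (!done && next == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true; // Stop delivering; no further signals are sent.
            }
        });
    }
}

class ListPublisherDemo {
    public static void main(String[] args) {
        new ListPublisher<>(List.of("a", "b", "c")).subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);      // Ask for the first item.
            }

            @Override
            public void onNext(String item) {
                System.out.println(item);
                subscription.request(1);      // Ask for one more.
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("DONE");
            }
        });
    }
}
```

In practice the JDK's `SubmissionPublisher` is usually a better starting point, since it already handles buffering, asynchronous delivery, and multiple subscribers.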




In [4]:
import java.util.concurrent.Flow;

public class MySubscriber<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // Ask for the first item; nothing is delivered until we request.
    }

    @Override
    public void onNext(T item) {
        System.out.println(item); // Print it.
        subscription.request(1); // Ask for one more.
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("DONE"); // Done with the stream of data.
    }
}

In [5]:
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        System.out.println("Start Flow");
        var items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9");
        var publisher = new SubmissionPublisher<String>();
        publisher.subscribe(new MySubscriber<>());

        items.forEach(s -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Wait up to 2 seconds for buffer space; the handler runs only if the item would be dropped.
            publisher.offer(
                s,
                2,
                TimeUnit.SECONDS,
                (subscriber, msg) -> {
                    subscriber.onError(
                        new RuntimeException("You are too slow getting items" +
                            " and we don't have more space for them! " +
                            "I'll drop your item: " + msg));
                    return false; // don't retry, we don't believe in second opportunities
                });
            System.out.println("published " + s);
        });

        System.out.println("End Flow");

        // Wait until the subscriber has consumed every published item.
        while (publisher.estimateMaximumLag() > 0) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        publisher.close();
    }
}

In [6]:
Main.main(new String[]{""})
