Skip to content

Latest commit

 

History

History
executable file
·
1105 lines (869 loc) · 28.8 KB

6.Design Pattern.md

File metadata and controls

executable file
·
1105 lines (869 loc) · 28.8 KB

6.设计模式

[toc]

设计模式相关教程挺多的:

https://geek-docs.com/design-pattern

https://www.runoob.com/design-pattern/design-pattern-tutorial.html

创建型设计模式

抽象工厂模式

提供一个创建一系列相关或相互依赖对象的接口,而无须指定他们具体的类

工厂方法模式

是简单工厂模式的进一步抽象和推广,是GoF设计模式的一种,由于使用了面向对象的多态性,工厂方法模式保持了简单工厂模式的优点,而且克服了它的缺点

建造者模式

将一个复杂对象的构建与它的表示分离,使得同样的构建过程可以创建不同的表示

原型模式

是一种创建型设计模式,使你能够复制已有的对象,而无需使代码依赖它们所属的类

单例模式

是一种创建型设计模式,它能让你保证一个类只有一个实例,并提供一个访问该实例的全局节点

「单例模式示例代码」

结构型设计模式

适配器模式

是一种结构型设计模式,它能使接口不兼容的对象能够互相合作

桥接模式

是一种结构型设计模式,可将一个大类或一系列紧密相关的类拆分为抽象和实现两个独立的层次结构,从而能在开发时分别使用

组合模式

是一种结构型设计模式,你可以使用它将对象组合成树状结构,并且能像使用独立对象一样使用他们

装饰者模式

是一种结构型设计模式,允许你通过将对象放入包含行为的特殊封装对象中来为原对象绑定新的行为

  • 几个角色:

    组件接口:

    组件接口是装饰者和被装饰者的超类或者接口,它定义了被装饰者的核心功能和装饰者需要加强的功能点

    具体组件:

    具体组件实现了组件接口的核心方法,完成某一个具体的业务逻辑,它也是被装饰的对象

    装饰者:

    实现组件接口,并持有一个具体的被装饰者对象

    具体装饰者:

    具体实现装饰的业务逻辑,即实现了被分离的各个增强功能点。各个具体装饰者是可以相互叠加的,从而可以构成一个功能更强大的组件对象

「装饰者模式示例代码」

外观模式

是一种结构型设计模式,能为程序库、框架或其他复杂类提供一个简单的接口

享元模式

是一种结构型设计模式,它摈弃了在每个对象中保存所有数据的方式,通过共享多个对象所共有的相同状态,让你能在有限的内存容量中载入更多对象

  • 几个角色:

    享元工厂:

    用于创建具体享元类,维护相同的享元对象。它保证相同的享元对象可以被系统共享。其内部使用了类似单例模式的方式,当请求对象已经存在时,直接返回对象,不存在时,再创建对象 抽象享元:

    定义需要共享的对象的业务接口。享元类被创建出来总是为了实现某些特定的业务逻辑,而抽象享元便定义这些逻辑的语义行为

    具体享元类:

    实现抽象享元类,完成某一具体逻辑

    调用类

    通过享元工厂取得享元对象

「享元模式示例代码」

代理模式

是一种结构型设计模式,让你能够提供对象的替代品或其占位符,代理控制着对于原对象的访问,并允许在将请求提交给对象前后进行一些处理

「代理模式示例代码」

行为型设计模式

责任链模式

是一种行为型设计模式,允许你将请求沿着处理者链进行发送。收到请求后,每个处理者均可对请求进行处理,或将其传递给链上的下一个处理者

命令模式

是一种行为设计模式,它可将请求转换为一个包含在请求相关的所有信息的独立对象

解释器模式

给定一种语言,定义它的文法表示,并定义一个解释器,该解释器根据文法表示来解释语言中的句子

迭代器模式

是一种行为设计模式,让你能在不暴露集合低层表现形式(列表、栈、树等)的情况下遍历集合中的所有元素

中介者模式

是一种行为设计模式,能让你减少对象之间混乱无序的依赖关系,该模式会限制对象之间的直接交互,迫使它们通过一个中介对象进行合作

备忘录模式

是一种行为设计模式,允许在不暴露对象实现细节的情况下保存和恢复对象之间的状态

观察者模式

是一种行为设计模式,允许你定义一种订阅机制,可在对象事件发生时通知多个“观察”该对象的其他对象

  • 几个角色

    主题接口:

    指被观察的对象。当其状态发生改变或者某事件发生时,它会将这个变化通知观察者。它维护了观察者所需要依赖的状态。

    具体主题:

    实现了主题接口中的方法。如新增观察者,删除观察者,通知观察者。其内部维护一个观察者列表

    观察者接口:

    定义了观察者的基本方法。当依赖状态发生改变时,主题接口就会调用观察者的update()方法

    具体观察者:

    实现了观察者接口的update()方法,具体处理当被观察者状态改变或者某一个事件发生时的业务逻辑

观察者模式示例代码

状态模式

是一种行为设计模式,让你能在一个对象的内部状态变化时改变其行为,使其看上去就像改变了自身所属类一样

策略模式

是一种行为设计模式,能让你定义一系列算法,并将每种算法分别放入独立的类中,以使算法的对象能够互相替换

模版方法模式

是一种行为设计模式,它在超类中定义一个算法框架,允许子类在不修改结构的情况下重写算法的特定步骤

访问者模式

一种行为型设计模式,能将操作与其所作用的对象隔离开

单例模式代码

//1. 饿汉模式
public class InstanceFactory {
    
    // 利用静态变量来记录唯一实例,直接初始化静态变量
    private static final Single instance = new Single();
    
    //构造器私有化
    private InstanceFactory() {}
    
    public static Single getInstance() {
        return this.instance;
    }
}

//2. 懒汉模式,懒加载,双重校验锁
public class InstanceFactory {
    
    //volatile防止重排序
    private volatile static Single instance;
    
    private static final Object object = new Object();
    
    //构造器私有化
    private InstanceFactory(){}
    
    public static Single getInstance() {
        if(instance == null) {
            synchronized(object) {
                if (instance == null) {
                    instance = new Single();
                }
            }
        }
        return instance;
    }
}

//3. 静态内部类,懒加载
public class InstanceFactory {
    private static class InnerClass {
        private static final Single INSTANCE = new Single();
    }
    
    private InstanceFacotry() {}
    
    public static final Single getInstance() {
        return InnerClass.INSTANCE;
    } 
}

//以上方法如果在考虑反射的情况下,依然是能够创建不同的实例的

//4. 枚举实现,最佳单例实现,懒加载
public enum InstanceFactory {
    INSTANCE;
    public InstanceFactory getInstance() {
        return INSTANCE;
    }
}
//枚举实现单例demo
public class User {
    //私有化构造函数
    private User(){ }
 
    //定义一个静态枚举类
    static enum SingletonEnum{
        //创建一个枚举对象,该对象天生为单例
        INSTANCE;
        private User user;
        //私有化枚举的构造函数
        private SingletonEnum(){
            user=new User();
        }
        public User getInstnce(){
            return user;
        }
    }
 
    //对外暴露一个获取User对象的静态方法
    public static User getInstance(){
        return SingletonEnum.INSTANCE.getInstnce();
    }
}

public class Test {
    public static void main(String [] args){
        System.out.println(User.getInstance());
        System.out.println(User.getInstance());
        System.out.println(User.getInstance()==User.getInstance());
    }
}
//结果为true

代理模式示例代码

//定义一个数据库操作的接口
public interface IDBQuery {
    String request();
}

静态代理

/**
 * 重量级对象,假设构建会比较慢
 * @author jujun chen
 * @date 2020/07/25
 */
public class DBQuery implements IDBQuery {

    public DBQuery() {
        try {
            //假设包含耗时操作
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String request() {
        return "request string";
    }
}

/**
 * 轻量级代理对象,只有在需要的时候才会创建真实对象
 * @author jujun chen
 * @date 2020/07/25
 */
public class DBQueryProxy implements IDBQuery{

    private DBQuery real;

    @Override
    public String request() {
        if (real == null)
            real = new DBQuery();

        return real.request();
    }
}

//静态代理测试
public class StaticProxyTest {

    public static void main(String[] args) {
        IDBQuery query = new DBQueryProxy();
        query.request();
    }
}

JDK代理

//实现方法处理类
public class JdkDbQueryHandler implements InvocationHandler {

    IDBQuery real = null;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (real == null)
            real = new DBQuery();
        return real.request();
    }
}

//创建JDK动态代理
public static IDBQuery createJdkProxy() {
    IDBQuery jdkProxy = (IDBQuery) Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(),
                                                          new Class[]{IDBQuery.class}, new JdkDbQueryHandler());
    return jdkProxy;
}

**CGLIB代理

public class CglibDbQueryInterceptor implements MethodInterceptor {

    IDBQuery real = null;

    @Override
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
        if (real == null)
            real = new DBQuery();
        return real.request();
    }
}


//创建方法
//cglib创建动态代理
public static IDBQuery createCglibProxy() {
    Enhancer enhancer = new Enhancer();
    enhancer.setCallback(new CglibDbQueryInterceptor());
    enhancer.setInterfaces(new Class[]{IDBQuery.class});
    IDBQuery cglibProxy = (IDBQuery) enhancer.create();
    return cglibProxy;
}

Javassist代理

工厂方式创建动态代理

public class JavassistDynDbQueryHandler implements MethodHandler {

    IDBQuery real = null;

    @Override
    public Object invoke(Object o, Method method, Method method1, Object[] objects) throws Throwable {
        if (real == null)
            real = new DBQuery();
        return real.request();
    }
}


//创建方法
//Javassist创建动态代理,工厂方式创建
public static IDBQuery createJavassistDynProxy() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
    ProxyFactory proxyFactory = new ProxyFactory();
    //指定接口
    proxyFactory.setInterfaces(new Class[]{IDBQuery.class});
    Class proxyClass = proxyFactory.createClass();
    IDBQuery javassistProxy = (IDBQuery) proxyClass.getDeclaredConstructor().newInstance();
    //设置Handler处理器
    ((ProxyObject)javassistProxy).setHandler(new JavassistDynDbQueryHandler());
    return javassistProxy;
}

动态代码方式创建代理

//Javassist使用动态Java代码创建
public static IDBQuery createJavassistBytecodeDynamicProxy() throws NotFoundException, CannotCompileException,
        NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
    ClassPool mPool = new ClassPool(true);
    //定义类名
    CtClass mCtc = mPool.makeClass(IDBQuery.class.getName() + "JavassistBytecodeProxy");
    //需要实现的接口
    mCtc.addInterface(mPool.get(IDBQuery.class.getName()));
    //添加构造函数
    mCtc.addConstructor(CtNewConstructor.defaultConstructor(mCtc));
    //添加类的字段信息,使用动态java代码
    mCtc.addField(CtField.make("public " + IDBQuery.class.getName() + " real;", mCtc));
    String dbqueryname = DBQuery.class.getName();
    //添加方法,这里使用动态Java代码指定内部逻辑
    mCtc.addMethod(CtNewMethod.make("public String request() {if(real==null)real=new " + dbqueryname + "();return real.request();}", mCtc));
    //基于以上信息,生成动态类
    Class pc = mCtc.toClass();
    //生成动态类的实例
    IDBQuery bytecodeProxy = (IDBQuery) pc.getDeclaredConstructor().newInstance();
    return bytecodeProxy;
}

动态代理性能比较

//几种创建方法性能比较
//创建时间:CGLIB > Javassist动态代码创建 > Javassist工厂类创建 > Jdk
//调用时间:Javassist工厂类创建 > Jdk > CGLIB > Javassist动态代码创建
public static final int CIRCLE = 3000000;
public static void main(String[] args) throws InvocationTargetException, NoSuchMethodException,
InstantiationException, IllegalAccessException, NotFoundException, CannotCompileException {
    IDBQuery d = null;
    long begin = System.currentTimeMillis();
    d = createJdkProxy(); //测试JDK动态代理
    System.out.println("createJdkProxy:" + (System.currentTimeMillis() - begin));
    System.out.println("JdkProxy class:" + d.getClass().getName());
    begin = System.currentTimeMillis();
    for (int i = 0; i < CIRCLE; i++) {
        d.request();
    }
    System.out.println("callJdkProxy:" + (System.currentTimeMillis() - begin));

    //测试CGLIB动态代理
    begin = System.currentTimeMillis();
    d = createCglibProxy();
    System.out.println("createCGlibProxy:" + (System.currentTimeMillis() - begin));
    System.out.println("CglibProxy class:" + d.getClass().getName());
    begin=System.currentTimeMillis();
    for (int i = 0; i < CIRCLE; i++) {
        d.request();
    }
    System.out.println("callCglibProxy:" + (System.currentTimeMillis()-begin));


    //测试Javassist动态代理
    begin = System.currentTimeMillis();
    d = createJavassistDynProxy();
    System.out.println("createJavassistDynProxy:" + (System.currentTimeMillis() - begin));
    System.out.println("JavassistDynProxy class:" + d.getClass().getName());
    begin = System.currentTimeMillis();
    for (int i = 0; i < CIRCLE; i++) {
        d.request();
    }
    System.out.println("callJavassistDynProxy:" + (System.currentTimeMillis() - begin));

    //测试Javassist动态java代码创建
    begin = System.currentTimeMillis();
    d = createJavassistBytecodeDynamicProxy();
    System.out.println("createJavassistBytecodeDynamicProxy:" + (System.currentTimeMillis() - begin));
    System.out.println("JavassistBytecodeDynamicProxy class:" + d.getClass().getName());
    begin = System.currentTimeMillis();
    for (int i = 0; i < CIRCLE; i++) {
        d.request();
    }
    System.out.println("callJavassistBytecodeDynamicProxy:" + (System.currentTimeMillis() - begin));
}

享元模式示例代码

/**
 * 享元接口,创建报表
 * @author jujun chen
 * @date 2020/07/25
 */
public interface IReportManager {
    public String createReport();
}
/**
 * 具体享元类,可以继承抽象享元,也可以直接实现,财务报表
 * @author jujun chen
 * @date 2020/07/25
 */
public class FinancialReportManager implements IReportManager {

    protected String tenantId = null;

    public FinancialReportManager(String tenantId) {
        this.tenantId = tenantId;
    }

    @Override
    public String createReport() {
        return "This is a financial report";
    }
}
/**
 * 具体享元类,可以继承抽象享元,也可以直接实现,员工报表
 * @author jujun chen
 * @date 2020/07/25
 */
public class EmployeeReportManager implements IReportManager {
    protected String tenantId = null;

    public EmployeeReportManager(String tenantId) {
        this.tenantId = tenantId;
    }

    @Override
    public String createReport() {
        return "This is a employee report";
    }
}
/**
 * 享元工厂类,报表工厂类
 * @author jujun chen
 * @date 2020/07/25
 */
public class ReportManagerFactory {

    Map<String, IReportManager> financialReportManager = new HashMap<>();
    Map<String, IReportManager> employeeReportManager = new HashMap<>();

    //获取财务报表管理类
    public IReportManager getFinancialReportManager(String tenantId) {
        IReportManager r = financialReportManager.get(tenantId);
        if (r == null) {
            r = new FinancialReportManager(tenantId);
            financialReportManager.put(tenantId, r);
        }
        return r;
    }

    //获取员工报表管理类
    public IReportManager getEmployeeReportReportManager(String tenantId) {
        IReportManager r = employeeReportManager.get(tenantId);
        if (r == null) {
            r = new EmployeeReportManager(tenantId);
            employeeReportManager.put(tenantId, r);
        }
        return r;
    }
}
/**
 * 具体实现类
 * @author jujun chen
 * @date 2020/07/25
 */
public class FlyWeightTest {

    public static void main(String[] args) {
        ReportManagerFactory rmf = new ReportManagerFactory();
        IReportManager rm = rmf.getFinancialReportManager("A");
        System.out.println(rm.createReport());
    }
}

装饰者模式示例代码

示例:现在需要将某一个结果通过HTML发布,那么首先需要将内容转化为一个HTML文本,同时由于需要在网络上传输,还需要为其增加HTTP头,我现在暂时先增加这两个部分。

image-20200726102641970

public interface IPacketCreator {
    /**
     * 用于处理内容
     * @return
     */
    public String handleContent();
}
/**
 * 处理HTTP主体内容
 * @author jujun chen
 * @date 2020/07/26
 */
public class PacketBodyCreator implements IPacketCreator {

    @Override
    public String handleContent() {
        //构造核心数据,但不包括格式
        return "Content of Packet";
    }
}
/**
 * 维护核心组件component,负责告诉子类,
 * 其核心业务逻辑应该全权委托component完成,自己仅做增强处理
 *
 * @author jujun chen
 * @date 2020/07/26
 */
public abstract class PacketDecorator implements IPacketCreator {
    IPacketCreator component;

    public PacketDecorator(IPacketCreator c) {
        component = c;
    }
}
/**
 * 处理HTML的具体装饰器
 * @author jujun chen
 * @date 2020/07/26
 */
public class PacketHTMLHeaderCreator extends PacketDecorator {

    public PacketHTMLHeaderCreator(IPacketCreator c) {
        super(c);
    }

    /**
     * 用于处理内容
     *
     * @return
     */
    @Override
    public String handleContent() {
        StringBuffer sb = new StringBuffer();
        sb.append("<html>");
        sb.append("<body>");
        sb.append(component.handleContent());
        sb.append("</body>");
        sb.append("</html>\n");
        return sb.toString();
    }
}
/**
 * 完成数据包HTTP头部处理
 * @author jujun chen
 * @date 2020/07/26
 */
public class PacketHTTPHeaderCreator extends PacketDecorator {

    public PacketHTTPHeaderCreator(IPacketCreator c) {
        super(c);
    }

    /**
     * 用于处理内容
     *
     * @return
     */
    @Override
    public String handleContent() {
        StringBuffer sb = new StringBuffer();
        sb.append("Cache-Control:no-cache\n");
        sb.append("Date:Mon,31Dec202004:25:58GMT\n");
        sb.append(component.handleContent());
        return sb.toString();
    }
}
/**
 * 通过层层构造,将装饰者组装到一起,增强被装饰者
 * @author jujun chen
 * @date 2020/07/26
 */
public class DecoratorTest {
    public static void main(String[] args) {
        IPacketCreator pc = new PacketHTTPHeaderCreator((new PacketHTMLHeaderCreator(new PacketBodyCreator())));
        System.out.println(pc.handleContent());
    }
}

观察者模式示例代码

使用自定义

//主题接口
public interface ISubject {
    //添加观察者
    void attach(IObserver observer);

    //删除观察者
    void detach(IObserver observer);

    //通知所有观察者
    void inform();
}
/**
 * 观察者接口
 * @author jujun chen
 * @date 2020/07/26
 */
public interface IObserver {

    void update();
}
/**
 * 被观察者
 * @author jujun chen
 * @date 2020/07/26
 */
public class ConcreteSubject implements ISubject{

    CopyOnWriteArrayList<IObserver> observers = new CopyOnWriteArrayList<>();

    private int state;

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
        inform();
    }

    @Override
    public void attach(IObserver observer) {
        observers.add(observer);
    }

    @Override
    public void detach(IObserver observer) {
        observers.remove(observer);
    }

    @Override
    public void inform() {
        for (IObserver observer : observers) {
            observer.update();
        }
    }
}
/**
 * 具体观察者
 * @author jujun chen
 * @date 2020/07/26
 */
public class ConcreteObserver implements IObserver {

    @Override
    public void update() {
        System.out.println("observer receives information");
    }
}
public class ObserverTest {

    public static void main(String[] args) {
        ConcreteSubject subject = new ConcreteSubject();
        subject.attach(new ConcreteObserver());

        //改变被观察者的状态
        subject.setState(1);

    }
}

JDK自带

java9中已经过时

替代方法:使用PropertyChangeEvent,PropertyChangeListener

Flow类

/**
 * 观察者,实现Observer接口
 * @author jujun chen
 * @date 2020/07/26
 */
public class ConcreteObserver implements Observer {

    @Override
    public void update(Observable o, Object arg) {
        if (o instanceof ConcreteSubject) {
            System.out.println("observer recevice info: " + arg.toString());
        }
    }
}
/**
 * 被观察者,继承Observable
 * @author jujun chen
 * @date 2020/07/26
 */
public class ConcreteSubject extends Observable {

    public void changeState(int state) {
        this.setChanged();
        this.notifyObservers(state);
    }
}
/**
 * 使用JDK自带观察者
 * @author jujun chen
 * @date 2020/07/26
 */
public class ObserverTest {

    public static void main(String[] args) {
        ConcreteSubject subject = new ConcreteSubject();
        //通知顺序跟注册顺序相反
        subject.addObserver(new ConcreteObserver());
        subject.changeState(1);
    }
}

使用PropertyChangeListener 和 PropertyChangeSupport

/**
 * 观察者
 * @author jujun chen
 * @date 2020/07/26
 */
public class DeviceChangeListener implements PropertyChangeListener {

    @Override
    public void propertyChange(PropertyChangeEvent evt) {
        System.out.println(evt);
    }
}
/**
 * 被观察对象
 * @author jujun chen
 * @date 2020/07/26
 */
public class DeviceInfo {
    private final PropertyChangeSupport changeSupport = new PropertyChangeSupport(this);

    private int state;

    //增加监听
    public void addPropertyChangeListener(PropertyChangeListener listener) {
        changeSupport.addPropertyChangeListener(listener);
    }

    //移除监听
    public void removePropertyChangeListener(PropertyChangeListener listener) {
        changeSupport.removePropertyChangeListener(listener);
    }

    public int getState() {
        return state;
    }

    public void setState(int newState) {
        int oldState = this.state;
        this.state = newState;
        changeSupport.firePropertyChange("state", oldState, state);
    }
}
public class DeviceTest {

    public static void main(String[] args) {
        DeviceInfo d = new DeviceInfo();
        d.addPropertyChangeListener(new DeviceChangeListener());
        d.setState(1);
    }
}

使用FLow

Java9新增,详细分析见源码分析

package design.design.flow;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/**
 * 生产者
 * @author jujun chen
 * @date 2020/07/27
 */
class OneShotPublisher implements Flow.Publisher<Boolean> {
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
    private boolean subscribed; // true after first subscribe

    @Override
    public synchronized void subscribe(Flow.Subscriber<? super Boolean> subscriber) {
        if (subscribed)
            subscriber.onError(new IllegalStateException()); // only one allowed
        else {
            subscribed = true;
            //调用消费者
            subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
        }
    }

    static class OneShotSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Boolean> subscriber;
        private final ExecutorService executor;
        private Future<?> future; // to allow cancellation
        private boolean completed;
        OneShotSubscription(Flow.Subscriber<? super Boolean> subscriber,
                            ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }
        @Override
        public synchronized void request(long n) {
            if (!completed) {
                completed = true;
                if (n <= 0) {
                    IllegalArgumentException ex = new IllegalArgumentException();
                    executor.execute(() -> subscriber.onError(ex));
                } else {
                    future = executor.submit(() -> {
                        //发送数据
                        subscriber.onNext(Boolean.TRUE);
                        subscriber.onComplete();
                    });
                }
            }
        }
        //取消数据发送
        @Override
        public synchronized void cancel() {
            completed = true;
            if (future != null) future.cancel(false);
        }
    }
}
package design.design.flow;

import java.util.concurrent.Flow;
import java.util.function.Consumer;

/**
 * 消费者
 * @author jujun chen
 * @date 2020/07/27
 */
class SampleSubscriber<T> implements Flow.Subscriber<T> {
    final Consumer<? super T> consumer;
    Flow.Subscription subscription;
    final long bufferSize;
    long count;

    SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
        this.bufferSize = bufferSize;
        this.consumer = consumer;
    }


    /**
     * Publisher在被指定一个新的Subscriber时调用此方法。
     * 一般来说你需要在subscriber内部保存这个subscription实例,因为后面会需要通过她向publisher
     * 发送信号来完成:请求更多数据,或者取消订阅。
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        long initialRequestSize = bufferSize;
        count = bufferSize - bufferSize / 2; // re-request when half consumed
        (this.subscription = subscription).request(initialRequestSize);
    }

    /**
     * 每当新的数据产生,这个方法会被调用
     * @param item
     */
    @Override
    public void onNext(T item) {
        if (--count <= 0)
            subscription.request(count = bufferSize - bufferSize / 2);
        consumer.accept(item);
    }

    /**
     * 当publisher出现异常时会调用subscriber的这个方法
     * @param ex
     */
    @Override
    public void onError(Throwable ex) { ex.printStackTrace(); }

    /**
     * 当publisher数据推送完毕时会调用此方法,于是整个订阅过程结束。
     */
    @Override
    public void onComplete() {
        System.out.println("has finished!");
    }
}
package design.design.flow;

/**
 * @author jujun chen
 * @date 2020/07/27
 */
public class FlowTest {

    public static void main(String[] args) {
        OneShotPublisher oneShotPublisher = new OneShotPublisher();
        oneShotPublisher.subscribe(new SampleSubscriber<>(Integer.MAX_VALUE,System.out::println));
    }
}

使用SubmissionPublisher

@Test
public void test1() {
    SubmissionPublisher<Integer> submissionPublisher = new SubmissionPublisher<>();
    submissionPublisher.consume(System.out::println);
    submissionPublisher.submit(1);
    submissionPublisher.submit(2);
    submissionPublisher.offer(3, (x, y) -> {
        System.out.println("xxx");
        return false;
    });
}