Skip to content

langhuihui/RxGo

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

31 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

RxGo 非官方实现版本

Build Status Go Report Card codecov

  • 代码精简、可读性强(尽可能用最少的代码实现)
  • 设计精妙、实现优雅(尽可能利用golang的语言特点和优势)
  • 可扩展性强(可自定义Observable以及Operator)
  • 占用系统资源低(尽一切可能减少创建goroutine和其他对象)
  • 性能强(尽一切可能减少计算量)

每一行代码都是深思熟虑……

已实现的功能

Observable

FromSlice FromChan Of Range Subject Timeout Interval Merge Concat Race CombineLatest Empty Never Throw

Operator

Do Take TakeWhile TakeUntil Skip SkipWhile SkipUntil IgnoreElements Share StartWith Zip Filter Distinct DistinctUntilChanged Debounce DebounceTime Throttle ThrottleTime First Last Count Max Min Reduce Map MapTo MergeMap MergeMapTo SwitchMap SwitchMapTo Scan Repeat PairWise Buffer

使用方法

链式调用方式

import (
    . "github.com/langhuihui/RxGo/rx"
)
func main(){
    err := Of(1, 2, 3, 4).Take(2).Subscribe(NextFunc(func(event *Event) {
        
    }))
}

管道模式

import (
    . "github.com/langhuihui/RxGo/rx"
    . "github.com/langhuihui/RxGo/pipe"
)
func main(){
    err := Of(1, 2, 3, 4).Pipe(Skip(1),Take(2)).Subscribe(NextFunc(func(event *Event) {
        
    }))
}

管道模式相比链式模式,具有操作符可扩展性,用户可以按照规则创建属于自己的操作符

type Operator func(Observable) Observable

操作符只需要返回Operator这个类型即可,例如 实现一个happy为false就立即完成的操作符

func MyOperator(happy bool) Operator {
	return func(source Observable) Observable {
		return func (sink *Observer) error {
            if happy{
                return source(sink)
            }
            return nil
		}
	}
}

创建自定义Observable

在任何时候,您都可以创建自定义的Observable,用来发送任何事件

import (
    . "github.com/langhuihui/RxGo/rx"
)
func MyObservable (sink *Control) error {
    sink.Next("hello")
    return nil
}
func main(){
    ob := Observable(MyObservable)
    ob.Subscribe(NextFunc(func(event *Event) {
        //event.Dispose()
    }))
    subscribtion:= ob.SubscribeAsync(NextFunc(func(event *Event) {}),func(err error){},func(){})
    //subscribtion.Dispose()
}

设计思想

基本知识

所谓Observable,就是一个可以被订阅,然后不断发送事件的事件源,见如下示意图

                                time -->

(*)-------------(o)--------------(o)---------------(x)----------------|>
 |               |                |                 |                 |
Start          value            value             error              Done

该示意图代表了,事件被订阅后(Start)开始不停发送事件的过程,直到发出error或者Done(完成)为止

有的Observable并不会发出完成事件,比如Never

参考网站: rxmarbles

总体方案

实现Rx的关键要素,是要问几个问题

  1. 如何定义Observable,?(一个结构体?一个函数?一个接口?一个Channel?)
  2. 如何实现订阅逻辑?(调用函数?发送数据?)
  3. 如何实现接受数据?(如何实现Observer?)
  4. 如何实现完成/错误的传递?
  5. 如何实现取消订阅?(难点:在事件响应中取消,以及在任何其他goroutine中取消订阅)
  6. 如何实现操作符
  7. 操作符如何处理连锁反应,比如后面描述的情况
  8. 如何实现链式和管道两种编程模式
  9. 如何让用户扩展(自定义)Observable操作符
  10. 如何向普通用户解释复杂的概念
  • 当用户需要订阅或者终止事件流,则进行链路传递,订阅或者终止所有中间过程中的事件源
Observable---------Operator----------Operator-----------Observer
             <|                <|                <|          
           订阅/取消          订阅/取消          订阅/取消         
  • 当事件流完成或者报错时,需要通知下游事件流的完成或者报错
Observable---------Operator----------Operator-----------Observer
             |>                |>                |>          
           完成/错误          完成/错误          完成/错误         

实际情况远比这个复杂,后面会进行分析

可观察对象(事件源)Observable

Observable 被定义成为一个函数,该函数含有一个类型为*Observer的参数。

type Observable func(*Observer) error

任何事件源都是这样的一个函数,当调用该函数即意味着订阅了该事件源,入参为一个Observer,具体功能见下面

如果该函数返回nil,即意味着事件流完成

否则意味着事件流异常

观察者对象Observer

type Observer struct {
    context.Context //组合继承方式,用于监听被取消订阅的事件
	cancel          context.CancelFunc //缓存取消函数
	next            NextHandler //缓存当前的NextHandler,后续可以被替换
}

该控制器为一个结构体,其中next记录了当前的NextHandler,

取消订阅的行为调用cancel函数,golang提供的context可以完美的适配这种场景。

Observer对象为Observable和事件处理逻辑共同持有,是二者沟通的桥梁 可以把Observer对象直接理解为context,只是多了用来发送数据的next函数。 把cancel函数缓存在结构体中不是业界推荐的做法,但是考虑到框架调用的便利性还是这么做了。

NextHandler

type Event struct {
    Data    interface{}
    Target  *Observer
}
NextHandler interface {
    OnNext(*Event)
}

NextHandler是一个接口,实现OnNext函数,当Observable数据推送到Observer中时,即调用了该函数

Target属性用于存储当前发送事件的Observer对象,有两大重要使命

  1. 更换NextHandler,用于减少数据传递过程
  2. 在NextHandler过程中终止事件流

这样做的好处是可以实现不同的观察者,比如函数或者channel

type(
    NextFunc func(*Event)
    NextChan chan *Event
)
func (next NextFunc) OnNext(event *Event) {
	next(event)
}
func (next NextChan) OnNext(event *Event) {
	next <- event
}

实现案例TakeUntil

//TakeUntil 一直获取事件直到unitl传来事件为止
func (ob Observable) TakeUntil(until Observable) Observable {
	return func(sink *Observer) error {
		ctx, cancel := context.WithCancel(sink)
		go until(&Observer{ctx, cancel, NextCancel(cancel)})
		return ob(&Observer{ctx, cancel, sink.next})
	}
}

TakeUnitl的用途是,传入一个until事件源,当这个until事件源接受到事件时,就会导致当前的事件源"完成”。相当于某种中断信号。

看似简短的代码,确考虑各种不同的情形

几大实现细节:

  1. 订阅until事件源,通过go关键字创建goroutine防止阻塞当前goroutine
  2. NextCancel包装的cancel函数,作用就是当until事件源发送任意事件时,调用cancel函数,达到取消订阅的目的
  3. 两个事件源订阅都复用了同一个派生的context,这样的话,如果下游取消订阅就能同时取消这两个事件源的订阅。
  4. 最后一步是订阅上游事件源,并将返回结果返回————上游Observable完成也意味着本Observable完成即完成信号的向下传播
  5. 任何情况取消订阅,或者上游事件源完成都可以使得事件源函数返回,接着TakeUntil函数也会返回,即意味着完成
  6. until事件源的完成或者错误,都将忽略,所以我们没有去获取until函数返回值

About

Reactive Extension for golang

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages