@@ -82,131 +82,202 @@ enum FlowControllerError: Error {
case compositeActionError(erroredAction: RxActionType, error: Error)
}

/**
This class is responsible for storing state and dispatching new actions.
You initialize flow controller with initial state and reducer.
*/
public class RxDataFlowController<State: RxStateType> {
/**
Observable sequence that emits new state changes
*/
public var state: Observable<(setBy: RxActionType, state: State)> {
return currentStateSubject.asObservable().startWith(currentState).observeOn(scheduler)
struct Queue<T> {
var array = [T?]()
var head = 0

let currentItemSubject = PublishSubject<T>()

var isEmpty: Bool {
return count == 0
}

/**
Returns current state
*/
public private(set) var currentState: (setBy: RxActionType, state: State) {
didSet { currentStateSubject.onNext(currentState) }
}
var count: Int {
return array.count - head
}

mutating func enqueue(_ element: T) {
array.append(element)
if count == 1 {
self.currentItemSubject.onNext(element)
}
}

mutating func dequeue() -> T? {
guard head < array.count, let element = array[head] else { return nil }

array[head] = nil
head += 1

let percentage = Double(head)/Double(array.count)
if array.count > 50 && percentage > 0.25 {
array.removeFirst(head)
head = 0
}

if let current = peek() {
currentItemSubject.onNext(current)
}

return element
}

func peek() -> T? {
if isEmpty {
return nil
} else {
return array[head]
}
}
}

public class RxDataFlowController<State: RxStateType> {
public var state: Observable<(setBy: RxActionType, state: State)> { return currentStateSubject.asObservable().observeOn(scheduler) }
// public var currentState: (setBy: RxActionType, state: State) { return stateStack.peek()! }
/**
Observable sequence that emits errors
Returns current state
*/
public private(set) var currentState: (setBy: RxActionType, state: State) {
didSet { currentStateSubject.onNext(currentState) }
}
public var errors: Observable<(state: State, action: RxActionType, error: Error)> { return errorsSubject }

let bag = DisposeBag()
let reducer: RxReducer<State>
let scheduler: ImmediateSchedulerType

let actionsSubject: PublishSubject<RxActionType> = PublishSubject()

// let concurrentActionScheduler: ImmediateSchedulerType
var actionsQueue = Queue<RxActionType>()
// var isActionExecuting = BehaviorSubject(value: false)
// let currentStateSubject: BehaviorSubject<(setBy: RxActionType, state: State)>
let currentStateSubject = PublishSubject<(setBy: RxActionType, state: State)>()
let errorsSubject = PublishSubject<(state: State, action: RxActionType, error: Error)>()

/**
Initialized new instance of RxDataFlowController
- parameter reducer: Reducer-function that will be executed for produce a new state
- parameter initialState: Initial state instance
- dispatchAction: Action that will be dispatched immediately after initialization
*/
public convenience init(reducer: @escaping RxReducer<State>,
initialState: State,
dispatchAction: RxActionType? = nil) {
initialState: State,
dispatchAction: RxActionType? = nil) {
self.init(reducer: reducer,
initialState: initialState,
scheduler: SerialDispatchQueueScheduler(qos: .utility,
internalSerialQueueName: "com.RxDataFlowController.Scheduler"),
dispatchAction: dispatchAction)
initialState: initialState,
scheduler: SerialDispatchQueueScheduler(qos: .utility,
internalSerialQueueName: "com.RxDataFlowController.Scheduler"),
dispatchAction: dispatchAction)
}


// public convenience init(reducer: @escaping RxReducer<State>,
// initialState: State,
// dispatchAction: RxActionType? = nil) {
// self.init(reducer: reducer,
// initialState: initialState,
// serialActionScheduler: SerialDispatchQueueScheduler(qos: .utility, internalSerialQueueName: "com.RxDataFlowController.SerialActionScheduler"),
// concurrentActionScheduler: SerialDispatchQueueScheduler(qos: .utility, internalSerialQueueName: "com.RxDataFlowController.ConcurrentActionScheduler"),
// dispatchAction: dispatchAction)
// }
init(reducer: @escaping RxReducer<State>,
initialState: State,
scheduler: ImmediateSchedulerType,
dispatchAction: RxActionType? = nil) {
initialState: State,
scheduler: ImmediateSchedulerType,
// serialActionScheduler: ImmediateSchedulerType,
// concurrentActionScheduler: ImmediateSchedulerType,
dispatchAction: RxActionType? = nil) {
// self.serialActionScheduler = serialActionScheduler
// self.concurrentActionScheduler = concurrentActionScheduler
self.scheduler = scheduler
self.reducer = reducer

self.reducer = reducer//AnyRxReducer(reducer: reducer)
// stateStack = FixedStack(capacity: maxHistoryItems)
// stateStack.push((setBy: RxInitializationAction(), state: initialState))
// currentStateSubject = BehaviorSubject(value: (setBy: RxInitializationAction(), state: initialState))
currentState = (setBy: RxInitializationAction(), state: initialState)

actionsSubject
.map { [weak self] action -> Observable<Void> in return self?.observe(action: action) ?? .empty() }
.merge(maxConcurrent: 1)
.subscribe()
.disposed(by: bag)


subscribe()

if let dispatchAction = dispatchAction {
dispatch(dispatchAction)
}
}


// init(reducer: @escaping RxReducer<State>,
// initialState: State,
// scheduler: ImmediateSchedulerType,
// dispatchAction: RxActionType? = nil) {
// self.scheduler = scheduler
// self.reducer = reducer
//
// currentState = (setBy: RxInitializationAction(), state: initialState)
//
// actionsSubject
// .map { [weak self] action -> Observable<Void> in return self?.observe(action: action) ?? .empty() }
// .merge(maxConcurrent: 1)
// .subscribe()
// .disposed(by: bag)
//
// if let dispatchAction = dispatchAction {
// dispatch(dispatchAction)
// }
// }
private func propagate(error: Error, from action: RxActionType) {
if case FlowControllerError.compositeActionError(let data) = error {
errorsSubject.onNext((state: currentState.state, action: data.erroredAction, error: data.error))
} else {
errorsSubject.onNext((state: currentState.state, action: action, error: error))
}
}

private func scheduler(for action: RxActionType, owner: RxCompositeAction? = nil) -> ImmediateSchedulerType {
// let actionScheduler = action.scheduler ?? owner?.scheduler
// return actionScheduler ?? (action.isSerial ? serialActionScheduler : concurrentActionScheduler)
let actionScheduler = action.scheduler ?? owner?.scheduler
return actionScheduler ?? scheduler
}

private func descriptor(for action: RxActionType, owner: RxCompositeAction? = nil)
-> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> {

private func descriptor(for action: RxActionType, owner: RxCompositeAction? = nil) -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> {
let schedulerForAction = scheduler(for: action, owner: owner)
let object = self
return Observable<RxActionType>.from([action], scheduler: schedulerForAction)
.flatMap { [weak self] act -> Observable<RxStateMutator<State>> in
return self == nil ? .empty() : self!.reducer(act, self!.currentState.state).subscribeOn(schedulerForAction)
}
.flatMap { act in object.reducer(act, object.currentState.state).subscribeOn(schedulerForAction) }
.observeOn(schedulerForAction)
.flatMap { result -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> in
return .just((setBy: action, mutator: result))
}
.flatMap { result -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> in return .just((setBy: action, mutator: result)) }
}

private func setNewState(mutator: RxStateMutator<State>, action: RxActionType) {
currentState = (setBy: action, mutator(currentState.state))
private func subscribe() {
// currentStateSubject.skip(1).subscribe(onNext: { [weak self] newState in self?.stateStack.push(newState) }).disposed(by: bag)
actionsQueue.currentItemSubject.observeOn(scheduler)
.flatMap { [weak self] action -> Observable<Void> in
return self?.observe(action: action) ?? .empty()
}.subscribe().disposed(by: bag)
}

private func dispatchFallbackAction(for action: RxActionType) {
guard let action = (action as? RxCompositeAction)?.fallbackAction else { return }
dispatch(action)
private func mutateState(with mutator: RxStateMutator<State>) -> State {
return mutator(currentState.state)
}

private func schedule(actionDescriptor: Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)>,
for action: RxActionType)

private func schedule(actionDescriptor: Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)>, for action: RxActionType)
-> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> {
guard !action.isSerial else { return actionDescriptor.observeOn(scheduler) }

return Observable.create { [weak self] observer in
guard let object = self else { return Disposables.create() }
actionDescriptor
.observeOn(object.scheduler)
.do(onNext: { [weak self] next in self?.setNewState(mutator: next.mutator, action: next.setBy) },
onError: { [weak self] in self?.propagate(error: $0, from: action) })
.subscribeOn(action.scheduler ?? object.scheduler)
.subscribe()
.disposed(by: object.bag)

observer.onCompleted()

return Disposables.create()
}

actionDescriptor
.observeOn(scheduler)
.do(onNext: { [weak self] next in
guard let newState = self?.mutateState(with: next.mutator) else { return }
self?.currentStateSubject.onNext((setBy: next.setBy, state: newState))
},
onError: { [weak self] in self?.propagate(error: $0, from: action) })
.subscribeOn(action.scheduler ?? scheduler)
.subscribe()
.disposed(by: bag)

return .empty()
}

private func observe(action: RxActionType) -> Observable<Void> {
let descriptor: Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> = { [weak self] in
guard let object = self else { return .empty() }
@@ -216,53 +287,247 @@ public class RxDataFlowController<State: RxStateType> {
}

return object.observe(compositeAction: compositeAction)
}()
}()

return schedule(actionDescriptor: descriptor, for: action)
.observeOn(scheduler)
.do(onNext: { [weak self] next in self?.setNewState(mutator: next.mutator, action: next.setBy) },
onError: { [weak self] in self?.propagate(error: $0, from: action); self?.dispatchFallbackAction(for: action) })
.do(onNext: { [weak self] next in
guard let newState = self?.mutateState(with: next.mutator) else { return }
self?.currentStateSubject.onNext((setBy: next.setBy, state: newState))
},
onError: { [weak self] in
self?.propagate(error: $0, from: action)
if let fallback = (action as? RxCompositeAction)?.fallbackAction {
self?.dispatch(fallback)
}
},
onDispose: { [weak self] in _ = self?.actionsQueue.dequeue() })
.flatMap { _ in return Observable<Void>.just(()) }
.catchError { _ in .just(()) }
}

private func observe(compositeAction: RxCompositeAction)
-> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> {
private func observe(compositeAction: RxCompositeAction) -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> {
return Observable.create { [weak self] observer in
guard let object = self else { return Disposables.create() }

let scheduledActions = compositeAction.actions.map { action in
object
.schedule(actionDescriptor: object.descriptor(for: action, owner: compositeAction), for: action)
.catchError { .error(FlowControllerError.compositeActionError(erroredAction: action, error: $0)) }
}

let disposable = Observable
.from(scheduledActions)
.observeOn(object.scheduler)
.merge(maxConcurrent: 1)
.observeOn(object.scheduler)
.do(onNext: { [weak self] next in self?.setNewState(mutator: next.mutator, action: next.setBy) },
onError: { observer.onError($0) },
onDispose: { observer.onCompleted() })
.subscribe()

var compositeQueue = Queue<RxActionType>()

let disposable = compositeQueue.currentItemSubject.observeOn(object.scheduler).flatMap { action -> Observable<RxStateType> in
return Observable.create { _ in
let descriptor = object.descriptor(for: action, owner: compositeAction).catchError { error -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> in
return .error(FlowControllerError.compositeActionError(erroredAction: action, error: error))
}
let subscription = object.schedule(actionDescriptor: descriptor, for: action)
.observeOn(object.scheduler)
.do(onNext: { observer.onNext((setBy: $0.setBy, mutator: $0.mutator)) },
onError: { observer.onError($0) },
onCompleted: { _ = compositeQueue.dequeue() },
onDispose: { if compositeQueue.count == 0 { observer.onCompleted() } })
.subscribe()
return Disposables.create { subscription.dispose() }
}
}.subscribe()

for a in compositeAction.actions { compositeQueue.enqueue(a) }

return Disposables.create {
disposable.dispose()
}
}
}

/**
Dispatches an action.
Example of dispatching an action:
```
let data = ...
controller.dispatch(DataAction.updateData(data))
```
- parameter action: The action that is being dispatched by controller
*/
public func dispatch(_ action: RxActionType) {
actionsSubject.onNext(action)
scheduler.schedule((action, self)) { params in
return Observable<Void>.create { observer in
params.1.actionsQueue.enqueue(params.0)
return Disposables.create()
}.subscribe()
}.disposed(by: bag)
}
}

///**
//This class is responsible for storing state and dispatching new actions.
//You initialize flow controller with initial state and reducer.
//*/
//public class RxDataFlowController<State: RxStateType> {
// /**
// Observable sequence that emits new state changes
// */
// public var state: Observable<(setBy: RxActionType, state: State)> {
// return currentStateSubject.asObservable().startWith(currentState).observeOn(scheduler)
// }
//
// /**
// Returns current state
// */
// public private(set) var currentState: (setBy: RxActionType, state: State) {
// didSet { currentStateSubject.onNext(currentState) }
// }
//
// /**
// Observable sequence that emits errors
// */
// public var errors: Observable<(state: State, action: RxActionType, error: Error)> { return errorsSubject }
//
// let bag = DisposeBag()
// let reducer: RxReducer<State>
// let scheduler: ImmediateSchedulerType
//
// let actionsSubject: PublishSubject<RxActionType> = PublishSubject()
//
// let currentStateSubject = PublishSubject<(setBy: RxActionType, state: State)>()
// let errorsSubject = PublishSubject<(state: State, action: RxActionType, error: Error)>()
//
// /**
// Initialized new instance of RxDataFlowController
// - parameter reducer: Reducer-function that will be executed for produce a new state
// - parameter initialState: Initial state instance
// - dispatchAction: Action that will be dispatched immediately after initialization
// */
// public convenience init(reducer: @escaping RxReducer<State>,
// initialState: State,
// dispatchAction: RxActionType? = nil) {
// self.init(reducer: reducer,
// initialState: initialState,
// scheduler: SerialDispatchQueueScheduler(qos: .utility,
// internalSerialQueueName: "com.RxDataFlowController.Scheduler"),
// dispatchAction: dispatchAction)
// }
//
// init(reducer: @escaping RxReducer<State>,
// initialState: State,
// scheduler: ImmediateSchedulerType,
// dispatchAction: RxActionType? = nil) {
// self.scheduler = scheduler
// self.reducer = reducer
//
// currentState = (setBy: RxInitializationAction(), state: initialState)
//
// actionsSubject
// .map { [weak self] action -> Observable<Void> in return self?.observe(action: action) ?? .empty() }
// .merge(maxConcurrent: 1)
// .subscribe()
// .disposed(by: bag)
//
// if let dispatchAction = dispatchAction {
// dispatch(dispatchAction)
// }
// }
//
// private func propagate(error: Error, from action: RxActionType) {
// if case FlowControllerError.compositeActionError(let data) = error {
// errorsSubject.onNext((state: currentState.state, action: data.erroredAction, error: data.error))
// } else {
// errorsSubject.onNext((state: currentState.state, action: action, error: error))
// }
// }
//
// private func scheduler(for action: RxActionType, owner: RxCompositeAction? = nil) -> ImmediateSchedulerType {
// let actionScheduler = action.scheduler ?? owner?.scheduler
// return actionScheduler ?? scheduler
// }
//
// private func descriptor(for action: RxActionType, owner: RxCompositeAction? = nil)
// -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> {
// let schedulerForAction = scheduler(for: action, owner: owner)
// return Observable<RxActionType>.from([action], scheduler: schedulerForAction)
// .flatMap { [weak self] act -> Observable<RxStateMutator<State>> in
// return self == nil ? .empty() : self!.reducer(act, self!.currentState.state).subscribeOn(schedulerForAction)
// }
// .observeOn(schedulerForAction)
// .flatMap { result -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> in
// return .just((setBy: action, mutator: result))
// }
// }
//
// private func setNewState(mutator: RxStateMutator<State>, action: RxActionType) {
// currentState = (setBy: action, mutator(currentState.state))
// }
//
// private func dispatchFallbackAction(for action: RxActionType) {
// guard let action = (action as? RxCompositeAction)?.fallbackAction else { return }
// dispatch(action)
// }
//
// private func schedule(actionDescriptor: Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)>,
// for action: RxActionType)
// -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> {
// guard !action.isSerial else { return actionDescriptor.observeOn(scheduler) }
//
// return Observable.create { [weak self] observer in
// guard let object = self else { return Disposables.create() }
// actionDescriptor
// .observeOn(object.scheduler)
// .do(onNext: { [weak self] next in self?.setNewState(mutator: next.mutator, action: next.setBy) },
// onError: { [weak self] in self?.propagate(error: $0, from: action) })
// .subscribeOn(action.scheduler ?? object.scheduler)
// .subscribe()
// .disposed(by: object.bag)
//
// observer.onCompleted()
//
// return Disposables.create()
// }
// }
//
// private func observe(action: RxActionType) -> Observable<Void> {
// let descriptor: Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> = { [weak self] in
// guard let object = self else { return .empty() }
//
// guard let compositeAction = action as? RxCompositeAction else {
// return object.descriptor(for: action)
// }
//
// return object.observe(compositeAction: compositeAction)
// }()
//
// return schedule(actionDescriptor: descriptor, for: action)
// .observeOn(scheduler)
// .do(onNext: { [weak self] next in self?.setNewState(mutator: next.mutator, action: next.setBy) },
// onError: { [weak self] in self?.propagate(error: $0, from: action); self?.dispatchFallbackAction(for: action) })
// .flatMap { _ in return Observable<Void>.just(()) }
// .catchError { _ in .just(()) }
// }
//
// private func observe(compositeAction: RxCompositeAction)
// -> Observable<(setBy: RxActionType, mutator: RxStateMutator<State>)> {
// return Observable.create { [weak self] observer in
// guard let object = self else { return Disposables.create() }
//
// let scheduledActions = compositeAction.actions.map { action in
// object
// .schedule(actionDescriptor: object.descriptor(for: action, owner: compositeAction), for: action)
// .catchError { .error(FlowControllerError.compositeActionError(erroredAction: action, error: $0)) }
// }
//
// let disposable = Observable
// .from(scheduledActions)
// .observeOn(object.scheduler)
// .merge(maxConcurrent: 1)
// .observeOn(object.scheduler)
// .do(onNext: { [weak self] next in self?.setNewState(mutator: next.mutator, action: next.setBy) },
// onError: { observer.onError($0) },
// onDispose: { observer.onCompleted() })
// .subscribe()
//
// return Disposables.create {
// disposable.dispose()
// }
// }
// }
//
// /**
// Dispatches an action.
// Example of dispatching an action:
// ```
// let data = ...
// controller.dispatch(DataAction.updateData(data))
// ```
// - parameter action: The action that is being dispatched by controller
// */
// public func dispatch(_ action: RxActionType) {
// actionsSubject.onNext(action)
// }
//}
@@ -10,15 +10,46 @@ import XCTest
import RxSwift
@testable import RxDataFlow

enum TestEnum {
case some(DeinitObject)
}

class DeinitTests: XCTestCase {
func testDeinit() {
let input = PublishSubject<TestEnum>()
let output = PublishSubject<TestEnum>()

_ = input.asObservable()
.map { Observable.just($0) }
.merge(maxConcurrent: 1)
.observeOn(SerialDispatchQueueScheduler(qos: .utility))
.do(onNext: { v in print(v) })
.subscribe(onNext: { obj in print(obj) })

_ = output.observeOn(SerialDispatchQueueScheduler(qos: .utility)).do(onNext: { print("output: \($0)") }).subscribe()

(0...10).forEach { value in input.onNext(TestEnum.some(DeinitObject({ print("Object \(value) deinited") }))) }

sleep(3)
}
}

class RxDataFlowTests: XCTestCase {
func testObjectPassedToControllerDeinited() {
let store: TestFlowController! = TestFlowController(reducer: testStoreReducer,
initialState: TestState(text: "Initial value"))

let deinitExpectation = expectation(description: "Object should be deinited")

_ = store.state.subscribe(onNext: { print($0.setBy) })
store.dispatch(ChangeTextValueAction(newText: "New text 1"))
store.dispatch(EnumAction.deinitObject(DeinitObject({ deinitExpectation.fulfill() })))
store.dispatch(ChangeTextValueAction(newText: "New text 2"))
store.dispatch(ChangeTextValueAction(newText: "New text 2"))
store.dispatch(ChangeTextValueAction(newText: "New text 2"))
store.dispatch(ChangeTextValueAction(newText: "New text 2"))
store.dispatch(ChangeTextValueAction(newText: "New text 2"))
// d.dispose()
let deinitResult = XCTWaiter().wait(for: [deinitExpectation], timeout: 3)
XCTAssertEqual(deinitResult, .completed)