In [4]:
using Transducers

zip(1:3, 10:2:14) |> MapSplat(*) |> sum

76

In [5]:
xf_printer(label) = Map() do x
    println(label, ": ", x)
    return x  # just return it as-is
end

xf_printer (generic function with 1 method)

In [29]:
zip(1:3, 10:2:14) |>
xf_printer(" input") |>
MapSplat(*) |>
xf_printer("output") |>
sum 


 input: (1, 10)
output: 10
 input: (2, 12)
output: 24
 input: (3, 14)
output: 42


76

In [23]:
1:40 |> Partition(7) |> Filter(x -> prod(x) % 11 == 0) |> Cat() |> Scan(+) |> sum

4123

In [28]:
1:40 |> Partition(7)|> xf_printer(" input")  # |> sum


40-element UnitRange{Int64} [90m[1m|>[22m[39m
    [36mPartition[39m[33m([39m7, 7, false[33m)[39m [90m[1m|>[22m[39m
    [36mMap[39m[33m([39mMain.λ❓[33m)[39m

In [30]:
using OnlineStats: Mean


foldl(reducingfunction(Mean()), Map(x -> x^2), 1:4)

Mean: n=4 | value=7.5

In [48]:
# Stateless transducer
using Transducers
using Transducers: Transducer, R_, next, inner, xform

struct AddOneIfInt <: Transducer end

# 没有start(), 只有next()

function Transducers.next(rf::R_{AddOneIfInt}, result, input)
    if input isa Int
        next(inner(rf), result, input + 1)
    else
        result
    end
end

collect(AddOneIfInt(), 1)


1-element Array{Int64,1}:
 2

In [58]:
# Stateful transducer
using Transducers: start, complete, wrap, unwrap, wrapping
using Transducers: Transducer, R_, next, inner, xform
using Random


struct RandomRecall <: Transducer
    history::Int
    seed::Int
end

RandomRecall() = RandomRecall(3, 0)

# 初始化
function Transducers.start(rf::R_{RandomRecall}, result)
    buffer = []
    rng = MersenneTwister(xform(rf).seed)  # 随机数
    private_state = (buffer, rng)   # 私有
    return wrap(rf, private_state, start(inner(rf), result))  # 打包
end

# iter
function Transducers.next(rf::R_{RandomRecall}, result, input)
    wrapping(rf, result) do (buffer, rng), iresult   # 打包
        if length(buffer) < xform(rf).history
            push!(buffer, input)
            iinput = rand(rng, buffer)
        else
            i = rand(rng, 1:length(buffer))
            iinput = buffer[i]
            buffer[i] = input
        end
        iresult = next(inner(rf), iresult, iinput)
        return (buffer, rng), iresult
    end
end

# 完成
function Transducers.complete(rf::R_{RandomRecall}, result)
    _private_state, inner_result = unwrap(rf, result) # 解包
    return complete(inner(rf), inner_result)
end


collect(RandomRecall(), 1:5)
collect(RandomRecall(), 1:5)


5-element Array{Int64,1}:
 1
 1
 2
 1
 3

In [109]:
# Stateful transducer
using Transducers: start, complete, wrap, unwrap, wrapping
using Transducers: Transducer, R_, next, inner, xform
using Random


struct RandomRecall <: Transducer
    history::Int
    seed::Int
end

RandomRecall() = RandomRecall(3, 0)

# 初始化  state
function Transducers.start(rf::R_{RandomRecall}, result)
    buffer = 0
    private_state = buffer
    return wrap(rf, private_state, start(inner(rf), result))  # 打包
end

# iter
function Transducers.next(rf::R_{RandomRecall}, result, input)
    wrapping(rf, result) do buffer, iresult   # 打包
        buffer += input
        iinput = buffer
        
        iresult = next(inner(rf), iresult, iinput)
        return buffer, iresult
    end
end

# 完成
function Transducers.complete(rf::R_{RandomRecall}, result)
    _private_state, inner_result = unwrap(rf, result) # 解包
    return complete(inner(rf), inner_result)
end



collect(RandomRecall(), 1)
# collect(RandomRecall(), 1)

1-element Array{Int64,1}:
 1

In [114]:
collect(RandomRecall(), 1)

1-element Array{Int64,1}:
 1

In [None]:

1:100 |> Filter(isodd) |> RandomRecall() |> Filter(x -> x > 10) |> Take(5) |> collect



In [41]:
?next

search: [0m[1mn[22m[0m[1me[22m[0m[1mx[22m[0m[1mt[22mpow [0m[1mn[22m[0m[1me[22m[0m[1mx[22m[0m[1mt[22mind [0m[1mn[22m[0m[1me[22m[0m[1mx[22m[0m[1mt[22mprod [0m[1mn[22m[0m[1me[22m[0m[1mx[22m[0m[1mt[22mfloat I[0m[1mn[22m[0m[1me[22m[0m[1mx[22mac[0m[1mt[22mError I[0m[1mn[22md[0m[1me[22m[0m[1mx[22mS[0m[1mt[22myle IOCo[0m[1mn[22mt[0m[1me[22m[0m[1mx[22m[0m[1mt[22m



```
Transducers.next(rf::R_{X}, state, input)
```

This is the only required interface.  It takes the following form (if `start` is not defined):

```julia
next(rf::R_{X}, result, input) =
    # code calling next(inner(rf), result, possibly_modified_input)
```

When calling `next`, it is almost always a better idea to use the macro form [`@next`](@ref).  See the details in its documentation.

See [`Map`](@ref), [`Filter`](@ref), [`Cat`](@ref), etc. for real-world examples.


In [117]:
# online MeanVar  2020.11.18
using Transducers
using Transducers: R_, start, next, complete, inner, xform, wrap, unwrap, wrapping

struct MeanVar <: Transducer
end


function Transducers.start(rf::R_{MeanVar}, result)
    private_state = (0, 0.0, 0.0)
    return wrap(rf, private_state, start(inner(rf), result))
end


function Transducers.next(rf::R_{MeanVar}, result, input)
    wrapping(rf, result) do st, iresult
        (n, μ, M2) = st
        n += 1
        δ = input - μ
        μ += δ/n
        δ2 = input - μ
        M2 += δ*δ2
        iinput = (μ, M2 / (n-1))
        iresult = next(inner(rf), iresult, iinput)
        return (n, μ, M2), iresult
    end
end

function Transducers.complete(rf::R_{MeanVar}, result)
    _private_state, inner_result = unwrap(rf, result)
    return complete(inner(rf), inner_result)
end


collect(MeanVar(),1:10)



10-element Array{Tuple{Float64,Float64},1}:
 (1.0, NaN)
 (1.5, 0.5)
 (2.0, 1.0)
 (2.5, 1.6666666666666667)
 (3.0, 2.5)
 (3.5, 3.5)
 (4.0, 4.666666666666667)
 (4.5, 6.0)
 (5.0, 7.5)
 (5.5, 9.166666666666666)

In [116]:
@time foldl(right, MeanVar(), 1:100)

  0.000001 seconds


(50.5, 841.6666666666666)