Points = SCAN(public:adhoc:sc_points);
aggs = [from Points emit sum(v) as _sum, sum(v*v) as sumsq, count(v) as cnt];
newBad = empty(id:int, v:float);
bounds = [from Points emit min(v) as lower, max(v) as upper];
-- number of allowed standard deviations
const Nstd: 2;
-- Incrementally update aggs and stats
new_aggs = [from newBad emit sum(v) as _sum, sum(v*v) as sumsq,
count(v) as cnt];
aggs = [from aggs, new_aggs
emit aggs._sum - new_aggs._sum as _sum,
aggs.sumsq - new_aggs.sumsq as sumsq,
aggs.cnt - new_aggs.cnt as cnt];
stats = [from aggs
emit _sum/cnt as mean,
SQRT(1.0/(cnt*(cnt-1)) * (cnt * sumsq - _sum * _sum)) as std];
-- Compute the new bounds
newBounds = [from stats emit mean - Nstd * std as lower,
mean + Nstd * std as upper];
newBad = [from Points, bounds, newBounds
where (newBounds.upper < v
and v <= bounds.upper)
or (newBounds.lower > v
and v >= bounds.lower)
emit Points.*];
bounds = newBounds;
continue = [from newBad emit count(v) > 0];
while continue;
output = [from Points, bounds
where Points.v > bounds.lower
and Points.v < bounds.upper
emit Points.*];
store(output, sc_points_clipped);