Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement reductions #33

Closed
mratsim opened this issue Dec 7, 2019 · 2 comments
Closed

Implement reductions #33

mratsim opened this issue Dec 7, 2019 · 2 comments

Comments

@mratsim
Copy link
Owner

mratsim commented Dec 7, 2019

No description provided.

@mratsim
Copy link
Owner Author

mratsim commented Dec 7, 2019

The reduction system should accomodate complex reductions, for example parallel variance, logsumexp or parallel softmax cross-entropy.

Arraymancer examples

In Arraymancer this is how parallel variance is done (https://github.com/mratsim/Arraymancer/blob/9a56648850fc34fdd9da1f9c6874a87ddecc8932/src/tensor/aggregate.nim#L100-L130):

On a whole tensor

proc variance*[T: SomeFloat](t: Tensor[T]): T =
  ## Compute the sample variance of all elements
  ## The normalization is by (n-1), also known as Bessel's correction,
  ## which partially correct the bias of estimating a population variance from a sample of this population.
  let mean = t.mean()
  result = t.fold_inline() do:
    # Initialize to the first element
    x = square(y - mean)
  do:
    # Fold in parallel by summing remaning elements
    x += square(y - mean)
  do:
    # Merge parallel folds
    x += y
  result /= (t.size-1).T

On an axis

proc variance*[T: SomeFloat](t: Tensor[T], axis: int): Tensor[T] {.noInit.} =
  ## Compute the variance of all elements
  ## The normalization is by the (n-1), like in the formal definition
  let mean = t.mean(axis)
  result = t.fold_axis_inline(Tensor[T], axis) do:
    # Initialize to the first element
    x = square(y - mean)
  do:
    # Fold in parallel by summing remaning elements
    for ex, ey, em in mzip(x,y,mean):
      ex += square(ey - em)
  do:
    # Merge parallel folds
    x += y
  result /= (t.shape[axis]-1).T

With a 2-pass logsumexp

proc logsumexp_classic[T: SomeFloat](t: Tensor[T]): T =
  # Advantage:
  #  - OpenMP parallel
  #  - No branching in a tight loop
  # Disadvantage:
  #  - Two loops over the data, might be costly if tensor is big
  let alpha = t.max # first loop over data

  result = t.fold_inline() do:
    # Init first element
    x = exp(y - alpha)
  do:
    # Process next elements
    x += exp(y - alpha)
  do:
    # Merge the partial folds
    x += y

  result = alpha + ln(result)

Similarly for softmax cross-entropy implemented via parallel one-pass logsumexp

proc frobenius_inner_prod*[T](a,b: Tensor[T]): T =
  sum:
    map2_inline(a,b):
      x * y

proc streaming_max_sumexp*[T](t: Tensor[T]): tuple[max:T, sumexp: T] {.noSideEffect, inline.}=
  # Advantage:
  #  - Only one loop over the data
  #  - Can be done "on-the-fly"
  # Disadvantage:
  #  - no parallelization
  #  - branching in tight loop
  result.max = -Inf.T   # will store the streaming max of the tensor 
  result.sumexp = 0.T   # will store the streaming sum of exp of the tensor

  for x in t:
    if x <= result.max:
      result.sumexp += exp(x - result.max)
    else:
      result.sumexp = result.sumexp * exp(result.max - x) + 1
      result.max = x

proc logsumexp*[T: SomeFloat](t: Tensor[T]): T =
  # Alternative one-pass log sumexp
  let (max, sumexp) = t.streaming_max_sumexp
  result = max + ln(sumexp)

proc softmax_cross_entropy*[T](input, target: Tensor[T]): T =
  ## Softmax function + Cross-Entropy loss fused in one layer.
  ##
  ## Input:
  ##   - A Tensor of shape [batch_size, predicted_labels_probabilities]
  ##   - The target values of shape [batchsize, truth_labels_probability]
  ## Returns:
  ##   - Apply a softmax activation and returns the cross-entropy loss.
  when compileOption("boundChecks"):
    check_input_target(input, target)

  let batch_size = input.shape[0]
  result = frobenius_inner_prod(input, target)

  let sum_logsumexp = fold_axis_inline(input, T, fold_axis=0) do:
    x = y.logsumexp
  do:
    x += y.logsumexp
  do:
    x += y

  result = (sum_logsumexp - result) / T(batch_size)

Laser example

This is a thin wrapper over OpenMP and relies on OpenMP static scheduling.

https://github.com/numforge/laser/blob/d1e6ae6106564bfb350d4e566261df97dbb578b3/examples/ex05_tensor_parallel_reduction.nim#L47-L59

proc reduction_localsum_system_atomic[T](x, y: Tensor[T]): T =
  forEachStaged xi in x, yi in y:
    openmp_config:
      use_simd: false
      nowait: true
    iteration_kind:
      contiguous
    before_loop:
      var local_sum = 0.T
    in_loop:
      local_sum += xi + yi
    after_loop:
      result.atomicInc local_sum

Proposed syntax

Unfortunately Arraymancer syntax is a bit too magic with x and y appearing out of nowhere.

And it's hard to see when the element is a float32 or a Tensor[float32] for example. Instead a lightweight DSL could be written with something resembling that:

For a parallel sum

var finalResult: int

parallelFor i in 0 ..< 100:
  reduce(finalResult):
    neutralAccum(sum: int): 0
    foldOperation(accum: var int, nextElem: int): accum += nextElem
    mergeAccum(sum1, sum2: int): sum1+sum2

For a parallel variance

let mean = mean(mySeq)
var finalResult: int

parallelFor i in 0 ..< mySeq.len:
  captures: {mean}
  reduce(finalResult):
    neutralAccum(accum: int): 0
    foldOperation(accum: var int, x: int): accum += (x-mean)^2
    mergeAccum(acc1, acc2: int): acc1 + acc2

And the code transformation will generate a lazy tree of tasks that ends with

...
...
finalResult = sync(task0) + sync(task1)

@mratsim
Copy link
Owner Author

mratsim commented Dec 8, 2019

Closed by #34

Final syntax (high-level exported wrapper TBD):

proc sumReduce(n: int): int =
  var waitableSum: Flowvar[int]

  parallelReduceImpl i in 0 .. n, stride = 1:
    reduce(waitableSum):
      prologue:
        var localSum = 0
      fold:
        localSum += i
      merge(remoteSum):
        localSum += sync(remoteSum)
      return localSum

  result = sync(waitableSum)

init(Weave)
let sum1M = sumReduce(1000000)
echo "Sum reduce(0..1000000): ", sum1M
doAssert sum1M == 500_000_500_000
exit(Weave)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant