# oneTBB generic parallel algorithms

##### Sections
- [oneTBB Generic Algorithms](#oneTBB-Generic-Algorithms)
- _Code_: [Calculating pi with tbb::parallel_reduce](#Calculating-pi-with-tbb::parallel_reduce)
- _Code_: [Processing text with tbb::parallel_pipeline](#Processing-text-with-tbb::parallel_pipeline)

## Learning Objectives

* Gain experience with oneTBB generic algorithms
* Use tbb::parallel_reduce to estimate pi as the area of a unit circle
* Use tbb::parallel_pipeline to create a simple text processing pipeline 

# oneTBB Generic Algorithms

While it's possible to implement a parallel application by using oneTBB to specify each individual task that can run
concurrently, it is more common to make use of one of its data parallel generic algorithms. The oneTBB library provides 
a number of [generic parallel algorithms](https://spec.oneapi.com/versions/latest/elements/oneTBB/source/algorithms.html),
including `parallel_for`, `parallel_reduce`, `parallel_scan`, `parallel_invoke` and `parallel_pipeline`. These functions 
capture many of the common parallel patterns that are key to unlocking multithreaded performance. 

In this section, we provide two exercises that will introduce you to two of these functions.

## Calculating pi with tbb::parallel_reduce

In this exercise, we calculate pi using the approach shown in the figure below. The idea is to
compute the area of a unit circle, which is equal to pi. We do this by approximating the area of
one quarter of a unit circle, summing up the areas of ``num_intervals`` rectangles that have
a height of ``sqrt(1-x*x)``, where ``x`` is the midpoint of the rectangle's base, and a width of ``dx == 1.0/num_intervals``. This sum is multiplied by
4 to compute the total area of the unit circle, providing us with an approximation for pi.

![Algorithm to compute pi](img/pi.png)
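
To get a feel for how the approximation improves as the rectangles get narrower, the sketch below evaluates the same midpoint sum for a small and a larger interval count and compares each result with a reference value of pi. The names `midpoint_pi` and `pi_error` are hypothetical helpers introduced only for this illustration; they are not part of the lab code.

```cpp
#include <cmath>

// Midpoint-rule estimate of pi: 4 * (area under sqrt(1 - x*x) on [0, 1]).
// Hypothetical helper for illustration, mirroring the formula in the text.
double midpoint_pi(int num_intervals) {
  const double dx = 1.0 / num_intervals;
  double sum = 0.0;
  for (int i = 0; i < num_intervals; ++i) {
    const double x = (i + 0.5) * dx;      // midpoint of the i-th rectangle
    sum += std::sqrt(1.0 - x * x) * dx;   // height * width
  }
  return 4.0 * sum;
}

// Absolute difference between the estimate and a reference value of pi.
double pi_error(int num_intervals) {
  const double reference_pi = 3.14159265358979323846;
  return std::fabs(midpoint_pi(num_intervals) - reference_pi);
}
```

Calling `pi_error(10)` and `pi_error(1000)` should show the error shrinking as the interval count grows, which is why the exercise uses a very large `num_intervals`.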

### Run the sequential baseline implementation

Before we add any parallelism, let's validate this approach by running a baseline sequential implementation. Inspect 
the sequential code below - there are no modifications necessary. Run the first cell to create the file, then run the 
cell below it to compile and execute the code. This represents the baseline sequential result and time for our pi 
computation exercise.

1. Inspect the code cell below, then click run ▶ to save the code to a file
2. Run ▶ the cell in the __Build and Run the baseline__ section below the code snippet to compile and execute the code in the saved file

In [None]:
%%writefile lab/pi-serial.cpp
//==============================================================
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
// =============================================================

#include <chrono>
#include <cmath>
#include <iostream>
#include <limits>

double calc_pi(int num_intervals) {
  double dx = 1.0 / num_intervals;
  double sum = 0.0;
  for (int i = 0; i < num_intervals; ++i) {
    double x = (i+0.5)*dx;
    double h = std::sqrt(1-x*x);
    sum += h*dx;
  }
  double pi = 4 * sum;
  return pi;
}

int main() {
  const int num_intervals = std::numeric_limits<int>::max();
  double serial_time = 0.0;
  {
    auto st0 = std::chrono::high_resolution_clock::now();
    double pi = calc_pi(num_intervals);
    serial_time = 1e-9*(std::chrono::high_resolution_clock::now() - st0).count();
    std::cout << "serial pi == " << pi << std::endl;
  }

  std::cout << "serial_time == " << serial_time << " seconds" << std::endl;
  return 0;
}
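
The listing above converts the clock's raw tick count to seconds by multiplying by `1e-9`, which assumes that the clock ticks in nanoseconds. That holds on many platforms, but the tick period is implementation-defined. A possible alternative, shown here only as a sketch, lets `std::chrono::duration<double>` do the conversion. The helper name `seconds_to_run` is hypothetical, and the switch to `std::chrono::steady_clock` is a choice made for this example, not something the lab requires.

```cpp
#include <chrono>
#include <utility>

// Run `work` once and return the elapsed wall-clock time in seconds.
// duration<double> has a period of one second, so count() is already
// in seconds regardless of the clock's native tick period.
template <typename Work>
double seconds_to_run(Work&& work) {
  const auto t0 = std::chrono::steady_clock::now();
  std::forward<Work>(work)();
  const std::chrono::duration<double> elapsed =
      std::chrono::steady_clock::now() - t0;
  return elapsed.count();
}
```

With such a helper, the timed region could be written as `double t = seconds_to_run([&] { pi = calc_pi(num_intervals); });`.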

### Build and Run the baseline
Select the cell below and click Run ▶ to compile and execute the code above:

In [None]:
! chmod 755 q; chmod 755 ./scripts/run_pi-serial.sh; if [ -x "$(command -v qsub)" ]; then ./q scripts/run_pi-serial.sh; else ./scripts/run_pi-serial.sh; fi

### Implement a parallel version with tbb::parallel_reduce

Our sequential code accumulates values into a single final sum, making it a reduction operation and a match for ``tbb::parallel_reduce``.
You can find detailed documentation for ``parallel_reduce`` [here](https://software.intel.com/content/www/us/en/develop/documentation/tbb-documentation/top/intel-threading-building-blocks-developer-reference/algorithms/parallelreduce-template-function.html). Briefly, ``parallel_reduce`` runs a user-provided function
on chunks of the iteration space, potentially concurrently, producing several partial results. In our example, these partial results will be partial sums. The partial results are then combined using a user-provided reduction function; in our pi example, `std::plus` might be used (hint).

The interface of ``parallel_reduce`` needed for this example is shown below:

```cpp
template<typename Range, typename Value, typename Func, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity,
                       const Func& func, const Reduction& reduction );
```

The ``range`` object provides the iteration space, which in our example covers the indices 0 to num_intervals - 1. A ``tbb::blocked_range<int>(begin, end)`` is half-open, so ``end`` itself is not visited. ``identity`` is the identity value for the
operation that is being parallelized; for a summation, the identity value is 0, since ``sum == sum + 0``. We provide a lambda expression for
``func`` to compute the partial results, which in our example will return a partial sum for a given range ``r``, accumulating into the
starting value ``init``. Finally, ``reduction`` is the operation used to combine the partial results.
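
To make the roles of these four arguments concrete, here is a purely sequential model of a reduction written with the standard library only. It is a sketch of the idea, not of how oneTBB is implemented: the real ``parallel_reduce`` chooses the subranges itself, may run them concurrently, and may pass a running value (not only the identity) as ``init``. The names `IndexRange`, `chunked_reduce`, and `sum_below` are hypothetical and exist only for this illustration.

```cpp
#include <algorithm>
#include <functional>

// Hypothetical stand-in for tbb::blocked_range<int>: the half-open range [begin, end).
struct IndexRange {
  int begin;
  int end;
};

// Sequential model of a reduction: split [first, last) into chunks of at most
// `chunk` indices (chunk must be positive), run `func` on each chunk starting
// from `identity`, and fold the partial results together with `reduction`.
template <typename Value, typename Func, typename Reduction>
Value chunked_reduce(int first, int last, int chunk, const Value& identity,
                     const Func& func, const Reduction& reduction) {
  Value total = identity;
  for (int b = first; b < last; b += chunk) {
    const IndexRange r{b, std::min(b + chunk, last)};
    const Value partial = func(r, identity);  // one partial result per chunk
    total = reduction(total, partial);        // combine partial results
  }
  return total;
}

// Example use: the sum of the integers in [0, n).
long sum_below(int n, int chunk) {
  return chunked_reduce(0, n, chunk, 0L,
      [](const IndexRange& r, long init) {
        for (int i = r.begin; i != r.end; ++i) init += i;
        return init;  // init plus the partial sum for this chunk
      },
      std::plus<long>{});
}
```

Because adding the identity changes nothing and the reduction only combines partial results, `sum_below(100, 7)` and `sum_below(100, 100)` give the same answer no matter how the range is cut into chunks.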

For this exercise, complete the following steps:

1. Inspect the code cell below and make the following modifications.
  1. Fix the upper bound in the ``tbb::blocked_range``
  2. Fix the identity value
  3. Add the loop body code
  4. Fix the reduction function
2. When the modifications are complete, click run ▶ to save the code to a file.
3. Run ▶ the cell in the __Build and Run the modified code__ section below the code snippet to compile and execute the code in the saved file.

In [None]:
%%writefile lab/pi-parallel.cpp
//==============================================================
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
// =============================================================

#include <chrono>
#include <cmath>
#include <functional>
#include <iostream>
#include <limits>
#include <thread>
#include <tbb/tbb.h>

#define INCORRECT_VALUE 1
#define INCORRECT_FUNCTION std::minus<double>()

double calc_pi(int num_intervals) {
  double dx = 1.0 / num_intervals;
  double sum = tbb::parallel_reduce(
    /* STEP 1: fix the upper bound: */ tbb::blocked_range<int>(0, INCORRECT_VALUE), 
    /* STEP 2: provide a proper identity value for summation */ INCORRECT_VALUE,
    /* func */ 
    [=](const tbb::blocked_range<int>& r, double init) -> double {
      for (int i = r.begin(); i != r.end(); ++i) {
        // STEP 3: Add the loop body code:
        //         Hint: it will look a lot like the sequential code.
        //               the returned value should be (init + the_partial_sum)
      }
      return init;
    },
    // STEP 4: provide the reduction function
    //         Hint, maybe std::plus<double>{}
    INCORRECT_FUNCTION
  );
  double pi = 4 * sum;
  return pi;
}

static void warmupTBB() {
  int num_threads = std::thread::hardware_concurrency();
  tbb::parallel_for(0, num_threads,
    [](unsigned int) { 
      std::this_thread::sleep_for(std::chrono::milliseconds(10)); 
  });
}

int main() {
  const int num_intervals = std::numeric_limits<int>::max();
  double parallel_time = 0.0;
  warmupTBB();
  {
    auto pt0 = std::chrono::high_resolution_clock::now();
    double pi = calc_pi(num_intervals);
    parallel_time = 1e-9*(std::chrono::high_resolution_clock::now() - pt0).count();
    std::cout << "parallel pi == " << pi << std::endl;
  }

  std::cout << "parallel_time == " << parallel_time << " seconds" << std::endl;
  return 0;
}

### Build and Run the modified code

Select the cell below and click Run ▶ to compile and execute the code that you modified above:

In [None]:
! chmod 755 q; chmod 755 ./scripts/run_pi-parallel.sh; if [ -x "$(command -v qsub)" ]; then ./q scripts/run_pi-parallel.sh; else ./scripts/run_pi-parallel.sh; fi

### Pi Example Solution (Don't peek, unless you have to)

In [None]:
%%writefile solutions/pi-parallel.cpp
//==============================================================
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
// =============================================================

#include <chrono>
#include <cmath>
#include <functional>
#include <iostream>
#include <limits>
#include <thread>
#include <tbb/tbb.h>

double calc_pi(int num_intervals) {
  double dx = 1.0 / num_intervals;
  double sum = tbb::parallel_reduce(
    /* range = */ tbb::blocked_range<int>(0, num_intervals ), 
    /* identity = */ 0.0,
    /* func */ 
    [=](const tbb::blocked_range<int>& r, double init) -> double {
      for (int i = r.begin(); i != r.end(); ++i) {
        double x = (i+0.5)*dx;
        double h = std::sqrt(1-x*x);
        init += h*dx;
      }
      return init;
    },
    std::plus<double>{}
  );
  double pi = 4 * sum;
  return pi;
}

static void warmupTBB() {
  int num_threads = std::thread::hardware_concurrency();
  tbb::parallel_for(0, num_threads,
    [](unsigned int) { 
      std::this_thread::sleep_for(std::chrono::milliseconds(10)); 
  });
}

int main() {
  const int num_intervals = std::numeric_limits<int>::max();
  double parallel_time = 0.0;
  warmupTBB();
  {
    auto pt0 = std::chrono::high_resolution_clock::now();
    double pi = calc_pi(num_intervals);
    parallel_time = 1e-9*(std::chrono::high_resolution_clock::now() - pt0).count();
    std::cout << "parallel pi == " << pi << std::endl;
  }

  std::cout << "parallel_time == " << parallel_time << " seconds" << std::endl;
  return 0;
}


In [None]:
! chmod 755 q; chmod 755 ./scripts/run_pi-solution.sh; if [ -x "$(command -v qsub)" ]; then ./q scripts/run_pi-solution.sh; else ./scripts/run_pi-solution.sh; fi

## Processing text with tbb::parallel_pipeline

The loop algorithms provided by oneTBB, like ``tbb::parallel_reduce``, operate over a range that is known in advance. In this section,
we will look at an example where we don't know the range in advance, and instead continue processing data until we dynamically discover 
that there is no more data to process. Our example processes a large text file, a chunk at a time, until there are no more chunks to read. 
To do so, we will use a simple three-stage pipeline that (1) gets a buffer of text if one exists, (2) processes the text buffer and 
(3) writes the processed buffer to an output file.

![A simple three-stage pipeline](img/pipeline.png)
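
The middle stage of this pipeline is a pure per-buffer transformation: it reads and writes only the buffer it is given, which is what makes it safe to process several buffers at once. The sketch below isolates that transformation in a standalone function. The name `swap_case` is hypothetical, and the conversion through `unsigned char` is a defensive addition for this example (the `<cctype>` functions expect values representable as `unsigned char`); the lab code itself operates on `char` directly.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Return a copy of `text` with lowercase letters made uppercase and
// uppercase letters made lowercase; all other characters are unchanged.
std::string swap_case(std::string text) {
  std::transform(text.begin(), text.end(), text.begin(),
    [](unsigned char c) -> char {
      if (std::islower(c)) return static_cast<char>(std::toupper(c));
      if (std::isupper(c)) return static_cast<char>(std::tolower(c));
      return static_cast<char>(c);
    });
  return text;
}
```

For example, `swap_case("Hello, World 42")` yields `"hELLO, wORLD 42"`.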

### Run the sequential baseline implementation

Again, let's start with a sequential implementation that serves as a baseline for both a correct result and the execution 
time.

Inspect the sequential code below - there are no modifications necessary. Note that the baseline code uses a
``while``-loop since the number of chunks is not known up front. Run the first cell to create the source file, then run the
cell below it to compile and execute the code. This represents the baseline for the computation that we will convert into 
a parallel pipeline.

1. Inspect the code cell below, then click run ▶ to save the code to a file
2. Run ▶ the cell in the __Build and Run the baseline__ section below the code snippet to compile and execute the code in the saved file
3. Inspect the two new files ``serial_pipeline_before.txt`` and ``serial_pipeline_after.txt`` to see the results of changing the case of the text.

In [None]:
%%writefile lab/pipeline-serial.cpp
//==============================================================
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
// =============================================================

#include <algorithm>
#include <cctype>
#include <chrono>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>

using CaseStringPtr = std::shared_ptr<std::string>;

//
// These functions are defined in common/case.cpp
//
void initCaseChange(int num_strings, int string_len, int free_list_size);
CaseStringPtr getCaseString(std::ofstream& f); 
void writeCaseString(std::ofstream& f, CaseStringPtr s);
    
void change_case_serial(std::ofstream& caseBeforeFile, std::ofstream& caseAfterFile) {
  while (CaseStringPtr s_ptr = getCaseString(caseBeforeFile)) {
    std::transform(s_ptr->begin(), s_ptr->end(), s_ptr->begin(), 
      [](char c) -> char {
        if (std::islower(c))
          return std::toupper(c);
        else if (std::isupper(c))
          return std::tolower(c);
        else
          return c;
      }
    );
    writeCaseString(caseAfterFile, s_ptr);
  }
}

int main() {
  int num_strings = 100; 
  int string_len = 100000;
  int free_list_size = 1;

  std::ofstream caseBeforeFile("lab/serial_pipeline_before.txt");
  std::ofstream caseAfterFile("lab/serial_pipeline_after.txt");
  initCaseChange(num_strings, string_len, free_list_size);

  double serial_time = 0.0;
  {
    auto pt0 = std::chrono::high_resolution_clock::now();
    change_case_serial(caseBeforeFile, caseAfterFile);
    serial_time = 1e-9*(std::chrono::high_resolution_clock::now() - pt0).count();
  }
  std::cout << "serial_time == " << serial_time << " seconds" << std::endl;
  return 0;
}

### Build and Run the baseline
Select the cell below and click Run ▶ to compile and execute the code above:

In [None]:
! chmod 755 q; chmod 755 ./scripts/run_pipeline-serial.sh; if [ -x "$(command -v qsub)" ]; then ./q scripts/run_pipeline-serial.sh; else ./scripts/run_pipeline-serial.sh; fi

### Implement a parallel version with tbb::parallel_pipeline

The oneTBB library provides the function ``tbb::parallel_pipeline``, which is designed for exactly this kind of case. In this section,
we will use ``tbb::parallel_pipeline`` to create a parallel implementation of the text processing computation.

You can find detailed documentation for ``parallel_pipeline`` [here](https://software.intel.com/content/www/us/en/develop/documentation/tbb-documentation/top/intel-threading-building-blocks-developer-reference/algorithms/parallelpipeline-function.html). Briefly, to create a oneTBB
pipeline, we need to provide a ``max_number_of_live_tokens`` value and a chain of pipeline filter objects. ``max_number_of_live_tokens``
limits the number of items that can be in flight in the pipeline at the same time.

We create filters using the ``make_filter`` function:

```cpp
template<typename T, typename U, typename Func> 
filter<T,U> make_filter(filter_mode mode, const Func& f);
```

The ``mode`` argument can be ``tbb::filter_mode::serial_in_order``, ``tbb::filter_mode::serial_out_of_order``, or ``tbb::filter_mode::parallel``. 
A chain of filters is created by concatenating filters using `operator&`. 

A summary of the interface for ``parallel_pipeline`` needed to complete this exercise is shown below:

```cpp
void parallel_pipeline( size_t max_number_of_live_tokens, 
                        const filter<void,void>& filter_chain);
```

In the code cell below, we provide a skeleton of the solution. You need to complete the following steps:

1. Inspect the code cell below and make the following modifications.
  1. Set the number of tokens to use in the ``tbb::parallel_pipeline`` to ``num_tokens``
  2. Change the mode of the second filter to ``tbb::filter_mode::parallel``
  3. Add the body code to the second filter
  4. Change the mode of the final filter to ``tbb::filter_mode::serial_in_order``
  5. Add the body code to the final filter
2. When the modifications are complete, click run ▶ to save the code to a file.
3. Run ▶ the cell in the __Build and Run the modified code__ section below the code snippet to compile and execute the code in the saved file.
4. Inspect the two new files ``parallel_pipeline_before.txt`` and ``parallel_pipeline_after.txt`` to see the results of changing the case of the text.

In [None]:
%%writefile lab/pipeline-parallel.cpp
//==============================================================
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
// =============================================================

#include <algorithm>
#include <cctype>
#include <chrono>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <tbb/tbb.h>

#define INCORRECT_VALUE 1
#define INCORRECT_MODE_A tbb::filter_mode::serial_in_order
#define INCORRECT_MODE_B tbb::filter_mode::parallel

using CaseStringPtr = std::shared_ptr<std::string>;

//
// These functions are defined in common/case.cpp
//
void initCaseChange(int num_strings, int string_len, int free_list_size);
CaseStringPtr getCaseString(std::ofstream& f); 
void writeCaseString(std::ofstream& f, CaseStringPtr s);

void change_case_parallel(int num_tokens, std::ofstream& caseBeforeFile, std::ofstream& caseAfterFile) {
  tbb::parallel_pipeline(
    /* STEP A: set the number of tokens */ INCORRECT_VALUE,
    tbb::make_filter<void, CaseStringPtr>(
      tbb::filter_mode::serial_in_order,
      [&](tbb::flow_control& fc) -> CaseStringPtr {
        CaseStringPtr s_ptr = getCaseString(caseBeforeFile);
        if (!s_ptr) 
          fc.stop();
        return s_ptr; 
      }) &
    tbb::make_filter<CaseStringPtr, CaseStringPtr>(
      /* STEP B: Change the mode to be parallel */ INCORRECT_MODE_A,
      [](CaseStringPtr s_ptr) -> CaseStringPtr {
        /* STEP C: add the body that converts the incoming s_ptr */
        return s_ptr;
      }) & // concatenation operation
    tbb::make_filter<CaseStringPtr, void>(
      /* STEP D: Change the mode to serial_in_order */ INCORRECT_MODE_B,
      [&](CaseStringPtr s_ptr) -> void {
        /* STEP E: add the body to write the output */
      }) 
  );
}

static void warmupTBB() {
  int num_threads = std::thread::hardware_concurrency();
  tbb::parallel_for(0, num_threads,
    [](unsigned int) { 
      std::this_thread::sleep_for(std::chrono::milliseconds(10)); 
  });
}

int main() {
  int num_tokens = std::thread::hardware_concurrency();
  int num_strings = 100; 
  int string_len = 100000;
  int free_list_size = num_tokens;

  std::ofstream caseBeforeFile("lab/parallel_pipeline_before.txt");
  std::ofstream caseAfterFile("lab/parallel_pipeline_after.txt");
  initCaseChange(num_strings, string_len, free_list_size);

  warmupTBB();
  double parallel_time = 0.0;
  {
    auto pt0 = std::chrono::high_resolution_clock::now();
    change_case_parallel(num_tokens, caseBeforeFile, caseAfterFile);
    parallel_time = 1e-9*(std::chrono::high_resolution_clock::now() - pt0).count();
  }
  std::cout << "parallel_time == " << parallel_time << " seconds" << std::endl;
  return 0;
}

### Build and Run the modified code

Select the cell below and click Run ▶ to compile and execute the code that you modified above:

In [None]:
! chmod 755 q; chmod 755 ./scripts/run_pipeline-parallel.sh; if [ -x "$(command -v qsub)" ]; then ./q scripts/run_pipeline-parallel.sh; else ./scripts/run_pipeline-parallel.sh; fi

### Case-Change Pipeline Solution (Don't peek, unless you have to)

In [None]:
%%writefile solutions/pipeline-solution.cpp
//==============================================================
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
// =============================================================

#include <algorithm>
#include <cctype>
#include <chrono>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <tbb/tbb.h>

using CaseStringPtr = std::shared_ptr<std::string>;

//
// These functions are defined in common/case.cpp
//
void initCaseChange(int num_strings, int string_len, int free_list_size);
CaseStringPtr getCaseString(std::ofstream& f); 
void writeCaseString(std::ofstream& f, CaseStringPtr s);

void change_case_parallel(int num_tokens, std::ofstream& caseBeforeFile, std::ofstream& caseAfterFile) {
  tbb::parallel_pipeline(
    /* tokens */ num_tokens,
    /* the get filter */
    tbb::make_filter<void, CaseStringPtr>(
      /* filter mode */ tbb::filter_mode::serial_in_order,
      /* filter body */
      [&](tbb::flow_control& fc) -> CaseStringPtr {
        CaseStringPtr s_ptr = getCaseString(caseBeforeFile);
        if (!s_ptr) 
          fc.stop();
        return s_ptr; 
      }) & // concatenation operation
    /* make the change case filter */
    tbb::make_filter<CaseStringPtr, CaseStringPtr>(
      /* filter mode */ tbb::filter_mode::parallel,
      /* filter body */
      [](CaseStringPtr s_ptr) -> CaseStringPtr {
        std::transform(s_ptr->begin(), s_ptr->end(), s_ptr->begin(), 
          [](char c) -> char {
            if (std::islower(c))
              return std::toupper(c);
            else if (std::isupper(c))
              return std::tolower(c);
            else
              return c;
          });
        return s_ptr;
      }) & // concatenation operation
    /* make the write filter */
    tbb::make_filter<CaseStringPtr, void>(
      /* filter mode */ tbb::filter_mode::serial_in_order,
      /* filter body */
      [&](CaseStringPtr s_ptr) -> void {
        writeCaseString(caseAfterFile, s_ptr);
      }) 
  );
}

static void warmupTBB() {
  int num_threads = std::thread::hardware_concurrency();
  tbb::parallel_for(0, num_threads,
    [](unsigned int) { 
      std::this_thread::sleep_for(std::chrono::milliseconds(10)); 
  });
}

int main() {
  int num_tokens = std::thread::hardware_concurrency();
  int num_strings = 100; 
  int string_len = 100000;
  int free_list_size = num_tokens;

  std::ofstream caseBeforeFile("solutions/solution_pipeline_before.txt");
  std::ofstream caseAfterFile("solutions/solution_pipeline_after.txt");
  initCaseChange(num_strings, string_len, free_list_size);

  warmupTBB();
  double parallel_time = 0.0;
  {
    auto pt0 = std::chrono::high_resolution_clock::now();
    change_case_parallel(num_tokens, caseBeforeFile, caseAfterFile);
    parallel_time = 1e-9*(std::chrono::high_resolution_clock::now() - pt0).count();
  }
  std::cout << "parallel_time == " << parallel_time << " seconds" << std::endl;
  return 0;
}

In [None]:
! chmod 755 q; chmod 755 ./scripts/run_pipeline-solution.sh; if [ -x "$(command -v qsub)" ]; then ./q scripts/run_pipeline-solution.sh; else ./scripts/run_pipeline-solution.sh; fi

## Next steps

If you are ready, go to [the next module](../03_oneTBB_flow_graph/oneTBB_flow_graph.ipynb).