Skip to content

Concurrency example with worker threads

Kentrix edited this page Apr 4, 2018 · 34 revisions

This is a translation of the last snippet of code of the article Comparative Concurrency with Haskell.

The code is as close as possible to the original, but it follows the transient philosophy. The original is a canonical example of multi-threading created by @snoyberg to teach programmers coming from languages like Go or Elixir to program with multi-threading and channels in Haskell.

The program spawn a number of worker threads which will each sleep for a random period of time, grab an integer off of a shared work queue, square it, and put the result back on a result queue. Meanwhile, a master thread will fill up the work queue with integers, and read and print results.

You will see some differences:

  • There are no loops!
  • Exceptions treatment is explicit and in the hands of the programmer.
  • Threading is first class. there are no special combinators or primitives for multi-threading. It is an effect in the TransIO monad.
  • Number of threads to assign to each worker is explicit and in the hands of the programmer
  • The changes are minimally intrusive, only in order to add functionality. No reinvention of the wheel
  • No result channel is necessary: it is substituted by monadic composition: printResult get each result directly from each worker thread. This is the only major change necessary since it is how a transient programmer would leverage the capability of sequencing computations in presence of the multithreading effect.

Indeed this last point also avoid a subtle problem introduced by lazyness and an excessive thread compartimentation, that sometimes can be the result of the lack of composability of the IO monad when there are multiple threads. The problem is mentioned [here] (https://www.reddit.com/r/haskell/comments/5e97sq/comparative_concurrency_with_haskell_fp_complete/daauk64/): The result of each worker is not evaluated, since it is pure. Instead, it is printResults, in a single thread, the one that perform all the work. This destroy the intended parallelism.

Although this kind of problem is not as critical at it seems, since, in this context, the usual processing of a worker are IO computations, that are eagerly executed. But even a IO bound computations may leave unevaluated thunks.

In this snippet, since printResult run in each thread of the workers, this problems does not happens. This permits the exhaustive execution of the task within the desired thread. Accumulation of processing work in the coordinating thread may be an issue in packages like async that have no monadic composability with multi-threading.

Oops: this creates another problem: printing under linux does not block by default at the line level. Since printResult run here under each working thread, this mix the output and produces a mess. TO avoid mixing lines in the output, LineBuffering has been set.

Moreover:

  • Under keep, it is possible to compose this program with other parallel programs using monadic/applicative/alternative operators, since threading in the TransIO monad is first class.
  • It is possible to convert the program into a distributed and/or web application with little effort (example pending) since distributed and web computing is also first class.

Thank to the wonderful work of the FPcomplete people, the program is runnable in the command line if you have stack installed.

#!/usr/bin/env stack
{- stack --install-ghc --resolver lts-8.3 runghc
    --package random --package transient --package stm-chans
-}
{-#LANGUAGE ScopedTypeVariables #-}
import Control.Concurrent (threadDelay)
import Transient.Base(parallel,keep,threads,StreamData(..),killChilds)
import Transient.Backtrack(onFinish)
import Control.Exception
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (readTBMChan, writeTBMChan, newTBMChan, closeTBMChan)
import System.Random (randomRIO)
import Control.Applicative
import Control.Monad.IO.Class
import System.IO

workerCount = 250
workloadCount = 10000 :: Int
minDelay = 250000 -- in microseconds, == 0.25 seconds
maxDelay = 750000 --                  == 0.75 seconds


worker requestChan workerId = do
      -- set line buffering since `printResult` run in each worker thread
      liftIO $ hSetBuffering stdout LineBuffering

      -- run a parallel thread for each worker
      -- `parallel` return `empty` to the invoking thread and execute his parameter
      -- in an internal loop, spawning threads when results are returned
      -- since `threads` impose a limit of 1, the worker will perform the work 
      -- with one thread
      r <- threads 1 $ parallel $ do
            delay <- randomRIO (minDelay, maxDelay)
            threadDelay delay
            mint <- atomically $ readTBMChan requestChan
            case mint of
                -- stop the thread when the channel is closed
                Nothing -> return SDone
                Just int -> return $ SMore(workerId, int, int * int)
      case r of
         SDone -> empty  -- no response, stop further actions
         SMore r -> return r


main =  keep $ do   -- run the Transient monad
    -- Create our communication channels. Now the response channel is
    -- also bounded and closable.
    requestChan <- liftIO . atomically $ newTBMChan (workerCount * 2)
--    responseChan <- atomically $ newTBMChan (workerCount * 2)      -- mot needed

    -- We're going to have three main threads. Let's define them all
    -- here. Note that we're _defining_ an action to be run, not
    -- running it yet! We'll run them below.
    let
        -- runWorkers is going to run all of the worker threads
        runWorkers = do
            -- use the alternative operator to spawn all the workers
            -- since each worker return empty to the main thread
            -- all the workers will be initiated
            foldr  (<|>) empty $ map (worker requestChan) [1..workerCount]
            -- Workers are all done, so close the response channel
--            atomically $ closeTBMChan responseChan -- not needed

        -- Fill up the request channel, exactly the same as before
        fillRequests = do
            liftIO $ mapM_ (atomically . writeTBMChan requestChan) [1..workloadCount]
            liftIO $ atomically $ closeTBMChan requestChan
            -- this task stop here
            empty

        -- Print one result, no loop
        printResult  (workerId, int, square) = liftIO $ do
                    -- Print it...
                    putStrLn $ concat
                        [ "Worker #"
                        , show workerId
                        , ": square of "
                        , show int
                        , " is "
                        , show square
                        ]

    -- If any thread dies with an exception, the other threads are killed 
    -- and the exception also will execute al other finalization actions if present.
    -- `killChilds` will kill all the child threads created below
    -- this sentence could also retry the process with `killChilds >> noFinish` 
    onFinish (\(Just (e :: SomeException)) -> killChilds)
    
    -- Now that we've defined our actions, we can use the alternative operator to
    -- run all of them. The alternative operator run the worker threads and `fillRequests. 
    -- `printResult` is executed for each worker output. `fillRequest` does not
    -- return anything to be printed since it finalizes with `empty`
    r <- runWorkers <|> fillRequests 
    printResult r


    return ()

Additionally, The runWorkers could be collapsed into a single parallel block with all the threads assigned to it and using the thread identifier as the worker identifier:

runWorkers = do
      r <- threads workerCount $ parallel $ do
            delay <- randomRIO (minDelay, maxDelay)
            threadDelay delay
            mint <- atomically $ readTBMChan requestChan
            case mint of
                -- stop the thread when the channel is closed
                Nothing -> return SDone
                Just int ->  do
                      workerId <- myThreadId
                      return$ SMore(workerId, int, int * int)
      case r of
         SDone -> empty  -- no response, stop further actions
         SMore r -> return r