diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..4330bf6 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,24 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - uses: purescript-contrib/setup-purescript@main + + - name: Install esbuild + run: npm install + + - name: Build source + run: spago build + + - name: Run tests + run: npm run test diff --git a/package.json b/package.json new file mode 100644 index 0000000..a25998f --- /dev/null +++ b/package.json @@ -0,0 +1,11 @@ +{ + "private": true, + "type": "module", + "scripts": { + "test:bundle": "spago bundle-app --main Test.Worker --to output/Test.Worker.js --platform node", + "test": "npm run test:bundle && spago test" + }, + "devDependencies": { + "esbuild": "^0.14.39" + } +} diff --git a/packages.dhall b/packages.dhall index b51cba4..5ec3f48 100644 --- a/packages.dhall +++ b/packages.dhall @@ -1,4 +1,5 @@ let upstream = - https://github.com/purescript/package-sets/releases/download/psc-0.14.3-20210811/packages.dhall sha256:a2de7ef2f2e753733eddfa90573a82da0c7c61d46fa87d015b7f15ef8a6e97d5 + https://github.com/purescript/package-sets/releases/download/psc-0.15.0-20220509/packages.dhall + sha256:d4c1a03606efdbb7bb7745a9d3aa908cb1ba5611921373659a4c7bf1c41c106c in upstream diff --git a/spago.dhall b/spago.dhall index ba25229..49e44bb 100644 --- a/spago.dhall +++ b/spago.dhall @@ -11,6 +11,7 @@ , "exceptions" , "foldable-traversable" , "foreign-object" + , "integers" , "newtype" , "parallel" , "prelude" diff --git a/src/Node/WorkerBees.js b/src/Node/WorkerBees.js index 2b4135d..d35a82a 100644 --- a/src/Node/WorkerBees.js +++ b/src/Node/WorkerBees.js @@ -1,22 +1,17 @@ -var fs = require("fs"); -var workerThreads = require("worker_threads"); +import workerThreads from "worker_threads"; -exports.spawnImpl = function(left, right, worker, options, cb) { +export function spawnImpl(left, right, worker, options, cb) { worker.resolve(function(err, res) { if (err) { return cb(left(err))(); } var thread; - var requirePath = res.filePath.replace(/\\/g, "\\\\"); - var jsEval = res.export - ? [ - 'var worker = require("' + requirePath + '").' + res.export + ';', - 'worker.spawn ? worker.spawn() : worker();' - ].join('\n') - : 'require("' + requirePath + '")'; + // Must be either an absolute path or a relative path (i.e. relative to the + // current working directory) starting with ./ or ../, if a filepath. + // https://nodejs.org/api/worker_threads.html#new-workerfilename-options + var importPath = res.filePath; try { - thread = new workerThreads.Worker(jsEval, { - eval: true, + thread = new workerThreads.Worker(importPath, { workerData: options.workerData }); thread.on('message', function(value) { @@ -35,77 +30,9 @@ exports.spawnImpl = function(left, right, worker, options, cb) { cb(left(e))(); } }); -}; - -exports.makeImpl = function(ctor) { - var originalFn = Error.prepareStackTrace; - var worker, workerError, callerFilePath, callerLineNumber; - - Error.prepareStackTrace = function(err, stack) { - return stack; - }; - - try { - var stack = new Error().stack; - - do { - var frame = stack.shift(); - callerFilePath = frame.getFileName(); - callerLineNumber = frame.getLineNumber(); - } while (callerFilePath === __filename); - - Error.prepareStackTrace = originalFn; - - } catch (e) { - Error.prepareStackTrace = originalFn; - throw new Error("Unable to define worker."); - } - - function resolve(cb) { - if (worker || workerError) { - return cb(workerError, worker); - } - - fs.readFile(callerFilePath, function(err, buff) { - if (err) { - workerError = err - return cb(workerError); - } - - var callerModuleLines = buff.toString('utf8').split('\n'); - var callerLine = callerModuleLines[callerLineNumber - 1]; - var workerName = callerLine.match(new RegExp("^var ([\\p{Ll}_][\\p{L}0-9_']*) = Node_WorkerBees\\.make", "u")); - - if (workerName) { - var exportRegex = new RegExp("^\\s*" + workerName[1] + ":\\s*" + workerName[1]); - for (var i = callerLineNumber; i < callerModuleLines.length; i++) { - if (callerModuleLines[i] === "module.exports = {") { - var exported = callerModuleLines.slice(i).some(function(line) { - return exportRegex.test(line); - }); - if (exported) { - worker = { - filePath: callerFilePath, - export: workerName[1] - }; - return cb(void 0, worker); - } - } - } - } - - workerError = new Error("Worker must be defined in a top-level, exported binding"); - cb(workerError); - }); - } - - return { - resolve: resolve, - spawn: mainImpl(ctor) - }; -}; +} -exports.unsafeMakeImpl = function(params) { +export function unsafeMakeImpl(params) { return { resolve: function(cb) { cb(void 0, params); @@ -114,9 +41,9 @@ exports.unsafeMakeImpl = function(params) { throw new Error("Cannot spawn unsafe worker directly."); } }; -}; +} -function mainImpl(ctor) { +export function mainImpl(ctor) { return function() { if (workerThreads.isMainThread) { throw new Error("Worker running on main thread."); @@ -143,13 +70,11 @@ function mainImpl(ctor) { }; } -exports.mainImpl = mainImpl; - -exports.postImpl = function(value, worker) { +export function postImpl(value, worker) { worker.postMessage(value); -}; +} -exports.terminateImpl = function(left, right, worker, cb) { +export function terminateImpl(left, right, worker, cb) { worker.terminate() .then(function() { cb(right(void 0))(); @@ -157,8 +82,8 @@ exports.terminateImpl = function(left, right, worker, cb) { .catch(function(err) { cb(left(err))(); }); -}; +} -exports.threadId = function(worker) { +export function threadId(worker) { return worker.threadId; -}; +} diff --git a/src/Node/WorkerBees.purs b/src/Node/WorkerBees.purs index 91f7013..69dea9d 100644 --- a/src/Node/WorkerBees.purs +++ b/src/Node/WorkerBees.purs @@ -4,10 +4,8 @@ module Node.WorkerBees , WorkerOptions , Worker , ThreadId(..) - , make , makeAsMain , unsafeWorkerFromPath - , unsafeWorkerFromPathAndExport , lift , liftReader , liftEffect @@ -65,9 +63,7 @@ foreign import data Worker :: Type -> Type -> Type -> Type foreign import data WorkerThread :: Type -> Type -foreign import makeImpl :: forall a i o. WorkerConstructor a i o -> Worker a i o - -foreign import unsafeMakeImpl :: forall a i o. { filePath :: String, export :: String } -> Worker a i o +foreign import unsafeMakeImpl :: forall a i o. { filePath :: String } -> Worker a i o foreign import mainImpl :: forall a i o. WorkerConstructor a i o -> Effect Unit @@ -79,21 +75,22 @@ foreign import terminateImpl :: forall i. EffectFn4 (forall x y. x -> Either x y foreign import threadId :: forall i. WorkerThread i -> ThreadId --- | Builds a new Worker. Treat this like it's special top-level declaration --- | syntax. Workers can only be declared at the top-level with `make`, and they --- | _must_ be exported. Failing to meet these criteria will result in a runtime --- | exception. -make :: forall a i o. Sendable o => WorkerConstructor a i o -> Worker a i o -make = makeImpl - +-- | Implements the worker code that can later be called via the +-- | `unsafeWorkerFromPath` function. This code _must_ be bundled such that +-- | `main` is actually called in the file. makeAsMain :: forall a i o. Sendable o => WorkerConstructor a i o -> Effect Unit makeAsMain = mainImpl +-- | Builds a new worker given a path to the compiled code constituting the `main` +-- | function that should execute in the worker. The worker code should be created +-- | using `makeAsMain`. The path must be either an absolute path or a relative +-- | path that begins with ./ or ../ +-- | +-- | ```purs +-- | unsafeWorkerFromPath "./output/My.Bundled.Output/index.js" +-- | ``` unsafeWorkerFromPath :: forall a i o. Sendable o => String -> Worker a i o -unsafeWorkerFromPath = unsafeMakeImpl <<< { filePath: _, export: "" } - -unsafeWorkerFromPathAndExport :: forall a i o. Sendable o => { filePath :: String, export :: String } -> Worker a i o -unsafeWorkerFromPathAndExport = unsafeMakeImpl +unsafeWorkerFromPath = unsafeMakeImpl <<< { filePath: _ } -- | Instantiates a new worker thread. If this worker subscribes to input, it -- | will need to be cleaned up with `terminate`, otherwise it will hold your diff --git a/src/Node/WorkerBees/Aff.purs b/src/Node/WorkerBees/Aff.purs index d81b4ee..bc633ad 100644 --- a/src/Node/WorkerBees/Aff.purs +++ b/src/Node/WorkerBees/Aff.purs @@ -45,7 +45,7 @@ post :: forall i. Sendable i => i -> WorkerThread i -> Aff Unit post i = liftEffect <<< Worker.post i -- | Terminates the worker thread. -terminate :: forall i . WorkerThread i -> Aff Unit +terminate :: forall i. WorkerThread i -> Aff Unit terminate worker = invincible $ makeAff \k -> do Worker.terminate worker k pure mempty diff --git a/src/Node/WorkerBees/Aff/Pool.purs b/src/Node/WorkerBees/Aff/Pool.purs index 6f74496..9eb25db 100644 --- a/src/Node/WorkerBees/Aff/Pool.purs +++ b/src/Node/WorkerBees/Aff/Pool.purs @@ -63,7 +63,7 @@ terminate (WorkerPool { queue, threads }) = do for_ pending (AVar.kill termError <<< snd) -- | Submits a new input to the worker pool, and waits for the reply. -invoke :: forall i o . Sendable i => WorkerPool i o -> i -> Aff o +invoke :: forall i o. Sendable i => WorkerPool i o -> i -> Aff o invoke (WorkerPool { queue }) i = do res <- AVar.empty AVar.put (Tuple i res) queue diff --git a/test/Main.purs b/test/Main.purs index 3a7ef2c..acf218f 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -10,11 +10,13 @@ import Node.WorkerBees (Worker) import Node.WorkerBees as Worker import Node.WorkerBees.Aff.Pool as WorkerPool -worker :: Worker Unit Int String -worker = Worker.make (Worker.lift show) - main :: Effect Unit main = Aff.launchAff_ do Console.log "Main..." - res <- WorkerPool.poolTraverse worker unit 4 (Array.range 1 100) + + let + worker :: Worker Unit Int String + worker = Worker.unsafeWorkerFromPath "./output/Test.Worker.js" + + res <- WorkerPool.poolTraverse worker unit 1 (Array.range 1 100) Console.logShow res diff --git a/test/Worker.purs b/test/Worker.purs new file mode 100644 index 0000000..a719ca6 --- /dev/null +++ b/test/Worker.purs @@ -0,0 +1,10 @@ +module Test.Worker where + +import Prelude + +import Data.Int as Int +import Effect (Effect) +import Node.WorkerBees as Worker + +main :: Effect Unit +main = Worker.makeAsMain (Worker.lift (Int.toStringAs Int.decimal))