Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update for PureScript 0.15 #3

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: CI

on:
push:
branches: [main]
pull_request:

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2

- uses: purescript-contrib/setup-purescript@main

- name: Install esbuild
run: npm install

- name: Build source
run: spago build

- name: Run tests
run: npm run test
11 changes: 11 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"private": true,
"type": "module",
"scripts": {
"test:bundle": "spago bundle-app --main Test.Worker --to output/Test.Worker.js --platform node",
"test": "npm run test:bundle && spago test"
},
"devDependencies": {
"esbuild": "^0.14.39"
}
}
3 changes: 2 additions & 1 deletion packages.dhall
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
let upstream =
https://github.com/purescript/package-sets/releases/download/psc-0.14.3-20210811/packages.dhall sha256:a2de7ef2f2e753733eddfa90573a82da0c7c61d46fa87d015b7f15ef8a6e97d5
https://github.com/purescript/package-sets/releases/download/psc-0.15.0-20220509/packages.dhall
sha256:d4c1a03606efdbb7bb7745a9d3aa908cb1ba5611921373659a4c7bf1c41c106c

in upstream
1 change: 1 addition & 0 deletions spago.dhall
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
, "exceptions"
, "foldable-traversable"
, "foreign-object"
, "integers"
, "newtype"
, "parallel"
, "prelude"
Expand Down
109 changes: 17 additions & 92 deletions src/Node/WorkerBees.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
var fs = require("fs");
var workerThreads = require("worker_threads");
import workerThreads from "worker_threads";

exports.spawnImpl = function(left, right, worker, options, cb) {
export function spawnImpl(left, right, worker, options, cb) {
worker.resolve(function(err, res) {
if (err) {
return cb(left(err))();
}
var thread;
var requirePath = res.filePath.replace(/\\/g, "\\\\");
var jsEval = res.export
? [
'var worker = require("' + requirePath + '").' + res.export + ';',
'worker.spawn ? worker.spawn() : worker();'
].join('\n')
: 'require("' + requirePath + '")';
// Must be either an absolute path or a relative path (i.e. relative to the
// current working directory) starting with ./ or ../, if a filepath.
// https://nodejs.org/api/worker_threads.html#new-workerfilename-options
var importPath = res.filePath;
try {
thread = new workerThreads.Worker(jsEval, {
eval: true,
thread = new workerThreads.Worker(importPath, {
workerData: options.workerData
});
thread.on('message', function(value) {
Expand All @@ -35,77 +30,9 @@ exports.spawnImpl = function(left, right, worker, options, cb) {
cb(left(e))();
}
});
};

exports.makeImpl = function(ctor) {
var originalFn = Error.prepareStackTrace;
var worker, workerError, callerFilePath, callerLineNumber;

Error.prepareStackTrace = function(err, stack) {
return stack;
};

try {
var stack = new Error().stack;

do {
var frame = stack.shift();
callerFilePath = frame.getFileName();
callerLineNumber = frame.getLineNumber();
} while (callerFilePath === __filename);

Error.prepareStackTrace = originalFn;

} catch (e) {
Error.prepareStackTrace = originalFn;
throw new Error("Unable to define worker.");
}

function resolve(cb) {
if (worker || workerError) {
return cb(workerError, worker);
}

fs.readFile(callerFilePath, function(err, buff) {
if (err) {
workerError = err
return cb(workerError);
}

var callerModuleLines = buff.toString('utf8').split('\n');
var callerLine = callerModuleLines[callerLineNumber - 1];
var workerName = callerLine.match(new RegExp("^var ([\\p{Ll}_][\\p{L}0-9_']*) = Node_WorkerBees\\.make", "u"));

if (workerName) {
var exportRegex = new RegExp("^\\s*" + workerName[1] + ":\\s*" + workerName[1]);
for (var i = callerLineNumber; i < callerModuleLines.length; i++) {
if (callerModuleLines[i] === "module.exports = {") {
var exported = callerModuleLines.slice(i).some(function(line) {
return exportRegex.test(line);
});
if (exported) {
worker = {
filePath: callerFilePath,
export: workerName[1]
};
return cb(void 0, worker);
}
}
}
}

workerError = new Error("Worker must be defined in a top-level, exported binding");
cb(workerError);
});
}

return {
resolve: resolve,
spawn: mainImpl(ctor)
};
};
}

exports.unsafeMakeImpl = function(params) {
export function unsafeMakeImpl(params) {
return {
resolve: function(cb) {
cb(void 0, params);
Expand All @@ -114,9 +41,9 @@ exports.unsafeMakeImpl = function(params) {
throw new Error("Cannot spawn unsafe worker directly.");
}
};
};
}

function mainImpl(ctor) {
export function mainImpl(ctor) {
return function() {
if (workerThreads.isMainThread) {
throw new Error("Worker running on main thread.");
Expand All @@ -143,22 +70,20 @@ function mainImpl(ctor) {
};
}

exports.mainImpl = mainImpl;

exports.postImpl = function(value, worker) {
export function postImpl(value, worker) {
worker.postMessage(value);
};
}

exports.terminateImpl = function(left, right, worker, cb) {
export function terminateImpl(left, right, worker, cb) {
worker.terminate()
.then(function() {
cb(right(void 0))();
})
.catch(function(err) {
cb(left(err))();
});
};
}

exports.threadId = function(worker) {
export function threadId(worker) {
return worker.threadId;
};
}
29 changes: 13 additions & 16 deletions src/Node/WorkerBees.purs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ module Node.WorkerBees
, WorkerOptions
, Worker
, ThreadId(..)
, make
, makeAsMain
, unsafeWorkerFromPath
, unsafeWorkerFromPathAndExport
, lift
, liftReader
, liftEffect
Expand Down Expand Up @@ -65,9 +63,7 @@ foreign import data Worker :: Type -> Type -> Type -> Type

foreign import data WorkerThread :: Type -> Type

foreign import makeImpl :: forall a i o. WorkerConstructor a i o -> Worker a i o

foreign import unsafeMakeImpl :: forall a i o. { filePath :: String, export :: String } -> Worker a i o
foreign import unsafeMakeImpl :: forall a i o. { filePath :: String } -> Worker a i o

foreign import mainImpl :: forall a i o. WorkerConstructor a i o -> Effect Unit

Expand All @@ -79,21 +75,22 @@ foreign import terminateImpl :: forall i. EffectFn4 (forall x y. x -> Either x y

foreign import threadId :: forall i. WorkerThread i -> ThreadId

-- | Builds a new Worker. Treat this like it's special top-level declaration
-- | syntax. Workers can only be declared at the top-level with `make`, and they
-- | _must_ be exported. Failing to meet these criteria will result in a runtime
-- | exception.
make :: forall a i o. Sendable o => WorkerConstructor a i o -> Worker a i o
make = makeImpl

-- | Implements the worker code that can later be called via the
-- | `unsafeWorkerFromPath` function. This code _must_ be bundled such that
-- | `main` is actually called in the file.
makeAsMain :: forall a i o. Sendable o => WorkerConstructor a i o -> Effect Unit
makeAsMain = mainImpl

-- | Builds a new worker given a path to the compiled code constituting the `main`
-- | function that should execute in the worker. The worker code should be created
-- | using `makeAsMain`. The path must be either an absolute path or a relative
-- | path that begins with ./ or ../
-- |
-- | ```purs
-- | unsafeWorkerFromPath "./output/My.Bundled.Output/index.js"
-- | ```
unsafeWorkerFromPath :: forall a i o. Sendable o => String -> Worker a i o
unsafeWorkerFromPath = unsafeMakeImpl <<< { filePath: _, export: "" }

unsafeWorkerFromPathAndExport :: forall a i o. Sendable o => { filePath :: String, export :: String } -> Worker a i o
unsafeWorkerFromPathAndExport = unsafeMakeImpl
unsafeWorkerFromPath = unsafeMakeImpl <<< { filePath: _ }

-- | Instantiates a new worker thread. If this worker subscribes to input, it
-- | will need to be cleaned up with `terminate`, otherwise it will hold your
Expand Down
2 changes: 1 addition & 1 deletion src/Node/WorkerBees/Aff.purs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ post :: forall i. Sendable i => i -> WorkerThread i -> Aff Unit
post i = liftEffect <<< Worker.post i

-- | Terminates the worker thread.
terminate :: forall i . WorkerThread i -> Aff Unit
terminate :: forall i. WorkerThread i -> Aff Unit
terminate worker = invincible $ makeAff \k -> do
Worker.terminate worker k
pure mempty
2 changes: 1 addition & 1 deletion src/Node/WorkerBees/Aff/Pool.purs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ terminate (WorkerPool { queue, threads }) = do
for_ pending (AVar.kill termError <<< snd)

-- | Submits a new input to the worker pool, and waits for the reply.
invoke :: forall i o . Sendable i => WorkerPool i o -> i -> Aff o
invoke :: forall i o. Sendable i => WorkerPool i o -> i -> Aff o
invoke (WorkerPool { queue }) i = do
res <- AVar.empty
AVar.put (Tuple i res) queue
Expand Down
10 changes: 6 additions & 4 deletions test/Main.purs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import Node.WorkerBees (Worker)
import Node.WorkerBees as Worker
import Node.WorkerBees.Aff.Pool as WorkerPool

worker :: Worker Unit Int String
worker = Worker.make (Worker.lift show)

main :: Effect Unit
main = Aff.launchAff_ do
Console.log "Main..."
res <- WorkerPool.poolTraverse worker unit 4 (Array.range 1 100)

let
worker :: Worker Unit Int String
worker = Worker.unsafeWorkerFromPath "./output/Test.Worker.js"

res <- WorkerPool.poolTraverse worker unit 1 (Array.range 1 100)
Console.logShow res
10 changes: 10 additions & 0 deletions test/Worker.purs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module Test.Worker where

import Prelude

import Data.Int as Int
import Effect (Effect)
import Node.WorkerBees as Worker

main :: Effect Unit
main = Worker.makeAsMain (Worker.lift (Int.toStringAs Int.decimal))