Browse files

Beginning to work on merging the BH port

  • Loading branch information...
1 parent e8875b8 commit 05d03af6f29b4439e05bd5b857bef3a0a0f77f26 @pereckerdal committed May 26, 2009
Showing with 1,484 additions and 881 deletions.
  1. +4 −0 data.scm
  2. +12 −0 exception.scm
  3. +503 −21 match.scm
  4. +6 −20 otp/gen_event.scm
  5. +10 −0 otp/gen_server.scm
  6. +59 −51 recv.scm
  7. +149 −789 termite.scm
  8. +710 −0 termite_core.scm
  9. +31 −0 utils.scm
View
4 data.scm
@@ -2,6 +2,10 @@
;; (it would be "better" if those were implemented functionally)
+(import termite_core
+ match
+ recv)
+
;; ----------------------------------------------------------------------------
;; Cells
View
12 exception.scm
@@ -0,0 +1,12 @@
+
+;; NOTE It might be better to integrate with Gambit's exception mechanism
+(define-type termite-exception
+ id: 6a3a285f-02c4-49ac-b00a-aa57b1ad02cf
+ origin
+ reason
+ object)
+
+
+;; Default callback for received exceptions.
+(define (handle-exception-message event)
+ (raise event))
View
524 match.scm
@@ -1,25 +1,507 @@
-(define-macro (match/action on-success on-fail datum . clauses)
- (let ((tmp (gensym))
- (succ (gensym))
- (fail (gensym)))
+;; ----------------------------------------------------------------------------
+;; Erlang-style pattern matching for Scheme
+;;
+;; TODO
+;; - handle vectors (!)
- `(let ((,tmp ,datum)
- (,succ (lambda () ,on-success)) ;; the thunk for success is lifted
- (,fail (lambda () ,on-fail))) ;; the thunk for failure is lifted
-
- ,(compile-pattern-match `(,succ) `(,fail) clauses tmp))))
+;; tree-based pattern matching optimization
-(define-macro (match datum . clauses)
- (let ((tmp (gensym))
- (fail (gensym)))
+(syntax-begin
- `(let* ((,tmp ,datum)
- (,fail (lambda ()
- (raise
- (list bad-match: ,tmp)))))
- ,(compile-pattern-match
- #f
- `(,fail)
- clauses
- tmp))))
+ (import utils)
+
+ ;; Clause manipulation
+
+ ;; 2 possible clause expression:
+
+ ;; (match data
+ ;; (clause (where guard) . code)
+ ;; (clause . code))
+
+ (define-record-type clause/rt
+ (make-clause pattern guard code)
+ clause?
+ (pattern clause-pattern)
+ (guard clause-guard)
+ (code clause-code))
+
+ ;; accumulate every part of the tree which satisfies PRED? and only go
+ ;; down the child satisfying GO-DOWN?
+ (define (tree-filter pred? go-down? tree)
+ (cond
+ ((pred? tree)
+ (list tree))
+ ((and (pair? tree) (go-down? tree))
+ (append (tree-filter pred? go-down? (car tree))
+ (tree-filter pred? go-down? (cdr tree))))
+ (else '())))
+
+ ;; compile-pattern-match: generate the code for the pattern matching
+ ;;
+ ;; on-success: code to insert when a clause matches
+ ;; on-fail: code to execute when the whole pattern fail
+ ;; clause-list: list of all the pattern clauses
+ ;; args: the name of the variable holding the value we're matching
+ ;; against (bad name...)
+ (define (compile-pattern-match mac-env
+ expand-env
+ on-success
+ on-fail
+ clause-list
+ args)
+
+ ;; remove duplicates (bad name...)
+ (define (delete-duplicates lst)
+ (cond
+ ((null? lst)
+ '())
+ ((let ((search-for (car lst)))
+ (let loop ((list (cdr lst)))
+ (cond
+ ((null? list) #f)
+ ((identifier=? expand-env (car list)
+ expand-env search-for) #t)
+ (else (loop (cdr list))))))
+ (delete-duplicates (cdr lst)))
+ (else
+ (cons (car lst)
+ (delete-duplicates (cdr lst))))))
+
+ (define (quoted-identifier? datum)
+ (and
+ (pair? datum)
+ (identifier=? expand-env (car datum)
+ mac-env 'quote)
+ (pair? (cdr datum))
+ (identifier? (cadr datum))))
+
+ (define (unquoted-identifier? datum)
+ (and
+ (pair? datum)
+ (identifier=? expand-env (car datum)
+ mac-env 'unquote)
+ (pair? (cdr datum))
+ (identifier? (cadr datum))))
+
+ ;; call the translation on each clause sequentially
+ (define (translate clauses made uenv ienv)
+ (if (null? clauses)
+ on-fail
+ (let ((clause (car clauses))
+ (f-k (lambda (made uenv ienv)
+ (translate
+ (cdr clauses)
+ made
+ '()
+ ienv))))
+
+ (pattern-walk (clause-pattern clause)
+ made
+ uenv
+ ienv
+ args
+ (lambda (made uenv ienv)
+ ;; TODO: remove this ugly test (and the
+ ;; on-success test...)
+ (if (eq? #t (clause-guard clause))
+
+ (if on-success
+ `(begin
+ ,on-success
+ ,((clause-code clause) uenv))
+ ((clause-code clause) uenv))
+
+ `(if (let ,(map (lambda (x)
+ (cons (make-syntactic-closure
+ expand-env
+ '()
+ (car x))
+ (cdr x)))
+ uenv)
+ ,(clause-guard clause)) ;; ~if ?
+ ,(if on-success
+ `(begin
+ ,on-success
+ ,((clause-code clause) uenv))
+ ((clause-code clause) uenv))
+ ,(f-k made uenv ienv))))
+ f-k))))
+
+ ;; is that variable already bound?
+ (define (bound? var env)
+ (assoc-pred var
+ env
+ (lambda (a b)
+ (identifier=? expand-env a
+ expand-env b))))
+
+ ;; what code is it bound to
+ (define (lookup var env)
+ (let ((ret (bound? var env)))
+ (and ret (cadr ret))))
+
+ (define (rlookup val env)
+ (let ((v (assoc val (map (lambda (p)
+ (cons (cadr p)
+ (car p)))
+ env))))
+ (if (not v)
+ (error "can't find it:" (list val: val env: env))
+ (cdr v))))
+
+ ;; extend the env. with a new var
+ (define (extend var val env)
+ (cons (list var val) env))
+
+ ;; this is the compilation function that goes 'down' a clause
+ ;; patt: the pattern to match
+ ;; made: the tests made, an a-list of (test-code . result)
+ ;; uenv: the user env., ex. the x in (match 123 (x x))
+ ;; ienv: the 'internal' env, bindings introduced by the macro
+ ;; acc: the current 'accessor', that is the way to get to the current patt
+ ;; s-k: success continuation, if the match succeeds
+ ;; f-k: failure continuation, if the match fails
+ (define (pattern-walk patt made uenv ienv acc s-k f-k)
+
+ ;; has that test already been made
+ (define (test-made? test)
+ (assoc test made))
+
+ ;; did it fail or succeed
+ (define (test-result test)
+ (cdr (assoc test made)))
+
+ ;; build one of those test
+ (define (make-test test var . val)
+ (if (pair? val)
+ `(,test ,var ,(car val))
+ `(,test ,var)))
+
+ ;; add a test that succeeded
+ (define (add-t-test test)
+ (cons (cons test #t) made))
+
+ ;; add a test that failed
+ (define (add-f-test test)
+ (cons (cons test #f) made))
+
+ ;; check to see if a test has already succeded in another context,
+ ;; that would mean trying the current test would fail (I take this
+ ;; assumes every test is mutually exclusive...)
+ (define (test-would-fail? test)
+ (let ((acc (cadr test))
+ (successful-tests (filter cdr made)))
+ (member acc (map cadar successful-tests))))
+
+ ;; generate an IF if both branches are different
+ (define (~if test succ fail)
+ (if (equal? succ fail)
+ succ
+ `(if ,test
+ ,succ
+ ,fail)))
+
+ ;; main body of pattern-walk
+ (cond
+ ;; if the pattern is null
+ ((null? patt)
+ (let ((test (make-test 'null? acc)))
+ (cond
+ ((test-made? test)
+ (if (test-result test)
+ (s-k made uenv ienv)
+ (f-k made uenv ienv)))
+
+ ((test-would-fail? test)
+ (f-k made uenv ienv))
+
+ (else
+ (~if test
+ (s-k (add-t-test test) uenv ienv)
+ (f-k (add-f-test test) uenv ienv))))))
+
+ ;; is the pattern some constant value?
+ ((or (quoted-identifier? patt)
+ (number? patt) ;; fixme: bignums wont work (because of eq?)
+ (eq? #t patt)
+ (eq? #f patt)
+ (char? patt)
+ (keyword? patt))
+ (let ((test (make-test 'eq?
+ acc
+ (if (quoted-identifier? patt)
+ `',(cadr patt)
+ patt))))
+ (cond
+ ((test-made? test)
+ (if (test-result test)
+ (s-k made uenv ienv)
+ (f-k made uenv ienv)))
+
+ ((test-would-fail? test)
+ (f-k made uenv ienv))
+
+ (else
+ (~if test
+ (s-k (add-t-test test) uenv ienv)
+ (f-k (add-f-test test) uenv ienv))))))
+
+ ;; is the pattern an unquoted symbol (reference to the user env)?
+ ((unquoted-identifier? patt)
+ (let ((test (make-test 'eq? acc (cadr patt))))
+ (~if test
+ (s-k made uenv ienv)
+ (f-k made uenv ienv))))
+
+ ;; is the pattern a pair?
+ ((pair? patt)
+ (let ((test (make-test 'pair? acc)))
+
+ (cond
+ ;; test done already
+ ((test-made? test)
+
+ (if (test-result test)
+
+ (pattern-walk (car patt)
+ made
+ uenv
+ ienv
+ (rlookup `(car ,acc) ienv)
+ (lambda (made uenv ienv)
+ (pattern-walk (cdr patt)
+ made
+ uenv
+ ienv
+ (rlookup `(cdr ,acc) ienv)
+ s-k
+ f-k))
+ f-k)
+ (f-k made uenv ienv)))
+
+ ;; another test succeded meaning this one would fail
+ ((test-would-fail? test)
+ (f-k made uenv ienv))
+
+ ;; do the test
+ (else
+ (~if test
+
+ (let ((?car (gensym))
+ (?cdr (gensym)))
+
+ (let ((ienv (extend ?car
+ `(car ,acc)
+ (extend ?cdr
+ `(cdr ,acc)
+ ienv))))
+
+ `(let ((,?car (car ,acc))
+ (,?cdr (cdr ,acc)))
+
+ ,(pattern-walk (car patt)
+ (add-t-test test)
+ uenv
+ ienv
+ ?car
+ (lambda (made uenv ienv)
+ (pattern-walk (cdr patt)
+ made
+ uenv
+ ienv
+ ?cdr
+ s-k
+ f-k))
+ f-k))))
+
+ (f-k (add-f-test test) uenv ienv))))))
+
+ ;; is it a 'free' symbol, to be bound to a new value or compared
+ ;; to a previous value which it was bound to during the pattern
+ ;; matching?
+ ((identifier? patt)
+ (if (bound? patt uenv)
+ (let ((test (make-test 'eq? acc (lookup patt uenv))))
+
+ (if (test-made? test)
+ (if (test-result test)
+ (s-k made uenv ienv)
+ (f-k made uenv ienv))
+ (~if test
+ (s-k (add-t-test test) uenv ienv)
+ (f-k (add-f-test test) uenv ienv))))
+
+ (if (identifier=? expand-env patt
+ mac-env '_)
+ (s-k made uenv ienv)
+ (s-k made (extend patt acc uenv) ienv))))
+
+ ;; maybe it's something we don't handle in here
+ (else
+ (error "unknown pattern" patt))))
+
+ ;; compile-pattern-match main body
+
+ ;; this code build clauses, then extract the [non-trivial]
+ ;; CLAUSE-CODEs and lift them outside WARNING: hairy code, fixme
+ (let* ((transform
+ (map
+ (lambda (clause)
+ (let ((pattern (car clause))
+ (guard
+ (let ((g (map
+ cadr
+ (filter
+ (lambda (x)
+ (and (pair? x)
+ (identifier=? expand-env
+ (car x)
+ mac-env
+ 'where)))
+ (cdr clause)))))
+ (case (length g)
+ ((0)
+ #t)
+ ((1)
+ (make-syntactic-closure expand-env
+ '()
+ (car g)))
+ (else
+ (make-syntactic-closure
+ expand-env
+ '()
+ `(,(make-syntactic-closure mac-env
+ '()
+ 'and)
+ ,@g))))))
+ (code
+ (let ((c (remove
+ (lambda (x)
+ (and (pair? x)
+ (identifier=? expand-env
+ (car x)
+ mac-env
+ 'where)))
+ (cdr clause))))
+ (case (length c)
+ ((0) #t) ;; ???
+ ((1) (car c))
+ (else `(begin ,@c))))))
+ (make-clause pattern
+ guard
+ code)))
+ clause-list))
+
+ (data (map (lambda (clause)
+
+ (let ((code-label (gensym))
+ (var-list
+ (delete-duplicates
+ (tree-filter
+ (lambda (t)
+ (and (identifier? t)
+ (not
+ (identifier=? expand-env t
+ mac-env '_))))
+ (lambda (t)
+ (not
+ (or (unquoted-identifier? t)
+ (quoted-identifier? t))))
+ (clause-pattern clause))))
+ (code (clause-code clause)))
+
+ (let ((lifted `(,code-label
+ ,(make-syntactic-closure
+ expand-env
+ '()
+ `(,(make-syntactic-closure
+ mac-env
+ '()
+ 'lambda)
+ ,var-list
+ ,code))))
+
+ ;; trivial non-triviality test
+ (not-trivial? (and (pair? code)
+ (not (quoted-identifier? code)))))
+
+ (cons (and not-trivial? lifted)
+ (make-clause
+ (clause-pattern clause)
+ (clause-guard clause)
+ (lambda (env)
+ (if not-trivial?
+ (cons code-label
+ (map
+ (lambda (var)
+ (lookup var env))
+ var-list))
+ (if (identifier? code)
+ (make-syntactic-closure
+ expand-env
+ '()
+ (or (lookup code env) code))
+ code))))))))
+ transform)))
+
+ `(let ,(map car (filter car data))
+ ,(translate (map cdr data) '() '() '())))))
+
+(define-syntax match/action
+ (sc-macro-transformer
+ (lambda (form env)
+ (let ((on-success (cadr form))
+ (on-fail (caddr form))
+ (datum (cadddr form))
+ (clauses (cddddr form)))
+ `(let ((tmp ,(make-syntactic-closure env '() datum))
+ ;; the thunk for success is lifted
+ (succ (lambda ()
+ ,(make-syntactic-closure env '() on-success)))
+ ;; the thunk for failure is lifted
+ (fail (lambda ()
+ ,(make-syntactic-closure env '() on-fail))))
+
+ ,(capture-syntactic-environment
+ (lambda (mac-env)
+ (compile-pattern-match mac-env
+ env
+ (make-syntactic-closure
+ mac-env
+ '()
+ '(succ))
+ (make-syntactic-closure
+ mac-env
+ '()
+ '(fail))
+ clauses
+ (make-syntactic-closure
+ mac-env
+ '()
+ 'tmp)))))))))
+
+
+(define-syntax match
+ (sc-macro-transformer
+ (lambda (form env)
+ (let ((datum (cadr form))
+ (clauses (cddr form)))
+ `(let* ((tmp ,(make-syntactic-closure env '() datum))
+ (fail (lambda ()
+ (raise
+ (list bad-match: tmp)))))
+ ,(capture-syntactic-environment
+ (lambda (mac-env)
+ (compile-pattern-match mac-env
+ env
+ #f
+ (make-syntactic-closure
+ mac-env
+ '()
+ '(fail))
+ clauses
+ (make-syntactic-closure
+ mac-env
+ '()
+ 'tmp)))))))))
View
26 otp/gen_event.scm
@@ -7,6 +7,11 @@
;;; CALL :: args state -> reply state
;;; TERMINATE :: reason state -> void
+(import ../recv
+ ../termite_core
+ ../match
+ ../utils)
+
(define-type event-handler
id: 1d3007b8-c5aa-4090-ab55-e352040a4498
read-only:
@@ -59,7 +64,7 @@
(void)))))
(define (internal-event-manager-start spawner handlers)
- (let ((em (spawn event-manager)))
+ (let ((em (spawner event-manager)))
(for-each
(lambda (handler)
(event-manager:add-handler em handler))
@@ -83,22 +88,3 @@
(define (event-manager:stop event-manager)
(! event-manager (list 'stop)))
-
-
-;; build a trivial event handler with no state, only invoking a
-;; callback on any event
-(define (make-simple-event-handler callback initial-state)
- (make-event-handler
- ;; INIT
- (lambda (args)
- initial-state)
- ;; NOTIFY
- (lambda (event state)
- (callback event state))
- ;; CALL
- (lambda (args state)
- (values (void) state))
- ;; TERMINATE
- (lambda (reason state)
- (void))))
-
View
10 otp/gen_server.scm
@@ -7,6 +7,16 @@
;;; CAST :: term state -> state
;;; TERMINATE :: reason state -> void
+(import ../termite_core
+ ../recv)
+
+(export make-server-plugin
+ server:start
+ server:start-link
+ server:call
+ server:cast
+ server:stop)
+
(define-type server-plugin
id: 2ca2d07c-5d6a-44a8-98eb-422b2b8e7296
read-only:
View
110 recv.scm
@@ -1,57 +1,65 @@
-;; All hail the RECV form
-(define-macro (recv . clauses)
- (let ((msg (gensym 'msg)) ;; the current mailbox message
- (loop (gensym 'loop))) ;; the mailbox seeking loop
-
- ;; check the last clause to see if it's a timeout
- (let ((sesualc (reverse clauses)))
- (if (and (pair? (car sesualc))
- (eq? (caar sesualc) 'after))
-
- (let ((clauses (reverse (cdr sesualc)))
- ;; the code to compute the timeout
- (init (cadar sesualc))
- ;; the variable holding the timeout
- (timeout (gensym 'timeout))
- ;; the code to be executed on a timeout
- (on-timeout (cddar sesualc))
- ;; the timeout exception-handler to the whole match
- (e (gensym 'e)))
-
- ;; RECV code when there is a timeout
- `(let ((,timeout ,init))
- (with-exception-catcher
- (lambda (,e)
- (if (mailbox-receive-timeout-exception? ,e)
- (begin
- (thread-mailbox-rewind)
- ,@on-timeout)
- (raise ,e)))
- (lambda ()
- (let ,loop ((,msg (thread-mailbox-next ,timeout)))
- (match/action
- (thread-mailbox-extract-and-rewind)
- (,loop
- (thread-mailbox-next ,timeout))
- ,msg
- ;; extra clause to handle system events
- (event
- (where (termite-exception? event))
- (handle-exception-message event))
- ;; the user clauses
- ,@clauses))))))
+(import (only: match match/action)
+ exception)
- ;; RECV code when there is no timeout
- `(let ,loop ((,msg (thread-mailbox-next)))
+;; All hail the RECV form
+(define-syntax recv
+ (sc-macro-transformer
+ (lambda (form env)
+ (let ((clauses (cdr form)))
+
+ ;; check the last clause to see if it's a timeout
+ (let ((sesualc (reverse clauses)))
+ (if (and (pair? (car sesualc))
+ (eq? (caar sesualc) 'after))
+
+ (let ((clauses (map (lambda (x)
+ (make-syntactic-closure env '() x))
+ (reverse (cdr sesualc))))
+ ;; the code to compute the timeout
+ (init (make-syntactic-closure
+ env
+ '()
+ (cadar sesualc)))
+ ;; the code to be executed on a timeout
+ (on-timeout (map (lambda (x)
+ (make-syntactic-closure env '() x))
+ (cddar sesualc))))
+
+ ;; RECV code when there is a timeout
+ `(let ((timeout ,init))
+ (with-exception-catcher
+ (lambda (e)
+ (if (mailbox-receive-timeout-exception? e)
+ (begin
+ (thread-mailbox-rewind)
+ ,@on-timeout)
+ (raise e)))
+ (lambda ()
+ (let loop ((msg (thread-mailbox-next timeout)))
+ (match/action
+ (thread-mailbox-extract-and-rewind)
+ (loop
+ (thread-mailbox-next timeout))
+ msg
+ ;; extra clause to handle system events
+ (event
+ (where (termite-exception? event))
+ (handle-exception-message event))
+ ;; the user clauses
+ ,@clauses))))))
+
+ ;; RECV code when there is no timeout
+ `(let loop ((msg (thread-mailbox-next)))
(match/action
(thread-mailbox-extract-and-rewind)
- (,loop
+ (loop
(thread-mailbox-next))
- ,msg
+ msg
;; extra clause to handle system events
- (event
- (where (termite-exception? event))
- (handle-exception-message event))
+ (event
+ (where (termite-exception? event))
+ (handle-exception-message event))
;; the user clauses
- ,@clauses))))))
-
+ ,@(map (lambda (x)
+ (make-syntactic-closure env '() x))
+ clauses)))))))))
View
938 termite.scm
@@ -2,713 +2,46 @@
;; File: "termite.scm"
;; this is the main file for the Termite system
-(##namespace ("termite#"))
-(##include "~~/lib/gambit#.scm")
-(##include "termite#.scm")
-
-;; ----------------------------------------------------------------------------
-;; System configuration & global data
-
-(define *termite-cookie* (getenv "TERMITE_COOKIE" #f))
-
-(define current-node (lambda () (error "uninitialized node")))
-
-(define *global-mutex* (make-mutex "global termite mutex"))
-
-;; translation tables for "published" PIDs
-(define *foreign->local* (make-table weak-values: #t))
-(define *local->foreign* (make-table weak-keys: #t))
-;; translation table for "published" tags
-(define *uuid->tag* (make-table weak-values: #t))
-
-;; Get the current time in seconds.
-(define (now)
- (time->seconds
- (current-time)))
-
-;; TODO Improve this
-(define (formatted-current-time)
- (let* ((port (open-process "date"))
- (time (read-line port)))
- (close-port port)
- time))
-
-;; ----------------------------------------------------------------------------
-;; Datatypes
-
-(define (process? obj) (thread? obj))
-(define (process-links pid) (thread-specific pid))
-(define (process-links-set! pid obj) (thread-specific-set! pid obj))
-
-;; universal pid
-(define-type upid
- id: 9e096e09-8c66-4058-bddb-e061f2209838
- tag
- node)
-
-;; nodes
-(define-type node
- id: 8992144e-4f3e-4ce4-9d01-077576f98bc5
- read-only:
- host
- port)
-
-;; tags
-(define-type tag
- id: efa4f5f8-c74c-465b-af93-720d44a08374
- (uuid init: #f))
-
-;; * Test whether 'obj' is a pid.
-(define (pid? obj)
- (or (process? obj) (upid? obj)))
-
-
-;; NOTE It might be better to integrate with Gambit's exception mechanism
-(define-type termite-exception
- id: 6a3a285f-02c4-49ac-b00a-aa57b1ad02cf
- origin
- reason
- object)
-
-
-;; ----------------------------------------------------------------------------
-;; process manipulation primitives
-
-;; * Get the pid of the current process.
-(define self current-thread)
-
-;; Base exception handler for Termite processes.
-(define (base-exception-handler e)
- (continuation-capture
- (lambda (k)
- (let ((log-crash
- (lambda (e)
- (termite-log
- 'error
- (call-with-output-string ""
- (lambda (port)
- (display "#|\n" port)
- (display-exception-in-context
- e
- k
- port)
- ; todo: provide a safe wrapper in Gambit runtime?
- (##cmd-b k port 0)
- (display "|#\n" port)))))))
- (cond
- ;; Propagated Termite exception?
- ((termite-exception? e)
- (if (not (eq? (termite-exception-reason e) 'normal))
- (log-crash (termite-exception-object e)))
- (for-each
- (lambda (pid) (! pid e))
- (process-links (self)))
- (halt!))
- ;; Gambit exception in the current process
- (else
- (log-crash e)
- (for-each
- (lambda (pid)
- (! pid (make-termite-exception (self) 'failure e)))
- (process-links (self)))
- (halt!)))))))
-
-
-;; * Start a new process executing the code in 'thunk'.
-(define (spawn thunk #!key (links '()))
- (let ((t (make-thread
- (lambda ()
- (with-exception-handler
- base-exception-handler
- thunk)
- (shutdown!)))))
- (thread-specific-set! t links)
- (thread-start! t)
- t))
-
-
-(define (spawn-linked-to to thunk)
- (spawn thunk links: (list to)))
-
-
-;; * Start a new process with a bidirectional link to the current
-;; process.
-(define (spawn-link thunk)
- (let ((pid (spawn thunk links: (list (self)))))
- (outbound-link pid)
- pid))
-
-
-;; * Start a new process on remote node 'node', executing the code
-;; in 'thunk'.
-(define (remote-spawn node thunk #!key (links '()))
- (if (equal? node (current-node))
- (spawn thunk links: links)
- (!? (remote-service 'spawner node)
- (list 'spawn thunk links))))
-
-
-;; * Start a new process on remote node 'node', with a bidirectional
-;; link to the current process.
-(define (remote-spawn-link node thunk)
- (let ((pid (remote-spawn node thunk links: (list (self)))))
- (outbound-link pid)
- pid))
-
-
-;; * Cleanly stop the execution of the current process. Linked
-;; processes will receive a "normal" exit message.
-(define (shutdown!)
- (for-each
- (lambda (pid)
- (! pid (make-termite-exception (self) 'normal #f)))
- (process-links (self)))
- (halt!))
-
-;; this is *not* nice: it wont propagate the exit message to the other
-;; processes
-(define (halt!)
- (thread-terminate! (current-thread)))
-
-
-;; * Forcefully terminate a local process. Warning: it only works on
-;; local processes! This should be used with caution.
-(define (terminate! victim)
- (thread-terminate! victim)
- (for-each
- (lambda (link)
- (! link (make-termite-exception victim 'terminated #f)))
- (process-links victim)))
-
-
-;; TODO 'wait-for' and 'alive?' should be grouped in a more general
-;; procedure able to determine the status of a process (alive, dead,
-;; waiting, etc.) and the procedure should work on remote processes
-
-;; * Wait for the end of a process 'pid'. Does not return anything.
-;; Warning: will not work on remote processes.
-(define (%wait-for pid)
- (with-exception-catcher
- (lambda (e)
- (void))
- (lambda ()
- (thread-join! pid)
- (void))))
-
-
-;; Check whether the process 'pid' is still alive. Warning: will not
-;; work on remote processes.
-(define (%alive? pid)
- (with-exception-catcher
- (lambda (e)
- (join-timeout-exception? e))
- (lambda ()
- (thread-join! pid 0)
- #f)))
-
-
-;; ----------------------------------------------------------------------------
-;; Sending messages
-
-;; * Send a message 'msg' to 'pid'. This means that the message will
-;; be enqueued in the mailbox of the destination process.
-;;
-;; Delivery of the message is unreliable in theory, but in practice
-;; local messages will always be delivered, and remote messages will
-;; not be delivered only if the connection is currently broken to the
-;; remote node, or if the remote node is down.
-;;
-;; Note that you will not get an error or an exception if the message
-;; doesn't get there: you need to handle errors yourself.
-(define (! to msg)
- (cond
- ((process? to)
- (thread-send to msg))
- ((upid? to)
- (thread-send dispatcher (list 'relay to msg)))
- (else
- (error "invalid-message-destination" to))))
-
-
-;; ----------------------------------------------------------------------------
-;; Receiving messages
-
-;; incorrect, because it doesn't handle exception messages
-;; (define ? thread-receive)
-
-;; * Retrieve the first message from the mailbox of the current
-;; process. If no message is available, the process will block until
-;; a message is received. If 'timeout' is specified, the process will
-;; only block for that amount of time, and then raise an exception.
-;; It is possible to also pass the 'default' argument to return a
-;; value instead of raising an exception.
-(define (? . opt) ;; TODO: inefficient, fix
- (match opt
- (()
- (recv
- (msg msg)))
-
- ((timeout)
- (recv
- (msg msg)
- (after timeout (thread-receive 0))))
-
- ((timeout default)
- (recv
- (msg msg)
- (after timeout default)))))
-
-
-;; benchmark to see if faster...
-;; (define (? #!optional (timeout +inf.0) (default (lambda (thread-receive 0))))
-;; (with-exception-catcher
-;; (lambda (exception)
-;; (if (mailbox-receive-timeout-exception? exception)
-;; (default)
-;; (raise exception)))
-;; (lambda ()
-;; (thread-receive timeout))))
-
-
-;; * Retrieve the first message from the mailbox of the current
-;; process that satisfised the predicate 'pred?'. If no message
-;; qualifies, the process will block until a message satisfying the
-;; predicate is received. If 'timeout' is specified, the process will
-;; only block for that amount of time, and then raise an exception.
-;; It is possible to also pass the 'default' argument to return a
-;; value instead of raising an exception.
-;; TODO: inefficient, fix
-(define (?? pred? . opt)
- (match opt
- (()
- (recv
- (msg (where (pred? msg)) msg)))
-
- ((timeout)
- (recv
- (msg (where (pred? msg)) msg)
- (after timeout (thread-receive 0))))
-
- ((timeout default)
- (recv
- (msg (where (pred? msg)) msg)
- (after timeout default)))))
-
-
-;; ----------------------------------------------------------------------------
-;; Higher-order concurrency primitives
-
-;; * Send a "synchronous" message to a process. The message will be
-;; annotated with a tag and the pid of the current process, therefore
-;; sending a message of the form '(from tag msg)'. The server
-;; receiving the message must specifically handle that format of
-;; message, and reply with a message of the form '(tag reply)'.
-;;
-;; Like for the |?| and |??| message retrieving operators, it is
-;; possible to specify a 'timeout' to limit the amount of time to wait
-;; for a reply, and a 'default' value to return if no reply has been
-;; received.
-;; RPC
-(define (!? pid msg . opt)
- (let ((tag (make-tag)))
- (! pid (list (self) tag msg))
-
- (match opt
- (()
- (recv
- ((,tag reply) reply)))
-
- ((timeout)
- (recv
- ((,tag reply) reply)
- (after timeout (raise 'timeout))))
-
- ((timeout default)
- (recv
- ((,tag reply) reply)
- (after timeout default))))))
-
-
-;; * Evaluate a 'thunk' on a remote node and return the result of that
-;; evaluation. Just like for |!?|, |?| and |??|, it is possible to
-;; specify a 'timeout' and a 'default' argument.
-(define (on node thunk)
- (let ((tag (make-tag))
- (from (self)))
- (remote-spawn node
- (lambda ()
- (! from (list tag (thunk)))))
- (recv
- ((,tag reply) reply))))
-
-
-;; ----------------------------------------------------------------------------
-;; Links and exception handling
-
-;; Default callback for received exceptions.
-(define (handle-exception-message event)
- (raise event))
-
-;; * Link another process 'pid' /to/ the current one: any exception
-;; not being caught by the remote process and making it crash will be
-;; propagated to the current process.
-(define (inbound-link pid)
- (! linker (list 'link pid (self))))
-
-
-;; * Link the current process /to/ another process 'pid': any
-;; exception not being caught by the current process will be
-;; propagated to the remote process.
-(define (outbound-link pid)
- (let* ((links (process-links (self))))
- (if (not (memq pid links))
- (process-links-set! (self) (cons pid links)))))
-
-
-;; * Link bidirectionally the current process with another process
-;; 'pid': any exception not being caught in any of the two processes
-;; will be propagated to the other one.
-(define (full-link pid)
- (inbound-link pid)
- (outbound-link pid))
-
-
-;; ----------------------------------------------------------------------------
-;; Termite I/O
-
-;; Wraps 'pid's representing Gambit output ports.
-(define-type termite-output-port
- id: b0c30401-474c-4e83-94b4-d516e00fe363
- unprintable:
- pid)
-
-;; Wraps 'pid's representing Gambit input ports.
-(define-type termite-input-port
- id: ebb22fcb-ca61-4765-9896-49e6716471c3
- unprintable:
- pid)
-
-;; Start a process representing a Gambit output port.
-(define (spawn-output-port port #!optional (serialize? #f))
- (output-port-readtable-set!
- port
- (readtable-sharing-allowed?-set
- (output-port-readtable port)
- serialize?))
-
- (make-termite-output-port
- (spawn
- (lambda ()
- (let loop ()
- (recv
- (proc
- (where (procedure? proc))
- (proc port))
- (x (warning "unknown message sent to output port: " x)))
- (loop))))))
-
-;; Start a process representing a Gambit input port.
-(define (spawn-input-port port #!optional (serialize? #f))
- (input-port-readtable-set!
- port
- (readtable-sharing-allowed?-set
- (input-port-readtable port)
- serialize?))
-
- (make-termite-input-port
- (spawn
- (lambda ()
- (let loop ()
- (recv
- ((from token proc)
- (where (procedure? proc))
- (! from (list token (proc port))))
- (x (warning "unknown message sent to input port: " x)))
- (loop))))))
-
-;; IO parameterization
-;; (define current-termite-input-port (make-parameter #f))
-;; (define current-termite-output-port (make-parameter #f))
-
-;; insert IO overrides
-;; (include "termiteio.scm")
-
-
-;; ----------------------------------------------------------------------------
-;; Distribution
-
-;; Convert a 'pid'
-(define (pid->upid obj)
- (mutex-lock! *global-mutex*)
- (cond
- ((table-ref *local->foreign* obj #f)
- => (lambda (x)
- (mutex-unlock! *global-mutex*)
- x))
- (else
- (let ((upid (make-upid (make-uuid) (current-node))))
- (table-set! *local->foreign* obj upid)
- (table-set! *foreign->local* upid obj)
- (mutex-unlock! *global-mutex*)
- upid))))
-
-(define (tag->utag obj)
- (mutex-lock! *global-mutex*)
- (cond
- ((tag-uuid obj)
- (mutex-unlock! *global-mutex*)
- obj)
- (else
- (let ((uuid (make-uuid)))
- (tag-uuid-set! obj uuid)
- (table-set! *uuid->tag* uuid obj)
- (mutex-unlock! *global-mutex*)
- obj))))
-
-
-(define (serialize-hook obj)
- (cond
- ((process? obj)
- (pid->upid obj))
-
- ((tag? obj)
- (tag->utag obj))
-
- ;; unserializable objects, so instead of crashing we set them to #f
- ((or (port? obj))
- #f)
-
- (else obj)))
-
-(define (upid->pid obj)
- (cond
- ((table-ref *foreign->local* obj #f)
- => (lambda (pid) pid))
- ((and (symbol? (upid-tag obj))
- (resolve-service (upid-tag obj)))
- => (lambda (pid)
- pid))
- (else
- (error "don't know how to upid->pid"))))
-
-(define (utag->tag obj)
- (let ((uuid (tag-uuid obj)))
- (cond
- ((table-ref *uuid->tag* uuid #f)
- => (lambda (tag) tag))
- (else obj))))
-
-(define (deserialize-hook obj)
- (cond
- ((and (upid? obj)
- (equal? (upid-node obj)
- (current-node)))
- (upid->pid obj))
- ((tag? obj)
- (utag->tag obj))
- (else obj)))
-
-
-(define (serialize obj port)
- (let* ((serialized-obj
- (object->u8vector obj serialize-hook))
- (len
- (u8vector-length serialized-obj))
- (serialized-len
- (u8vector (bitwise-and len #xff)
- (bitwise-and (arithmetic-shift len -8) #xff)
- (bitwise-and (arithmetic-shift len -16) #xff)
- (bitwise-and (arithmetic-shift len -24) #xff))))
-
- (begin
- (write-subu8vector serialized-len 0 4 port)
- (write-subu8vector serialized-obj 0 len port))))
-
-
-(define (deserialize port)
- (let* ((serialized-len
- (u8vector 0 0 0 0))
- (n
- (read-subu8vector serialized-len 0 4 port)))
-
- (cond ((= 0 n)
- #!eof)
- ((not (= 4 n))
- (error "deserialization error"))
- (else
- (let* ((len
- (+ (u8vector-ref serialized-len 0)
- (arithmetic-shift (u8vector-ref serialized-len 1) 8)
- (arithmetic-shift (u8vector-ref serialized-len 2) 16)
- (arithmetic-shift (u8vector-ref serialized-len 3) 24)))
- (serialized-obj
- (make-u8vector len))
- (n
- (read-subu8vector serialized-obj 0 len port)))
-
- (if (not (eqv? len n))
- (begin
- (error "deserialization error"
- (list len: len n: n)))
- (let ((obj (u8vector->object serialized-obj deserialize-hook)))
- (if (vector? obj)
- (vector->list obj)
- obj))))))))
-
-(define (start-serializing-output-port port)
- (spawn-link
- (lambda ()
- (let loop ()
- (recv
- (('write data)
- ;; (debug out: data)
- (serialize data port)
- (force-output port)) ;; io override
-
- (msg
- (warning "serializing-output-port ignored message: " msg)))
- (loop)))))
-
-
-(define (start-serializing-active-input-port port receiver)
- (spawn-link
- (lambda ()
- (let loop ()
- (let ((data (deserialize port)))
- ;; to receive exceptions...
- (? 0 'ok)
- ;; (debug in: data)
- (if (eof-object? data) (shutdown!))
- (! receiver (list (self) data))
- (loop))))))
-
-
-;; a tcp server listens on a certain port for new tcp connection
-;; requests, and call ON-CONNECT to deal with those new connections.
-(define (start-tcp-server tcp-port-number on-connect)
- (let ((tcp-server-port
- (open-tcp-server (list
- port-number: tcp-port-number
- coalesce: #f))))
- (spawn
- (lambda ()
- (let loop ()
- (on-connect (read tcp-server-port)) ;; io override
- (loop))))))
-
-
-;; MESSENGERs act as proxies for sockets to other nodes
-
-;; initiate a new bidirectional connection to another node important:
-;; caller is responsible for registering it with the dispatcher
-(define (initiate-messenger node)
- ;; (print "OUTBOUND connection established\n")
- (spawn
- (lambda ()
- (with-exception-catcher
- (lambda (e)
- (! dispatcher (list 'unregister (self)))
- (shutdown!))
-
- (lambda ()
- (let ((socket (open-tcp-client
- (list server-address: (node-host node)
- port-number: (node-port node)
- coalesce: #f))))
- ;; the real interesting part
- (let ((in (start-serializing-active-input-port socket (self)))
- (out (start-serializing-output-port socket)))
-
- (! out (list 'write (current-node)))
-
- (messenger-loop node in out))))))))
-
-
-;; start a MESSENGER for an 'inbound' connection (another node
-;; initiated the bidirectional connection, see initiate-messenger)
-(define (start-messenger socket)
- ;; (print "INBOUND connection established\n")
- (spawn
- (lambda ()
- (with-exception-catcher
- (lambda (e)
- (! dispatcher (list 'unregister (self)))
- (shutdown!))
-
- (lambda ()
- (let ((in (start-serializing-active-input-port socket (self)))
- (out (start-serializing-output-port socket)))
- (recv
- ((,in node)
- ;; registering messenger to local dispatcher
- (! dispatcher (list 'register (self) node))
- (messenger-loop node in out)))))))))
-
-
-(define (messenger-loop node in out)
- (recv
- ;; incoming message
- ((,in ('relay id message))
- (let ((to (upid->pid (make-upid id (current-node)))))
- (! to message)))
-
- ;; outgoing message
- (('relay to message)
- ;; 'to' is a upid
- (let* ((id (upid-tag to))
- ;; (node (upid-node to))
- ;; (host (node-host node))
- ;; (port (node-id node))
- )
- (! out (list 'write (list 'relay id message)))))
-
- ;; unknown message
- (msg
- (warning "messenger-loop ignored message: " msg)))
-
- (messenger-loop node in out))
-
-
-;; the DISPATCHER dispatches messages to the right MESSENGER, it keeps
-;; track of known remote nodes
-(define dispatcher
- (spawn
- (lambda ()
- ;; the KNOWN-NODES of the DISPATCHER LOOP is an a-list of NODE => MESSENGER
- (let loop ((known-nodes '()))
- (recv
- (('register messenger node)
- (loop (cons (cons node messenger) known-nodes)))
-
- (('unregister messenger)
- (loop (remove (lambda (m) (equal? (cdr m) messenger)) known-nodes)))
-
- (('relay upid message)
- (let ((node (upid-node upid)))
- (cond
- ;; the message should be sent locally (ideally should not happen
- ;; for performance reasons, but if the programmer wants to do
- ;; that, then OK...)
- ((equal? node (current-node))
- (! (upid->pid upid) message)
- (loop known-nodes))
-
- ;; the message is destined to a pid on a known node
- ((assoc node known-nodes)
- => (lambda (messenger)
- (! (cdr messenger) (list 'relay upid message))
- (loop known-nodes)))
-
- ;; unconnected node, must connect
- (else
- (let ((messenger (initiate-messenger node)))
- (! messenger (list 'relay upid message))
- (loop (cons (cons node messenger) known-nodes)))))))
-
- (msg
- (warning "dispatcher ignored message: " msg) ;; uh...
- (loop known-nodes)))))))
+(export
+ ;; Termite "primordials"
+ self ! ? ?? !? on make-node spawn pid?
+ spawn-link remote-spawn remote-spawn-link
+ ;; Useful
+ make-tag current-node
+ ;; Process linking for error propagation
+ inbound-link outbound-link full-link
+ ;; Wrap Gambit's I/O
+ spawn-output-port spawn-input-port
+ ;; Migration
+ migrate-task migrate/proxy
+ ;; Useful condition reporting/logging procedures
+ warning debug info
+ ;; Node stuff
+ node-init node? node-host node-port
+ ;; Nameserver mechanism
+ ;; make-nameserver-node By Per: This doesn't seem to be defined?
+ ;; Publishing and resolving names for services
+ publish-service unpublish-service resolve-service remote-service
+ ;; default init and node names for convenience
+ node1 node2
+ ;; *termite-nameserver-port* By Per: This doesn't seem to be defined?
+ *termite-cookie*
+ ;; Useful
+ ping
+
+ (re-export: uuid
+ otp/gen_server
+ data
+ deftype
+ recv
+ match))
+
+(import termite_core
+ recv
+ data
+ otp/gen_event
+ match)
;; ----------------------------------------------------------------------------
@@ -717,60 +50,60 @@
;; LINKER (to establish exception-propagation links between processes)
(define linker
(spawn
- (lambda ()
- (let loop ()
- (recv
- (('link from to)
- (cond
- ((process? from)
- (process-links-set! from (cons to (process-links from)))) ;;;;;;;;;;
- ((upid? from)
- (! (remote-service 'linker (upid-node from))
- (list 'link from to)))
- (else
- (warning "in linker-loop: unknown object"))))
- (msg
- (warning "linker ignored message: " msg)))
- (loop)))))
+ (lambda ()
+ (let loop ()
+ (recv
+ (('link from to)
+ (cond
+ ((process? from)
+ (process-links-set! from (cons to (process-links from)))) ;;;;;;;;;;
+ ((upid? from)
+ (! (remote-service 'linker (upid-node from))
+ (list 'link from to)))
+ (else
+ (warning "in linker-loop: unknown object"))))
+ (msg
+ (warning "linker ignored message: " msg)))
+ (loop)))))
;; Remote spawning
;; the SPAWNER answers remote-spawn request
(define spawner
(spawn
- (lambda ()
- (let loop ()
- (recv
- ((from tag ('spawn thunk links))
- (! from (list tag (spawn thunk links: links))))
-
- (msg
- (warning "spawner ignored message: " msg)))
- (loop)))))
+ (lambda ()
+ (let loop ()
+ (recv
+ ((from tag ('spawn thunk links))
+ (! from (list tag (spawn thunk links: links))))
+
+ (msg
+ (warning "spawner ignored message: " msg)))
+ (loop)))))
;; the PUBLISHER is used to implement a mutable global env. for
;; process names
(define publisher
(spawn
- (lambda ()
- (define dict (make-dict))
-
- (let loop ()
- (recv
- (('publish name pid)
- (dict-set! dict name pid))
-
- (('unpublish name pid)
- (dict-set! dict name))
-
- ((from tag ('resolve name))
- (! from (list tag (dict-ref dict name))))
-
- (msg
- (warning "puslisher ignored message: " msg)))
-
- (loop)))))
+ (lambda ()
+ (define dict (make-dict))
+
+ (let loop ()
+ (recv
+ (('publish name pid)
+ (dict-set! dict name pid))
+
+ (('unpublish name pid)
+ (dict-set! dict name))
+
+ ((from tag ('resolve name))
+ (! from (list tag (dict-ref dict name))))
+
+ (msg
+ (warning "puslisher ignored message: " msg)))
+
+ (loop)))))
(define (publish-service name pid)
(! publisher (list 'publish name pid)))
@@ -789,16 +122,46 @@
;; ----------------------------------------------------------------------------
-;; Erlang/OTP-like behavior for "generic servers" and "event handlers"
+;; Links
-(include "otp/gen_server.scm")
-(include "otp/gen_event.scm")
+;; * Link another process 'pid' /to/ the current one: any exception
+;; not being caught by the remote process and making it crash will be
+;; propagated to the current process.
+(define (inbound-link pid)
+ (! linker (list 'link pid (self))))
+
+
+;; * Link bidirectionally the current process with another process
+;; 'pid': any exception not being caught in any of the two processes
+;; will be propagated to the other one.
+(define (full-link pid)
+ (inbound-link pid)
+ (outbound-link pid))
;; ----------------------------------------------------------------------------
-;; Some datastrutures
+;; Erlang/OTP-like behavior for "generic servers" and "event handlers"
+
+;; (include "otp/gen_server.scm") By Per: Not needed
+;; (include "otp/gen_event.scm") By Per: Not needed
-(include "data.scm")
+
+;; build a trivial event handler with no state, only invoking a
+;; callback on any event
+(define (make-simple-event-handler callback initial-state)
+ (make-event-handler
+ ;; INIT
+ (lambda (args)
+ initial-state)
+ ;; NOTIFY
+ (lambda (event state)
+ (callback event state))
+ ;; CALL
+ (lambda (args state)
+ (values (void) state))
+ ;; TERMINATE
+ (lambda (reason state)
+ (void))))
;; ----------------------------------------------------------------------------
@@ -831,56 +194,53 @@
;; might use similar style.)
(define (report-event event port)
- (display (list ";; --- " (formatted-current-time) " ---\n") port)
- (display (list "Event type: " (car event) "\n") port)
- (display (list "In process: " (cadr event) "\n") port)
- (display (list "On node: " (current-node) "\n") port)
- (write (caddr event) port)
+ (let ((display
+ (lambda list
+ (for-each (lambda (x)
+ (display x port))
+ list))))
+ (display ";; --- " (formatted-current-time) " ---\n")
+ (display "Event type: " (car event) "\n")
+ (display "In process: " (cadr event) "\n")
+ (display "On node: " (current-node) "\n")
+ (display (caddr event)))
(newline port)
(force-output port)
port)
(define file-output-log-handler
(make-event-handler
- ;; init
- (lambda (args)
- (match args
- ((filename)
- (open-output-file (list path: filename
- create: 'maybe
- append: #t)))))
- ;; event
- report-event
- ;; call
- (lambda (term port)
- (values (void) port))
- ;; shutdown
- (lambda (reason port)
- (close-output-port port))))
+ ;; init
+ (lambda (args)
+ (match args
+ ((filename)
+ (open-output-file (list path: filename
+ create: 'maybe
+ append: #t)))))
+ ;; event
+ report-event
+ ;; call
+ (lambda (term port)
+ (values (void) port))
+ ;; shutdown
+ (lambda (reason port)
+ (close-output-port port))))
;; 'type' is a keyword (error warning info debug)
-(define (termite-log type message)
- (event-manager:notify logger (list type (self) message)))
-
-(define (warning . terms)
- (termite-log 'warning terms))
-
-(define (info . terms)
- (termite-log 'info terms))
-
-(define (debug . terms)
- (termite-log 'debug terms))
+(termite-log-fun
+ (lambda (type message)
+ (event-manager:notify logger (list type (self) message))))
(define logger
(let ((logger (event-manager:start)))
(event-manager:add-handler logger
- (make-simple-event-handler
- report-event
- (current-error-port)))
+ (make-simple-event-handler
+ report-event
+ (current-error-port)))
(event-manager:add-handler logger
- file-output-log-handler
- "_termite.log")
+ file-output-log-handler
+ "_termite.log")
logger))
View
710 termite_core.scm
@@ -0,0 +1,710 @@
+;; Copyright (C) 2005-2008 by Guillaume Germain, All Rights Reserved.
+;; File: "termite.scm"
+
+;; this is the main file for the Termite system
+
+(import recv
+ utils
+ uuid
+ exception
+ match)
+
+
+;; ----------------------------------------------------------------------------
+;; System configuration & global data
+
+(define *termite-cookie* (getenv "TERMITE_COOKIE" #f))
+
+(define current-node (lambda () (error "uninitialized node")))
+
+(define *global-mutex* (make-mutex "global termite mutex"))
+
+;; translation tables for "published" PIDs
+(define *foreign->local* (make-table weak-values: #t))
+(define *local->foreign* (make-table weak-keys: #t))
+;; translation table for "published" tags
+(define *uuid->tag* (make-table weak-values: #t))
+
+;; Get the current time in seconds.
+(define (now)
+ (time->seconds
+ (current-time)))
+
+;; TODO Improve this
+(define (formatted-current-time)
+ (let* ((port (open-process "date"))
+ (time (read-line port)))
+ (close-port port)
+ time))
+
+;; ----------------------------------------------------------------------------
+;; Datatypes
+
+(define (process? obj) (thread? obj))
+(define (process-links pid) (thread-specific pid))
+(define (process-links-set! pid obj) (thread-specific-set! pid obj))
+
+;; universal pid
+(define-type upid
+ id: 9e096e09-8c66-4058-bddb-e061f2209838
+ tag
+ node)
+
+;; nodes
+(define-type node
+ id: 8992144e-4f3e-4ce4-9d01-077576f98bc5
+ read-only:
+ host
+ port)
+
+;; tags
+(define-type tag
+ id: efa4f5f8-c74c-465b-af93-720d44a08374
+ (uuid init: #f))
+
+;; * Test whether 'obj' is a pid.
+(define (pid? obj)
+ (or (process? obj) (upid? obj)))
+
+;; ----------------------------------------------------------------------------
+;; Logging functions
+
+(define termite-log-fun (make-parameter #f))
+
+(define (termite-log type message)
+ (and (termite-log-fun)
+ ((termite-log-fun) type message)))
+
+(define (warning . terms)
+ (termite-log 'warning terms))
+
+(define (info . terms)
+ (termite-log 'info terms))
+
+(define (debug . terms)
+ (termite-log 'debug terms))
+
+
+
+;; ----------------------------------------------------------------------------
+;; process manipulation primitives
+
+;; * Get the pid of the current process.
+(define self current-thread)
+
+;; Base exception handler for Termite processes.
+(define (base-exception-handler e)
+ (continuation-capture
+ (lambda (k)
+ (let ((log-crash
+ (lambda (e)
+ (termite-log
+ 'error
+ (call-with-output-string
+ ""
+ (lambda (port)
+ (display "#|\n" port)
+ (display-exception-in-context
+ e
+ k
+ port)
+ ;; todo: provide a safe
+ ;; wrapper in Gambit
+ ;; runtime?
+ (##cmd-b k port 0 #f)
+ (display "|#\n" port)))))))
+ (cond
+ ;; Propagated Termite exception?
+ ((termite-exception? e)
+ (if (not (eq? (termite-exception-reason e) 'normal))
+ (log-crash (termite-exception-object e)))
+ (for-each
+ (lambda (pid) (! pid e))
+ (process-links (self)))
+ (halt!))
+ ;; Gambit exception in the current process
+ (else
+ (log-crash e)
+ (for-each
+ (lambda (pid)
+ (! pid (make-termite-exception (self) 'failure e)))
+ (process-links (self)))
+ (halt!)))))))
+
+
+;; * Start a new process executing the code in 'thunk'.
+(define (spawn thunk #!key (links '()))
+ (let ((t (make-thread
+ (lambda ()
+ (with-exception-handler
+ base-exception-handler
+ thunk)
+ (shutdown!)))))
+ (thread-specific-set! t links)
+ (thread-start! t)
+ t))
+
+
+(define (spawn-linked-to to thunk)
+ (spawn thunk links: (list to)))
+
+
+;; * Start a new process with a bidirectional link to the current
+;; process.
+(define (spawn-link thunk)
+ (let ((pid (spawn thunk links: (list (self)))))
+ (outbound-link pid)
+ pid))
+
+
+;; * Start a new process on remote node 'node', executing the code
+;; in 'thunk'.
+(define (remote-spawn node thunk #!key (links '()))
+ (if (equal? node (current-node))
+ (spawn thunk links: links)
+ (!? (remote-service 'spawner node)
+ (list 'spawn thunk links))))
+
+
+;; * Start a new process on remote node 'node', with a bidirectional
+;; link to the current process.
+(define (remote-spawn-link node thunk)
+ (let ((pid (remote-spawn node thunk links: (list (self)))))
+ (outbound-link pid)
+ pid))
+
+
+;; * Cleanly stop the execution of the current process. Linked
+;; processes will receive a "normal" exit message.
+(define (shutdown!)
+ (for-each
+ (lambda (pid)
+ (! pid (make-termite-exception (self) 'normal #f)))
+ (process-links (self)))
+ (halt!))
+
+;; this is *not* nice: it wont propagate the exit message to the other
+;; processes
+(define (halt!)
+ (thread-terminate! (current-thread)))
+
+
+;; * Forcefully terminate a local process. Warning: it only works on
+;; local processes! This should be used with caution.
+(define (terminate! victim)
+ (thread-terminate! victim)
+ (for-each
+ (lambda (link)
+ (! link (make-termite-exception victim 'terminated #f)))
+ (process-links victim)))
+
+
+;; TODO 'wait-for' and 'alive?' should be grouped in a more general
+;; procedure able to determine the status of a process (alive, dead,
+;; waiting, etc.) and the procedure should work on remote processes
+
+;; * Wait for the end of a process 'pid'. Does not return anything.
+;; Warning: will not work on remote processes.
+(define (%wait-for pid)
+ (with-exception-catcher
+ (lambda (e)
+ (void))
+ (lambda ()
+ (thread-join! pid)
+ (void))))
+
+
+;; Check whether the process 'pid' is still alive. Warning: will not
+;; work on remote processes.
+(define (%alive? pid)
+ (with-exception-catcher
+ (lambda (e)
+ (join-timeout-exception? e))
+ (lambda ()
+ (thread-join! pid 0)
+ #f)))
+
+
+;; ----------------------------------------------------------------------------
+;; Sending messages
+
+;; * Send a message 'msg' to 'pid'. This means that the message will
+;; be enqueued in the mailbox of the destination process.
+;;
+;; Delivery of the message is unreliable in theory, but in practice
+;; local messages will always be delivered, and remote messages will
+;; not be delivered only if the connection is currently broken to the
+;; remote node, or if the remote node is down.
+;;
+;; Note that you will not get an error or an exception if the message
+;; doesn't get there: you need to handle errors yourself.
+(define (! to msg)
+ (cond
+ ((process? to)
+ (thread-send to msg))
+ ((upid? to)
+ (thread-send dispatcher (list 'relay to msg)))
+ (else
+ (error "invalid-message-destination" to))))
+
+
+;; ----------------------------------------------------------------------------
+;; Receiving messages
+
+;; incorrect, because it doesn't handle exception messages
+;; (define ? thread-receive)
+
+;; * Retrieve the first message from the mailbox of the current
+;; process. If no message is available, the process will block until
+;; a message is received. If 'timeout' is specified, the process will
+;; only block for that amount of time, and then raise an exception.
+;; It is possible to also pass the 'default' argument to return a
+;; value instead of raising an exception.
+(define (? . opt) ;; TODO: inefficient, fix
+ (match opt
+ (()
+ (recv
+ (msg msg)))
+
+ ((timeout)
+ (recv
+ (msg msg)
+ (after timeout (thread-receive 0))))
+
+ ((timeout default)
+ (recv
+ (msg msg)
+ (after timeout default)))))
+
+
+;; benchmark to see if faster...
+;; (define (? #!optional (timeout +inf.0) (default (lambda (thread-receive 0))))
+;; (with-exception-catcher
+;; (lambda (exception)
+;; (if (mailbox-receive-timeout-exception? exception)
+;; (default)
+;; (raise exception)))
+;; (lambda ()
+;; (thread-receive timeout))))
+
+
+;; * Retrieve the first message from the mailbox of the current
+;; process that satisfised the predicate 'pred?'. If no message
+;; qualifies, the process will block until a message satisfying the
+;; predicate is received. If 'timeout' is specified, the process will
+;; only block for that amount of time, and then raise an exception.
+;; It is possible to also pass the 'default' argument to return a
+;; value instead of raising an exception.
+;; TODO: inefficient, fix
+(define (?? pred? . opt)
+ (match opt
+ (()
+ (recv
+ (msg (where (pred? msg)) msg)))
+
+ ((timeout)
+ (recv
+ (msg (where (pred? msg)) msg)
+ (after timeout (thread-receive 0))))
+
+ ((timeout default)
+ (recv
+ (msg (where (pred? msg)) msg)
+ (after timeout default)))))
+
+
+;; ----------------------------------------------------------------------------
+;; Higher-order concurrency primitives
+
+;; * Send a "synchronous" message to a process. The message will be
+;; annotated with a tag and the pid of the current process, therefore
+;; sending a message of the form '(from tag msg)'. The server
+;; receiving the message must specifically handle that format of
+;; message, and reply with a message of the form '(tag reply)'.
+;;
+;; Like for the |?| and |??| message retrieving operators, it is
+;; possible to specify a 'timeout' to limit the amount of time to wait
+;; for a reply, and a 'default' value to return if no reply has been
+;; received.
+;; RPC
+(define (!? pid msg . opt)
+ (let ((tag (make-tag)))
+ (! pid (list (self) tag msg))
+
+ (match opt
+ (()
+ (recv
+ ((,tag reply) reply)))
+
+ ((timeout)
+ (recv
+ ((,tag reply) reply)
+ (after timeout (raise 'timeout))))
+
+ ((timeout default)
+ (recv
+ ((,tag reply) reply)
+ (after timeout default))))))
+
+
+;; * Evaluate a 'thunk' on a remote node and return the result of that
+;; evaluation. Just like for |!?|, |?| and |??|, it is possible to
+;; specify a 'timeout' and a 'default' argument.
+(define (on node thunk)
+ (let ((tag (make-tag))
+ (from (self)))
+ (remote-spawn node
+ (lambda ()
+ (! from (list tag (thunk)))))
+ (recv
+ ((,tag reply) reply))))
+
+
+;; ----------------------------------------------------------------------------
+;; Links and exception handling
+
+;; * Link the current process /to/ another process 'pid': any
+;; exception not being caught by the current process will be
+;; propagated to the remote process.
+(define (outbound-link pid)
+ (let* ((links (process-links (self))))
+ (if (not (memq pid links))
+ (process-links-set! (self) (cons pid links)))))
+
+
+;; ----------------------------------------------------------------------------
+;; Termite I/O
+
+;; Wraps 'pid's representing Gambit output ports.
+(define-type termite-output-port
+ id: b0c30401-474c-4e83-94b4-d516e00fe363
+ unprintable:
+ pid)
+
+;; Wraps 'pid's representing Gambit input ports.
+(define-type termite-input-port
+ id: ebb22fcb-ca61-4765-9896-49e6716471c3
+ unprintable:
+ pid)
+
+;; Start a process representing a Gambit output port.
+(define (spawn-output-port port #!optional (serialize? #f))
+ (output-port-readtable-set!
+ port
+ (readtable-sharing-allowed?-set
+ (output-port-readtable port)
+ serialize?))
+
+ (make-termite-output-port
+ (spawn
+ (lambda ()
+ (let loop ()
+ (recv
+ (proc
+ (where (procedure? proc))
+ (proc port))
+ (x (warning "unknown message sent to output port: " x)))
+ (loop))))))
+
+;; Start a process representing a Gambit input port.
+(define (spawn-input-port port #!optional (serialize? #f))
+ (input-port-readtable-set!
+ port
+ (readtable-sharing-allowed?-set
+ (input-port-readtable port)
+ serialize?))
+
+ (make-termite-input-port
+ (spawn
+ (lambda ()
+ (let loop ()
+ (recv
+ ((from token proc)
+ (where (procedure? proc))
+ (! from (list token (proc port))))
+ (x (warning "unknown message sent to input port: " x)))
+ (loop))))))
+
+;; IO parameterization
+;; (define current-termite-input-port (make-parameter #f))
+;; (define current-termite-output-port (make-parameter #f))
+
+;; insert IO overrides
+;; (include "termiteio.scm")
+
+
+;; ----------------------------------------------------------------------------
+;; Distribution
+
+;; Convert a 'pid'
+(define (pid->upid obj)
+ (mutex-lock! *global-mutex*)
+ (cond
+ ((table-ref *local->foreign* obj #f)
+ => (lambda (x)
+ (mutex-unlock! *global-mutex*)
+ x))
+ (else
+ (let ((upid (make-upid (make-uuid) (current-node))))
+ (table-set! *local->foreign* obj upid)
+ (table-set! *foreign->local* upid obj)
+ (mutex-unlock! *global-mutex*)
+ upid))))
+
+(define (tag->utag obj)
+ (mutex-lock! *global-mutex*)
+ (cond
+ ((tag-uuid obj)
+ (mutex-unlock! *global-mutex*)
+ obj)
+ (else
+ (let ((uuid (make-uuid)))
+ (tag-uuid-set! obj uuid)
+ (table-set! *uuid->tag* uuid obj)
+ (mutex-unlock! *global-mutex*)
+ obj))))
+
+
+(define (serialize-hook obj)
+ (cond
+ ((process? obj)
+ (pid->upid obj))
+
+ ((tag? obj)
+ (tag->utag obj))
+
+ ;; unserializable objects, so instead of crashing we set them to #f
+ ((or (port? obj))
+ #f)
+
+ (else obj)))
+
+(define (upid->pid obj)
+ (cond
+ ((table-ref *foreign->local* obj #f)
+ => (lambda (pid) pid))
+ ((and (symbol? (upid-tag obj))
+ (resolve-service (upid-tag obj)))
+ => (lambda (pid)
+ pid))
+ (else
+ (error "don't know how to upid->pid"))))
+
+(define (utag->tag obj)
+ (let ((uuid (tag-uuid obj)))
+ (cond
+ ((table-ref *uuid->tag* uuid #f)
+ => (lambda (tag) tag))
+ (else obj))))
+
+(define (deserialize-hook obj)
+ (cond
+ ((and (upid? obj)
+ (equal? (upid-node obj)
+ (current-node)))
+ (upid->pid obj))
+ ((tag? obj)
+ (utag->tag obj))
+ (else obj)))
+
+
+(define (serialize obj port)
+ (let* ((serialized-obj
+ (object->u8vector obj serialize-hook))
+ (len
+ (u8vector-length serialized-obj))
+ (serialized-len
+ (u8vector (bitwise-and len #xff)
+ (bitwise-and (arithmetic-shift len -8) #xff)
+ (bitwise-and (arithmetic-shift len -16) #xff)
+ (bitwise-and (arithmetic-shift len -24) #xff))))
+
+ (begin
+ (write-subu8vector serialized-len 0 4 port)
+ (write-subu8vector serialized-obj 0 len port))))
+
+
+(define (deserialize port)
+ (let* ((serialized-len
+ (u8vector 0 0 0 0))
+ (n
+ (read-subu8vector serialized-len 0 4 port)))
+
+ (cond ((= 0 n)
+ #!eof)
+ ((not (= 4 n))
+ (error "deserialization error"))
+ (else
+ (let* ((len
+ (+ (u8vector-ref serialized-len 0)
+ (arithmetic-shift (u8vector-ref serialized-len 1) 8)
+ (arithmetic-shift (u8vector-ref serialized-len 2) 16)
+ (arithmetic-shift (u8vector-ref serialized-len 3) 24)))
+ (serialized-obj
+ (make-u8vector len))
+ (n
+ (read-subu8vector serialized-obj 0 len port)))
+
+ (if (not (eqv? len n))
+ (begin
+ (error "deserialization error"
+ (list len: len n: n)))
+ (let ((obj (u8vector->object serialized-obj deserialize-hook)))
+ (if (vector? obj)
+ (vector->list obj)
+ obj))))))))
+
+(define (start-serializing-output-port port)
+ (spawn-link
+ (lambda ()
+ (let loop ()
+ (recv
+ (('write data)
+ ;; (debug out: data)
+ (serialize data port)
+ (force-output port)) ;; io override
+
+ (msg
+ (warning "serializing-output-port ignored message: " msg)))
+ (loop)))))
+
+
+(define (start-serializing-active-input-port port receiver)
+ (spawn-link
+ (lambda ()
+ (let loop ()
+ (let ((data (deserialize port)))
+ ;; to receive exceptions...
+ (? 0 'ok)
+ ;; (debug in: data)
+ (if (eof-object? data) (shutdown!))
+ (! receiver (list (self) data))
+ (loop))))))
+
+
+;; a tcp server listens on a certain port for new tcp connection
+;; requests, and call ON-CONNECT to deal with those new connections.
+(define (start-tcp-server tcp-port-number on-connect)
+ (let ((tcp-server-port
+ (open-tcp-server (list
+ port-number: tcp-port-number
+ coalesce: #f))))
+ (spawn
+ (lambda ()
+ (let loop ()
+ (on-connect (read tcp-server-port)) ;; io override
+ (loop))))))
+
+
+;; MESSENGERs act as proxies for sockets to other nodes
+
+;; initiate a new bidirectional connection to another node important:
+;; caller is responsible for registering it with the dispatcher
+(define (initiate-messenger node)
+ ;; (print "OUTBOUND connection established\n")
+ (spawn
+ (lambda ()
+ (with-exception-catcher
+ (lambda (e)
+ (! dispatcher (list 'unregister (self)))
+ (shutdown!))
+
+ (lambda ()
+ (let ((socket (open-tcp-client
+ (list server-address: (node-host node)
+ port-number: (node-port node)
+ coalesce: #f))))
+ ;; the real interesting part
+ (let ((in (start-serializing-active-input-port socket (self)))
+ (out (start-serializing-output-port socket)))
+
+ (! out (list 'write (current-node)))
+
+ (messenger-loop node in out))))))))
+
+
+;; start a MESSENGER for an 'inbound' connection (another node
+;; initiated the bidirectional connection, see initiate-messenger)
+(define (start-messenger socket)
+ ;; (print "INBOUND connection established\n")
+ (spawn
+ (lambda ()
+ (with-exception-catcher
+ (lambda (e)
+ (! dispatcher (list 'unregister (self)))
+ (shutdown!))
+
+ (lambda ()
+ (let ((in (start-serializing-active-input-port socket (self)))
+ (out (start-serializing-output-port socket)))
+ (recv
+ ((,in node)
+ ;; registering messenger to local dispatcher
+ (! dispatcher (list 'register (self) node))
+ (messenger-loop node in out)))))))))
+
+
+(define (messenger-loop node in out)
+ (recv
+ ;; incoming message
+ ((,in ('relay id message))
+ (let ((to (upid->pid (make-upid id (current-node)))))
+ (! to message)))
+
+ ;; outgoing message
+ (('relay to message)
+ ;; 'to' is a upid
+ (let* ((id (upid-tag to))
+ ;; (node (upid-node to))
+ ;; (host (node-host node))
+ ;; (port (node-id node))
+ )
+ (! out (list 'write (list 'relay id message)))))
+
+ ;; unknown message
+ (msg
+ (warning "messenger-loop ignored message: " msg)))
+
+ (messenger-loop node in out))
+
+
+;; the DISPATCHER dispatches messages to the right MESSENGER, it keeps
+;; track of known remote nodes
+(define dispatcher
+ (spawn
+ (lambda ()
+ ;; the KNOWN-NODES of the DISPATCHER LOOP is an a-list of NODE => MESSENGER
+ (let loop ((known-nodes '()))
+ (recv
+ (('register messenger node)
+ (loop (cons (cons node messenger) known-nodes)))
+
+ (('unregister messenger)
+ (loop (remove (lambda (m) (equal? (cdr m) messenger)) known-nodes)))
+
+ (('relay upid message)
+ (let ((node (upid-node upid)))
+ (cond
+ ;; the message should be sent locally (ideally should not happen
+ ;; for performance reasons, but if the programmer wants to do
+ ;; that, then OK...)
+ ((equal? node (current-node))
+ (! (upid->pid upid) message)
+ (loop known-nodes))
+
+ ;; the message is destined to a pid on a known node
+ ((assoc node known-nodes)
+ => (lambda (messenger)
+ (! (cdr messenger) (list 'relay upid message))
+ (loop known-nodes)))
+
+ ;; unconnected node, must connect
+ (else
+ (let ((messenger (initiate-messenger node)))
+ (! messenger (list 'relay upid message))
+ (loop (cons (cons node messenger) known-nodes)))))))
+
+ (msg
+ (warning "dispatcher ignored message: " msg) ;; uh...
+ (loop known-nodes)))))))
+
View
31 utils.scm
@@ -0,0 +1,31 @@
+;; utils
+
+;; ----------------------------------------------------------------------------
+;; Some basic utilities
+
+(define (find pred? lst)
+ (let loop ((lst lst))
+ (and (not (null? lst))
+ (if (pred? (car lst))
+ (car lst)
+ (loop (cdr lst))))))
+
+(define (assoc-pred x lst pred?)
+ (find (lambda (entry)
+ (pred? x (car entry)))
+ lst))
+
+(define (filter pred? lst)
+ (cond
+ ((null? lst) '())
+
+ ((pred? (car lst))
+ (cons (car lst)
+ (filter pred? (cdr lst))))
+ (else
+ (filter pred? (cdr lst)))))
+
+(define (remove pred? lst)
+ (filter (lambda (x)
+ (not (pred? x)))
+ lst))

0 comments on commit 05d03af

Please sign in to comment.