Permalink
Find file
Fetching contributors…
Cannot retrieve contributors at this time
329 lines (302 sloc) 10.3 KB
;Copyright (C) 2009 Oriol Corcoll
;
;Licensed under the Apache License, Version 2.0 (the "License");
;you may not use this file except in compliance with the License.
;You may obtain a copy of the License at
;
;http://www.apache.org/licenses/LICENSE-2.0
;
;Unless required by applicable law or agreed to in writing, software
;distributed under the License is distributed on an "AS IS" BASIS,
;WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;See the License for the specific language governing permissions and
;limitations under the License.
(module beanscheme scheme
(provide (all-defined-out))
;Open beanstalkd connection.
;:p host: string?
;:p port: integer?
;:r: input-port? output-port?
(define (open host port)
(tcp-connect host port))
;Close beanstalkd connection.
;:p out: output-port?
;:p in: input-port?
(define (quit out in)
(fprintf out "quit\r\n")
(close-output-port out)
(close-input-port in))
;List tubes.
;:p out: output-port?
;:p in: input-port?
;:r: list? or #f
(define (list-tubes out in)
(fprintf out "list-tubes\r\n")
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "OK")
(filter (lambda (i) (not(string=? "" i)))
(map (lambda (i) (regexp-replace* #rx"-| " i ""))
(regexp-split #rx"\n|\r\n" (read-string (+ (string->number (last conf)) 2) in))))
#f)))
;List watched tubes
;:p out: output-port?
;:p in: input-port?
;:r: list? or #f
(define (list-tubes-watched out in)
(fprintf out "list-tubes-watched\r\n")
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "OK")
(filter (lambda (i) (not(string=? "" i)))
(map (lambda (i) (regexp-replace* #rx"-| " i ""))
(regexp-split #rx"\n|\r\n" (read-string (+ (string->number (last conf)) 2) in))))
#f)))
;Tube in use.
;:p out: output-port?
;:p in: input-port?
;:r: string? or #f
(define (list-tube-used out in)
(fprintf out "list-tube-used\r\n")
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "USING")
(last conf)
#f)))
;Change tube in use.
;:p tube: string?
;:p out: output-port?
;:p in: input-port?
;:r: string? or #f
(define (use tube out in)
(fprintf out "use ~a\r\n" tube)
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "USING")
(last conf)
#f)))
;Change watching tube.
;:p tube: string?
;:p out: output-port?
;:p in: input-port?
;:r: string? or #f
(define (watch tube out in)
(fprintf out "watch ~a\r\n" tube)
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "WATCHING")
(last conf)
#f)))
;Ignore a tube.
;:p tube: string?
;:p out: output-port?
;:p in: input-port?
;:r: string? or #f
(define (ignore tube out in)
(fprintf out "ignore ~a\r\n" tube)
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "WATCHING")
(last conf)
#f)))
;Put job in queue.
;:p data: string?
;:p out: output-port?
;:p in: input-port?
;:p priority [100]: integer?
;:p delay [0]: integer?
;:p ttr [5]: integer?
;:r: string? or #f
(define (put-string data out in [priority 100] [delay 0] [ttr 5])
(fprintf out "put ~a ~a ~a ~a\r\n" priority delay ttr (string-length data))
(fprintf out "~a\r\n" data)
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "INSERTED")
(last conf)
#f)))
;Put job in queue.
;:p data: bytes?
;:p out: output-port?
;:p in: input-port?
;:p priority [100]: integer?
;:p delay [0]: integer?
;:p ttr [5]: integer?
;:r: string? or #f
(define (put-binary data out in [priority 100] [delay 0] [ttr 5])
(fprintf out "put ~a ~a ~a ~a\r\n" priority delay ttr (bytes-length data))
(fprintf out "~a\r\n" data)
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "INSERTED")
(last conf)
#f)))
;Reserve job.
;:p out: output-port?
;:p in: input-port?
;:p timeout [#f]: integer?
;:r: integer? string? or #f
(define (reserve out in [timeout #f])
(if timeout
(fprintf out "reserve-with-timeout ~a\r\n" timeout)
(fprintf out "reserve\r\n"))
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "RESERVED")
(values (string->number(second conf))
(let ([str (read-string (+ (string->number (third conf)) 2) in)])
(substring str 0 (- (string-length str) 2))))
#f)))
;Release job.
;:p id: integer?
;:p out: output-port?
;:p in: input-port?
;:p priority [100]: integer?
;:p delay [0]: integer?
;:r: #t or #f
(define (release id out in [priority 100] [delay 0])
(fprintf out "release ~a ~a ~a\r\n" (number->string id) priority delay)
(flush-output out)
(if (string=? (read-line in 'return-linefeed) "RELEASED") #t #f))
;Bury job.
;:p id: integer?
;:p out: output-port?
;:p in: input-port?
;:p priority [100]: integer?
;:r: #t or #f
(define (bury id out in [priority 100])
(fprintf out "bury ~a ~a\r\n" (number->string id) priority)
(flush-output out)
(if (string=? (read-line in 'return-linefeed) "BURIED") #t #f))
;Touch job.
;:p id: integer?
;:p out: output-port?
;:p in: input-port?
;:r: #t or #f
(define (touch id out in)
(fprintf out "touch ~a\r\n" (number->string id))
(flush-output out)
(if (string=? (read-line in 'return-linefeed) "TOUCHED") #t #f))
;Delete job.
;:p id: integer?
;:p out: output-port?
;:p in: input-port?
;:r: #t or #f
(define (delete id out in)
(fprintf out "delete ~a\r\n" (number->string id))
(flush-output out)
(if (string=? (read-line in 'return-linefeed) "DELETED") #t #f))
;Kick job.
;:p bound: integer?
;:p out: output-port?
;:p in: input-port?
;:r: string? or #f
(define (kick bound out in)
(fprintf out "kick ~a\r\n" (number->string bound))
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "KICKED")
(last conf)
#f)))
;Stats of job.
;:p id: integer?
;:p out: output-port?
;:p in: input-port?
;:r: string? or #f
(define (stats-job id out in)
(fprintf out "stats-job ~a\r\n" (number->string id))
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "OK")
(let ([str (read-string (+ (string->number (second conf)) 2) in)])
(substring str 0 (- (string-length str) 2)))
#f)))
;Stats of tube.
;:p tube: string?
;:p out: output-port?
;:p in: input-port?
;:r: string? or #f
(define (stats-tube tube out in)
(fprintf out "stats-tube ~a\r\n" tube)
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "OK")
(let ([str (read-string (+ (string->number (second conf)) 2) in)])
(substring str 0 (- (string-length str) 2)))
#f)))
;Stats of queue.
;:p out: output-port?
;:p in: input-port?
;:r: string? or #f
(define (stats out in)
(fprintf out "stats\r\n")
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "OK")
(let ([str (read-string (+ (string->number (second conf)) 2) in)])
(substring str 0 (- (string-length str) 2)))
#f)))
;Pause a tube.
;:p tube: string?
;:p delay: integer?
;:p out: output-port?
;:p in: input-port?
;:r: #t or #f
(define (pause-tube tube delay out in)
(fprintf out "pause-tube ~a ~a\r\n" tube delay)
(flush-output out)
(if (string=? (read-line in 'return-linefeed) "PAUSED") #t #f))
;Peek job.
;:p id: integer?
;:p out: output-port?
;:p in: input-port?
;:r: integer? string? or #f
(define (peek id out in)
(fprintf out "peek ~a\r\n" (number->string id))
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "FOUND")
(values (string->number(second conf))
(let ([str (read-string (+ (string->number (third conf)) 2) in)])
(substring str 0 (- (string-length str) 2))))
#f)))
;Peek ready job.
;:p out: output-port?
;:p in: input-port?
;:r: integer? string? or #f
(define (peek-ready out in)
(fprintf out "peek-ready\r\n")
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "FOUND")
(values (string->number(second conf))
(let ([str (read-string (+ (string->number (third conf)) 2) in)])
(substring str 0 (- (string-length str) 2))))
#f)))
;Peek delayed job.
;:p out: output-port?
;:p in: input-port?
;:r: integer? string? or #f
(define (peek-delayed out in)
(fprintf out "peek-delayed\r\n")
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "FOUND")
(values (string->number(second conf))
(let ([str (read-string (+ (string->number (third conf)) 2) in)])
(substring str 0 (- (string-length str) 2))))
#f)))
;Peek buried job.
;:p out: output-port?
;:p in: input-port?
;:r: integer? string? or #f
(define (peek-buried out in)
(fprintf out "peek-buried\r\n")
(flush-output out)
(let ([conf (regexp-split #rx" " (read-line in 'return-linefeed))])
(if (string=? (first conf) "FOUND")
(values (string->number(second conf))
(let ([str (read-string (+ (string->number (third conf)) 2) in)])
(substring str 0 (- (string-length str) 2))))
#f)))
)