From 21d27d5adf5e7428c28491d0fdb1557ea0bfa1f5 Mon Sep 17 00:00:00 2001 From: Felix Schlitter Date: Fri, 20 May 2016 05:59:29 +1200 Subject: [PATCH] Expose `read` and `on('readable')` --- src/Node/Stream.js | 77 +++++++++++++++++++++++++++++--------------- src/Node/Stream.purs | 44 ++++++++++++++++++++++--- test/Main.purs | 40 ++++++++++++++++++++++- 3 files changed, 129 insertions(+), 32 deletions(-) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index 51dddba..5570dab 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -12,26 +12,30 @@ exports.setEncodingImpl = function(s) { }; }; -exports.onDataEitherImpl = function(left){ - return function(right){ - return function(s) { - return function(f) { - return function() { - s.on('data', function(chunk) { - if (chunk instanceof Buffer) { - f(right(chunk))(); - } - else if (typeof chunk === "string") { - f(left(chunk))(); - } - else { - throw new Error( - "Node.Stream.onDataEitherImpl: Unrecognised" + - "chunk type; expected String or Buffer, got:" + - chunk); - } - }); - }; +exports.readChunkImpl = function(Left) { + return function(Right) { + return function(chunk) { + if (chunk instanceof Buffer) { + return Right(chunk); + } else if (typeof chunk === 'string') { + return Left(chunk); + } else { + throw new Error( + "Node.Stream.readChunkImpl: Unrecognised " + + "chunk type; expected String or Buffer, got: " + + chunk); + } + }; + }; +}; + +exports.onDataEitherImpl = function(readChunk) { + return function(r) { + return function(f) { + return function() { + r.on('data', function(data) { + f(readChunk(data))(); + }); }; }; }; @@ -40,9 +44,15 @@ exports.onDataEitherImpl = function(left){ exports.onEnd = function(s) { return function(f) { return function() { - s.on('end', function() { - f(); - }); + s.on('end', f); + }; + }; +}; + +exports.onReadable = function(s) { + return function(f) { + return function() { + s.on('readable', f); }; }; }; @@ -60,9 +70,7 @@ exports.onError = function(s) { exports.onClose = function(s) { return function(f) { return function() { - s.on('close', function() { - f(); - }); + s.on('close', f); }; }; }; @@ -93,6 +101,23 @@ exports.pipe = function(r) { }; }; +exports.readImpl = function(readChunk) { + return function(Nothing) { + return function(Just) { + return function(r) { + return function() { + const v = r.read(); + if (v === null) { + return Nothing; + } else { + return Just(readChunk(v)); + } + }; + }; + }; + }; +}; + exports.write = function(w) { return function(chunk) { return function(done) { diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 34bae53..c38be34 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -11,6 +11,7 @@ module Node.Stream , onDataString , onDataEither , setEncoding + , onReadable , onEnd , onClose , onError @@ -18,6 +19,9 @@ module Node.Stream , pause , isPaused , pipe + , read + , readString + , readEither , write , writeString , cork @@ -29,6 +33,7 @@ module Node.Stream import Prelude import Control.Bind ((<=<)) +import Data.Maybe (Maybe(..), maybe) import Data.Either (Either(..)) import Node.Encoding import Node.Buffer (Buffer()) @@ -61,6 +66,11 @@ type Writable r = Stream (write :: Write | r) -- | A duplex (readable _and_ writable stream) type Duplex = Stream (read :: Read, write :: Write) +foreign import data Chunk :: * +readChunk :: Chunk -> Either String Buffer +readChunk = readChunkImpl Left Right +foreign import readChunkImpl :: (forall l r. l -> Either l r) -> (forall l r. r -> Either l r) -> Chunk -> Either String Buffer + -- | Listen for `data` events, returning data in a Buffer. Note that this will fail -- | if `setEncoding` has been called on the stream. onData :: forall w eff. Readable w (err :: EXCEPTION | eff) -> (Buffer -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit @@ -70,23 +80,44 @@ onData r cb = fromEither x = case x of Left _ -> - throw "Node.Stream.onData: Stream encoding should not be set" + throw "Stream encoding should not be set" Right buf -> pure buf +read :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Eff (err :: EXCEPTION | eff) (Maybe Buffer) +read r = do + v <- readEither r + case v of + Nothing -> pure Nothing + Just (Left _) -> throw "Stream encoding should not be set" + Just (Right b) -> pure (Just b) + +readString :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Encoding -> Eff (err :: EXCEPTION | eff) (Maybe String) +readString r enc = do + v <- readEither r + case v of + Nothing -> pure Nothing + Just (Left _) -> throw "Stream encoding should not be set" + Just (Right buf) -> Just <$> (unsafeInterleaveEff $ Buffer.toString enc buf) + +readEither :: forall w eff. Readable w eff -> Eff eff (Maybe (Either String Buffer)) +readEither = readImpl readChunk Nothing Just + +foreign import readImpl :: forall r eff. (Chunk -> Either String Buffer) -> (forall a. Maybe a) -> (forall a. a -> Maybe a) -> Readable r eff -> Eff eff (Maybe (Either String Buffer)) + -- | Listen for `data` events, returning data in a String, which will be -- | decoded using the given encoding. Note that this will fail if `setEncoding` -- | has been called on the stream. onDataString :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Encoding -> (String -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit onDataString r enc cb = onData r (cb <=< unsafeInterleaveEff <<< Buffer.toString enc) -foreign import onDataEitherImpl :: forall w eff. (forall l r. l -> Either l r) -> (forall l r. r -> Either l r) -> Readable w eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit - -- | Listen for `data` events, returning data in an `Either String Buffer`. This -- | function is provided for the (hopefully rare) case that `setEncoding` has -- | been called on the stream. -onDataEither :: forall w eff. Readable w eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit -onDataEither = onDataEitherImpl Left Right +onDataEither :: forall r eff. Readable r (err :: EXCEPTION | eff) -> (Either String Buffer -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit +onDataEither r cb = onDataEitherImpl readChunk r cb + +foreign import onDataEitherImpl :: forall r eff. (Chunk -> Either String Buffer) -> Readable r eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit foreign import setEncodingImpl :: forall w eff. Readable w eff -> String -> Eff eff Unit @@ -99,6 +130,9 @@ foreign import setEncodingImpl :: forall w eff. Readable w eff -> String -> Eff setEncoding :: forall w eff. Readable w eff -> Encoding -> Eff eff Unit setEncoding r enc = setEncodingImpl r (show enc) +-- | Listen for `readable` events. +foreign import onReadable :: forall w eff. Readable w eff -> Eff eff Unit -> Eff eff Unit + -- | Listen for `end` events. foreign import onEnd :: forall w eff. Readable w eff -> Eff eff Unit -> Eff eff Unit diff --git a/test/Main.purs b/test/Main.purs index 9b3c95e..a1949ce 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -2,11 +2,12 @@ module Test.Main where import Prelude +import Data.Maybe (Maybe(..), isNothing, isJust) +import Data.Maybe.Unsafe (fromJust) import Data.Either (Either(..)) import Node.Buffer as Buffer import Node.Encoding import Node.Stream -import Node.Stream.StdIO import Control.Monad.Eff import Control.Monad.Eff.Console @@ -40,9 +41,46 @@ main = do log "test pipe" testPipe + log "test manual reads" + testReads + testString :: String testString = "üöß💡" +testReads = do + testReadString + testReadBuf + + where + testReadString = do + sIn <- passThrough + v <- readString sIn UTF8 + assert (isNothing v) + + onReadable sIn do + str <- readString sIn UTF8 + assert (isJust str) + assertEqual (fromJust str) testString + return unit + + writeString sIn UTF8 testString do + return unit + + testReadBuf = do + sIn <- passThrough + v <- read sIn + assert (isNothing v) + + onReadable sIn do + buf <- read sIn + assert (isJust buf) + assertEqual <$> (Buffer.toString UTF8 (fromJust buf)) + <*> pure testString + return unit + + writeString sIn UTF8 testString do + return unit + testSetDefaultEncoding = do w1 <- writableStreamBuffer check w1