Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 51 additions & 26 deletions src/Node/Stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,30 @@ exports.setEncodingImpl = function(s) {
};
};

exports.onDataEitherImpl = function(left){
return function(right){
return function(s) {
return function(f) {
return function() {
s.on('data', function(chunk) {
if (chunk instanceof Buffer) {
f(right(chunk))();
}
else if (typeof chunk === "string") {
f(left(chunk))();
}
else {
throw new Error(
"Node.Stream.onDataEitherImpl: Unrecognised" +
"chunk type; expected String or Buffer, got:" +
chunk);
}
});
};
exports.readChunkImpl = function(Left) {
return function(Right) {
return function(chunk) {
if (chunk instanceof Buffer) {
return Right(chunk);
} else if (typeof chunk === 'string') {
return Left(chunk);
} else {
throw new Error(
"Node.Stream.readChunkImpl: Unrecognised " +
"chunk type; expected String or Buffer, got: " +
chunk);
}
};
};
};

exports.onDataEitherImpl = function(readChunk) {
return function(r) {
return function(f) {
return function() {
r.on('data', function(data) {
f(readChunk(data))();
});
};
};
};
Expand All @@ -40,9 +44,15 @@ exports.onDataEitherImpl = function(left){
exports.onEnd = function(s) {
return function(f) {
return function() {
s.on('end', function() {
f();
});
s.on('end', f);
};
};
};

exports.onReadable = function(s) {
return function(f) {
return function() {
s.on('readable', f);
};
};
};
Expand All @@ -60,9 +70,7 @@ exports.onError = function(s) {
exports.onClose = function(s) {
return function(f) {
return function() {
s.on('close', function() {
f();
});
s.on('close', f);
};
};
};
Expand Down Expand Up @@ -93,6 +101,23 @@ exports.pipe = function(r) {
};
};

exports.readImpl = function(readChunk) {
return function(Nothing) {
return function(Just) {
return function(r) {
return function() {
const v = r.read();
if (v === null) {
return Nothing;
} else {
return Just(readChunk(v));
}
};
};
};
};
};

exports.write = function(w) {
return function(chunk) {
return function(done) {
Expand Down
44 changes: 39 additions & 5 deletions src/Node/Stream.purs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ module Node.Stream
, onDataString
, onDataEither
, setEncoding
, onReadable
, onEnd
, onClose
, onError
, resume
, pause
, isPaused
, pipe
, read
, readString
, readEither
, write
, writeString
, cork
Expand All @@ -29,6 +33,7 @@ module Node.Stream
import Prelude

import Control.Bind ((<=<))
import Data.Maybe (Maybe(..), maybe)
import Data.Either (Either(..))
import Node.Encoding
import Node.Buffer (Buffer())
Expand Down Expand Up @@ -61,6 +66,11 @@ type Writable r = Stream (write :: Write | r)
-- | A duplex (readable _and_ writable stream)
type Duplex = Stream (read :: Read, write :: Write)

foreign import data Chunk :: *
readChunk :: Chunk -> Either String Buffer
readChunk = readChunkImpl Left Right
foreign import readChunkImpl :: (forall l r. l -> Either l r) -> (forall l r. r -> Either l r) -> Chunk -> Either String Buffer

-- | Listen for `data` events, returning data in a Buffer. Note that this will fail
-- | if `setEncoding` has been called on the stream.
onData :: forall w eff. Readable w (err :: EXCEPTION | eff) -> (Buffer -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit
Expand All @@ -70,23 +80,44 @@ onData r cb =
fromEither x =
case x of
Left _ ->
throw "Node.Stream.onData: Stream encoding should not be set"
throw "Stream encoding should not be set"
Right buf ->
pure buf

read :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Eff (err :: EXCEPTION | eff) (Maybe Buffer)
read r = do
v <- readEither r
case v of
Nothing -> pure Nothing
Just (Left _) -> throw "Stream encoding should not be set"
Just (Right b) -> pure (Just b)

readString :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Encoding -> Eff (err :: EXCEPTION | eff) (Maybe String)
readString r enc = do
v <- readEither r
case v of
Nothing -> pure Nothing
Just (Left _) -> throw "Stream encoding should not be set"
Just (Right buf) -> Just <$> (unsafeInterleaveEff $ Buffer.toString enc buf)

readEither :: forall w eff. Readable w eff -> Eff eff (Maybe (Either String Buffer))
readEither = readImpl readChunk Nothing Just
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should export this, so that it lines up with the onData functions?


foreign import readImpl :: forall r eff. (Chunk -> Either String Buffer) -> (forall a. Maybe a) -> (forall a. a -> Maybe a) -> Readable r eff -> Eff eff (Maybe (Either String Buffer))

-- | Listen for `data` events, returning data in a String, which will be
-- | decoded using the given encoding. Note that this will fail if `setEncoding`
-- | has been called on the stream.
onDataString :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Encoding -> (String -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit
onDataString r enc cb = onData r (cb <=< unsafeInterleaveEff <<< Buffer.toString enc)

foreign import onDataEitherImpl :: forall w eff. (forall l r. l -> Either l r) -> (forall l r. r -> Either l r) -> Readable w eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit

-- | Listen for `data` events, returning data in an `Either String Buffer`. This
-- | function is provided for the (hopefully rare) case that `setEncoding` has
-- | been called on the stream.
onDataEither :: forall w eff. Readable w eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit
onDataEither = onDataEitherImpl Left Right
onDataEither :: forall r eff. Readable r (err :: EXCEPTION | eff) -> (Either String Buffer -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit
onDataEither r cb = onDataEitherImpl readChunk r cb

foreign import onDataEitherImpl :: forall r eff. (Chunk -> Either String Buffer) -> Readable r eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit

foreign import setEncodingImpl :: forall w eff. Readable w eff -> String -> Eff eff Unit

Expand All @@ -99,6 +130,9 @@ foreign import setEncodingImpl :: forall w eff. Readable w eff -> String -> Eff
setEncoding :: forall w eff. Readable w eff -> Encoding -> Eff eff Unit
setEncoding r enc = setEncodingImpl r (show enc)

-- | Listen for `readable` events.
foreign import onReadable :: forall w eff. Readable w eff -> Eff eff Unit -> Eff eff Unit

-- | Listen for `end` events.
foreign import onEnd :: forall w eff. Readable w eff -> Eff eff Unit -> Eff eff Unit

Expand Down
40 changes: 39 additions & 1 deletion test/Main.purs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ module Test.Main where

import Prelude

import Data.Maybe (Maybe(..), isNothing, isJust)
import Data.Maybe.Unsafe (fromJust)
import Data.Either (Either(..))
import Node.Buffer as Buffer
import Node.Encoding
import Node.Stream
import Node.Stream.StdIO

import Control.Monad.Eff
import Control.Monad.Eff.Console
Expand Down Expand Up @@ -40,9 +41,46 @@ main = do
log "test pipe"
testPipe

log "test manual reads"
testReads

testString :: String
testString = "üöß💡"

testReads = do
testReadString
testReadBuf

where
testReadString = do
sIn <- passThrough
v <- readString sIn UTF8
assert (isNothing v)

onReadable sIn do
str <- readString sIn UTF8
assert (isJust str)
assertEqual (fromJust str) testString
return unit

writeString sIn UTF8 testString do
return unit

testReadBuf = do
sIn <- passThrough
v <- read sIn
assert (isNothing v)

onReadable sIn do
buf <- read sIn
assert (isJust buf)
assertEqual <$> (Buffer.toString UTF8 (fromJust buf))
<*> pure testString
return unit

writeString sIn UTF8 testString do
return unit

testSetDefaultEncoding = do
w1 <- writableStreamBuffer
check w1
Expand Down