Skip to content

Commit 92cda32

Browse files
committed
Merge pull request #121 from perl6/donaldh-sockets
Implement sync sockets, pipes on JVM using NIO.
2 parents 7121946 + 8b1a699 commit 92cda32

File tree

12 files changed

+573
-213
lines changed

12 files changed

+573
-213
lines changed

src/vm/jvm/QAST/Compiler.nqp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1889,6 +1889,7 @@ QAST::OperationsJAST.map_classlib_core_op('getstdin', $TYPE_OPS, 'getstdin', [],
18891889
QAST::OperationsJAST.map_classlib_core_op('getstdout', $TYPE_OPS, 'getstdout', [], $RT_OBJ, :tc);
18901890
QAST::OperationsJAST.map_classlib_core_op('getstderr', $TYPE_OPS, 'getstderr', [], $RT_OBJ, :tc);
18911891
QAST::OperationsJAST.map_classlib_core_op('setencoding', $TYPE_OPS, 'setencoding', [$RT_OBJ, $RT_STR], $RT_OBJ, :tc);
1892+
QAST::OperationsJAST.map_classlib_core_op('setinputlinesep', $TYPE_OPS, 'setinputlinesep', [$RT_OBJ, $RT_STR], $RT_OBJ, :tc);
18921893
QAST::OperationsJAST.map_classlib_core_op('tellfh', $TYPE_OPS, 'tellfh', [$RT_OBJ], $RT_INT, :tc);
18931894
QAST::OperationsJAST.map_classlib_core_op('readfh', $TYPE_OPS, 'readfh', [$RT_OBJ, $RT_OBJ, $RT_INT], $RT_OBJ, :tc);
18941895
QAST::OperationsJAST.map_classlib_core_op('writefh', $TYPE_OPS, 'writefh', [$RT_OBJ, $RT_OBJ], $RT_OBJ, :tc);
@@ -1919,6 +1920,8 @@ QAST::OperationsJAST.add_core_op('shell', -> $qastcomp, $op {
19191920
?? QAST::Op.new( :op('shell1'), |@operands )
19201921
!! QAST::Op.new( :op('shell3'), |@operands ));
19211922
});
1923+
QAST::OperationsJAST.map_classlib_core_op('spawn', $TYPE_OPS, 'spawn', [$RT_OBJ, $RT_STR, $RT_OBJ], $RT_INT, :tc);
1924+
QAST::OperationsJAST.map_classlib_core_op('openpipe', $TYPE_OPS, 'openpipe', [$RT_STR, $RT_STR, $RT_OBJ, $RT_STR], $RT_OBJ, :tc);
19221925

19231926
QAST::OperationsJAST.map_classlib_core_op('symlink', $TYPE_OPS, 'symlink', [$RT_STR, $RT_STR], $RT_INT, :tc);
19241927

@@ -1930,6 +1933,11 @@ QAST::OperationsJAST.map_classlib_core_op('openasync', $TYPE_OPS, 'openasync', [
19301933
QAST::OperationsJAST.map_classlib_core_op('slurpasync', $TYPE_OPS, 'slurpasync', [$RT_OBJ, $RT_OBJ, $RT_OBJ, $RT_OBJ], $RT_OBJ, :tc);
19311934
QAST::OperationsJAST.map_classlib_core_op('linesasync', $TYPE_OPS, 'linesasync', [$RT_OBJ, $RT_OBJ, $RT_INT, $RT_OBJ, $RT_OBJ, $RT_OBJ], $RT_OBJ, :tc);
19321935

1936+
QAST::OperationsJAST.map_classlib_core_op('socket', $TYPE_OPS, 'socket', [$RT_INT], $RT_OBJ, :tc);
1937+
QAST::OperationsJAST.map_classlib_core_op('connect', $TYPE_OPS, 'connect', [$RT_OBJ, $RT_STR, $RT_INT], $RT_OBJ, :tc);
1938+
QAST::OperationsJAST.map_classlib_core_op('bindsock', $TYPE_OPS, 'bindsock', [$RT_OBJ, $RT_STR, $RT_INT], $RT_OBJ, :tc);
1939+
QAST::OperationsJAST.map_classlib_core_op('accept', $TYPE_OPS, 'accept', [$RT_OBJ], $RT_OBJ, :tc);
1940+
19331941
QAST::OperationsJAST.map_classlib_core_op('debugnoop', $TYPE_OPS, 'debugnoop', [$RT_OBJ], $RT_OBJ, :tc);
19341942

19351943
# terms

src/vm/jvm/runtime/org/perl6/nqp/io/FileHandle.java

Lines changed: 11 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -2,70 +2,55 @@
22

33
import java.io.File;
44
import java.io.IOException;
5-
import java.nio.ByteBuffer;
6-
import java.nio.CharBuffer;
75
import java.nio.channels.FileChannel;
86
import java.nio.charset.Charset;
9-
import java.nio.charset.CharsetDecoder;
10-
import java.nio.charset.CharsetEncoder;
117
import java.nio.file.Path;
128
import java.nio.file.StandardOpenOption;
13-
import java.util.ArrayList;
149

1510
import org.perl6.nqp.runtime.ExceptionHandling;
1611
import org.perl6.nqp.runtime.ThreadContext;
1712

18-
public class FileHandle implements IIOClosable, IIOSeekable, IIOEncodable, IIOSyncReadable, IIOSyncWritable {
19-
private FileChannel chan;
20-
private CharsetEncoder enc;
21-
private CharsetDecoder dec;
22-
private boolean eof = false;
23-
private ByteBuffer readBuffer;
13+
public class FileHandle extends SyncHandle implements IIOSeekable {
14+
15+
FileChannel fc;
2416

2517
public FileHandle(ThreadContext tc, String filename, String mode) {
2618
try {
2719
Path p = new File(filename).toPath();
2820
if (mode.equals("r")) {
29-
chan = FileChannel.open(p, StandardOpenOption.READ);
21+
fc = FileChannel.open(p, StandardOpenOption.READ);
3022
}
3123
else if (mode.equals("w")) {
32-
chan = FileChannel.open(p, StandardOpenOption.WRITE,
24+
fc = FileChannel.open(p, StandardOpenOption.WRITE,
3325
StandardOpenOption.CREATE,
3426
StandardOpenOption.TRUNCATE_EXISTING);
3527
}
3628
else if (mode.equals("wa")) {
37-
chan = FileChannel.open(p, StandardOpenOption.WRITE,
29+
fc = FileChannel.open(p, StandardOpenOption.WRITE,
3830
StandardOpenOption.CREATE,
3931
StandardOpenOption.APPEND);
4032
}
4133
else {
4234
ExceptionHandling.dieInternal(tc, "Unhandled file open mode '" + mode + "'");
4335
}
36+
chan = fc;
4437
setEncoding(tc, Charset.forName("UTF-8"));
4538
} catch (IOException e) {
4639
throw ExceptionHandling.dieInternal(tc, e);
4740
}
4841
}
4942

50-
public void close(ThreadContext tc) {
51-
try {
52-
chan.close();
53-
} catch (IOException e) {
54-
throw ExceptionHandling.dieInternal(tc, e);
55-
}
56-
}
57-
5843
public void seek(ThreadContext tc, long offset, long whence) {
5944
try {
6045
switch ((int)whence) {
6146
case 0:
62-
chan.position(offset);
47+
fc.position(offset);
6348
break;
6449
case 1:
65-
chan.position(chan.position() + offset);
50+
fc.position(fc.position() + offset);
6651
break;
6752
case 2:
68-
chan.position(chan.size());
53+
fc.position(fc.size());
6954
break;
7055
default:
7156
throw ExceptionHandling.dieInternal(tc, "Invalid seek mode");
@@ -77,151 +62,10 @@ public void seek(ThreadContext tc, long offset, long whence) {
7762

7863
public long tell(ThreadContext tc) {
7964
try {
80-
long position = chan.position();
65+
long position = fc.position();
8166
return readBuffer != null ? position - readBuffer.remaining() : position;
8267
} catch (IOException e) {
8368
throw ExceptionHandling.dieInternal(tc, e);
8469
}
8570
}
86-
87-
public void setEncoding(ThreadContext tc, Charset cs) {
88-
enc = cs.newEncoder();
89-
dec = cs.newDecoder();
90-
}
91-
92-
public synchronized String slurp(ThreadContext tc) {
93-
try {
94-
// Read in file.
95-
ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
96-
ByteBuffer curBuffer = ByteBuffer.allocate(32768);
97-
int total = 0;
98-
int read;
99-
if (readBuffer != null) {
100-
total = readBuffer.limit() - readBuffer.position();
101-
buffers.add(ByteBuffer.wrap(readBuffer.array(), readBuffer.position(), total));
102-
readBuffer = null;
103-
}
104-
while ((read = chan.read(curBuffer)) != -1) {
105-
curBuffer.flip();
106-
buffers.add(curBuffer);
107-
curBuffer = ByteBuffer.allocate(32768);
108-
total += read;
109-
}
110-
eof = true;
111-
112-
return decodeBuffers(buffers, total);
113-
} catch (IOException e) {
114-
throw ExceptionHandling.dieInternal(tc, e);
115-
}
116-
}
117-
118-
public synchronized String readline(ThreadContext tc) {
119-
try {
120-
boolean foundLine = false;
121-
ArrayList<ByteBuffer> lineChunks = new ArrayList<ByteBuffer>();
122-
int total = 0;
123-
124-
while (!foundLine) {
125-
/* Ensure we have a buffer available. */
126-
if (readBuffer == null) {
127-
readBuffer = ByteBuffer.allocate(32768);
128-
if (chan.read(readBuffer) == -1) {
129-
/* End of file, so what we have is fine. */
130-
eof = true;
131-
foundLine = true;
132-
readBuffer.flip();
133-
break;
134-
}
135-
readBuffer.flip();
136-
}
137-
138-
/* Look for a line end. */
139-
int start = readBuffer.position();
140-
int end = start;
141-
while (!foundLine && end < readBuffer.limit()) {
142-
if (readBuffer.get(end) == '\n')
143-
foundLine = true;
144-
end++;
145-
}
146-
147-
/* Copy what we found into the results. */
148-
byte[] lineBytes = new byte[end - start];
149-
readBuffer.get(lineBytes);
150-
lineChunks.add(ByteBuffer.wrap(lineBytes));
151-
total += lineBytes.length;
152-
153-
/* If we didn't find a line, will cross chunk boundary. */
154-
if (!foundLine)
155-
readBuffer = null;
156-
}
157-
158-
if (lineChunks.size() == 1)
159-
return dec.decode(lineChunks.get(0)).toString();
160-
else
161-
return decodeBuffers(lineChunks, total);
162-
} catch (IOException e) {
163-
throw ExceptionHandling.dieInternal(tc, e);
164-
}
165-
}
166-
167-
private String decodeBuffers(ArrayList<ByteBuffer> buffers, int total) throws IOException {
168-
// Copy to a single buffer and decode (could be smarter, but need
169-
// to be wary as UTF-8 chars may span a buffer boundary).
170-
ByteBuffer allBytes = ByteBuffer.allocate(total);
171-
for (ByteBuffer bb : buffers) {
172-
int amount = total < bb.limit() ? total : bb.limit();
173-
allBytes.put(bb.array(), 0, amount);
174-
total -= amount;
175-
}
176-
allBytes.rewind();
177-
return dec.decode(allBytes).toString();
178-
}
179-
180-
public boolean eof(ThreadContext tc) {
181-
return eof;
182-
}
183-
184-
public byte[] read(ThreadContext tc, int bytes) {
185-
try {
186-
ByteBuffer buffer = ByteBuffer.allocate(bytes);
187-
chan.read(buffer);
188-
buffer.flip();
189-
byte[] res = new byte[buffer.limit()];
190-
buffer.get(res);
191-
return res;
192-
} catch (IOException e) {
193-
throw ExceptionHandling.dieInternal(tc, e);
194-
}
195-
}
196-
197-
public void write(ThreadContext tc, byte[] array) {
198-
ByteBuffer buffer = ByteBuffer.wrap(array);
199-
write(tc, buffer);
200-
}
201-
202-
protected void write(ThreadContext tc, ByteBuffer buffer) {
203-
try {
204-
int toWrite = buffer.limit();
205-
int written = 0;
206-
while (written < toWrite) {
207-
written += chan.write(buffer);
208-
}
209-
} catch (IOException e) {
210-
throw ExceptionHandling.dieInternal(tc, e);
211-
}
212-
}
213-
214-
public void print(ThreadContext tc, String s) {
215-
try {
216-
ByteBuffer buffer = enc.encode(CharBuffer.wrap(s));
217-
write(tc, buffer);
218-
} catch (IOException e) {
219-
throw ExceptionHandling.dieInternal(tc, e);
220-
}
221-
}
222-
223-
public void say(ThreadContext tc, String s) {
224-
print(tc, s);
225-
print(tc, System.lineSeparator());
226-
}
22771
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.perl6.nqp.io;
2+
3+
import org.perl6.nqp.runtime.ThreadContext;
4+
5+
public interface IIOBindable {
6+
7+
public void bind(ThreadContext tc, String host, int port);
8+
public SocketHandle accept(ThreadContext tc);
9+
10+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.perl6.nqp.io;
2+
3+
import org.perl6.nqp.runtime.ThreadContext;
4+
5+
public interface IIOLineSeparable {
6+
public void setInputLineSeparator(ThreadContext tc, String sep);
7+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package org.perl6.nqp.io;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.io.InputStream;
6+
import java.io.OutputStream;
7+
import java.lang.ProcessBuilder.Redirect;
8+
import java.nio.ByteBuffer;
9+
import java.nio.channels.ByteChannel;
10+
import java.nio.channels.Channels;
11+
import java.nio.channels.ReadableByteChannel;
12+
import java.nio.channels.WritableByteChannel;
13+
import java.nio.charset.Charset;
14+
import java.util.Map;
15+
16+
import org.perl6.nqp.runtime.ExceptionHandling;
17+
import org.perl6.nqp.runtime.ThreadContext;
18+
19+
public class ProcessHandle extends SyncHandle {
20+
21+
Process process;
22+
23+
public ProcessHandle(ThreadContext tc, String cmd, String dir, Map<String, String> env) {
24+
ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
25+
pb.directory(new File(dir));
26+
pb.redirectErrorStream(true);
27+
28+
// Clear the JVM inherited environment and use provided only
29+
Map<String, String> pbEnv = pb.environment();
30+
pbEnv.clear();
31+
pbEnv.putAll(env);
32+
33+
try {
34+
process = pb.start();
35+
chan = new ProcessChannel(process.getOutputStream(), process.getInputStream());
36+
setEncoding(tc, Charset.forName("UTF-8"));
37+
} catch (IOException e) {
38+
throw ExceptionHandling.dieInternal(tc, e);
39+
}
40+
}
41+
42+
static class ProcessChannel implements ByteChannel {
43+
protected WritableByteChannel stdin;
44+
protected ReadableByteChannel stdout;
45+
46+
public ProcessChannel(OutputStream stdin, InputStream stdout) {
47+
this.stdin = Channels.newChannel(stdin);
48+
this.stdout = Channels.newChannel(stdout);
49+
}
50+
51+
public int read(ByteBuffer dst) throws IOException {
52+
return stdout.read(dst);
53+
}
54+
55+
public boolean isOpen() {
56+
return stdin.isOpen();
57+
}
58+
59+
public void close() throws IOException {
60+
stdin.close();
61+
stdout.close();
62+
}
63+
64+
public int write(ByteBuffer src) throws IOException {
65+
return stdin.write(src);
66+
}
67+
}
68+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package org.perl6.nqp.io;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.ServerSocketChannel;
6+
import java.nio.channels.SocketChannel;
7+
8+
import org.perl6.nqp.runtime.ExceptionHandling;
9+
import org.perl6.nqp.runtime.ThreadContext;
10+
11+
public class ServerSocketHandle implements IIOBindable {
12+
13+
ServerSocketChannel listenChan;
14+
15+
public ServerSocketHandle(ThreadContext tc) {
16+
try {
17+
listenChan = ServerSocketChannel.open();
18+
} catch (IOException e) {
19+
ExceptionHandling.dieInternal(tc, e);
20+
}
21+
}
22+
23+
public void bind(ThreadContext tc, String host, int port) {
24+
try {
25+
InetSocketAddress addr = new InetSocketAddress(host, port);
26+
listenChan.bind(addr);
27+
} catch (IOException e) {
28+
throw ExceptionHandling.dieInternal(tc, e);
29+
}
30+
}
31+
32+
public SocketHandle accept(ThreadContext tc) {
33+
try {
34+
SocketChannel chan = listenChan.accept();
35+
return chan == null ? null : new SocketHandle(tc, chan);
36+
} catch (IOException e) {
37+
throw ExceptionHandling.dieInternal(tc, e);
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)