Skip to content

Commit aa5329e

Browse files
8231187: SelectorProvider.inheritedChannel() returns TCP socket channel for Unix domain socket
Reviewed-by: alanb, chegar
1 parent f740058 commit aa5329e

File tree

9 files changed

+577
-18
lines changed

9 files changed

+577
-18
lines changed

src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorProvider.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2011, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -27,11 +27,16 @@
2727

2828
import java.io.IOException;
2929
import java.nio.channels.spi.AbstractSelector;
30+
import java.nio.channels.*;
3031

3132
public class KQueueSelectorProvider
3233
extends SelectorProviderImpl
3334
{
3435
public AbstractSelector openSelector() throws IOException {
3536
return new KQueueSelectorImpl(this);
3637
}
38+
39+
public Channel inheritedChannel() throws IOException {
40+
return InheritedChannel.getChannel();
41+
}
3742
}

src/java.base/unix/classes/sun/nio/ch/InheritedChannel.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2003, 2014, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -43,6 +43,12 @@ class InheritedChannel {
4343
private static final int SOCK_STREAM = 1;
4444
private static final int SOCK_DGRAM = 2;
4545

46+
// socket address type
47+
private static final int AF_UNKNOWN = -1;
48+
private static final int AF_INET = 1;
49+
private static final int AF_INET6 = 2;
50+
private static final int AF_UNIX = 3;
51+
4652
// oflag values when opening a file
4753
private static final int O_RDONLY = 0;
4854
private static final int O_WRONLY = 1;
@@ -89,6 +95,20 @@ protected void implCloseSelectableChannel() throws IOException {
8995
}
9096
}
9197

98+
public static class InheritedUnixChannelImpl extends UnixDomainSocketChannelImpl {
99+
100+
InheritedUnixChannelImpl(FileDescriptor fd)
101+
throws IOException
102+
{
103+
super(fd);
104+
}
105+
106+
protected void implCloseSelectableChannel() throws IOException {
107+
super.implCloseChannel();
108+
detachIOStreams();
109+
}
110+
}
111+
92112
public static class InheritedServerSocketChannelImpl extends
93113
ServerSocketChannelImpl {
94114

@@ -160,7 +180,6 @@ private static Channel createChannel() throws IOException {
160180
return null;
161181
}
162182

163-
164183
// Next we create a FileDescriptor for the dup'ed file descriptor
165184
// Have to use reflection and also make assumption on how FD
166185
// is implemented.
@@ -182,6 +201,17 @@ private static Channel createChannel() throws IOException {
182201

183202
Channel c;
184203
if (st == SOCK_STREAM) {
204+
int family = addressFamily(fdVal);
205+
if (family == AF_UNKNOWN)
206+
return null;
207+
if (family == AF_UNIX) {
208+
if (isConnected(fdVal)) {
209+
return new InheritedUnixChannelImpl(fd);
210+
} else {
211+
// listener. unsupported.
212+
return null;
213+
}
214+
}
185215
InetAddress ia = peerAddress0(fdVal);
186216
if (ia == null) {
187217
c = new InheritedServerSocketChannelImpl(provider, fd);
@@ -232,9 +262,13 @@ public static synchronized Channel getChannel() throws IOException {
232262
private static native int open0(String path, int oflag) throws IOException;
233263
private static native void close0(int fd) throws IOException;
234264
private static native int soType0(int fd);
265+
private static native int addressFamily(int fd);
235266
private static native InetAddress peerAddress0(int fd);
236267
private static native int peerPort0(int fd);
237268

269+
// return true if socket is connected to a peer
270+
private static native boolean isConnected(int fd);
271+
238272
static {
239273
IOUtil.load();
240274
initIDs();
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
26+
package sun.nio.ch;
27+
28+
import java.io.FileDescriptor;
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.nio.channels.AsynchronousCloseException;
32+
import java.nio.channels.ByteChannel;
33+
import java.nio.channels.ClosedChannelException;
34+
import java.nio.channels.NotYetConnectedException;
35+
import java.nio.channels.spi.AbstractInterruptibleChannel;
36+
import java.util.Objects;
37+
import java.util.concurrent.locks.ReentrantLock;
38+
39+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
40+
41+
class UnixDomainSocketChannelImpl
42+
extends AbstractInterruptibleChannel
43+
implements ByteChannel
44+
{
45+
// Used to make native read and write calls
46+
private static final NativeDispatcher nd = new SocketDispatcher();
47+
48+
// Our file descriptor object
49+
private final FileDescriptor fd;
50+
// Lock held by current reading or connecting thread
51+
private final ReentrantLock readLock = new ReentrantLock();
52+
53+
// Lock held by current writing or connecting thread
54+
private final ReentrantLock writeLock = new ReentrantLock();
55+
56+
// Lock for managing close state
57+
private final Object stateLock = new Object();
58+
59+
// Channel state
60+
private static final int ST_INUSE = 0;
61+
private static final int ST_CLOSING = 1;
62+
private static final int ST_CLOSED = 2;
63+
private int state;
64+
65+
// IDs of native threads doing reads and writes, for signalling
66+
private long readerThread;
67+
private long writerThread;
68+
69+
UnixDomainSocketChannelImpl(FileDescriptor fd)
70+
throws IOException
71+
{
72+
this.fd = fd;
73+
}
74+
75+
/**
76+
* Checks that the channel is open.
77+
*
78+
* @throws ClosedChannelException if channel is closed (or closing)
79+
*/
80+
private void ensureOpen() throws ClosedChannelException {
81+
if (!isOpen())
82+
throw new ClosedChannelException();
83+
}
84+
85+
/**
86+
* Closes the socket if there are no I/O operations in progress
87+
*/
88+
private boolean tryClose() throws IOException {
89+
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
90+
if (readerThread == 0 && writerThread == 0) {
91+
state = ST_CLOSED;
92+
nd.close(fd);
93+
return true;
94+
} else {
95+
return false;
96+
}
97+
}
98+
99+
/**
100+
* Complete closure of pre-closed socket (release the file descriptor)
101+
*/
102+
private void tryFinishClose() {
103+
try {
104+
tryClose();
105+
} catch (IOException ignore) { }
106+
}
107+
108+
/**
109+
* Marks the beginning of a read operation
110+
*
111+
* @throws ClosedChannelException if the channel is closed
112+
* @throws NotYetConnectedException if the channel is not yet connected
113+
*/
114+
private void beginRead() throws ClosedChannelException {
115+
// set hook for Thread.interrupt
116+
begin();
117+
synchronized (stateLock) {
118+
ensureOpen();
119+
readerThread = NativeThread.current();
120+
}
121+
}
122+
123+
/**
124+
* Marks the end of a read operation that may have blocked.
125+
*
126+
* @throws AsynchronousCloseException if the channel was closed due to this
127+
* thread being interrupted on a blocking read operation.
128+
*/
129+
private void endRead(boolean completed)
130+
throws AsynchronousCloseException
131+
{
132+
synchronized (stateLock) {
133+
readerThread = 0;
134+
if (state == ST_CLOSING) {
135+
tryFinishClose();
136+
}
137+
}
138+
end(completed);
139+
}
140+
141+
@Override
142+
public int read(ByteBuffer buf) throws IOException {
143+
Objects.requireNonNull(buf);
144+
145+
readLock.lock();
146+
try {
147+
int n = 0;
148+
try {
149+
beginRead();
150+
n = IOUtil.read(fd, buf, -1, nd);
151+
while (IOStatus.okayToRetry(n) && isOpen()) {
152+
park(Net.POLLIN, 0L);
153+
n = IOUtil.read(fd, buf, -1, nd);
154+
}
155+
} finally {
156+
endRead(n > 0);
157+
}
158+
return n;
159+
} finally {
160+
readLock.unlock();
161+
}
162+
}
163+
164+
/**
165+
* Marks the beginning of a write operation that might block.
166+
*
167+
* @throws ClosedChannelException if the channel is closed
168+
* @throws NotYetConnectedException if the channel is not yet connected
169+
*/
170+
private void beginWrite() throws ClosedChannelException {
171+
begin();
172+
synchronized (stateLock) {
173+
// set hook for Thread.interrupt
174+
ensureOpen();
175+
writerThread = NativeThread.current();
176+
}
177+
}
178+
179+
/**
180+
* Marks the end of a write operation that may have blocked.
181+
*
182+
* @throws AsynchronousCloseException if the channel was closed due to this
183+
* thread being interrupted on a blocking write operation.
184+
*/
185+
private void endWrite(boolean completed)
186+
throws AsynchronousCloseException
187+
{
188+
synchronized (stateLock) {
189+
writerThread = 0;
190+
if (state == ST_CLOSING) {
191+
tryFinishClose();
192+
}
193+
}
194+
end(completed);
195+
}
196+
197+
void park(int event, long nanos) throws IOException {
198+
long millis;
199+
if (nanos <= 0) {
200+
millis = -1;
201+
} else {
202+
millis = NANOSECONDS.toMillis(nanos);
203+
}
204+
Net.poll(fd, event, millis);
205+
}
206+
207+
@Override
208+
public int write(ByteBuffer buf) throws IOException {
209+
Objects.requireNonNull(buf);
210+
211+
writeLock.lock();
212+
try {
213+
int n = 0;
214+
try {
215+
beginWrite();
216+
n = IOUtil.write(fd, buf, -1, nd);
217+
while (IOStatus.okayToRetry(n) && isOpen()) {
218+
park(Net.POLLOUT, 0L);
219+
n = IOUtil.write(fd, buf, -1, nd);
220+
}
221+
} finally {
222+
endWrite(n > 0);
223+
}
224+
return n;
225+
} finally {
226+
writeLock.unlock();
227+
}
228+
}
229+
230+
/**
231+
* Closes this channel
232+
*
233+
* If there is an I/O operation in progress then the socket is pre-closed
234+
* and the I/O threads signalled, in which case the final close is deferred
235+
* until all I/O operations complete.
236+
*/
237+
@Override
238+
protected void implCloseChannel() throws IOException {
239+
synchronized (stateLock) {
240+
assert state == ST_INUSE;
241+
state = ST_CLOSING;
242+
if (!tryClose()) {
243+
long reader = readerThread;
244+
long writer = writerThread;
245+
if (reader != 0 || writer != 0) {
246+
nd.preClose(fd);
247+
if (reader != 0)
248+
NativeThread.signal(reader);
249+
if (writer != 0)
250+
NativeThread.signal(writer);
251+
}
252+
}
253+
}
254+
}
255+
256+
@Override
257+
public String toString() {
258+
StringBuilder sb = new StringBuilder();
259+
sb.append(this.getClass().getSuperclass().getName());
260+
sb.append('[');
261+
if (!isOpen())
262+
sb.append("closed");
263+
sb.append(']');
264+
return sb.toString();
265+
}
266+
}

0 commit comments

Comments
 (0)