@@ -47,8 +47,8 @@ public static void main(String[] args) throws Exception {
47
47
48
48
static void test () throws Exception {
49
49
ExecutorService threadPool = Executors .newCachedThreadPool ();
50
- try (Reactor r = new Reactor ();
51
- Actor a = new Actor (r .getSocketAddress ())
50
+ try (Responder r = new Responder ();
51
+ Initiator a = new Initiator (r .getSocketAddress ())
52
52
) {
53
53
invoke (threadPool , a , r );
54
54
} finally {
@@ -75,12 +75,25 @@ private static void wait(CompletableFuture<?>... futures) throws CompletionExcep
75
75
future .join ();
76
76
}
77
77
78
- public static class Actor implements AutoCloseable , Runnable {
79
- final SocketAddress socketAddress ;
78
+ private static SocketAddress toConnectAddress (SocketAddress address ) {
79
+ if (address instanceof InetSocketAddress ) {
80
+ var inet = (InetSocketAddress ) address ;
81
+ if (inet .getAddress ().isAnyLocalAddress ()) {
82
+ // if the peer is bound to the wildcard address, use
83
+ // the loopback address to connect.
84
+ var loopback = InetAddress .getLoopbackAddress ();
85
+ return new InetSocketAddress (loopback , inet .getPort ());
86
+ }
87
+ }
88
+ return address ;
89
+ }
90
+
91
+ public static class Initiator implements AutoCloseable , Runnable {
92
+ final SocketAddress connectSocketAddress ;
80
93
final DatagramChannel dc ;
81
94
82
- Actor (SocketAddress socketAddress ) throws IOException {
83
- this .socketAddress = socketAddress ;
95
+ Initiator (SocketAddress peerSocketAddress ) throws IOException {
96
+ this .connectSocketAddress = toConnectAddress ( peerSocketAddress ) ;
84
97
dc = DatagramChannel .open ();
85
98
}
86
99
@@ -89,37 +102,38 @@ public void run() {
89
102
ByteBuffer bb = ByteBuffer .allocateDirect (256 );
90
103
bb .put ("hello" .getBytes ());
91
104
bb .flip ();
92
- dc .connect (socketAddress );
105
+ log .println ("Initiator connecting to " + connectSocketAddress );
106
+ dc .connect (connectSocketAddress );
93
107
94
108
// Send a message
95
- log .println ("Actor attempting to write to Reactor at " + socketAddress .toString ());
109
+ log .println ("Initiator attempting to write to Responder at " + connectSocketAddress .toString ());
96
110
dc .write (bb );
97
111
98
112
// Try to send to some other address
99
113
try {
100
114
int port = dc .socket ().getLocalPort ();
101
115
InetAddress loopback = InetAddress .getLoopbackAddress ();
102
116
InetSocketAddress otherAddress = new InetSocketAddress (loopback , (port == 3333 ? 3332 : 3333 ));
103
- log .println ("Testing if Actor throws AlreadyConnectedException" + otherAddress .toString ());
117
+ log .println ("Testing if Initiator throws AlreadyConnectedException" + otherAddress .toString ());
104
118
dc .send (bb , otherAddress );
105
- throw new RuntimeException ("Actor allowed send to other address while already connected" );
119
+ throw new RuntimeException ("Initiator allowed send to other address while already connected" );
106
120
} catch (AlreadyConnectedException ace ) {
107
121
// Correct behavior
108
122
}
109
123
110
124
// Read a reply
111
125
bb .flip ();
112
- log .println ("Actor waiting to read" );
126
+ log .println ("Initiator waiting to read" );
113
127
dc .read (bb );
114
128
bb .flip ();
115
129
CharBuffer cb = StandardCharsets .US_ASCII .
116
130
newDecoder ().decode (bb );
117
- log .println ("Actor received from Reactor at " + socketAddress + ": " + cb );
131
+ log .println ("Initiator received from Responder at " + connectSocketAddress + ": " + cb );
118
132
} catch (Exception ex ) {
119
- log .println ("Actor threw exception: " + ex );
133
+ log .println ("Initiator threw exception: " + ex );
120
134
throw new RuntimeException (ex );
121
135
} finally {
122
- log .println ("Actor finished" );
136
+ log .println ("Initiator finished" );
123
137
}
124
138
}
125
139
@@ -129,11 +143,12 @@ public void close() throws IOException {
129
143
}
130
144
}
131
145
132
- public static class Reactor implements AutoCloseable , Runnable {
146
+ public static class Responder implements AutoCloseable , Runnable {
133
147
final DatagramChannel dc ;
134
148
135
- Reactor () throws IOException {
136
- dc = DatagramChannel .open ().bind (new InetSocketAddress (InetAddress .getLoopbackAddress (), 0 ));
149
+ Responder () throws IOException {
150
+ var address = new InetSocketAddress (InetAddress .getLoopbackAddress (), 0 );
151
+ dc = DatagramChannel .open ().bind (address );
137
152
}
138
153
139
154
SocketAddress getSocketAddress () throws IOException {
@@ -144,23 +159,23 @@ public void run() {
144
159
try {
145
160
// Listen for a message
146
161
ByteBuffer bb = ByteBuffer .allocateDirect (100 );
147
- log .println ("Reactor waiting to receive" );
162
+ log .println ("Responder waiting to receive" );
148
163
SocketAddress sa = dc .receive (bb );
149
164
bb .flip ();
150
165
CharBuffer cb = StandardCharsets .US_ASCII .
151
166
newDecoder ().decode (bb );
152
- log .println ("Reactor received from Actor at" + sa + ": " + cb );
167
+ log .println ("Responder received from Initiator at" + sa + ": " + cb );
153
168
154
169
// Reply to sender
155
170
dc .connect (sa );
156
171
bb .flip ();
157
- log .println ("Reactor attempting to write: " + dc .getRemoteAddress ().toString ());
172
+ log .println ("Responder attempting to write: " + dc .getRemoteAddress ().toString ());
158
173
dc .write (bb );
159
174
} catch (Exception ex ) {
160
- log .println ("Reactor threw exception: " + ex );
175
+ log .println ("Responder threw exception: " + ex );
161
176
throw new RuntimeException (ex );
162
177
} finally {
163
- log .println ("Reactor finished" );
178
+ log .println ("Responder finished" );
164
179
}
165
180
}
166
181
0 commit comments