Skip to content

Commit 4c19ca6

Browse files
authored
Merge pull request #3604 from rahulrangers/selector-datagram
Added Cats-Effect selector-based Datagramsocket implementation for JVM
2 parents 6a5d774 + 65d24a5 commit 4c19ca6

File tree

3 files changed

+245
-1
lines changed

3 files changed

+245
-1
lines changed

io/jvm/src/main/scala/fs2/io/net/NetworkPlatform.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N
9494
case None => orElse
9595
}
9696

97+
private def selectingDatagram[A](
98+
ifSelecting: SelectingIpDatagramSocketsProvider[F] => Resource[F, A],
99+
orElse: => Resource[F, A]
100+
): Resource[F, A] =
101+
Resource.eval(tryGetSelector).flatMap {
102+
case Some(selector) => ifSelecting(new SelectingIpDatagramSocketsProvider(selector))
103+
case None => orElse
104+
}
105+
97106
override def connect(
98107
address: GenSocketAddress,
99108
options: List[SocketOption]
@@ -118,7 +127,15 @@ private[net] trait NetworkCompanionPlatform extends NetworkLowPriority { self: N
118127
address: GenSocketAddress,
119128
options: List[SocketOption]
120129
): Resource[F, DatagramSocket[F]] =
121-
fallback.bindDatagramSocket(address, options)
130+
matchAddress(
131+
address,
132+
sa =>
133+
selectingDatagram(
134+
_.bindDatagramSocket(sa, options),
135+
fallback.bindDatagramSocket(sa, options)
136+
),
137+
ua => fallback.bindDatagramSocket(ua, options)
138+
)
122139

123140
// Implementations of deprecated operations
124141

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package io
24+
package net
25+
26+
import cats.effect.{LiftIO, Selector}
27+
import cats.effect.kernel.Async
28+
import cats.effect.std.Mutex
29+
import cats.syntax.all._
30+
import com.comcast.ip4s.{IpAddress, SocketAddress, GenSocketAddress, NetworkInterface}
31+
32+
import java.net.InetSocketAddress
33+
import java.nio.ByteBuffer
34+
import java.nio.channels.{DatagramChannel, SelectionKey}
35+
import java.net.{NetworkInterface => JNetworkInterface}
36+
import com.comcast.ip4s.MulticastJoin
37+
import CollectionCompat.*
38+
39+
private final class SelectingDatagramSocket[F[_]: LiftIO] private (
40+
selector: Selector,
41+
ch: DatagramChannel,
42+
readMutex: Mutex[F],
43+
writeMutex: Mutex[F],
44+
override val address: SocketAddress[IpAddress]
45+
)(implicit F: Async[F])
46+
extends DatagramSocket[F] {
47+
48+
private[this] val bufferSize = 1 << 16
49+
50+
def localAddress: F[SocketAddress[IpAddress]] =
51+
F.pure(address)
52+
53+
def read: F[Datagram] =
54+
readMutex.lock.surround {
55+
val buf = ByteBuffer.allocate(bufferSize)
56+
57+
def go: F[Datagram] =
58+
F.delay(ch.receive(buf)).flatMap {
59+
case null =>
60+
selector.select(ch, SelectionKey.OP_READ).to *> go
61+
case src =>
62+
F.delay {
63+
buf.flip()
64+
val bytes = new Array[Byte](buf.remaining())
65+
buf.get(bytes)
66+
buf.clear()
67+
Datagram(
68+
SocketAddress.fromInetSocketAddress(src.asInstanceOf[InetSocketAddress]),
69+
Chunk.array(bytes)
70+
)
71+
}
72+
}
73+
go
74+
}
75+
76+
def readGen: F[GenDatagram] =
77+
read.map(_.toGenDatagram)
78+
79+
def reads: Stream[F, Datagram] =
80+
Stream.repeatEval(read)
81+
82+
private def write0(bytes: Chunk[Byte], addr: Option[InetSocketAddress]): F[Unit] =
83+
writeMutex.lock.surround {
84+
val buf = bytes.toByteBuffer
85+
86+
def go: F[Unit] =
87+
F.delay {
88+
addr match {
89+
case Some(a) => ch.send(buf, a)
90+
case None => ch.write(buf)
91+
}
92+
}.flatMap { _ =>
93+
if (buf.hasRemaining) selector.select(ch, SelectionKey.OP_WRITE).to[F] *> go
94+
else F.unit
95+
}
96+
97+
go
98+
}
99+
100+
def write(bytes: Chunk[Byte], address: GenSocketAddress): F[Unit] =
101+
write0(bytes, Some(address.asIpUnsafe.toInetSocketAddress))
102+
103+
def write(bytes: Chunk[Byte]): F[Unit] =
104+
write0(bytes, None)
105+
106+
def writes: Pipe[F, Datagram, Nothing] =
107+
_.evalMap(write).drain
108+
109+
def connect(addr: GenSocketAddress): F[Unit] =
110+
F.delay(ch.connect(addr.asIpUnsafe.toInetSocketAddress)).void
111+
112+
def disconnect: F[Unit] =
113+
F.delay(ch.disconnect()).void
114+
115+
def getOption[A](key: java.net.SocketOption[A]): F[Option[A]] =
116+
F.delay(Option(ch.getOption(key)))
117+
118+
def setOption[A](key: java.net.SocketOption[A], value: A): F[Unit] =
119+
F.delay(ch.setOption(key, value)).void
120+
121+
override def join(
122+
join: MulticastJoin[IpAddress],
123+
interface: NetworkInterface
124+
): F[GroupMembership] =
125+
F.delay {
126+
val jinterface = JNetworkInterface.getByName(interface.name)
127+
val membership = join.fold(
128+
j => ch.join(j.group.address.toInetAddress, jinterface),
129+
j => ch.join(j.group.address.toInetAddress, jinterface, j.source.toInetAddress)
130+
)
131+
new GroupMembership {
132+
def drop = F.delay(membership.drop)
133+
def block(source: IpAddress) =
134+
F.delay { membership.block(source.toInetAddress); () }
135+
def unblock(source: IpAddress) =
136+
F.delay { membership.unblock(source.toInetAddress); () }
137+
override def toString = "GroupMembership"
138+
}
139+
}
140+
141+
override def join(
142+
j: MulticastJoin[IpAddress],
143+
interface: JNetworkInterface
144+
): F[GroupMembership] =
145+
join(j, NetworkInterface.fromJava(interface))
146+
147+
override def supportedOptions: F[Set[SocketOption.Key[?]]] =
148+
F.delay {
149+
ch.supportedOptions.asScala.toSet
150+
}
151+
152+
}
153+
154+
private object SelectingDatagramSocket {
155+
def apply[F[_]: LiftIO](
156+
selector: Selector,
157+
ch: DatagramChannel,
158+
local: SocketAddress[IpAddress]
159+
)(implicit F: Async[F]): F[DatagramSocket[F]] =
160+
(Mutex[F], Mutex[F]).flatMapN { (readM, writeM) =>
161+
F.delay {
162+
new SelectingDatagramSocket[F](selector, ch, readM, writeM, local)
163+
}
164+
}
165+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package io
24+
package net
25+
26+
import cats.effect.{Async, LiftIO, Resource, Selector}
27+
import cats.syntax.all._
28+
import com.comcast.ip4s.{Dns, Host, SocketAddress}
29+
30+
import java.net.InetSocketAddress
31+
32+
private final class SelectingIpDatagramSocketsProvider[F[_]](selector: Selector)(implicit
33+
F: Async[F],
34+
F2: LiftIO[F],
35+
F3: Dns[F]
36+
) extends IpDatagramSocketsProvider[F] {
37+
38+
override def bindDatagramSocket(
39+
address: SocketAddress[Host],
40+
options: List[SocketOption]
41+
): Resource[F, DatagramSocket[F]] =
42+
Resource
43+
.make(F.delay(selector.provider.openDatagramChannel()))(ch => F.delay(ch.close()))
44+
.evalMap { ch =>
45+
address.host.resolve[F].flatMap { addr =>
46+
val jAddr = new InetSocketAddress(addr.toInetAddress, address.port.value)
47+
F.delay {
48+
ch.configureBlocking(false)
49+
ch.bind(jAddr)
50+
options.foreach(opt => ch.setOption(opt.key, opt.value))
51+
} *> F
52+
.delay {
53+
val isa = ch.getLocalAddress.asInstanceOf[InetSocketAddress]
54+
val inet = isa.getAddress
55+
new InetSocketAddress(inet, isa.getPort)
56+
}
57+
.flatMap(local =>
58+
SelectingDatagramSocket(selector, ch, SocketAddress.fromInetSocketAddress(local))
59+
)
60+
}
61+
}
62+
}

0 commit comments

Comments
 (0)