Skip to content

Commit d08607e

Browse files
committed
WIP
1 parent 75304dd commit d08607e

File tree

4 files changed

+36
-28
lines changed

4 files changed

+36
-28
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ lazy val core: Project = (project in file("core"))
2020
.settings(
2121
name := "core",
2222
libraryDependencies ++= Seq(
23+
"org.jctools" % "jctools-core" % "4.0.1",
2324
scalaTest
2425
)
2526
)

core/src/main/scala/ox/channels2/Channel.scala

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,45 @@
11
package ox.channels2
22

3-
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentSkipListSet, CountDownLatch}
3+
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentSkipListSet, CountDownLatch, Semaphore, SynchronousQueue}
44
import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicReferenceArray}
5+
import java.util.concurrent.locks.LockSupport
56
import scala.annotation.tailrec
67

78
class Channel[T]:
89
private val senders = new AtomicLong(0L)
910
private val receivers = new AtomicLong(0L)
1011
private val buffer: AtomicReferenceArray[State] = AtomicReferenceArray[State](20_000_000) // TODO
11-
12+
1213
//
1314

1415
private object Interrupted
1516
private class Continuation[E]():
16-
private val q = new ArrayBlockingQueue[E | Interrupted.type](1)
17+
private val creatingThread = Thread.currentThread()
18+
private val data = new AtomicReference[E | Interrupted.type | Null]()
1719

18-
def tryResume(e: E): Boolean = q.offer(e)
20+
def tryResume(e: E): Boolean =
21+
val result = data.compareAndSet(null, e)
22+
LockSupport.unpark(creatingThread)
23+
result
1924

2025
def await(onInterrupt: () => Unit): E =
21-
try
22-
q.take().asInstanceOf[E] // can't be anything else
23-
catch
24-
case e: InterruptedException =>
25-
if q.offer(Interrupted)
26-
then
27-
try onInterrupt()
28-
catch case ee: Throwable => e.addSuppressed(ee)
29-
throw e
30-
else
31-
Thread.currentThread().interrupt() // propagating the interruption to the next blocking call
32-
q.poll().asInstanceOf[E] // another thread has just inserted E
26+
var x = 0
27+
while data.get() == null do
28+
if x <= 0 then
29+
Thread.`yield`()
30+
x += 1
31+
else LockSupport.park()
32+
33+
if Thread.interrupted() then
34+
data.compareAndSet(null, Interrupted) // TODO if
35+
val e = new InterruptedException()
36+
37+
try onInterrupt()
38+
catch case ee: Throwable => e.addSuppressed(ee)
39+
40+
throw e
41+
42+
data.get().asInstanceOf[E] // another thread has just inserted E
3343

3444
//
3545

@@ -134,4 +144,4 @@ class Channel[T]:
134144
fork(s.add(c.receive()))
135145
}
136146
}
137-
println(s.size)
147+
println(s.size)

core/src/main/scala/ox/channels2/perf/impls.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,8 @@ package ox.channels2.perf
33
import ox.*
44
import ox.channels2.*
55

6+
import java.util.concurrent.SynchronousQueue
67
import java.util.concurrent.atomic.{AtomicLong, AtomicReferenceArray}
7-
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedDeque, Executors}
8-
import scala.annotation.tailrec
9-
import scala.concurrent.Await
10-
import scala.concurrent.duration.*
118

129
def usingOx(): Unit =
1310
val max = 10_000_000
@@ -42,7 +39,7 @@ def passingValues(): Unit =
4239
}
4340

4441
val max = 10_000_000
45-
val st = new AtomicReferenceArray[ArrayBlockingQueue[String]](max)
42+
val st = new AtomicReferenceArray[SynchronousQueue[String]](max)
4643
val s = new AtomicLong(0)
4744
val r = new AtomicLong(0)
4845
timed("passingValues") {
@@ -51,23 +48,23 @@ def passingValues(): Unit =
5148
for (i <- 0 until max) {
5249
s.incrementAndGet()
5350
r.get()
54-
val q = new ArrayBlockingQueue[String](1)
51+
val q = new SynchronousQueue[String]()
5552
val qq = if st.compareAndSet(i, null, q) then
5653
q.take()
5754
else
58-
st.get(i).offer("sender")
55+
st.get(i).put("sender")
5956
}
6057
}
6158

6259
fork {
6360
for (i <- 0 until max) {
6461
r.incrementAndGet()
6562
s.get()
66-
val q = new ArrayBlockingQueue[String](1)
63+
val q = new SynchronousQueue[String]()
6764
val qq = if st.compareAndSet(i, null, q) then
6865
q.take()
6966
else
70-
st.get(i).offer("receiver")
67+
st.get(i).put("receiver")
7168
}
7269
}.join()
7370
}

core/src/main/scala/ox/channels2/perf/tests.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package ox.channels2.perf
33
@main def run(): Unit =
44
for (i <- 1 to 3) {
55
println(s"Run $i")
6-
// usingOx()
7-
passingValues()
6+
usingOx()
7+
// passingValues()
88
// usingThreads()
99
}

0 commit comments

Comments
 (0)