diff --git a/dependency-versions.gradle b/dependency-versions.gradle index 3edf26fce..de9aef705 100644 --- a/dependency-versions.gradle +++ b/dependency-versions.gradle @@ -55,6 +55,7 @@ dependencyManagement { dependency('io.opentelemetry:opentelemetry-extension-trace-propagators:1.2.0') dependency('io.opentelemetry:opentelemetry-proto:1.7.1-alpha') dependency('io.opentelemetry:opentelemetry-sdk:1.36.0') + dependency('io.opentelemetry.semconv:opentelemetry-semconv-incubating:1.24.0-alpha') dependency('io.opentelemetry:opentelemetry-sdk-trace:1.36.0') dependency('io.opentelemetry:opentelemetry-sdk-metrics:1.36.0') dependency('io.opentelemetry:opentelemetry-sdk-testing:1.36.0') @@ -90,7 +91,7 @@ dependencyManagement { entry 'bcpkix-jdk15on' entry 'bcprov-jdk15on' } - dependencySet(group: 'org.eclipse.jetty', version: '12.0.7') { + dependencySet(group: 'org.eclipse.jetty', version: '11.0.20') { entry 'jetty-server' entry 'jetty-servlet' entry 'jetty-servlets' diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/TcpDownstream.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/TcpDownstream.kt index 1f3bb3735..447fda141 100644 --- a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/TcpDownstream.kt +++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/TcpDownstream.kt @@ -6,12 +6,11 @@ import io.vertx.core.Vertx import io.vertx.core.buffer.Buffer import io.vertx.core.net.NetServer import io.vertx.core.net.NetSocket -import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import org.apache.tuweni.bytes.Bytes -import org.apache.tuweni.concurrent.coroutines.await import org.slf4j.LoggerFactory import kotlin.coroutines.CoroutineContext @@ -34,7 +33,7 @@ class TcpDownstream( val server = vertx.createNetServer() server.connectHandler { handleSocket(it) - }.listen(port, host).await() + }.listen(port, host).coAwait() tcpServer = server logger.info("Started downstream proxy server on $host:$port") } diff --git a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/TcpUpstream.kt b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/TcpUpstream.kt index 3080f3769..1fc3cc63f 100644 --- a/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/TcpUpstream.kt +++ b/devp2p-proxy/src/main/kotlin/org/apache/tuweni/devp2p/proxy/TcpUpstream.kt @@ -6,6 +6,7 @@ import io.vertx.core.Vertx import io.vertx.core.buffer.Buffer import io.vertx.core.net.NetClient import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async @@ -33,7 +34,7 @@ class TcpUpstream( } override suspend fun handleRequest(message: Bytes): Bytes { - val socket = tcpclient!!.connect(port, host).await() + val socket = tcpclient!!.connect(port, host).coAwait() val result = AsyncResult.incomplete() socket.handler { @@ -44,7 +45,7 @@ class TcpUpstream( }.exceptionHandler { result.completeExceptionally(it) } - socket.write(Buffer.buffer(message.toArrayUnsafe())).await() + socket.write(Buffer.buffer(message.toArrayUnsafe())).coAwait() return result.await() } diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt index 0d2441ba2..5b9501197 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt @@ -9,6 +9,7 @@ import io.vertx.core.buffer.Buffer import io.vertx.core.datagram.DatagramPacket import io.vertx.core.net.SocketAddress import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import io.vertx.kotlin.coroutines.dispatcher import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineExceptionHandler @@ -335,7 +336,7 @@ internal class CoroutineDiscoveryService constructor( } fun start() = launch { - server.handler { receiveDatagram(it) }.listen(bindAddress.port(), bindAddress.host()).await() + server.handler { receiveDatagram(it) }.listen(bindAddress.port(), bindAddress.host()).coAwait() val endpoint = Endpoint( advertiseAddress ?: (server.localAddress()).host(), advertiseUdpPort ?: server.localAddress().port(), @@ -428,7 +429,7 @@ internal class CoroutineDiscoveryService constructor( override suspend fun shutdown() { if (shutdown.compareAndSet(false, true)) { logger.info("{}: shutdown", serviceDescriptor) - server.close().await() + server.close().coAwait() for (pending in awaitingPongs.values) { pending.complete(null) } @@ -916,6 +917,6 @@ internal class CoroutineDiscoveryService constructor( } private suspend fun sendPacket(address: SocketAddress, packet: Packet) { - server.send(Buffer.buffer(packet.encode().toArrayUnsafe()), address.port(), address.host()).await() + server.send(Buffer.buffer(packet.encode().toArrayUnsafe()), address.port(), address.host()).coAwait() } } diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt index ff34ff764..5a71080cb 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/PacketType.kt @@ -6,6 +6,8 @@ import org.apache.tuweni.bytes.Bytes import org.apache.tuweni.bytes.Bytes32 import org.apache.tuweni.crypto.SECP256K1 +private const val MAX_VALUE: Byte = 0x7f + /** * DevP2P discovery packet types * @param typeId the byte representing the type @@ -86,7 +88,6 @@ internal enum class PacketType( }, ; companion object { - private const val MAX_VALUE: Byte = 0x7f private val INDEX = arrayOfNulls(MAX_VALUE.toInt()) init { @@ -100,7 +101,7 @@ internal enum class PacketType( } init { - require(typeId <= PacketType.MAX_VALUE) { "Packet typeId must be in range [0x00, 0x80)" } + require(typeId <= MAX_VALUE) { "Packet typeId must be in range [0x00, 0x80)" } } abstract fun decode( diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/DiscoveryV5Service.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/DiscoveryV5Service.kt index ba48a1988..bbbab0499 100644 --- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/DiscoveryV5Service.kt +++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/v5/DiscoveryV5Service.kt @@ -7,6 +7,7 @@ import io.vertx.core.buffer.Buffer import io.vertx.core.datagram.DatagramPacket import io.vertx.core.net.SocketAddress import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -176,14 +177,14 @@ internal class DefaultDiscoveryV5Service( private lateinit var receiveJob: Job override suspend fun start(): AsyncCompletion { - server.handler(this::receiveDatagram).listen(bindAddress.port, bindAddress.hostString).await() + server.handler(this::receiveDatagram).listen(bindAddress.port, bindAddress.hostString).coAwait() return bootstrap() } override suspend fun terminate() { if (started.compareAndSet(true, false)) { receiveJob.cancel() - server.close().await() + server.close().coAwait() } } @@ -191,23 +192,23 @@ internal class DefaultDiscoveryV5Service( override suspend fun addPeer(enr: EthereumNodeRecord, address: SocketAddress): AsyncCompletion { val session = sessions[address] - if (session == null) { + return if (session == null) { logger.trace("Creating new session for peer {}", enr) val handshakeSession = handshakes.computeIfAbsent(address) { addr -> createHandshake(addr, enr.publicKey(), enr) } - return asyncCompletion { + asyncCompletion { logger.trace("Handshake connection start {}", enr) handshakeSession.connect().await() logger.trace("Handshake connection done {}", enr) } } else { logger.trace("Session found for peer {}", enr) - return AsyncCompletion.completed() + AsyncCompletion.completed() } } private fun send(addr: SocketAddress, message: Bytes) { launch { - server.send(Buffer.buffer(message.toArrayUnsafe()), addr.port(), addr.host()).await() + server.send(Buffer.buffer(message.toArrayUnsafe()), addr.port(), addr.host()).coAwait() } } @@ -227,7 +228,7 @@ internal class DefaultDiscoveryV5Service( var session = sessions.get(packet.sender()) val size = Math.min(Packet.MAX_SIZE, packet.data().length()) val buffer = ByteBuffer.allocate(size) - packet.data().byteBuf.readBytes(buffer) + buffer.put(packet.data().bytes) buffer.flip() val message = Bytes.wrapByteBuffer(buffer) if (message.slice(0, 32) == whoAreYouHeader && session != null) { diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt index 3ed757448..6a14842a8 100644 --- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt +++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt @@ -6,6 +6,7 @@ import io.vertx.core.Vertx import io.vertx.core.buffer.Buffer import io.vertx.core.net.SocketAddress import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.apache.tuweni.bytes.Bytes @@ -61,7 +62,7 @@ internal class DiscoveryServiceTest { val reference = AsyncResult.incomplete() val client = vertx.createDatagramSocket().handler { res -> reference.complete(res.data()) - }.listen(0, "localhost").await() + }.listen(0, "localhost").coAwait() val clientEndpoint = Endpoint("192.168.1.1", 5678, 7654) val ping = PingPacket.create( clientKeyPair, @@ -70,10 +71,10 @@ internal class DiscoveryServiceTest { Endpoint(address), null, ) - client.send(Buffer.buffer(ping.encode().toArrayUnsafe()), address.port(), address.host()).await() + client.send(Buffer.buffer(ping.encode().toArrayUnsafe()), address.port(), address.host()).coAwait() val datagram = reference.await() val buffer = ByteBuffer.allocate(datagram.length()) - datagram.byteBuf.readBytes(buffer) + buffer.put(datagram.bytes) val pong = Packet.decodeFrom(buffer) as PongPacket assertEquals(discoveryService.nodeId, pong.nodeId) assertEquals(ping.hash, pong.pingHash) @@ -92,7 +93,7 @@ internal class DiscoveryServiceTest { reference.set(AsyncResult.incomplete()) val bootstrapClient = vertx.createDatagramSocket().handler { res -> reference.get().complete(res.data()) - }.listen(0, "127.0.0.1").await() + }.listen(0, "127.0.0.1").coAwait() val serviceKeyPair = SECP256K1.KeyPair.random() val peerRepository = EphemeralPeerRepository() @@ -113,7 +114,7 @@ internal class DiscoveryServiceTest { val datagram = reference.get().await() val buffer = ByteBuffer.allocate(datagram.length()) - datagram.byteBuf.readBytes(buffer) + buffer.put(datagram.bytes) val ping = Packet.decodeFrom(buffer) as PingPacket assertEquals(discoveryService.nodeId, ping.nodeId) assertEquals( @@ -132,7 +133,7 @@ internal class DiscoveryServiceTest { ) reference.set(AsyncResult.incomplete()) val address = SocketAddress.inetSocketAddress(discoveryService.localPort, "127.0.0.1") - bootstrapClient.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).await() + bootstrapClient.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).coAwait() val findNodesDatagram = reference.get().await() @@ -164,7 +165,7 @@ internal class DiscoveryServiceTest { val reference = AsyncResult.incomplete() val bootstrapClient = vertx.createDatagramSocket().handler { res -> reference.complete(res.data()) - }.listen(0, "localhost").await() + }.listen(0, "localhost").coAwait() val serviceKeyPair = SECP256K1.KeyPair.random() val peerRepository = EphemeralPeerRepository() @@ -184,7 +185,7 @@ internal class DiscoveryServiceTest { ) val datagram = reference.await() val buffer = ByteBuffer.allocate(datagram.length()) - datagram.byteBuf.readBytes(buffer) + buffer.put(datagram.bytes) val ping = Packet.decodeFrom(buffer) as PingPacket assertEquals(discoveryService.nodeId, ping.nodeId) assertEquals( @@ -202,7 +203,7 @@ internal class DiscoveryServiceTest { null, ) val address = SocketAddress.inetSocketAddress(discoveryService.localPort, "127.0.0.1") - bootstrapClient.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).await() + bootstrapClient.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).coAwait() delay(1000) val bootstrapPeer = @@ -225,7 +226,7 @@ internal class DiscoveryServiceTest { val reference = AsyncResult.incomplete() val bootstrapClient = vertx.createDatagramSocket().handler { res -> reference.complete(res.data()) - }.listen(0, "localhost").await() + }.listen(0, "localhost").coAwait() val discoveryService = DiscoveryService.open( vertx, @@ -244,7 +245,7 @@ internal class DiscoveryServiceTest { val datagram = reference.await() val buffer = ByteBuffer.allocate(datagram.length()) - datagram.byteBuf.readBytes(buffer) + buffer.put(datagram.bytes) val ping = Packet.decodeFrom(buffer) as PingPacket assertEquals(discoveryService.nodeId, ping.nodeId) assertEquals( @@ -264,7 +265,7 @@ internal class DiscoveryServiceTest { reference.set(AsyncResult.incomplete()) val bootstrapClient = vertx.createDatagramSocket().handler { res -> reference.get().complete(res.data()) - }.listen(0, "localhost").await() + }.listen(0, "localhost").coAwait() val discoveryService = DiscoveryService.open( vertx, @@ -280,7 +281,7 @@ internal class DiscoveryServiceTest { val datagram1 = reference.get().await() reference.set(AsyncResult.incomplete()) val buffer1 = ByteBuffer.allocate(datagram1.length()) - datagram1.byteBuf.readBytes(buffer1) + buffer1.put(datagram1.bytes) val ping1 = Packet.decodeFrom(buffer1) as PingPacket assertEquals(discoveryService.nodeId, ping1.nodeId) assertEquals( @@ -290,7 +291,7 @@ internal class DiscoveryServiceTest { val datagram2 = reference.get().await() reference.set(AsyncResult.incomplete()) val buffer2 = ByteBuffer.allocate(datagram2.length()) - datagram2.byteBuf.readBytes(buffer2) + buffer2.put(datagram2.bytes) val ping2 = Packet.decodeFrom(buffer2) as PingPacket assertEquals(discoveryService.nodeId, ping2.nodeId) assertEquals( @@ -300,7 +301,7 @@ internal class DiscoveryServiceTest { val datagram3 = reference.get().await() reference.set(AsyncResult.incomplete()) val buffer3 = ByteBuffer.allocate(datagram3.length()) - datagram3.byteBuf.readBytes(buffer3) + buffer3.put(datagram3.bytes) val ping3 = Packet.decodeFrom(buffer3) as PingPacket assertEquals(discoveryService.nodeId, ping3.nodeId) assertEquals( @@ -328,18 +329,18 @@ internal class DiscoveryServiceTest { reference.set(AsyncResult.incomplete()) val client = vertx.createDatagramSocket().handler { res -> reference.get().complete(res.data()) - }.listen(0, "localhost").await() + }.listen(0, "localhost").coAwait() val findNodes = FindNodePacket.create( clientKeyPair, System.currentTimeMillis(), SECP256K1.KeyPair.random().publicKey(), ) - client.send(Buffer.buffer(findNodes.encode().toArrayUnsafe()), address.port(), address.host()).await() + client.send(Buffer.buffer(findNodes.encode().toArrayUnsafe()), address.port(), address.host()).coAwait() val datagram = reference.get().await() val buffer = ByteBuffer.allocate(datagram.length()) - datagram.byteBuf.readBytes(buffer) + buffer.put(datagram.bytes) val ping = Packet.decodeFrom(buffer) as PingPacket assertEquals(discoveryService.nodeId, ping.nodeId) @@ -355,11 +356,11 @@ internal class DiscoveryServiceTest { ) reference.set(AsyncResult.incomplete()) - client.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).await() + client.send(Buffer.buffer(pong.encode().toArrayUnsafe()), address.port(), address.host()).coAwait() val datagram2 = reference.get().await() val buffer2 = ByteBuffer.allocate(datagram2.length()) - datagram2.byteBuf.readBytes(buffer2) + buffer2.put(datagram2.bytes) val neighbors = Packet.decodeFrom(buffer2) as NeighborsPacket assertEquals(discoveryService.nodeId, neighbors.nodeId) diff --git a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultDiscoveryV5ServiceTest.kt b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultDiscoveryV5ServiceTest.kt index 28e17e8ed..76bdf0874 100644 --- a/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultDiscoveryV5ServiceTest.kt +++ b/devp2p/src/test/kotlin/org/apache/tuweni/devp2p/v5/DefaultDiscoveryV5ServiceTest.kt @@ -5,6 +5,7 @@ package org.apache.tuweni.devp2p.v5 import io.vertx.core.Vertx import io.vertx.core.buffer.Buffer import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.runBlocking import org.apache.tuweni.bytes.Bytes import org.apache.tuweni.concurrent.AsyncResult @@ -52,7 +53,7 @@ class DefaultDiscoveryV5ServiceTest { val reference = AsyncResult.incomplete() val client = vertx.createDatagramSocket().handler { res -> reference.complete(res.data()) - }.listen(19001, "localhost").await() + }.listen(19001, "localhost").coAwait() val discoveryV5Service: DiscoveryV5Service = DiscoveryService.open( vertx, @@ -64,7 +65,7 @@ class DefaultDiscoveryV5ServiceTest { val datagram = reference.await() val buffer = ByteBuffer.allocate(datagram.length()) - datagram.byteBuf.readBytes(buffer) + buffer.put(datagram.bytes) buffer.flip() val receivedBytes = Bytes.wrapByteBuffer(buffer) val content = receivedBytes.slice(45) diff --git a/dns-discovery/src/main/kotlin/org/apache/tuweni/discovery/DNSResolver.kt b/dns-discovery/src/main/kotlin/org/apache/tuweni/discovery/DNSResolver.kt index 30d9659c4..1f699c09b 100644 --- a/dns-discovery/src/main/kotlin/org/apache/tuweni/discovery/DNSResolver.kt +++ b/dns-discovery/src/main/kotlin/org/apache/tuweni/discovery/DNSResolver.kt @@ -22,7 +22,7 @@ import io.vertx.core.Vertx import io.vertx.core.dns.DnsClient import io.vertx.core.dns.DnsClientOptions import io.vertx.core.dns.DnsException -import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.runBlocking import org.apache.tuweni.concurrent.AsyncCompletion import org.apache.tuweni.concurrent.coroutines.asyncCompletion @@ -145,7 +145,7 @@ class DNSResolver @JvmOverloads constructor( */ suspend fun resolveRecordRaw(domainName: String): String? { try { - val records = dnsClient.resolveTXT(domainName).await() + val records = dnsClient.resolveTXT(domainName).coAwait() if (records.isNotEmpty()) { return records[0] } else { diff --git a/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/ProxyEthereumClientTest.kt b/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/ProxyEthereumClientTest.kt index 44b08d598..f975dfe22 100644 --- a/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/ProxyEthereumClientTest.kt +++ b/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/ProxyEthereumClientTest.kt @@ -5,6 +5,7 @@ package org.apache.tuweni.ethclient import io.vertx.core.Vertx import io.vertx.core.net.NetServerOptions import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.runBlocking import org.apache.tuweni.concurrent.AsyncResult import org.apache.tuweni.concurrent.coroutines.await @@ -29,7 +30,7 @@ class ProxyEthereumClientTest { socket.write("Hello World!") } } - server.listen().await() + server.listen().coAwait() val identity = SECP256K1.KeyPair.random() val identity2 = SECP256K1.KeyPair.random() @@ -90,7 +91,7 @@ class ProxyEthereumClientTest { val receivedMessage = AsyncResult.incomplete() val netClient = vertx.createNetClient() - val socket = netClient.connect(15000, "127.0.0.1").await() + val socket = netClient.connect(15000, "127.0.0.1").coAwait() socket.handler { receivedMessage.complete(it.toString()) } diff --git a/eth-repository/src/test/kotlin/org/apache/tuweni/eth/repository/BlockchainIndexTest.kt b/eth-repository/src/test/kotlin/org/apache/tuweni/eth/repository/BlockchainIndexTest.kt index a3af91236..d7c4ec70b 100644 --- a/eth-repository/src/test/kotlin/org/apache/tuweni/eth/repository/BlockchainIndexTest.kt +++ b/eth-repository/src/test/kotlin/org/apache/tuweni/eth/repository/BlockchainIndexTest.kt @@ -10,7 +10,7 @@ import org.apache.lucene.search.BooleanQuery import org.apache.lucene.search.IndexSearcher import org.apache.lucene.search.ScoreDoc import org.apache.lucene.search.TermQuery -import org.apache.lucene.search.TopScoreDocCollector +import org.apache.lucene.search.TopScoreDocCollectorManager import org.apache.lucene.store.Directory import org.apache.lucene.util.BytesRef import org.apache.tuweni.bytes.Bytes @@ -65,12 +65,12 @@ internal class BlockchainIndexTest { val reader = DirectoryReader.open(writer) val searcher = IndexSearcher(reader) - val collector = TopScoreDocCollector.create(10, ScoreDoc(1, 1.0f), 1) + val collector = TopScoreDocCollectorManager(10, ScoreDoc(1, 1.0f), 1) val query = BooleanQuery.Builder() .add(TermQuery(Term("_id", BytesRef(header.hash.toArrayUnsafe()))), BooleanClause.Occur.MUST) .add(TermQuery(Term("_type", "block")), BooleanClause.Occur.MUST) searcher.search(query.build(), collector) - val hits = collector.topDocs().scoreDocs + val hits = collector.newCollector().topDocs().scoreDocs assertEquals(1, hits.size) } @@ -100,12 +100,12 @@ internal class BlockchainIndexTest { val reader = DirectoryReader.open(index) val searcher = IndexSearcher(reader) - val collector = TopScoreDocCollector.create(10, ScoreDoc(1, 1.0f), 1) + val collector = TopScoreDocCollectorManager(10, ScoreDoc(1, 1.0f), 1) val query = BooleanQuery.Builder() .add(TermQuery(Term("_id", BytesRef(header.hash.toArrayUnsafe()))), BooleanClause.Occur.MUST) .add(TermQuery(Term("_type", "block")), BooleanClause.Occur.MUST) searcher.search(query.build(), collector) - val hits = collector.topDocs().scoreDocs + val hits = collector.newCollector().topDocs().scoreDocs assertEquals(1, hits.size) } diff --git a/ethstats/src/integrationTest/kotlin/org/apache/tuweni/ethstats/FakeEthStatsServer.kt b/ethstats/src/integrationTest/kotlin/org/apache/tuweni/ethstats/FakeEthStatsServer.kt index 16d6d1b7b..cae7af56e 100644 --- a/ethstats/src/integrationTest/kotlin/org/apache/tuweni/ethstats/FakeEthStatsServer.kt +++ b/ethstats/src/integrationTest/kotlin/org/apache/tuweni/ethstats/FakeEthStatsServer.kt @@ -4,7 +4,7 @@ package org.apache.tuweni.ethstats import io.vertx.core.Vertx import io.vertx.core.http.ServerWebSocket -import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.runBlocking class FakeEthStatsServer(val vertx: Vertx, val networkInterface: String, var port: Int) { @@ -13,7 +13,7 @@ class FakeEthStatsServer(val vertx: Vertx, val networkInterface: String, var por init { server.webSocketHandler(this::connect) runBlocking { - server.listen(port, networkInterface).await() + server.listen(port, networkInterface).coAwait() port = server.actualPort() } } diff --git a/ethstats/src/main/kotlin/org/apache/tuweni/ethstats/EthStatsReporter.kt b/ethstats/src/main/kotlin/org/apache/tuweni/ethstats/EthStatsReporter.kt index 32f648fca..9aed281df 100644 --- a/ethstats/src/main/kotlin/org/apache/tuweni/ethstats/EthStatsReporter.kt +++ b/ethstats/src/main/kotlin/org/apache/tuweni/ethstats/EthStatsReporter.kt @@ -8,11 +8,12 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ArrayNode import io.vertx.core.MultiMap import io.vertx.core.Vertx -import io.vertx.core.http.HttpClient -import io.vertx.core.http.HttpClientOptions import io.vertx.core.http.WebSocket +import io.vertx.core.http.WebSocketClient +import io.vertx.core.http.WebSocketClientOptions import io.vertx.core.http.WebSocketConnectOptions import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay @@ -89,7 +90,7 @@ class EthStatsReporter( val nodeInfo = NodeInfo(name, node, port, network, protocol, os = os, osVer = osVer) private val started = AtomicBoolean(false) - private var client: HttpClient? = null + private var client: WebSocketClient? = null private val newTxCount = AtomicReference() private val newHead = AtomicReference() private val newNodeStats = AtomicReference() @@ -98,7 +99,7 @@ class EthStatsReporter( suspend fun start() { if (started.compareAndSet(false, true)) { - client = vertx.createHttpClient(HttpClientOptions().setLogActivity(true)) + client = vertx.createWebSocketClient(WebSocketClientOptions().setLogActivity(true)) startInternal() } } @@ -144,7 +145,7 @@ class EthStatsReporter( val result = AsyncResult.incomplete() val options = WebSocketConnectOptions().setHost(uri.host).setPort(uri.port) .setHeaders(MultiMap.caseInsensitiveMultiMap().add("origin", "http://localhost")) - val ws = client!!.webSocket(options).await() + val ws = client!!.connect(options).coAwait() ws.closeHandler { launch { attemptConnect(uri) } } ws.exceptionHandler { e -> logger.debug("Error while communicating with ethnetstats", e) diff --git a/ethstats/src/main/kotlin/org/apache/tuweni/ethstats/EthStatsServer.kt b/ethstats/src/main/kotlin/org/apache/tuweni/ethstats/EthStatsServer.kt index 89017eae6..b543a3fa6 100644 --- a/ethstats/src/main/kotlin/org/apache/tuweni/ethstats/EthStatsServer.kt +++ b/ethstats/src/main/kotlin/org/apache/tuweni/ethstats/EthStatsServer.kt @@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode import io.vertx.core.Vertx import io.vertx.core.http.HttpServer import io.vertx.core.http.ServerWebSocket -import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import org.apache.tuweni.eth.EthJsonModule @@ -43,13 +43,13 @@ class EthStatsServer( if (started.compareAndSet(false, true)) { server = vertx.createHttpServer().webSocketHandler(this::connect).exceptionHandler { logger.error("Exception occurred", it) - }.listen(port, networkInterface).await() + }.listen(port, networkInterface).coAwait() } } suspend fun stop() { if (started.compareAndSet(true, false)) { - server?.close()?.await() + server?.close()?.coAwait() } } diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt index b05f59c63..79fae72c1 100644 --- a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt +++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt @@ -8,9 +8,11 @@ import io.vertx.core.datagram.DatagramSocket import io.vertx.core.http.HttpClient import io.vertx.core.http.HttpMethod import io.vertx.core.http.HttpServer +import io.vertx.core.http.WebSocketClient import io.vertx.core.net.NetClient import io.vertx.core.net.NetServer import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import org.apache.tuweni.bytes.Bytes @@ -51,6 +53,7 @@ class HobbitsTransport( private var exceptionHandler: ((Throwable) -> Unit)? = { } private var httpClient: HttpClient? = null + private var wsClient: WebSocketClient? = null private var tcpClient: NetClient? = null private var udpClient: DatagramSocket? = null @@ -153,9 +156,9 @@ class HobbitsTransport( val completion = AsyncCompletion.incomplete() when (transport) { Transport.HTTP -> { - val req = httpClient!!.request(HttpMethod.POST, port, host, requestURI).await() + val req = httpClient!!.request(HttpMethod.POST, port, host, requestURI).coAwait() .exceptionHandler(exceptionHandler) - val response = req.send(Buffer.buffer(message.toBytes().toArrayUnsafe())).await() + val response = req.send(Buffer.buffer(message.toBytes().toArrayUnsafe())).coAwait() if (response.statusCode() == 200) { completion.complete() } else { @@ -183,14 +186,14 @@ class HobbitsTransport( } Transport.WS -> { try { - val websocket = httpClient!!.webSocket( + val websocket = wsClient!!.connect( port, host, requestURI, - ).await() + ).coAwait() websocket.exceptionHandler(exceptionHandler) - websocket.writeBinaryMessage(Buffer.buffer(message.toBytes().toArrayUnsafe())).await() - websocket.end().await() + websocket.writeBinaryMessage(Buffer.buffer(message.toBytes().toArrayUnsafe())).coAwait() + websocket.end().coAwait() completion.complete() } catch (e: Exception) { completion.completeExceptionally(e) @@ -242,6 +245,7 @@ class HobbitsTransport( suspend fun start() { if (started.compareAndSet(false, true)) { httpClient = vertx.createHttpClient() + wsClient = vertx.createWebSocketClient() tcpClient = vertx.createNetClient() udpClient = vertx.createDatagramSocket().exceptionHandler(exceptionHandler) @@ -377,6 +381,7 @@ class HobbitsTransport( fun stop() { if (started.compareAndSet(true, false)) { httpClient!!.close() + wsClient!!.close() tcpClient!!.close() udpClient!!.close() for (server in httpServers.values) { diff --git a/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt index 9156ad26c..895d4965f 100644 --- a/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt +++ b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt @@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import io.vertx.core.Vertx import io.vertx.core.VertxOptions import io.vertx.core.buffer.Buffer -import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -182,7 +182,7 @@ class Downloader(val vertx: Vertx, val config: DownloaderConfig, override val co "block-${blockNumber.toString().padStart(16, '0')}.json", ) coroutineScope { - vertx.fileSystem().writeFile(filePath.toString(), Buffer.buffer(block)).await() + vertx.fileSystem().writeFile(filePath.toString(), Buffer.buffer(block)).coAwait() } } diff --git a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/JSONRPCClient.kt b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/JSONRPCClient.kt index 0863e072e..b2e4ce484 100644 --- a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/JSONRPCClient.kt +++ b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/JSONRPCClient.kt @@ -48,7 +48,7 @@ class JSONRPCClient( val requestCounter = AtomicLong(1) val client = WebClient.create( vertx, - WebClientOptions().setUserAgent(userAgent).setTryUseCompression(true) + WebClientOptions().setUserAgent(userAgent).setDecompressionSupported(true) .setTracingPolicy(TracingPolicy.ALWAYS) as WebClientOptions, ) val authorizationHeader = "Basic " + Base64.getEncoder() diff --git a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/JSONRPCServer.kt b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/JSONRPCServer.kt index c97cf738f..1596850f0 100644 --- a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/JSONRPCServer.kt +++ b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/JSONRPCServer.kt @@ -185,17 +185,20 @@ private class JSONRPCUser(val principal: JsonObject) : User { return JsonObject() } + @Deprecated("interface deprecated") override fun isAuthorized(authority: Authorization?, resultHandler: Handler>?): User { resultHandler?.handle(Future.succeededFuture(true)) return this } + @Deprecated("interface deprecated") override fun clearCache(): User { return this } override fun principal(): JsonObject = principal + @Deprecated("interface deprecated") override fun setAuthProvider(authProvider: AuthProvider?) { } diff --git a/metrics/build.gradle b/metrics/build.gradle index 8353031d2..a3c0e2bda 100644 --- a/metrics/build.gradle +++ b/metrics/build.gradle @@ -22,6 +22,7 @@ dependencies { implementation 'io.opentelemetry:opentelemetry-exporter-otlp' implementation 'io.opentelemetry:opentelemetry-exporter-otlp-metrics' implementation 'io.opentelemetry:opentelemetry-proto' + implementation 'io.opentelemetry.semconv:opentelemetry-semconv-incubating' implementation 'io.prometheus:simpleclient' implementation 'io.prometheus:simpleclient_httpserver' diff --git a/metrics/src/main/kotlin/org/apache/tuweni/metrics/MetricsService.kt b/metrics/src/main/kotlin/org/apache/tuweni/metrics/MetricsService.kt index a8dc6b089..05f31844b 100644 --- a/metrics/src/main/kotlin/org/apache/tuweni/metrics/MetricsService.kt +++ b/metrics/src/main/kotlin/org/apache/tuweni/metrics/MetricsService.kt @@ -12,7 +12,7 @@ import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.sdk.trace.SdkTracerProvider import io.opentelemetry.sdk.trace.export.BatchSpanProcessor -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import io.opentelemetry.semconv.incubating.ServiceIncubatingAttributes import org.slf4j.LoggerFactory import java.util.concurrent.TimeUnit @@ -46,7 +46,7 @@ class MetricsService( val resource = Resource.getDefault() .merge( Resource.create( - Attributes.builder().put(ResourceAttributes.SERVICE_NAME, jobName).build(), + Attributes.builder().put(ServiceIncubatingAttributes.SERVICE_NAME, jobName).build(), ), ) val sdkMeterProviderBuilder = SdkMeterProvider.builder().setResource(resource) diff --git a/scuttlebutt-discovery/src/main/kotlin/org/apache/tuweni/scuttlebutt/discovery/ScuttlebuttLocalDiscoveryService.kt b/scuttlebutt-discovery/src/main/kotlin/org/apache/tuweni/scuttlebutt/discovery/ScuttlebuttLocalDiscoveryService.kt index 17fef893b..3bd671b88 100644 --- a/scuttlebutt-discovery/src/main/kotlin/org/apache/tuweni/scuttlebutt/discovery/ScuttlebuttLocalDiscoveryService.kt +++ b/scuttlebutt-discovery/src/main/kotlin/org/apache/tuweni/scuttlebutt/discovery/ScuttlebuttLocalDiscoveryService.kt @@ -7,7 +7,7 @@ import io.vertx.core.AsyncResult import io.vertx.core.Vertx import io.vertx.core.datagram.DatagramPacket import io.vertx.core.datagram.DatagramSocket -import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import org.slf4j.LoggerFactory import java.util.concurrent.atomic.AtomicBoolean import java.util.function.Consumer @@ -70,7 +70,7 @@ class ScuttlebuttLocalDiscoveryService( }.listen( listenPort, listenNetworkInterface, - ).await() + ).coAwait() timerId = vertx.setPeriodic(60000) { broadcast() } } } @@ -115,7 +115,7 @@ class ScuttlebuttLocalDiscoveryService( suspend fun stop() { if (started.compareAndSet(true, false)) { vertx.cancelTimer(timerId) - udpSocket?.close()?.await() + udpSocket?.close()?.coAwait() } } diff --git a/scuttlebutt-handshake/src/main/kotlin/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.kt b/scuttlebutt-handshake/src/main/kotlin/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.kt index d91112107..991ab6b7c 100644 --- a/scuttlebutt-handshake/src/main/kotlin/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.kt +++ b/scuttlebutt-handshake/src/main/kotlin/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxClient.kt @@ -8,6 +8,7 @@ import io.vertx.core.net.NetClient import io.vertx.core.net.NetClientOptions import io.vertx.core.net.NetSocket import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import org.apache.tuweni.bytes.Bytes import org.apache.tuweni.bytes.Bytes32 import org.apache.tuweni.concurrent.AsyncCompletion @@ -190,7 +191,7 @@ class SecureScuttlebuttVertxClient( handlerFactory: (sender: (Bytes) -> Unit, terminationFunction: () -> Unit) -> ClientHandler, ): ClientHandler { client = vertx.createNetClient(NetClientOptions().setTcpKeepAlive(true)) - val socket = client!!.connect(port, host).await() + val socket = client!!.connect(port, host).coAwait() val h = NetSocketClientHandler( socket, remotePublicKey, diff --git a/scuttlebutt-handshake/src/main/kotlin/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.kt b/scuttlebutt-handshake/src/main/kotlin/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.kt index c06a406b3..390e18b18 100644 --- a/scuttlebutt-handshake/src/main/kotlin/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.kt +++ b/scuttlebutt-handshake/src/main/kotlin/org/apache/tuweni/scuttlebutt/handshake/vertx/SecureScuttlebuttVertxServer.kt @@ -7,7 +7,7 @@ import io.vertx.core.buffer.Buffer import io.vertx.core.net.NetServer import io.vertx.core.net.NetServerOptions import io.vertx.core.net.NetSocket -import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import org.apache.tuweni.bytes.Bytes import org.apache.tuweni.bytes.Bytes32 import org.apache.tuweni.crypto.sodium.Signature @@ -168,7 +168,7 @@ class SecureScuttlebuttVertxServer( server!!.connectHandler { netSocket: NetSocket -> NetSocketHandler().handle(netSocket) } - server!!.listen().await() + server!!.listen().coAwait() } /** @@ -177,6 +177,6 @@ class SecureScuttlebuttVertxServer( * @return a handle to the completion of the operation */ suspend fun stop() { - server!!.close().await() + server!!.close().coAwait() } } diff --git a/stratum-server/src/main/kotlin/org/apache/tuweni/stratum/server/StratumServer.kt b/stratum-server/src/main/kotlin/org/apache/tuweni/stratum/server/StratumServer.kt index 343f7a106..88e213f2d 100644 --- a/stratum-server/src/main/kotlin/org/apache/tuweni/stratum/server/StratumServer.kt +++ b/stratum-server/src/main/kotlin/org/apache/tuweni/stratum/server/StratumServer.kt @@ -9,7 +9,7 @@ import io.vertx.core.net.NetServer import io.vertx.core.net.NetServerOptions import io.vertx.core.net.NetSocket import io.vertx.core.net.SelfSignedCertificate -import io.vertx.kotlin.coroutines.await +import io.vertx.kotlin.coroutines.coAwait import io.vertx.kotlin.coroutines.dispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch @@ -102,7 +102,7 @@ class StratumServer( val server = vertx.createNetServer(options) server.exceptionHandler { e -> logger.error(e.message, e) } server.connectHandler(this::handleConnection) - server.listen().await() + server.listen().coAwait() tcpServer = server } } @@ -136,7 +136,7 @@ class StratumServer( suspend fun stop() { if (started.compareAndSet(true, false)) { - tcpServer?.close()?.await() + tcpServer?.close()?.coAwait() } } diff --git a/toml/src/main/java/org/apache/tuweni/toml/TomlVersion.java b/toml/src/main/java/org/apache/tuweni/toml/TomlVersion.java index 5e42072d0..aa500ce51 100644 --- a/toml/src/main/java/org/apache/tuweni/toml/TomlVersion.java +++ b/toml/src/main/java/org/apache/tuweni/toml/TomlVersion.java @@ -12,16 +12,16 @@ public enum TomlVersion { *

This specification can be found at https://github.com/toml-lang/toml/blob/master/versions/en/toml-v0.4.0.md. */ - V0_4_0(null), + V0_4_0(null, 0), /** * The 0.5.0 version of TOML. * *

This specification can be found at https://github.com/toml-lang/toml/blob/master/versions/en/toml-v0.5.0.md. */ - V0_5_0(null), + V0_5_0(null, 1), /** The latest stable specification of TOML. */ - LATEST(V0_5_0), + LATEST(V0_5_0, V0_5_0.index), /** * The head (development) specification of TOML. * @@ -31,15 +31,18 @@ public enum TomlVersion { *

Note: As the specification is under active development, this implementation may not match * the latest changes. */ - HEAD(null); + HEAD(null, 99); final TomlVersion canonical; - TomlVersion(@Nullable TomlVersion canonical) { + final int index; + + TomlVersion(@Nullable TomlVersion canonical, int index) { this.canonical = canonical != null ? canonical : this; + this.index = index; } boolean after(TomlVersion other) { - return this.ordinal() > other.ordinal(); + return this.index > other.index; } }