Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose eventloop on Stream #290

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/core/Stream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.libp2p.core

import io.libp2p.protocol.ProtocolMessageHandler
import io.libp2p.protocol.ProtocolMessageHandlerAdapter
import io.netty.channel.EventLoop
import java.util.concurrent.CompletableFuture

/**
Expand All @@ -10,6 +11,11 @@ import java.util.concurrent.CompletableFuture
interface Stream : P2PChannel {
val connection: Connection

/**
* Return the underlying EventLoop
*/
fun eventLoop(): EventLoop

/**
* Returns the [PeerId] of the remote peer [Connection] which this
* [Stream] created on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ import io.libp2p.core.Stream
import io.libp2p.etc.PROTOCOL
import io.libp2p.etc.types.toVoidCompletableFuture
import io.netty.channel.Channel
import io.netty.channel.EventLoop
import java.util.concurrent.CompletableFuture

class StreamOverNetty(
ch: Channel,
override val connection: Connection,
initiator: Boolean
) : Stream, P2PChannelOverNetty(ch, initiator) {

val group: EventLoop
init {
nettyChannel.attr(PROTOCOL).set(CompletableFuture())
group = ch.eventLoop()
}

override fun eventLoop() = group

/**
* Returns the [PeerId] of the remote peer [Connection] which this
* [Stream] created on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.libp2p.tools.p2pd.libp2pj.Muxer;
import io.libp2p.tools.p2pd.libp2pj.Stream;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.*;

import java.nio.ByteBuffer;

Expand All @@ -30,6 +30,11 @@ public NettyStream(Channel channel, boolean initiator) {
this(channel, initiator, null, null);
}

@Override
public EventLoop eventLoop() {
return channel.eventLoop();
}

@Override
public void write(ByteBuffer data) {
channel.write(Unpooled.wrappedBuffer(data));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.libp2p.tools.p2pd.libp2pj;

import io.netty.channel.*;

import java.nio.ByteBuffer;

/**
Expand All @@ -9,6 +11,8 @@ public interface Stream<TEndpoint> {

boolean isInitiator();

EventLoop eventLoop();

void write(ByteBuffer data);

void flush();
Expand Down
6 changes: 6 additions & 0 deletions libp2p/src/testFixtures/kotlin/io/libp2p/tools/Stubs.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.libp2p.core.Stream
import io.libp2p.core.multiformats.Multiaddr
import io.libp2p.etc.util.P2PService
import io.netty.channel.ChannelHandler
import io.netty.channel.EventLoop
import java.util.concurrent.CompletableFuture

class ConnectionStub : Connection {
Expand All @@ -25,6 +26,11 @@ class ConnectionStub : Connection {
class StreamStub : Stream {
private val remotePeerId = PeerId.random()
override val connection = ConnectionStub()

override fun eventLoop(): EventLoop {
TODO("Not yet implemented")
}

override fun remotePeerId() = remotePeerId
override fun getProtocol() = CompletableFuture.completedFuture("nop")
override fun pushHandler(handler: ChannelHandler) = TODO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.libp2p.etc.util.netty.nettyInitializer
import io.libp2p.transport.implementation.P2PChannelOverNetty
import io.netty.channel.Channel
import io.netty.channel.ChannelHandler
import io.netty.channel.EventLoop
import io.netty.channel.embedded.EmbeddedChannel
import java.util.concurrent.CompletableFuture

Expand Down Expand Up @@ -42,6 +43,10 @@ private class TestStream(ch: Channel, initiator: Boolean) : P2PChannelOverNetty(
nettyChannel.attr(PROTOCOL).set(CompletableFuture())
}

override fun eventLoop(): EventLoop {
TODO("Not yet implemented")
}

override fun remotePeerId(): PeerId {
return PeerId(ByteArray(32))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.libp2p.etc.PROTOCOL
import io.libp2p.etc.types.toVoidCompletableFuture
import io.libp2p.transport.implementation.P2PChannelOverNetty
import io.netty.channel.Channel
import io.netty.channel.EventLoop
import java.util.concurrent.CompletableFuture

class Libp2pStreamImpl(
Expand All @@ -28,4 +29,8 @@ class Libp2pStreamImpl(

override fun closeWrite(): CompletableFuture<Unit> =
nettyChannel.disconnect().toVoidCompletableFuture()

override fun eventLoop(): EventLoop {
TODO("Not yet implemented")
}
}