Skip to content

Commit

Permalink
Network: TorStream should inherit IO.Stream
Browse files Browse the repository at this point in the history
This commit adds IO.Stream as a parent class
to TorStream this extremely helps with compatibility
and lowers the number of necessary changes to end
users' code.
  • Loading branch information
aarani committed Apr 11, 2023
1 parent 5f483b3 commit 98b67fb
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 53 deletions.
4 changes: 2 additions & 2 deletions NOnion.Tests/HiddenServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private async Task<int> ReadExact(TorStream stream, byte[] buffer, int off, int
{
if (len - off <= 0) return 0;

var bytesRead = await stream.ReceiveAsync(buffer, off, len - off);
var bytesRead = await stream.ReadAsync(buffer, off, len - off);

if (bytesRead == 0 || bytesRead == -1)
throw new Exception("Not enough data");
Expand Down Expand Up @@ -134,7 +134,7 @@ public async Task EstablishAndCommunicateOverHSConnectionOnionStyle()
Task.Run(async () => {
var stream = await host.AcceptClientAsync();
var bytesToSendWithLength = BitConverter.GetBytes(dataToSendAndReceive.Length).Concat(dataToSendAndReceive).ToArray();
await stream.SendDataAsync(bytesToSendWithLength);
await stream.WriteAsync(bytesToSendWithLength, 0, bytesToSendWithLength.Length);
await stream.EndAsync();
});

Expand Down
2 changes: 0 additions & 2 deletions NOnion/Constants.fs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ module Constants =
// Time limit used for receving data in stream
let internal StreamReceiveTimeout = TimeSpan.FromSeconds 1.

let internal HttpClientBufferSize = 1024

let internal DefaultHttpHost = "127.0.0.1"

// NTor Handshake Constants
Expand Down
8 changes: 4 additions & 4 deletions NOnion/Directory/TorDirectory.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type TorDirectory =
)

let circuit = TorCircuit(guard)
let stream = TorStream(circuit)
use stream = new TorStream(circuit)

(*
* We always use FastCreate authentication because privacy is not important for mono-hop
Expand Down Expand Up @@ -216,7 +216,7 @@ type TorDirectory =
)

let circuit = TorCircuit(guard)
let stream = TorStream(circuit)
use stream = new TorStream(circuit)

(*
* We always use FastCreate authentication because privacy is not important for mono-hop
Expand Down Expand Up @@ -252,7 +252,7 @@ type TorDirectory =
circuit.Create CircuitNodeDetail.FastCreate
|> Async.Ignore

let consensusStream = TorStream circuit
use consensusStream = new TorStream(circuit)
do! consensusStream.ConnectToDirectory() |> Async.Ignore

let consensusHttpClient =
Expand Down Expand Up @@ -340,7 +340,7 @@ type TorDirectory =
(digestsChunk: array<string>)
=
async {
let descriptorsStream = TorStream circuit
use descriptorsStream = new TorStream(circuit)

do!
descriptorsStream.ConnectToDirectory()
Expand Down
26 changes: 9 additions & 17 deletions NOnion/Http/TorHttpClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,8 @@ open NOnion.Utility
type TorHttpClient(stream: TorStream, host: string) =

// Receives all the data stream until it reaches EOF (until stream receive a RELAY_END)
let rec ReceiveAll(memStream: MemoryStream) =
async {
let buffer = Array.zeroCreate Constants.HttpClientBufferSize

// Try to fill the buffer
let! bytesRead =
stream.Receive buffer 0 Constants.HttpClientBufferSize

if bytesRead > 0 then
memStream.Write(buffer, 0, bytesRead)
return! ReceiveAll memStream
}
let ReceiveAll memStream =
stream.CopyToAsync memStream |> Async.AwaitTask

member __.GetAsString (path: string) (forceUncompressed: bool) =
async {
Expand All @@ -42,10 +32,11 @@ type TorHttpClient(stream: TorStream, host: string) =
|> List.map(fun (k, v) -> sprintf "%s: %s\r\n" k v)
|> String.concat String.Empty

do!
let buffer =
sprintf "GET %s HTTP/1.0\r\n%s\r\n" path headers
|> Encoding.UTF8.GetBytes
|> stream.SendData
|> Encoding.ASCII.GetBytes

do! stream.AsyncWrite(buffer, 0, buffer.Length)

use memStream = new MemoryStream()

Expand Down Expand Up @@ -136,10 +127,11 @@ type TorHttpClient(stream: TorStream, host: string) =
|> List.map(fun (k, v) -> sprintf "%s: %s\r\n" k v)
|> String.concat String.Empty

do!
let buffer =
sprintf "POST %s HTTP/1.0\r\n%s\r\n%s" path headers payload
|> Encoding.ASCII.GetBytes
|> stream.SendData

do! stream.AsyncWrite(buffer, 0, buffer.Length)

use memStream = new MemoryStream()

Expand Down
114 changes: 89 additions & 25 deletions NOnion/Network/TorStream.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace NOnion.Network

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

Expand All @@ -22,7 +24,9 @@ type private StreamReceiveMessage =
type private StreamControlMessage =
| End of replyChannel: AsyncReplyChannel<OperationResult<unit>>
| Send of
array<byte> *
data: array<byte> *
offset: int *
length: int *
replyChannel: AsyncReplyChannel<OperationResult<unit>>
| StartServiceConnectionProcess of
port: int *
Expand All @@ -44,6 +48,7 @@ type private StreamControlMessage =
| SendSendMe of replyChannel: AsyncReplyChannel<OperationResult<unit>>

type TorStream(circuit: TorCircuit) =
inherit Stream()

let mutable streamState: StreamState = StreamState.Initialized

Expand Down Expand Up @@ -79,12 +84,15 @@ type TorStream(circuit: TorCircuit) =
| _ -> failwith "Unexpected state when trying to end the stream"
}

let safeSend(data: array<byte>) =
let safeSend (data: array<byte>) (offset: int) (length: int) =
async {
match streamState with
| Connected streamId ->
let dataChunks =
SeqUtils.Chunk Constants.MaximumRelayPayloadLength data
data
|> Seq.skip offset
|> Seq.take length
|> SeqUtils.Chunk Constants.MaximumRelayPayloadLength

let rec sendChunks dataChunks =
async {
Expand All @@ -98,7 +106,7 @@ type TorStream(circuit: TorCircuit) =
circuit.SendRelayCell
streamId
(head
|> Array.ofSeq
|> Seq.toArray
|> RelayData.RelayData)
None

Expand Down Expand Up @@ -224,9 +232,9 @@ type TorStream(circuit: TorCircuit) =
match command with
| End replyChannel ->
do! safeEnd() |> TryExecuteAsyncAndReplyAsResult replyChannel
| Send(data, replyChannel) ->
| Send(data, offset, length, replyChannel) ->
do!
safeSend data
safeSend data offset length
|> TryExecuteAsyncAndReplyAsResult replyChannel
| StartServiceConnectionProcess(port, streamObj, replyChannel) ->
do!
Expand Down Expand Up @@ -348,7 +356,7 @@ type TorStream(circuit: TorCircuit) =
do! refillBufferIfNeeded()

if isEOF then
return -1
return 0
else
let rec tryRead bytesRead bytesRemaining =
async {
Expand Down Expand Up @@ -401,9 +409,31 @@ type TorStream(circuit: TorCircuit) =
let streamReceiveMailBox =
MailboxProcessor.Start StreamReceiveMailBoxProcessor

override _.CanRead = not isEOF
override _.CanWrite = not isEOF

override _.CanSeek = false

override _.Length = failwith "Length is not supported"

override _.SetLength _ =
failwith "SetLength is not supported"

override _.Position
with get () = failwith "No seek, GetPosition is not supported"
and set _position = failwith "No seek, SetPosition is not supported"

override _.Seek(_, _) =
failwith "No seek, Seek is not supported"

override _.Flush() =
()

static member Accept (streamId: uint16) (circuit: TorCircuit) =
async {
let stream = TorStream circuit
// We can't use the "use" keyword since this stream needs
// to outlive this function.
let stream = new TorStream(circuit)
do! stream.RegisterIncomingStream streamId

do! circuit.SendRelayCell streamId (RelayConnected Array.empty) None
Expand All @@ -428,20 +458,6 @@ type TorStream(circuit: TorCircuit) =
member self.EndAsync() =
self.End() |> Async.StartAsTask


member __.SendData(data: array<byte>) =
async {
let! sendResult =
streamControlMailBox.PostAndAsyncReply(fun replyChannel ->
StreamControlMessage.Send(data, replyChannel)
)

return UnwrapResult sendResult
}

member self.SendDataAsync data =
self.SendData data |> Async.StartAsTask

member self.ConnectToService(port: int) =
async {
let! completionTaskRes =
Expand Down Expand Up @@ -500,7 +516,34 @@ type TorStream(circuit: TorCircuit) =
return UnwrapResult registerationResult
}

member self.Receive (buffer: array<byte>) (offset: int) (length: int) =
override _.Read(buffer: array<byte>, offset: int, length: int) =
let receiveResult =
streamReceiveMailBox.PostAndReply(fun replyChannel ->
{
StreamBuffer = buffer
BufferOffset = offset
BufferLength = length
ReplyChannel = replyChannel
}
)

UnwrapResult receiveResult

override _.Write(buffer: array<byte>, offset: int, length: int) =
let sendResult =
streamControlMailBox.PostAndReply(fun replyChannel ->
StreamControlMessage.Send(buffer, offset, length, replyChannel)
)

UnwrapResult sendResult

override _.ReadAsync
(
buffer: array<byte>,
offset: int,
length: int,
_cancelToken: CancellationToken
) =
async {
let! receiveResult =
streamReceiveMailBox.PostAndAsyncReply(fun replyChannel ->
Expand All @@ -514,9 +557,30 @@ type TorStream(circuit: TorCircuit) =

return UnwrapResult receiveResult
}
|> Async.StartAsTask

override _.WriteAsync
(
buffer: array<byte>,
offset: int,
length: int,
_cancelToken: CancellationToken
) =
async {
let! sendResult =
streamControlMailBox.PostAndAsyncReply(fun replyChannel ->
StreamControlMessage.Send(
buffer,
offset,
length,
replyChannel
)
)

member self.ReceiveAsync(buffer: array<byte>, offset: int, length: int) =
self.Receive buffer offset length |> Async.StartAsTask
return UnwrapResult sendResult
}
|> Async.StartAsTask
:> Task

interface ITorStream with
member __.HandleDestroyedCircuit() =
Expand Down
6 changes: 4 additions & 2 deletions NOnion/Services/TorServiceClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type TorServiceClient =
circuit.Extend hsDirectoryNode
|> Async.Ignore

let dirStream = TorStream circuit
use dirStream = new TorStream(circuit)

do!
dirStream.ConnectToDirectory()
Expand Down Expand Up @@ -493,7 +493,9 @@ type TorServiceClient =
Async.Parallel [ introduceJob; rendezvousJoin ]
|> Async.Ignore

let serviceStream = TorStream rendezvousCircuit
// We can't use the "use" keyword since this stream needs
// to outlive this function.
let serviceStream = new TorStream(rendezvousCircuit)
do! serviceStream.ConnectToService port |> Async.Ignore

return
Expand Down
5 changes: 4 additions & 1 deletion NOnion/Services/TorServiceHost.fs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ type TorServiceHost
do! circuit.Extend randomMiddleNode |> Async.Ignore
do! circuit.Extend hsDirectoryNode |> Async.Ignore

let dirStream = TorStream circuit
use dirStream = new TorStream(circuit)
do! dirStream.ConnectToDirectory() |> Async.Ignore

let! _response =
Expand Down Expand Up @@ -811,6 +811,9 @@ type TorServiceHost
}

let! (streamId, senderCircuit) = getConnectionRequest()
// We can't use the "use" keyword since this stream needs
// to outlive this function. Hopefully the caller will dispose
// this after they're done using it.
let! stream = TorStream.Accept streamId senderCircuit
return stream
}
Expand Down

0 comments on commit 98b67fb

Please sign in to comment.