Navigation Menu

Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Jan 24, 2020
1 parent 17dcb28 commit 952e6f3
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 81 deletions.
97 changes: 17 additions & 80 deletions src/commonMain/kotlin/com.koushikdutta.scratch/filter.kt
@@ -1,102 +1,39 @@
package com.koushikdutta.scratch

import com.koushikdutta.scratch.async.startSafeCoroutine
import com.koushikdutta.scratch.buffers.Buffers
import com.koushikdutta.scratch.buffers.ByteBufferList
import com.koushikdutta.scratch.buffers.ReadableBuffers
import com.koushikdutta.scratch.buffers.WritableBuffers
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

/**
* Manages filtering and buffering of data through a pipe.
*/
abstract class AsyncFilter {
internal val unfiltered = ByteBufferList()
// need separate buffers for filtered and outgoing as filtering may take place during
// read/write operations
internal val filtered = ByteBufferList()
internal val outgoing = ByteBufferList()

protected abstract fun filter(unfiltered: Buffers, filtered: Buffers)

// invoke a filter on the pending data, even if it is empty.
protected fun invokeFilter() {
filter(unfiltered, filtered)
}
}

/**
* Create an AsyncRead that can be interrupted.
* The interrupted read will return true to indicate more data is present,
* and the WritableBuffers will be unchanged.
*/
class InterruptibleRead(private val input: AsyncRead) {
private var readResume: Continuation<ReadResult>? = null
private val transient = ByteBufferList()
private var reading = false

fun readTransient() {
val exit = synchronized(this) {
if (reading)
return@synchronized true
reading = true
false
}
if (exit)
return
private val pipe = PipeSocket()

init {
startSafeCoroutine {
val result = try {
try {
val ret = input(transient)
ReadResult(true, ret, null)
} finally {
reading = false
val buffer = ByteBufferList()
try {
pipe.write(ByteBufferList())

while (buffer.hasRemaining() || input(buffer)) {
if (buffer.hasRemaining())
pipe.write(buffer)
}
} catch (t: Throwable) {
ReadResult(true, null, t)
}

val resume: Continuation<ReadResult>? = synchronized(this) {
val resume = readResume
readResume = null
resume
catch (throwable: Throwable) {
pipe.close(throwable)
return@startSafeCoroutine
}
resume?.resume(result)
}
}

private data class ReadResult(val succeeded: Boolean, val result: Boolean? = null, val throwable: Throwable? = null)

suspend fun read(buffer: WritableBuffers): Boolean {
check(readResume == null) { "read already in progress" }

if (transient.hasRemaining()) {
transient.read(buffer)
return true
}

val result = suspendCoroutine<ReadResult> read@{
readResume = it
readTransient()
pipe.close()
}

if (!result.succeeded)
return true
if (result.throwable != null)
throw result.throwable
transient.read(buffer)
return result.result!!
}

fun interrupt() {
val resume: Continuation<ReadResult>? = synchronized(this) {
val resume = readResume
readResume = null
resume
}
resume?.resume(ReadResult(false))
}
}
suspend fun read(buffer: WritableBuffers) = pipe.read(buffer)
fun interrupt() = pipe.interruptRead()
fun readTransient() = pipe.interruptWrite()
}
24 changes: 24 additions & 0 deletions src/commonTest/kotlin/com/koushikdutta/scratch/NioTests.kt
Expand Up @@ -31,6 +31,30 @@ class NioTests {
assertTrue(highWater)
}

@Test
fun testNioWriter2() {
var highWater = false
val pipe = NonBlockingWritePipe(0) {
highWater = true
}

val keepGoing = pipe.write(ByteBufferList().putUtf8String("Hello World"))
pipe.write(ByteBufferList().putUtf8String("Hello World"))
pipe.write(ByteBufferList().putUtf8String("Hello World"))
pipe.end()

// start reading after end, to ensure data after read is still available.
var data = ""
async {
data = readAllString({pipe.read(it)})
}

assertEquals(data, "Hello WorldHello WorldHello World")

assertTrue(!keepGoing)
assertTrue(highWater)
}

@Test
fun testNioWriterWritable() {
val yielder = Yielder()
Expand Down
43 changes: 42 additions & 1 deletion src/jvmTest/kotlin/com/koushikdutta/scratch/TLSTests.kt
@@ -1,6 +1,7 @@
package com.koushikdutta.scratch

import com.koushikdutta.scratch.buffers.ByteBufferList
import com.koushikdutta.scratch.buffers.createByteBufferList
import com.koushikdutta.scratch.http.AsyncHttpRequest
import com.koushikdutta.scratch.http.AsyncHttpResponse
import com.koushikdutta.scratch.http.OK
Expand All @@ -18,7 +19,6 @@ import javax.net.ssl.SSLContext
import kotlin.test.assertEquals

class TLSTests {

@Test
fun testConscryptTlsServer() {
val conscrypt = Conscrypt.newProvider()
Expand Down Expand Up @@ -53,6 +53,47 @@ class TLSTests {
assertEquals(data, "hello worldhello world")
}

@Test
fun testAlpn() {
val conscrypt = Conscrypt.newProvider()
val keypairCert = createSelfSignedCertificate("TestServer")

val serverContext = SSLContext.getInstance("TLS", conscrypt)
serverContext.init(keypairCert.first, keypairCert.second)


val server = createAsyncPipeServerSocket()
val tlsServer = server.listenTls {
val engine = serverContext.createSSLEngine()
Conscrypt.setApplicationProtocols(engine, arrayOf("foo"))
engine
}

val clientContext = SSLContext.getInstance("TLS", conscrypt)
clientContext.init(keypairCert.second)

var protocol = ""
async {
val tlsClient = tlsServer.accept().iterator().next()
protocol = Conscrypt.getApplicationProtocol(tlsClient.engine)
tlsClient::write.drain("hello world".createByteBufferList())
tlsClient.close()
}

var data = ""
async {
val socket = server.connect()
val engine = clientContext.createSSLEngine("TestServer", 0)
engine.useClientMode = true
Conscrypt.setApplicationProtocols(engine, arrayOf("foo"))
val tlsSocket = tlsHandshake(socket, engine)
data = readAllString(tlsSocket::read)
}

assert(data == "hello world")
assert(protocol == "foo")
}

@Test
fun testHttp2Alpn() {
val conscrypt = Conscrypt.newProvider()
Expand Down

0 comments on commit 952e6f3

Please sign in to comment.