diff --git a/README.md b/README.md index fde774f..7193639 100644 --- a/README.md +++ b/README.md @@ -1,95 +1,12 @@ -# native-loop (PRE-RELEASE) -Extensible event loop and async-oriented IO for Scala Native; powered by libuv. +# scala-native-loop -## UNDER CONSTRUCTION - -If you're looking for the new 0.4 rewrite, check the `04` branch. The current state of master is mostly extracted from the book [Modern Systems Programming in Scala Native](https://pragprog.com/book/rwscala/modern-systems-programming-with-scala-native). +Async IO and event loop for Scala Native ## What is it? -scala-native-loop provides a real, asynchronous ExecutionContext implementation for Scala Native. -It's backed by libuv, the same C library that the node.js ecosystem runs on; in addition to basic -Future dispatching, we can also use libuv to provide other basic functionality, like: - -- File IO -- Pipe IO -- TCP Sockets -- UDP Sockets -- Timers - -To provide a working API for practical, async Scala Native programs, we have two subprojects, -`client` and `server`, which provide an async HTTP client and server, respectively, by integrating addtional C libraries: [nodejs/http-parser](https://github.com/nodejs/http-parser) for request parsing, and [curl](https://github.com/curl/curl) for a full featured client with HTTPS support. - -That said - providing a full-featured ecosystem in a single library isn't feasible - instead, we provide a `LoopExtension` trait that allows other C libraries to be integrated to the underlying event loop, in the same way that libcurl and http-parser are integrated; this opens up the possiblity of fully asynchronous bindings for postgres, redis, and many others. - -## Why is this here? - -To demonstrate the architectural style of a full, extensible async ecosystem for Scala Native, with an idiomatic Future-based API, implemented entirely as a library, and to start discussion about what we our priorities are. - -## LoopExtension trait - -To attach a new library to the event loop, all we need to do is provide the `LoopExtension` trait: - -``` -trait LoopExtension { - def activeRequests:Int -} -``` - -And then register the component at runtime with `EventLoop.addExtension()`. - -This is necessary because we need some way to know if there are pending IO tasks being managed by a C library, even if there are no outstanding Futures, and prevent the event loop from shutting down prematurely in that case. - -## Maintenance Status - -This code is a pre-release preview - I am cleaning up both the style and the implementation, -aiming to align with Scala Native 0.4 for something more robust. - -For now, I'm filing issues to remind myself of work that needs to be done. - -I'll also create a few "discussion" issues for broader conversation. - -Please feel free to file additional issues with questions, comments, and concerns! - -## Server API Example - -``` - def main(args:Array[String]):Unit = { - Service() - .getAsync("/async") { r => Future { - s"got (async routed) request $r" - }.map { message => OK( - Map("asyncMessage" -> message) - ) - } - } - .getAsync("/fetch/example") { r => - Curl.get(c"https://www.example.com").map { response => - Response(200,"OK",Map(),response.body) - } - } - .get("/") { r => OK { - Map("default_message" -> s"got (default routed) request $r") - } - } - .run(9999) - uv_run(EventLoop.loop, UV_RUN_DEFAULT) - } -``` - -## Streaming API Example +scala-native-loop provides asynchronous utilities for Scala Native. +It's backed by libuv, the same C library that the Node.js ecosystem runs on. +It currently offers: -``` - def main(args:Array[String]):Unit = { - val p = FilePipe(c"./data.txt") - .map { d => - println(s"consumed $d") - d - }.addDestination(Tokenizer("\n")) - .addDestination(Tokenizer(" ")) - .map { d => d + "\n" } - .addDestination(FileOutputPipe(c"./output.txt", false)) - println("running") - uv_run(EventLoop.loop,UV_RUN_DEFAULT) - } -``` \ No newline at end of file +- `scala.scalanative.loop.Timer`: to schedule callbacks to execute after a timeout +- `scala.scalanative.loop.Poll`: to schedule callbacks when data is read/written on a file descriptor diff --git a/build.sbt b/build.sbt index 74f6a45..8fc66a7 100644 --- a/build.sbt +++ b/build.sbt @@ -26,24 +26,9 @@ val publishSettings = Seq( url("https://github.com/scala-native/scala-native-loop"), "scm:git:git@github.com:scala-native/scala-native-loop.git" ) - ), - developers := List( - Developer( - "rwhaling", - "Richard Whaling", - "richard@whaling.dev", - url("http://whaling.dev") - ) ) ) -val noPublishSettings = Seq( - publish := {}, - publishLocal := {}, - publishArtifact := false, - skip in publish := true -) - lazy val commonSettings = Seq( scalacOptions ++= Seq( "-deprecation", @@ -56,11 +41,6 @@ lazy val commonSettings = Seq( ), libraryDependencies += "com.lihaoyi" %%% "utest" % "0.7.11" % Test, testFrameworks += new TestFramework("utest.runner.Framework"), - Test / nativeLinkStubs := true, -) - -lazy val examplesSettings = Seq( - test := {} ) lazy val core = project @@ -70,75 +50,10 @@ lazy val core = project .settings(publishSettings) .enablePlugins(ScalaNativePlugin) -lazy val pipe = project - .in(file("pipe")) - .settings(commonSettings) - .settings(test := {}) - .settings(noPublishSettings) - .enablePlugins(ScalaNativePlugin) - .dependsOn(core) - -lazy val client = project - .in(file("client")) - .settings(commonSettings) - .settings(test := {}) - .settings(noPublishSettings) - .enablePlugins(ScalaNativePlugin) - .dependsOn(core) - -lazy val server = project - .in(file("server")) - .settings(commonSettings) - .settings(test := {}) - .settings(noPublishSettings) - .enablePlugins(ScalaNativePlugin) - .dependsOn(core) - lazy val scalaJsCompat = project .in(file("scalajs-compat")) .settings(name := "native-loop-js-compat") .settings(commonSettings) .settings(publishSettings) - .settings(test := {}) - .enablePlugins(ScalaNativePlugin) - .dependsOn(core) - -lazy val serverExample = project - .in(file("examples/server")) - .settings( - commonSettings, - examplesSettings - ) - .settings(noPublishSettings) - .enablePlugins(ScalaNativePlugin) - .dependsOn(core, server, client) - -lazy val pipeExample = project - .in(file("examples/pipe")) - .settings( - commonSettings, - examplesSettings - ) - .settings(noPublishSettings) - .enablePlugins(ScalaNativePlugin) - .dependsOn(core, pipe, client) - -lazy val curlExample = project - .in(file("examples/curl")) - .settings( - commonSettings, - examplesSettings - ) - .settings(noPublishSettings) - .enablePlugins(ScalaNativePlugin) - .dependsOn(core, client) - -lazy val timerExample = project - .in(file("examples/timer")) - .settings( - commonSettings, - examplesSettings - ) - .settings(noPublishSettings) .enablePlugins(ScalaNativePlugin) .dependsOn(core) diff --git a/client/curl.scala b/client/curl.scala deleted file mode 100644 index 4efae65..0000000 --- a/client/curl.scala +++ /dev/null @@ -1,414 +0,0 @@ -package scala.scalanative.loop -import scala.scalanative.unsafe._ -import scala.scalanative.unsigned._ -import scala.collection.mutable -import scala.scalanative.libc.stdlib._ -import scala.scalanative.libc.string._ -import scala.concurrent._ -import scala.concurrent.duration._ - -case class ResponseState( - var code: Int = 200, - var headers: mutable.Map[String, String] = mutable.Map(), - var body: String = "" -) - -object Curl { - import LibCurl._ - import LibCurlConstants._ - import LibUVConstants._ - - var serial = 0L - - var multi: MultiCurl = null - - val requestPromises = mutable.Map[Long, Promise[ResponseState]]() - val requests = mutable.Map[Long, ResponseState]() - - var initialized = false - - def init(): Unit = { - if (!initialized) { - println("initializing curl") - global_init(1) - multi = multi_init() - println(s"initilized multiHandle $multi") - println("socket function") - val setopt_r_1 = - multi_setopt_ptr(multi, SOCKETFUNCTION, func_to_ptr(socketCB)) - println("timer function") - val setopt_r_2 = - multi_setopt_ptr(multi, TIMERFUNCTION, func_to_ptr(startTimerCB)) - println(s"timerCB: $startTimerCB") - - initialized = true - println("done") - } - } - - def startRequest( - method: Int, - url: String, - headers: Seq[String] = Seq.empty, - body: String = "" - ): Future[ResponseState] = Zone { implicit z => - init() - val curlHandle = easy_init() - serial += 1 - val reqId = serial - println(s"initializing handle $curlHandle for request $reqId") - val req_id_ptr = malloc(sizeof[Long]).asInstanceOf[Ptr[Long]] - !req_id_ptr = reqId - requests(reqId) = ResponseState() - val promise = Promise[ResponseState]() - requestPromises(reqId) = promise - - method match { - case GET => - check(easy_setopt_ptr(curlHandle, URL, toCString(url)), "easy_setopt") - check( - easy_setopt_ptr(curlHandle, WRITECALLBACK, func_to_ptr(dataCB)), - "easy_setopt" - ) - check( - easy_setopt_ptr( - curlHandle, - WRITEDATA, - req_id_ptr.asInstanceOf[Ptr[Byte]] - ), - "easy_setopt" - ) - check( - easy_setopt_ptr(curlHandle, HEADERCALLBACK, func_to_ptr(headerCB)), - "easy_setopt" - ) - check( - easy_setopt_ptr( - curlHandle, - HEADERDATA, - req_id_ptr.asInstanceOf[Ptr[Byte]] - ), - "easy_setopt" - ) - check( - easy_setopt_ptr( - curlHandle, - PRIVATEDATA, - req_id_ptr.asInstanceOf[Ptr[Byte]] - ), - "easy_setopt" - ) - case POST => - // TODO - // notes: https://curl.haxx.se/libcurl/c/http-post.html - // https://curl.haxx.se/libcurl/c/CURLOPT_POST.html - ??? - case PUT => - // TODO - // notes: https://curl.haxx.se/libcurl/c/httpput.html - // https://curl.haxx.se/libcurl/c/CURLOPT_PUT.html - ??? - } - multi_add_handle(multi, curlHandle) - - println("request initialized") - promise.future - } - - val dataCB: CurlDataCallback =( - ptr: Ptr[Byte], - size: CSize, - nmemb: CSize, - data: Ptr[Byte] - ) => { - val serial = !(data.asInstanceOf[Ptr[Long]]) - val len = stackalloc[Double]() - !len = 0 - val strData = bufferToString(ptr, size, nmemb) - println(s"req $serial: got data of size ${size} x ${nmemb}") - - val resp = requests(serial) - resp.body = resp.body + strData - requests(serial) = resp - - size * nmemb - } - - val headerCB: CurlDataCallback = ( - ptr: Ptr[Byte], - size: CSize, - nmemb: CSize, - data: Ptr[Byte] - )=> { - val serial = !(data.asInstanceOf[Ptr[Long]]) - val len = stackalloc[Double]() - !len = 0 - val strData = bufferToString(ptr, size, nmemb) - println(s"req $serial: got header line of size ${size} x ${nmemb}") - - val resp = requests(serial) - resp.body = resp.body + strData - requests(serial) = resp - - size * nmemb - } - - val socketCB: CurlSocketCallback = ( - curl: Curl, - socket: Int, - action: Int, - data: Ptr[Byte], - socket_data: Ptr[Byte] - ) => { - println(s"socketCB called with action ${action}") - val pollHandle = if (socket_data == null) { - println(s"initializing handle for socket ${socket}") - val poll = Poll(socket) - check( - multi_assign(multi, socket, poll.ptr), - "multi_assign" - ) - poll - } else { - new Poll(socket_data) - } - - val in = action == POLL_IN || action == POLL_INOUT - val out = action == POLL_OUT || action == POLL_INOUT - - if (in || out) { - println( - s"starting poll with in = $in and out = $out" - ) - pollHandle.start(in, out) { res => - println( - s"ready_for_curl fired with status ${res.result} and readable = ${res.readable} writable = ${res.writable}" - ) - var actions = 0 - if (res.readable) actions |= 1 - if (res.writable) actions |= 2 - val running_handles = stackalloc[Int]() - val result = - multi_socket_action(multi, socket, actions, running_handles) - println(("multi_socket_action", result)) - } - } else { - println("stopping poll") - pollHandle.stop() - startTimerCB(multi, 1, null) - } - 0 - } - - val startTimerCB: CurlTimerCallback = (curl: MultiCurl, timeout_ms: Long, data: Ptr[Byte]) => { - println(s"start_timer called with timeout ${timeout_ms} ms") - val time = if (timeout_ms < 1) { - println("setting effective timeout to 1") - 1 - } else timeout_ms - println("starting timer") - Timer.timeout(time.millis) { () => - println("in timeout callback") - val running_handles = stackalloc[Int]() - multi_socket_action(multi, -1, 0, running_handles) - println(s"on_timer fired, ${!running_handles} sockets running") - } - println("cleaning up requests") - cleanup_requests() - println("done") - 0 - } - - def cleanup_requests(): Unit = { - val messages = stackalloc[Int]() - val privateDataPtr = stackalloc[Ptr[Long]]() - var message: Ptr[CurlMessage] = multi_info_read(multi, messages) - while (message != null) { - println( - s"Got a message ${message._1} from multi_info_read, ${!messages} left in queue" - ) - val handle: Curl = message._2 - println(s"about to getInfo on handle $handle") - check( - easy_getinfo( - handle, - GET_PRIVATEDATA, - privateDataPtr.asInstanceOf[Ptr[Byte]] - ), - "getinfo" - ) - // Printf.printf(c"private data ptr: %p\n",privateDataPtr) - println(s"ok? $privateDataPtr") - val privateData = !privateDataPtr - // stdio.printf(c"privateDataPtr: %p privateData: %p\n", privateDataPtr, privateData) - println(s"getting refId from $privateData") - val reqId = !privateData - val reqData = requests.remove(reqId).get - // Curl.complete_request(reqId,reqData) - val promise = Curl.requestPromises.remove(reqId).get - promise.success(reqData) - message = multi_info_read(multi, messages) - } - println("done handling messages") - } - - def bufferToString(ptr: Ptr[Byte], size: CSize, nmemb: CSize): String = { - val byteSize = size * nmemb - val buffer = malloc(byteSize + 1L.toULong) - strncpy(buffer, ptr, byteSize + 1L.toULong) - val res = fromCString(buffer) - free(buffer) - return (res) - } - - def multi_setopt(curl: MultiCurl, option: CInt, parameters: CVarArg*): Int = - Zone { implicit z => - curl_multi_setopt(curl, option, toCVarArgList(parameters.toSeq)) - } - - def easy_setopt(curl: Curl, option: CInt, parameters: CVarArg*): Int = Zone { - implicit z => - curl_easy_setopt(curl, option, toCVarArgList(parameters.toSeq)) - } - - def func_to_ptr(f: CFuncPtr): Ptr[Byte] = { - CFuncPtr.toPtr(f) - } - -} - -object LibCurlConstants { - val WRITEDATA = 10001 - val URL = 10002 - val PORT = 10003 - val USERPASSWORD = 10005 - val READDATA = 10009 - val HEADERDATA = 10029 - val PRIVATEDATA = 10103 - val WRITECALLBACK = 20011 - val READCALLBACK = 20012 - val HEADERCALLBACK = 20079 - val TIMEOUT = 13 - val GET = 80 - val POST = 47 - val PUT = 54 - val CONTENTLENGTHDOWNLOADT = 0x300000 + 15 - val GET_PRIVATEDATA = 0x100000 + 21 - val SOCKETFUNCTION = 20001 - val SOCKETDATA = 20002 - val TIMERFUNCTION = 20004 - val TIMERDATA = 20005 - val HTTPHEADER = 10023 - - val POLL_NONE = 0 - val POLL_IN = 1 - val POLL_OUT = 2 - val POLL_INOUT = 3 - val POLL_REMOVE = 4 -} - -@link("curl") -@extern object LibCurl { - type Curl = Ptr[Byte] - type CurlBuffer = CStruct2[CString, CSize] - type CurlOption = Int - type CurlRequest = CStruct4[Ptr[Byte], Long, Long, Int] - type CurlMessage = CStruct3[Int, Curl, Ptr[Byte]] - - type CurlDataCallback = CFuncPtr4[Ptr[Byte], CSize, CSize, Ptr[Byte], CSize] - type CurlSocketCallback = - CFuncPtr5[Curl, CInt, CInt, Ptr[Byte], Ptr[Byte], CInt] - type CurlTimerCallback = CFuncPtr3[MultiCurl, Long, Ptr[Byte], CInt] - - @name("curl_global_init") - def global_init(flags: Long): Unit = extern - - @name("curl_global_cleanup") - def global_cleanup(): Unit = extern - - @name("curl_easy_init") - def easy_init(): Curl = extern - - @name("curl_easy_cleanup") - def easy_cleanup(handle: Curl): Unit = extern - - @name("curl_easy_setopt") - def curl_easy_setopt( - handle: Curl, - option: CInt, - parameter: CVarArgList - ): CInt = extern - - @name("curl_easy_setopt") - def easy_setopt_ptr(handle: Curl, option: CInt, parameter: Ptr[Byte]): CInt = - extern - - @name("curl_easy_getinfo") - def easy_getinfo(handle: Curl, info: CInt, parameter: Ptr[Byte]): CInt = - extern - - @name("curl_easy_perform") - def easy_perform(easy_handle: Curl): CInt = extern - - // START:curl_multi_bindings - type MultiCurl = Ptr[Byte] - - @name("curl_multi_init") - def multi_init(): MultiCurl = extern - - @name("curl_multi_add_handle") - def multi_add_handle(multi: MultiCurl, easy: Curl): Int = extern - - @name("curl_multi_setopt") - def curl_multi_setopt( - multi: MultiCurl, - option: CInt, - parameter: CVarArg - ): CInt = extern - - @name("curl_multi_setopt") - def multi_setopt_ptr( - multi: MultiCurl, - option: CInt, - parameter: Ptr[Byte] - ): CInt = extern - - @name("curl_multi_assign") - def multi_assign( - multi: MultiCurl, - socket: Int, - socket_data: Ptr[Byte] - ): Int = extern - - @name("curl_multi_socket_action") - def multi_socket_action( - multi: MultiCurl, - socket: Int, - events: Int, - numhandles: Ptr[Int] - ): Int = extern - - @name("curl_multi_info_read") - def multi_info_read(multi: MultiCurl, message: Ptr[Int]): Ptr[CurlMessage] = - extern - - @name("curl_multi_perform") - def multi_perform(multi: MultiCurl, numhandles: Ptr[Int]): Int = extern - - @name("curl_multi_cleanup") - def multi_cleanup(multi: MultiCurl): Int = extern - // END:curl_multi_bindings - - // START:curlHeaders - type CurlSList = CStruct2[Ptr[Byte], CString] - - @name("curl_slist_append") - def slist_append(slist: Ptr[CurlSList], string: CString): Ptr[CurlSList] = - extern - - @name("curl_slist_free_all") - def slist_free_all(slist: Ptr[CurlSList]): Unit = extern - // END:curlHeaders - - def curl_easy_strerror(code: Int): CString = extern -} diff --git a/examples/curl/main.scala b/examples/curl/main.scala deleted file mode 100644 index 2df54ab..0000000 --- a/examples/curl/main.scala +++ /dev/null @@ -1,11 +0,0 @@ -import scala.scalanative.loop._ -import LibCurlConstants._ -import scala.concurrent.ExecutionContext.Implicits.global - -object Main { - def main(args: Array[String]): Unit = { - Curl.startRequest(GET, "http://www.example.com", Seq()).map { response => - println(s"got response: $response") - } - } -} diff --git a/examples/server/main.scala b/examples/server/main.scala deleted file mode 100644 index 4108473..0000000 --- a/examples/server/main.scala +++ /dev/null @@ -1,16 +0,0 @@ -import scala.scalanative.loop._ - -object Main { - def main(args: Array[String]): Unit = { - Server.init(9999) { (r, c) => - println(s"received request $r on connection $c") - Server.respond( - c, - 200, - "OK", - Seq(("Content-Type", "text/plain"), ("Content-Length", "6")), - "hello!" - ) - } - } -} diff --git a/examples/timer/main.scala b/examples/timer/main.scala deleted file mode 100644 index 075ca19..0000000 --- a/examples/timer/main.scala +++ /dev/null @@ -1,19 +0,0 @@ -import scala.scalanative.loop._ -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global - -object Main { - def main(args: Array[String]): Unit = { - Timer - .delay(3.seconds) - .flatMap { _ => - println("beep") - Timer.delay(2.seconds) - } - .flatMap { _ => - println("boop") - Timer.delay(1.second) - } - .onComplete { _ => println("done") } - } -} diff --git a/pipe/pipe.scala b/pipe/pipe.scala deleted file mode 100644 index 866d608..0000000 --- a/pipe/pipe.scala +++ /dev/null @@ -1,130 +0,0 @@ -package scala.scalanative.loop -import scala.scalanative.unsafe._ -import scala.scalanative.unsigned._ -import scala.scalanative.libc.stdlib - -import scala.collection.mutable -import scala.concurrent.Future -import scala.concurrent.{Promise} - -case class Handle(serial: Long, handle: Ptr[Byte]) { - import LibUV._ - - def stream( - itemHandler: StreamIO.ItemHandler, - doneHandler: StreamIO.DoneHandler - ): Unit = { - StreamIO.streams(serial) = (itemHandler, doneHandler) - uv_read_start(handle, StreamIO.allocCB, StreamIO.readCB) - } - - def streamUntilDone(handler: StreamIO.ItemHandler): Future[Long] = { - val promise = Promise[Long]() - - val itemHandler: StreamIO.ItemHandler = (data, handle, id) => - try { - handler(data, handle, id) - } catch { - case t: Throwable => promise.failure(t) - } - - val doneHandler: StreamIO.DoneHandler = (handle, id) => promise.success(id) - - stream(itemHandler, doneHandler) - promise.future - } -} - -object StreamIO { - import LibUV._, LibUVConstants._ - type ItemHandler = ((String, PipeHandle, Long) => Unit) - type DoneHandler = ((PipeHandle, Long) => Unit) - type Handlers = (ItemHandler, DoneHandler) - var streams = mutable.HashMap[Long, Handlers]() - var serial = 0L - - def fromPipe(fd: Int): Handle = { - val id = serial - serial += 1 - val handle = stdlib.malloc(uv_handle_size(UV_PIPE_T)) - uv_pipe_init(EventLoop.loop, handle, 0) - val pipe_data = handle.asInstanceOf[Ptr[Long]] - !pipe_data = id - uv_pipe_open(handle, fd) - Handle(id, handle) - } - - def stream( - fd: Int - )(itemHandler: ItemHandler, doneHandler: DoneHandler): Handle = { - val pipeId = serial - serial += 1 - val handle = stdlib.malloc(uv_handle_size(UV_PIPE_T)) - uv_pipe_init(EventLoop.loop, handle, 0) - val pipe_data = handle.asInstanceOf[Ptr[Long]] - !pipe_data = serial - streams(serial) = (itemHandler, doneHandler) - - uv_pipe_open(handle, fd) - uv_read_start(handle, allocCB, readCB) - - Handle(pipeId, handle) - } - - def open(fd: Int): (PrepareHandle, Long) = ??? - - def write(handle: PrepareHandle, content: String): Long = ??? - - // def defaultDone(handle:PipeHandle,id:Long):Unit = () - - val defaultDone: DoneHandler = (handle, id) => () - - val promises = mutable.HashMap[Long, Promise[Long]]() - def streamUntilDone(fd: Int)(handler: ItemHandler): Future[Long] = { - val promise = Promise[Long]() - - val itemHandler: ItemHandler = (data, handle, id) => - try { - handler(data, handle, id) - } catch { - case t: Throwable => promise.failure(t) - } - - val doneHandler: DoneHandler = (handle, id) => promise.success(id) - - stream(fd)(itemHandler, doneHandler) - promise.future - } - - val allocCB: AllocCB = (client: PipeHandle, size: CSize, buffer: Ptr[Buffer]) => { - val buf = stdlib.malloc(4096L.toULong) - buffer._1 = buf - buffer._2 = 4096L.toULong - } - - val readCB: ReadCB = (handle: PipeHandle, size: CSSize, buffer: Ptr[Buffer]) => { - val pipe_data = handle.asInstanceOf[Ptr[Int]] - val pipeId = !pipe_data - if (size < 0L) { - val doneHandler = streams(pipeId)._2 - doneHandler(handle, pipeId) - streams.remove(pipeId) - } else { - val data = bytesToString(buffer._1, size) - stdlib.free(buffer._1) - val itemHandler = streams(pipeId)._1 - itemHandler(data, handle, pipeId) - } - () - } - - def bytesToString(data: Ptr[Byte], len: CSSize): String = { - val bytes = new Array[Byte](len.toInt) - var c = 0 - while (c < len) { - bytes(c) = !(data + c) - c += 1 - } - new String(bytes) - } -} diff --git a/server/parsing.scala b/server/parsing.scala deleted file mode 100644 index 0e5f061..0000000 --- a/server/parsing.scala +++ /dev/null @@ -1,155 +0,0 @@ -package scala.scalanative.loop -import scala.scalanative.unsafe._ -import scala.scalanative.unsigned._ -import scala.collection.mutable -import scala.scalanative.libc.stdlib - -case class RequestState( - url: String, - method: String, - var lastHeader: String = "None", - headerMap: mutable.Map[String, String] = mutable.Map[String, String](), - var body: String = "" -) - -object Parser { - import HttpParser._ - - val requests: mutable.Map[Long, RequestState] = mutable.Map() - def http_method_str(i: Int) = c"GET" - - val HTTP_REQUEST = 0 - val HTTP_RESPONSE = 1 - val HTTP_BOTH = 2 - - val connections = mutable.Map[Long, RequestState => Unit]() - def initConnection(id: Long)(callback: RequestState => Unit): Ptr[Parser] = { - connections(id) = callback - val parser = stdlib.malloc(sizeof[Parser]).asInstanceOf[Ptr[Parser]] - HttpParser.http_parser_init(parser, HTTP_REQUEST) - parser._8 = id - parser - } - - val onUrl: DataCB = (p: Ptr[Parser], data: CString, len: Long) => { - val url = bytesToString(data, len) - println(s"got url: $url") - // val state = (p._8).asInstanceOf[Ptr[ConnectionState]] - val message_id = p._8 - val m = p._6 - val method = fromCString(http_method_str(m)) - println(s"method: $method ($m), request id:$message_id") - requests(message_id) = RequestState(url, method) - 0 - } - - val onHeaderKey: DataCB = (p: Ptr[Parser], data: CString, len: Long) => { - val k = bytesToString(data, len) - println(s"got key: $k") - // val state = (p._8).asInstanceOf[Ptr[ConnectionState]] - // val message_id = state._1 - val message_id = p._8 - val request = requests(message_id) - - request.lastHeader = k - requests(message_id) = request - 0 - } - - val onHeaderValue: DataCB = (p: Ptr[Parser], data: CString, len: Long) => { - val v = bytesToString(data, len) - println(s"got value: $v") - // val state = (p._8).asInstanceOf[Ptr[ConnectionState]] - // val message_id = state._1 - val message_id = p._8 - val request = requests(message_id) - - request.headerMap(request.lastHeader) = v - requests(message_id) = request - 0 - } - - val onBody: DataCB = (p: Ptr[Parser], data: CString, len: Long) => { - // val state = (p._8).asInstanceOf[Ptr[ConnectionState]] - // val message_id = state._1 - val message_id = p._8 - val request = requests(message_id) - - val b = bytesToString(data, len) - request.body += b - requests(message_id) = request - 0 - } - - val onComplete: HttpCB = (p: Ptr[Parser]) => { - // val state = (p._8).asInstanceOf[Ptr[ConnectionState]] - // val message_id = state._1 - val message_id = p._8 - val request = requests(message_id) - val callback = connections(message_id) - callback(request) - // handleRequest(message_id,tcpHandle,request) - // println(s"message ${message_id} done! $request") - 0 - } - - def bytesToString(data: Ptr[Byte], len: Long): String = { - val bytes = new Array[Byte](len.toInt) - var c = 0 - while (c < len) { - bytes(c) = !(data + c) - c += 1 - } - - new String(bytes) - } - - val parserSettings = - stdlib.malloc(sizeof[ParserSettings]).asInstanceOf[Ptr[ParserSettings]] - http_parser_settings_init(parserSettings) - parserSettings._2 = onUrl - parserSettings._4 = onHeaderKey - parserSettings._5 = onHeaderValue - parserSettings._7 = onBody - parserSettings._8 = onComplete -} - -@extern -@link("http_parser") -object HttpParser { - type Parser = CStruct8[ - Long, // private data - Long, // private data - UShort, // major version - UShort, // minor version - UShort, // status (request only) - CChar, // method - CChar, // Error (last bit upgrade) - Long // user data (serial #) - ] - - type HttpCB = CFuncPtr1[Ptr[Parser], Int] - type DataCB = CFuncPtr3[Ptr[Parser], CString, Long, Int] - - type ParserSettings = CStruct8[ - HttpCB, // on_message_begin - DataCB, // on_url - DataCB, // on_status - DataCB, // on_header_field - DataCB, // on_header_value - HttpCB, // on_headers_complete - DataCB, // on_body - HttpCB // on_message_complete - ] - - def http_parser_init(p: Ptr[Parser], parser_type: Int): Unit = extern - def http_parser_settings_init(s: Ptr[ParserSettings]): Unit = extern - def http_parser_execute( - p: Ptr[Parser], - s: Ptr[ParserSettings], - data: Ptr[Byte], - len: CSSize - ): Long = extern - def http_method_str(method: CChar): CString = extern - -} diff --git a/server/server.scala b/server/server.scala deleted file mode 100644 index 9d19b47..0000000 --- a/server/server.scala +++ /dev/null @@ -1,133 +0,0 @@ -package scala.scalanative.loop -import scala.scalanative.unsafe._ -import scala.scalanative.unsigned._ -import scala.collection.mutable -import scala.scalanative.libc.stdlib._ -import scala.scalanative.libc.string._ -import scala.concurrent._ - -object Server { - import LibUV._, LibUVConstants._ - import Parser._ - - var serial = 0 - - val HTTP_REQUEST = 0 - val HTTP_RESPONSE = 1 - val HTTP_BOTH = 2 - - type ConnectionState = CStruct3[Long, LibUV.TCPHandle, Ptr[HttpParser.Parser]] - type RequestHandler = (RequestState, TCPHandle) => Unit - type WriteState = (Promise[Unit], Ptr[Buffer]) - - val listeners = mutable.Map[Long, RequestHandler]() - val writes = mutable.Map[Long, WriteState]() - - def respond( - client: TCPHandle, - code: Int, - desc: String, - headers: Seq[(String, String)], - body: String - ): Future[Unit] = { - var resp = new StringBuilder(s"HTTP/1.1 $code $desc\r\n") - for (h <- headers) { - resp ++= s"${h._1}: ${h._2}\r\n" - } - resp ++= "\r\n" + body - write(client, resp.toString) - } - - def write(client: TCPHandle, s: String): Future[Unit] = { - val buffer = malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]] - Zone { implicit z => - val temp_resp = toCString(s) - val resp_len = strlen(temp_resp) + 1L.toULong - buffer._1 = malloc(resp_len) - buffer._2 = resp_len - strncpy(buffer._1, temp_resp, resp_len) - } - - val writeReq = malloc(uv_req_size(UV_WRITE_REQ_T)).asInstanceOf[WriteReq] - serial += 1 - val id = serial - !(writeReq.asInstanceOf[Ptr[Long]]) = id - val promise = Promise[Unit]() - writes(id) = (promise, buffer) - check(uv_write(writeReq, client, buffer, 1, onWrite), "uv_write") - promise.future - } - - def close(client: TCPHandle): Unit = { - println(s"closing connection $client") - uv_close(client, null) - } - - def init(port: Int)(handler: RequestHandler): Unit = { - listeners(port) = handler - val addr = malloc(64L.toULong) - check(uv_ip4_addr(c"0.0.0.0", 9999, addr), "uv_ip4_addr") - val server = malloc(uv_handle_size(UV_TCP_T)).asInstanceOf[TCPHandle] - check(uv_tcp_init(EventLoop.loop, server), "uv_tcp_init") - !(server.asInstanceOf[Ptr[Long]]) = port - check(uv_tcp_bind(server, addr, 0), "uv_tcp_bind") - check(uv_listen(server, 4096, onConnect), "uv_listen") - println("running") - println(s"callbacks: ${onConnect}, ${onAlloc}, ${onRead}, ${onWrite}") - } - - val onConnect: ConnectionCB = (server: TCPHandle, status: Int) => { - val port = !(server.asInstanceOf[Ptr[Long]]) - println(s"connection incoming on port $port with status $status") - val client = malloc(uv_handle_size(UV_TCP_T)).asInstanceOf[TCPHandle] - val handler = listeners(port) - - val state = malloc(sizeof[ConnectionState]) - .asInstanceOf[Ptr[ConnectionState]] - serial += 1 - val id = serial - - state._1 = id - state._2 = client - state._3 = Parser.initConnection(id) { r => handler(r, client) } - !(client.asInstanceOf[Ptr[Ptr[Byte]]]) = state.asInstanceOf[Ptr[Byte]] - - uv_tcp_init(EventLoop.loop, client) - uv_accept(server, client) - uv_read_start(client, onAlloc, onRead) - () - } - - val onAlloc: AllocCB = (handle: TCPHandle, size: CSize, buffer: Ptr[Buffer]) => { - val buf = malloc(4096L.toULong) - buf(4095) = 0.toByte - buffer._1 = buf - buffer._2 = 4095L.toULong - } - - val onRead: ReadCB = (handle: TCPHandle, size: CSSize, buffer: Ptr[Buffer]) => { - val state_ptr = handle.asInstanceOf[Ptr[Ptr[ConnectionState]]] - val parser = (!state_ptr)._3 - val message_id = (!state_ptr)._1 - println(s"conn $message_id: read message of size $size") - - if (size < 0L) { - uv_close(handle, null) - free(buffer._1) - } else { - HttpParser.http_parser_execute(parser, parserSettings, buffer._1, size) - free(buffer._1) - } - } - - val onWrite: WriteCB = (writeReq: WriteReq, status: Int) => { - val id = !(writeReq.asInstanceOf[Ptr[Long]]) - println(s"write $id completed") - val (promise, buffer) = writes.remove(id).get - free(buffer._1) - free(buffer.asInstanceOf[Ptr[Byte]]) - free(writeReq.asInstanceOf[Ptr[Byte]]) - promise.success(()) - () - } -}