Skip to content

Commit

Permalink
Finished preliminary layer of upload support through WebSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Oct 21, 2019
1 parent 200db9d commit ce0e1a2
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 74 deletions.
2 changes: 0 additions & 2 deletions .gitignore
Expand Up @@ -2,8 +2,6 @@
target/
node_modules/
npm-debug.log
*.js
*.js.map
package-lock.json
temp/
utilities/config.json
Expand Down
Expand Up @@ -6,11 +6,13 @@ import io.youi.communication.Connection
import io.youi.http.ConnectionStatus
import io.youi.net.{Protocol, URL}
import io.youi.util.Time
import org.scalajs.dom.{Event, File, FileReader}

import scala.concurrent.Future
import scala.concurrent.{Future, Promise}
import scribe.Execution.global

import scala.concurrent.duration._
import scala.scalajs.js.typedarray.{ArrayBuffer, TypedArrayBuffer}

trait ClientConnectedApplication[C <: Connection] extends ClientApplication with YouIConnectedApplication[C] {
def communicationURL: URL = {
Expand Down Expand Up @@ -54,6 +56,22 @@ trait ClientConnectedApplication[C <: Connection] extends ClientApplication with
ws.connect()
}

def upload(file: File): Future[String] = {
val fileReader = new FileReader
val promise = Promise[String]
fileReader.onload = (_: Event) => {
val fileName = file.name
val bytes = file.size.toLong
val future = connection.upload(fileName, bytes)
val arrayBuffer = fileReader.result.asInstanceOf[ArrayBuffer]
val webSocket = connection.webSocket().getOrElse(throw new RuntimeException("Not connected!"))
webSocket.send.binary @= TypedArrayBuffer.wrap(arrayBuffer)
promise.completeWith(future)
}
fileReader.readAsArrayBuffer(file)
promise.future
}

private def updateConnection(): Future[Unit] = {
val lastCommunication = System.currentTimeMillis() - connection.lastActive
if (connection.status() == ConnectionStatus.Open && lastCommunication > 30.seconds.toMillis) {
Expand Down
@@ -0,0 +1,5 @@
package io.youi.communication

object CommunicationPlatform {
def createWriter(fileName: String, bytes: Long): ByteBufferWriter = throw new UnsupportedOperationException("Generating files in the browser is not yet supported")
}
@@ -0,0 +1,18 @@
package io.youi.communication

import java.nio.ByteBuffer
import java.nio.channels.FileChannel

case class ChannelWriter(fileName: String, actualFileName: String, channel: FileChannel, bytes: Long) extends ByteBufferWriter {
private var _written: Long = 0L

override def written: Long = _written

override def remaining: Long = bytes - written

override def write(bb: ByteBuffer): Unit = {
_written += channel.write(bb)
}

override def close(): Unit = channel.close()
}
@@ -0,0 +1,12 @@
package io.youi.communication

import java.io.{File, FileOutputStream}

object CommunicationPlatform {
def createWriter(fileName: String, bytes: Long): ByteBufferWriter = {
// TODO: configure temp directory
val file = File.createTempFile("youi-", fileName)
val channel = new FileOutputStream(file).getChannel
ChannelWriter(fileName, file.getName, channel, bytes)
}
}

This file was deleted.

@@ -0,0 +1,16 @@
package io.youi.communication

import java.nio.ByteBuffer

import scala.concurrent.Promise

trait ByteBufferWriter {
val promise: Promise[Unit] = Promise[Unit]

def fileName: String
def actualFileName: String
def written: Long
def remaining: Long
def write(bb: ByteBuffer): Unit
def close(): Unit
}
Expand Up @@ -11,13 +11,14 @@ import scala.language.experimental.macros
import scribe.Execution.global

trait Connection {
private lazy val platform: ConnectionPlatform = new ConnectionPlatform(this)

val webSocket: Var[Option[WebSocket]] = Var(None)
val queue: HookupQueue = new HookupQueue
val status: Val[ConnectionStatus] = Val(webSocket().map(_.status()).getOrElse(ConnectionStatus.Closed))
val lastActive: Val[Long] = Var[Long](0L)

// Created for binary output and removed upon completion
private var writer: Option[ByteBufferWriter] = None

object hookups {
private var map = Map.empty[String, Hookup[Any]]

Expand All @@ -33,6 +34,12 @@ trait Connection {
hookup.receive(message)
}

def upload(fileName: String, bytes: Long): Future[String] = {
queue.enqueue(Message.uploadStart(fileName, bytes)).map { response =>
response.name.get
}
}

protected def interface[Interface](implicit ec: ExecutionContext): Interface with Hookup[Interface] = macro HookupMacros.interface[Interface]
protected def implementation[Interface, Implementation <: Interface](implicit ec: ExecutionContext): Implementation with Hookup[Interface] = macro HookupMacros.implementation[Interface, Implementation]

Expand All @@ -52,6 +59,18 @@ trait Connection {
} else {
scribe.warn(s"No id found for ${message.id}. Cannot apply: $message")
}
case MessageType.UploadStart => {
val w = CommunicationPlatform.createWriter(message.name.get, message.bytes.get)
writer = Some(w)
w.promise.future.foreach { _ =>
queue.enqueue(Message.uploadComplete(message.id, w.actualFileName))
}
}
case MessageType.UploadComplete => if (queue.success(message)) {
// Success
} else {
scribe.warn(s"No id found for ${message.id}. Cannot apply: $message")
}
case MessageType.Error => if (queue.failure(message.id, new RuntimeException(message.toString))) {
// Success
} else {
Expand All @@ -66,7 +85,17 @@ trait Connection {
private val receiveBinary: Reaction[ByteBuffer] = Reaction[ByteBuffer] { message =>
lastActive.asInstanceOf[Var[Long]] @= System.currentTimeMillis()

platform.receiveBinary(message)
writer match {
case Some(w) => {
w.write(message)
// TODO: support MessageType.UploadStatus
if (w.remaining == 0L) {
w.close()
w.promise.success(())
}
}
case None => scribe.info("No writer assigned!")
}
}

webSocket.changes {
Expand Down
Expand Up @@ -30,8 +30,10 @@ class HookupQueue {
def next(): Option[HookupRequest] = {
val o = Option(queue.poll())
o.foreach { r =>
running.put(r.request.id, r)
_hasRunning @= true
if (r.isRunning) {
running.put(r.request.id, r)
_hasRunning @= true
}
}
_hasNext @= !queue.isEmpty
o
Expand Down
Expand Up @@ -3,6 +3,7 @@ package io.youi.communication
import scala.concurrent.Promise

case class HookupRequest(request: Message, promise: Promise[Message]) {
def isRunning: Boolean = request.`type`.running
def success(response: Message): Unit = if (!promise.isCompleted) promise.success(response)
def failure(throwable: Throwable): Unit = if (!promise.isCompleted) promise.failure(throwable)
}
12 changes: 12 additions & 0 deletions communication/src/main/scala/io/youi/communication/Message.scala
Expand Up @@ -10,6 +10,7 @@ case class Message(id: Long,
method: Option[String] = None,
params: Option[Json] = None,
returnValue: Option[Json] = None,
bytes: Option[Long] = None,
errorMessage: Option[String] = None)

object Message {
Expand All @@ -29,6 +30,17 @@ object Message {
method = Some(method),
returnValue = Some(returnValue)
)
def uploadStart(fileName: String, bytes: Long): Message = Message(
id = idGenerator.incrementAndGet(),
`type` = MessageType.UploadStart,
name = Some(fileName),
bytes = Some(bytes)
)
def uploadComplete(id: Long, fileName: String): Message = Message(
id = id,
`type` = MessageType.UploadComplete,
name = Some(fileName)
)
def error(id: Long, message: String): Message = Message(
id = id,
`type` = MessageType.Error,
Expand Down
Expand Up @@ -5,6 +5,7 @@ import io.circe.{Decoder, Encoder, HCursor, Json}

sealed trait MessageType {
def name: String
def running: Boolean
}

object MessageType {
Expand All @@ -20,17 +21,30 @@ object MessageType {

case object Invoke extends MessageType {
override def name: String = "invoke"
override def running: Boolean = true
}
case object Response extends MessageType {
override def name: String = "response"
override def running: Boolean = false
}
case object UploadStart extends MessageType {
override def name: String = "uploadStart"
override def running: Boolean = true
}
case object UploadComplete extends MessageType {
override def name: String = "uploadComplete"
override def running: Boolean = false
}
case object Error extends MessageType {
override def name: String = "error"
override def running: Boolean = false
}

def byName(name: String): MessageType = name match {
case "invoke" => Invoke
case "response" => Response
case "uploadStart" => UploadStart
case "uploadComplete" => UploadComplete
case "error" => Error
}
}
Expand Up @@ -101,25 +101,9 @@ object CommunicationScreen extends ExampleScreen with PreloadedContentScreen {
evt.preventDefault()
evt.stopPropagation()
val file = uploadInput.files.item(0)
val webSocket = connection.webSocket().get

val fileReader = new FileReader
fileReader.onload = (_: Event) => {
val arrayBuffer = fileReader.result.asInstanceOf[ArrayBuffer]
scribe.info(s"Sending: ${file.name} / ${arrayBuffer.byteLength} bytes")
val nameBytes = file.name.getBytes("UTF-8")
val nameLength = ByteBuffer.allocate(java.lang.Integer.BYTES)
nameLength.putInt(nameBytes.length)
nameLength.flip()
val byteLength = ByteBuffer.allocate(java.lang.Long.BYTES)
byteLength.putLong(file.size.toLong)
byteLength.flip()
webSocket.send.binary @= nameLength
webSocket.send.binary @= ByteBuffer.wrap(nameBytes)
webSocket.send.binary @= byteLength
webSocket.send.binary @= TypedArrayBuffer.wrap(arrayBuffer)
ClientExampleApplication.upload(file).foreach { actualFileName =>
scribe.info(s"Uploaded successfully: $actualFileName")
}
fileReader.readAsArrayBuffer(file)
})
}
}

0 comments on commit ce0e1a2

Please sign in to comment.