Skip to content
Permalink
Browse files

Improvements to WebSocket binary file upload support

  • Loading branch information
darkfrog26 committed Nov 15, 2019
1 parent 5b4cf76 commit 1ad222d0c1aa53fa212b33ed6a79c7ff1f87190c
@@ -1,18 +1,17 @@
package io.youi.app

import io.youi.History
import io.youi.client.WebSocketClient
import io.youi.client.{BlobData, WebSocketClient}
import io.youi.communication.Connection
import io.youi.http.ConnectionStatus
import io.youi.net.{Protocol, URL}
import io.youi.util.Time
import org.scalajs.dom.{Event, File, FileReader}
import org.scalajs.dom.File

import scala.concurrent.{Future, Promise}
import scala.concurrent.Future
import scribe.Execution.global

import scala.concurrent.duration._
import scala.scalajs.js.typedarray.{ArrayBuffer, TypedArrayBuffer}

trait ClientConnectedApplication[C <: Connection] extends ClientApplication with YouIConnectedApplication[C] {
def communicationURL: Future[URL] = Future.successful {
@@ -60,19 +59,21 @@ trait ClientConnectedApplication[C <: Connection] extends ClientApplication with
}

def upload(file: File): Future[String] = {
val fileReader = new FileReader
val promise = Promise[String]
fileReader.onload = (_: Event) => {
val fileName = file.name
val bytes = file.size.toLong
val future = connection.upload(fileName, bytes)
val arrayBuffer = fileReader.result.asInstanceOf[ArrayBuffer]
val webSocket = connection.webSocket().getOrElse(throw new RuntimeException("Not connected!"))
webSocket.send.binary @= TypedArrayBuffer.wrap(arrayBuffer)
promise.completeWith(future)
val webSocket = connection.webSocket().getOrElse(throw new RuntimeException("Not connected!"))
val fileName = file.name
val bytes = file.size.toLong
val upload = connection.upload(fileName, bytes)
webSocket.send.binary @= BlobData(file)
upload.progress.attach { p =>
scribe.info(s"Progress: $p")
}
fileReader.readAsArrayBuffer(file)
promise.future
upload.percentage.attach { p =>
scribe.info(s"Percentage: $p")
}
upload.remaining.attach { r =>
scribe.info(s"Remaining: $r")
}
upload.future
}

private def updateConnection(): Future[Unit] = {
@@ -4,7 +4,7 @@ import sbtcrossproject.CrossType

name := "youi"
organization in ThisBuild := "io.youi"
version in ThisBuild := "0.12.6"
version in ThisBuild := "0.12.7-SNAPSHOT"
scalaVersion in ThisBuild := "2.13.1"
crossScalaVersions in ThisBuild := List("2.13.1", "2.12.10")
resolvers in ThisBuild ++= Seq(
@@ -1,7 +1,6 @@
package io.youi.communication

import java.nio.ByteBuffer
import io.youi.http.{ConnectionStatus, WebSocket}
import io.youi.http.{BinaryData, ByteBufferData, ConnectionStatus, WebSocket}
import profig.JsonUtil
import reactify.{Val, Var}
import reactify.reaction.Reaction
@@ -18,6 +17,7 @@ trait Connection {

// Created for binary output and removed upon completion
private var writer: Option[ByteBufferWriter] = None
private var uploads: Map[Long, Upload] = Map.empty

object hookups {
private var map = Map.empty[String, Hookup[Any]]
@@ -34,10 +34,14 @@ trait Connection {
hookup.receive(message)
}

def upload(fileName: String, bytes: Long): Future[String] = {
queue.enqueue(Message.uploadStart(fileName, bytes)).map { response =>
def upload(fileName: String, bytes: Long): Upload = synchronized {
val message = Message.uploadStart(fileName, bytes)
val future = queue.enqueue(message).map { response =>
response.name.get
}
val upload = Upload(fileName, bytes, future)
uploads += message.id -> upload
upload
}

protected def interface[Interface](implicit ec: ExecutionContext): Interface with Hookup[Interface] = macro HookupMacros.interface[Interface]
@@ -82,16 +86,18 @@ trait Connection {
}
}

private val receiveBinary: Reaction[ByteBuffer] = Reaction[ByteBuffer] { message =>
private val receiveBinary: Reaction[BinaryData] = Reaction[BinaryData] { message =>
lastActive.asInstanceOf[Var[Long]] @= System.currentTimeMillis()

writer match {
case Some(w) => {
w.write(message)
// TODO: support MessageType.UploadStatus
if (w.remaining == 0L) {
w.close()
w.promise.success(())
case Some(w) => message match {
case ByteBufferData(m) => {
w.write(m)
// TODO: support MessageType.UploadStatus
if (w.remaining == 0L) {
w.close()
w.promise.success(())
}
}
}
case None => scribe.info("No writer assigned!")
@@ -0,0 +1,15 @@
package io.youi.communication

import reactify._

import scala.concurrent.Future

case class Upload(fileName: String, bytes: Long, future: Future[String]) {
val progress: Val[Long] = Var(0L)
val percentage: Val[Int] = Val(math.floor((progress.toDouble / bytes.toDouble) * 100.0).toInt)
val remaining: Val[Long] = Val(bytes - progress)
}

object Upload {
def progress(upload: Upload, progress: Long): Unit = upload.progress.asInstanceOf[Var[Long]] @= progress
}
@@ -0,0 +1,6 @@
package io.youi.client

import io.youi.http.BinaryData
import org.scalajs.dom.Blob

case class BlobData(blob: Blob) extends BinaryData
@@ -1,8 +1,8 @@
package io.youi.client

import io.youi.http.{ConnectionStatus, WebSocket}
import io.youi.http.{ByteBufferData, ConnectionStatus, WebSocket}
import io.youi.net.URL
import org.scalajs.dom.{Blob, Event, FileReader, MessageEvent, WebSocket => WS}
import org.scalajs.dom.{Blob, Event, MessageEvent, WebSocket => WS}

import scala.concurrent.Future
import scribe.Execution.global
@@ -15,7 +15,7 @@ class WebSocketClient(url: URL) extends WebSocket {

override def connect(): Future[ConnectionStatus] = {
_status @= ConnectionStatus.Connecting
webSocket
webSocket.binaryType = "blob"
webSocket.addEventListener("open", (_: Event) => {
updateStatus()
})
@@ -30,26 +30,33 @@ class WebSocketClient(url: URL) extends WebSocket {
webSocket.addEventListener("message", (evt: MessageEvent) => {
evt.data match {
case s: String => receive.text @= s
case blob: Blob => {
val fileReader = new FileReader
fileReader.onload = (_: Event) => {
val arrayBuffer = fileReader.result.asInstanceOf[ArrayBuffer]
receive.binary @= TypedArrayBuffer.wrap(arrayBuffer)
}
fileReader.readAsArrayBuffer(blob)
}
case ab: ArrayBuffer => receive.binary @= TypedArrayBuffer.wrap(ab)
case blob: Blob => receive.binary @= BlobData(blob)
case ab: ArrayBuffer => receive.binary @= ByteBufferData(TypedArrayBuffer.wrap(ab))
}
})
send.text.attach(webSocket.send)
send.binary.attach { message =>
if (message.hasTypedArray()) {
webSocket.send(message.typedArray().buffer)
} else {
val array = new Array[Byte](message.remaining())
message.get(array)
val arrayBuffer = array.toTypedArray.buffer
webSocket.send(arrayBuffer)
send.binary.attach {
case BlobData(blob) => {
val chunkSize = 1024L * 1024L * 1L // 1 meg
val chunks = math.ceil(blob.size / chunkSize).toInt
val blobs = (0 until chunks).toList.map { index =>
val start = chunkSize * index
val end = start + chunkSize
blob.slice(start, end)
}
blobs.foreach { b =>
webSocket.send(b)
}
}
case ByteBufferData(message) => {
if (message.hasTypedArray()) {
webSocket.send(message.typedArray().buffer)
} else {
val array = new Array[Byte](message.remaining())
message.get(array)
val arrayBuffer = array.toTypedArray.buffer
webSocket.send(arrayBuffer)
}
}
}
status.future(s => s != ConnectionStatus.Connecting)
@@ -0,0 +1,7 @@
package io.youi.http

import java.nio.ByteBuffer

trait BinaryData

case class ByteBufferData(bb: ByteBuffer) extends BinaryData
@@ -1,11 +1,9 @@
package io.youi.http

import java.nio.ByteBuffer

import reactify.Channel

class WebSocketChannels {
val text: Channel[String] = Channel[String]
val binary: Channel[ByteBuffer] = Channel[ByteBuffer]
val binary: Channel[BinaryData] = Channel[BinaryData]
val close: Channel[Unit] = Channel[Unit]
}
@@ -1,13 +1,12 @@
package io.youi.example.screen

import java.nio.ByteBuffer

import io.youi.Template
import io.youi.{History, Template}
import io.youi.ajax.AjaxManager
import io.youi.app.screen.PreloadedContentScreen
import io.youi.dom._
import io.youi.example.ClientExampleApplication
import io.youi.net._
import org.scalajs.dom.{Event, FileReader, html, window}
import org.scalajs.dom.{Event, File, FormData, html, window}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
@@ -101,9 +100,32 @@ object CommunicationScreen extends ExampleScreen with PreloadedContentScreen {
evt.preventDefault()
evt.stopPropagation()
val file = uploadInput.files.item(0)
ClientExampleApplication.upload(file).foreach { actualFileName =>
scribe.info(s"Uploaded successfully: $actualFileName")
}
scribe.info("Upload start!")
uploadWebSocket(file)
// uploadAJAX(file)
})
}

private def uploadWebSocket(file: File): Unit = {
ClientExampleApplication.upload(file).foreach { actualFileName =>
scribe.info(s"Uploaded successfully: $actualFileName")
}
}

private lazy val ajax = new AjaxManager(1)

private def uploadAJAX(file: File): Unit = {
val action = ajax.enqueue(
url = History.url.withPath(path"/upload"),
data = Some(new FormData {
append("file", file, file.name)
})
)
action.percentage.attach { p =>
scribe.info(s"Percentage: $p")
}
action.future.foreach { request =>
scribe.info(s"Completed! ${action.percentage()}, ${action.cancelled()}, ${action.loaded()}")
}
}
}
@@ -0,0 +1,28 @@
package io.youi.example

import io.youi.http.content.Content
import io.youi.http.{HttpConnection, HttpStatus}
import io.youi.net.ContentType
import io.youi.server.handler.HttpHandler

import scala.concurrent.Future
import scribe.Execution.global

object AJAXUploadExample extends HttpHandler {
override def handle(connection: HttpConnection): Future[HttpConnection] = Future {
connection.request.content match {
case Some(content) => {
scribe.info(s"RECEIVED: $content")
connection.modify { response =>
response.withContent(Content.string("Received", ContentType.`text/plain`))
}
}
case None => {
scribe.info("Nothing received!")
connection.modify { response =>
response.withStatus(HttpStatus.NoContent).withContent(Content.string("No content sent", ContentType.`text/plain`))
}
}
}
}
}
@@ -35,6 +35,7 @@ object ServerExampleApplication extends ExampleApplication with ServerConnectedA
) / Application / ServerApplication.AppTemplate,
path"/cookies.html" / CookiesExample,
path"/session.html" / SessionExample,
path"/upload" / AJAXUploadExample,
ClassLoaderPath(pathTransform = (path: String) => s"content$path") / CachingManager.LastModified(),
path.startsWith("/app") / ClassLoaderPath()
)
@@ -10,7 +10,7 @@ import io.undertow.util.Headers
import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder
import io.undertow.websockets.client.{WebSocketClientNegotiation, WebSocketClient => UndertowWebSocketClient}
import io.undertow.websockets.core._
import io.youi.http.{ConnectionStatus, WebSocket}
import io.youi.http.{ByteBufferData, ConnectionStatus, WebSocket}
import io.youi.net.URL
import io.youi.server.KeyStore
import io.youi.server.util.SSLUtil
@@ -97,9 +97,11 @@ class WebSocketClient(url: URL,
checkBacklog()
sendMessage(message)
}
send.binary.attach { message =>
checkBacklog()
sendMessage(message)
send.binary.attach {
case ByteBufferData(message) => {
checkBacklog()
sendMessage(message)
}
}
_status @= ConnectionStatus.Open
scribe.info(s"Connected to $url successfully")
@@ -215,8 +215,8 @@ object UndertowServerImplementation extends ServerImplementationCreator {
listener.send.text.attach { message =>
WebSockets.sendText(message, channel, null)
}
listener.send.binary.attach { message =>
WebSockets.sendBinary(message, channel, null)
listener.send.binary.attach {
case ByteBufferData(message) => WebSockets.sendBinary(message, channel, null)
}
listener.send.close.attach { _ =>
if (channel.isOpen) {
@@ -233,7 +233,7 @@ object UndertowServerImplementation extends ServerImplementationCreator {
}

override def onFullBinaryMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = {
message.getData.getResource.foreach(listener.receive.binary @= _)
message.getData.getResource.foreach(bb => listener.receive.binary @= ByteBufferData(bb))
super.onFullBinaryMessage(channel, message)
}

0 comments on commit 1ad222d

Please sign in to comment.
You can’t perform that action at this time.