Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Server uses nio backing (though queuing is kind of bad)

  • Loading branch information...
commit 67830011e1cb5a3cd524c9c0b6659dbeb0584478 1 parent 78d2404
@untoldwind authored
View
6 client/src/main/java/com/objectcode/lostsocks/client/engine/ThreadCommunication.java
@@ -194,7 +194,7 @@ public void run() {
CompressedPacket connectionCloseResult = sendHttpMessage(configuration, RequestType.CONNECTION_CLOSE, connectionId, null);
- if ( connectionCloseResult == null ) {
+ if (connectionCloseResult == null) {
log.error("<CLIENT> SERVER fail closing");
}
log.info("<CLIENT> Disconnecting application (regular)");
@@ -240,7 +240,7 @@ public void run() {
public static CompressedPacket sendHttpMessage(IConfiguration config, RequestType requestType, String connectionId, CompressedPacket input) {
HttpClient client = config.createHttpClient();
- HttpRequest request = requestType.getHttpRequest(config.getTargetPath(), connectionId, input != null ? input.toEntity(): null);
+ HttpRequest request = requestType.getHttpRequest(config.getTargetPath(), connectionId, input != null ? input.toEntity() : null);
for (int retry = 0; retry <= config.getMaxRetries(); retry++) {
try {
@@ -249,7 +249,7 @@ public static CompressedPacket sendHttpMessage(IConfiguration config, RequestTyp
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
return CompressedPacket.fromEntity(response.getEntity());
}
- log.error("Failed request (try #" + retry + ") " + request + " Status: " + response.getStatusLine());
+ log.error("Failed request (try #" + retry + ") " + request.getRequestLine().getMethod() + " " + request.getRequestLine().getUri() + " Status: " + response.getStatusLine());
} catch (IOException e) {
log.error("IOException (try #" + retry + ") " + e, e);
return null;
View
3  project/Build.scala
@@ -22,7 +22,8 @@ object ApplicationBuild extends Build {
settings = buildSettings ++ assemblySettings ++ addArtifact(Artifact("client", "assembly"), assembly) ).settings(
organization := "com.objectcode.lostsocks",
libraryDependencies ++= Seq(
- "org.apache.httpcomponents" % "httpclient" % "4.2"
+ "org.apache.httpcomponents" % "httpclient" % "4.2",
+ "io.netty" % "netty" % "3.3.0.Final"
),
crossPaths := false,
unmanagedJars in Compile += file(System.getProperty("java.home") + "/lib/javaws.jar"),
View
88 server/app/controllers/Api.scala
@@ -1,18 +1,34 @@
package controllers
+import play.api.Play.current
import scala.math.min
-import play.api.mvc.{Action, Controller}
-import play.api.Logger
+import play.api.mvc.Controller
import models.{ConnectionTable, IdGenerator, CompressedPacket}
import utils.IPHelper
-import engine.{Connection, ExtendedConnection}
-
+import play.api.Logger
+import play.api.libs.concurrent.Akka
+import engine.{ConnectionActor, Connection, ExtendedConnection}
+import akka.actor.Props
+import akka.util.duration._
+import akka.pattern._
+import java.util.concurrent.LinkedBlockingQueue
+import play.api.libs.iteratee.Input
+import akka.dispatch.Await
+import akka.actor.Status.{Failure, Success}
+import collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+import java.util.ArrayList
+import akka.util.{ByteString, Timeout}
+import play.api.libs.iteratee.Input.{El, EOF, Empty}
+import engine.ConnectionActor.Disconnect
object Api extends Controller with Secured with CompressedPacketFormat {
val SERVER_TIMOUT = 120
val SUPPORTED_CLIENT_VERSIONS = List("1.0.2")
+ implicit val timeout = Timeout(10 seconds)
+
def versionCheck = BasicAuthenticated(compressedPacket) {
implicit request =>
if (SUPPORTED_CLIENT_VERSIONS.contains(request.body.asString)) {
@@ -39,15 +55,14 @@ object Api extends Controller with Secured with CompressedPacketFormat {
var userTimeout = parts(2).toInt
if (userTimeout < 0) userTimeout = 0
- val conn = new Connection()
- if (!conn.connect(host, port)) {
- Logger.warn("Connection " + connectionId + " failed from " + iprev + "(" + ip + ") to " + host + ":" + port)
- MethodNotAllowed("Server was unable to connect to " + host + ":" + port)
- } else {
- Logger.info("Connection " + connectionId + " created from " + iprev + "(" + ip + ") to " + host + ":" + port)
+ val downQueue = new LinkedBlockingQueue[Input[ByteString]]
+ val connectionActor = Akka.system.actorOf(Props(new ConnectionActor(host, port, downQueue)))
+
+ try {
+ Await.result(connectionActor ? ConnectionActor.Connect, timeout.duration)
+ Logger.info("Connection " + connectionId + " created from " + iprev + "(" + ip + ") to " + host + ":" + port)
val extConn = new ExtendedConnection()
- extConn.conn = conn
extConn.ip = ip
extConn.iprev = iprev
extConn.destIP = host
@@ -60,11 +75,17 @@ object Api extends Controller with Secured with CompressedPacketFormat {
else extConn.timeout = min(userTimeout, SERVER_TIMOUT)
}
extConn.authorizedTime = 0
-
+ extConn.connectionActor = connectionActor
+ extConn.downQueue = downQueue
// Add this to the ConnectionTable
ConnectionTable(request.user).put(connectionId, extConn)
Ok(CompressedPacket(connectionId + ":" + host + ":" + port, false))
+ } catch {
+ case cause =>
+ Logger.warn("Connection " + connectionId + " failed from " + iprev + "(" + ip + ") to " + host + ":" + port + " " + cause)
+ connectionActor ! Disconnect
+ MethodNotAllowed("Server was unable to connect to " + host + ":" + port + " " + cause)
}
}
@@ -74,36 +95,45 @@ object Api extends Controller with Secured with CompressedPacketFormat {
extConn =>
val lastAccessDate = extConn.lastAccessDate
extConn.lastAccessDate = new java.util.Date()
- val conn = extConn.conn
+ val connectionActor = extConn.connectionActor
+ val downQueue = extConn.downQueue
// Add the sended bytes
extConn.uploadedBytes += request.body.data.size
// write the bytes
- conn.write(request.body.data)
+ connectionActor ! ConnectionActor.Write(request.body.data)
// Update the upload speed
val div = 1 + extConn.lastAccessDate.getTime - lastAccessDate.getTime
extConn.currentUploadSpeed = request.body.data.size.toDouble / div
// Build the response
- val buf = conn.read
- if (buf == null) {
+ var data = ByteString.empty
+ val available = downQueue.size
+ var hasEOF = false
+
+ for (i <- 1 to available) {
+ downQueue.take() match {
+ case El(bytes) => data = data ++ bytes
+ case EOF => hasEOF = true
+ case Empty =>
+ }
+ }
+
+ if (hasEOF) {
Logger.info("Connection closed: " + id)
// Remove the connection from the ConnectionTable
ConnectionTable(request.user).remove(id)
+ }
- Ok(CompressedPacket(Array.empty[Byte], true))
- } else {
- // Add the received bytes
- extConn.downloadedBytes += buf.size
+ // Add the received bytes
+ extConn.downloadedBytes += data.size
- // Update the download speed
- val div = 1 + extConn.lastAccessDate.getTime - lastAccessDate.getTime
- extConn.currentDownloadSpeed = buf.size.toDouble / div
+ // Update the download speed
+ extConn.currentDownloadSpeed = data.size.toDouble / div
- Ok(CompressedPacket(buf, false))
- }
+ Ok(CompressedPacket(data.toArray, hasEOF))
}.getOrElse(NotFound)
}
@@ -116,21 +146,21 @@ object Api extends Controller with Secured with CompressedPacketFormat {
Logger.info("Connection destroy : " + id)
extConn.lastAccessDate = new java.util.Date()
- val conn = extConn.conn
+ val connectionActor = extConn.connectionActor
// Close it
- conn.disconnect
+ connectionActor ! ConnectionActor.Disconnect
// Remove it from the ConnectionTable
ConnectionTable(request.user).remove(id)
// Build the response
- Ok(CompressedPacket("Destroyed", true));
+ Ok(CompressedPacket("Destroyed", true))
}.getOrElse {
println("Again1")
Logger.info("Connection already destroyed by timeout : " + id)
- Ok(CompressedPacket("Already destroyed", true));
+ Ok(CompressedPacket("Already destroyed", true))
}
}
View
53 server/app/engine/ConnectionActor.scala
@@ -0,0 +1,53 @@
+package engine
+
+import akka.actor.{ActorRef, IO, IOManager, Actor}
+import akka.actor.Status.{Failure, Success}
+import akka.util.ByteString
+import akka.actor.IO.{SocketHandle}
+import play.api.libs.iteratee.Input
+import play.api.libs.iteratee.Input.{El, EOF}
+import java.util.concurrent.{LinkedBlockingQueue, LinkedBlockingDeque}
+
+
+object ConnectionActor {
+ case class Connect()
+
+ case class Disconnect()
+
+ case class Write(bytes:Array[Byte])
+}
+
+class ConnectionActor(val host: String, val port: Int, val downQueue:LinkedBlockingQueue[Input[ByteString]]) extends Actor {
+
+ import ConnectionActor._
+
+ var socket: SocketHandle = null
+ var receiver: ActorRef = null
+
+ override protected def receive = {
+ case Connect =>
+ receiver = sender
+ socket = IOManager(context.system).connect(host, port)
+
+ case Disconnect =>
+ socket.close()
+ context.stop(self)
+
+ case Write(bytes) =>
+ socket.write(ByteString(bytes))
+
+ case IO.Connected(socket, address) =>
+ if ( receiver != null )
+ receiver ! Success
+ receiver = null
+
+ case IO.Closed(handle, causeOpt) =>
+ if ( receiver != null )
+ receiver ! causeOpt.map(Failure(_)).getOrElse(Failure(new RuntimeException))
+ receiver = null
+ downQueue.offer(EOF)
+
+ case IO.Read(socket, bytes) =>
+ downQueue.offer(El(bytes))
+ }
+}
View
8 server/app/engine/ExtendedConnection.scala
@@ -2,12 +2,15 @@ package engine
import play.api.libs.openid.UserInfo
import java.util.Date
+import akka.actor.ActorRef
+import play.api.libs.iteratee.Input
+import java.util.concurrent.LinkedBlockingQueue
+import akka.util.ByteString
class ExtendedConnection {
var ip:String = "?"
var iprev:String = "?"
var user:UserInfo = null
- var conn:Connection = null
val creationDate = new Date()
var lastAccessDate = new Date()
var uploadedBytes:Long = 0
@@ -19,4 +22,7 @@ class ExtendedConnection {
var currentDownloadSpeed:Double = 0
var timeout:Long = 0
var authorizedTime:Long = 0
+
+ var connectionActor:ActorRef = null
+ var downQueue:LinkedBlockingQueue[Input[ByteString]] = null
}
View
4 server/app/engine/ThreadPing.scala
@@ -28,14 +28,14 @@ object ThreadPing {
id =>
Logger.info("Closed connection " + id + " : Timeout reached...")
val extConn = table.get(id)
- extConn.map(_.conn.disconnect)
+ extConn.map(_.connectionActor ! ConnectionActor.Disconnect)
table.remove(id)
}
closeForAuthorizedTime.foreach {
id =>
Logger.info("Closed connection " + id + " : Maximum time reached...")
val extConn = table.get(id)
- extConn.map(_.conn.disconnect)
+ extConn.map(_.connectionActor ! ConnectionActor.Disconnect)
table.remove(id)
}
}
View
2  server/conf/application.conf
@@ -39,7 +39,7 @@ db.default.url="postgres://lostsocks:lostsocks@192.168.97.128/lostsocks"
# You can also configure logback (http://logback.qos.ch/), by providing a logger.xml file in the conf directory .
# Root logger:
-logger.root=ERROR
+logger.root=DEBUG
# Logger used by the framework:
logger.play=INFO
Please sign in to comment.
Something went wrong with that request. Please try again.