Skip to content

Commit

Permalink
Merge pull request #1 from noppoMan/fix_memory_leaks
Browse files Browse the repository at this point in the history
Fix memory leaks
  • Loading branch information
noppoMan committed Mar 9, 2016
2 parents 07792af + 35d109e commit 0a48eac
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 37 deletions.
3 changes: 2 additions & 1 deletion Sources/HTTPResponse.swift
Expand Up @@ -21,6 +21,7 @@ public class HTTPResponse {
*/
public var context: [String: Any] = [:]


private let shouldCloseConnection: Bool

/**
Expand Down Expand Up @@ -65,7 +66,7 @@ public class HTTPResponse {
self.parser = ResponseParser(
headerCompletion: onHeaderCompletion,
onBody: onBody,
messageCompletion: { response in
messageCompletion: { [unowned self] response in
if let cb = self.afterWriteCallback {
cb(response)
}
Expand Down
70 changes: 40 additions & 30 deletions Sources/HTTPServer.swift
Expand Up @@ -51,6 +51,12 @@ extension HTTP.Response {
}
}

private func closeAndDestroyHandle<T: Handle>(handle: UnsafeMutablePointer<T>){
handle.memory.close()
handle.destroy()
handle.dealloc(1)
}

public class HTTPServer {

/**
Expand All @@ -64,9 +70,9 @@ public class HTTPServer {
public var backlog: UInt = 1024

/**
Seconds for keep alive timeout, if zero keep alive is disabled. Default is 75 (Same as Nginx)
Seconds for keep alive timeout, if zero keep alive is disabled. Default is 15 (Same as Nginx)
*/
public var keepAliveTimeout: UInt = 75
public var keepAliveTimeout: UInt = 15

/**
Sets the maximum number of requests that can be served through one keep-alive connection. After the maximum number of requests are made, the connection is closed.
Expand Down Expand Up @@ -140,14 +146,15 @@ public class HTTPServer {
}

private func onConnection(queue: Pipe?) {
let client = TCP(loop: loop)
let client = UnsafeMutablePointer<TCP>.alloc(1)
client.initialize(TCP(loop: loop))

// accept connection
do {
try server.accept(client, queue: queue)
try server.accept(client.memory, queue: queue)
} catch {
self.userOnConnection(.Error(error))
client.close()
closeAndDestroyHandle(client)
}

// send handle to worker via ipc socket
Expand All @@ -163,81 +170,84 @@ public class HTTPServer {
// }
// } catch {
// debug(error)
// return client.close()
// return client.memory.close()
// }
// }

let req = HTTPRequest(request)

var req: HTTPRequest? = HTTPRequest(request)
var res: HTTPResponse? = nil

let onHeaderCompletion = { (response: HTTP.Response) -> () in
if response.shouldChunkedRespond {
client.write(response.headerDescription.bytes) { _ in
client.unref()
client.memory.write(response.headerDescription.bytes) { _ in
client.memory.unref()
}
}
}

let onBody = { (bytes: [Int8]) -> () in
client.write(HTTPResponse.encodeAsStreamChunk(bytes)) { _ in
client.unref()
client.memory.write(HTTPResponse.encodeAsStreamChunk(bytes)) { _ in
client.memory.unref()
}
}

let completionHandler = { [unowned self] (response: HTTP.Response?) -> () in
let completionHandler = { (response: HTTP.Response?) -> () in
req = nil
res = nil
if self.keepAliveTimeout == 0 || !request.keepAlive {
return client.close()
return closeAndDestroyHandle(client)
}
client.unref()
client.memory.unref()
}

let onMessageComplete = { (response: HTTP.Response) -> () in
let bodyBytes = response.shouldChunkedRespond ? "\(0)\(CRLF)\(CRLF)".bytes : response.byteDescription
client.write(bodyBytes) { _ in
client.memory.write(bodyBytes) { _ in
completionHandler(response)
}
}

let onParseFailed = { [unowned self] (error: ErrorType, streaming: Bool) -> () in
let onParseFailed = { (error: ErrorType, streaming: Bool) -> () in
debug(error)
if streaming {
return client.write("\(0)\(CRLF)\(CRLF)".bytes) { _ in
client.unref()
return client.memory.write("\(0)\(CRLF)\(CRLF)".bytes) { _ in
client.memory.unref()
}
}

let response = self.errorResponse(.InternalServerError)
client.write(response.description.bytes) { _ in
client.memory.write(response.description.bytes) { _ in
completionHandler(nil)
}
}

let res = HTTPResponse(
req,
res = HTTPResponse(
req!,
shouldCloseConnection: self.keepAliveTimeout == 0,
onHeaderCompletion: onHeaderCompletion,
onBody: onBody,
onMessageComplete: onMessageComplete,
onParseFailed: onParseFailed
)

self.userOnConnection(.Success(req, res))
self.userOnConnection(.Success(req!, res!))
}

client.read { [unowned client] result in
client.memory.read { [unowned self] result in
if case let .Data(buf) = result {
do {
let data: [Int8] = buf.bytes.map{ Int8(bitPattern: $0) }
try parser.parse(data)
} catch {
self.userOnConnection(.Error(error))
client.close()
closeAndDestroyHandle(client)
}
} else if case .Error(let error) = result {
self.userOnConnection(.Error(error))
client.close()
closeAndDestroyHandle(client)
} else {
// EOF
client.close()
closeAndDestroyHandle(client)
}
}
}
Expand All @@ -247,12 +257,12 @@ public class HTTPServer {
}

// sending handles over a pipe
private func sendHandleToWorker(client: TCP){
private func sendHandleToWorker(client: UnsafeMutablePointer<TCP>){
let worker = Cluster.workers[self.roundRobinCounter]

// send stream to worker with ipc
client.write2(worker.ipcPipe!)
client.close() // not immediately
client.memory.write2(worker.ipcPipe!)
closeAndDestroyHandle(client)

roundRobinCounter = (roundRobinCounter + 1) % Cluster.workers.count
}
Expand Down
2 changes: 1 addition & 1 deletion Suv
Submodule Suv updated 125 files
10 changes: 5 additions & 5 deletions makefile
@@ -1,7 +1,7 @@
CLibUv=CLibUv-0.1.0
COpenSSL=COpenSSL-0.1.0
HTTPParser=HTTPParser-0.1.2
CURIParser=CURIParser-0.1.0
CLibUv=CLibUv-*
COpenSSL=COpenSSL-*
HTTPParser=HTTPParser-*
CURIParser=CURIParser-*

BUILDOPTS=-Xlinker -L/usr/lib \
-Xcc -IPackages/$(CLibUv) \
Expand All @@ -19,7 +19,7 @@ OS := $(shell uname)
ifeq ($(OS),Darwin)
SWIFTC=xcrun -sdk macosx swiftc
BUILDOPTS=-Xlinker -L/usr/local/lib -Xcc -I/usr/local/include
COpenSSL=COpenSSL-OSX-0.1.0
COpenSSL=COpenSSL-OSX-*
endif

all: release
Expand Down

0 comments on commit 0a48eac

Please sign in to comment.