Skip to content

Commit

Permalink
Build basic pre-forking server
Browse files Browse the repository at this point in the history
  • Loading branch information
kylef committed Dec 5, 2015
1 parent 750e319 commit 8b4c6c3
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 47 deletions.
93 changes: 93 additions & 0 deletions Sources/Arbiter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import Glibc
import Nest


enum Address {
case IP(address: String, port: UInt16)

func socket() throws -> Socket {
let socket = try Socket()
try socket.bind("0.0.0.0", port: 8000)
try socket.listen(20)
// TODO: Set socket non blocking
return socket
}
}


/// Arbiter maintains the worker processes
class Arbiter<Worker : WorkerType> {
var listeners: [Socket] = []
var workers: [pid_t: Worker] = [:]

let numberOfWorkers: Int
let addresses: [Address]

let application: RequestType -> ResponseType

init(application: RequestType -> ResponseType, workers: Int, addresses: [Address]) {
self.application = application
self.numberOfWorkers = workers
self.addresses = addresses
}

func createSockets() throws {
for address in addresses {
listeners.append(try address.socket())
print("[arbiter] Listening on port 8000")
}
}

// Main run loop for the master process
func run() throws {
try createSockets()

manageWorkers()

while true {
sleep()
manageWorkers()
}
}

func sleep() {
// Wait's for stuff happening on our signal
// TODO make signals for worker<>arbiter communcation
Glibc.sleep(10) // Until method is implemented, don't use CPU too much
}

// MARK: Worker

// Maintain number of workers by spawning or killing as required.
func manageWorkers() {
spawnWorkers()
killWorkers()
}

// Spawn workers until we have enough
func spawnWorkers() {
let neededWorkers = numberOfWorkers - workers.count
for _ in 0..<neededWorkers {
spawnWorker()
}
}

// Kill unused workers, oldest first
func killWorkers() {
// TODO
}

// Spawns a new worker process
func spawnWorker() {
let worker = Worker(listeners: listeners, application: application)

let pid = fork()
if pid != 0 {
workers[pid] = worker
print("[arbiter] Started worker process \(pid)")
return
}

worker.run()
}
}
11 changes: 6 additions & 5 deletions Sources/Currasow.swift
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import Darwin
import Dispatch
import Nest
import Commander
import Inquiline

/*
@noreturn func serve(address: String, _ port: UInt16, closure: RequestType -> ResponseType) {
let socket: Socket
Expand Down Expand Up @@ -60,7 +59,7 @@ import Inquiline
print("Listening on \(address):\(port)")
dispatch_main()
}

*/

extension UInt16 : ArgumentConvertible {
public init(parser: ArgumentParser) throws {
Expand All @@ -79,9 +78,11 @@ extension UInt16 : ArgumentConvertible {

@noreturn public func serve(closure: RequestType -> ResponseType) {
command(
Option("workers", 1),
Option("address", "0.0.0.0"),
Option("port", UInt16(8000))
) { address, port in
serve(address, port, closure: closure)
) { workers, address, port in
let arbiter = Arbiter<SyncronousWorker>(application: closure, workers: workers, addresses: [Address.IP(address: address, port: port)])
try arbiter.run()
}.run()
}
20 changes: 20 additions & 0 deletions Sources/HTTPParser.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import Nest
import Inquiline


enum HTTPParserError : ErrorType {

}

class HTTPParser {
let socket: Socket

init(socket: Socket) {
self.socket = socket
}

func parse() throws -> RequestType {
// TODO build parser
return Request(method: "GET", path: "/")
}
}
66 changes: 24 additions & 42 deletions Sources/Socket.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import Darwin
import Dispatch
import Glibc


class Data {
Expand Down Expand Up @@ -56,7 +55,7 @@ class Socket {
let descriptor: Descriptor

init() throws {
descriptor = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
descriptor = socket(AF_INET, Int32(SOCK_STREAM.rawValue), 0)
assert(descriptor > 0)

var value: Int32 = 1;
Expand All @@ -70,72 +69,55 @@ class Socket {
}

func listen(backlog: Int32) throws {
if Darwin.listen(descriptor, backlog) == -1 {
if Glibc.listen(descriptor, backlog) == -1 {
throw SocketError()
}
}

func bind(address: String, port: Port) throws {
var addr = sockaddr_in(
sin_len: __uint8_t(sizeof(sockaddr_in)),
sin_family: sa_family_t(AF_INET),
sin_port: htons(in_port_t(port)),
sin_addr: in_addr(s_addr: inet_addr(address)),
sin_zero: (0, 0, 0, 0, 0, 0, 0, 0)
)

var saddr = sockaddr(
sa_len: 0,
sa_family: 0,
sa_data: (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
)

memcpy(&saddr, &addr, Int(addr.sin_len))

guard Darwin.bind(descriptor, &saddr, socklen_t(addr.sin_len)) != -1 else {
var addr = sockaddr_in()
addr.sin_family = sa_family_t(AF_INET)
addr.sin_port = in_port_t(htons(in_port_t(port)))
addr.sin_addr = in_addr(s_addr: in_addr_t(0))
addr.sin_zero = (0, 0, 0, 0, 0, 0, 0, 0)

let len = socklen_t(UInt8(sizeof(sockaddr_in)))
guard Glibc.bind(descriptor, sockaddr_cast(&addr), len) != -1 else {
throw SocketError()
}
}

func accept() throws -> Socket {
let descriptor = Darwin.accept(self.descriptor, nil, nil)
let descriptor = Glibc.accept(self.descriptor, nil, nil)
if descriptor == -1 {
throw SocketError()
}
return Socket(descriptor: descriptor)
}

func close() {
Darwin.close(descriptor)
Glibc.close(descriptor)
}

func send(output: String) {
output.withCString { bytes in
Darwin.send(descriptor, bytes, Int(strlen(bytes)), 0)
Glibc.send(descriptor, bytes, Int(strlen(bytes)), 0)
}
}

func consume(closure: (dispatch_source_t, Socket) -> ()) {
let source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, UInt(descriptor), 0, dispatch_get_main_queue())
dispatch_source_set_event_handler(source) {
closure(source, self)
}
dispatch_source_set_cancel_handler(source) {
self.close()
}
dispatch_resume(source)
}

func consumeData(closure: (Socket, Data) -> ()) {
consume { source, socket in
let estimated = dispatch_source_get_data(source)
let data = Data(capacity: Int(estimated))
let _ = read(socket.descriptor, data.bytes, data.capacity)
closure(socket, data)
}
/*
func read(bytes: Int) throws -> Data {
let data = Data(capacity: bytes)
let _ = Glibc.read(socket.descriptor, data.bytes, data.capacity)
return data
}
*/

private func htons(value: CUnsignedShort) -> CUnsignedShort {
return (value << 8) + (value >> 8)
}

private func sockaddr_cast(p: UnsafeMutablePointer<Void>) -> UnsafeMutablePointer<sockaddr> {
return UnsafeMutablePointer<sockaddr>(p)
}
}
105 changes: 105 additions & 0 deletions Sources/Worker.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import Nest
import Inquiline


protocol WorkerType {
init(listeners: [Socket], application: RequestType -> ResponseType)
func run()
}


final class SyncronousWorker : WorkerType {
let listeners: [Socket]
let timeout: Double = 0.5
let application: RequestType -> ResponseType
var isAlive: Bool = false

init(listeners: [Socket], application: RequestType -> ResponseType) {
self.listeners = listeners
self.application = application
}

func run() {
isAlive = true

if listeners.count == 1 {
runOne(listeners.first!)
} else {
runMultiple(listeners)
}
}

func runOne(listener: Socket) {
while isAlive {
notify()
accept(listener)
wait()
}
}

func runMultiple(listeners: [Socket]) {
// TODO multiple listners
fatalError("Currasow Syncronous worker cannot yet handle multiple listeners")
}

func notify() {
// TODO communicate with arbiter
}

func wait() -> [Socket] {
return []
}

func accept(listener: Socket) {
if let client = try? listener.accept() {
// TODO: Set socket non blocking
handle(client)
}
}

func handle(client: Socket) {
let parser = HTTPParser(socket: client)
if let request = try? parser.parse() {
handle(client, request: request)
}

client.close()
}

func handle(client: Socket, request: RequestType) {
let response = application(request)

client.send("HTTP/1.1 \(response.statusLine)\r\n")

client.send("Connection: Close\r\n")
var hasLength = false

for (key, value) in response.headers {
if key != "Connection" {
client.send("\(key): \(value)\r\n")
}

if key == "Content-Length" {
hasLength = true
}
}

if !hasLength {
if let body = response.body {
// TODO body shouldn't be a string
client.send("Content-Length: \(body.characters.count)\r\n")
} else {
client.send("Content-Length: 0\r\n")
}
}

client.send("\r\n")

if let body = response.body {
client.send(body)
}

print("[worker] \(request.method) \(request.path) - \(response.statusLine)")
}
}

0 comments on commit 8b4c6c3

Please sign in to comment.