From 5b721d1eede4123c76ab680380e1950690b69767 Mon Sep 17 00:00:00 2001 From: Marcelo Boeira Date: Sat, 3 Sep 2016 01:23:08 +0200 Subject: [PATCH] feature: Use channels do deal with memory concurrency By using a Channel is expected that the memory get no concurrent access, the Fibers should send all the requests over the Channel. The same will receive and perform in the received order. Additional changes: - Add a 'request' abstraction, to deal with parsing the received message and handling commands --- src/bojack/request.cr | 39 ++++++++++++++++++++++++++++++++++ src/bojack/server.cr | 49 ++++++++++++------------------------------- 2 files changed, 52 insertions(+), 36 deletions(-) create mode 100644 src/bojack/request.cr diff --git a/src/bojack/request.cr b/src/bojack/request.cr new file mode 100644 index 0000000..51edf10 --- /dev/null +++ b/src/bojack/request.cr @@ -0,0 +1,39 @@ +require "socket" +require "logger" +require "./logger" +require "./command" +require "./memory" + +module BoJack + class Request + @logger : ::Logger = BoJack::Logger.instance + def initialize(@body : String, @socket : TCPSocket, @memory : BoJack::Memory(String, Array(String))); end + + def perform + @logger.info("#{@socket.remote_address} requested: #{@body.strip}") + params = parse(@body) + command = BoJack::Command.from(params[:command]) + + response = command.run(@socket, @memory, params) + + @socket.puts(response) + rescue e + message = "error: #{e.message}" + @logger.error(message) + @socket.puts(message) + end + + private def parse(body) : Hash(Symbol, String | Array(String)) + body = body.split(" ").map { |item| item.strip } + + command = body[0] + result = Hash(Symbol, String | Array(String)).new + result[:command] = command + + result[:key] = body[1] if body[1]? + result[:value] = body[2].split(",") if body[2]? + + result + end + end +end diff --git a/src/bojack/server.cr b/src/bojack/server.cr index 10b626c..0d62ef2 100644 --- a/src/bojack/server.cr +++ b/src/bojack/server.cr @@ -1,8 +1,8 @@ require "socket" require "logger" require "./logger" +require "./request" require "./memory" -require "./command" require "./logo" module BoJack @@ -10,6 +10,8 @@ module BoJack @hostname : String @port : Int8 | Int16 | Int32 | Int64 @logger : ::Logger = BoJack::Logger.instance + @channel : Channel::Unbuffered(BoJack::Request) = Channel(BoJack::Request).new + @memory = BoJack::Memory(String, Array(String)).new def initialize(@hostname = "127.0.0.1", @port = 5000); end @@ -17,7 +19,6 @@ module BoJack server = TCPServer.new(@hostname, @port) server.tcp_nodelay = true server.recv_buffer_size = 4096 - memory = BoJack::Memory(String, Array(String)).new BoJack::Logo.render @@ -29,51 +30,27 @@ module BoJack exit end + spawn do + loop do + if request = @channel.receive + request.perform + end + end + end + loop do if socket = server.accept @logger.info("#{socket.remote_address} connected") spawn do loop do - request = socket.gets - break unless request - - @logger.info("#{socket.remote_address} requested: #{request.strip}") - - begin - params = parse_request(request) - command = BoJack::Command.from(params[:command]) - - response = command.run(socket, memory, params) - - if command.is_a?(BoJack::Commands::Close) - break - end - - socket.puts(response) - rescue e - message = "error: #{e.message}" - @logger.error(message) - socket.puts(message) + if message = socket.gets + @channel.send(BoJack::Request.new(message, socket, @memory)) end end end end end end - - private def parse_request(request) : Hash(Symbol, String | Array(String)) - request = request.split(" ").map { |item| item.strip } - - command = request[0] - result = Hash(Symbol, String | Array(String)).new - result[:command] = command - - result[:key] = request[1] if request[1]? - result[:value] = request[2].split(",") if request[2]? - - result - end end - end