Permalink
Browse files

A stab at a multithreaded implementation of the log parser.

  • Loading branch information...
1 parent dcfa0b1 commit a0d4054070953964a4ab01334385119bf794484a @wvanbergen committed Sep 13, 2009
View
@@ -33,6 +33,7 @@ begin
command_line.option(:file, :alias => :e)
command_line.option(:mail, :alias => :m)
command_line.option(:parse_strategy, :default => 'assume-correct')
+ command_line.option(:parallel_lines, :default => '1')
command_line.option(:dump)
command_line.option(:aggregator, :alias => :a, :multiple => true)
@@ -73,10 +73,12 @@ def self.build(arguments)
options.store(:source_files, arguments.parameters)
end
+ options[:parallel_lines] = [arguments[:parallel_lines].to_i, 1].max
+ options[:parse_strategy] = arguments[:parse_strategy]
+
controller = Controller.new(RequestLogAnalyzer::Source::LogParser.new(file_format, options), options)
#controller = Controller.new(RequestLogAnalyzer::Source::DatabaseLoader.new(file_format, options), options)
-
- options[:parse_strategy] = arguments[:parse_strategy]
+
# register filters
if arguments[:after] || arguments[:before]
@@ -37,6 +37,7 @@ def initialize(format, options = {})
@current_file = nil
@current_lineno = nil
@source_files = options[:source_files]
+ @parallel_lines = options[:parallel_lines] || 1
@options[:parse_strategy] ||= DEFAULT_PARSE_STRATEGY
raise "Unknown parse strategy" unless PARSE_STRATEGIES.include?(@options[:parse_strategy])
@@ -51,7 +52,7 @@ def initialize(format, options = {})
def each_request(options = {}, &block) # :yields: :request, request
case @source_files
- when IO;
+ when IO
puts "Parsing from the standard input. Press CTRL+C to finish." # FIXME: not here
parse_stream(@source_files, options, &block)
when String
@@ -61,6 +62,8 @@ def each_request(options = {}, &block) # :yields: :request, request
else
raise "Unknown source provided"
end
+
+ warn(:unfinished_request_on_eof, "End of file reached, but last request was not completed!") unless @current_request.nil?
end
# Make sure the Enumerable methods work as expected
@@ -115,7 +118,7 @@ def parse_file(file, options = {}, &block)
# Parses an IO stream. It will simply call parse_io. This function does not support progress updates
# because the length of a stream is not known.
# <tt>stream</tt>:: The IO stream that should be parsed.
- # <tt>options</tt>:: A Hash of options that will be pased to parse_io.
+ # <tt>options</tt>:: A Hash of options that will be passed to parse_io.
def parse_stream(stream, options = {}, &block)
parse_io(stream, options, &block)
end
@@ -133,20 +136,29 @@ def parse_stream(stream, options = {}, &block)
# <tt>io</tt>:: The IO instance to use as source
# <tt>options</tt>:: A hash of options that can be used by the parser.
def parse_io(io, options = {}, &block) # :yields: request
- @current_lineno = 1
- io.each_line do |line|
- @progress_handler.call(:progress, io.pos) if @progress_handler && io.kind_of?(File)
-
- if request_data = file_format.parse_line(line) { |wt, message| warn(wt, message) }
- @parsed_lines += 1
- update_current_request(request_data.merge(:source => @current_source, :lineno => @current_lineno), &block)
+ require 'thread'
+ q = Queue.new
+
+ parser = Thread.new do
+ current_lineno = 1
+ while line = io.gets
+ @progress_handler.call(:progress, io.pos) if @progress_handler && io.kind_of?(File)
+ if request_data = file_format.parse_line(line) { |wt, message| warn(wt, message) }
+
+ q << request_data.merge(:source => @current_source, :lineno => current_lineno)
+ end
+ current_lineno += 1
end
-
- @current_lineno += 1
end
- warn(:unfinished_request_on_eof, "End of file reached, but last request was not completed!") unless @current_request.nil?
- @current_lineno = nil
+ request_builder = Thread.new do
+ while request_data = q.shift
+ @parsed_lines += 1
+ update_current_request(request_data, &block)
+ end
+ end
+
+ parser.join
end
# Add a block to this method to install a progress handler while parsing.
@@ -176,8 +188,8 @@ def source_changes=(proc)
#
# <tt>type</tt>:: The warning type (a Symbol)
# <tt>message</tt>:: A message explaining the warning
- def warn(type, message)
- @warning_handler.call(type, message, @current_lineno) if @warning_handler
+ def warn(type, message, lineno = nil)
+ @warning_handler.call(type, message, lineno) if @warning_handler
end
protected
@@ -210,16 +222,16 @@ def update_current_request(request_data, &block) # :yields: request
case options[:parse_strategy]
when 'assume-correct'
handle_request(@current_request, &block)
- @current_request = @file_format.request(request_data)
+ @current_request = file_format.request(request_data)
when 'cautious'
@skipped_lines += 1
warn(:unclosed_request, "Encountered header line (#{request_data[:line_definition].name.inspect}), but previous request was not closed!")
@current_request = nil # remove all data that was parsed, skip next request as well.
end
elsif footer_line?(request_data)
- handle_request(@file_format.request(request_data), &block)
+ handle_request(file_format.request(request_data), &block)
else
- @current_request = @file_format.request(request_data)
+ @current_request = file_format.request(request_data)
end
else
if @current_request
@@ -230,7 +242,7 @@ def update_current_request(request_data, &block) # :yields: request
end
else
@skipped_lines += 1
- warn(:no_current_request, "Parsebale line (#{request_data[:line_definition].name.inspect}) found outside of a request!")
+ warn(:no_current_request, "Parseble line (#{request_data[:line_definition].name.inspect}) found outside of a request!")
end
end
end
@@ -242,7 +254,7 @@ def update_current_request(request_data, &block) # :yields: request
# - It will update the parsed_requests and skipped_requests variables accordingly
#
# <tt>request</tt>:: The parsed request instance (RequestLogAnalyzer::Request)
- def handle_request(request, &block) # :yields: :request, request
+ def handle_request(request, &block) # :yields: request
@parsed_requests += 1
request.validate
accepted = block_given? ? yield(request) : true
@@ -70,7 +70,7 @@
end
it "should read the correct values from a valid HTTP/1.0 access log line" do
- @log_parser.parse_io(@sample_1) do |request|
+ @log_parser.parse_io(StringIO.new(@sample_1)) do |request|
request[:remote_host].should == '1.129.119.13'
request[:timestamp].should == 20090908075409
request[:http_status].should == 200
@@ -82,7 +82,7 @@
end
it "should read the correct values from a valid 200 access log line" do
- @log_parser.parse_io(@sample_2) do |request|
+ @log_parser.parse_io(StringIO.new(@sample_2)) do |request|
request[:remote_host].should == '1.82.235.29'
request[:timestamp].should == 20090908075405
request[:http_status].should == 200
@@ -122,7 +122,7 @@
end
it "should read the correct values from a valid 404 access log line" do
- @log_parser.parse_io(@sample_1) do |request|
+ @log_parser.parse_io(StringIO.new(@sample_1)) do |request|
request[:remote_host].should == '69.41.0.45'
request[:timestamp].should == 20090902120240
request[:http_status].should == 404
@@ -136,7 +136,7 @@
end
it "should read the correct values from a valid 200 access log line" do
- @log_parser.parse_io(@sample_2) do |request|
+ @log_parser.parse_io(StringIO.new(@sample_2)) do |request|
request[:remote_host].should == '10.0.1.1'
request[:timestamp].should == 20090902050833
request[:http_status].should == 200

0 comments on commit a0d4054

Please sign in to comment.