Permalink
Browse files

apache log parsing examples -- including histograms, sessionizing, page-page covisit graph
  • Loading branch information...
1 parent d1f15d8 commit 37c692dfa3d2ff172ae426af4c4f89ff86c8111c Philip (flip) Kromer committed Aug 18, 2012
@@ -1,19 +1,13 @@
-#!/usr/bin/env ruby -E ASCII-8BIT
-require 'rubygems'
-require 'wukong/script'
-$: << File.dirname(__FILE__)
-require 'logline'
+#!/usr/bin/env ruby
+$LOAD_PATH.unshift File.expand_path('../../lib', File.dirname(__FILE__))
+require 'wukong/script'
+require_relative './logline'
class ApacheLogParser < Wukong::Streamer::LineStreamer
-
# create a Logline object from each record and serialize it flat to disk
def process line
yield Logline.parse(line)
end
end
-Wukong.run( ApacheLogParser, nil, :sort_fields => 7 ) if $0 == __FILE__
-
-
-
-
+Wukong.run( ApacheLogParser )
@@ -0,0 +1,40 @@
+#!/usr/bin/env ruby
+$LOAD_PATH.unshift File.expand_path('../../lib', File.dirname(__FILE__))
+require 'wukong/script'
+require_relative './logline'
+
+# Mapper: emits one [ip, unix_time, path] record per parsed Logline so the
+# reducer can collect every page a visitor touched.
+class BreadcrumbsMapper < Wukong::Streamer::ModelStreamer
+  # each input line is deserialized into a Logline before process() is called
+  self.model_klass = Logline
+  def process visit, *args
+    # return unless Settings.page_types.include?(visit.page_type)
+    yield [visit.ip, visit.visit_time.to_i, visit.path]
+  end
+end
+
+# Reducer: for each visitor (grouped by ip), collect the distinct set of
+# paths they visited, then emit every pair of co-visited pages as directed
+# edges -- the page-page covisit graph.
+class BreadcrumbEdgesReducer < Wukong::Streamer::Reducer
+  def get_key ip, itime, path
+    # group by visitor ip only; itime/path ride along as values
+    [ip]
+  end
+  def start!(*args)
+    # a Set, so each path counts once per visitor however often it was hit
+    @paths = Set.new
+    super
+  end
+  def accumulate ip, itime, path
+    @paths << path
+  end
+
+  # for each pair of paths, emit the edge in both directions
+  def finalize
+    # destructive shift-walk: each unordered pair is visited exactly once,
+    # and both directed edges are yielded for it
+    @paths = @paths.to_a
+    while @paths.present?
+      from = @paths.shift
+      @paths.each do |into|
+        yield [key, from, into]
+        yield [key, into, from]
+      end
+    end
+  end
+end
+
+
+Wukong.run( BreadcrumbsMapper, BreadcrumbEdgesReducer, :sort_fields => 2 )
@@ -1,27 +1,9 @@
#!/usr/bin/env ruby
-require 'rubygems'
-require 'wukong/script'
-
-class Logline < Struct.new(
- :ip, :date, :time, :http_method, :protocol, :path, :response_code, :duration, :referer, :ua, :tz)
-
- def page_type
- case
- when path =~ /\.(css|js)$/ then :asset
- when path =~ /\.(png|gif|ico)$/ then :image
- when path =~ /\.(pl|s?html?|asp|jsp|cgi)$/ then :page
- else :other
- end
- end
-
- def is_page?
- page_type == :page
- end
-
- def day_hr
- visit.date + visit.time[0..1]
- end
-end
+$LOAD_PATH.unshift File.expand_path('../../lib', File.dirname(__FILE__))
+require 'configliere'
+Settings.define :page_types, type: Array, default: ['page', 'video'], description: "Acceptable page types"
+require 'wukong/script'
+require_relative './logline'
#
@@ -38,9 +20,11 @@ def day_hr
#
# where the partition key is visitor_id, and we sort by visitor_id and datetime.
#
-class VisitorDatePath < Wukong::Streamer::StructStreamer
+class BreadcrumbsMapper < Wukong::Streamer::ModelStreamer
+ self.model_klass = Logline
def process visit, *args
- yield [visit.ip, visit.day_hr, visit.path]
+ # return unless Settings.page_types.include?(visit.page_type)
+ yield [visit.ip, visit.day_hr, visit.visit_time.to_i, visit.path]
end
end
@@ -65,11 +49,23 @@ def process visit, *args
# page_trails <pagen> <n_pages_in_visit> <duration> <timestamp> < page1,page2,... >
#
# to discover all trails passing through a given page.
-class VisitorDatePath < Wukong::Streamer::Reducer
- def get_key ip, day_hr, path, *args
- [ip, day_hr]
+class BreadcrumbsReducer < Wukong::Streamer::Reducer
+ def get_key ip, day_hr, itime, path, *args
+ [ip]
+ end
+ def start!(*args)
+ @path_times = []
+ super
end
- def process_group visit, *args
- yield [visit.ip, visit.day_hr, visit.path]
+ def accumulate ip, day_hr, itime, path, *args
+ # @path_times << "(#{itime},#{path})"
+ @path_times << "#{itime}:#{path}"
+ end
+ def finalize
+ # yield [key, "{" << @path_times.join(",") << "}"]
+ yield [key, @path_times.join("|")]
end
end
+
+
+Wukong.run( BreadcrumbsMapper, BreadcrumbsReducer, :sort_fields => 2 )
@@ -0,0 +1,33 @@
+#!/usr/bin/env ruby
+$LOAD_PATH.unshift File.expand_path('../../lib', File.dirname(__FILE__))
+require 'wukong/script'
+require_relative './logline'
+
+# cat data/swk-100.tsv | ./histograms.rb --map | sort > data/swk-hist-map.tsv
+# cat data/swk-hist-map.tsv | ./histograms.rb --reduce > data/swk-hist.tsv
+
+# Mapper: emits [path, day_hr] for every hit so the reducer can count
+# pageviews per page per hour.
+class HistogramsMapper < Wukong::Streamer::ModelStreamer
+  self.model_klass = Logline
+  def process visit
+    yield [visit.path, visit.day_hr]
+  end
+end
+
+# Reducer: counts records in each (path, day_hr) group, yielding a
+# pageviews-per-hour histogram for every page.
+class HistogramsReducer < Wukong::Streamer::Reducer
+  def get_key path, day_hr
+    [path, day_hr]
+  end
+  def start!(*args)
+    # reset the counter at the start of each new group
+    @count = 0
+    super
+  end
+  def accumulate path, day_hr
+    @count += 1
+  end
+  def finalize
+    yield [key, @count]
+  end
+end
+
+# Wukong.run( HistogramsMapper )
+Wukong.run( HistogramsMapper, HistogramsReducer, :sort_fields => 3 )
@@ -1,51 +1,103 @@
-class Logline < Struct.new(
- :ip, :dt, :tm, :http_method, :protocol, :path, :response_code, :size, :referer, :ua, :tz, :j1, :j2)
- # 1 2 3 4 5 6 7 8 9 10 11
- def page_type
- case
- when path =~ /\.(css|js)$/ then :asset
- when path =~ /\.(png|gif|ico)$/ then :image
- when path =~ /\.(pl|s?html?|asp|jsp|cgi)$/ then :page
- else :other
- end
- end
+# # Parse logs to TSV
+#
+# bzcat data/star_wars_kid.log.bz2 | head -n 100200 | tail -n 100 > data/swk-100.log
+# cat data/swk-100.tsv
+# cat data/swk-100.log | ./apache_log_parser.rb --map | wu-lign | cutc 150
+# cat data/swk-100.log | ./apache_log_parser.rb --map > data/swk-100.tsv
+# ./histograms.rb --run data/star_wars_kid.log data/star_wars_kid.tsv
+
+# # Histograms
+#
+# cat data/swk-100.tsv | ./histograms.rb --map | wu-lign
+# cat data/swk-hist-map.tsv | ./histograms.rb --reduce
+# ./histograms.rb --run data/star_wars_kid.tsv data/star_wars_kid-pages_by_hour.tsv
+
+# # Sessionize
+#
+# NOTE(review): the commands below appear copy-pasted from the Histograms
+# section above -- substitute the sessionizing (breadcrumbs) script for
+# ./histograms.rb when running this example.
+# cat data/swk-100.tsv | ./histograms.rb --map | wu-lign
+# cat data/swk-hist-map.tsv | ./histograms.rb --reduce
+# ./histograms.rb --run data/star_wars_kid.tsv data/star_wars_kid-pages_by_hour.tsv
+
+
+class Logline
+ include Gorillib::Model
+ include Gorillib::Model::PositionalFields
+
+ field :ip, String
+ field :junk1, String
+ field :junk2, String
+ #
+ field :visit_time, Time
+ field :http_method, String
+ field :path, String
+ field :protocol, String
+ field :response_code, Integer
+ field :size, Integer, blankish: ['', nil, '-']
+ field :referer, String
+ field :ua, String
+ field :cruft, String
#
# Regular expression to parse an apache log line.
#
# 83.240.154.3 - - [07/Jun/2008:20:37:11 +0000] "GET /faq HTTP/1.1" 200 569 "http://infochimps.org/search?query=CAC" "Mozilla/5.0 (Windows; U; Windows NT 5.1; fr; rv:1.9.0.16) Gecko/2009120208 Firefox/3.0.16"
#
+ # fails if the referer string has a '"' in it.
+ #
LOG_RE = Regexp.compile(%r{\A
- (\S+) # ip 83.240.154.3
- \s(\S+) # j1 -
- \s(\S+) # j2 -
- \s\[(\d+)/(\w+)/(\d+) # date part [07/Jun/2008
- :(\d+):(\d+):(\d+) # time part :20:37:11
- \s(\+.*)\] # timezone +0000]
- \s\"(?:(\S+) # http_method "GET
- \s(\S+) # path /faq
- \s(\S+)|-)" # protocol HTTP/1.1"
- \s(\d+) # response_code 200
- \s(\d+) # size 569
- \s\"([^\"]*)\" # referer "http://infochimps.org/search?query=CAC"
- \s\"([^\"]*)\" # ua "Mozilla/5.0 (Windows; U; Windows NT 5.1; fr; rv:1.9.0.16) Gecko/2009120208 Firefox/3.0.16"
- \z}x)
- MONTHS = { 'Jan' => '01', 'Feb' => '02', 'Mar' => '03', 'Apr' => '04', 'May' => '05', 'Jun' => '06', 'Jul' => '07', 'Aug' => '08', 'Sep' => '09', 'Oct' => '10', 'Nov' => '11', 'Dec' => '12', }
+ ([\d\.]+) # ip 83.240.154.3
+ \s(\S+) # j1 -
+ \s(\S+) # j2 -
+ \s\[(\d+/\w+/\d+ # date part [07/Jun/2008
+ :\d+:\d+:\d+ # time part :20:37:11
+ \s[\+\-]\S*)\] # timezone +0000]
+ \s\"(?:(\S+) # http_method "GET
+ \s(\S+) # path /faq
+ \s+(HTTP/[\d\.]+)|-)\" # protocol HTTP/1.1"
+ \s(\d+) # response_code 200
+ \s(\d+|-) # size 569
+ \s\"([^\"]*)\" # referer "http://infochimps.org/search?query=CAC"
+ \s\"([^\"]*)\" # ua "Mozilla/5.0 (Windows; U; Windows NT 5.1; fr; rv:1.9.0.16) Gecko/2009120208 Firefox/3.0.16"
+ \z}x)
+ MONTHS = { 'Jan' => 1, 'Feb' => 2, 'Mar' => 3, 'Apr' => 4, 'May' => 5, 'Jun' => 6, 'Jul' => 7, 'Aug' => 8, 'Sep' => 9, 'Oct' => 10, 'Nov' => 11, 'Dec' => 12, }
+
+
+
+
+  # Gorillib field-receive hook: converts the raw apache timestamp string
+  # ("07/Jun/2008:20:37:11 +0000") into a Time with the proper utc offset.
+  # Values that don't match the pattern fall through to super unchanged.
+  def receive_visit_time(val)
+    if %r{(\d+)/(\w+)/(\d+):(\d+):(\d+):(\d+)\s([\+\-]\d\d)(\d\d)} === val
+      day, mo, yr, hour, min, sec, tz1, tz2 = [$1, $2, $3, $4, $5, $6, $7, $8]
+      val = Time.new(yr.to_i, MONTHS[mo], day.to_i,
+                     hour.to_i, min.to_i, sec.to_i, "#{tz1}:#{tz2}")
+    end
+    super(val)
+  end
# Use the regex to break line into fields
# Emit each record as flat line
- def self.parse line
- m = LOG_RE.match(line.chomp) or return BadRecord.new(line)
- (ip, j1, j2,
- ts_day, ts_mo, ts_year,
- ts_hour, ts_min, ts_sec, tz,
- http_method, path, protocol,
- response_code, size,
- referer, ua, *cruft) = m.captures
- dt = [ts_year, MONTHS[ts_mo], ts_day].join("")
- tm = [ts_hour, ts_min, ts_sec].join("")
- self.new( ip, dt, tm, http_method, protocol, path, response_code, size, referer, ua, tz, j1, j2 )
+ def self.parse(line)
+ match = LOG_RE.match(line.chomp)
+ unless match then warn(line) ; return BadRecord.new('no match', line) ; end
+ new(* match.captures)
end
+ FILE_EXT_RE = %r{\.[^/]+\z}
+ def page_type
+ file_ext = path[FILE_EXT_RE]
+ case file_ext
+ when nil then 'page'
+ when '.wmv' then 'video'
+ when '.html','.shtml' then 'page'
+ when '.css', '.js' then 'asset'
+ when '.png', '.gif', '.ico' then 'image'
+ when '.wmv' then 'image'
+ when '.pl','.asp','.jsp','.cgi' then 'page'
+ else 'other'
+ end
+ end
+
+ def day_hr
+ [visit_time.year, visit_time.month, visit_time.day, visit_time.hour].join
+ end
end
@@ -0,0 +1,48 @@
+
+
+-- page-page covisit edges produced by the breadcrumbs job.
+-- (fixed: `LOAD common_pages FROM ...` is not valid Pig Latin; a relation
+-- is defined with `alias = LOAD 'path' AS schema;`)
+common_pages = LOAD 'data/common_pages' AS (ip:chararray, from_path:chararray, into_path:chararray);
+
+--
+-- Build adjacency list <A pr B,C,D> from edges (<A B>, <A C>, <A D>)
+--
+
+adj_list_j = GROUP common_pages BY from_path;
+adj_list = FOREACH adj_list_j GENERATE
+    group AS from_path,
+    1.0F AS pagerank:float,
+    common_pages.into_path AS into_paths  -- bag of destinations (was the undeclared field `dest`)
+  ;
+STORE adj_list INTO 'data/pagerank/pr_iter_00';
+
+
+--
+-- Iterate pagerank <A pr_00 B,C,D> to become <A pr_01 B,C,D>
+--
+
+-- find partial share: A.rank / A.into_paths.length
+-- dispatch <into_path partial_share> to each page
+sent_shares = FOREACH adj_list GENERATE
+    FLATTEN(into_paths) AS path,
+    (float)(pagerank / (float)SIZE(into_paths)) AS share:float;
+
+-- dispatch <from_path into_paths> to yourself, so you have the links still around
+sent_edges = FOREACH adj_list GENERATE
+    from_path AS path, into_paths;
+
+-- assemble all the received shares, and the self-sent edge list
+rcvd_shares = COGROUP sent_edges BY path INNER, sent_shares BY path PARALLEL $PARALLEL;
+
+-- calculate the new rank, and emit a record that looks just like the input.
+next_iter = FOREACH rcvd_shares {
+  raw_rank = (float)SUM(sent_shares.share);
+  -- treat the case that a node has no in links
+  damped_rank = ((raw_rank IS NOT NULL AND raw_rank > 1.0e-12f) ? raw_rank*0.85f + 0.15f : 0.0f);
+  GENERATE
+    group AS from_path,
+    damped_rank AS rank,
+    FLATTEN(sent_edges.into_paths)
+  ; };
+
+STORE next_iter INTO 'data/pagerank/pr_iter_01';
+
+

0 comments on commit 37c692d

Please sign in to comment.