Skip to content

Commit

Permalink
Merge branch 'v0.9.3' into v0.9-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
asmuth committed May 15, 2012
2 parents b4f0917 + b0fd775 commit 2f60384
Show file tree
Hide file tree
Showing 35 changed files with 14,698 additions and 0 deletions.
7 changes: 7 additions & 0 deletions __merge/bin/fnordquery
@@ -0,0 +1,7 @@
#!/usr/bin/env ruby
$: << ::File.expand_path('../../lib', __FILE__)

require "fnordquery"
require "fnordquery/runner"

FnordQuery::Runner.new
21 changes: 21 additions & 0 deletions __merge/concept.txt
@@ -0,0 +1,21 @@
fnordquery
==========

fnordquery analyzes streams of usage/traffic/activity/etc-data. you feed in json-objects ("events") and it spits out pretty reports. the only schema-wise requirement is that each event contains a "_time" key.

fnordquery requires a fyrehose backend.


categorical_correlation (keyword correlation)
categorical_crosstab (chanels x filters)
categorical_uniqueness (zipfs law)

cluster_cardinality (number of searches per session)
cluster_repetition (search term repetition per session)

numeric_distribution (price/age dist, w. variance/dispersion and stuff)
numeric_correlation (numeric dependency)

retention_analysis

click through?
21 changes: 21 additions & 0 deletions __merge/lib/fnordquery.rb
@@ -0,0 +1,21 @@
module FnordQuery; end

require "eventmachine"
require "json"
require "haml"
require "sinatra"

require "fnordquery/web/web"
require "fnordquery/web/app"

require "fnordquery/backends/redis_backend"
require "fnordquery/acceptor"
require "fnordquery/acceptors/tcp_acceptor"
require "fnordquery/acceptors/udp_acceptor"
require "fnordquery/query"

require "fnordquery/report"
require "fnordquery/report_builder"
require "fnordquery/report_manager"
require "fnordquery/reports/numeric_timeseries_report"
require "fnordquery/reports/categorical_topk_report"
33 changes: 33 additions & 0 deletions __merge/lib/fnordquery/acceptor.rb
@@ -0,0 +1,33 @@
class FnordQuery::Acceptor

def initialize(opts)
@opts = opts
end

def execute(runner, backend)
inbound_class = if @opts[:protocol] == :udp
FnordQuery::UDPAcceptor
else
FnordQuery::TCPAcceptor
end

@opts[:listen] = [
@opts[:host] || "0.0.0.0",
@opts[:port] || 2323
]

@opts.merge!(
:runner => runner,
:backend => backend
)

begin
inbound_stream = inbound_class.start(@opts)
puts "listening on #{@opts[:protocol]}://#{@opts[:listen][0..1].join(":")}"
#rescue
# puts "error: cant start #{inbound_class.name}. port in use?"
# exit!(1)
end
end

end
54 changes: 54 additions & 0 deletions __merge/lib/fnordquery/acceptors/tcp_acceptor.rb
@@ -0,0 +1,54 @@
class FnordQuery::TCPAcceptor < EventMachine::Connection
@@opts = nil

def self.start(opts)
@@opts = opts
EM.start_server(*(opts[:listen] + [self]))
end

def self.options(opts)
@@opts = opts
end

def receive_data(chunk)
@buffer << chunk
next_event
end

def next_event
read_next_event
push_next_event
end

def read_next_event
while (event = @buffer.slice!(/^(.*)\n/))
@events_buffered += 1
@events << event
end
end

def push_next_event
return true if @events.empty?
@events_buffered -= 1
@backend.publish(@events.pop)
close_connection?
EM.next_tick(&method(:push_next_event))
end

def close_connection?
@backend.hangup unless @streaming || (@events_buffered!=0)
end

def post_init
@backend = @@opts[:backend][0].new(@@opts[:backend][1])
@events_buffered = 0
@streaming = true
@buffer = ""
@events = []
end

def unbind
@streaming = false
close_connection?
end
end
37 changes: 37 additions & 0 deletions __merge/lib/fnordquery/acceptors/udp_acceptor.rb
@@ -0,0 +1,37 @@
class FnordQuery::UDPAcceptor < EventMachine::Connection

class << self
attr_accessor :opts
end

def self.start(opts)
self.opts = opts

EM.open_datagram_socket(*(opts[:listen] << self << opts))
end

def receive_data(event)
events << event
push_next_event
end

def push_next_event
return true if events.empty?
ev = @events.pop
backend.publish(ev)
EM.next_tick(&method(:push_next_event))
end

def unbind
backend.hangup
end

def backend
@backend ||= @opts[:backend][0].new(@opts[:backend][1])
end

def events
@events ||= []
end

end
78 changes: 78 additions & 0 deletions __merge/lib/fnordquery/backends/redis_backend.rb
@@ -0,0 +1,78 @@
class FnordQuery::RedisBackend

require "em-hiredis"

def initialize(opts={})
@opts = opts
@prefix = "fnordquery"

@channel = EM::Channel.new
@sub_redis = EM::Hiredis.connect() # FIXPAUL
@redis = EM::Hiredis.connect() # FIXPAUL

redis_subscribe
end

def subscribe(query, &block)
if query.until == :stream
@channel.subscribe do |event|
block.call(event) if query.matches?(event)
end
end
if query.since != :now
q_until = query.until.is_a?(Symbol)? Time.now.to_i : query.until
@redis.zrangebyscore(@prefix, query.since, q_until) do |res|
res.each do |raw|
begin
event = JSON.parse(raw)
rescue
puts "redisbackend: read invalid json"
else
block.call(event) if query.matches?(event)
end
end
if query.until != :stream
on_finish
end
end
end
end

def on_finish(&block)
return @on_finish = block if block_given?
@on_finish.call() if @on_finish
end

def publish(message, opts={})
if message.is_a?(String)
begin
message = JSON.parse(message)
rescue
puts "redisbackend: published invalid json"
return false
end
end

message["_time"] ||= Time.now.to_i
message["_eid"] ||= rand(8**64).to_s(36)

@redis.publish(@prefix, message.to_json)
@redis.zadd(@prefix, message["_time"], message.to_json)
end

private

def redis_subscribe
@sub_redis.subscribe(@prefix)
@sub_redis.on(:message) do |chan, raw|
begin
message = JSON.parse(raw)
rescue
puts "redisbackend: received invalid json"
else
@channel.push(message)
end
end
end

end

0 comments on commit 2f60384

Please sign in to comment.