Permalink
Browse files

sensor sketch

  • Loading branch information...
0 parents commit d780cf2eb80b49a30a1e6bf31b1807cd64f83bb8 @savonarola committed Apr 8, 2012
@@ -0,0 +1,17 @@
+*.gem
+*.rbc
+.bundle
+.config
+.yardoc
+Gemfile.lock
+InstalledFiles
+_yardoc
+coverage
+doc/
+lib/bundler/man
+pkg
+rdoc
+spec/reports
+test/tmp
+test/version_tmp
+tmp
@@ -0,0 +1 @@
+--color -fd
@@ -0,0 +1,4 @@
+source 'https://rubygems.org'
+
+# Specify your gem's dependencies in pulse-meter.gemspec
+gemspec
@@ -0,0 +1,22 @@
+Copyright (c) 2012 Ilya Averyanov
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,29 @@
+# Pulse::Meter
+
+TODO: Write a gem description
+
+## Installation
+
+Add this line to your application's Gemfile:
+
+ gem 'pulse-meter'
+
+And then execute:
+
+ $ bundle
+
+Or install it yourself as:
+
+ $ gem install pulse-meter
+
+## Usage
+
+TODO: Write usage instructions here
+
+## Contributing
+
+1. Fork it
+2. Create your feature branch (`git checkout -b my-new-feature`)
+3. Commit your changes (`git commit -am 'Added some feature'`)
+4. Push to the branch (`git push origin my-new-feature`)
+5. Create new Pull Request
@@ -0,0 +1,2 @@
+#!/usr/bin/env rake
+require "bundler/gem_tasks"
@@ -0,0 +1,14 @@
+require "pulse-meter/version"
+require "pulse-meter/sensor"
+
+module PulseMeter
+ @@redis = nil
+
+ def self.redis
+ @@redis
+ end
+
+ def self.redis=(redis)
+ @@redis = redis
+ end
+end
@@ -0,0 +1,18 @@
+module PulseMeter
+ class SensorError < StandardError; end
+
+
+ class BadSensorName < SensorError
+ def initialize(name, options = {})
+ super("Bad sensor name: `#{name}', only a-z letters and _ are allowed")
+ end
+ end
+
+ class RedisNotInitialized < SensorError
+ def initialize
+ super("PulseMeter.redis is not set")
+ end
+ end
+
+end
+
@@ -0,0 +1,40 @@
+module PulseMeter
+ module Sensor
+ class Base
+ attr_accessor :redis
+ attr_reader :name
+
+ def initialize(name, options)
+ @name = name.to_s
+ raise BadSensorName, @name unless @name =~ /\A\w+\z/
+ raise RedisNotInitialized unless PulseMeter.redis
+ @redis = PulseMeter.redis
+ end
+
+ def annotate(description)
+ redis.set(desc_key, name)
+ end
+
+ def cleanup
+ redis.del(desc_key)
+ end
+
+ def event(value)
+ # do nothing here
+ end
+
+ protected
+
+ def desc_key
+ "#{name}:desc"
+ end
+
+ def multi
+ redis.multi
+ yield
+ redis.exec
+ end
+
+ end
+ end
+end
@@ -0,0 +1,25 @@
+module PulseMeter
+ module Sensor
+ class Counter < Base
+
+ def cleanup
+ redis.del(value_key)
+ super
+ end
+
+ def incr
+ event(1)
+ end
+
+ def event(value)
+ redis.incrby(value_key, value.to_i)
+ end
+
+ protected
+
+ def value_key
+ @values_key ||= "#{name}:value"
+ end
+ end
+end
+
@@ -0,0 +1,22 @@
+module PulseMeter
+ module Sensor
+ class Indicator < Base
+
+ def cleanup
+ redis.del(value_key)
+ super
+ end
+
+ protected
+
+ def process_event(accessor, value)
+ redis.set(value_key, value)
+ end
+
+ def value_key
+ @value_key ||= "#{name}:value"
+ end
+
+ end
+end
+
@@ -0,0 +1,114 @@
+require 'sequrerandom'
+
+module PulseMeter
+ module Sensor
+ class Timeline < Base
+
+ attr_reader :interval, :ttl, :raw_data_ttl
+
+ def initialize(name, options)
+ super
+ @interval = assert_positive_integer!(options, :interval)
+ @ttl = assert_positive_integer!(options, :ttl)
+ @raw_data_ttl = assert_positive_integer!(options, :raw_data_ttl)
+ end
+
+ def cleanup
+ keys = []
+ keys << current_buket_id_key
+ keys << current_buket_key
+ redis.keys(raw_completed_bucket_key('*')).each do |key|
+ keys << key
+ end
+ redis.keys(completed_bucket_key('*')).each do |key|
+ keys << key
+ end
+ redis.multi
+ keys.each{|key| redis.del(key)}
+ redis.exec
+ end
+
+ def event(value)
+ rotate
+ aggregate_event(value)
+ end
+
+ protected
+
+ def assert_positive_integer!(options, key)
+ value = options[key]
+ raise ArgumentError, "#{key} should be integer" unless value.respond_to?(:to_i)
+ raise ArgumentError, "#{key} should be positive" unless value.to_i > 0
+ options[key] = value.to_i
+ end
+
+ @queue = :pulse_meter
+
+ def current_buket_id_key
+ "#{name}:current_buket_id"
+ end
+
+
+ def current_buket_key
+ "#{name}:current_buket"
+ end
+ alias :current_key :current_buket_key
+
+ def raw_completed_bucket_key(id)
+ "#{name}:raw:#{id}"
+ end
+
+ def completed_bucket_key(id)
+ "#{name}:comp:#{id}"
+ end
+
+ def current_buket_id
+ (((Time.now.to_i / interval) + 1) * interval).to_s
+ end
+
+ def rotate
+ old_id = redis.get(current_buket_id_key)
+ current_id = current_buket_id
+ if old_id < current_id
+ raw_data_key = raw_completed_bucket_key(old_id)
+ multi do
+ redis.set(current_buket_id_key, current_id)
+ redis.renamenx(current_buket_key, raw_data_key)
+ redis.expire(raw_data_key, raw_data_ttl)
+ end
+ Resque.enqueue(self.class, raw_data_key, completed_bucket_key(old_id), options)
+ end
+ end
+
+ def self.perform(summarize_from, summarize_to, options)
+ redis = PulseMeter.redis
+ tmp_key = temp_key(summarize_from, summarize_to)
+ already_summarized = begin
+ redis.rename(summarize_from, tmp_key)
+ false
+ rescue RuntimeError => e
+ if e.to_s =~ /no such key/
+ true
+ else
+ raise e
+ end
+ end
+ unless already_summarized
+ summarize(redis, tmp_key, summarize_to, options)
+ redis.del(tmp_key)
+ redis.expire(summarize_to, ttl)
+ end
+ end
+
+ def self.temp_key(summarize_from, summarize_to)
+ "#{name}:summarize:#{SecureRandom.hex(32)}"
+ end
+
+ def aggregate_event(value)
+ # simple
+ redis.set(current_buket_key, value)
+ end
+
+ end
+end
+
@@ -0,0 +1,28 @@
+module PulseMeter
+ module Sensor
+ module Timeline
+ class Average
+ protected
+
+ def aggregate_event(value)
+ multi do
+ redis.hincrby(current_key, :count, 1)
+ redis.hincrbyfloat(current_key, :value, value)
+ end
+ end
+
+ def self.summarize(redis, summarize_from, summarize_to, options)
+ count = redis.hget(summarize_from, :count)
+ value = redis.hget(summarize_from, :value)
+ res = if count && !count.empty?
+ value.to_f / count.to_f
+ else
+ 0
+ end
+ redis.set(summarize_to, res)
+ end
+ end
+ end
+ end
+end
+
@@ -0,0 +1,22 @@
+module PulseMeter
+ module Sensor
+ module Timeline
+ class Counter
+ def incr
+ event(1)
+ end
+
+ protected
+
+ def aggregate_event(value)
+ redis.incrby(current_key, value.to_i)
+ end
+
+ def self.summarize(redis, summarize_from, summarize_to, options)
+ redis.renamenx(summarize_from, summarize_to)
+ end
+ end
+ end
+ end
+end
+
Oops, something went wrong.

0 comments on commit d780cf2

Please sign in to comment.