Permalink
Browse files

Remove obsolete trigger stuff from hastur-server

  • Loading branch information...
1 parent 9b7fd4f commit c42d3166e05f8eba5e45b8464821b973097d24d0 Noah Gibbs committed Nov 8, 2012
View
12 README.md
@@ -10,8 +10,7 @@ ZeroMQ, allowing the agent daemon to be thin and simple.
Message storage is done with Cassandra, a highly-scalable key-value
store with excellent support for time-series data.
-Hastur supports RESTful querying of data using the retrieval service
-and streaming examination of data using triggers.
+Hastur supports RESTful querying of data using the retrieval service.
Components
----------
@@ -28,9 +27,6 @@ Components
retrieves hostname data, UUID messages sources and message names to
allow exploration of who is sending what data.
-* Hastur Syndicator - a server daemon to receive events from Core and
- send them out to workers with triggers as a realtime stream.
-
API Documentation
-----------------
@@ -46,9 +42,8 @@ daemon. The agent opens a UDP port (normally 8125) for local use.
The agent sends back system information always, and also forwards
Hastur messages from individual Hastur-enabled applications.
-The agent forwards messages to the Core servers, which then store
-them, forward them to streaming Syndicators and write to Cassandra for
-later retrieval.
+The agent forwards messages to the Core servers, which then write to
+Cassandra for later retrieval.
Using ZeroMQ and/or Cassandra, each component other than the agent can
have as many copies as desired for availability and/or fault
@@ -95,4 +90,3 @@ Deployment
The agent is deployed via Debian packages (other methods later)
Core is deployed via debian packages
-Triggers - automated deployment pending
View
2 Rakefile
@@ -50,7 +50,7 @@ namespace "test" do
shameful_integration_tests = integration_tests
- LIST_OF_SHAME = [ 'bring_router_down', 'bring_down', 'bring_sink_down', 'bring_up', 'plugin_registration', 'event', 'plugin', 'heartbeat', 'core_router', 'basic_trigger' ]
+ LIST_OF_SHAME = [ 'bring_router_down', 'bring_down', 'bring_sink_down', 'bring_up', 'plugin_registration', 'event', 'plugin', 'heartbeat', 'core_router' ]
unless LIST_OF_SHAME.nil? || LIST_OF_SHAME.empty?
puts "****************************************************"
View
67 bin/bluepill-hastur-trigger.pill
@@ -1,67 +0,0 @@
-require 'bluepill'
-require 'logger'
-
-HASTUR_ROOT="/opt/hastur"
-SYNDICATOR_PIDFILE = "/run/hastur/trigger-syndicator.pid"
-WORKER_PIDFILE = "/run/hastur/trigger-worker.pid"
-
-cassandra_servers = []
-File.foreach("#{HASTUR_ROOT}/conf/cassandra-servers.txt") do |line|
- line.chomp!
- line.gsub /\s+#.*$/, ''
- cassandra_servers << line unless line.empty?
-end
-
-TRIGGER_SYNDICATOR_COMMAND = [
- "#{HASTUR_ROOT}/bin/trigger-syndicator"
-].flatten.join(" ")
-
-TRIGGER_WORKER_COMMAND = [
- "#{HASTUR_ROOT}/bin/trigger-worker",
- "-t", "/opt/hastur/triggers/glowworm_trigger.rb",
- "--cassandra", cassandra_servers,
-].flatten.join(" ")
-
-Bluepill.application("trigger_syndicator") do |app|
- app.process("trigger_syndicator") do |process|
- process.environment = {
- "PATH" => "#{HASTUR_ROOT}/bin:/opt/local/bin:/usr/local/bin:/usr/local/sbin:/bin:/usr/bin:/sbin:/usr/sbin",
- "LD_LIBRARY_PATH" => "#{HASTUR_ROOT}/lib",
- }
- process.start_command = TRIGGER_SYNDICATOR_COMMAND
- process.stop_signals = [:quit, 15.seconds, :term, 5.seconds, :kill]
- process.working_dir = HASTUR_ROOT
- process.pid_file = SYNDICATOR_PIDFILE
- process.uid = "role-hastur"
- process.gid = "role-hastur"
- process.daemonize = true
- process.start_grace_time = 15.seconds
- process.stop_grace_time = 30.seconds
- process.restart_grace_time = 45.seconds
- process.checks :mem_usage, :every => 10.seconds, :below => 50.megabytes, :times => [3,5]
- process.checks :flapping, :times => 2, :within => 30.seconds, :retry_in => 15.seconds
- end
-end
-
-Bluepill.application("trigger_worker") do |app|
- app.process("trigger_worker") do |process|
- process.environment = {
- "PATH" => "#{HASTUR_ROOT}/bin:/opt/local/bin:/usr/local/bin:/usr/local/sbin:/bin:/usr/bin:/sbin:/usr/sbin",
- "LD_LIBRARY_PATH" => "#{HASTUR_ROOT}/lib",
- }
- process.start_command = TRIGGER_WORKER_COMMAND
- process.stop_signals = [:quit, 15.seconds, :term, 5.seconds, :kill]
- process.working_dir = HASTUR_ROOT
- process.pid_file = WORKER_PIDFILE
- process.uid = "role-hastur"
- process.gid = "role-hastur"
- process.daemonize = true
- process.start_grace_time = 15.seconds
- process.stop_grace_time = 30.seconds
- process.restart_grace_time = 45.seconds
- process.checks :mem_usage, :every => 10.seconds, :below => 50.megabytes, :times => [3,5]
- process.checks :flapping, :times => 2, :within => 30.seconds, :retry_in => 15.seconds
- end
-end
-
-# vim: ft=ruby
View
265 lib/hastur-server/trigger.rb
@@ -1,265 +0,0 @@
-require "hastur-server/trigger/state_handler"
-
-require "hastur-server/trigger/pager_duty"
-require "hastur-server/trigger/email"
-require "hastur-server/trigger/web_hook"
-
-require "hastur-server/util"
-require "hastur-server/message"
-require "hastur-server/envelope"
-require "hastur-server/syndicator"
-
-# Hastur Triggers are small code snippets used to process streams of
-# messages. They are commonly used for alerting and synthetic derived
-# messages like statistics or events.
-
-module Hastur
- module Trigger
- class Context
- # Intentionally left out of filtering:
- # - body
- # - timestamp
- # - plugin registration
- # - agent registration
- # TODO(noah) - remove type when stat subtypes become message types
- # TODO(noah) - add name groups when they exist
- FILTER_BY = %w(name value type attn subject labels uuid)
-
- attr_accessor :state
-
- class << self
- attr_accessor :syndicator
- attr_accessor :on_sub_handlers
- attr_accessor :subscriptions
- attr_accessor :contexts
- attr_accessor :cassandra
- attr_accessor :logger
- end
-
- ::Hastur::Trigger::Context.on_sub_handlers = []
- ::Hastur::Trigger::Context.syndicator = ::Hastur::Syndicator.new
- ::Hastur::Trigger::Context.subscriptions = {}
- ::Hastur::Trigger::Context.contexts = []
- ::Hastur::Trigger::Context.cassandra = true
-
- def initialize(options = {})
- ::Hastur::Trigger::Context.contexts << self
-
- immediate_caller = caller[0]
- if immediate_caller =~ /^([^:]+):(\d+)/
- # TODO: Trim off repo root from filename, if present
- @filename = $1
- else
- raise "Can't get caller of Hastur::Trigger::Context.new from caller #{immediate_caller}!"
- end
-
- cass_spec = ::Hastur::Trigger::Context.cassandra
- cass_spec = [] if cass_spec == true
- if cass_spec
- @state_handler = StateHandler.new(@filename, *cass_spec)
- end
- @state = @state_handler ? @state_handler.get_state : {}
-
- @msg_socket = Object.new
- def @msg_socket.sendmsgs(messages)
- sub_id_msg, body_msg = messages
- receive_message(sub_id_msg.copy_out_string, body_msg.copy_out_string)
- end
- end
-
- def [](key)
- @state[key]
- end
-
- def []=(key, val)
- @state[key] = val
- end
-
- def contexts
- ::Hastur::Trigger::Context.contexts.dup
- end
-
- def self.on_subscribe(&block)
- ::Hastur::Trigger::Context.on_sub_handlers << block
- end
-
- private
-
- def subscribe_to(filter_opts)
- syndicator = Hastur::Trigger::Context.syndicator
-
- sub_id = syndicator.add_filter(filter_opts)
-
- syndicator.add_socket(@msg_socket, sub_id)
-
- ::Hastur::Trigger::Context.on_sub_handlers.each do |on_sub|
- on_sub.call(self, sub_id, filter_opts)
- end
-
- sub_id
- end
-
- #
- # Stream messages of the specified type with the
- # given filters. This method requires a block,
- # which will be run on each message in turn.
- #
- # @param [Hash] filters Filters on events to deliver
- # @yield A block to call on each message
- #
- def message_stream(filters = {}, &block)
- raise "Filter must specify a type!" unless filters[:type]
-
- bad_filter_keys = filters.keys.map(&:to_s) - FILTER_BY
- unless bad_filter_keys.empty?
- raise "You're trying to filter on #{bad_filter_keys.join(", ")}! Allowed: #{FILTER_BY.join(", ")}"
- end
-
- sub_id = subscribe_to(filters)
- ::Hastur::Trigger::Context.subscriptions ||= {}
- ::Hastur::Trigger::Context.subscriptions[sub_id] =
- { :filters => filters, :proc => block, :context => self }
- end
-
- def self.receive_message(sub_id, message)
- raise "No such subscription as #{sub_id.inspect}!" unless ::Hastur::Trigger::Context.subscriptions &&
- ::Hastur::Trigger::Context.subscriptions[sub_id]
-
- # Dispatch message to correct receiver
- proc = ::Hastur::Trigger::Context.subscriptions[sub_id][:proc]
- context = ::Hastur::Trigger::Context.subscriptions[sub_id][:context]
- context.instance_exec(message, &proc)
-
- @state_handler.set_state(@state) if @state_handler
- end
-
- public
-
- def self.message_from_firehose(sub_id, envelope, body)
- syndicator = ::Hastur::Trigger::Context.syndicator
- filter = syndicator.filter_for_id(sub_id)
- raise "No filter for subscription ID: #{sub_id}" unless filter
-
- message = ::Hastur::Trigger::Message.new(envelope, body)
- filter_value = message.body_hash.merge(message.envelope_hash)
-
- if syndicator.apply_one_filter(filter, filter_value)
- @logger.info "Valid message: #{message}" if @logger
- receive_message(sub_id, message)
- end
- end
-
- def counters(filters = {}, &block)
- message_stream filters.merge(:type => :counter), &block
- end
-
- def gauges(filters = {}, &block)
- message_stream filters.merge(:type => :gauge), &block
- end
-
- def marks(filters = {}, &block)
- message_stream filters.merge(:type => :mark), &block
- end
-
- def compound(filters = {}, &block)
- message_stream filters.merge(:type => :compound), &block
- end
-
- def events(filters = {}, &block)
- message_stream filters.merge(:type => :event), &block
- end
-
- def process_heartbeats(filters = {}, &block)
- message_stream filters.merge(:type => :hb_process), &block
- end
-
- def hb_processes(filters = {}, &block)
- message_stream filters.merge(:type => :hb_process), &block
- end
-
- def agent_heartbeats(filters = {}, &block)
- message_stream filters.merge(:type => :hb_agent), &block
- end
-
- def hb_agents(filters = {}, &block)
- message_stream filters.merge(:type => :hb_agent), &block
- end
-
- def every(period, &block)
- raise "No block given to .every!" unless block_given?
-
- context = self
-
- Hastur.every(period) do
- context.instance_eval(&block)
- end
- end
- end
-
- class Message
- ENVELOPE_ATTRS = [ :version, :type_id, :to, :from, :ack, :resend,
- :sequence, :timestamp, :uptime, :hmac, :routers ]
-
- # :nodoc: YARD doesn't like *ENVELOPE_ATTRS so it is copied here
- attr_reader :version, :type_id, :to, :from, :ack, :resend, :sequence,
- :timestamp, :uptime, :hmac, :routers, :body_hash, :envelope_hash
-
- # Alias for "from"
- attr_reader :uuid
-
- def initialize(envelope, body)
- @body_hash = MultiJson.load(body)
- raise "Can't parse JSON: #{body}!" unless @body_hash
-
- @envelope = ::Hastur::Envelope.parse(envelope)
- @envelope_hash = {}
- ENVELOPE_ATTRS.each do |attribute|
- attr_value = @envelope.send(attribute)
- instance_variable_set("@#{attribute}", attr_value)
- @envelope_hash[attribute] = attr_value
- end
-
- # Convenience aliases
- @uuid = @envelope.from
- @envelope_hash[:uuid] = @envelope_hash[:from]
- @envelope_hash[:type] = @envelope_hash[:type_id]
- end
-
- def from_hostname
- # TODO(noah): lookup from Hastur Retrieval Service
- "#{from}.fake-domain.com"
- end
-
- def to_hostname
- "#{to}.fake-domain.com"
- end
-
- def hostname
- from_hostname
- end
-
- def to_hash
- {
- "envelope" => @envelope.to_hash,
- "body" => @body_hash,
- }
- end
-
- def to_json
- MultiJson.dump(self.to_hash)
- end
-
- def method_missing(*args)
- return @body_hash[args[0].to_s] if args.size == 1 && @body_hash[args[0].to_s]
-
- super
- end
-
- def respond_to?(method_name)
- return true if @body_hash.has_key?(method_name)
-
- super
- end
- end
- end
-end
View
35 lib/hastur-server/trigger/email.rb
@@ -1,35 +0,0 @@
-require "pony"
-
-module Hastur
- module Trigger
- class Context
- #
- # Send email to alert about a problem, error or other remarkable data feature.
- #
- # @param recipients [String or Array<String>] Who to send to
- # @param subject [String] Email subject line
- # @param body [String] Optional email body
- # @param opts [Hash] Options
- # @option opts [String or Array] :cc
- # @option opts [String or Array] :bcc
- # @option opts [String] :from
- #
- def send_email(recipients, subject, body = "", opts = {})
- recipients = [recipients].flatten
- from = opts[:from] || "Hastur Triggers"
- cc = [opts[:cc]].flatten || []
- bcc = [opts[:bcc]].flatten || []
-
- ret = Pony.mail :to => recipients.join(";"),
- :cc => cc.join(";"),
- :bcc => bcc.join(";"),
- :subject => subject,
- :body => body,
- :via => :sendmail
-
- # Could set :via_options to set sendmail
- # location and arguments.
- end
- end
- end
-end
View
49 lib/hastur-server/trigger/pager_duty.rb
@@ -1,49 +0,0 @@
-require "multi_json"
-require "httparty"
-
-module Hastur
- module Trigger
- class Context
- # This is a PagerDuty API key
- @@pagerduty_key = "43692e10760a012fb67b22000a9040cf"
-
- #
- # Sends a page to PagerDuty, currently always to the Tools and Automation project.
- #
- # @param incident_id [String] the PagerDuty incident key
- # @param msg [String] the PagerDuty incident description
- # @param json_data [Hash] additional JSON data sent as details
- # @param options [Hash] options
- # @option options [boolean] :no_create Don't really create an incident, this is test-only
- #
- def pager_duty(incident_id, msg, json_data = {}, options = {})
- # TODO(noah): Check squelches
-
- if @logger
- @logger.info "Paging: i_id: #{incident_id.inspect} msg: #{msg.inspect} " +
- "details: #{json_data.inspect} options: #{options.inspect}"
- end
-
- if options[:no_create]
- @logger.info "Not creating PagerDuty notification due to :no_create option" if @logger
- else
- @logger.info "Creating via HTTP POST to PagerDuty" if @logger
- reply = HTTParty.post "https://events.pagerduty.com/generic/2010-04-15/create_event.json",
- :body => MultiJson.dump({
- :service_key => @@pagerduty_key,
- :incident_key => incident_id,
- :event_type => "trigger",
- :description => msg,
- :details => json_data,
- })
-
- @logger.info "Posted, reply is #{reply.inspect}" if @logger
-
- unless reply.code >= 200 && reply.code < 400
- raise "Error creating PagerDuty incident: #{reply.inspect}"
- end
- end
- end
- end
- end
-end
View
50 lib/hastur-server/trigger/state_handler.rb
@@ -1,50 +0,0 @@
-require "cassandra"
-require "digest/md5"
-require "multi_json"
-MultiJson.use :yajl
-
-# Note(jbhat): Since all our column names match, compressing this CF would be a huge win
-module Hastur
- module Trigger
- DEFAULT_KEYSPACE = "HasturTrigger"
- DEFAULT_SERVERS = ["127.0.0.1:9160"]
- DEFAULT_CF = :TriggerState
- DEFAULT_COL = "val"
-
- class StateHandler
- # @example
- # @state_handler = Hastur::Trigger::StateHandler.new("trigger_file1")
- def initialize(filename, opts = {})
- @key = StateHandler.filename_to_rowkey(filename)
- @client = opts[:client] || StateHandler.create_client(opts[:keyspace], opts[:servers])
- @cf = opts[:cf] || DEFAULT_CF
- @col = opts[:col] || DEFAULT_COL
- end
-
- # @example
- # @state_handler.set_state(@context.state)
- def set_state(state, options = {})
- raise "Argument must be a Hash" unless state.is_a? Hash
- val = MultiJson.dump state
- @client.insert(@cf, @key, { @col => val }, options)
- end
-
- # @example
- # @context.state = @state_handler.get_state
- def get_state(options = {})
- val = @client.get(@cf, @key, @col, options)
- val ? MultiJson.load(val) : {}
- end
-
- private
- def self.filename_to_rowkey(filename)
- raise "Must pass in non-nil filename for state rowkey" unless filename
- Digest::MD5.hexdigest(filename)
- end
-
- def self.create_client(keyspace = nil, servers = nil)
- ::Cassandra.new(keyspace || DEFAULT_KEYSPACE, [servers || DEFAULT_SERVERS].flatten)
- end
- end
- end
-end
View
10 lib/hastur-server/trigger/web_hook.rb
@@ -1,10 +0,0 @@
-module Hastur
- module Trigger
- class Context
- def web_hook(url, options = {})
- method = options[:method] || :get
- HTTParty.send(method, url, :query => options[:params] || nil)
- end
- end
- end
-end
View
217 test/integration/basic_trigger_test.rb
@@ -1,217 +0,0 @@
-#!/usr/bin/env ruby
-
-require_relative "./integration_test_helper"
-require 'scope'
-require 'nodule'
-require 'nodule/zeromq'
-require 'nodule/alarm'
-require 'multi_json'
-require 'hastur-server/message'
-require 'hastur-server/mock/nodule_agent'
-require 'minitest/autorun'
-
-require "ffi-rzmq"
-
-TEST_TRIGGER = File.join(HASTUR_ROOT, "tools", "trigger", "triggers", "logging_trigger.rb")
-
-TEST_COUNTER_ENVELOPE = Hastur::Envelope.new :type => Hastur::Message::Stat::Counter,
- :from => A1UUID, :to => A2UUID
-TEST_COUNTER_1 = <<JSON
-{
- "type": "counter",
- "value": 1,
- "name": "write.out.counter",
- "timestamp": 1335826232943810,
- "labels": {
- }
-}
-JSON
-
-TEST_GAUGE_ENVELOPE = Hastur::Envelope.new :type => Hastur::Message::Stat::Gauge,
- :from => A1UUID, :to => A2UUID
-TEST_GAUGE_1 = <<JSON
-{
- "type": "gauge",
- "value": 1,
- "name": "write.out.gauge",
- "timestamp": 1335826232943810,
- "labels": {
- }
-}
-JSON
-TEST_GAUGE_2 = <<JSON
-{
- "type": "gauge",
- "value": 17,
- "name": "other.gauge",
- "timestamp": 1335826232943811,
- "labels": {
- }
-}
-JSON
-
-TEST_MARK_ENVELOPE = Hastur::Envelope.new :type => Hastur::Message::Stat::Mark,
- :from => A1UUID, :to => A2UUID
-TEST_MARK_1 = <<JSON
-{
- "type": "mark",
- "value": 1,
- "name": "write.out.mark",
- "timestamp": 1335826232943810,
- "labels": {
- }
-}
-JSON
-TEST_MARK_2 = <<JSON
-{
- "type": "mark",
- "value": 31,
- "name": "other.mark",
- "timestamp": 1335826232943814,
- "labels": {
- }
-}
-JSON
-
-TEST_EVENT_ENVELOPE = Hastur::Envelope.new :type => Hastur::Message::Event,
- :from => A1UUID, :to => A2UUID
-TEST_EVENT_1 = <<JSON
-{
- "type": "event",
- "name": "write.out.event",
- "timestamp": 1335826232943810,
- "labels": {
- }
-}
-JSON
-TEST_EVENT_2 = <<JSON
-{
- "type": "event",
- "name": "other.event",
- "timestamp": 1335826232943819,
- "labels": {
- }
-}
-JSON
-
-TEST_HB_PROCESS_ENVELOPE = Hastur::Envelope.new :type => Hastur::Message::HB::Process,
- :from => A1UUID, :to => A2UUID
-TEST_HB_PROCESS_1 = <<JSON
-{
- "type": "hb_process",
- "name": "write.out.hb_process",
- "value": 1,
- "timestamp": 1335826232943810,
- "labels": {
- }
-}
-JSON
-
-class BasicTriggerTest < Scope::TestCase
- setup_once do
- @start_time = Time.now
-
- @topology = Nodule::Topology.new(
- :alarm => Nodule::Alarm.new(:timeout => test_timeout(90)),
- :greenio => Nodule::Console.new(:fg => :green),
- :redio => Nodule::Console.new(:fg => :red),
- :cyanio => Nodule::Console.new(:fg => :cyan),
- :yellow => Nodule::Console.new(:fg => :yellow),
-
- :firehose => Nodule::ZeroMQ.new(:bind => ZMQ::PUB, :uri => :gen),
- :syndicator => Nodule::ZeroMQ.new(:bind => ZMQ::PUB, :uri => :gen),
-
- :syndicator_proc => Nodule::Process.new(
- TRIGGER_SYNDICATOR_BIN,
- '--firehose', :firehose,
- '--workers', :syndicator,
- :stdout => :capture, :stderr => :capture, :verbose => :cyanio,
- ),
- :worker_proc => Nodule::Process.new(
- TRIGGER_WORKER_BIN,
- '--syndicator', :syndicator,
- '--triggers', TEST_TRIGGER,
- '--no-cassandra',
- :stdout => :capture, :stderr => :capture, :verbose => :cyanio,
- ),
- )
-
- @topology.start_all
- end
-
- teardown_once do
- @topology.stop_all
- end
-
- # @example count = count_messages(:event)
- def count_messages(type)
- out_array = @topology[:worker_proc].stdout
- out_array.select { |line| line =~ Regexp.new("MSG: #{type}") }.size
- end
-
- context "trigger messages" do
- should "be filtered properly" do
- context = ZMQ::Context.new
- socket = Hastur::Util.bind_socket(context, ZMQ::PUB, @topology[:firehose].uri, :hwm => 10_000)
-
- t = Time.now
- loop do
- if Time.now - t > 60
- flunk "Couldn't even start up! #{@topology[:worker_proc].stdout.inspect}"
- end
-
- Hastur::Util.send_strings(socket, [TEST_COUNTER_ENVELOPE.pack, TEST_COUNTER_1])
- sleep 0.1
-
- contents = @topology[:worker_proc].stdout #_pipe.readlines
- #puts "Contents: #{contents.inspect}"
- break if contents.any? { |line| line =~ /MSG: counter "write.out.counter"/ }
- end
-
- STDERR.puts "Trigger is running!"
-
- sleep 1.0 # In-flight messages should clear
-
- pre_counter = count_messages(:counter)
- STDERR.puts "Pre-counter: #{pre_counter}"
- STDERR.puts "Time from start: #{Time.now - @start_time} seconds"
-
- [
- [3, [TEST_COUNTER_ENVELOPE.pack, TEST_COUNTER_1]],
- [4, [TEST_GAUGE_ENVELOPE.pack, TEST_GAUGE_1]],
- [5, [TEST_MARK_ENVELOPE.pack, TEST_MARK_1]],
- [6, [TEST_EVENT_ENVELOPE.pack, TEST_EVENT_1]],
- [1, [TEST_HB_PROCESS_ENVELOPE.pack, TEST_HB_PROCESS_1]],
- ].each do |count, msgs|
- STDERR.puts "Sending a set of #{count} messages to Syndicator..."
- count.times do
- s = Hastur::Util.send_strings(socket, msgs)
- raise "Error sending message!" unless s
- end
- end
-
- while not @topology[:worker_proc].stdout.any? { |line| line =~ /^MSG: hb_process/ }
- STDERR.puts "Worker stdout: #{@topology[:worker_proc].stdout.inspect}"
- STDERR.puts "Worker stderr: #{@topology[:worker_proc].stderr.inspect}"
- STDERR.puts "Syndicator stdout: #{@topology[:syndicator_proc].stdout.inspect}"
- STDERR.puts "Syndicator stderr: #{@topology[:syndicator_proc].stderr.inspect}"
- STDERR.puts "-----------------------"
- STDERR.puts "Sleep 2.0 seconds. Time from start: #{Time.now - @start_time} seconds"
- STDERR.flush
- sleep 2.0
- end
-
- post = {}
- [:counter, :gauge, :mark, :event].each do |type|
- post[type] = count_messages(type)
- end
-
- post[:counter] -= pre_counter
-
- assert_equal 6, post[:event], "Must see 6 events received!"
- assert_equal 5, post[:mark], "Must see 5 marks received!"
- assert_equal 4, post[:gauge], "Must see 4 gauges received!"
- assert_equal 3, post[:counter], "Must see 3 counters received!"
- end
- end
-end
View
4 test/integration/integration_test_helper.rb
@@ -22,10 +22,6 @@
HASTUR_CASS_SINK_BIN="#{HASTUR_ROOT}/bin/cass-sink.rb"
HASTUR_REGISTRATION_ROLLUP_BIN="#{HASTUR_ROOT}/bin/registration-rollups.rb"
-TRIGGER_SYNDICATOR_BIN = File.join(HASTUR_ROOT, "bin", "trigger-syndicator")
-TRIGGER_WORKER_BIN = File.join(HASTUR_ROOT, "bin", "trigger-worker")
-TRIGGER_REPLAY_BIN = File.join(HASTUR_ROOT, "bin", "replay-from-query-server")
-
HASTUR_UDP_PORT = Nodule::Util.random_udp_port
Hastur.udp_port = HASTUR_UDP_PORT
View
9 tools/trigger/json/email_for_noah.json
@@ -1,9 +0,0 @@
-[
- {
- "payload": "{\"name\": \"load.reset\",\"subject\":\"like whoah, srsly!\",\"attn\": [\"noah@ooyala.com\"]}",
- "timestamp": 1335307053158247,
- "type": "event",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- }
-]
View
79 tools/trigger/json/glowworm.json
@@ -1,79 +0,0 @@
-[
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 0.16}",
- "timestamp": 1335307053158247,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 0.34}",
- "timestamp": 1335307053158501,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 1.00487}",
- "timestamp": 1335307053158530,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 0.253}",
- "timestamp": 1335307053158540,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 1.00487}",
- "timestamp": 1335307053158550,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 1.00487}",
- "timestamp": 1335307053158560,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 1.00487}",
- "timestamp": 1335307053158570,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 1.00487}",
- "timestamp": 1335307053158580,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 1.00487}",
- "timestamp": 1335307053158590,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 1.00487}",
- "timestamp": 1335307053158600,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"glowworm.latency\", \"value\": 1234.1}",
- "timestamp": 1335307053158610,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- }
-]
View
9 tools/trigger/json/page.json
@@ -1,9 +0,0 @@
-[
- {
- "payload": "{\"name\": \"i.can.haz.error.naow\", \"attn\":[\"tna-pager\"], \"subject\": \"A test of paging in page.json\"}",
- "timestamp": 1335307053158247,
- "type": "event",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- }
-]
View
16 tools/trigger/json/vlt.json
@@ -1,16 +0,0 @@
-[
- {
- "payload": "{\"name\": \"collectd.load\", \"value\": 11.1}",
- "timestamp": 1335307053158244,
- "type": "gauge",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- },
- {
- "payload": "{\"name\": \"load.reset\"}",
- "timestamp": 1335307053158247,
- "type": "event",
- "from": "8e58af00-708c-012f-e468-64ce8f3a9dc2",
- "to": "8e58af00-708c-012f-e468-64ce8f3a9dc2"
- }
-]
View
104 tools/trigger/production/glowworm_trigger.rb
@@ -1,104 +0,0 @@
-ctx = Hastur::Trigger::Context.new
-
-def server_record(ctx, msg)
- ctx["servers"] ||= {}
- ctx["servers"][msg.uuid] ||= { "hostname" => msg.hostname }
-
- server = ctx["servers"][msg.uuid]
-
- server
-end
-
-ctx.events(:name => "glowworm.exception") do |msg|
- puts "Glowworm got a server error"
-
- server = server_record(ctx, msg)
-
- server["exceptions"] ||= 0
- server["exceptions"] += 1
-end
-
-ctx.counters(:name => "glowworm.fetch.fresh") do |msg|
- server = server_record(ctx, msg)
-
- server["fresh"] ||= 0
- server["fresh"] += 1
-
- server["total"] ||= 0
- server["total"] += 1
-end
-
-ctx.counters(:name => "glowworm.fetch.thread.existing") do |msg|
- server = server_record(ctx, msg)
-
- server["join_thread"] ||= 0
- server["join_thread"] += 1
-
- server["total"] ||= 0
- server["total"] += 1
-end
-
-ctx.counters(:name => "glowworm.fetch.thread.new") do |msg|
- server = server_record(ctx, msg)
-
- server["new_thread"] ||= 0
- server["new_thread"] += 1
-
- server["total"] ||= 0
- server["total"] += 1
-end
-
-ctx.counters(:name => "glowworm.feature.return.no_cache") do |msg|
- puts "Glowworm returned made-up data on a non-prefetch"
-
- server = server_record(ctx, msg)
-
- server["no_cache"] ||= 0
- server["no_cache"] += 1
-end
-
-ctx.counters(:name => "glowworm.fetch.timeout") do |msg|
- return unless msg.labels["timeout"] && msg.labels["timeout"] > 0.4999
-
- puts "Glowworm timed out waiting for server for at least 0.5 seconds"
-
- server = server_record(ctx, msg)
-
- server["timeouts"] ||= 0
- server["timeouts"] += 1
-end
-
-def gw_exception_email(uuid, hash)
- send_email "noah@ooyala.com",
- "Glowworm on #{hash["hostname"]} got too many exceptions",
- <<BODY,
-Glowworm on #{hash["hostname"]} (UUID #{uuid}) got too many exceptions.
-
-Specifically, it got #{hash["exceptions"]} on #{hash["total"]} total requests.
-
-Summary data:
-#{hash.inspect}
-BODY
- :cc => "tna-team@ooyala.com"
-end
-
-def gw_summary!(ctx)
- ctx["servers"] ||= {}
-
- ctx["server"].each do |uuid, hash|
- if hash["exceptions"] >= hash["total"] ||
- hash["exceptions"] > ((hash["total"] / 20.0) + 2.0)
- gw_exception_email uuid, hash
- end
- end
-
- ctx["servers"] = {}
-end
-
-ctx.every(:day) do
- gw_summary!
-end
-
-ctx.events(:name => "glowworm.trigger.reset") do |msg|
- gw_summary!
-end
View
18 tools/trigger/triggers/derived_stat_trigger.rb
@@ -1,18 +0,0 @@
-ctx = Hastur::Trigger::Context.new
-
-ctx.gauges(:name => "glowworm.latency") do |msg|
- puts "Received latency of #{msg.value} from Glowworm"
- ctx["latencies"] ||= []
- ctx["latencies"] << msg.value
-
- # An average is actually a terrible idea for latencies. You're much
- # better off sending out, say, the best and worst latency for the
- # time period, and perhaps also the median.
-
- if ctx["latencies"].size > 10
- avg = ctx["latencies"].inject(0, &:+) / 10.0
- puts "Sending out 10-latency average of #{avg}"
- Hastur.gauge("glowworm.latency.moving_average", avg)
- ctx["latencies"] = []
- end
-end
View
8 tools/trigger/triggers/email_noah_trigger.rb
@@ -1,8 +0,0 @@
-ctx = Hastur::Trigger::Context.new
-
-ctx.events(:attn => ["noah@ooyala.com"]) do |msg|
- STDERR.puts "Email for Noah! OMG!"
- send_email("noah@ooyala.com", "OMG! #{msg.subject}",
- "A message totally showed up for you. It said:\n#{msg.to_json}",
- :from => "omg!authority@hastur.ooyala.com")
-end
View
27 tools/trigger/triggers/logging_trigger.rb
@@ -1,27 +0,0 @@
-ctx = Hastur::Trigger::Context.new
-
-STDOUT.sync = true
-
-ctx.gauges(:name => "write.out.gauge") do |msg|
- print "MSG: gauge #{msg.name.inspect} #{msg.value.inspect}\n"
-end
-
-ctx.counters(:name => "write.out.counter") do |msg|
- print "MSG: counter #{msg.name.inspect} #{msg.value.inspect}\n"
-end
-
-ctx.marks(:name => "write.out.mark") do |msg|
- print "MSG: mark #{msg.name.inspect} #{msg.value.inspect}\n"
-end
-
-ctx.events(:name => "write.out.event") do |msg|
- print "MSG: event #{msg.name.inspect}\n"
-end
-
-#ctx.hb_no_such_thing(:name => "bob") do |msg|
-# print "MSG: event #{msg.name.inspect} #{msg.value.inspect}\n"
-#end
-
-ctx.hb_processes(:name => "write.out.hb_process") do |msg|
- print "MSG: hb_process #{msg.name.inspect} #{msg.value.inspect}\n"
-end
View
17 tools/trigger/triggers/paging_trigger.rb
@@ -1,17 +0,0 @@
-ctx = Hastur::Trigger::Context.new
-
-# Guarantees about trigger behavior
-#ctx.disable_stash!
-#ctx.order_independent!
-
-ctx.events(:attn => [ "tna-pager" ]) do |msg|
- puts "Paging TNA on event #{msg.name}!"
- pager_duty("TNA general page: #{msg.name}",
- (msg.subject rescue nil) || "No description",
- :message => msg.to_json, :uuid => msg.uuid,
- :hostname => msg.hostname)
-end
-
-# ctx.every(:minute) do
-# Hastur.gauge("load.spikes", ctx["total"])
-# end
View
31 tools/trigger/triggers/variable_load_trigger.rb
@@ -1,31 +0,0 @@
-ctx = Hastur::Trigger::Context.new
-
-# Guarantees about trigger behavior
-#ctx.disable_stash!
-#ctx.order_independent!
-
-ctx.gauges(:name => "collectd.load") do |msg|
- if msg.value > 10.0
- # PagerDuty requires an incident ID, a message, and has an optional
- # JSON hash of extra stuff. Pass in the message automatically? Or
- # just its UUID and timestamp?
- pager_duty("Monitoring-load-spiking-#{msg.uuid}",
- "The load has spiked to #{msg.value} on host #{msg.hostname}",
- :message => msg.to_json, :load => msg.value, :uuid => msg.uuid,
- :hostname => msg.hostname)
-
- ctx["total"] ||= 0
- ctx["total"] += 1
-
- puts "VLT: Received high-value message!"
- end
-end
-
-ctx.events(:name => "load.reset") do |msg|
- ctx["total"] = 0
- puts "VLT: Received reset event!"
-end
-
-# ctx.every(:minute) do
-# Hastur.gauge("load.spikes", ctx["total"])
-# end

0 comments on commit c42d316

Please sign in to comment.