Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

instrumentation unification #340

Merged
merged 7 commits into from May 25, 2018
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -3,6 +3,8 @@
## 1.3-wip
- #300 - Store value in a value key and replace its content with parsed version - without root merge
- #331 - Disallow building groups without topics
- #340 - Instrumentation unification. Better and more consistent naming
- #340 - Procline instrumentation for a nicer process name
- #342 - Change default for `fetcher_max_queue_size` from `100` to `10` to lower max memory usage

## 1.2.4
@@ -13,6 +13,7 @@ class << self
# @raise [Karafka::Errors::InvalidConfiguration] raised when configuration
# doesn't match with ConfigurationSchema
def boot!
initialize!
Setup::Config.validate!
Setup::Config.setup_components
Callbacks.after_init(Karafka::App.config)
@@ -13,11 +13,10 @@ class Server < Base

# Start the Karafka server
def call
validate!

puts 'Starting Karafka server'
cli.info

validate!

if cli.options[:daemon]
FileUtils.mkdir_p File.dirname(cli.options[:pid])
daemonize
@@ -34,8 +34,10 @@ class Monitor < Dry::Monitor::Notifications
async_producer.call.retry
sync_producer.call.error
sync_producer.call.retry
server.stop
server.stop.error
app.initializing
app.running
app.stopping
app.stopping.error
].freeze

private_constant :BASE_EVENTS
@@ -0,0 +1,38 @@
# frozen_string_literal: true

module Karafka
module Instrumentation
# Listener that sets a proc title with a nice descriptive value
module ProctitleListener
class << self
# Updates proc title to an initializing one
# @param _event [Dry::Events::Event] event details including payload
def on_app_initializing(_event)
setproctitle('initializing')
end

# Updates proc title to a running one
# @param _event [Dry::Events::Event] event details including payload
def on_app_running(_event)
setproctitle('running')
end

# Updates proc title to a stopping one
# @param _event [Dry::Events::Event] event details including payload
def on_app_stopping(_event)
setproctitle('stopping')
end

private

# Sets a proper proc title with our constant prefix
# @param status [String] any status we want to set
def setproctitle(status)
::Process.setproctitle(
"karafka #{Karafka::App.config.client_id} (#{status})"
)
end
end
end
end
end
@@ -4,7 +4,7 @@ module Karafka
module Instrumentation
# Default listener that hooks up to our instrumentation and uses its events for logging
# It can be removed/replaced or anything without any harm to the Karafka app flow
module Listener
module StdoutListener
# Log levels that we use in this particular listener
USED_LOG_LEVELS = %i[
debug
@@ -86,17 +86,29 @@ def on_consumers_responders_respond_with(event)
info "Responded from #{calling} using #{responder} with following data #{data}"
end

# Logs info that we're initializing Karafka app
# @param _event [Dry::Events::Event] event details including payload
def on_app_initializing(_event)
info "Initializing Karafka server #{::Process.pid}"
end

# Logs info that we're running Karafka app
# @param _event [Dry::Events::Event] event details including payload
def on_app_running(_event)
info "Running Karafka server #{::Process.pid}"
end

# Logs info that we're going to stop the Karafka server
# @param _event [Dry::Events::Event] event details including payload
def on_server_stop(_event)
def on_app_stopping(_event)
# We use a separate thread as logging can't be called from trap context
Thread.new { info "Stopping Karafka server #{::Process.pid}" }
end

# Logs an error that Karafka was unable to stop the server gracefully and it had to do a
# forced exit
# @param _event [Dry::Events::Event] event details including payload
def on_server_stop_error(_event)
def on_app_stopping_error(_event)
# We use a separate thread as logging can't be called from trap context
Thread.new { error "Forceful Karafka server #{::Process.pid} stop" }
end
@@ -19,7 +19,7 @@ def consumer_loop
.flat_map(&:values)
.select { |ctrl| ctrl.respond_to?(:run_callbacks) }

if Karafka::App.stopped?
if Karafka::App.stopping?
consumers.each { |ctrl| ctrl.run_callbacks :before_stop }
Karafka::Persistence::Client.read.stop
else
@@ -22,7 +22,7 @@ def run
process.on_sigint { stop_supervised }
process.on_sigquit { stop_supervised }
process.on_sigterm { stop_supervised }
start_supervised
run_supervised
end

# @return [Array<String>] array with names of consumer groups that should be consumed in a
@@ -42,7 +42,7 @@ def process
# Starts Karafka with a supervision
# @note We don't need to sleep because Karafka::Fetcher is locking and waiting to
# finish loop (and it won't happen until we explicitily want to stop)
def start_supervised
def run_supervised
process.supervise
Karafka::App.run!
Karafka::Fetcher.call
@@ -51,13 +51,6 @@ def start_supervised
# Stops Karafka with a supervision (as long as there is a shutdown timeout)
# If consumers won't stop in a given timeframe, it will force them to exit
def stop_supervised
# Because this is called in the trap context, there is a chance that instrumentation
# listeners contain things that aren't allowed from within a trap context.
# To bypass that (instead of telling users not to do things they need to)
# we spin up a thread to instrument server.stop and server.stop.error and wait until
# they're finished
Thread.new { Karafka.monitor.instrument('server.stop', {}) }.join

Karafka::App.stop!
# If there is no shutdown timeout, we don't exit and wait until all the consumers
# had done their work
@@ -73,7 +66,7 @@ def stop_supervised

raise Errors::ForcefulShutdown
rescue Errors::ForcefulShutdown => error
Thread.new { Karafka.monitor.instrument('server.stop.error', error: error) }.join
Thread.new { Karafka.monitor.instrument('app.stopping.error', error: error) }.join
# We're done waiting, lets kill them!
consumer_threads.each(&:terminate)

@@ -10,7 +10,6 @@ module Dsl
# @param [Block] block configuration block
def setup(&block)
Setup::Config.setup(&block)
initialize!
end

# @return [Karafka::Config] config instance
@@ -9,7 +9,7 @@ class Status
STATES = {
initializing: :initialize!,
running: :run!,
stopped: :stop!
stopping: :stop!
}.freeze

STATES.each do |state, transition|
@@ -19,6 +19,9 @@ class Status

define_method transition do
@status = state
# Trap context disallows to run certain things that we instrument
# so the state changes are executed from a separate thread
Thread.new { Karafka.monitor.instrument("app.#{state}", {}) }.join
end
end
end
@@ -15,10 +15,6 @@
allow(Karafka::Server).to receive(:run)
end

it 'expect to print info and expect to run Karafka application' do
expect { server_cli.call }.to output("Starting Karafka server\n").to_stdout
end

it 'expect to validate!' do
expect(server_cli).to receive(:validate!)
server_cli.call
@@ -46,8 +42,7 @@
it 'expect to print info, validate!, daemonize and clean' do
expect(server_cli).to receive(:validate!)
expect(server_cli).to receive(:daemonize)

expect { server_cli.call }.to output("Starting Karafka server\n").to_stdout
server_cli.call
end
end
end
@@ -48,7 +48,7 @@

describe '#available_events' do
it 'expect to include registered events' do
expect(monitor.available_events.size).to eq 15
expect(monitor.available_events.size).to eq 17
end

it { expect(monitor.available_events).to include 'connection.listener.fetch_loop.error' }
@@ -0,0 +1,27 @@
# frozen_string_literal: true

RSpec.describe Karafka::Instrumentation::ProctitleListener do
describe '#on_app_initializing' do
let(:expected_title) { "karafka #{Karafka::App.config.client_id} (initializing)" }

after { described_class.on_app_initializing({}) }

it { expect(::Process).to receive(:setproctitle).with(expected_title) }
end

describe '#on_app_running' do
let(:expected_title) { "karafka #{Karafka::App.config.client_id} (running)" }

after { described_class.on_app_running({}) }

it { expect(::Process).to receive(:setproctitle).with(expected_title) }
end

describe '#on_app_stopping' do
let(:expected_title) { "karafka #{Karafka::App.config.client_id} (stopping)" }

after { described_class.on_app_stopping({}) }

it { expect(::Process).to receive(:setproctitle).with(expected_title) }
end
end
@@ -1,6 +1,6 @@
# frozen_string_literal: true

RSpec.describe Karafka::Instrumentation::Listener do
RSpec.describe Karafka::Instrumentation::StdoutListener do
let(:event) { Dry::Events::Event.new(rand, payload) }
let(:time) { rand }
let(:topic) { instance_double(Karafka::Routing::Topic, name: topic_name) }
@@ -156,8 +156,8 @@
end
end

describe '#on_server_stop' do
subject(:trigger) { described_class.on_server_stop(event) }
describe '#on_app_stopping' do
subject(:trigger) { described_class.on_app_stopping(event) }

let(:payload) { {} }
let(:message) { "Stopping Karafka server #{::Process.pid}" }
@@ -172,8 +172,8 @@
end
end

describe '#on_server_stop_error' do
subject(:trigger) { described_class.on_server_stop_error(event) }
describe '#on_app_stopping_error' do
subject(:trigger) { described_class.on_app_stopping_error(event) }

let(:payload) { {} }
let(:message) { "Forceful Karafka server #{::Process.pid} stop" }
@@ -24,10 +24,10 @@ def consumer_loop

after { Thread.current[:client] = nil }

context 'when karafka app has stopped' do
before { allow(Karafka::App).to receive(:stopped?).and_return(true) }
context 'when karafka app has stopping' do
before { allow(Karafka::App).to receive(:stopping?).and_return(true) }

it 'expect to not yield the original block as it would process data when stopped' do
it 'expect to not yield the original block as it would process data when stopping' do
expect { |block| kafka_consumer.consumer_loop(&block) }.not_to yield_control
end

@@ -38,7 +38,7 @@ def consumer_loop
end

context 'when karafka is running' do
before { allow(Karafka::App).to receive(:stopped?).and_return(false) }
before { allow(Karafka::App).to receive(:stopping?).and_return(false) }

it 'expect to yield the original base block' do
expect { |block| kafka_consumer.consumer_loop(&block) }.to yield_control
@@ -58,7 +58,7 @@ def consumer_loop
0
)

allow(Karafka::App).to receive(:stopped?).and_return(false)
allow(Karafka::App).to receive(:stopping?).and_return(false)
end

it 'expect to yield the original base block' do
@@ -59,8 +59,8 @@
end
end

describe '#start_supervised' do
after { server_class.send(:start_supervised) }
describe '#run_supervised' do
after { server_class.send(:run_supervised) }

it 'expect to supervise and run' do
expect(Karafka::Process.instance).to receive(:supervise)
@@ -5,7 +5,7 @@

it 'by default expect to be in initialized state because it is bootstraped' do
expect(status_manager.running?).to eq false
expect(status_manager.stopped?).to eq false
expect(status_manager.stopping?).to eq false
expect(status_manager.initializing?).to eq true
end

@@ -33,25 +33,25 @@

it { expect(status_manager.running?).to eq true }
it { expect(status_manager.initializing?).to eq false }
it { expect(status_manager.stopped?).to eq false }
it { expect(status_manager.stopping?).to eq false }
end
end

describe 'stopped?' do
context 'when status is not set to stopped' do
describe 'stopping?' do
context 'when status is not set to stopping' do
before do
status_manager.instance_variable_set(:'@status', rand)
end

it { expect(status_manager.stopped?).to eq false }
it { expect(status_manager.stopping?).to eq false }
end

context 'when status is set to stopped' do
context 'when status is set to stopping' do
before do
status_manager.instance_variable_set(:'@status', :stopped)
status_manager.instance_variable_set(:'@status', :stopping)
end

it { expect(status_manager.stopped?).to eq true }
it { expect(status_manager.stopping?).to eq true }
end
end

@@ -64,7 +64,7 @@

it { expect(status_manager.running?).to eq false }
it { expect(status_manager.initializing?).to eq false }
it { expect(status_manager.stopped?).to eq true }
it { expect(status_manager.stopping?).to eq true }
end
end

@@ -94,7 +94,7 @@

it { expect(status_manager.running?).to eq false }
it { expect(status_manager.initializing?).to eq true }
it { expect(status_manager.stopped?).to eq false }
it { expect(status_manager.stopping?).to eq false }
end
end
end
@@ -80,4 +80,4 @@ class App

# We by default use the default listener for specs to check how it works and that
# it does not break anything
Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)
Karafka.monitor.subscribe(Karafka::Instrumentation::StdoutListener)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.