Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed May 29, 2023
1 parent 55d8e4c commit 23325b2
Show file tree
Hide file tree
Showing 39 changed files with 554 additions and 266 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Karafka Web changelog

## 0.6.0 (Unreleased)
- **[Feature]** Introduce producers errors tracking.
- [Improvement] Improve pagination by providing a "Go to first page" fast button.
- [Improvement] Provide more explicit into on the consumers view consumers running.
- [Improvement] Replace `compact` policy for states topics with `delete,compact` to optimize used storage.
- [Improvement] Validate error reporting with unified error contract.
- [Improvement] Use estimated errors count for counters presentation taken from the errors topic instead of materialization via from consumers states to allow for producers errors tracking.
- [Improvement] Introduce `schema_version` to error reports.
- [Fix] Fix missing empty `Process name` value in the errors index view.
- [Refactor] Cleanup common components for errors extraction.
- [Refactor] Remove not used and reduntant partials.

## 0.5.2 (2023-05-22)
- Label ActiveJob consumers jobs with `active_job` tag.
- Label Virtual Partitions consumers with `virtual` tag.
Expand Down
26 changes: 13 additions & 13 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
PATH
remote: .
specs:
karafka-web (0.5.2)
karafka-web (0.6.0)
erubi (~> 1.4)
karafka (>= 2.0.40, < 3.0.0)
karafka-core (>= 2.0.12, < 3.0.0)
roda (~> 3.63)
karafka (>= 2.1.2, < 3.0.0)
karafka-core (>= 2.0.13, < 3.0.0)
roda (~> 3.68, >= 3.68)
tilt (~> 2.0)

GEM
Expand All @@ -26,15 +26,15 @@ GEM
ffi (1.15.5)
i18n (1.13.0)
concurrent-ruby (~> 1.0)
karafka (2.0.41)
karafka-core (>= 2.0.12, < 3.0.0)
karafka (2.1.2)
karafka-core (>= 2.0.13, < 3.0.0)
thor (>= 0.20)
waterdrop (>= 2.4.10, < 3.0.0)
waterdrop (>= 2.5.3, < 3.0.0)
zeitwerk (~> 2.3)
karafka-core (2.0.12)
karafka-core (2.0.13)
concurrent-ruby (>= 1.1)
karafka-rdkafka (>= 0.12.1)
karafka-rdkafka (0.12.1)
karafka-rdkafka (>= 0.12.3)
karafka-rdkafka (0.12.3)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
Expand All @@ -45,7 +45,7 @@ GEM
rack (>= 3.0.0.beta1)
webrick
rake (13.0.6)
roda (3.67.0)
roda (3.68.0)
rack
rspec (3.12.0)
rspec-core (~> 3.12.0)
Expand All @@ -70,8 +70,8 @@ GEM
tilt (2.1.0)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
waterdrop (2.5.2)
karafka-core (>= 2.0.12, < 3.0.0)
waterdrop (2.5.3)
karafka-core (>= 2.0.13, < 3.0.0)
zeitwerk (~> 2.3)
webrick (1.8.1)
zeitwerk (2.6.8)
Expand Down
6 changes: 3 additions & 3 deletions karafka-web.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ Gem::Specification.new do |spec|
spec.licenses = %w[LGPL-3.0 Commercial]

spec.add_dependency 'erubi', '~> 1.4'
spec.add_dependency 'karafka', '>= 2.0.40', '< 3.0.0'
spec.add_dependency 'karafka-core', '>= 2.0.12', '< 3.0.0'
spec.add_dependency 'roda', '~> 3.63'
spec.add_dependency 'karafka', '>= 2.1.2', '< 3.0.0'
spec.add_dependency 'karafka-core', '>= 2.0.13', '< 3.0.0'
spec.add_dependency 'roda', '~> 3.68', '>= 3.68'
spec.add_dependency 'tilt', '~> 2.0'

spec.add_development_dependency 'rackup', '~> 0.2'
Expand Down
22 changes: 17 additions & 5 deletions lib/karafka/web/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,25 @@ class Config
# Topic for storing states aggregated info
setting :states, default: 'karafka_consumers_states'
end

setting :producers do
# Reports containing particular producers instances informations and metrics.
# It is similar in nature to the consumers reports one.
setting :reports, default: 'karafka_producers_reports'
end
end

# Tracking and reporting related settings
setting :tracking do
# Collects the metrics we will be dispatching
# Tracks and reports the collected metrics
setting :reporter, default: Tracking::Reporter.new

# How often should we report data from a single process
# You may set it to a lower value in development but in production and scale, every
# 5 seconds should be enough
setting :interval, default: 5_000

setting :consumers do
# Reports the metrics collected in the sampler
setting :reporter, default: Tracking::Consumers::Reporter.new

setting :sampler, default: Tracking::Consumers::Sampler.new

setting :listeners, default: [
Expand All @@ -51,7 +56,14 @@ class Config
end

setting :producers do
setting :listeners, default: []
setting :reporter, default: Tracking::Producers::Reporter.new

setting :sampler, default: Tracking::Producers::Sampler.new

setting :listeners, default: [
Tracking::Producers::Listeners::Errors.new,
Tracking::Producers::Listeners::Reporter.new
]
end
end

Expand Down
5 changes: 2 additions & 3 deletions lib/karafka/web/installer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Installer
# @param replication_factor [Integer] replication factor we want to use (1 by default)
def bootstrap!(replication_factor: 1)
bootstrap_topics!(replication_factor)
bootstrap_state!
bootstrap_consumers_state!
end

# Removes all the Karafka topics and creates them again with the same replication factor
Expand Down Expand Up @@ -142,7 +142,7 @@ def bootstrap_topics!(replication_factor = 1)
end

# Creates the initial state record with all values being empty
def bootstrap_state!
def bootstrap_consumers_state!
::Karafka.producer.produce_sync(
topic: Karafka::Web.config.topics.consumers.states,
key: Karafka::Web.config.topics.consumers.states,
Expand All @@ -151,7 +151,6 @@ def bootstrap_state!
stats: {
batches: 0,
messages: 0,
errors: 0,
retries: 0,
dead: 0,
busy: 0,
Expand Down
31 changes: 0 additions & 31 deletions lib/karafka/web/tracking/base_contract.rb

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module Consumers
module Contracts
# Expected data for each consumer group
# It's mostly about subscription groups details
class ConsumerGroup < BaseContract
class ConsumerGroup < Tracking::Contracts::Base
configure

required(:id) { |val| val.is_a?(String) && !val.empty? }
Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/web/tracking/consumers/contracts/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Tracking
module Consumers
module Contracts
# Contract for the job reporting details
class Job < BaseContract
class Job < Tracking::Contracts::Base
configure

required(:consumer) { |val| val.is_a?(String) }
Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/web/tracking/consumers/contracts/partition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Tracking
module Consumers
module Contracts
# Partition metrics required for web to operate
class Partition < BaseContract
class Partition < Tracking::Contracts::Base
configure

required(:id) { |val| val.is_a?(Integer) && val >= 0 }
Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/web/tracking/consumers/contracts/report.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Contracts
#
# Any outgoing reporting needs to match this format for it to work with the statuses
# consumer.
class Report < BaseContract
class Report < Tracking::Contracts::Base
configure

required(:schema_version) { |val| val.is_a?(String) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Consumers
module Contracts
# Expected data for each subscription group
# It's mostly about topics details
class SubscriptionGroup < BaseContract
class SubscriptionGroup < Tracking::Contracts::Base
configure

required(:id) { |val| val.is_a?(String) && !val.empty? }
Expand Down
4 changes: 3 additions & 1 deletion lib/karafka/web/tracking/consumers/contracts/topic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ module Tracking
module Consumers
module Contracts
# Expected topic information that needs to go out
class Topic < BaseContract
class Topic < Tracking::Contracts::Base
configure

required(:name) { |val| val.is_a?(String) && !val.empty? }
required(:partitions) { |val| val.is_a?(Hash) }

Expand Down
6 changes: 6 additions & 0 deletions lib/karafka/web/tracking/consumers/listeners/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ module Listeners
class Errors < Base
include Tracking::Helpers::ErrorInfo

# Schema used by consumers error reporting
SCHEMA_VERSION = '1.0.0'

private_constant :SCHEMA_VERSION

# Collects errors info and counts errors
#
# @param event [Karafka::Core::Monitoring::Event]
Expand All @@ -25,6 +30,7 @@ def on_error_occurred(event)
error_class, error_message, backtrace = extract_error_info(event[:error])

sampler.errors << {
schema_version: SCHEMA_VERSION,
type: event[:type],
error_class: error_class,
error_message: error_message,
Expand Down
7 changes: 5 additions & 2 deletions lib/karafka/web/tracking/consumers/reporter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class Reporter
def initialize
# Move back so first report is dispatched fast to indicate, that the process is alive
@tracked_at = monotonic_now - 10_000
@contract = Consumers::Contracts::Report.new
@report_contract = Consumers::Contracts::Report.new
@error_contract = Tracking::Contracts::Error.new
end

# Dispatches the current state from sampler to appropriate topics
Expand All @@ -43,7 +44,7 @@ def report(forced: false)

report = sampler.to_report

@contract.validate!(report)
@report_contract.validate!(report)

process_name = report[:process][:name]

Expand All @@ -59,6 +60,8 @@ def report(forced: false)

# Report errors that occurred (if any)
messages += sampler.errors.map do |error|
@error_contract.validate!(error)

{
topic: Karafka::Web.config.topics.errors,
payload: error.to_json,
Expand Down
33 changes: 33 additions & 0 deletions lib/karafka/web/tracking/contracts/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Karafka
module Web
module Tracking
module Contracts
# Base for all the metric related contracts
class Base < ::Karafka::Core::Contractable::Contract
class << self
# This layer is not for users extensive feedback, thus we can easily use the minimum
# error messaging there is.
def configure
super do |config|
config.error_messages = YAML.safe_load(
File.read(
File.join(Karafka::Web.gem_root, 'config', 'locales', 'errors.yml')
)
).fetch('en').fetch('validations').fetch('web')
end
end
end

# @param data [Hash] data for validation
# @return [Boolean] true if all good
# @raise [Errors::ContractError] invalid report
def validate!(data)
super(data, Errors::Tracking::ContractError)
end
end
end
end
end
end
31 changes: 31 additions & 0 deletions lib/karafka/web/tracking/contracts/error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

module Karafka
module Web
module Tracking
module Contracts
# Contract for error reporting
# Since producers and consumers report their errors to the same topic, we need to have
# a unified contract for both
class Error < Base
configure

required(:schema_version) { |val| val.is_a?(String) }
required(:type) { |val| val.is_a?(String) && !val.empty? }
required(:error_class) { |val| val.is_a?(String) && !val.empty? }
required(:error_message) { |val| val.is_a?(String) }
required(:backtrace) { |val| val.is_a?(String) }
required(:details) { |val| val.is_a?(Hash) }
required(:occurred_at) { |val| val.is_a?(Float) }

nested(:process) do
required(:name) { |val| val.is_a?(String) && !val.empty? }
# Tags may not be present for producers because they may operate from outside of
# karafka taggable process
optional(:tags) { |val| val.is_a?(Karafka::Core::Taggable::Tags) }
end
end
end
end
end
end
2 changes: 2 additions & 0 deletions lib/karafka/web/tracking/helpers/error_info.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
module Karafka
module Web
module Tracking
# Namespace for tracking related helpers
module Helpers
# Module containing some helper methods useful for extracting extra errors info
module ErrorInfo
# Extracts the basic error info
#
Expand Down
Loading

0 comments on commit 23325b2

Please sign in to comment.