Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offset based pagination #75

Merged
merged 6 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Karafka Web changelog

## 0.7.0 (Unreleased)
- [Improvement] Switch to offset based pagination instead of per-page pagination.
- [Improvement] Avoid double-reading of watermark offsets for explorer and errors display.
- [Improvement] When no params needed for a page, do not include empty params.
- [Improvement] Do not include page when page is 1.
- [Refactor] Reorganize pagination engine to support offset based pagination.
Expand Down
17 changes: 17 additions & 0 deletions lib/karafka/web/ui/controllers/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ def initialize(params)
@params = params
end

private

# Builds the respond data object with assigned attributes based on instance variables.
#
# @return [Responses::Data] data that should be used to render appropriate view
Expand All @@ -33,6 +35,21 @@ def respond
attributes
)
end

# Initializes the expected pagination engine and assigns expected arguments
# @param args Any arguments accepted by the selected pagination engine
def paginate(*args)
engine = case args.count
when 2
Ui::Lib::Paginations::PageBased
when 3
Ui::Lib::Paginations::OffsetBased
else
raise ::Karafka::Errors::UnsupportedCaseError, args.count
end

@pagination = engine.new(*args)
end
end
end
end
Expand Down
5 changes: 1 addition & 4 deletions lib/karafka/web/ui/controllers/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ def index
@params.current_page
)

@page_scope = Ui::Lib::PageScopes::PageBased.new(
@params.current_page,
!last_page
)
paginate(@params.current_page, !last_page)

respond
end
Expand Down
5 changes: 1 addition & 4 deletions lib/karafka/web/ui/controllers/consumers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ def index
@params.current_page
)

@page_scope = Ui::Lib::PageScopes::PageBased.new(
@params.current_page,
!last_page
)
paginate(@params.current_page, !last_page)

respond
end
Expand Down
24 changes: 14 additions & 10 deletions lib/karafka/web/ui/controllers/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,10 @@ module Controllers
class Errors < Base
# Lists first page of the errors
def index
@error_messages, last_page, = Models::Message.page(
errors_topic,
0,
@params.current_page
)

@watermark_offsets = Ui::Models::WatermarkOffsets.find(errors_topic, 0)
previous_offset, @error_messages, next_offset, = current_page_data

@page_scope = Ui::Lib::PageScopes::PageBased.new(
@params.current_page,
!last_page
)
paginate(previous_offset, @params.current_offset, next_offset)

respond
end
Expand All @@ -39,6 +31,18 @@ def show(offset)

private

# @return [Array] Array with requested messages as well as pagination details and other
# obtained metadata
def current_page_data
Models::Message.offset_page(
errors_topic,
0,
@params.current_offset,
@watermark_offsets[:low],
@watermark_offsets[:high]
)
end

# @return [String] errors topic
def errors_topic
::Karafka::Web.config.topics.errors
Expand Down
10 changes: 2 additions & 8 deletions lib/karafka/web/ui/controllers/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,9 @@ def index
end
end

@jobs, last_page = Ui::Lib::PaginateArray.call(
jobs_total,
@params.current_page
)
@jobs, last_page = Ui::Lib::PaginateArray.call(jobs_total, @params.current_page)

@page_scope = Ui::Lib::PageScopes::PageBased.new(
@params.current_page,
!last_page
)
paginate(@params.current_page, !last_page)

respond
end
Expand Down
10 changes: 10 additions & 0 deletions lib/karafka/web/ui/controllers/requests/params.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ def current_page
page.positive? ? page : 1
end
end

# @return [Integer] offset from which we want to start. `-1` indicates, that we want
# to show the first page discovered based on the high watermark offset. If no offset
# is provided, we go with the high offset first page approach
def current_offset
@current_offset ||= begin
offset = @request_params.fetch('offset', -1).to_i
offset < -1 ? -1 : offset
end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Web
module Ui
module Lib
# Namespace for all the types of pagination engines we want to support
module PageScopes
module Paginations
# Abstraction on top of pagination, so we can alter pagination key and other things
# for non-standard pagination views (non page based, etc)
#
Expand Down
84 changes: 84 additions & 0 deletions lib/karafka/web/ui/lib/paginations/offset_based.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

module Karafka
module Web
module Ui
module Lib
module Paginations
# Kafka offset based pagination backend
#
# Allows us to support paginating over offsets
class OffsetBased < Base
# @param previous_offset [Integer, false] previous offset or false if should not be
# presented
# @param current_offset [Integer] current offset
# @param next_offset [Integer, Boolean] should we show next offset page button. If
# false it will not be presented.
def initialize(
previous_offset,
current_offset,
next_offset
)
@previous_offset = previous_offset
@current_offset = current_offset
@next_offset = next_offset
super()
end

# Show pagination only when there is more than one page of results to be presented
#
# @return [Boolean]
def paginate?
@current_offset && (!!@previous_offset || !!@next_offset)
end

# @return [Boolean] active only when we are not on the first page. First page is always
# indicated by the current offset being -1. If there is someone that sets up the
# current offset to a value equal to the last message in the topic partition, we do
# not consider it as a first page and we allow to "reset" to -1 via the first page
# button
def first_offset?
@current_offset != -1
end

# @return [Boolean] first page offset is always nothing because we use the default -1
# for the offset.
def first_offset
false
end

# @return [Boolean] Active previous page link when it is not the first page
def previous_offset?
!!@previous_offset
end

# @return [Boolean] Since this is offset based pagination, there is no notion of
# the current page and we operate on offsets. Because of that there is no continuous
# pagination, thus we hide the current page.
def current_offset?
false
end

# @return [Boolean] move to the next page if not false. False indicates, that there is
# no next page to move to
def next_offset?
!!@next_offset
end

# If there is no next offset, we point to 0 as there should be no smaller offset than
# that in Kafka ever
# @return [Integer]
def next_offset
next_offset? ? @next_offset : 0
end

# @return [String] for offset based pagination we use the offset param name
def offset_key
'offset'
end
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Karafka
module Web
module Ui
module Lib
module PageScopes
module Paginations
# Regular page-based pagination engine
class PageBased < Base
# @param current_offset [Integer] current page
Expand Down
52 changes: 31 additions & 21 deletions lib/karafka/web/ui/models/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,44 @@ def find(topic_id, partition_id, offset)
)
end

# Fetches requested page of Kafka messages.
# Fetches requested `page_count` number of Kafka messages starting from the oldest
# requested `start_offset`. If `start_offset` is `-1`, will fetch the most recent
# results
#
# @param topic_id [String]
# @param partition_id [Integer]
# @param page [Integer]
# @return [Array] We return both page data as well as all the details needed to build
# @param start_offset [Integer] oldest offset from which we want to get the data
# @param low_offset [Integer] low watermark offset
# @param high_offset [Integer] high watermark offset
# @return [Array] We return page data as well as all the details needed to build
# the pagination details.
def page(topic_id, partition_id, page)
low_offset, high_offset = Karafka::Admin.read_watermark_offsets(
topic_id,
partition_id
)

def offset_page(topic_id, partition_id, start_offset, low_offset, high_offset)
partitions_count = fetch_partition_count(topic_id)

no_data_result = [[], true, partitions_count]
# If we start from offset -1, it means we want first page with the most recent
# results. We obtain this page by using the offset based on the high watermark
# off
start_offset = high_offset - per_page if start_offset == -1

# If there is not even one message, we need to early exit
# If low and high watermark offsets are of the same value, it means no data in the
# topic is present
return no_data_result if low_offset == high_offset
# No previous pages, no data, and no more offsets
no_data_result = [false, [], false, partitions_count]

# We add plus one because we compute previous offset from which we want to start and
# not previous page leading offset
start_offset = high_offset - (per_page * page)
# If there is no data, we return the no results result
return no_data_result if low_offset == high_offset

if start_offset <= low_offset
# If this page does not contain max per page, compute how many messages we can
# fetch before stopping
count = per_page - (low_offset - start_offset)
last_page = true
next_offset = false
start_offset = low_offset
else
last_page = false
count = per_page
next_offset = start_offset - per_page
# Do not go below the lowest possible offset
next_offset = low_offset if next_offset < low_offset
count = high_offset - start_offset
# If there would be more messages that we want to get, force max
count = per_page if count > per_page
end

# This code is a bit tricky. Since topics can be compacted and certain offsets may
Expand Down Expand Up @@ -91,9 +96,14 @@ def page(topic_id, partition_id, page)

next unless messages

previous_offset = start_offset + count

return [
# If there is a potential previous page with more recent data, compute its
# offset
previous_offset >= high_offset ? false : previous_offset,
fill_compacted(messages, context_offset, context_count).reverse,
last_page,
next_offset,
partitions_count
]
end
Expand Down
5 changes: 1 addition & 4 deletions lib/karafka/web/ui/pro/controllers/consumers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ def index
@params.current_page
)

@page_scope = Ui::Lib::PageScopes::PageBased.new(
@params.current_page,
!last_page
)
paginate(@params.current_page, !last_page)

respond
end
Expand Down
37 changes: 23 additions & 14 deletions lib/karafka/web/ui/pro/controllers/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,13 @@ module Controllers
class Errors < Ui::Controllers::Base
# @param partition_id [Integer] id of the partition of errors we are interested in
def index(partition_id)
errors_topic = ::Karafka::Web.config.topics.errors
@partition_id = partition_id
@error_messages, last_page, @partitions_count = \
Models::Message.page(
errors_topic,
@partition_id,
@params.current_page
)

@page_scope = Ui::Lib::PageScopes::PageBased.new(
@params.current_page,
!last_page
)

@watermark_offsets = Ui::Models::WatermarkOffsets.find(errors_topic, @partition_id)

previous_page, @error_messages, next_page, @partitions_count = current_page_data

paginate(previous_page, @params.current_offset, next_page)

respond
end

Expand All @@ -44,7 +35,6 @@ def index(partition_id)
# @param partition_id [Integer]
# @param offset [Integer]
def show(partition_id, offset)
errors_topic = ::Karafka::Web.config.topics.errors
@partition_id = partition_id
@offset = offset
@error_message = Models::Message.find(
Expand All @@ -55,6 +45,25 @@ def show(partition_id, offset)

respond
end

private

# @return [Array] Array with requested messages as well as pagination details and other
# obtained metadata
def current_page_data
Models::Message.offset_page(
errors_topic,
@partition_id,
@params.current_offset,
@watermark_offsets[:low],
@watermark_offsets[:high]
)
end

# @return [String] errors topic
def errors_topic
::Karafka::Web.config.topics.errors
end
end
end
end
Expand Down
Loading