Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions lib/orb/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -577,15 +577,29 @@ class << self
#
# @return [Enumerable]
#
def enum_lines(enum)
def decode_lines(enum)
re = /(\r\n|\r|\n)/
buffer = String.new.b
cr_seen = nil

chain_fused(enum) do |y|
buffer = String.new
enum.each do |row|
buffer << row
while (idx = buffer.index("\n"))
y << buffer.slice!(..idx)
while (match = re.match(buffer, cr_seen.to_i))
case [match.captures.first, cr_seen]
in ["\r", nil]
cr_seen = match.end(1)
next
in ["\r" | "\r\n", Integer]
y << buffer.slice!(..(cr_seen.pred))
else
y << buffer.slice!(..(match.end(1).pred))
end
cr_seen = nil
end
end

y << buffer.slice!(..(cr_seen.pred)) unless cr_seen.nil?
y << buffer unless buffer.empty?
end
end
Expand All @@ -598,26 +612,26 @@ def enum_lines(enum)
#
# @return [Hash{Symbol=>Object}]
#
def parse_sse(lines)
def decode_sse(lines)
chain_fused(lines) do |y|
blank = {event: nil, data: nil, id: nil, retry: nil}
current = {}

lines.each do |line|
case line.strip
case line.sub(/\R$/, "")
in ""
next if current.empty?
y << {**blank, **current}
current = {}
in /^:/
next
in /^([^:]+):\s?(.*)$/
_, field, value = Regexp.last_match.to_a
field, value = Regexp.last_match.captures
case field
in "event"
current.merge!(event: value)
in "data"
(current[:data] ||= String.new) << (value << "\n")
(current[:data] ||= String.new.b) << value << "\n"
in "id" unless value.include?("\0")
current.merge!(id: value)
in "retry" if /^\d+$/ =~ value
Expand Down
4 changes: 2 additions & 2 deletions rbi/lib/orb/util.rbi
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ module Orb
end

sig { params(enum: T::Enumerable[String]).returns(T::Enumerable[String]) }
def self.enum_lines(enum)
def self.decode_lines(enum)
end

sig { params(lines: T::Enumerable[String]).returns(Orb::Util::SSEMessage) }
def self.parse_sse(lines)
def self.decode_sse(lines)
end
end
end
4 changes: 2 additions & 2 deletions sig/orb/util.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ module Orb
type sse_message =
{ event: String?, data: String?, id: String?, retry: Integer? }

def self?.enum_lines: (Enumerable[String] enum) -> Enumerable[String]
def self?.decode_lines: (Enumerable[String] enum) -> Enumerable[String]

def self?.parse_sse: (Enumerable[String] lines) -> Orb::Util::sse_message
def self?.decode_sse: (Enumerable[String] lines) -> Orb::Util::sse_message
end
end
45 changes: 37 additions & 8 deletions test/orb/util_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ def test_close_fused_sse_chain
.map(&:to_s)

fused_1 = Orb::Util.fused_enum(enum)
fused_2 = Orb::Util.enum_lines(fused_1)
fused_3 = Orb::Util.parse_sse(fused_2)
fused_2 = Orb::Util.decode_lines(fused_1)
fused_3 = Orb::Util.decode_sse(fused_2)

assert_equal(0, taken)
Orb::Util.close_fused!(fused_3)
Expand All @@ -330,7 +330,7 @@ def test_close_fused_sse_chain
end

class Orb::Test::UtilSseTest < Minitest::Test
def test_enum_lines
def test_decode_lines
cases = {
%w[] => %w[],
%W[\n\n] => %W[\n \n],
Expand All @@ -340,15 +340,37 @@ def test_enum_lines
%W[a\nb\n] => %W[a\n b\n],
%W[\na b\n] => %W[\n ab\n],
%W[\na b\n\n] => %W[\n ab\n \n],
%W[\na b] => %W[\n ab]
%W[\na b] => %W[\n ab],
%W[\u1F62E\u200D\u1F4A8] => %W[\u1F62E\u200D\u1F4A8],
%W[\u1F62E \u200D \u1F4A8] => %W[\u1F62E\u200D\u1F4A8]
}
eols = %W[\n \r \r\n]
cases.each do |enum, expected|
lines = Orb::Util.enum_lines(enum)
eols.each do |eol|
lines = Orb::Util.decode_lines(enum.map { _1.gsub("\n", eol) })
assert_equal(expected.map { _1.gsub("\n", eol) }, lines.to_a, "eol=#{JSON.generate(eol)}")
end
end
end

def test_mixed_decode_lines
cases = {
%w[] => %w[],
%W[\r\r] => %W[\r \r],
%W[\r \r] => %W[\r \r],
%W[\r\r\r] => %W[\r \r \r],
%W[\r\r \r] => %W[\r \r \r],
%W[\r \n] => %W[\r\n],
%W[\r\r\n] => %W[\r \r\n],
%W[\n\r] => %W[\n \r]
}
cases.each do |enum, expected|
lines = Orb::Util.decode_lines(enum)
assert_equal(expected, lines.to_a)
end
end

def test_parse_sse
def test_decode_sse
cases = {
"empty input" => {
[] => []
Expand All @@ -372,8 +394,8 @@ def test_parse_sse
},
"complete event" => {
[
"event: update\n",
"id: 123\n",
"event: update\n",
"data: hello world\n",
"retry: 5000\n",
"\n"
Expand Down Expand Up @@ -454,12 +476,19 @@ def test_parse_sse
{data: "first\n"},
{data: "second\n"}
]
},
"multibyte unicode" => {
[
"data: \u1F62E\u200D\u1F4A8\n"
] => [
{data: "\u1F62E\u200D\u1F4A8\n"}
]
}
}

cases.each do |name, test_cases|
test_cases.each do |input, expected|
actual = Orb::Util.parse_sse(input).map(&:compact)
actual = Orb::Util.decode_sse(input).map(&:compact)
assert_equal(expected, actual, name)
end
end
Expand Down