Skip to content

Commit

Permalink
Overhaul
Browse files Browse the repository at this point in the history
  • Loading branch information
y2k2mt committed Jun 12, 2019
1 parent 25062ad commit ea51ad1
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 64 deletions.
2 changes: 2 additions & 0 deletions spec/spec_helper.cr
@@ -0,0 +1,2 @@
require "spec"
require "../src/sse"
19 changes: 9 additions & 10 deletions spec/sse_spec.cr → spec/sse/event_source_spec.cr
@@ -1,7 +1,6 @@
require "spec"
require "../src/sse"
require "../spec_helper"

describe HTTP::ServerSentEvents do
describe HTTP::ServerSentEvents::EventSource do
it "Receive 3 events" do
channel = Channel(Array(String)).new
event_source = HTTP::ServerSentEvents::EventSource.new("http://localhost:8080/events/")
Expand All @@ -16,35 +15,35 @@ describe HTTP::ServerSentEvents do
actual.size.should eq 2
(actual[1].to_i - actual[0].to_i).should eq 1
end
event_source.abort
event_source.stop
end

it "Initialize without args" do
channel = Channel(Array(String)).new
event_source = HTTP::ServerSentEvents::EventSource.new
event_source = HTTP::ServerSentEvents::EventSource.new(URI.parse("http://localhost:8080/events/"))
spawn do
event_source.on_message do |message|
channel.send(message.data)
end
event_source.run(URI.parse("http://localhost:8080/events/"))
event_source.run
end
3.times do
actual = channel.receive
actual.size.should eq 2
(actual[1].to_i - actual[0].to_i).should eq 1
end
event_source.abort
event_source.stop
end

it "Initialize and run without args" do
channel = Channel(Array(String)).new
event_source = HTTP::ServerSentEvents::EventSource.new
event_source = HTTP::ServerSentEvents::EventSource.new("")
spawn do
event_source.on_message do |message|
channel.send(message.data)
end
end
expect_raises URI::Error do
expect_raises ArgumentError do
event_source.run
end
end
Expand All @@ -66,7 +65,7 @@ describe HTTP::ServerSentEvents do
actual.data[0].should eq "foo"
actual.data[1].should eq "bar"
end
event_source.abort
event_source.stop
end

it "Invaluid endpoint" do
Expand Down
111 changes: 57 additions & 54 deletions src/sse/event_source.cr
Expand Up @@ -7,17 +7,14 @@ module HTTP::ServerSentEvents
class EventSource
@@default_retry_duration : Int64 = 3000.to_i64

def initialize(host : String, path : String, port : Int32, tls = false, headers : HTTP::Headers = HTTP::Headers.new)
scheme = tls ? "https" : "http"
initialize(URI.new(scheme: scheme, host: host, path: path, port: port), headers)
end
@last_id : String? = nil
@abort : Bool = false

def initialize(uri : String, headers : HTTP::Headers = HTTP::Headers.new)
initialize(URI.parse(uri), headers)
end

def initialize(@uri : URI | Nil = nil, @base_headers : HTTP::Headers = HTTP::Headers.new)
@abort = false
def initialize(@uri : URI, @base_headers : HTTP::Headers = HTTP::Headers.new)
end

def on_message(&@on_message : EventMessage ->)
Expand All @@ -29,77 +26,83 @@ module HTTP::ServerSentEvents
def on_close(&@on_close : String ->)
end

def abort
def stop
@abort = true
end

def run(uri : URI = @uri, last_id : String? = nil)
if !uri
raise URI::Error.new("Endpoint URI must be specified")
end
def run
loop do
if @abort
break
end
HTTP::Client.get(uri, headers: prepare_headers(last_id)) do |response|
HTTP::Client.get(@uri, headers: prepare_headers) do |response|
case response.status_code
when 200
an_entry = [] of String
io = response.try &.body_io
last_message = nil
loop do
if @abort
break
end
line = io.gets
if !line
break
end
if line.empty? && an_entry.size != 0
last_message = parse_event_message(an_entry)
an_entry = [] of String
@on_message.try &.call(last_message.not_nil!)
else
an_entry = an_entry << line
end
end
if !last_message.not_nil!.id.try &.empty? && !@abort
sleep last_message.not_nil!.retry / 1000
end
when 307
location = response.headers["Location"]
uri = URI.parse(location)
successful_response response
when 302 | 303 | 307
tempolary_redirection response
when 503
body = response.body_io?
if body
@on_error.try &.call({status_code: response.status_code, message: body.gets_to_end})
else
@on_error.try &.call({status_code: response.status_code, message: ""})
end
# Ignore date formatted header
retry_after = response.headers["Retry-After"].to_i64?
if retry_after
sleep retry_after / 1000
else
@abort = true
end
service_unavairable response
else
body = response.body_io?
if body
@on_error.try &.call({status_code: response.status_code, message: body.gets_to_end})
else
@on_error.try &.call({status_code: response.status_code, message: ""})
end
@abort = true
stop
end
end
end
end

private def prepare_headers(last_id : String | Nil)
private def successful_response(response)
an_entry = [] of String
io = response.try &.body_io
last_message = nil
loop do
if @abort
break
end
line = io.gets
if !line
break
end
if line.empty? && an_entry.size != 0
last_message = parse_event_message(an_entry)
an_entry = [] of String
@on_message.try &.call(last_message.not_nil!)
else
an_entry = an_entry << line
end
end
if !last_message.not_nil!.id.try &.empty? && !@abort
sleep last_message.not_nil!.retry / 1000
end
end

private def tempolary_redirection(response)
location = response.headers["Location"]
@uri = URI.parse(location)
end

private def service_unavairable(response)
body = response.body_io?
if body
@on_error.try &.call({status_code: response.status_code, message: body.gets_to_end})
else
@on_error.try &.call({status_code: response.status_code, message: ""})
end
# Ignore date formatted header
response.headers["Retry-After"]?.try &.to_i64?.try { |retry_after|
sleep retry_after / 1000
} || {stop}
end

private def prepare_headers
headers = HTTP::Headers.new
if last_id
headers.add("Last-Event-Id", "#{last_id}")
if @last_id
headers.add("Last-Event-Id", "#{@last_id}")
end
headers.add("Accept", "text/event-stream")
headers.add("Cache-Control", "no-cache")
Expand Down

0 comments on commit ea51ad1

Please sign in to comment.