README.md: 1 addition, 1 deletion

@@ -81,7 +81,7 @@ stream = openai.chat.completions.create_streaming(
model: "gpt-4o"
)

-stream.for_each do |completion|
+stream.each do |completion|
puts(completion)
end
```
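With the rename, `OpenAI::BaseStream` gains a conventional `#each` and mixes in `Enumerable` (see `lib/openai/base_stream.rb` below), so streams iterate like any other Ruby collection. A minimal sketch of the updated README usage, assuming an already-configured `openai` client (the `messages:` argument is illustrative, as the hunk elides it):

```ruby
stream = openai.chat.completions.create_streaming(
  messages: [{role: "user", content: "Say this is a test"}],
  model: "gpt-4o"
)

# `#each` replaces the old `#for_each`; the block receives each chunk
# as it arrives over the SSE connection.
stream.each do |completion|
  puts(completion)
end
```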
lib/openai/base_client.rb: 1 addition, 1 deletion

@@ -437,7 +437,7 @@ def request(req)
decoded = OpenAI::Util.decode_content(response, stream: stream)
case req
in { stream: Class => st }
-st.new(model: model, url: url, status: status, response: response, messages: decoded)
+st.new(model: model, url: url, status: status, response: response, stream: decoded)
in { page: Class => page }
page.new(client: self, req: req, headers: response, page_data: decoded)
else
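For context, `BaseClient#request` decides how to wrap a decoded response by pattern-matching the request descriptor; the rename only touches the keyword handed to the stream class. A simplified, annotated sketch of that dispatch (the surrounding method, `OpenAI::Util.decode_content`, and the `else` branch's exact behavior are taken as given or assumed):

```ruby
decoded = OpenAI::Util.decode_content(response, stream: stream)
case req
in { stream: Class => st }
  # Streaming request: hand the decoded SSE enumerable to the stream
  # class under the renamed `stream:` keyword (previously `messages:`).
  st.new(model: model, url: url, status: status, response: response, stream: decoded)
in { page: Class => page }
  # Paginated request: wrap the decoded body in the page class.
  page.new(client: self, req: req, headers: response, page_data: decoded)
else
  # Plain request: presumably returns the decoded body as-is.
  decoded
end
```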
lib/openai/base_stream.rb: 8 additions, 7 deletions

@@ -3,7 +3,7 @@
module OpenAI
# @example
# ```ruby
-# stream.for_each do |chunk|
+# stream.each do |chunk|
# puts(chunk)
# end
# ```
@@ -12,7 +12,6 @@ module OpenAI
# ```ruby
# chunks =
# stream
-# .to_enum
# .lazy
# .select { _1.object_id.even? }
# .map(&:itself)
@@ -22,6 +21,8 @@
# chunks => Array
# ```
module BaseStream
+include Enumerable
+
# @return [void]
def close = OpenAI::Util.close_fused!(@iterator)

@@ -33,14 +34,14 @@ def close = OpenAI::Util.close_fused!(@iterator)
# @param blk [Proc]
#
# @return [void]
-def for_each(&)
+def each(&)
unless block_given?
raise ArgumentError.new("A block must be given to ##{__method__}")
end
@iterator.each(&)
end

-# @return [Enumerable]
+# @return [Enumerator]
def to_enum = @iterator

alias_method :enum_for, :to_enum
@@ -51,13 +52,13 @@ def to_enum = @iterator
# @param url [URI::Generic]
# @param status [Integer]
# @param response [Net::HTTPResponse]
-# @param messages [Enumerable]
-def initialize(model:, url:, status:, response:, messages:)
+# @param stream [Enumerable]
+def initialize(model:, url:, status:, response:, stream:)
@model = model
@url = url
@status = status
@response = response
-@messages = messages
+@stream = stream
@iterator = iterator
end
end
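Because `#each` now satisfies the `Enumerable` contract, the doc-comment example above works without the explicit `.to_enum` hop, and the rest of the Enumerable API comes along for free. A hedged usage sketch (assumes `stream` is a live `OpenAI::Stream`; the fused iterator is single-pass, so read it once and `#close` if you stop early):

```ruby
# Lazy chaining directly on the stream, no `.to_enum` needed:
chunks =
  stream
    .lazy
    .select { _1.object_id.even? }
    .map(&:itself)
    .take(2)
    .to_a

# Other Enumerable methods (`first`, `map`, `count`, ...) also work,
# but each call consumes events from the same underlying iterator.
stream.close # release the connection after an early exit
```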
lib/openai/resources/beta/threads.rb: 8 additions, 1 deletion

@@ -208,7 +208,10 @@ def delete(thread_id, params = {})
# @return [OpenAI::Models::Beta::Threads::Run]
def create_and_run(params)
parsed, options = OpenAI::Models::Beta::ThreadCreateAndRunParams.dump_request(params)
-parsed.delete(:stream)
+if parsed[:stream]
+  message = "Please use `#create_and_run_streaming` for the streaming use case."
+  raise ArgumentError.new(message)
+end
@client.request(
method: :post,
path: "threads/runs",
@@ -315,6 +318,10 @@ def create_and_run(params)
# @return [OpenAI::Stream<OpenAI::Models::Beta::AssistantStreamEvent::ThreadCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunQueued, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunRequiresAction, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelling, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ErrorEvent>]
def create_and_run_streaming(params)
parsed, options = OpenAI::Models::Beta::ThreadCreateAndRunParams.dump_request(params)
+unless parsed.fetch(:stream, true)
+  message = "Please use `#create_and_run` for the non-streaming use case."
+  raise ArgumentError.new(message)
+end
parsed.store(:stream, true)
@client.request(
method: :post,
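The behavioral change: a `stream:` argument that used to be silently deleted now fails fast, in both directions. The same guard pattern is applied to runs, chat completions, legacy completions, and responses in the files below. A sketch of what a caller sees (client and IDs are hypothetical):

```ruby
# The non-streaming method refuses `stream: true`:
openai.beta.threads.create_and_run(
  assistant_id: "asst_abc123", # hypothetical ID
  stream: true
)
# => ArgumentError: Please use `#create_and_run_streaming` for the streaming use case.

# The streaming method refuses an explicit `stream: false`; note the
# guard uses `parsed.fetch(:stream, true)`, so omitting `stream:` is fine.
openai.beta.threads.create_and_run_streaming(
  assistant_id: "asst_abc123",
  stream: false
)
# => ArgumentError: Please use `#create_and_run` for the non-streaming use case.
```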
lib/openai/resources/beta/threads/runs.rb: 16 additions, 2 deletions

@@ -125,7 +125,10 @@ class Runs
# @return [OpenAI::Models::Beta::Threads::Run]
def create(thread_id, params)
parsed, options = OpenAI::Models::Beta::Threads::RunCreateParams.dump_request(params)
-parsed.delete(:stream)
+if parsed[:stream]
+  message = "Please use `#create_streaming` for the streaming use case."
+  raise ArgumentError.new(message)
+end
query_params = [:include]
@client.request(
method: :post,
@@ -254,6 +257,10 @@ def create(thread_id, params)
# @return [OpenAI::Stream<OpenAI::Models::Beta::AssistantStreamEvent::ThreadCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunQueued, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunRequiresAction, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelling, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ErrorEvent>]
def create_streaming(thread_id, params)
parsed, options = OpenAI::Models::Beta::Threads::RunCreateParams.dump_request(params)
+unless parsed.fetch(:stream, true)
+  message = "Please use `#create` for the non-streaming use case."
+  raise ArgumentError.new(message)
+end
parsed.store(:stream, true)
query_params = [:include]
@client.request(
@@ -410,7 +417,10 @@ def cancel(run_id, params)
# @return [OpenAI::Models::Beta::Threads::Run]
def submit_tool_outputs(run_id, params)
parsed, options = OpenAI::Models::Beta::Threads::RunSubmitToolOutputsParams.dump_request(params)
-parsed.delete(:stream)
+if parsed[:stream]
+  message = "Please use `#submit_tool_outputs_streaming` for the streaming use case."
+  raise ArgumentError.new(message)
+end
thread_id =
parsed.delete(:thread_id) do
raise ArgumentError.new("missing required path argument #{_1}")
@@ -444,6 +454,10 @@ def submit_tool_outputs(run_id, params)
# @return [OpenAI::Stream<OpenAI::Models::Beta::AssistantStreamEvent::ThreadCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunQueued, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunRequiresAction, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelling, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepFailed, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepCancelled, OpenAI::Models::Beta::AssistantStreamEvent::ThreadRunStepExpired, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCreated, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageInProgress, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageDelta, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageCompleted, OpenAI::Models::Beta::AssistantStreamEvent::ThreadMessageIncomplete, OpenAI::Models::Beta::AssistantStreamEvent::ErrorEvent>]
def submit_tool_outputs_streaming(run_id, params)
parsed, options = OpenAI::Models::Beta::Threads::RunSubmitToolOutputsParams.dump_request(params)
+unless parsed.fetch(:stream, true)
+  message = "Please use `#submit_tool_outputs` for the non-streaming use case."
+  raise ArgumentError.new(message)
+end
parsed.store(:stream, true)
thread_id =
parsed.delete(:thread_id) do
lib/openai/resources/chat/completions.rb: 8 additions, 1 deletion

@@ -215,7 +215,10 @@ class Completions
# @return [OpenAI::Models::Chat::ChatCompletion]
def create(params)
parsed, options = OpenAI::Models::Chat::CompletionCreateParams.dump_request(params)
-parsed.delete(:stream)
+if parsed[:stream]
+  message = "Please use `#create_streaming` for the streaming use case."
+  raise ArgumentError.new(message)
+end
@client.request(
method: :post,
path: "chat/completions",
@@ -433,6 +436,10 @@ def create(params)
# @return [OpenAI::Stream<OpenAI::Models::Chat::ChatCompletionChunk>]
def create_streaming(params)
parsed, options = OpenAI::Models::Chat::CompletionCreateParams.dump_request(params)
+unless parsed.fetch(:stream, true)
+  message = "Please use `#create` for the non-streaming use case."
+  raise ArgumentError.new(message)
+end
parsed.store(:stream, true)
@client.request(
method: :post,
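On the most heavily used surface this closes a long-standing footgun: `create(stream: true)` previously dropped the flag and returned a plain, non-streaming response. A quick sketch (hypothetical client):

```ruby
openai.chat.completions.create(
  messages: [{role: "user", content: "Say this is a test"}],
  model: "gpt-4o",
  stream: true # was silently dropped before; now raises
)
# => ArgumentError: Please use `#create_streaming` for the streaming use case.
```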
lib/openai/resources/completions.rb: 8 additions, 1 deletion

@@ -115,7 +115,10 @@ class Completions
# @return [OpenAI::Models::Completion]
def create(params)
parsed, options = OpenAI::Models::CompletionCreateParams.dump_request(params)
-parsed.delete(:stream)
+if parsed[:stream]
+  message = "Please use `#create_streaming` for the streaming use case."
+  raise ArgumentError.new(message)
+end
@client.request(
method: :post,
path: "completions",
@@ -237,6 +240,10 @@ def create(params)
# @return [OpenAI::Stream<OpenAI::Models::Completion>]
def create_streaming(params)
parsed, options = OpenAI::Models::CompletionCreateParams.dump_request(params)
+unless parsed.fetch(:stream, true)
+  message = "Please use `#create` for the non-streaming use case."
+  raise ArgumentError.new(message)
+end
parsed.store(:stream, true)
@client.request(
method: :post,
lib/openai/resources/responses.rb: 8 additions, 1 deletion

@@ -129,7 +129,10 @@ class Responses
# @return [OpenAI::Models::Responses::Response]
def create(params)
parsed, options = OpenAI::Models::Responses::ResponseCreateParams.dump_request(params)
-parsed.delete(:stream)
+if parsed[:stream]
+  message = "Please use `#create_streaming` for the streaming use case."
+  raise ArgumentError.new(message)
+end
@client.request(
method: :post,
path: "responses",
@@ -262,6 +265,10 @@ def create(params)
# @return [OpenAI::Stream<OpenAI::Models::Responses::ResponseAudioDeltaEvent, OpenAI::Models::Responses::ResponseAudioDoneEvent, OpenAI::Models::Responses::ResponseAudioTranscriptDeltaEvent, OpenAI::Models::Responses::ResponseAudioTranscriptDoneEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallCodeDeltaEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallCodeDoneEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallCompletedEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallInProgressEvent, OpenAI::Models::Responses::ResponseCodeInterpreterCallInterpretingEvent, OpenAI::Models::Responses::ResponseCompletedEvent, OpenAI::Models::Responses::ResponseContentPartAddedEvent, OpenAI::Models::Responses::ResponseContentPartDoneEvent, OpenAI::Models::Responses::ResponseCreatedEvent, OpenAI::Models::Responses::ResponseErrorEvent, OpenAI::Models::Responses::ResponseFileSearchCallCompletedEvent, OpenAI::Models::Responses::ResponseFileSearchCallInProgressEvent, OpenAI::Models::Responses::ResponseFileSearchCallSearchingEvent, OpenAI::Models::Responses::ResponseFunctionCallArgumentsDeltaEvent, OpenAI::Models::Responses::ResponseFunctionCallArgumentsDoneEvent, OpenAI::Models::Responses::ResponseInProgressEvent, OpenAI::Models::Responses::ResponseFailedEvent, OpenAI::Models::Responses::ResponseIncompleteEvent, OpenAI::Models::Responses::ResponseOutputItemAddedEvent, OpenAI::Models::Responses::ResponseOutputItemDoneEvent, OpenAI::Models::Responses::ResponseRefusalDeltaEvent, OpenAI::Models::Responses::ResponseRefusalDoneEvent, OpenAI::Models::Responses::ResponseTextAnnotationDeltaEvent, OpenAI::Models::Responses::ResponseTextDeltaEvent, OpenAI::Models::Responses::ResponseTextDoneEvent, OpenAI::Models::Responses::ResponseWebSearchCallCompletedEvent, OpenAI::Models::Responses::ResponseWebSearchCallInProgressEvent, OpenAI::Models::Responses::ResponseWebSearchCallSearchingEvent>]
def create_streaming(params)
parsed, options = OpenAI::Models::Responses::ResponseCreateParams.dump_request(params)
+unless parsed.fetch(:stream, true)
+  message = "Please use `#create` for the non-streaming use case."
+  raise ArgumentError.new(message)
+end
parsed.store(:stream, true)
@client.request(
method: :post,
lib/openai/stream.rb: 3 additions, 4 deletions

@@ -3,7 +3,7 @@
module OpenAI
# @example
# ```ruby
-# stream.for_each do |event|
+# stream.each do |event|
# puts(event)
# end
# ```
@@ -12,7 +12,6 @@ module OpenAI
# ```ruby
# events =
# stream
-# .to_enum
# .lazy
# .select { _1.object_id.even? }
# .map(&:itself)
@@ -29,10 +28,10 @@ class Stream
# @return [Enumerable]
private def iterator
# rubocop:disable Metrics/BlockLength
-@iterator ||= OpenAI::Util.chain_fused(@messages) do |y|
+@iterator ||= OpenAI::Util.chain_fused(@stream) do |y|
consume = false

-@messages.each do |msg|
+@stream.each do |msg|
next if consume

case msg
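`Stream#iterator` wraps the raw `@stream` enumerable in a fused enumerator, which is what makes these streams single-pass: once closed or drained, they cannot be re-read. A hedged consumer-side sketch (the exact fused-iterator semantics are an assumption based on `chain_fused`/`close_fused!`):

```ruby
# Take only what is needed, then release the underlying connection.
first_events = stream.first(5) # drains at most five events
stream.close                   # presumably fuses the iterator and
                               # frees the HTTP response
```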
rbi/lib/openai/base_stream.rbi: 6 additions, 4 deletions

@@ -2,6 +2,8 @@

module OpenAI
module BaseStream
+include Enumerable
+
Message = type_member(:in)
Elem = type_member(:out)

@@ -15,10 +17,10 @@
end

sig { params(blk: T.proc.params(arg0: Elem).void).void }
-def for_each(&blk)
+def each(&blk)
end

-sig { returns(T::Enumerable[Elem]) }
+sig { returns(T::Enumerator[Elem]) }
def to_enum
end

@@ -31,11 +33,11 @@ module OpenAI
url: URI::Generic,
status: Integer,
response: Net::HTTPResponse,
-messages: T::Enumerable[Message]
+stream: T::Enumerable[Message]
)
.void
end
-def initialize(model:, url:, status:, response:, messages:)
+def initialize(model:, url:, status:, response:, stream:)
end
end
end
rbi/lib/openai/resources/beta/threads.rbi: 8 additions, 0 deletions

@@ -134,6 +134,7 @@ module OpenAI
),
top_p: T.nilable(Float),
truncation_strategy: T.nilable(OpenAI::Models::Beta::ThreadCreateAndRunParams::TruncationStrategy),
+stream: T.noreturn,
request_options: T.nilable(T.any(OpenAI::RequestOptions, T::Hash[Symbol, T.anything]))
)
.returns(OpenAI::Models::Beta::Threads::Run)
@@ -227,6 +228,9 @@
# Controls for how a thread will be truncated prior to the run. Use this to
# control the initial context window of the run.
truncation_strategy: nil,
+# There is no need to provide `stream:`. Instead, use `#create_and_run_streaming`
+# or `#create_and_run` for streaming and non-streaming use cases, respectively.
+stream: false,
request_options: {}
)
end
@@ -264,6 +268,7 @@
),
top_p: T.nilable(Float),
truncation_strategy: T.nilable(OpenAI::Models::Beta::ThreadCreateAndRunParams::TruncationStrategy),
+stream: T.noreturn,
request_options: T.nilable(T.any(OpenAI::RequestOptions, T::Hash[Symbol, T.anything]))
)
.returns(
@@ -386,6 +391,9 @@
# Controls for how a thread will be truncated prior to the run. Use this to
# control the initial context window of the run.
truncation_strategy: nil,
+# There is no need to provide `stream:`. Instead, use `#create_and_run_streaming`
+# or `#create_and_run` for streaming and non-streaming use cases, respectively.
+stream: true,
request_options: {}
)
end
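The Sorbet signatures enforce the same contract statically: typing the keyword as `stream: T.noreturn` means no runtime value can satisfy it, so any call site that passes `stream:` fails `srb tc` before the runtime `ArgumentError` ever fires, while the commented defaults in the rbi bodies keep the generated docs readable. A hedged illustration:

```ruby
# typed: true
# With `stream: T.noreturn` in the sig, this call type-checks only when
# `stream:` is omitted; passing any value is a static error.
openai.beta.threads.create_and_run(
  assistant_id: "asst_abc123", # hypothetical ID
  stream: true                 # srb: `TrueClass` does not match `T.noreturn`
)
```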