In [None]:
import weave
import typing
from weave import weaveflow
import openai

In [None]:
weave.init('wf-streamtest-1')

In [None]:
@weave.type()
class OpenAIChatModel:
    model: str = 'gpt-3.5-turbo'

    @weave.op()
    def complete(self, message: str) -> typing.Any:
        return openai.ChatCompletion.create(
            model=self.model,
            messages=[{'role': 'user', 'content': message}],
        )

    # Note this returns a generator
    @weave.op()
    def stream(self, message: str) -> typing.Any:
        return openai.ChatCompletion.create(
            model=self.model,
            messages=[{'role': 'user', 'content': message}],
            stream=True
        )

In [None]:
model = OpenAIChatModel()
# Fails if weave.init has been called, when we go to auto-publish the output, since we don't have a type for generator
for v in model.stream('hello'):
    print(v)

In [None]:
# OK so what we need to do:
# if we get a generator output,
#    wrap it in our own generator, that'll close the span when it's done instead of immediately
# we do this in the OpenAI wrapper already. need to extract that code out somehow

In [None]:
# Table streaming case
examples = weave.WeaveList(["hello", "who is the queen of england?"])
# Engine will wait for final value for you
# But this will fail since we don't have a type for the generator
examples.apply(lambda ex: model.stream(ex))

# we could make this work at some point
with weave.lazy():
    node = examples.apply(lambda ex: model.stream(ex))
    # final vector result not available til all streams done
    result_stream = weave.streaming_use(node)
# so here we get streaming updates for each cell in the result vector
# and then a final result for the vector
# ops will need to return a special generator wrapper that can produce the final
#   aggregated value
for ref, val in result_stream:
    print(ref, val)


# hmm... an alternative
#   what if each stream created a StreamTable, that has all the items in it.
#   we need a StreamTable.close method
#   the returned vector would be a vector of StreamTable

In [None]:
## Async case

In [None]:
@weave.op()
async def async_op(v: int) -> int:
    return v + 9

In [None]:
# Doesn't work today, since we don't have a type for the generator

In [None]:
async_op(5)

In [None]:
await async_op(5)