Streamz with websocket not streaming any data #474

Open
leahpence opened this issue Feb 3, 2024 · 5 comments

Comments

@leahpence

leahpence commented Feb 3, 2024

I am trying to figure out the right way to stream data using streamz with a websocket. The data arrives over the websocket, but nothing comes out of the stream.

import config
import websocket, json
import pandas as pd
from streamz.dataframe import DataFrame
from streamz import Stream
import time

ticker_list = ["SCHG"]

# Create DataFrame with specified columns and ticker_list as index
df = pd.DataFrame(columns=['Bid', 'Ask', 'Time'], index=ticker_list)

source = Stream()

def update_dataframe(json_message, df):
    ticker_data = json_message[0]
    ticker_symbol = ticker_data['S']
    df.loc[ticker_symbol, 'Bid'] = ticker_data['bp']
    df.loc[ticker_symbol, 'Ask'] = ticker_data['ap']
    df.loc[ticker_symbol, 'Time'] = pd.to_datetime(ticker_data['t'], unit='s').tz_localize('UTC').tz_convert('America/New_York').strftime('%Y-%m-%d %H:%M:%S%z')

    source.emit(df.copy())  # Emit the updated DataFrame to the stream

def on_open(ws):
    print("opened connection")
    # Authentication
    auth_data = {"action":"auth","key": config.API_KEY,"secret": config.SECRET_KEY}

    # Subscribe
    ws.send(json.dumps(auth_data))
    
    listen_message = {"action":"subscribe","quotes":ticker_list}
    ws.send(json.dumps(listen_message))
    print('subscribed')       

def on_message(ws, message):
    json_message = json.loads(message)
    
    # Update the DataFrame and emit it to the stream
    update_dataframe(json_message, df)

def on_close(ws):
    print("closed connection")

def print_result(x):
    print(x)

sink = source.map(print_result)  # Set up a sink to consume and process the emitted data

socket = "wss://stream.data.alpaca.markets/v2/iex"
ws = websocket.WebSocketApp(socket, on_open=on_open, on_message=on_message, on_close=on_close)
ws.run_forever()

@martindurant
Member

Are you saying that the on_open, on_message and on_close functions do run, but print_result does not?

You might also be interested in Stream.from_websocket which listens for incoming connections. A streamz native (async) client version would look very similar.

@leahpence
Author

leahpence commented Feb 3, 2024

That is correct: I am not getting any data in the results window with streamz. Perhaps I am missing something in the way I use the streamz code.
When I run the same Python without the streamz code, everything works and prints normally.
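
One possibility worth ruling out, offered as a guess rather than a diagnosis: `update_dataframe` reads `json_message[0]['S']` unconditionally, so any frame without an `'S'` key would raise before `source.emit` is reached. As far as I know, websocket-client catches exceptions raised inside callbacks and only forwards them to `on_error` when one is supplied, which would make such a failure silent. The sketch below uses only the standard library. The message layout (quotes tagged `"T": "q"`, acknowledgements tagged otherwise) is an assumption about the feed, the sample frames are made up, and `extract_quotes` and `report_errors` are hypothetical helper names.

```python
import json
import traceback


def extract_quotes(message):
    """Return only the quote entries from one raw websocket frame.

    Assumption: a frame is a JSON list of dicts, quotes carry "T": "q",
    and control messages (auth or subscription acknowledgements) do not.
    """
    payload = json.loads(message)
    return [item for item in payload
            if isinstance(item, dict) and item.get("T") == "q"]


def report_errors(callback):
    """Wrap a callback so any exception is printed before it propagates."""
    def wrapper(*args, **kwargs):
        try:
            return callback(*args, **kwargs)
        except Exception:
            traceback.print_exc()
            raise
    return wrapper


emitted = []  # stand-in for source.emit(...) so the sketch runs without streamz


@report_errors
def on_message(ws, message):
    for quote in extract_quotes(message):
        emitted.append((quote["S"], quote["bp"], quote["ap"]))


# Made-up sample frames: one acknowledgement, then one quote.
on_message(None, '[{"T": "success", "msg": "connected"}]')
on_message(None, '[{"T": "q", "S": "SCHG", "bp": 1.0, "ap": 1.1, '
                 '"t": "2024-02-02T15:00:00Z"}]')
print(emitted)
```

If the wrapped callback prints a traceback when run against the real feed, the problem sits in the message handling rather than in streamz. Passing an `on_error` callback to `WebSocketApp` should surface the same information.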

@martindurant
Member

I am afraid I don't know. You are not using any async streamz stuff here, so the simplified code is essentially

source = Stream()
sink = source.map(print_result)
source.emit(df) # repeated for different df

which does indeed print.

@leahpence
Author

Does Streamz work in Visual Studio Code, or just in Jupyter notebooks?

@martindurant
Member

The auto-updating visual output is a Jupyter-widget thing; I don't know whether VSCode shows those properly. streamz plotting via hvplot can work on a variety of browser platforms, including a standalone webapp, but, again, I don't know what VSCode does.

However, streamz's internals will continue to work anywhere, including a simple terminal, so output that is printed, written to a file, or produced through some other side effect will work regardless.
