Skip to content

Commit

Permalink
Define the SessionManager protocol and improve our websocket reconnec…
Browse files Browse the repository at this point in the history
…t behavior (#5856)

This PR is the final merge for the feature/session-manager branch into develop. It aims to accomplish three main goals:

1. Define new SessionManager and SessionStorage abstractions, which will soon be useful for being able to
   configure how Streamlit runs across various different environments. Right now, our notion of a "session" is
   very closely tied to the life of a browser's websocket connection, and we also assume that Streamlit servers
   are long-lived + that server shutdowns cause us to lose data that isn't explicitly persisted outside of Streamlit.
   In the future, it's possible that many of these assumptions will be changing, so we want to define suitable
   abstractions to guide how we think about these changes. Note that we don't expect to get these abstractions
   perfectly correct on the first try, but we have some leeway here to make adjustments until we have a few
   implementations of `SessionManager` / `SessionStorage` aside from the ones used by default by the OS lib.

2. Write our first concrete implementations of SessionManager and SessionStorage: WebsocketSessionManager
   and MemorySessionStorage. These implementations retain most of the behavior that exists today, with one
   exception described below.

3. Use WebsocketSessionManager to fix a source of nondeterministic bugs where a browser tab's session state
   is immediately cleared when its websocket disconnects. To fix this, we teach WebsocketSessionManager to
   avoid immediately deleting a session's state/uploaded files/etc on websocket disconnect and instead defer
   the cleanup for a few minutes. This allows browser tabs that only transient disconnected (due to a network
   blip, load balancer timeout, etc.) to avoid losing all of their state.
  • Loading branch information
vdonato committed Jan 24, 2023
1 parent d9f318a commit d2bfc89
Show file tree
Hide file tree
Showing 29 changed files with 1,666 additions and 276 deletions.
30 changes: 30 additions & 0 deletions e2e/scripts/websocket_reconnects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import streamlit as st
from streamlit import runtime

# st.session_state can only be accessed while running with streamlit
if runtime.exists():
if "counter" not in st.session_state:
st.session_state.counter = 0

if st.button("click me!"):
st.session_state.counter += 1

st.write(f"count: {st.session_state.counter}")

# TODO(vdonato): Add st.file_uploader and st.camera_input tests once we're able to
# teach those widgets how to retrieve previously uploaded files after a session
# disconnect/reconnect.
52 changes: 52 additions & 0 deletions e2e/specs/websocket_reconnects.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

const INCREMENTS_PER_DISCONNECT = 5;
const NUM_DISCONNECTS = 20;

describe("websocket reconnects", () => {
beforeEach(() => {
cy.loadApp("http://localhost:3000/");
});

it("persists session state when the websocket connection is dropped and reconnects", () => {
let expectedCount = 0;

for (let i = 0; i < NUM_DISCONNECTS; i++) {
expectedCount += INCREMENTS_PER_DISCONNECT;

for (let j = 0; j < INCREMENTS_PER_DISCONNECT; j++) {
cy.get(".stButton button").contains("click me!").click();
}

cy.window().then((win) => {
setTimeout(() => {
win.streamlitDebug.disconnectWebsocket();
}, 100);
});

// Wait until we've disconnected.
cy.get("[data-testid='stStatusWidget']").should(
"have.text",
"Connecting"
);
// Wait until we've reconnected and rerun the script.
cy.get("[data-testid='stStatusWidget']").should("not.exist");

cy.get(".stMarkdown").contains(`count: ${expectedCount}`);
}
});
});
10 changes: 10 additions & 0 deletions frontend/src/App.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ describe("App", () => {
expect(wrapper.html()).not.toBeNull()
})

it("calls connectionManager.disconnect() when unmounting", () => {
const wrapper = getWrapper()
const instance = wrapper.instance() as App

wrapper.unmount()

// @ts-ignore
expect(instance.connectionManager.disconnect).toHaveBeenCalled()
})

it("reloads when streamlit server version changes", () => {
const props = getProps()
const wrapper = shallow(<App {...props} />)
Expand Down
17 changes: 17 additions & 0 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,23 @@ export class App extends PureComponent<Props, State> {
}
}

componentWillUnmount(): void {
// Needing to disconnect our connection manager + websocket connection is
// only needed here to handle the case in dev mode where react hot-reloads
// the client as a result of a source code change. In this scenario, the
// previous websocket connection is still connected, and the client and
// server end up in a reconnect loop because the server rejects attempts to
// connect to an already-connected session.
//
// This situation doesn't exist outside of dev mode because the whole App
// unmounting is either a page refresh or the browser tab closing.
//
// The optional chaining on connectionManager is needed to make typescript
// happy since connectionManager's type is `ConnectionManager | null`,
// but at this point it should always be set.
this.connectionManager?.disconnect()
}

showError(title: string, errorNode: ReactNode): void {
logError(errorNode)
const newDialog: DialogProps = {
Expand Down
8 changes: 7 additions & 1 deletion frontend/src/components/widgets/CameraInput/CameraInput.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,16 @@ class CameraInput extends React.PureComponent<Props, State> {
public componentDidUpdate = (prevProps: Props): void => {
const { element, widgetMgr } = this.props

// TODO(vdonato): Rework this now that there's a short window where the app
// may reconnect to the server without losing its uploaded files. Just
// removing the if statement below (to avoid resetting widget state on a
// disconnect) seemed to work, but I'm not entirely sure if it's a complete
// fix.
//
// Widgets are disabled if the app is not connected anymore.
// If the app disconnects from the server, a new session is created and users
// will lose access to the files they uploaded in their previous session.
// If we are reconnecting, reset the file uploader so that the widget is
// If we are reconnecting, reset the camera input so that the widget is
// in sync with the new session.
if (prevProps.disabled !== this.props.disabled && this.props.disabled) {
this.reset()
Expand Down
6 changes: 6 additions & 0 deletions frontend/src/components/widgets/FileUploader/FileUploader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ class FileUploader extends React.PureComponent<Props, State> {
public componentDidUpdate = (prevProps: Props): void => {
const { element, widgetMgr } = this.props

// TODO(vdonato): Rework this now that there's a short window where the app
// may reconnect to the server without losing its uploaded files. Just
// removing the if statement below (to avoid resetting widget state on a
// disconnect) seemed to work, but I'm not entirely sure if it's a complete
// fix.
//
// Widgets are disabled if the app is not connected anymore.
// If the app disconnects from the server, a new session is created and users
// will lose access to the files they uploaded in their previous session.
Expand Down
6 changes: 5 additions & 1 deletion frontend/src/lib/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ export class ConnectionManager {
}
}

disconnect(): void {
this.connection?.disconnect()
}

private setConnectionState = (
connectionState: ConnectionState,
errMsg?: string
Expand All @@ -146,7 +150,7 @@ export class ConnectionManager {
this.props.connectionStateChanged(connectionState)
}

if (errMsg || connectionState === ConnectionState.DISCONNECTED_FOREVER) {
if (errMsg) {
this.props.onConnectionError(errMsg || "unknown")
}
}
Expand Down
13 changes: 11 additions & 2 deletions frontend/src/lib/SessionInfo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ test("Throws an error when used before initialization", () => {
expect(() => SessionInfo.current).toThrow()
})

test("Clears session info", () => {
SessionInfo.current = new SessionInfo({
test("Saves SessionInfo.current to lastSessionInfo on clear", () => {
const sessionInfo = new SessionInfo({
appId: "aid",
sessionId: "sessionId",
streamlitVersion: "sv",
Expand All @@ -34,10 +34,19 @@ test("Clears session info", () => {
commandLine: "command line",
userMapboxToken: "mpt",
})

// @ts-ignore
SessionInfo.lastSessionInfo = "some older value"

SessionInfo.current = sessionInfo
expect(SessionInfo.isSet()).toBe(true)
// Also verify that lastSessionInfo is cleared when SessionInfo.current is
// set.
expect(SessionInfo.lastSessionInfo).toBe(undefined)

SessionInfo.clearSession()
expect(SessionInfo.isSet()).toBe(false)
expect(SessionInfo.lastSessionInfo).toBe(sessionInfo)
})

test("Can be initialized from a protobuf", () => {
Expand Down
8 changes: 8 additions & 0 deletions frontend/src/lib/SessionInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ export class SessionInfo {
*/
private static singleton?: SessionInfo

/**
* Our last SessionInfo singleton if there is no currently active session, or
* undefined if there is one.
*/
public static lastSessionInfo?: SessionInfo

public static get current(): SessionInfo {
if (!SessionInfo.singleton) {
throw new Error("Tried to use SessionInfo before it was initialized")
Expand All @@ -82,6 +88,7 @@ export class SessionInfo {
}

public static set current(sm: SessionInfo) {
SessionInfo.lastSessionInfo = undefined
SessionInfo.singleton = sm
}

Expand All @@ -94,6 +101,7 @@ export class SessionInfo {
}

public static clearSession(): void {
SessionInfo.lastSessionInfo = SessionInfo.singleton
SessionInfo.singleton = undefined
}

Expand Down
57 changes: 54 additions & 3 deletions frontend/src/lib/WebsocketConnection.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
*/

import axios from "axios"
import React, { Fragment } from "react"
import WS from "jest-websocket-mock"
import { zip } from "lodash"
import React, { Fragment } from "react"

import { BackMsg } from "src/autogen/proto"
import { ConnectionState } from "src/lib/ConnectionState"
import { SessionInfo } from "src/lib/SessionInfo"
import {
CORS_ERROR_MESSAGE_DOCUMENTATION_LINK,
StyledBashCode,
WebsocketConnection,
doInitPings,
} from "src/lib/WebsocketConnection"
import { zip } from "lodash"

const MOCK_ALLOWED_ORIGINS_RESPONSE = {
data: {
Expand Down Expand Up @@ -561,10 +562,22 @@ describe("WebsocketConnection", () => {
Promise.all = originalPromiseAll

// @ts-ignore
client.websocket.close()
if (client.websocket) {
// @ts-ignore
client.websocket.close()
}
server.close()
})

it("disconnect closes connection and sets state to DISCONNECTED_FOREVER", () => {
client.disconnect()

// @ts-ignore
expect(client.state).toBe(ConnectionState.DISCONNECTED_FOREVER)
// @ts-ignore
expect(client.websocket).toBe(undefined)
})

it("increments message cache run count", () => {
const incrementRunCountSpy = jest.spyOn(
// @ts-ignore
Expand Down Expand Up @@ -636,6 +649,8 @@ describe("WebsocketConnection auth token handling", () => {

afterEach(() => {
axios.get = originalAxiosGet

SessionInfo.lastSessionInfo = undefined
})

it("always sets first Sec-WebSocket-Protocol option to 'streamlit'", async () => {
Expand All @@ -661,6 +676,42 @@ describe("WebsocketConnection auth token handling", () => {
claimHostAuthToken: () => Promise.resolve("iAmAnAuthToken"),
resetHostAuthToken,
})

// @ts-ignore
await ws.connectToWebSocket()

expect(websocketSpy).toHaveBeenCalledWith(
"ws://localhost:1234/_stcore/stream",
["streamlit", "iAmAnAuthToken"]
)
})

it("sets second Sec-WebSocket-Protocol option to lastSessionId", async () => {
// @ts-ignore
SessionInfo.lastSessionInfo = { sessionId: "sessionId" }

const ws = new WebsocketConnection(MOCK_SOCKET_DATA)

// @ts-ignore
await ws.connectToWebSocket()

expect(websocketSpy).toHaveBeenCalledWith(
"ws://localhost:1234/_stcore/stream",
["streamlit", "sessionId"]
)
})

it("prioritizes host provided auth token over lastSessionId if both set", async () => {
// @ts-ignore
SessionInfo.lastSessionInfo = { sessionId: "sessionId" }

const resetHostAuthToken = jest.fn()
const ws = new WebsocketConnection({
...MOCK_SOCKET_DATA,
claimHostAuthToken: () => Promise.resolve("iAmAnAuthToken"),
resetHostAuthToken,
})

// @ts-ignore
await ws.connectToWebSocket()

Expand Down
Loading

0 comments on commit d2bfc89

Please sign in to comment.