Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions web-ui/src/components/player/video-player.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
goLiveTargetMse,
isNearLiveWallClock,
type LiveSessionAnchor,
lagBehindLiveEdge,
wallClockToMse,
} from "../../mpegts/player/wall-clock";
import mp2WasmUrl from "../../mpegts/wasm/minimp3/mp2_decoder.wasm?url";
Expand Down Expand Up @@ -156,6 +155,7 @@ export function VideoPlayer({
const slotBVideoRef = useRef<HTMLVideoElement>(null);
const slotAPlayerRef = useRef<Player | null>(null);
const slotBPlayerRef = useRef<Player | null>(null);
const slotLiveStateRef = useRef<Record<SlotId, boolean>>({ a: true, b: true });
const activeSlotIdRef = useRef<SlotId>("a");
const [visibleSlotId, setVisibleSlotId] = useState<SlotId>("a");
const transitionGenRef = useRef(0);
Expand All @@ -180,9 +180,7 @@ export function VideoPlayer({
const [isMuted, setIsMuted] = useState(() => getMuted());
const [isPlaying, setIsPlaying] = useState(false);
const [liveSessionAnchor, setLiveSessionAnchor] = useState<LiveSessionAnchor | null>(null);
// Before session calibration (initial load / reload), treat live mode as Live — not Go Live.
const isLive =
playMode === "live" && (liveSessionAnchor === null || lagBehindLiveEdge(liveSessionAnchor, currentVideoTime) < 3);
const [isLive, setIsLive] = useState(true);
const [needsUserInteraction, setNeedsUserInteraction] = useState(false);
const [showControls, setShowControls] = useState(true);
const [isPiP, setIsPiP] = useState(false);
Expand Down Expand Up @@ -251,9 +249,7 @@ export function VideoPlayer({

const goLiveToSessionEdge = useEffectEvent(() => {
if (!liveSessionAnchor) return;
const video = getActiveVideo();
const currentTime = video?.currentTime ?? 0;
const targetMse = goLiveTargetMse(liveSessionAnchor, defaultConfig.liveSyncTargetLatency, currentTime);
const targetMse = goLiveTargetMse(liveSessionAnchor, defaultConfig.liveSyncTargetLatency);
getActivePlayer()?.goLive(targetMse);
getActivePlayer()?.setLiveSync(true);
});
Expand Down Expand Up @@ -390,6 +386,7 @@ export function VideoPlayer({
// Hard switch: reveal new stream first, then tear down the old slot
activeSlotIdRef.current = newActiveId;
setVisibleSlotId(newActiveId);
setIsLive(slotLiveStateRef.current[newActiveId]);
setIsLoading(false);

if (oldActiveId !== newActiveId && oldPlayer) {
Expand Down Expand Up @@ -572,6 +569,17 @@ export function VideoPlayer({
handleSeekNeeded(seconds);
}
});
p.on("live-state-change", (live) => {
if (slotPlayerRef(slotId).current !== p) return;
slotLiveStateRef.current[slotId] = live;
if (slotId === getActiveSlotId()) {
setIsLive(live);
const video = slotVideoRef(slotId).current;
if (!live && video?.paused && playMode === "live") {
p.setLiveSync(false);
}
}
});
p.on("audio-suspended", () => {
if (slotPlayerRef(slotId).current === p) {
handleAudioSuspended();
Expand Down
8 changes: 8 additions & 0 deletions web-ui/src/mpegts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export function createPlayer(video: HTMLVideoElement, config?: Partial<PlayerCon

const errorHandlers = new Set<(e: PlayerError) => void>();
const seekHandlers = new Set<(s: number) => void>();
const liveStateHandlers = new Set<(isLive: boolean) => void>();
const audioSuspendedHandlers = new Set<() => void>();

let impl: PlayerImpl | null = null;
Expand All @@ -52,6 +53,11 @@ export function createPlayer(video: HTMLVideoElement, config?: Partial<PlayerCon
h(e);
}
};
impl.onLiveStateChange = (isLive) => {
for (const h of liveStateHandlers) {
h(isLive);
}
};
impl.onAudioSuspended = () => {
for (const h of audioSuspendedHandlers) {
h();
Expand Down Expand Up @@ -97,12 +103,14 @@ export function createPlayer(video: HTMLVideoElement, config?: Partial<PlayerCon
on<K extends keyof PlayerEventMap>(event: K, handler: PlayerEventMap[K]) {
if (event === "error") errorHandlers.add(handler as (e: PlayerError) => void);
if (event === "seek-needed") seekHandlers.add(handler as (s: number) => void);
if (event === "live-state-change") liveStateHandlers.add(handler as (isLive: boolean) => void);
if (event === "audio-suspended") audioSuspendedHandlers.add(handler as () => void);
},

off<K extends keyof PlayerEventMap>(event: K, handler: PlayerEventMap[K]) {
if (event === "error") errorHandlers.delete(handler as (e: PlayerError) => void);
if (event === "seek-needed") seekHandlers.delete(handler as (s: number) => void);
if (event === "live-state-change") liveStateHandlers.delete(handler as (isLive: boolean) => void);
if (event === "audio-suspended") audioSuspendedHandlers.delete(handler as () => void);
},
};
Expand Down
113 changes: 72 additions & 41 deletions web-ui/src/mpegts/io/fetch-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ interface LoaderRange {
to: number;
}

type ResumeMode = "range" | "restart";

interface FetchLoaderOptions {
resumeMode?: ResumeMode;
}

interface FetchRequestContext {
abortController: AbortController;
contentLength: number | null;
receivedLength: number;
}

enum LoaderStatus {
kIdle = 0,
kConnecting = 1,
Expand All @@ -55,6 +67,7 @@ class FetchLoader {
// --- public callbacks (set by consumer, e.g. TSDemuxer) ---
onDataArrival: ((data: Uint8Array, byteStart: number) => number) | null;
onSeeked: (() => void) | null;
onRestarted: (() => void) | null;
onError: ((type: string, info: LoaderErrorInfo) => void) | null;
onComplete: ((extraData: unknown) => void) | null;
/** Called with the playlist text and its final (post-redirect) URL when the response is an HLS playlist. */
Expand All @@ -64,6 +77,7 @@ class FetchLoader {
private _config: PlayerConfig;
private _dataSource: DataSource | null;
private _extraData: unknown;
private _resumeMode: ResumeMode;

// --- stash buffer ---
private _stashUsed: number;
Expand All @@ -80,15 +94,13 @@ class FetchLoader {

// --- fetch internals ---
private _status: LoaderStatus;
private _requestAbort: boolean;
private _abortController: AbortController | null;
private _contentLength: number | null;
private _receivedLength: number;

constructor(dataSource: DataSource, config: PlayerConfig, extraData?: unknown) {
constructor(dataSource: DataSource, config: PlayerConfig, extraData?: unknown, options: FetchLoaderOptions = {}) {
this._config = config;
this._dataSource = dataSource;
this._extraData = extraData;
this._resumeMode = options.resumeMode ?? "range";

// stash buffer setup
this._stashUsed = 0;
Expand All @@ -104,14 +116,12 @@ class FetchLoader {

// fetch state
this._status = LoaderStatus.kIdle;
this._requestAbort = false;
this._abortController = null;
this._contentLength = null;
this._receivedLength = 0;

// callbacks
this.onDataArrival = null;
this.onSeeked = null;
this.onRestarted = null;
this.onError = null;
this.onComplete = null;
this.onHLSDetected = null;
Expand All @@ -130,6 +140,7 @@ class FetchLoader {

this.onDataArrival = null;
this.onSeeked = null;
this.onRestarted = null;
this.onError = null;
this.onComplete = null;
this.onHLSDetected = null;
Expand Down Expand Up @@ -196,7 +207,11 @@ class FetchLoader {
this._paused = false;
const bytes = this._resumeFrom;
this._resumeFrom = 0;
this._internalSeek(bytes);
if (this._resumeMode === "restart") {
this._internalRestart();
} else {
this._internalSeek(bytes);
}
}
}

Expand All @@ -221,9 +236,11 @@ class FetchLoader {
// --- fetch + ReadableStream logic (inlined FetchStreamLoader) ------------

private _startFetch(range: LoaderRange): void {
this._requestAbort = false;
this._contentLength = null;
this._receivedLength = 0;
const request: FetchRequestContext = {
abortController: new self.AbortController(),
contentLength: null,
receivedLength: 0,
};

const dataSource = this._dataSource as DataSource;
const sourceURL = dataSource.url;
Expand Down Expand Up @@ -264,18 +281,15 @@ class FetchLoader {
params.referrerPolicy = dataSource.referrerPolicy;
}

if (self.AbortController) {
this._abortController = new self.AbortController();
params.signal = this._abortController.signal;
}
this._abortController = request.abortController;
params.signal = request.abortController.signal;

this._status = LoaderStatus.kConnecting;

self
.fetch(seekConfig.url, params)
.then((res: Response) => {
if (this._requestAbort) {
this._status = LoaderStatus.kIdle;
if (this._isRequestAborted(request)) {
res.body?.cancel();
return;
}
Expand All @@ -287,7 +301,7 @@ class FetchLoader {
this._status = LoaderStatus.kIdle;
// Read the body so the already-fetched playlist can be reused (avoids a duplicate request)
return res.text().then((text) => {
if (!this._requestAbort) {
if (!this._isRequestAborted(request)) {
this.onHLSDetected?.(text, res.url || sourceURL);
}
});
Expand All @@ -298,11 +312,11 @@ class FetchLoader {
if (lengthHeader != null) {
const cl = parseInt(lengthHeader, 10);
if (cl !== 0) {
this._contentLength = cl;
request.contentLength = cl;
}
}

return this._pump((res.body as ReadableStream<Uint8Array>).getReader(), range);
return this._pump((res.body as ReadableStream<Uint8Array>).getReader(), range, request);
} else {
this._status = LoaderStatus.kError;
const errInfo: LoaderErrorInfo = { code: res.status, msg: res.statusText };
Expand All @@ -314,7 +328,7 @@ class FetchLoader {
}
})
.catch((e: unknown) => {
if (this._abortController?.signal.aborted) {
if (this._isRequestAborted(request)) {
return;
}

Expand All @@ -329,42 +343,45 @@ class FetchLoader {
});
}

private _pump(reader: ReadableStreamDefaultReader<Uint8Array>, range: LoaderRange): Promise<void> {
private _isRequestAborted(request: FetchRequestContext): boolean {
return request.abortController.signal.aborted;
}

private _pump(
reader: ReadableStreamDefaultReader<Uint8Array>,
range: LoaderRange,
request: FetchRequestContext,
): Promise<void> {
return reader
.read()
.then((result: ReadableStreamReadResult<Uint8Array>) => {
if (this._isRequestAborted(request)) {
return;
}

if (result.done) {
if (this._contentLength !== null && this._receivedLength < this._contentLength) {
if (request.contentLength !== null && request.receivedLength < request.contentLength) {
this._status = LoaderStatus.kError;
const info: LoaderErrorInfo = { code: -1, msg: "Fetch stream meet Early-EOF" };
this._handleLoaderError(LoaderErrors.EARLY_EOF, info);
} else {
this._status = LoaderStatus.kComplete;
this._onFetchComplete(range.from, range.from + this._receivedLength - 1);
this._onFetchComplete(range.from, range.from + request.receivedLength - 1);
}
} else {
if (this._abortController?.signal.aborted) {
this._status = LoaderStatus.kComplete;
return;
} else if (this._requestAbort === true) {
this._status = LoaderStatus.kComplete;
return reader.cancel() as unknown as undefined;
}

this._status = LoaderStatus.kBuffering;

const chunk = result.value as Uint8Array;
const byteStart = range.from + this._receivedLength;
this._receivedLength += chunk.byteLength;
const byteStart = range.from + request.receivedLength;
request.receivedLength += chunk.byteLength;

this._onFetchChunkArrival(chunk, byteStart);

this._pump(reader, range);
this._pump(reader, range, request);
}
})
.catch((e: unknown) => {
if (this._abortController?.signal.aborted) {
this._status = LoaderStatus.kComplete;
if (this._isRequestAborted(request)) {
return;
}

Expand All @@ -378,7 +395,8 @@ class FetchLoader {

if (
(errCode === 19 || errMsg === "network error") &&
(this._contentLength === null || (this._contentLength !== null && this._receivedLength < this._contentLength))
(request.contentLength === null ||
(request.contentLength !== null && request.receivedLength < request.contentLength))
) {
type = LoaderErrors.EARLY_EOF;
info = { code: errCode, msg: "Fetch stream meet Early-EOF" };
Expand All @@ -392,8 +410,6 @@ class FetchLoader {
}

private _abortFetch(): void {
this._requestAbort = true;

if (this._abortController) {
try {
this._abortController.abort();
Expand All @@ -416,14 +432,29 @@ class FetchLoader {
const requestRange: LoaderRange = { from: bytes, to: -1 };
this._currentRange = { from: requestRange.from, to: -1 };

this._requestAbort = false;
this._startFetch(requestRange);

if (this.onSeeked) {
this.onSeeked();
}
}

private _internalRestart(): void {
if (this._status === LoaderStatus.kConnecting || this._status === LoaderStatus.kBuffering) {
this._abortFetch();
}

// Flush old-stream stash before the caller marks the next bytes as a new TS input boundary.
Comment thread
stackia marked this conversation as resolved.
this._flushStashBuffer(true);

this.onRestarted?.();

const requestRange: LoaderRange = { from: 0, to: -1 };
this._currentRange = { from: requestRange.from, to: -1 };

this._startFetch(requestRange);
}

// --- stash buffer management (from IOController) -------------------------

private _expandBuffer(expectedBytes: number): void {
Expand Down
Loading
Loading