Skip to content

Commit

Permalink
api: Stop triggering isActive clean-up from GETs
Browse files Browse the repository at this point in the history
Still monkey-patch responses to keep the same UX,
but the actual recordings processing will only be
triggered once the active clean-up job is triggered.
  • Loading branch information
victorges committed Apr 26, 2024
1 parent 5457512 commit f70dfe3
Showing 1 changed file with 53 additions and 40 deletions.
93 changes: 53 additions & 40 deletions packages/api/src/controllers/stream.ts
Expand Up @@ -297,7 +297,7 @@ export function getFLVPlaybackUrl(ingest: string, stream: DBStream) {
* Returns whether the stream is currently tagged as active but hasn't been
* updated in a long time and thus should be cleaned up.
*/
function shouldActiveCleanup(stream: DBStream | DBSession) {
function shouldCleanUpIsActive(stream: DBStream | DBSession) {
const isActive = "isActive" in stream ? stream.isActive : true; // sessions don't have `isActive` field so we just assume `true`
return (
isActive &&
Expand All @@ -306,13 +306,34 @@ function shouldActiveCleanup(stream: DBStream | DBSession) {
);
}

function activeCleanupOne(
/**
* Creates an updated stream object with a fixed `isActive` field in case it is lost and needs clean up.
*/
function withFixedIsActive(stream: DBStream) {
return {
...stream,
isActive: stream.isActive && !shouldCleanUpIsActive(stream),
};
}

/**
* Calls {@link withFixedIsActive} on multiple streams, optionally filtering to only the really active streams.
*/
function withFixedIsActiveMany(streams: DBStream[], filter = false) {
streams = streams.map(withFixedIsActive);
if (filter) {
streams = streams.filter((s) => s.isActive);
}
return streams;
}

function triggerCleanUpIsActiveJob(
config: CliArgs,
stream: DBStream,
queue: Queue,
ingest: string
) {
if (!shouldActiveCleanup(stream)) {
if (!shouldCleanUpIsActive(stream)) {
return false;
}

Expand Down Expand Up @@ -344,23 +365,9 @@ function activeCleanupOne(
logger.error("Error sending /setactive hooks err=", err);
}
});

stream.isActive = false;
return true;
}

function activeCleanup(
config: CliArgs,
streams: DBStream[],
queue: Queue,
ingest: string
) {
for (const stream of streams) {
activeCleanupOne(config, stream, queue, ingest);
}
return streams;
}

async function getIngestBase(req: Request) {
const ingests = await req.getIngest();
if (!ingests.length) {
Expand Down Expand Up @@ -474,22 +481,19 @@ app.get("/", authorizer({}), async (req, res) => {
},
});

const ingest = await getIngestBase(req);
res.status(200);

if (newCursor) {
res.links({ next: makeNextHREF(req, newCursor) });
}

output = db.stream.addDefaultFieldsMany(
db.stream.removePrivateFieldsMany(output, req.user.admin)
const filter = active && active !== "false";
output = withFixedIsActiveMany(
db.stream.addDefaultFieldsMany(
db.stream.removePrivateFieldsMany(output, req.user.admin)
),
filter
);
output = activeCleanup(req.config, output, req.queue, ingest);
if (active) {
output = output.filter((s) => s.isActive); // activeCleanup monkey patches the stream object
}

res.json(output);
res.status(200).json(output);
});

export async function getRecordingPlaybackUrl(
Expand Down Expand Up @@ -733,13 +737,10 @@ app.get("/user/:userId", authorizer({}), async (req, res) => {
res.links({ next: makeNextHREF(req, newCursor) });
}
res.json(
activeCleanup(
req.config,
withFixedIsActiveMany(
db.stream.addDefaultFieldsMany(
db.stream.removePrivateFieldsMany(streams, req.user.admin)
),
req.queue,
ingest
)
)
);
});
Expand All @@ -756,7 +757,8 @@ app.get("/:id", authorizer({}), async (req, res) => {
res.status(404);
return res.json({ errors: ["not found"] });
}
activeCleanupOne(req.config, stream, req.queue, await getIngestBase(req));
stream = withFixedIsActive(stream);

// fixup 'user' session
if (!raw && stream.lastSessionId) {
const lastSession = await db.stream.get(stream.lastSessionId);
Expand Down Expand Up @@ -1176,7 +1178,7 @@ app.post("/:id/lockPull", authorizer({ anyAdmin: true }), async (req, res) => {

// We have an issue that some of the streams/sessions are not marked as inactive when they should be.
// This is a workaround to clean up the stream in the background
const doingActiveCleanup = activeCleanupOne(
const doingActiveCleanup = triggerCleanUpIsActiveJob(
req.config,
stream,
req.queue,
Expand Down Expand Up @@ -1540,8 +1542,8 @@ async function triggerSessionRecordingHooks(
}

const session = await db.session.get(sessionId);
if (isCleanup && !shouldActiveCleanup(session)) {
// The {activeCleanupOne} logic only checks the parent stream, so we need
if (isCleanup && !shouldCleanUpIsActive(session)) {
// The {activeCleanup} logic only checks the parent stream, so we need
// to recheck the sessions here to avoid spamming active sessions.
continue;
}
Expand Down Expand Up @@ -1954,7 +1956,8 @@ app.get("/:id/info", authorizer({}), async (req, res) => {
errors: ["not found"],
});
}
activeCleanupOne(req.config, stream, req.queue, await getIngestBase(req));
stream = withFixedIsActive(stream);

if (!session) {
// find last session
session = await db.stream.getLastSession(stream.id);
Expand Down Expand Up @@ -2119,11 +2122,21 @@ app.post(

const ingest = await getIngestBase(req);

streams = activeCleanup(req.config, streams, req.queue, ingest);
streams = streams.filter((s) => !s.isActive); // activeCleanup monkey patches the stream objects
const cleanedUp: DBStream[] = [];
for (const stream of streams) {
const cleaned = triggerCleanUpIsActiveJob(
req.config,
stream,
req.queue,
ingest
);
if (cleaned) {
cleanedUp.push(stream);
}
}

res.status(200);
res.json({ cleanedUp: streams });
res.json({ cleanedUp });
}
);

Expand Down

0 comments on commit f70dfe3

Please sign in to comment.