Skip to content
82 changes: 52 additions & 30 deletions app/Services/SharedStreamService.php
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,10 @@ public function removeClient(string $streamKey, string $clientId): void
Log::channel('ffmpeg')->warning("Client {$clientId} not found in database for stream {$streamKey}");
}

// Remove client cursor
$cursorKey = "client_cursors:{$streamKey}";
$this->redis()->hdel($cursorKey, $clientId);

// Get current active client count from database
$activeClientCount = SharedStreamClient::where('stream_id', $streamKey)
->where('status', 'connected')
Expand Down Expand Up @@ -1430,8 +1434,11 @@ public function getAllActiveStreams(): array
$uptime = $sharedStream->started_at ?
now()->diffInSeconds($sharedStream->started_at) : 0;

// Get stream info from model
// Get stream info from model, ensuring it's an array
$streamInfo = $sharedStream->stream_info ?: [];
if (!is_array($streamInfo)) {
$streamInfo = [];
}
$streamInfo['stream_key'] = $streamKey;
$streamInfo['status'] = $sharedStream->status;
$streamInfo['pid'] = $sharedStream->process_id;
Expand Down Expand Up @@ -1495,24 +1502,30 @@ public function getAllActiveClients(): array
public function cleanupOldBufferSegments(string $streamKey): int
{
try {
$cursorKey = "client_cursors:{$streamKey}";
$cursors = $this->redis()->hgetall($cursorKey);

if (empty($cursors)) {
return 0;
}

$minSegment = min(array_values($cursors));

$bufferKey = self::BUFFER_PREFIX . $streamKey;
$segmentNumbers = $this->redis()->lrange("{$bufferKey}:segments", 0, -1);

$cleaned = 0;
$keepCount = 50; // Keep only the most recent 50 segments

// Ensure we have an array before processing
if (is_array($segmentNumbers) && count($segmentNumbers) > $keepCount) {
$toRemove = array_slice($segmentNumbers, $keepCount);
foreach ($toRemove as $segmentNumber) {
$segmentKey = "{$bufferKey}:segment_{$segmentNumber}";
if ($this->redis()->del($segmentKey)) {
$cleaned++;
if (is_array($segmentNumbers)) {
foreach ($segmentNumbers as $segmentNumber) {
if ($segmentNumber < $minSegment) {
$segmentKey = "{$bufferKey}:segment_{$segmentNumber}";
if ($this->redis()->del($segmentKey)) {
$this->redis()->lrem("{$bufferKey}:segments", 0, $segmentNumber);
$cleaned++;
}
}
}

// Trim the segments list
$this->redis()->ltrim("{$bufferKey}:segments", 0, $keepCount - 1);
}

return $cleaned;
Expand Down Expand Up @@ -1830,30 +1843,35 @@ public function isProcessRunning(?int $pid): bool
if (!$pid) {
return false;
}
if (!function_exists('posix_getpgid')) {
// Fallback for non-POSIX systems (e.g., Windows)
try {
// Use ps to check process status (works on both Linux and macOS)
$output = shell_exec("ps -p {$pid} -o stat= 2>/dev/null");

try {
// Use ps to check process status (works on both Linux and macOS)
$output = shell_exec("ps -p {$pid} -o stat= 2>/dev/null");
if (empty(trim($output))) {
// Process doesn't exist
return false;
}

if (empty(trim($output))) {
// Process doesn't exist
return false;
}
$stat = trim($output);
// Check for zombie or dead processes
// Z = zombie, X = dead on most systems
if (preg_match('/^[ZX]/', $stat)) {
Log::channel('ffmpeg')->debug("Process {$pid} exists but is in state '{$stat}' (zombie/dead)");
return false;
}

$stat = trim($output);
// Check for zombie or dead processes
// Z = zombie, X = dead on most systems
if (preg_match('/^[ZX]/', $stat)) {
Log::channel('ffmpeg')->debug("Process {$pid} exists but is in state '{$stat}' (zombie/dead)");
// Process exists and is not zombie/dead
return true;
} catch (\Exception $e) {
Log::channel('ffmpeg')->error("Error checking if process {$pid} is running: " . $e->getMessage());
return false;
}

// Process exists and is not zombie/dead
return true;
} catch (\Exception $e) {
Log::channel('ffmpeg')->error("Error checking if process {$pid} is running: " . $e->getMessage());
return false;
}
// Check if the process is running using posix_getpgid
// this is a more reliable way to check if a process is running
return posix_getpgid($pid) !== false;
}

/**
Expand Down Expand Up @@ -2266,6 +2284,9 @@ public function getNextStreamSegments(string &$streamKey, string $clientId, int
}
}

$cursorKey = "client_cursors:{$streamKey}";
$lastSegment = (int)$this->redis()->hget($cursorKey, $clientId) ?: -1;

$bufferKey = self::BUFFER_PREFIX . $streamKey;
$segmentNumbers = $this->redis()->lrange("{$bufferKey}:segments", 0, -1);

Expand Down Expand Up @@ -2300,6 +2321,7 @@ public function getNextStreamSegments(string &$streamKey, string $clientId, int

// If data is not empty, track bandwidth and update activity
if (!empty($data)) {
$this->redis()->hset($cursorKey, $clientId, $lastSegment);
$this->trackBandwidth($streamKey, strlen($data));
$this->updateStreamActivity($streamKey);
$this->updateClientActivity($streamKey, $clientId);
Expand Down
Empty file modified rebuild.sh
100644 → 100755
Empty file.
Empty file modified refresh.sh
100644 → 100755
Empty file.
67 changes: 67 additions & 0 deletions tests/Unit/SharedStreamServiceTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,21 @@ public function it_handles_process_validation_safely()
$this->assertFalse($result);
}

/** @test */
public function it_checks_if_process_is_running_correctly()
{
if (!function_exists('posix_getpgid')) {
$this->markTestSkipped('POSIX functions not available.');
}

// Test with a valid process ID (the current process)
$pid = getmypid();
$this->assertTrue($this->sharedStreamService->isProcessRunning($pid));

// Test with an invalid process ID
$this->assertFalse($this->sharedStreamService->isProcessRunning(99999));
}

/** @test */
public function it_maintains_consistent_cache_prefix_usage()
{
Expand Down Expand Up @@ -1264,4 +1279,56 @@ public function it_handles_multiple_failover_attempts()
$result = $method->invoke($this->sharedStreamService, $streamKey, $streamInfo);
$this->assertNull($result, 'Should return null when channel not found');
}

/** @test */
public function it_handles_client_specific_cursors_and_segment_cleanup()
{
$streamKey = 'test_stream_cursors';
$clientId1 = 'client1';
$clientId2 = 'client2';

// 1. Simulate some segments in the buffer
$bufferKey = SharedStreamService::BUFFER_PREFIX . $streamKey;
for ($i = 0; $i < 10; $i++) {
Redis::rpush("{$bufferKey}:segments", $i);
Redis::set("{$bufferKey}:segment_{$i}", "segment_data_{$i}");
}

// 2. Client 1 requests segments
$lastSegment1 = -1;
$data1 = $this->sharedStreamService->getNextStreamSegments($streamKey, $clientId1, $lastSegment1);
$this->assertStringContainsString('segment_data_0', $data1);
$this->assertStringContainsString('segment_data_4', $data1);
$this->assertEquals(4, $lastSegment1);

// 3. Client 2 requests segments
$lastSegment2 = -1;
$data2 = $this->sharedStreamService->getNextStreamSegments($streamKey, $clientId2, $lastSegment2);
$this->assertStringContainsString('segment_data_0', $data2);
$this->assertStringContainsString('segment_data_4', $data2);
$this->assertEquals(4, $lastSegment2);

// 4. Client 1 requests more segments
$data1 = $this->sharedStreamService->getNextStreamSegments($streamKey, $clientId1, $lastSegment1);
$this->assertStringContainsString('segment_data_5', $data1);
$this->assertStringContainsString('segment_data_9', $data1);
$this->assertEquals(9, $lastSegment1);

// 5. Check cursors in Redis
$cursorKey = "client_cursors:{$streamKey}";
$cursors = Redis::hgetall($cursorKey);
$this->assertEquals(9, $cursors[$clientId1]);
$this->assertEquals(4, $cursors[$clientId2]);

// 6. Clean up old segments
$this->sharedStreamService->cleanupOldBufferSegments($streamKey);

// 7. Verify that segments consumed by both clients are deleted
$this->assertFalse(Redis::exists("{$bufferKey}:segment_0"));
$this->assertFalse(Redis::exists("{$bufferKey}:segment_1"));
$this->assertFalse(Redis::exists("{$bufferKey}:segment_2"));
$this->assertFalse(Redis::exists("{$bufferKey}:segment_3"));
$this->assertTrue(Redis::exists("{$bufferKey}:segment_4"));
$this->assertTrue(Redis::exists("{$bufferKey}:segment_5"));
}
}