diff --git a/.gitignore b/.gitignore index ee93f64..01a304e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,6 @@ dist/ .DS_Store coverage/ *.log +downloads/ +hlsDownloads/ +server/logs/ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..f897327 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,53 @@ +services: + # ── Redis ──────────────────────────────────────────────── + redis: + image: redis:7-alpine + ports: + - "6379:6379" + volumes: + - redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 5 + + # ── API Server ─────────────────────────────────────────── + api: + build: + context: ./server + dockerfile: Dockerfile + target: api + ports: + - "${PORT:-8080}:${PORT:-8080}" + env_file: + - ./server/.env + environment: + - IOREDIS_URL=redis://redis:6379 + depends_on: + redis: + condition: service_healthy + restart: unless-stopped + + # ── Workers (transcode + HLS) ─────────────────────────── + worker: + build: + context: ./server + dockerfile: Dockerfile + target: worker + env_file: + - ./server/.env + environment: + - IOREDIS_URL=redis://redis:6379 + depends_on: + redis: + condition: service_healthy + restart: unless-stopped + # Workers may need more memory for FFmpeg transcoding + deploy: + resources: + limits: + memory: 2G + +volumes: + redis_data: diff --git a/server/.dockerignore b/server/.dockerignore new file mode 100644 index 0000000..95ef023 --- /dev/null +++ b/server/.dockerignore @@ -0,0 +1,12 @@ +node_modules +dist +downloads +hlsDownloads +logs +.env +.env.* +!.env.example +.DS_Store +*.log +.git +Dockerfile diff --git a/server/Dockerfile b/server/Dockerfile new file mode 100644 index 0000000..6bcfc33 --- /dev/null +++ b/server/Dockerfile @@ -0,0 +1,49 @@ +# ── Stage 1: Install deps & build ────────────────────── +FROM node:22-alpine AS builder + +WORKDIR /app + +# Install dependencies first (layer cache) +COPY 
package.json package-lock.json ./ +RUN npm ci + +# Copy source and build TypeScript +COPY tsconfig.json ./ +COPY src/ ./src/ +RUN npm install --no-save typescript +RUN npm run build + + +# ── Stage 2: Production deps only ───────────────────── +FROM node:22-alpine AS prod-deps + +WORKDIR /app + +COPY package.json package-lock.json ./ +RUN npm ci --omit=dev && npm cache clean --force + + +# ── Stage 3a: API server (lightweight — no FFmpeg) ───── +FROM node:22-alpine AS api + +WORKDIR /app + +COPY --from=prod-deps /app/node_modules ./node_modules +COPY --from=builder /app/dist ./dist +COPY package.json ./ + +CMD ["node", "dist/index.js"] + +# ── Stage 3b: Worker (with FFmpeg for transcoding) ───── +FROM node:22-alpine AS worker + +# Only workers need FFmpeg +RUN apk add --no-cache ffmpeg + +WORKDIR /app + +COPY --from=prod-deps /app/node_modules ./node_modules +COPY --from=builder /app/dist ./dist +COPY package.json ./ + +CMD ["node", "dist/worker.js"] diff --git a/server/package-lock.json b/server/package-lock.json index 64a259c..b09ce33 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -12,14 +12,6 @@ "@aws-sdk/client-s3": "^3.1000.0", "@aws-sdk/lib-storage": "^3.1000.0", "@aws-sdk/s3-request-presigner": "^3.1000.0", - "@types/axios": "^0.9.36", - "@types/bcrypt": "^6.0.0", - "@types/body-parser": "^1.19.6", - "@types/cors": "^2.8.19", - "@types/dotenv": "^6.1.1", - "@types/express": "^5.0.6", - "@types/ioredis": "^4.28.10", - "@types/jsonwebtoken": "^9.0.10", "axios": "^1.13.6", "bcrypt": "^6.0.0", "body-parser": "^2.2.2", @@ -27,10 +19,20 @@ "cors": "^2.8.6", "dotenv": "^17.3.1", "express": "^5.2.1", + "fluent-ffmpeg": "^2.1.3", "ioredis": "^5.10.1", "jsonwebtoken": "^9.0.3" }, "devDependencies": { + "@types/axios": "^0.9.36", + "@types/bcrypt": "^6.0.0", + "@types/body-parser": "^1.19.6", + "@types/cors": "^2.8.19", + "@types/dotenv": "^6.1.1", + "@types/express": "^5.0.6", + "@types/fluent-ffmpeg": "^2.1.28", + "@types/ioredis": "^4.28.10", + 
"@types/jsonwebtoken": "^9.0.10", "nodemon": "^3.1.14" } }, @@ -1758,12 +1760,14 @@ "version": "0.9.36", "resolved": "https://registry.npmjs.org/@types/axios/-/axios-0.9.36.tgz", "integrity": "sha512-NLOpedx9o+rxo/X5ChbdiX6mS1atE4WHmEEIcR9NLenRVa5HoVjAvjafwU3FPTqnZEstpoqCaW7fagqSoTDNeg==", + "dev": true, "license": "MIT" }, "node_modules/@types/bcrypt": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/@types/bcrypt/-/bcrypt-6.0.0.tgz", "integrity": "sha512-/oJGukuH3D2+D+3H4JWLaAsJ/ji86dhRidzZ/Od7H/i8g+aCmvkeCc6Ni/f9uxGLSQVCRZkX2/lqEFG2BvWtlQ==", + "dev": true, "license": "MIT", "dependencies": { "@types/node": "*" @@ -1773,6 +1777,7 @@ "version": "1.19.6", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.6.tgz", "integrity": "sha512-HLFeCYgz89uk22N5Qg3dvGvsv46B8GLvKKo1zKG4NybA8U2DiEO3w9lqGg29t/tfLRJpJ6iQxnVw4OnB7MoM9g==", + "dev": true, "license": "MIT", "dependencies": { "@types/connect": "*", @@ -1783,6 +1788,7 @@ "version": "3.4.38", "resolved": "https://registry.npmjs.org/@types/connect/-/connect-3.4.38.tgz", "integrity": "sha512-K6uROf1LD88uDQqJCktA4yzL1YYAK6NgfsI0v/mTgyPKWsX1CnJ0XPSDhViejru1GcRkLWb8RlzFYJRqGUbaug==", + "dev": true, "license": "MIT", "dependencies": { "@types/node": "*" @@ -1792,6 +1798,7 @@ "version": "2.8.19", "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.19.tgz", "integrity": "sha512-mFNylyeyqN93lfe/9CSxOGREz8cpzAhH+E93xJ4xWQf62V8sQ/24reV2nyzUWM6H6Xji+GGHpkbLe7pVoUEskg==", + "dev": true, "license": "MIT", "dependencies": { "@types/node": "*" @@ -1801,6 +1808,7 @@ "version": "6.1.1", "resolved": "https://registry.npmjs.org/@types/dotenv/-/dotenv-6.1.1.tgz", "integrity": "sha512-ftQl3DtBvqHl9L16tpqqzA4YzCSXZfi7g8cQceTz5rOlYtk/IZbFjAv3mLOQlNIgOaylCQWQoBdDQHPgEBJPHg==", + "dev": true, "license": "MIT", "dependencies": { "@types/node": "*" @@ -1810,6 +1818,7 @@ "version": "5.0.6", "resolved": "https://registry.npmjs.org/@types/express/-/express-5.0.6.tgz", "integrity": 
"sha512-sKYVuV7Sv9fbPIt/442koC7+IIwK5olP1KWeD88e/idgoJqDm3JV/YUiPwkoKK92ylff2MGxSz1CSjsXelx0YA==", + "dev": true, "license": "MIT", "dependencies": { "@types/body-parser": "*", @@ -1821,6 +1830,7 @@ "version": "5.1.1", "resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-5.1.1.tgz", "integrity": "sha512-v4zIMr/cX7/d2BpAEX3KNKL/JrT1s43s96lLvvdTmza1oEvDudCqK9aF/djc/SWgy8Yh0h30TZx5VpzqFCxk5A==", + "dev": true, "license": "MIT", "dependencies": { "@types/node": "*", @@ -1829,16 +1839,28 @@ "@types/send": "*" } }, + "node_modules/@types/fluent-ffmpeg": { + "version": "2.1.28", + "resolved": "https://registry.npmjs.org/@types/fluent-ffmpeg/-/fluent-ffmpeg-2.1.28.tgz", + "integrity": "sha512-5ovxsDwBcPfJ+eYs1I/ZpcYCnkce7pvH9AHSvrZllAp1ZPpTRDZAFjF3TRFbukxSgIYTTNYePbS0rKUmaxVbXw==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/http-errors": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/@types/http-errors/-/http-errors-2.0.5.tgz", "integrity": "sha512-r8Tayk8HJnX0FztbZN7oVqGccWgw98T/0neJphO91KkmOzug1KkofZURD4UaD5uH8AqcFLfdPErnBod0u71/qg==", + "dev": true, "license": "MIT" }, "node_modules/@types/ioredis": { "version": "4.28.10", "resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.28.10.tgz", "integrity": "sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ==", + "dev": true, "license": "MIT", "dependencies": { "@types/node": "*" @@ -1848,6 +1870,7 @@ "version": "9.0.10", "resolved": "https://registry.npmjs.org/@types/jsonwebtoken/-/jsonwebtoken-9.0.10.tgz", "integrity": "sha512-asx5hIG9Qmf/1oStypjanR7iKTv0gXQ1Ov/jfrX6kS/EO0OFni8orbmGCn0672NHR3kXHwpAwR+B368ZGN/2rA==", + "dev": true, "license": "MIT", "dependencies": { "@types/ms": "*", @@ -1858,12 +1881,14 @@ "version": "2.1.0", "resolved": "https://registry.npmjs.org/@types/ms/-/ms-2.1.0.tgz", "integrity": 
"sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA==", + "dev": true, "license": "MIT" }, "node_modules/@types/node": { "version": "25.3.1", "resolved": "https://registry.npmjs.org/@types/node/-/node-25.3.1.tgz", "integrity": "sha512-hj9YIJimBCipHVfHKRMnvmHg+wfhKc0o4mTtXh9pKBjC8TLJzz0nzGmLi5UJsYAUgSvXFHgb0V2oY10DUFtImw==", + "dev": true, "license": "MIT", "dependencies": { "undici-types": "~7.18.0" @@ -1873,18 +1898,21 @@ "version": "6.14.0", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.14.0.tgz", "integrity": "sha512-eOunJqu0K1923aExK6y8p6fsihYEn/BYuQ4g0CxAAgFc4b/ZLN4CrsRZ55srTdqoiLzU2B2evC+apEIxprEzkQ==", + "dev": true, "license": "MIT" }, "node_modules/@types/range-parser": { "version": "1.2.7", "resolved": "https://registry.npmjs.org/@types/range-parser/-/range-parser-1.2.7.tgz", "integrity": "sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ==", + "dev": true, "license": "MIT" }, "node_modules/@types/send": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/@types/send/-/send-1.2.1.tgz", "integrity": "sha512-arsCikDvlU99zl1g69TcAB3mzZPpxgw0UQnaHeC1Nwb015xp8bknZv5rIfri9xTOcMuaVgvabfIRA7PSZVuZIQ==", + "dev": true, "license": "MIT", "dependencies": { "@types/node": "*" @@ -1894,6 +1922,7 @@ "version": "2.2.0", "resolved": "https://registry.npmjs.org/@types/serve-static/-/serve-static-2.2.0.tgz", "integrity": "sha512-8mam4H1NHLtu7nmtalF7eyBH14QyOASmcxHhSfEoRyr0nP/YdoesEtU+uSRvMe96TW/HPTtkoKqQLl53N7UXMQ==", + "dev": true, "license": "MIT", "dependencies": { "@types/http-errors": "*", @@ -1927,6 +1956,11 @@ "node": ">= 8" } }, + "node_modules/async": { + "version": "0.2.10", + "resolved": "https://registry.npmjs.org/async/-/async-0.2.10.tgz", + "integrity": "sha512-eAkdoKxU6/LkKDBzLpT+t6Ff5EtfSF4wx1WfJiPEEV7WNLnDaRXk0oVysiEPm262roaachGexwUv94WhSgN5TQ==" + }, "node_modules/asynckit": { "version": "0.4.0", "resolved": 
"https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -2539,6 +2573,20 @@ "url": "https://opencollective.com/express" } }, + "node_modules/fluent-ffmpeg": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/fluent-ffmpeg/-/fluent-ffmpeg-2.1.3.tgz", + "integrity": "sha512-Be3narBNt2s6bsaqP6Jzq91heDgOEaDCJAXcE3qcma/EJBSy5FB4cvO31XBInuAuKBx8Kptf8dkhjK0IOru39Q==", + "deprecated": "Package no longer supported. Contact Support at https://www.npmjs.com/support for more info.", + "license": "MIT", + "dependencies": { + "async": "^0.2.9", + "which": "^1.1.1" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/follow-redirects": { "version": "1.15.11", "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.11.tgz", @@ -2903,6 +2951,12 @@ "integrity": "sha512-hvpoI6korhJMnej285dSg6nu1+e6uxs7zG3BYAm5byqDsgJNWwxzM6z6iZiAgQR4TJ30JmBTOwqZUw3WlyH3AQ==", "license": "MIT" }, + "node_modules/isexe": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/isexe/-/isexe-2.0.0.tgz", + "integrity": "sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==", + "license": "ISC" + }, "node_modules/jsonwebtoken": { "version": "9.0.3", "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.3.tgz", @@ -3705,6 +3759,7 @@ "version": "7.18.2", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.18.2.tgz", "integrity": "sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w==", + "dev": true, "license": "MIT" }, "node_modules/unpipe": { @@ -3744,6 +3799,18 @@ "node": ">= 0.8" } }, + "node_modules/which": { + "version": "1.3.1", + "resolved": "https://registry.npmjs.org/which/-/which-1.3.1.tgz", + "integrity": "sha512-HxJdYWq1MTIQbJ3nw0cqssHoTNU267KlrDuGZ1WYlxDStUtKUhOaJmh112/TZmHxxUfuJqPXSOm7tDyas0OSIQ==", + "license": "ISC", + "dependencies": { + "isexe": "^2.0.0" + }, + "bin": { + "which": "bin/which" + } + }, 
"node_modules/wrappy": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", diff --git a/server/package.json b/server/package.json index 5f45cd9..78f565e 100644 --- a/server/package.json +++ b/server/package.json @@ -4,9 +4,11 @@ "description": "", "main": "index.js", "scripts": { - "start": "nodemon dist/index.js", - "build": "npx tsc -b", - "dev": "npm run build && nodemon dist/index.js" + "start": "node dist/index.js", + "start:worker": "node dist/worker.js", + "build": "tsc -b", + "dev": "npm run build && nodemon dist/index.js", + "dev:worker": "npm run build && nodemon dist/worker.js" }, "keywords": [], "author": "", @@ -16,14 +18,6 @@ "@aws-sdk/client-s3": "^3.1000.0", "@aws-sdk/lib-storage": "^3.1000.0", "@aws-sdk/s3-request-presigner": "^3.1000.0", - "@types/axios": "^0.9.36", - "@types/bcrypt": "^6.0.0", - "@types/body-parser": "^1.19.6", - "@types/cors": "^2.8.19", - "@types/dotenv": "^6.1.1", - "@types/express": "^5.0.6", - "@types/ioredis": "^4.28.10", - "@types/jsonwebtoken": "^9.0.10", "axios": "^1.13.6", "bcrypt": "^6.0.0", "body-parser": "^2.2.2", @@ -31,10 +25,20 @@ "cors": "^2.8.6", "dotenv": "^17.3.1", "express": "^5.2.1", + "fluent-ffmpeg": "^2.1.3", "ioredis": "^5.10.1", "jsonwebtoken": "^9.0.3" }, "devDependencies": { + "@types/fluent-ffmpeg": "^2.1.28", + "@types/axios": "^0.9.36", + "@types/bcrypt": "^6.0.0", + "@types/body-parser": "^1.19.6", + "@types/cors": "^2.8.19", + "@types/dotenv": "^6.1.1", + "@types/express": "^5.0.6", + "@types/ioredis": "^4.28.10", + "@types/jsonwebtoken": "^9.0.10", "nodemon": "^3.1.14" } -} +} \ No newline at end of file diff --git a/server/src/app.ts b/server/src/app.ts index 3713542..4f46fa8 100644 --- a/server/src/app.ts +++ b/server/src/app.ts @@ -4,8 +4,7 @@ import authRouter from "./routes/auth.routes.js"; import s3Router from "./routes/s3.routes.js"; import { authMiddleware } from "./middleware/auth.middleware.js"; -// Initialize BullMQ worker -import 
"./workers/transcode.worker.js"; +// Workers are started separately via worker.ts (see docker-compose.yml) const app = express(); app.use(cors({ diff --git a/server/src/index.ts b/server/src/index.ts index 1e0c072..444de4d 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -1,2 +1,3 @@ import 'dotenv/config.js' +import './utils/logger.js' import './server.js' diff --git a/server/src/services/hls.service.ts b/server/src/services/hls.service.ts new file mode 100644 index 0000000..e40544f --- /dev/null +++ b/server/src/services/hls.service.ts @@ -0,0 +1,141 @@ +import { getDownloadUrl } from "../utils/getPresignedUrl.js"; +import { type Job } from "bullmq"; +import axios from "axios"; +import path from "path"; +import fs from "fs"; +import { fileURLToPath } from "url"; +import ffmpeg from "fluent-ffmpeg"; +import s3Client from "../config/s3.js"; +import { PutObjectCommand } from "@aws-sdk/client-s3"; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + + +const bitRateFiles = [ + "1080p", + "720p", + "480p", + "360p" +] + +export type VideoDownloadSignedUrlsWithBitrates = { + bitrate: string; + url: string; +} + +export const getPreSignedUrlForDownloadHls = async (fileId: string, userId: string) => { + const currentEnv = process.env.NODE_ENV === 'development' ? 
'dev' : 'prod'; + const baseVideoObjectId = `${currentEnv}/users/${userId}/original/${fileId}`; + + const videoDownloadSignedUrls: VideoDownloadSignedUrlsWithBitrates[] = []; + + await Promise.all(bitRateFiles.map(async (bitRateFile) => { + const videoObjectId = `${baseVideoObjectId}/${bitRateFile}.mp4` + const videoDownloadSignedUrl = await getDownloadUrl(videoObjectId) + + if (!videoDownloadSignedUrl) { + throw new Error(`Video download url is not found for file ${baseVideoObjectId} for bitrate ${bitRateFile}`); + } + + videoDownloadSignedUrls.push({ bitrate: bitRateFile, url: videoDownloadSignedUrl }) + })) + + return videoDownloadSignedUrls +} + +export const downloadObjectFromPreSignedUrlWithBitrate = async (videoDownloadSignedUrl: string, fileId: string, job: Job, bitrate: string) => { + // Download the video as a buffer + const response = await axios.get(videoDownloadSignedUrl, { responseType: 'arraybuffer' }); + + if (response.status !== 200) + console.log(`Video download failed for hls job id ${job.id}`) + + const videoBuffer = response.data; + + // Ensure downloads directory exists (hlsDownloads/{fileId}/{bitrate}) + const downloadsDir = path.join(__dirname, '..', '..', 'hlsDownloads', fileId); + if (!fs.existsSync(downloadsDir)) { + fs.mkdirSync(downloadsDir, { recursive: true }); + } + + // Save the buffer to the local repository + const localFilePath = path.join(downloadsDir, `${bitrate}.mp4`); + fs.writeFileSync(localFilePath, videoBuffer); + console.log(`✅ Video downloaded successfully to ${localFilePath} for hls job ${job.id}`); + + return localFilePath; +} + +export const segmentVideo = async (localFilePath: string, fileId: string, job: Job, bitrate: string): Promise => { + // Output directory: hlsDownloads/{fileId}/{bitrate}/ + const outputDir = path.join(path.dirname(localFilePath), bitrate); + if (!fs.existsSync(outputDir)) { + fs.mkdirSync(outputDir, { recursive: true }); + } + + const playlistPath = path.join(outputDir, `${bitrate}.m3u8`); + 
const segmentPattern = path.join(outputDir, `segment_%03d.ts`); + + return new Promise((resolve, reject) => { + console.log(`⏳ Starting HLS segmentation for ${fileId} [${bitrate}]...`); + + ffmpeg(localFilePath) + .outputOptions([ + '-codec copy', // no re-encoding, just repackage + '-start_number 0', + '-hls_time 6', // 6-second segments + '-hls_list_size 0', // keep all segments in the playlist + '-hls_segment_filename', segmentPattern, + '-f hls', + ]) + .output(playlistPath) + .on('progress', (progress) => { + if (progress.percent) { + console.log(`📊 HLS [${bitrate}] progress: ${Math.round(progress.percent)}%`); + } + }) + .on('end', () => { + console.log(`✅ HLS segmentation complete for ${fileId} [${bitrate}] → ${playlistPath}`); + resolve(playlistPath); + }) + .on('error', (err) => { + console.error(`❌ HLS segmentation failed for ${fileId} [${bitrate}]:`, err); + reject(err); + }) + .run(); + }); +} + +export const uploadSegmentedVideos = async (playlistPath: string, fileId: string, userId: string, bitrate: string): Promise => { + const currentEnv = process.env.NODE_ENV === 'development' ? 'dev' : 'prod'; + const bucket = process.env.HLS_S3_BUCKET_NAME; + const s3BaseKey = `${currentEnv}/users/${userId}/${fileId}/${bitrate}`; + + // The playlist lives in the segment output directory — read all files from there + const segmentDir = path.dirname(playlistPath); + const files = fs.readdirSync(segmentDir); + const uploadedKeys: string[] = []; + + await Promise.all(files.map(async (fileName) => { + const filePath = path.join(segmentDir, fileName); + const s3Key = `${s3BaseKey}/${fileName}`; + const contentType = fileName.endsWith('.m3u8') + ? 
'application/x-mpegURL' + : 'video/MP2T'; // .ts segments + + const command = new PutObjectCommand({ + Bucket: bucket, + Key: s3Key, + Body: fs.createReadStream(filePath), + ContentType: contentType, + }); + + await s3Client.send(command); + console.log(`☁️ Uploaded ${fileName} to S3: ${s3Key}`); + uploadedKeys.push(s3Key); + })); + + console.log(`✅ All HLS files uploaded to S3 for ${fileId} [${bitrate}]`); + return uploadedKeys; +} diff --git a/server/src/services/transcode.service.ts b/server/src/services/transcode.service.ts new file mode 100644 index 0000000..98676b9 --- /dev/null +++ b/server/src/services/transcode.service.ts @@ -0,0 +1,118 @@ +import { getDownloadUrl } from "../utils/getPresignedUrl.js"; +import axios from "axios"; +import type { Job } from "bullmq"; +import fs from "fs"; +import path from "path"; +import { fileURLToPath } from "url"; +import ffmpeg from "fluent-ffmpeg"; +import { PutObjectCommand } from "@aws-sdk/client-s3"; +import s3Client from "../config/s3.js"; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +export const getPreSignedUrlForDownload = async (fileId: string, userId: string) => { + const currentEnv = process.env.NODE_ENV === 'development' ? 
'dev' : 'prod'; + const videoObjectId = `${currentEnv}/users/${userId}/original/${fileId}`; + + const videoDownloadSignedUrl = await getDownloadUrl(videoObjectId) + + if (!videoDownloadSignedUrl) { + throw new Error("Video download signed url not found for transcode job"); + } + + return videoDownloadSignedUrl +} + +export const downloadObjectFromPreSignedUrl = async (videoDownloadSignedUrl: string, fileId: string, job: Job) => { + // Download the video as a buffer + const response = await axios.get(videoDownloadSignedUrl, { responseType: 'arraybuffer' }); + + if (response.status !== 200) + console.log(`Video download failed for transcode job id ${job.id}`) + + const videoBuffer = response.data; + + // Ensure downloads directory exists + const downloadsDir = path.join(__dirname, '..', '..', 'downloads'); + if (!fs.existsSync(downloadsDir)) { + fs.mkdirSync(downloadsDir, { recursive: true }); + } + + // Save the buffer to the local repository + const localFilePath = path.join(downloadsDir, fileId); + fs.writeFileSync(localFilePath, videoBuffer); + console.log(`✅ Video downloaded successfully to ${localFilePath} for transcode job ${job.id}`); + + return localFilePath; +} + +export const transcodeVideo = async (inputFilePath: string, fileId: string): Promise => { + const resolutions = [ + { name: '1080p', size: '1920x1080' }, + { name: '720p', size: '1280x720' }, + { name: '480p', size: '854x480' }, + { name: '360p', size: '640x360' } + ]; + + const downloadsDir = path.dirname(inputFilePath); + const outputFiles: string[] = []; + + const transcodePromises = resolutions.map((res) => { + return new Promise((resolve, reject) => { + const outputFileName = `${fileId}_${res.name}.mp4`; + const outputFilePath = path.join(downloadsDir, outputFileName); + + console.log(`⏳ Starting transcode for ${res.name} (${res.size})...`); + + ffmpeg(inputFilePath) + .output(outputFilePath) + .size(res.size) + .videoCodec('libx264') + .audioCodec('aac') + .on('end', () => { + console.log(`✅ 
Finished transcode for ${res.name}`); + outputFiles.push(outputFilePath); + resolve(outputFilePath); + }) + .on('error', (err) => { + console.error(`❌ Error transcoding to ${res.name}:`, err); + reject(err); + }) + .run(); + }); + }); + + await Promise.all(transcodePromises); + return outputFiles; +}; + +export const uploadTranscodedFiles = async (outputFiles: string[], fileId: string, userId: string): Promise => { + const currentEnv = process.env.NODE_ENV === 'development' ? 'dev' : 'prod'; + const bucket = process.env.S3_BUCKET_NAME; + const uploadedKeys: string[] = []; + + const uploadPromises = outputFiles.map(async (filePath) => { + const fileName = path.basename(filePath); + // Extract resolution from filename (e.g. "fileId_720p.mp4" -> "720p") + const resolution = fileName.replace(`${fileId}_`, '').replace('.mp4', ''); + const s3Key = `${currentEnv}/users/${userId}/original/${fileId}/${resolution}.mp4`; + + const fileBuffer = fs.readFileSync(filePath); + + const command = new PutObjectCommand({ + Bucket: bucket, + Key: s3Key, + Body: fileBuffer, + ContentType: 'video/mp4', + }); + + await s3Client.send(command); + console.log(`☁️ Uploaded ${resolution} to S3: ${s3Key}`); + uploadedKeys.push(s3Key); + }); + + await Promise.all(uploadPromises); + console.log(`✅ All transcoded files uploaded to S3 for fileId: ${fileId}`); + return uploadedKeys; +}; diff --git a/server/src/worker.ts b/server/src/worker.ts new file mode 100644 index 0000000..e2a6d53 --- /dev/null +++ b/server/src/worker.ts @@ -0,0 +1,8 @@ +import 'dotenv/config.js' +import './utils/logger.js' + +// Initialize BullMQ workers +import './workers/transcode.worker.js' +import './workers/hls.worker.js' + +console.log('🚀 Workers started and listening for jobs...') diff --git a/server/src/workers/hls.queue.ts b/server/src/workers/hls.queue.ts new file mode 100644 index 0000000..ac2c59e --- /dev/null +++ b/server/src/workers/hls.queue.ts @@ -0,0 +1,15 @@ +import { Queue } from "bullmq"; +import { redis } 
from "../config/redis.js"; + +export const hlsQueue = new Queue("hlsQueue", { + connection: redis as any, + defaultJobOptions: { + attempts: 5, + backoff: { + type: 'exponential', + delay: 5000 + }, + removeOnComplete: true, + removeOnFail: false + } +}); diff --git a/server/src/workers/hls.worker.ts b/server/src/workers/hls.worker.ts new file mode 100644 index 0000000..284b2f1 --- /dev/null +++ b/server/src/workers/hls.worker.ts @@ -0,0 +1,45 @@ +import { Worker, type Job } from "bullmq"; +import { redis } from "../config/redis.js"; +import { downloadObjectFromPreSignedUrlWithBitrate, getPreSignedUrlForDownloadHls, segmentVideo, uploadSegmentedVideos } from "../services/hls.service.js"; +import { exec } from "child_process"; + +export const hlsWorker = new Worker("hlsQueue", async (job: Job) => { + const { fileId, userId } = job.data; + console.log(`📽️ Processing HLS job ${job.id} for fileId ${fileId}`); + + const videoDownloadSignedUrls = await getPreSignedUrlForDownloadHls(fileId, userId); + + await Promise.all(videoDownloadSignedUrls.map(async (url) => { + const localFilePath = await downloadObjectFromPreSignedUrlWithBitrate(url.url, fileId, job, url.bitrate); + + console.log(`⏳ Initiating HLS segmenting for ${fileId} for ${url.bitrate} bitrate...`); + const playlistPath = await segmentVideo(localFilePath, fileId, job, url.bitrate); + console.log(`✅ HLS playlist created at ${playlistPath}`); + + + console.log(`☁️ Uploading segmented files to S3 for ${fileId} with ${url.bitrate} bitrate...`); + await uploadSegmentedVideos(playlistPath, fileId, userId, url.bitrate); + console.log(`✅ All hls uploads complete for job ${job.id}.`); + })); + +}, { + connection: redis as any, +}); + +hlsWorker.on("completed", () => { + exec("rm -rf hlsDownloads", (error, stdout, stderr) => { + if (error) { + console.error(`exec error: ${error.message}`); + return; + } + if (stderr) { + console.error(`stderr: ${stderr}`); + return; + } + console.log('Successfully cleaned up 
hlsDownloads directory'); + }) +}) + +hlsWorker.on("failed", (job, err) => { + console.log(`Job ${job?.id} has failed with error: ${err.message}`); +}) \ No newline at end of file diff --git a/server/src/workers/transcode.queue.ts b/server/src/workers/transcode.queue.ts index 942982d..3c84cd9 100644 --- a/server/src/workers/transcode.queue.ts +++ b/server/src/workers/transcode.queue.ts @@ -2,5 +2,14 @@ import { Queue } from "bullmq"; import { redis } from "../config/redis.js"; export const transcodeQueue = new Queue("transcodeQueue", { - connection: redis as any + connection: redis as any, + defaultJobOptions: { + attempts: 5, + backoff: { + type: 'exponential', + delay: 3000 + }, + removeOnComplete: true, + removeOnFail: false + } }); diff --git a/server/src/workers/transcode.worker.ts b/server/src/workers/transcode.worker.ts index bf4c6f9..2e2b16a 100644 --- a/server/src/workers/transcode.worker.ts +++ b/server/src/workers/transcode.worker.ts @@ -1,27 +1,115 @@ -import { Worker, type Job } from "bullmq"; +import { tryCatch, Worker, type Job } from "bullmq"; import { redis } from "../config/redis.js"; +import { downloadObjectFromPreSignedUrl, getPreSignedUrlForDownload, transcodeVideo, uploadTranscodedFiles } from "../services/transcode.service.js"; +import path from "path"; +import { fileURLToPath } from "url"; +import { exec } from "child_process"; +import { hlsQueue } from "./hls.queue.js"; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +// ────────────────────────────────────────────────────────── +// 🧪 TEST CONFIG: Simulate failures to observe exponential backoff +// Set to true to make the first N attempts fail on purpose +// const SIMULATE_FAILURE = true; +// const FAIL_UNTIL_ATTEMPT = 3; // Succeed on the 3rd attempt (fails attempts 1 & 2) +const BASE_DELAY = 3000; // Must match the delay in transcode.queue.ts +// ────────────────────────────────────────────────────────── + +// Track timestamps to 
measure actual delay between attempts +// const jobTimestamps = new Map(); export const transcodeWorker = new Worker("transcodeQueue", async (job: Job) => { const { fileId, userId, contentType } = job.data; - console.log(`📽️ Processing job ${job.id} for fileId ${fileId}`); + const currentAttempt = job.attemptsMade + 1; // attemptsMade is 0-indexed before this run + + // const now = Date.now(); + // const jobKey = job.id ?? fileId; + // const lastTimestamp = jobTimestamps.get(jobKey); + // const elapsedMs = lastTimestamp ? now - lastTimestamp : 0; + // jobTimestamps.set(jobKey, now); + + // const expectedDelay = currentAttempt > 1 ? BASE_DELAY * Math.pow(2, currentAttempt - 2) : 0; + + // console.log(`\n${'='.repeat(60)}`); + // console.log(`📽️ Processing job ${job.id} | fileId: ${fileId}`); + // console.log(`🔄 Attempt ${currentAttempt} / ${job.opts.attempts ?? '?'}`); + // console.log(`⏰ Timestamp: ${new Date().toISOString()}`); + // if (currentAttempt > 1) { + // console.log(`⏱️ Time since last attempt: ${(elapsedMs / 1000).toFixed(1)}s (expected: ~${(expectedDelay / 1000).toFixed(1)}s)`); + // } + // console.log(`${'='.repeat(60)}`); + + // // 🧪 Simulate failure for testing exponential backoff + // if (SIMULATE_FAILURE && currentAttempt < FAIL_UNTIL_ATTEMPT) { + // const nextDelay = BASE_DELAY * Math.pow(2, currentAttempt - 1); + // const errorMsg = `🧪 SIMULATED FAILURE on attempt ${currentAttempt}. Will succeed on attempt ${FAIL_UNTIL_ATTEMPT}. 
Next retry in ~${(nextDelay / 1000).toFixed(1)}s (exponential backoff).`; + // console.log(`❌ ${errorMsg}`); + // throw new Error(errorMsg); + // } + + // if (SIMULATE_FAILURE && currentAttempt >= FAIL_UNTIL_ATTEMPT) { + // console.log(`✅ 🧪 Attempt ${currentAttempt} — past simulated failure threshold, proceeding normally!`); + // } + + const videoDownloadSignedUrl = await getPreSignedUrlForDownload(fileId, userId); + + if (!videoDownloadSignedUrl) { + console.log(`❌ Presigned url failed for job ${job.id}.`); + return; + } - // Simulate transcoding delay - await new Promise((resolve) => setTimeout(resolve, 3000)); + const localFilePath = await downloadObjectFromPreSignedUrl(videoDownloadSignedUrl, fileId, job) - console.log(`✅ Transcoding completed for job ${job.id}`); + console.log(`⏳ Initiating bulk FFmpeg transcodes for ${fileId}...`); + const outputFiles = await transcodeVideo(localFilePath, fileId); + console.log(`✅ All transcoding finished for job ${job.id}. Outputs: ${outputFiles.join(', ')}`) + console.log(`☁️ Uploading transcoded files to S3 for ${fileId}...`); + await uploadTranscodedFiles(outputFiles, fileId, userId); + console.log(`✅ All uploads complete for job ${job.id}.`); - // TODO: - // 1. Download original file from S3 - // 2. Transcode with fluent-ffmpeg to 720p, 360p, 240p - // 3. 
Upload transcoded versions back to S3 + await hlsQueue.add("hls", { fileId, userId }) + console.log(`HLS job added for fileId ${fileId}`) }, { connection: redis as any, + settings: { + backoffStrategy: (attemptsMade: number, type?: string) => { + if (type === 'exponential') { + const delay = Math.round(Math.pow(2, attemptsMade - 1) * BASE_DELAY); + console.log(`\n🔧 Backoff strategy called: attemptsMade=${attemptsMade}, type=${type}`); + console.log(` 📐 Formula: 2^(${attemptsMade}-1) × ${BASE_DELAY}ms = ${delay}ms (${(delay / 1000).toFixed(1)}s)`); + return delay; + } + return BASE_DELAY; + }, + }, }); transcodeWorker.on("completed", (job) => { + exec("rm -rf downloads", (error, stdout, stderr) => { + if (error) { + console.error(`exec error: ${error.message}`); + return; + } + if (stderr) { + console.error(`stderr: ${stderr}`); + return; + } + console.log('Successfully cleaned up downloads directory'); + }) console.log(`Job ${job.id} has completed successfully!`); }); transcodeWorker.on("failed", (job, err) => { - console.log(`Job ${job?.id} has failed with error: ${err.message}`); + const attempt = job?.attemptsMade ?? 0; + const maxAttempts = job?.opts.attempts ?? 
5; + const nextDelay = BASE_DELAY * Math.pow(2, attempt - 1); + console.log(`\n❌ Job ${job?.id} FAILED on attempt ${attempt}/${maxAttempts}`); + console.log(` Error: ${err.message}`); + if (attempt < maxAttempts) { + console.log(` ⏳ Next retry in ~${(nextDelay / 1000).toFixed(1)}s (exponential backoff: ${BASE_DELAY / 1000}s × 2^${attempt - 1})`); + } else { + console.log(` 🚫 No more retries — job has permanently failed.`); + } }); diff --git a/server/tsconfig.tsbuildinfo b/server/tsconfig.tsbuildinfo index 9cd85ac..2a8babd 100644 --- a/server/tsconfig.tsbuildinfo +++ b/server/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/app.ts","./src/index.ts","./src/server.ts","./src/config/redis.ts","./src/config/s3.ts","./src/controllers/auth.controller.ts","./src/middleware/auth.middleware.ts","./src/routes/auth.routes.ts","./src/routes/s3.routes.ts","./src/services/auth.service.ts","./src/services/s3.service.ts","./src/services/user.store.ts","./src/utils/generateid.ts","./src/utils/getpresignedurl.ts","./src/utils/jwt.ts","./src/utils/logger.ts","./src/workers/transcode.queue.ts","./src/workers/transcode.worker.ts"],"version":"5.9.3"} \ No newline at end of file +{"root":["./src/app.ts","./src/index.ts","./src/server.ts","./src/config/redis.ts","./src/config/s3.ts","./src/controllers/auth.controller.ts","./src/middleware/auth.middleware.ts","./src/routes/auth.routes.ts","./src/routes/s3.routes.ts","./src/services/auth.service.ts","./src/services/hls.service.ts","./src/services/s3.service.ts","./src/services/transcode.service.ts","./src/services/user.store.ts","./src/utils/generateid.ts","./src/utils/getpresignedurl.ts","./src/utils/jwt.ts","./src/utils/logger.ts","./src/workers/hls.queue.ts","./src/workers/hls.worker.ts","./src/workers/transcode.queue.ts","./src/workers/transcode.worker.ts"],"version":"5.9.3"} \ No newline at end of file diff --git a/web/index.html b/web/index.html index 0bd30d9..da2dca1 100644 --- a/web/index.html +++ b/web/index.html @@ -1,5 +1,6 @@ + 
@@ -165,7 +166,7 @@ color: var(--text-muted); font-size: 0.8rem; } - + #selected-file-name { display: block; margin-top: 1rem; @@ -206,7 +207,7 @@ color: var(--success-color); border: 1px solid rgba(16, 185, 129, 0.2); } - + .progress-bar { width: 100%; height: 6px; @@ -216,7 +217,7 @@ overflow: hidden; display: none; } - + .progress-fill { height: 100%; background: var(--primary-color); @@ -228,7 +229,7 @@ display: inline-block; width: 1.25rem; height: 1.25rem; - border: 2px solid rgba(255,255,255,0.3); + border: 2px solid rgba(255, 255, 255, 0.3); border-radius: 50%; border-top-color: white; animation: spin 1s ease-in-out infinite; @@ -238,11 +239,11 @@ transform: translate(-50%, -50%); display: none; } - + .btn.loading { color: transparent; } - + .btn.loading .loader { display: block; } @@ -264,12 +265,15 @@ } @keyframes spin { - to { transform: translate(-50%, -50%) rotate(360deg); } + to { + transform: translate(-50%, -50%) rotate(360deg); + } } + @@ -278,7 +282,7 @@

Welcome Back

Sign in to upload files

- +
@@ -304,11 +308,12 @@

Welcome Back

Upload File

Secure direct upload to S3

- +
- + @@ -323,17 +328,21 @@

Upload File

Start Upload
- +
+ +
- + + \ No newline at end of file