From e49206bab3d9db19ee45f5673f59fe51ac746e37 Mon Sep 17 00:00:00 2001
From: jiankaiii
Date: Fri, 26 Sep 2025 06:36:24 +0000
Subject: [PATCH] fix: timeout when uploading large file

---
 backend/main.py | 88 +++++++-----
 .../src/lib/embedding/generate-embedding.ts | 126 +++++++++++++-----
 frontend/src/lib/extract-file-data.ts | 39 ++++--
 3 files changed, 179 insertions(+), 74 deletions(-)

diff --git a/backend/main.py b/backend/main.py
index acf66da..3d78027 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -1,7 +1,6 @@
 # Copyright (C) 2025 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
-
-from fastapi import FastAPI, HTTPException, File, UploadFile
+from fastapi import BackgroundTasks, FastAPI, HTTPException, File, UploadFile
 from pydantic import BaseModel
 import fitz # PyMuPDF
 from pathlib import Path
@@ -12,11 +11,15 @@ from generate_image_embedding import generate_image_embedding
 from fastapi.responses import FileResponse, JSONResponse
 from generate_pptx import create_pptx
+import shutil
 from starlette.background import BackgroundTask
 import tempfile
 import imagehash
 from PIL import Image
 import io
+import uuid
+from typing import Dict
+import json
 
 app = FastAPI()
 
 
@@ -26,22 +29,10 @@
 OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
 
 
-@app.post("/parse")
-async def parse_pdf(file: UploadFile = File(...)):
-    """
-    Endpoint to parse a PDF file uploaded via multipart/form-data.
-    Extracts images, generates captions and embeddings, and returns the data.
- """ - temp_file_path = None +def process_pdf_to_file(job_id: str, pdf_path: str, filename: str): try: - # Create temp file with delete=False to avoid Windows file locking issues - with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file: - temp_file.write(await file.read()) - temp_file_path = temp_file.name - - print(f"DEBUG : Temporary PDF file created at: {temp_file_path}") - # Open the PDF file using PyMuPDF (now works on Windows since file is closed) - pdf_file = fitz.open(str(temp_file_path)) + print(f"Processing job {job_id}") + pdf_file = fitz.open(str(pdf_path)) image_data = [] image_order = 1 seen_hashes = set() @@ -88,29 +79,62 @@ async def parse_pdf(file: UploadFile = File(...)): # Prepare the response data response_data = { - "name": file.filename, + "name": filename, "details": f"Extracted {len(image_data)} images from the PDF.", "images": image_data, "text": extracted_text, } - return JSONResponse(content=response_data) + temp_dir = tempfile.gettempdir() + result_path = os.path.join(temp_dir, f"{job_id}.json") + with open(result_path, "w") as f: + json.dump(response_data, f) except Exception as e: - print(f"Error processing PDF: {e}") - raise HTTPException( - status_code=500, detail=f"An error occurred while processing the PDF: {e}" - ) + print(f"Error in processing pdf job_id: {job_id}: {e}") + finally: - # Clean up temporary file on Windows - if temp_file_path and os.path.exists(temp_file_path): - try: - os.unlink(temp_file_path) - print(f"DEBUG: Cleaned up temporary file: {temp_file_path}") - except Exception as cleanup_error: - print( - f"Warning: Failed to clean up temporary file {temp_file_path}: {cleanup_error}" - ) + try: + if os.path.exists(pdf_path): + os.remove(pdf_path) + except Exception as cleanup_err: + print(f"Warning: Failed to remove temporary PDF {pdf_path}: {cleanup_err}") + + +@app.post("/upload") +async def upload_file( + file: UploadFile = File(...), background_tasks: BackgroundTasks = None +): + try: + # 
Generate job ID + job_id = str(uuid.uuid4()) + tmp_dir = tempfile.gettempdir() + tmp_path = os.path.join(tmp_dir, f"{job_id}_{file.filename}") + + # Save uploaded file to /tmp + with open(tmp_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + + # Schedule background PDF processing + background_tasks.add_task(process_pdf_to_file, job_id, tmp_path, file.filename) + + return {"jobID": job_id} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error uploading file: {e}") + + +@app.get("/result/{job_id}") +def get_result(job_id: str): + temp_dir = tempfile.gettempdir() + result_path = os.path.join(temp_dir, f"{job_id}.json") + if not os.path.exists(result_path): + return JSONResponse( + status_code=202, content={"message": "PDF processing not complete yet."} + ) + + with open(result_path, "r") as f: + result = json.load(f) + return result class PPTXRequest(BaseModel): diff --git a/frontend/src/lib/embedding/generate-embedding.ts b/frontend/src/lib/embedding/generate-embedding.ts index d1d54ec..0e2f76e 100644 --- a/frontend/src/lib/embedding/generate-embedding.ts +++ b/frontend/src/lib/embedding/generate-embedding.ts @@ -6,7 +6,7 @@ import { EmbeddingChunk } from '../types/embedding-chunk' import { embed } from 'ai' import { verifyModel } from '../model/model-manager' import { detokenize, effectiveTokenCount, tokenize } from '../utils' - +import { randomInt } from 'crypto' /** * Generates embeddings for a given text using a specified model. * @@ -17,6 +17,22 @@ import { detokenize, effectiveTokenCount, tokenize } from '../utils' * @returns A promise that resolves to an array of embedding chunks. * @throws An error if the model verification or embedding generation fails. */ + +function sanitizeChunk(text: string): string { + return ( + text + // Collapse long runs of periods (..... -> .) + .replace(/([.])\1{2,}/g, '$1') + // Collapse long runs of dashes, underscores, etc. 
(optional) + .replace(/([-_*])\1{2,}/g, '$1') + // Remove zero-width and control characters + .replace(/[\u0000-\u001F\u007F-\u009F\u200B]/g, '') + // Collapse extra whitespace + .replace(/\s{2,}/g, ' ') + .trim() + ) +} + export async function generateEmbeddings( text: string, chunkSizeToken: number, @@ -73,46 +89,90 @@ export async function generateEmbeddings( let completedCount = 0 const totalChunks = chunks.length console.log('DEBUG: generateEmbeddings totalChunks:', totalChunks) - const embeddingPromises = chunks.map(async (chunk, index) => { - try { - const { embedding } = await embed({ - model: ollama.embedding(modelName), - value: chunk, - }) - // console.log( - // `Embedding generated for chunk ${index + 1}/${chunks.length}` - // ); - completedCount++ - const completionPercentage = ((completedCount / totalChunks) * 100).toFixed(2) - // console.log( - // `Embedding generation: ${completionPercentage}% (${completedCount}/${totalChunks})` - // ); - const tokens = tokenize(chunk) - console.log( - `DEBUG: generateEmbeddings: ${completionPercentage}% (${completedCount}/${totalChunks}) | ` + - `[${index}]: ${chunk.length} chars | ` + - `Adjusted token (${chunkSizeToken}): ${tokens.length}`, - ) + async function embedChunk(chunk: string, index: number): Promise { + const sanitized = sanitizeChunk(chunk) + // Log full chunk if sanitization changed it + if (sanitized !== chunk) { + const sanitizeLog = ` +Sanitized chunk ${index + 1}: +Before: ${chunk} +After : ${sanitized} +Length: ${chunk.length} -> ${sanitized.length} +-------` + console.log(sanitizeLog) + } + const maxRetries = 5 + const tokens = tokenize(chunk) + const preview = chunk.slice(0, 500) - return { - order: index + 1, // 1-based order - chunk: chunk, - embedding: embedding, // assumed to be a number[] - sourceType: 'user' as const, // Specify sourceType for user-generated embeddings + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + const { embedding } = await embed({ + model: 
ollama.embedding(modelName), + value: sanitized, + }) + completedCount++ + const completionPercentage = ((completedCount / totalChunks) * 100).toFixed(2) + + const successLog = ` +Successful embedding for chunk ${index + 1}/${totalChunks} +Length: ${chunk.length}, Tokens: ${tokens.length} +Preview: ${preview} +Completion: ${completionPercentage}% (${completedCount}/${totalChunks}) +-------` + console.log(successLog) + return { + order: index + 1, + chunk: sanitized, + embedding, + sourceType: 'user' as const, + } + } catch (err: unknown) { + let message: string + if (err instanceof Error) { + message = err.message + } else if (typeof err === 'string') { + message = err + } else { + message = JSON.stringify(err) + } + + const errorLog = ` +Attempt ${attempt}/${maxRetries} failed for chunk ${index + 1}/${totalChunks} +Length: ${chunk.length}, Tokens: ${tokens.length} +Preview: ${preview} +Error: ${message} +-------` + console.error(errorLog) + if (attempt < maxRetries) { + const jitter = randomInt(0, 100) + const delay = 500 * 2 ** (attempt - 1) + jitter + await new Promise((resolve) => { + setTimeout(() => resolve(), delay) + }) + } } - } catch (error) { - throw new Error( - `Failed to generate embedding for chunk ${index + 1}/${totalChunks}: ${error}`, - ) } - }) - const results = await Promise.all(embeddingPromises) + const finalErrorLog = ` +Failed permanently for chunk ${index + 1}/${totalChunks} +Length: ${chunk.length}, Tokens: ${tokens.length} +Preview: ${preview} +-------` + console.error(finalErrorLog) + return null + } + + const embeddingPromises = chunks.map((chunk, index) => embedChunk(chunk, index)) + const settled = await Promise.all(embeddingPromises) + + const results = settled.filter((r): r is EmbeddingChunk => r !== null) + const endTime = Date.now() const totalTimeTakenMs = endTime - startTime const totalTimeTakenSec = (totalTimeTakenMs / 1000).toFixed(2) console.log( - `Generated ${chunks.length} embeddings in ${totalTimeTakenMs}ms 
(${totalTimeTakenSec}s)`, + `Generated ${results.length}/${chunks.length} embeddings in ${totalTimeTakenMs}ms (${totalTimeTakenSec}s)`, ) return results diff --git a/frontend/src/lib/extract-file-data.ts b/frontend/src/lib/extract-file-data.ts index 6ea929a..6814806 100644 --- a/frontend/src/lib/extract-file-data.ts +++ b/frontend/src/lib/extract-file-data.ts @@ -49,25 +49,46 @@ export async function extractFileData(file: { const formData = new FormData() formData.append('file', new Blob([new Uint8Array(data)], { type: mimetype }), file.name) - const parsefastApiUrl = new URL('/parse', process.env.FASTAPI_SERVER_URL).href - const fastApiResponse = await fetch(parsefastApiUrl, { + // Upload file and get response to confirm upload status + + const url = new URL('/upload', process.env.FASTAPI_SERVER_URL) + const uploadResponse = await fetch(url, { method: 'POST', body: formData, }) + if (!uploadResponse.ok) { + throw new Error('Failed to upload file to FastAPI server') + } - if (!fastApiResponse.ok) { - throw new Error('Failed to parse PDF on FastAPI server') + const { jobID } = await uploadResponse.json() + + // Poll /result/{jobID} until done + const pollResult = async (): Promise => { + const url = new URL(`/result/${encodeURIComponent(jobID)}`, process.env.FASTAPI_SERVER_URL) + const pollRes = await fetch(url) + if (pollRes.status === 202) { + // Not ready yet, wait and retry + await new Promise((resolve) => { + setTimeout(() => resolve(), 3000) + }) + return pollResult() + } + if (!pollRes.ok) { + throw new Error('Failed to retrieve processed PDF result') + } + return pollRes.json() } - const fastApiData = await fastApiResponse.json() - extractedText = fastApiData.text - extractedImages = fastApiData.images + const parsedData = await pollResult() + extractedText = parsedData.text + extractedImages = parsedData.images } else if (mimetype.includes('text') || ext === '.txt') { fileType = 'txt' - // contentSequence.push({ type: "text", content: data.toString("utf-8") 
}); + + // contentSequence.push({ type: 'text', content: data.toString('utf-8') }); } else if (mimetype.includes('markdown') || ext === '.md') { fileType = 'md' - // contentSequence.push({ type: "text", content: data.toString("utf-8") }); + // contentSequence.push({ type: 'text', content: data.toString('utf-8') }); } else { throw new Error('Unsupported file type') }