<a href="https://colab.research.google.com/github/rishabh-2005/IITISoC-ML-05/blob/main/voice_enhancer_and_captioning/noise_removal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from functools import lru_cache

In [None]:
!pip install fastapi uvicorn python-multipart
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import FileResponse
import os

In [None]:
# Single FastAPI application instance; the route decorators in later cells
# (`@app.get("/")`, `@app.post("/denoise/")`) register their handlers on it.
app = FastAPI()

In [None]:
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse

# Assuming 'app = FastAPI()' is already defined

@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve the single-page front end for the denoiser.

    The page posts ``video_url``, ``volume`` and ``gen_sub`` as form fields to
    ``/denoise/`` and plays the returned video in the ``<video>`` element.

    Returns:
        HTMLResponse: the static HTML/JS page with status code 200.
    """
    html_content = """
    <!DOCTYPE html>
<html>
<head>
    <title>Cloudinary Video Denoiser</title>
    <style>
        #loading {
            display: none;
            font-weight: bold;
            color: #555;
        }
    </style>
</head>
<body>
    <h1>Denoise Video from Cloudinary Link</h1>

    <form id="cloudinaryForm">
        Cloudinary Video URL:<br>
        <input type="text" name="video_url" id="videoURL" required size="80"><br><br>

        Volume (0–100):<br>
        <input type="number" name="volume" id="volume" value="50" min="0" max="100"><br><br>

        Generate Subtitles:
        <input type="checkbox" name="gen_sub" id="genSub"><br><br>

        <button type="submit" id="submitBtn">Denoise</button>
    </form>

    <div id="loading">⏳ Denoising in progress... please wait</div>

    <h2>Output Video:</h2>
    <video id="outputVideo" width="400" controls></video>

    <script>
    document.getElementById("cloudinaryForm").addEventListener("submit", async function(e) {
        e.preventDefault();

        const videoURL = document.getElementById("videoURL").value;
        const volume = document.getElementById("volume").value;
        const genSub = document.getElementById("genSub").checked;

        const formData = new FormData();
        formData.append("video_url", videoURL);
        formData.append("volume", volume);
        formData.append("gen_sub", genSub);

        // Show loading
        document.getElementById("loading").style.display = "block";
        document.getElementById("submitBtn").disabled = true;

        try {
            const response = await fetch("/denoise/", {
                method: "POST",
                body: formData
            });

            // A failure can arrive either as a non-2xx status or as a JSON body
            // with an "error" field, so also check that we really got a video.
            const contentType = response.headers.get("content-type") || "";
            if (!response.ok || !contentType.startsWith("video/")) {
                let message = "Server error";
                try {
                    const payload = await response.json();
                    if (payload && payload.error) {
                        message = payload.error;
                    }
                } catch (parseErr) {
                    // Body was not JSON; keep the generic message.
                }
                throw new Error(message);
            }

            const blob = await response.blob();
            const outputUrl = URL.createObjectURL(blob);

            // Every createObjectURL() call yields a fresh URL, so no cache-busting
            // query is needed; appending one stops the blob URL from resolving.
            document.getElementById("outputVideo").src = outputUrl;
        } catch (err) {
            alert("Error: " + err.message);
            console.error(err);
        } finally {
            document.getElementById("loading").style.display = "none";
            document.getElementById("submitBtn").disabled = false;
        }
    });
    </script>
</body>
</html>
    """
    return HTMLResponse(content=html_content, status_code=200)

In [None]:
!pip install voicefixer

In [None]:
!pip install faster-whisper

In [None]:
from faster_whisper import WhisperModel

In [None]:
from voicefixer import VoiceFixer

In [None]:
!pip install ffmpeg-python
import ffmpeg

In [None]:
import math

In [None]:
# Build one shared VoiceFixer instance up front; restore() uses it through the
# module-level name `vf`.
print("Initializing VoiceFixer...")
vf = VoiceFixer()

In [None]:
from moviepy.editor import VideoFileClip
#@lru_cache(maxsize=None)
def extract_audio_from_video(mp4_path, wav_output_path):
    """Extract the audio track of a video file into a 16-bit PCM WAV file.

    Args:
        mp4_path: Path of the video file to read.
        wav_output_path: Path of the WAV file to write.

    Raises:
        ValueError: If the video has no audio stream.
    """
    print("Executing extract_audio_from_video...")
    video = VideoFileClip(mp4_path)
    try:
        audio = video.audio
        if audio is None:
            raise ValueError("No audio stream found in the video.")

        audio.write_audiofile(wav_output_path, codec='pcm_s16le')  # WAV format
    finally:
        # Release the clip's file handles / reader processes even on failure.
        video.close()



In [None]:
def restore(input_path="input_audio.wav", output_path="outfile.wav", cuda=False, mode=0):
    """Run the shared VoiceFixer instance (`vf`) on an audio file.

    Called with no arguments this behaves exactly as before: it reads
    ``input_audio.wav`` and writes ``outfile.wav`` on CPU with ``mode=0``.

    Args:
        input_path: Audio file to restore.
        output_path: Where the restored audio is saved.
        cuda: Passed through to ``VoiceFixer.restore`` (GPU acceleration flag).
        mode: Passed through to ``VoiceFixer.restore``.
    """
    print("Processing...")
    vf.restore(input=input_path, output=output_path, cuda=cuda, mode=mode)
    print(f"Done. Saved as {output_path}")


In [None]:
from re import S
from pydub import AudioSegment
#@lru_cache(maxsize=None)
def volume_adjust(slider):
    """Mix the original audio, scaled by the slider, with the restored audio.

    Reads ``input_audio.wav`` (original) and ``outfile.wav`` (restored), applies
    a gain of ``(100 - slider) / 100 * 2 * dBFS`` to the original and overlays
    the restored track on top of it from position 0. Because dBFS is at most 0,
    ``slider=100`` leaves the original untouched and lower values attenuate it.

    Args:
        slider: Number in the 0-100 range chosen by the user.

    Returns:
        str: Path of the exported mix, ``"combined_audio.wav"``.
    """
    # The original decoded input_audio.wav twice; one decode is enough.
    input_audio = AudioSegment.from_wav("input_audio.wav")
    outfile_audio = AudioSegment.from_wav("outfile.wav")

    average_volume = input_audio.dBFS
    if average_volume == float("-inf"):
        # Completely silent input: any gain leaves it silent, and this avoids
        # the NaN that `0 * -inf` would produce when slider == 100.
        gain_db = 0.0
    else:
        gain_db = (100 - slider) / 100 * 2 * average_volume

    adjusted_input_audio = input_audio.apply_gain(gain_db)

    # Overlay the restored audio starting at the beginning (position=0).
    combined_audio = adjusted_input_audio.overlay(outfile_audio, position=0)

    combined_audio.export("combined_audio.wav", format="wav")
    return "combined_audio.wav"


# The slider value (0-100) comes from the web form's "volume" field: the /denoise/ endpoint passes it to denoise(), which feeds it to volume_adjust().


In [None]:
#@lru_cache(maxsize=None)
@lru_cache(maxsize=1)
def _load_whisper_model(model_size="small"):
    """Load the Whisper model once and reuse it across transcriptions."""
    return WhisperModel(model_size)


def transcribe(audio):
    """Transcribe an audio file with the "small" Whisper model.

    Args:
        audio: Audio input passed straight to ``WhisperModel.transcribe``
            (the caller in this notebook passes a file path).

    Returns:
        tuple: ``(language, segments, transcribed_text)`` where ``language`` is
        the detected language reported by the model, ``segments`` is a list of
        the transcription segments, and ``transcribed_text`` is every segment's
        text followed by a newline, concatenated.
    """
    print("Transcribing...")
    # Loading the model is expensive; the cached helper avoids doing it per call.
    model = _load_whisper_model("small")
    segments, info = model.transcribe(audio)
    language = info.language
    print("Transcription language", language)
    # Materialise the result so it can be iterated here and again by the caller.
    segments = list(segments)
    transcribed_text = "".join(f"{segment.text}\n" for segment in segments)
    return language, segments, transcribed_text

In [None]:
#@lru_cache(maxsize=None)
def format_time(seconds):
    """Format a time offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        seconds: Offset in seconds (int or float). Negative values are
            clamped to zero.

    Returns:
        str: Zero-padded timestamp, e.g. ``"00:01:05,250"`` for ``65.25``.
    """
    # Round once on the total millisecond count so the carry propagates
    # (e.g. 1.9996 s becomes 00:00:02,000 rather than 00:00:01,1000).
    total_ms = max(0, int(round(seconds * 1000)))
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    # SRT requires two-digit seconds; the previous ":01d" emitted e.g. "00:00:5,500".
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"

In [None]:
def generate_subtitle_file(language, segments, input_name="video"):
    """Write transcription segments to an SRT subtitle file.

    Args:
        language: Language code used in the file name.
        segments: Iterable of objects exposing ``start``, ``end`` (seconds)
            and ``text``.
        input_name: Label embedded in the file name. The previous code
            interpolated the ``input`` builtin here, which put the function's
            repr (spaces, angle brackets) into the file name.

    Returns:
        str: Name of the written file, ``sub-<input_name>.<language>.srt``.
    """
    subtitle_file = f"sub-{input_name}.{language}.srt"

    # One SRT cue per segment: index, time range, text, then a blank line.
    cues = [
        f"{index}\n"
        f"{format_time(segment.start)} --> {format_time(segment.end)}\n"
        f"{segment.text.strip()}\n"
        "\n"
        for index, segment in enumerate(segments, start=1)
    ]

    # Explicit UTF-8 so non-ASCII transcripts do not depend on the platform default.
    with open(subtitle_file, "w", encoding="utf-8") as f:
        f.write("".join(cues))

    return subtitle_file

In [None]:
def add_subtitle_to_video(soft_subtitle, subtitle_file, subtitle_language):
    """Add subtitles to ``output_no_sub.mp4`` and write ``output_sub.mp4``.

    Args:
        soft_subtitle: If true, mux the subtitles as a separate ``mov_text``
            track (streams are copied); otherwise burn them into the picture
            with ffmpeg's ``subtitles`` video filter.
        subtitle_file: Path of the ``.srt`` file to add.
        subtitle_language: Language code stored as the subtitle track's
            ``language`` metadata (soft subtitles only).
    """
    video_input_stream = ffmpeg.input("output_no_sub.mp4")
    output_video = "output_sub.mp4"

    if soft_subtitle:
        subtitle_input_stream = ffmpeg.input(subtitle_file)
        subtitle_track_title = subtitle_file.replace(".srt", "")
        stream = ffmpeg.output(
            video_input_stream,
            subtitle_input_stream,
            output_video,
            **{
                "c": "copy",
                "c:s": "mov_text",
                # The original used the key "metadata:s:s:0" twice in one dict
                # literal, so the language entry was silently overwritten by the
                # title. The output has a single subtitle stream, so the
                # specifiers "s:s:0" and "s:s" address the same track while
                # remaining distinct dict keys.
                "metadata:s:s:0": f"language={subtitle_language}",
                "metadata:s:s": f"title={subtitle_track_title}",
            },
        )
    else:
        stream = ffmpeg.output(
            video_input_stream,
            output_video,
            vf=f"subtitles={subtitle_file}",
        )

    ffmpeg.run(stream, overwrite_output=True)

In [None]:
def gen_subtitle():
    """Transcribe ``outfile.wav`` and burn the resulting subtitles into the video.

    Chains transcribe() -> generate_subtitle_file() -> add_subtitle_to_video(),
    always requesting hard (burned-in) subtitles.
    """
    # The plain-text transcript (third item) is not needed here.
    detected_language, transcript_segments, _ = transcribe(audio="outfile.wav")
    srt_path = generate_subtitle_file(language=detected_language, segments=transcript_segments)
    add_subtitle_to_video(soft_subtitle=False, subtitle_file=srt_path, subtitle_language=detected_language)



In [None]:
from moviepy.editor import VideoFileClip, AudioFileClip

def replace_audio_in_video(video_path, wav_audio_path, output_path):
    """Write a copy of a video whose audio track is replaced by a WAV file.

    Args:
        video_path: Path of the source video.
        wav_audio_path: Path of the audio file to use as the new soundtrack.
        output_path: Path of the video to write (H.264 video, AAC audio).
    """
    video = VideoFileClip(video_path)
    try:
        new_audio = AudioFileClip(wav_audio_path)
        try:
            # Attach the new audio to the video and encode the result.
            video_with_audio = video.set_audio(new_audio)
            video_with_audio.write_videofile(output_path, codec='libx264', audio_codec='aac')
        finally:
            new_audio.close()
    finally:
        # Release the clip's file handles / reader processes even on failure.
        video.close()


In [None]:
def denoise(input_file_path, volume, gen_sub):
    """Run the full pipeline on one video file.

    Steps: extract the audio to ``input_audio.wav``, restore it, mix it with
    the original according to ``volume``, write ``output_no_sub.mp4`` with the
    mixed audio and, when ``gen_sub`` is truthy, generate subtitles as well.
    """
    extracted_wav = "input_audio.wav"
    extract_audio_from_video(input_file_path, extracted_wav)
    restore()
    volume_adjust(volume)
    replace_audio_in_video(input_file_path, "combined_audio.wav", "output_no_sub.mp4")
    if not gen_sub:
        return
    gen_subtitle()


In [None]:
import requests
@app.post("/denoise/")
async def denoise_video(
    video_url: str = Form(...),
    volume: int = Form(0),
    gen_sub: bool = Form(False)
):
    """Download a video by URL, denoise it and return the processed file.

    Form fields:
        video_url: URL the video is downloaded from.
        volume: Slider value forwarded to denoise().
        gen_sub: Whether subtitles should be generated.

    Returns:
        FileResponse with the processed MP4 on success. On failure, a JSON body
        ``{"error": ...}`` with status 502 (download failed) or 500 (processing
        failed), so that clients checking the HTTP status can detect the error.
    """
    # Imported locally so this handler brings its own dependency into scope.
    from fastapi.responses import JSONResponse

    # NOTE(review): fixed file names mean concurrent requests overwrite each
    # other's files; fine for a single-user demo, unsafe otherwise.
    input_path = "input_url_video.mp4"

    try:
        # NOTE(security): video_url is caller-supplied and fetched server-side
        # without restriction (SSRF risk). Consider an allow-list of hosts.
        # NOTE(review): requests.get and denoise() are synchronous, so the event
        # loop is blocked while they run.
        with requests.get(video_url, stream=True, timeout=(10, 60)) as response:
            if response.status_code != 200:
                return JSONResponse(
                    status_code=502,
                    content={"error": f"Failed to download video. Status code: {response.status_code}"},
                )

            # Stream the download to disk in chunks.
            with open(input_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        denoise(input_path, volume, gen_sub)

        # The pipeline writes a different file when subtitles are generated.
        output_file_name = "output_sub.mp4" if gen_sub else "output_no_sub.mp4"

        return FileResponse(output_file_name, media_type='video/mp4')

    except Exception as e:
        # Top-level boundary: report the failure with an error status instead
        # of a 200 response carrying an error payload.
        return JSONResponse(status_code=500, content={"error": str(e)})

In [None]:
!pip install nest_asyncio
import nest_asyncio

# Patch asyncio to allow nested event loops — presumably needed because the
# notebook kernel already runs one when uvicorn is started below; confirm.
nest_asyncio.apply()

In [None]:
!pip install pyngrok

In [None]:
!ngrok kill

In [None]:
import threading
import uvicorn
from pyngrok import ngrok
# NOTE(review): YOUR_NGROK_AUTHTOKEN is not defined anywhere in the visible
# notebook; it looks like a placeholder that must be bound to a real ngrok
# auth token before this cell is run.
ngrok.set_auth_token(YOUR_NGROK_AUTHTOKEN)
# Start FastAPI server in background

def run():
    """Serve `app` with uvicorn on all interfaces, port 8001."""
    uvicorn.run(app, host="0.0.0.0", port=8001)

# Run the server on its own thread so the rest of this cell can continue.
thread = threading.Thread(target=run)
thread.start()
# Wait at most one second for the thread — presumably just a short pause to
# give the server time to start before the tunnel is opened.
thread.join(timeout=1.0)
#  Start ngrok tunnel
# Kill any ngrok process left over from an earlier run, then expose port 8001.
ngrok.kill()
public_url = ngrok.connect(8001)
print("🚀 Public URL:", public_url)