In [43]:
from pytubefix import YouTube
import re


class YouTubeService:
    async def get_title_and_hashtags(self, url: str):
        yt = await self._create_youtube_instance(url)
        print("영상 정보 확인")
        title = yt.title
        description = yt.description
        hashtags = re.findall(r"#\w+", description)
        return {"title": title, "hashtags": " ".join(hashtags)}

    async def get_video_info(self, url: str):
        yt = await self._create_youtube_instance(url)
        audio_stream = yt.streams.filter(only_audio=True).first()
        print("음성 추출 완료")
        return {
            "title": yt.title,
            "audio_url": audio_stream.url if audio_stream else None,
        }

    async def _create_youtube_instance(self, url: str):
        print("YouTube 인스턴스 생성 완료")
        return YouTube(url)


In [67]:
import concurrent.futures
import math
import os
import tempfile
from typing import Any, Dict, List

import ffmpeg
import requests
import soundfile as sf
from faster_whisper import BatchedInferencePipeline, WhisperModel
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from konlpy.tag import Okt

class WhisperTranscriptionService:
    def __init__(self):
        model = WhisperModel(
            "large-v3", device='cuda', compute_type="float16"
        )
        self.model = BatchedInferencePipeline(model=model)
        self.language = None
        self.okt = Okt()
        print("Whisper 모델 초기화 완료")

    def create_session(self):
        session = requests.Session()
        retry = Retry(
            total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry, pool_connections=100, pool_maxsize=100)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def download_chunk(self, args):
        url, start, end, chunk_number, temp_dir = args

        headers = {"Range": f"bytes={start}-{end}"}
        session = self.create_session()

        try:
            response = session.get(url, headers=headers, stream=True)
            chunk_path = os.path.join(temp_dir, f"chunk_{chunk_number:04d}")

            with open(chunk_path, "wb") as f:
                for data in response.iter_content(chunk_size=8192):
                    f.write(data)

            return chunk_path, chunk_number
        except Exception as e:
            print(f"Error downloading chunk {chunk_number}: {str(e)}")
            return None, chunk_number

    def _single_stream_download(self, url: str, temp_dir: str) -> str:
        """단일 스트림으로 파일을 다운로드합니다."""
        print("Starting single stream download...")
        session = self.create_session()
        output_path = os.path.join(temp_dir, "complete_audio.mp4")

        try:
            with session.get(url, stream=True) as response:
                response.raise_for_status()
                with open(output_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            return output_path
        except Exception as e:
            raise Exception(f"Failed to download file: {str(e)}")

    def parallel_download(self, url: str, temp_dir: str, num_chunks: int = 10) -> str:
        """병렬 다운로드를 시도하고, 실패 시 단일 스트림으로 폴백"""
        session = self.create_session()

        try:
            # HEAD 요청으로 파일 크기 확인 시도
            response = session.head(url, allow_redirects=True)
            total_size = int(response.headers.get("content-length", 0))

            # HEAD 요청이 실패하면 GET 요청으로 시도
            if total_size == 0:
                response = session.get(url, stream=True)
                total_size = int(response.headers.get("content-length", 0))

            # 파일 크기를 여전히 확인할 수 없는 경우 단일 스트림으로 다운로드
            if total_size == 0:
                print(
                    "Warning: Could not determine file size. Falling back to single stream download."
                )
                return self._single_stream_download(url, temp_dir)
            print("Starting parallel download...")
            chunk_size = total_size // num_chunks
            chunks = []

            for i in range(num_chunks):
                start = i * chunk_size
                end = start + chunk_size - 1 if i < num_chunks - 1 else total_size - 1
                chunks.append((start, end))

            download_args = [
                (url, start, end, i, temp_dir) for i, (start, end) in enumerate(chunks)
            ]

            chunk_paths = []
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=num_chunks
            ) as executor:
                futures = executor.map(self.download_chunk, download_args)
                chunk_paths = [(path, num) for path, num in futures if path is not None]

            if not chunk_paths:
                raise Exception("No chunks were downloaded successfully")

            chunk_paths.sort(key=lambda x: x[1])
            output_path = os.path.join(temp_dir, "complete_audio.mp4")

            with open(output_path, "wb") as outfile:
                for chunk_path, _ in chunk_paths:
                    with open(chunk_path, "rb") as infile:
                        outfile.write(infile.read())
                    os.remove(chunk_path)

            return output_path

        except Exception as e:
            print(
                f"Error in parallel download: {str(e)}. Falling back to single stream download."
            )
            return self._single_stream_download(url, temp_dir)

    def convert_to_wav(self, input_path: str, output_path: str) -> bool:
        try:
            stream = ffmpeg.input(input_path)
            stream = ffmpeg.output(
                stream, output_path, acodec="pcm_s16le", ar="16000", ac="1"
            )
            ffmpeg.run(stream, capture_stdout=True, capture_stderr=True)
            return True
        except ffmpeg.Error as e:
            print("FFmpeg error:", e.stderr.decode())
            return False

    def process_audio_chunk(self, chunk_data: tuple,promp:str = None,filtered_words:list = None) -> List[Dict[str, Any]]:
        audio_path, start_time, duration = chunk_data
        try:
            segments, info = self.model.transcribe(
                audio_path,
                beam_size=5,
                best_of=7,
                batch_size=32,
                temperature=0.7,
                word_timestamps=True,
                initial_prompt=f"음성 제목: {promp}",
                repetition_penalty=2,
                no_repeat_ngram_size=3,
                length_penalty=1.1,
                log_prob_threshold=-0.5,
                no_speech_threshold=0.7,
                patience=1.2,
                hotwords=filtered_words
            )
            if info and hasattr(info, "language"):
                self.language = info.language
            return self._process_segments(segments, start_time)
        except Exception as e:
            print(f"Error processing chunk at {start_time}: {str(e)}")
            return []

    def _process_segments(
        self, segments, start_time: float = 0
    ) -> List[Dict[str, Any]]:
        transcript = []
        for segment in segments:
            transcript.append(
                {
                    "start": round(segment.start + start_time, 2),
                    "end": round(segment.end + start_time, 2),
                    "text": segment.text,
                }
            )
        return transcript

    async def process_with_progress(
        self, url: str, prompt:str, filtered_words:str,chunk_duration: int = 30, num_download_chunks: int = 10
    ) -> List[Dict[str, Any]]:
        with tempfile.TemporaryDirectory() as temp_dir:
            mp4_path = self.parallel_download(url, temp_dir, num_download_chunks)
            print("Download complete!")

            wav_path = os.path.join(temp_dir, "audio.wav")
            if not self.convert_to_wav(mp4_path, wav_path):
                raise Exception("Failed to convert audio to WAV format")

            wav_info = sf.info(wav_path)
            total_duration = wav_info.duration
            total_chunks = math.ceil(total_duration / chunk_duration)

            chunks_data = []
            for i in range(total_chunks):
                start_time = i * chunk_duration
                chunk_wav_path = os.path.join(temp_dir, f"chunk_{i}.wav")

                duration = min(chunk_duration, total_duration - start_time)
                stream = ffmpeg.input(wav_path, ss=start_time, t=duration)
                stream = ffmpeg.output(
                    stream, chunk_wav_path, acodec="pcm_s16le", ar="16000", ac="1"
                )
                ffmpeg.run(stream, quiet=True)

                chunks_data.append((chunk_wav_path, start_time, duration))

            all_segments = []
            for chunk_data in chunks_data:
                segments = self.process_audio_chunk(chunk_data,prompt,filtered_words)
                all_segments.extend(segments)

                if os.path.exists(chunk_data[0]):
                    os.remove(chunk_data[0])

        return all_segments

    async def transcribe(self, audio_url: str,prompt: str = None) -> Dict[str, Any]:
        try:
            try:
                tagged = self.okt.pos(prompt)
                filtered_words = []
                for word, tag in tagged:
                    if tag == "Noun" or tag == "Hashtag":
                        filtered_words.append(word)
            except:
                filtered_words = None
            segments = await self.process_with_progress(
                audio_url, prompt, filtered_words,chunk_duration=30, num_download_chunks=10
            )

            print("텍스트 추출 완료")

            return {"script": segments, "language": self.language}
        except Exception as e:
            print(f"Error in transcribe: {str(e)}")
            raise


In [68]:
youtube = YouTubeService()

In [69]:
video_info = await youtube.get_video_info("https://youtu.be/EMMC0ym0QOI?si=bx7raBo-QwR3MGy7")

YouTube 인스턴스 생성 완료
음성 추출 완료


In [70]:
video_info["audio_url"]

'https://rr3---sn-ab02a0nfpgxapox-bh2es.googlevideo.com/videoplayback?expire=1730288214&ei=9sUhZ-S4KqmS1d8P2fC86AI&ip=106.254.102.210&id=o-APMMqsFIdvorwX2g8mZSYwSI97kewVVffUbfm63s3jaW&itag=139&source=youtube&requiressl=yes&xpc=EgVo2aDSNQ%3D%3D&met=1730266614%2C&mh=in&mm=31%2C26&mn=sn-ab02a0nfpgxapox-bh2es%2Csn-un57sne7&ms=au%2Conr&mv=m&mvi=3&pcm2cms=yes&pl=18&rms=au%2Cau&initcwndbps=631250&vprv=1&mime=audio%2Fmp4&rqh=1&gir=yes&clen=3098057&dur=507.866&lmt=1730247679854257&mt=1730266148&fvip=5&keepalive=yes&fexp=51312688%2C51326932&c=ANDROID_VR&txp=5532434&sparams=expire%2Cei%2Cip%2Cid%2Citag%2Csource%2Crequiressl%2Cxpc%2Cvprv%2Cmime%2Crqh%2Cgir%2Cclen%2Cdur%2Clmt&sig=AJfQdSswRQIhAJ1iwrkBkHIGgBxoEOdXDVUki9JxnzCxlH4NwZNL-c7VAiAIuPvuo8iG1lVUslal7d0UgXVSbKRlNX40w6N0WY8ndg%3D%3D&lsparams=met%2Cmh%2Cmm%2Cmn%2Cms%2Cmv%2Cmvi%2Cpcm2cms%2Cpl%2Crms%2Cinitcwndbps&lsig=ACJ0pHgwRgIhALIR7EC_OxXJaEcXyq70r6pKBJzcOc-VBpVflruAyVIaAiEA1Xwp3212wT-5N5XJYpYahE_QW5RwDB_BtFuDeiiUkLA%3D'

In [71]:
video_info["title"]

'시리가 AI 에이전트가 된다?! 애플의 AI, Apple Intelligence 알아보기  iOS 18.1, iOS 18.2 개발자 Beta 업데이트'

In [72]:
whisper = WhisperTranscriptionService()

Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.4.0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../../miniconda3/envs/youtube/lib/python3.10/site-packages/faster_whisper/assets/pyannote_vad_model.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.4.1+cu121. Bad things might happen unless you revert torch to 1.x.
Whisper 모델 초기화 완료


In [50]:
transcript = await whisper.transcribe(video_info["audio_url"],video_info["title"])

Starting parallel download...
Download complete!
텍스트 추출 완료


In [51]:
for script in transcript['script']:
    print(script['text'],"\n")

 올해 WWDC에서 공개된 애플의 AI, Apple Intelligence가 드디어 iOS 18.1 정식 버전으로 누구나 사용할 수 있게 공개가 되었습니다 또 추가로 더 많은 AI 기능들이 iOS 18.2 개발자 베타 버전을 통해 공개가 되었는데요 업데이트된 AI 기능들을 하나씩 함께 살펴보겠습니다 이번 iOS 18.1에 포함된 AI 기능들은 강력해진 시리부터 글쓰기 도구, 알림 관련 AI, 클린업 인데요 강력해진 시리는 디자인이 변경되었고 

 애플 인텔리전스로 업데이트된 시리는 사용자가 화면에서 보고 있는 것을 파악하고 그에 맞춤 작업을 수행할 수 있는데요 뿐만 아니라 더 자연스러운 대화를 지원해서 사용자가 말을 더듬거나 멈추더라도 이해하고 반응할 수 있게 되었고 이전 대화를 기억해 대화 맥락도 이해합니다 다음으로 글쓰기 도구는 이렇게 버튼만 딸깍하면 텍스트를 교정하고 다시 쓰고 요약해주는 기능입니다 맞춤법을 고치거나 텍스트의 느낌을 좀 더 친근하게 또는 전문적으로 바꿀 수도 있습니다 

 네이티브 앱은 물론 서드파티 앱에서도 활용이 가능합니다. 메일 앱이나 메시지 앱에서는 이메일 내용을 요약해서 보여주고 알림을 요약해서 보여주는 기능과 답변을 추천해주는 스마트 답장 기능도 추가되었습니다. Reduce Interruption 이라는 새로운 방해 금지 모드도 추가되었습니다. AI가 알림의 중요도를 판단해서 중요한 알림만 노출시켜줍니다. 애플 워치에서도 동일하게 이 방해 금지 모드를 적용할 수 있습니다. 마지막으로 사진의 특정 부분을 지우는 

 기능도 사용 가능합니다. 이번 iOS 18.2 개발자 베타 업데이트에서는 애플 인텔리전스의 다른 기능들도 미리 체험해 볼 수 있었는데요. Genmoji, Image Playground, Visual Intelligence, ChatGPT와 통합된 Siri를 사용할 수 있습니다. 이번 업데이트에서는 다양한 이미지 생성 AI 기능이 포함되었는데요. 베타 버전에서는 웨이트 리스트를 신청하고 승인이 되어야 쓸 수 있습니다. 

In [73]:
transcript = await whisper.transcribe(video_info["audio_url"])

Starting parallel download...
Download complete!
텍스트 추출 완료


In [74]:
for script in transcript['script']:
    print(script['text'],"\n")

 올해 WWDC에서 공개된 애플의 AI, 애플 인텔리전스가 드디어 iOS 18.1 정식 버전으로 누구나 사용할 수 있게 공개가 되었습니다 또 추가로 더 많은 AI 기능들이 iOS 18.2 개발자 베타 버전을 통해 공개가 되었는데요 업데이트된 AI 기능들을 하나씩 함께 살펴보겠습니다 이번 iOS 18.1에 포함된 AI 기능들은 강력해진 Siri부터 글쓰기 도구, 알림 관련 AI, 클린업인데요 강력해진 Siri는 디자인이 변경되었고 

 애플 인텔리전스로 업데이트된 Siri는 사용자가 화면에서 보고 있는 것을 파악하고 그에 맞춤 작업을 수행할 수 있는데요 뿐만 아니라 더 자연스러운 대화를 지원해서 사용자가 말을 더듬거나 멈추더라도 이해하고 반응할 수 있게 되었고 이전 대화를 기억해 대화 맥락도 이해합니다 다음으로 글쓰기 도구는 이렇게 버튼만 딸깍하면 텍스트를 교정하고 다시 쓰고 요약해주는 기능입니다 맞춤법을 고치거나 텍스트의 느낌을 좀 더 친근하게 또는 전문적으로 바꿀 수도 있습니다 

 네이티브 앱은 물론 서드파티 앱에서도 활용이 가능합니다. 메일 앱이나 메시지 앱에서는 이메일 내용을 요약해서 보여주고 알림을 요약해서 보여주는 기능과 답변을 추천해주는 스마트 답장 기능도 추가되었습니다. Reduce Interruption이라는 새로운 방해 금지 모드도 추가되었습니다. AI가 알림의 중요도를 판단해서 중요한 알림만 노출시켜줍니다. 애플 워치에서도 동일하게 이 방해 금지 모드를 적용할 수 있습니다. 마지막으로 사진의 특정 부분을 지우는 

 이번 iOS 18.2 개발자 베타 업데이트에서는 애플 인텔리전스의 다른 기능들도 미리 체험해 볼 수 있었는데요 이번 업데이트에서는 다양한 이미지 생성 AI 기능이 포함되었는데요 베타 버전에서는 웨이트 리스트를 신청하고 승인이 되어야 쓸 수 있습니다 

 그리고 원하는 이모지에 간단한 설명을 입력하면 제모지가 생성되는데요 생성된 이모지 중에 마음에 드는 것을 선택해 이모지를 사용하듯 사용이 가능합니다 다음으로 이미지 플레이그라운드

In [3]:
url = "https://youtu.be/AA621UofTUA?si=gn4XutRMWUDSYLFL"

In [2]:
text = """Please summarize the sentence according to the following FINAL REQUEST. 
FINAL REQUEST:
1. The provided summary sections are partial summaries of one document. Please combine them into a single cohesive summary.
2. Summarize the main points in bullet points in KOREAN.
3. Each summarized sentence must start with a single emoji that fits the meaning of the sentence.
4. Use various emojis to make the summary more interesting, but keep it concise and relevant.
5. Focus on identifying and presenting only one main topic and one overall summary for the document.
6. Avoid redundant or repeated points, and ensure that the summary covers all key ideas without introducing multiple conclusions or topics.

CONTEXT: 
{context}

FINAL SUMMARY:"""

In [5]:
print('Please summarize the sentence according to the following REQUEST.\nREQUEST:\n1. Summarize the main points in bullet points in KOREAN.\n2. Each summarized sentence must start with an emoji that fits the meaning of the each sentence.\n3. Use various emojis to make the summary more interesting.\n4. Translate the summary into KOREAN if it is written in ENGLISH.\n5. DO NOT translate any technical terms.\n6. DO NOT include any unnecessary information.\n\nCONTEXT:\n{context}\n\nSUMMARY:"\n')

Please summarize the sentence according to the following REQUEST.
REQUEST:
1. Summarize the main points in bullet points in KOREAN.
2. Each summarized sentence must start with an emoji that fits the meaning of the each sentence.
3. Use various emojis to make the summary more interesting.
4. Translate the summary into KOREAN if it is written in ENGLISH.
5. DO NOT translate any technical terms.
6. DO NOT include any unnecessary information.

CONTEXT:
{context}

SUMMARY:"



In [4]:
print("Please summarize the sentence according to the following FINAL REQUEST. \nFINAL REQUEST:\n1. The provided summary sections are partial summaries of one document. Please combine them into a single cohesive summary.\n2. Summarize the main points in bullet points in KOREAN, but DO NOT translate any technical terms.\n3. Each summarized sentence must start with a single emoji that fits the meaning of the sentence.\n4. Use various emojis to make the summary more interesting, but keep it concise and relevant.\n5. Focus on identifying and presenting only one main topic and one overall summary for the document.\n6. Avoid redundant or repeated points, and ensure that the summary covers all key ideas without introducing multiple conclusions or topics.\n7. Please refer to each summary and indicate the key topic.\n8. If the original text is in English, we have already provided a summary translated into Korean, so please do not provide a separate translation.\n\nCONTEXT: \n{context}\n\nFINAL SUMMARY:")

Please summarize the sentence according to the following FINAL REQUEST. 
FINAL REQUEST:
1. The provided summary sections are partial summaries of one document. Please combine them into a single cohesive summary.
2. Summarize the main points in bullet points in KOREAN, but DO NOT translate any technical terms.
3. Each summarized sentence must start with a single emoji that fits the meaning of the sentence.
4. Use various emojis to make the summary more interesting, but keep it concise and relevant.
5. Focus on identifying and presenting only one main topic and one overall summary for the document.
6. Avoid redundant or repeated points, and ensure that the summary covers all key ideas without introducing multiple conclusions or topics.
7. Please refer to each summary and indicate the key topic.
8. If the original text is in English, we have already provided a summary translated into Korean, so please do not provide a separate translation.

CONTEXT: 
{context}

FINAL SUMMARY:


In [4]:
from pytubefix import YouTube

yt = YouTube(url)
audio_stream = yt.streams.filter(only_audio=True).first()

In [3]:
# from faster_whisper import WhisperModel, BatchedInferencePipeline
# from tqdm import tqdm


# model = WhisperModel(
#     "large-v3", device='cuda', compute_type="bfloat16"
# )
# model = BatchedInferencePipeline(model=model)  # 배치 모델일 경우
# print("Whisper 모델 초기화 완료")


# segments, info = model.transcribe(
#     audio_stream.url,
#     batch_size=64,  # 배치 모델인 경우
#     repetition_penalty=1.5,
#     beam_size=10,
#     patience=2,
#     no_repeat_ngram_size=4,
# )

In [6]:
from faster_whisper import WhisperModel, BatchedInferencePipeline
from tqdm import tqdm
import soundfile as sf
import math
import requests
import tempfile
import os
import concurrent.futures
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import threading
import ffmpeg

class ProgressBar:
    def __init__(self, total_size, desc="Downloading"):
        self.pbar = tqdm(total=total_size, unit='iB', unit_scale=True, desc=desc)
        self.lock = threading.Lock()

    def update(self, size):
        with self.lock:
            self.pbar.update(size)

    def close(self):
        self.pbar.close()

def create_session():
    session = requests.Session()
    retry = Retry(
        total=5,
        backoff_factor=0.1,
        status_forcelist=[500, 502, 503, 504]
    )
    adapter = HTTPAdapter(
        max_retries=retry,
        pool_connections=100,
        pool_maxsize=100
    )
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def download_chunk(args):
    url, start, end, chunk_number, temp_dir, progress_bar = args
    
    headers = {'Range': f'bytes={start}-{end}'}
    session = create_session()
    
    try:
        response = session.get(url, headers=headers, stream=True)
        chunk_path = os.path.join(temp_dir, f'chunk_{chunk_number:04d}')
        
        with open(chunk_path, 'wb') as f:
            for data in response.iter_content(chunk_size=8192):
                size = f.write(data)
                progress_bar.update(size)
        
        return chunk_path, chunk_number
    except Exception as e:
        print(f"Error downloading chunk {chunk_number}: {str(e)}")
        return None, chunk_number

def parallel_download(url, temp_dir, num_chunks=10):
    session = create_session()
    response = session.head(url)
    total_size = int(response.headers.get('content-length', 0))
    
    if total_size == 0:
        raise ValueError("Could not determine file size")
    
    chunk_size = total_size // num_chunks
    chunks = []
    
    for i in range(num_chunks):
        start = i * chunk_size
        end = start + chunk_size - 1 if i < num_chunks - 1 else total_size - 1
        chunks.append((start, end))
    
    progress_bar = ProgressBar(total_size, "Parallel downloading")
    
    download_args = [
        (url, start, end, i, temp_dir, progress_bar)
        for i, (start, end) in enumerate(chunks)
    ]
    
    chunk_paths = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_chunks) as executor:
        futures = executor.map(download_chunk, download_args)
        chunk_paths = [(path, num) for path, num in futures if path is not None]
    
    progress_bar.close()
    
    chunk_paths.sort(key=lambda x: x[1])
    output_path = os.path.join(temp_dir, "complete_audio.mp4")
    
    with open(output_path, 'wb') as outfile:
        for chunk_path, _ in chunk_paths:
            with open(chunk_path, 'rb') as infile:
                outfile.write(infile.read())
            os.remove(chunk_path)
    
    return output_path

def convert_to_wav(input_path, output_path):
    """MP4를 WAV로 변환"""
    try:
        stream = ffmpeg.input(input_path)
        stream = ffmpeg.output(stream, output_path, 
                             acodec='pcm_s16le', 
                             ar='16000',
                             ac='1')
        ffmpeg.run(stream, capture_stdout=True, capture_stderr=True)
        return True
    except ffmpeg.Error as e:
        print('FFmpeg error:', e.stderr.decode())
        return False

def process_audio_chunk(chunk_data):
    """개별 오디오 청크 처리"""
    model, audio_path, start_time, duration = chunk_data
    try:
        segments, info = model.transcribe(
            audio_path,
            beam_size=5,
            batch_size=32,
            word_timestamps=True,
            initial_prompt=None
        )
        
        # segments를 리스트로 변환하고 시간 조정
        chunk_segments = []
        for segment in segments:
            segment_dict = {
                'start': segment.start + start_time,
                'end': segment.end + start_time,
                'text': segment.text,
                'words': [
                    {
                        'start': word.start + start_time,
                        'end': word.end + start_time,
                        'word': word.word,
                        'probability': word.probability
                    }
                    for word in segment.words
                ]
            }
            chunk_segments.append(segment_dict)
        
        return chunk_segments
    except Exception as e:
        print(f"Error processing chunk at {start_time}: {str(e)}")
        return []

def process_with_progress(url, model, chunk_duration=30, num_download_chunks=10):
    """
    전체 처리 프로세스 관리
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        print("Starting parallel download...")
        mp4_path = parallel_download(url, temp_dir, num_download_chunks)
        print("Download complete!")
        
        # MP4를 WAV로 변환
        wav_path = os.path.join(temp_dir, "audio.wav")
        if not convert_to_wav(mp4_path, wav_path):
            raise Exception("Failed to convert audio to WAV format")
        
        # WAV 파일 정보 읽기
        wav_info = sf.info(wav_path)
        total_duration = wav_info.duration
        
        # 청크 계산
        total_chunks = math.ceil(total_duration / chunk_duration)
        
        # 진행률 표시
        pbar = tqdm(total=total_chunks, desc="Processing audio chunks")
        
        # 청크 처리를 위한 데이터 준비
        chunks_data = []
        for i in range(total_chunks):
            start_time = i * chunk_duration
            chunk_wav_path = os.path.join(temp_dir, f"chunk_{i}.wav")
            
            # 청크 추출
            duration = min(chunk_duration, total_duration - start_time)
            stream = ffmpeg.input(wav_path, ss=start_time, t=duration)
            stream = ffmpeg.output(stream, chunk_wav_path, 
                                 acodec='pcm_s16le', 
                                 ar='16000',
                                 ac='1')
            ffmpeg.run(stream, quiet=True)
            
            chunks_data.append((model, chunk_wav_path, start_time, duration))
        
        # 청크 처리 및 결과 수집
        all_segments = []
        for chunk_data in chunks_data:
            segments = process_audio_chunk(chunk_data)
            all_segments.extend(segments)
            pbar.update(1)
            
            # 사용한 청크 파일 삭제
            if os.path.exists(chunk_data[1]):
                os.remove(chunk_data[1])
        
        pbar.close()
        
    return all_segments

# 모델 초기화
model = WhisperModel(
    "large-v3", 
    device='cuda', 
    compute_type="float16"  # bfloat16 대신 float16 사용
)
print("Whisper 모델 초기화 완료")
model = BatchedInferencePipeline(model=model)

# 트랜스크립션 실행
segments = process_with_progress(
    audio_stream.url,
    model,
    chunk_duration=30,
    num_download_chunks=10
)

# 결과 저장
for i, segment in enumerate(segments):
    print(f"{segment['start']:.2f} -> {segment['end']:.2f}: {segment['text']}")

Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.4.0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../../miniconda3/envs/youtube/lib/python3.10/site-packages/faster_whisper/assets/pyannote_vad_model.bin`


Whisper 모델 초기화 완료
Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.4.1+cu121. Bad things might happen unless you revert torch to 1.x.
Starting parallel download...


Parallel downloading: 100%|██████████| 32.3M/32.3M [00:02<00:00, 11.3MiB/s]


Download complete!


It can be re-enabled by calling
   >>> import torch
   >>> torch.backends.cuda.matmul.allow_tf32 = True
   >>> torch.backends.cudnn.allow_tf32 = True
See https://github.com/pyannote/pyannote-audio/issues/1370 for more details.

  return _nested.nested_tensor(
Processing audio chunks: 100%|██████████| 177/177 [03:07<00:00,  1.06s/it]

0.00 -> 29.82:  꼼꼼한 딥러닝 논문 리뷰와 코드 실습. 이번 시간에 리뷰할 논문은 현대 딥러닝 기반의 자연어처리 기술의 핵심 아키텍처가 되고 있는 트랜스포머입니다. 트랜스포머 논문의 원래 제목은 Attention is all you need 입니다. 논문의 제목에서 알 수 있듯이 트랜스포머라는 아키텍처에는 이 Attention이라고 하는 것이 가장 메인 아이디어로서 사용이 된다는 걸 알 수 있습니다. 실제로 트랜스포머는 Attention이라는 메커니즘을 전적으로 활용하는 아키텍처입니다.
30.00 -> 52.40:  트랜스포머가 나오게 된 계기를 이해하기 위해서 딥러닝 기반의 기계 번역 발전 과정에 대해 확인해 보겠습니다. 2021년 기준으로 최신 자연어처리 쪽 고성능 모델들은 이런 트랜스포머 아키텍처를 기반으로 하고 있습니다. 최근까지 화제가 되었던 GPT와 BERT는 모두 이러한 트랜스포머의 아키텍처를 적절히 활용하여 좋은 성능을 내고 있습니다.
60.00 -> 89.66:  있다는 점이 특징입니다. 자연어처리 태스크 중에서 가장 대표적이면서 중요한 태스크 중 하나는 기계 번역입니다. 실제로 기계 번역 기술의 발전 과정을 확인해 보시면 1986년도 즈음에 RNN이 제한되었고 그로부터 약 10년 정도가 지난 뒤에 LSTM이 등장하였습니다. 이러한 LSTM을 활용하면 다양한 시퀀스 정보를 모델링할 수 있는데요. 대표적으로 주가 예측, 주기함수 예측 등이 가능합니다. 이러한 LSTM을 활용해서 2014년도에는 딥러닝 기반 기술로
90.00 -> 118.84:  시퀀스트 시퀀스가 등장하였습니다. 시퀀스트 시퀀스는 현대의 딥러닝 기술들이 다시 빠르게 나오기 시작한 시점인 2014년도에 이러한 LSTM을 활용해서 고정된 크기의 컨텍스트 벡터를 사용하는 방식으로 번역을 수행하는 방법을 제안하였습니다. 다만 이러한 시퀀스트 시퀀스 모델이 나왔을 때의 시점만 하더라도 고정된 크기의 컨텍스트 벡터를 쓰고 있기 때문에 소스 문장을 전부 고정된 크기의 한 벡터에다가




In [1]:
# url = "https://youtu.be/AA621UofTUA?si=gn4XutRMWUDSYLFL"

# from faster_whisper import WhisperModel
# from tqdm import tqdm
# import numpy as np
# import soundfile as sf
# import tempfile
# import os
# import ffmpeg
# import subprocess
# from yt_dlp import YoutubeDL
# import io

# def get_audio_stream(url):
#     """URL에서 오디오 스트림 정보를 가져옵니다."""
#     ydl_opts = {
#         'format': 'bestaudio/best',
#         'quiet': True,
#         'no_warnings': True,
#         'extract_audio': True
#     }
    
#     with YoutubeDL(ydl_opts) as ydl:
#         info = ydl.extract_info(url, download=False)
#         audio_url = info['url']
#         duration = info.get('duration', 0)
        
#         return audio_url, duration

# def process_stream_with_progress(url, model, chunk_duration=30):
#     """
#     스트리밍 방식으로 오디오를 처리합니다.
    
#     Parameters:
#     - url: 오디오 URL
#     - model: WhisperModel 인스턴스
#     - chunk_duration: 각 청크의 길이(초)
#     """
#     # 스트림 URL 가져오기
#     audio_url, total_duration = get_audio_stream(url)
    
#     # ffmpeg 명령어 설정
#     ffmpeg_cmd = [
#         'ffmpeg',
#         '-i', audio_url,
#         '-f', 'wav',
#         '-ar', '16000',
#         '-ac', '1',
#         '-hide_banner',
#         '-loglevel', 'error',
#         'pipe:1'
#     ]
    
#     # 진행률 표시 설정
#     total_chunks = int(np.ceil(total_duration / chunk_duration))
#     pbar = tqdm(total=total_chunks, desc="Processing audio chunks")
    
#     # 결과 저장용 리스트
#     all_segments = []
    
#     try:
#         # ffmpeg 프로세스 시작
#         process = subprocess.Popen(
#             ffmpeg_cmd,
#             stdout=subprocess.PIPE,
#             bufsize=10**8  # 버퍼 크기 설정
#         )
        
#         # 임시 디렉토리 생성
#         with tempfile.TemporaryDirectory() as temp_dir:
#             chunk_size = int(16000 * chunk_duration * 2)  # 16000Hz * seconds * 2 bytes per sample
#             chunk_number = 0
            
#             while True:
#                 # 청크 읽기
#                 audio_chunk = process.stdout.read(chunk_size)
#                 if not audio_chunk:
#                     break
                
#                 # 청크를 임시 파일로 저장
#                 chunk_path = os.path.join(temp_dir, f'chunk_{chunk_number}.wav')
#                 with open(chunk_path, 'wb') as f:
#                     # WAV 헤더 작성
#                     f.write(b'RIFF')
#                     f.write((chunk_size + 36).to_bytes(4, 'little'))
#                     f.write(b'WAVE')
#                     f.write(b'fmt ')
#                     f.write((16).to_bytes(4, 'little'))
#                     f.write((1).to_bytes(2, 'little'))  # PCM
#                     f.write((1).to_bytes(2, 'little'))  # Mono
#                     f.write((16000).to_bytes(4, 'little'))  # Sample rate
#                     f.write((32000).to_bytes(4, 'little'))  # Byte rate
#                     f.write((2).to_bytes(2, 'little'))  # Block align
#                     f.write((16).to_bytes(2, 'little'))  # Bits per sample
#                     f.write(b'data')
#                     f.write(len(audio_chunk).to_bytes(4, 'little'))
#                     f.write(audio_chunk)
                
#                 try:
#                     # 청크 처리
#                     segments, _ = model.transcribe(
#                         chunk_path,
#                         beam_size=5,
#                         batch_size=32,
#                         word_timestamps=True,
#                         condition_on_previous_text=True
#                     )
                    
#                     # 시간 오프셋 조정 및 세그먼트 저장
#                     time_offset = chunk_number * chunk_duration
#                     for segment in segments:
#                         segment_dict = {
#                             'start': segment.start + time_offset,
#                             'end': segment.end + time_offset,
#                             'text': segment.text,
#                             'words': [
#                                 {
#                                     'start': word.start + time_offset,
#                                     'end': word.end + time_offset,
#                                     'word': word.word,
#                                     'probability': word.probability
#                                 }
#                                 for word in segment.words
#                             ]
#                         }
#                         all_segments.append(segment_dict)
                
#                 except Exception as e:
#                     print(f"Error processing chunk {chunk_number}: {str(e)}")
                
#                 finally:
#                     # 임시 파일 삭제
#                     if os.path.exists(chunk_path):
#                         os.remove(chunk_path)
                
#                 # 진행률 업데이트
#                 pbar.update(1)
#                 chunk_number += 1
    
#     finally:
#         pbar.close()
#         if process.poll() is None:
#             process.terminate()
#             process.wait()
    
#     return all_segments

# # 모델 초기화
# model = WhisperModel(
#     "large-v3", 
#     device='cuda', 
#     compute_type="float16"
# )
# print("Whisper 모델 초기화 완료")

# # 트랜스크립션 실행
# segments = process_stream_with_progress(
#     url,  # 유튜브 URL
#     model,
#     chunk_duration=30
# )

# # 결과 출력
# for segment in segments:
#     print(f"{segment['start']:.2f} -> {segment['end']:.2f}: {segment['text']}")

Whisper 모델 초기화 완료


Processing audio chunks:   0%|          | 0/177 [00:11<?, ?it/s]


KeyboardInterrupt: 

In [7]:
from langchain.docstore.document import Document

In [10]:
import json
with open("script.json","r",encoding='utf-8') as f:
    data = json.load(f)

In [14]:
documents = [
            Document(page_content="\n".join([t["text"] for t in data]))
        ]

In [15]:
documents[0].page_content

"In the last chapter, you and I started to step through the internal workings of a transformer.\nThis is one of the key pieces of technology inside large language models, and a lot of\nother tools in the modern wave of AI.\nIt first hit the scene in a now-famous 2017 paper called Attention is All You Need, and\nin this chapter, you and I will dig into what this attention mechanism is, visualizing how\nit processes data.\nAs a quick recap, here's the important context I want you to have in mind.\nThe goal of the model that you and I are studying is to take in a piece of text and predict\nwhat word comes next.\nThe input text is broken up into little pieces that we call tokens, and these are very often\nwords or pieces of words, but just to make the examples in this video easier for you\nand me to think about, let's simplify by pretending that tokens are always just words.\nThe first step in a transformer is to associate each token with a high-dimensional vector,\nwhat we call its embedd

In [20]:
import tiktoken

def calculate_tokens(text, model="gpt-4o-mini"):
    encoding = tiktoken.encoding_for_model(model)
    tokens = encoding.encode(text)
    return len(tokens)

In [21]:
calculate_tokens(documents[0].page_content)

5728

In [22]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
summarize_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000, chunk_overlap=500
        )

In [23]:
split_docs = summarize_splitter.split_documents(documents)

In [24]:
len(split_docs)

19

In [42]:
type(split_docs[0])

langchain_core.documents.base.Document

In [27]:
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chat_models import ChatOpenAI
from langchain import hub
summary_prompt = hub.pull("teddynote/summary-stuff-documents-korean")
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0.7, streaming=True)
summary_chain = create_stuff_documents_chain(llm, summary_prompt)

  llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0.7, streaming=True)


In [45]:
sumaries = []
for split_doc in split_docs:
    print(type(split_doc.page_content))
    partial_summary = summary_chain.invoke({"context": [split_doc]})
    sumaries.append(partial_summary)

<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>
<class 'str'>


In [55]:
partial_summary = Document(page_content= "\n".join(sumaries))

In [56]:
SUMMARY_RESULT = summary_chain.invoke(
                {"context": partial_summaries_doc}
            )

In [57]:
len(SUMMARY_RESULT.split("\n"))

8

In [17]:
from langchain import hub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.chat_models import ChatOpenAI
from langchain.chains import create_qa_with_sources_chain
from langchain.prompts import PromptTemplate
from dotenv import load_dotenv
import os
from pytubefix import YouTube
import asyncio
import torch
from faster_whisper import WhisperModel

In [18]:
load_dotenv()  # .env 파일에서 환경 변수 로드

# OpenAI API 키 설정
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")

In [19]:
def get_video_info(url):
    yt = YouTube(url)
    audio_stream = yt.streams.filter(only_audio=True).first()
    return {
        "title": yt.title,
        "audio_url": audio_stream.url if audio_stream else None
    }


In [20]:
video_url = "https://www.youtube.com/shorts/a--NSC19MXM"
video_info = get_video_info(video_url)
print(f"Video Title: {video_info['title']}")

Video Title: 가장 쉬운 까르보나라


In [21]:
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"

In [22]:
whisper_model = WhisperModel("large-v3", device=device, compute_type=compute_type)

def transcribe_audio(audio_url):
    segments, info = whisper_model.transcribe(audio_url)
    transcript = [{"text": segment.text, "start": segment.start, "end": segment.end} for segment in segments]
    return {"script": transcript, "language": info.language}

In [23]:
transcript = transcribe_audio(video_info['audio_url'])
print(f"Transcript Language: {transcript['language']}")
print(f"First few lines of transcript: {transcript['script'][:3]}")

transcribe.py       :324  2024-10-12 11:30:14,287 Processing audio with duration 00:59.118
transcribe.py       :425  2024-10-12 11:30:19,512 Detected language 'ko' with probability 1.00


Transcript Language: ko
First few lines of transcript: [{'text': ' 한 남자가 베이컨을 가져오는데요', 'start': 0.0, 'end': 10.46}, {'text': ' 갑자기 계란을 가져와 깨부수기 시작합니다', 'start': 10.46, 'end': 12.74}, {'text': ' 노른자를 건져내 따로 담아줍니다', 'start': 12.74, 'end': 14.74}]


In [24]:
text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10)
summary_prompt = hub.pull("teddynote/summary-stuff-documents-korean")
llm = ChatOpenAI(
            model_name="gpt-4o-mini",
            temperature=0.7,
            streaming=True,
        )

  llm = ChatOpenAI(


In [25]:
from langchain.chains.combine_documents import create_stuff_documents_chain

In [26]:
from langchain_community.document_loaders import JSONLoader, TextLoader
docs = TextLoader("script.txt").load()

In [27]:
docs

[Document(metadata={'source': 'script.txt'}, page_content="In the last chapter, you and I started to step through the internal workings of a transformer.\nThis is one of the key pieces of technology inside large language models, and a lot of\nother tools in the modern wave of AI.\nIt first hit the scene in a now-famous 2017 paper called Attention is All You Need, and\nin this chapter, you and I will dig into what this attention mechanism is, visualizing how\nit processes data.\nAs a quick recap, here's the important context I want you to have in mind.\nThe goal of the model that you and I are studying is to take in a piece of text and predict\nwhat word comes next.\nThe input text is broken up into little pieces that we call tokens, and these are very often\nwords or pieces of words, but just to make the examples in this video easier for you\nand me to think about, let's simplify by pretending that tokens are always just words.\nThe first step in a transformer is to associate each toke

In [28]:
from langchain_core.documents import Document
document = [Document(page_content="\n".join([t["text"] for t in transcript["script"]]))]

In [29]:
summary_chain = create_stuff_documents_chain(llm,summary_prompt)
result = await summary_chain.ainvoke({"context": document})

_client.py          :1786 2024-10-12 11:31:22,301 HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


In [30]:
result

'- 🥓 한 남자가 베이컨을 가져옴.  \n- 🥚 계란을 깨고 노른자를 따로 담음.  \n- 🥄 흰자는 생으로 먹고, 소금을 넣음.  \n- 🌳 나무젓가락을 사용함.  \n- 🍝 파스타를 넣고 재료를 섞음.  \n- 🧂 후추와 그라라빠다노를 뿌림.  \n- 🔥 베이컨을 구워 계란치즈 소스를 섞음.  \n- 🍽️ 까르보나라를 예쁘게 담아 완성함.  \n- 🤤 저도 한번 꼭 먹어보고 싶네요.  '

In [31]:
embeddings = OpenAIEmbeddings()

  embeddings = OpenAIEmbeddings()


In [32]:
result.strip().split("\n")

['- 🥓 한 남자가 베이컨을 가져옴.  ',
 '- 🥚 계란을 깨고 노른자를 따로 담음.  ',
 '- 🥄 흰자는 생으로 먹고, 소금을 넣음.  ',
 '- 🌳 나무젓가락을 사용함.  ',
 '- 🍝 파스타를 넣고 재료를 섞음.  ',
 '- 🧂 후추와 그라라빠다노를 뿌림.  ',
 '- 🔥 베이컨을 구워 계란치즈 소스를 섞음.  ',
 '- 🍽️ 까르보나라를 예쁘게 담아 완성함.  ',
 '- 🤤 저도 한번 꼭 먹어보고 싶네요.']

In [33]:
documents = text_splitter.create_documents([t["text"] for t in transcript["script"]])
for doc in documents:
    doc.page_content += "\n" + summary.strip()

NameError: name 'summary' is not defined

In [1]:
from dotenv import load_dotenv
import os
load_dotenv()
api_key = os.getenv("RUNPOD_API_KEY")
you_url = "https://youtube.com/shorts/a--NSC19MXM?si=yiun-HK_7wX1sNvL"

In [2]:
import requests
import os
from dotenv import load_dotenv


# RunPod RUNSYNC 엔드포인트 URL
url = "https://api.runpod.ai/v2/uq96boxkzy99ev/runsync"

# FastAPI의 /hello 엔드포인트로 요청하기 위한 데이터 (내부적으로 사용할 파라미터 설정)
body = {"input":{
    "api":{
        "method":"POST",
        "endpoint":"/ping",
    },
    "payload":{},
}}
# 요청 헤더에 API 키 추가
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json"
}

# RUNSYNC 요청 보내기
response = requests.post(url, json=body, headers=headers)

# 응답 확인
if response.status_code == 200:
    print("Response:", response.json())
else:
    print(f"Error {response.status_code}: {response.text}")


Response: {'id': 'sync-26147017-ff6e-408d-9622-80c484868c42-e1', 'status': 'IN_QUEUE'}


In [7]:
import requests

# 작업 ID (작업 완료된 job ID)
job_id = response.json()['id']

# RunPod API STATUS 엔드포인트 URL
status_url = f"https://api.runpod.ai/v2/wm1xrz07all039/status/{job_id}"


# 요청 헤더에 API 키 추가
headers = {
    "Authorization": f"Bearer {api_key}"
}

# 작업 상태 및 결과 확인 요청 보내기
response = requests.get(status_url, headers=headers)

# 응답 확인
if response.status_code == 200:
    job_result = response.json()
    if job_result.get("status") == "COMPLETED":
        print("Job Completed! Result:", job_result.get("output"))
    else:
        print(f"Job Status: {job_result.get('status')}")
else:
    print(f"Error {response.status_code}: {response.text}")


Job Status: IN_QUEUE


In [27]:
import requests
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
api_key = os.getenv("RUNPOD_API_KEY")
endpoint_id = os.getenv("RUNPOD_ENDPOINT_ID")

# RunPod RUNSYNC 엔드포인트 URL
url = f"https://api.runpod.ai/v2/{endpoint_id}/runsync"
you_url = "https://youtu.be/omEk2BNDt1I?si=xjtbYANtlux5CTfB"


# FastAPI의 /hello 엔드포인트로 요청하기 위한 데이터
payload = {
    "input": {
        "endpoint": "/get_title_hash",
        "method": "GET",
        # "headers": {"x-session-id": "1234asdf"},
        "params": {"url": you_url},
    }
}

# 요청 헤더에 API 키 추가
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json"
}

# RUNSYNC 요청 보내기
response = requests.post(url, json=payload, headers=headers)

# 응답 확인
if response.status_code == 200:
    print("Response:", response.json())
else:
    print(f"Error {response.status_code}: {response.text}")


Response: {'delayTime': 6071, 'executionTime': 3825, 'id': 'sync-c9bea7a9-08ea-447d-bd62-e481515985b4-e1', 'output': {'hashtags': '#파뿌리 #생일 #친구 #예능 #노랭이', 'title': '수제 김밥 30줄로 생일 파티합니다!! 역대급 선물 언박싱까지!!!!!!!'}, 'status': 'COMPLETED', 'workerId': 'vto3bvdf9z7v0a'}


In [28]:
# import requests
# import os
# from dotenv import load_dotenv

# # Load environment variables
# load_dotenv()
# api_key = os.getenv("RUNPOD_API_KEY")
# endpoint_id = os.getenv("RUNPOD_ENDPOINT_ID")

# # RunPod RUNSYNC 엔드포인트 URL
# url = f"https://api.runpod.ai/v2/{endpoint_id}/runsync"

# # FastAPI의 /hello 엔드포인트로 요청하기 위한 데이터
# payload = {
#     "input": {
#         "endpoint": "/get_script_summary",
#         "method": "GET",
#         "headers": {"x-session-id": "1234asdf"},
#         "params": {"url": you_url},
#     }
# }

# # 요청 헤더에 API 키 추가
# headers = {
#     "Authorization": f"Bearer {api_key}",
#     "Content-Type": "application/json"
# }

# # RUNSYNC 요청 보내기
# response = requests.post(url, json=payload, headers=headers)

# # 응답 확인
# if response.status_code == 200:
#     print("Response:", response.json())
# else:
#     print(f"Error {response.status_code}: {response.text}")


In [33]:
import requests
import os
import time
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
api_key = os.getenv("RUNPOD_API_KEY")
endpoint_id = os.getenv("RUNPOD_ENDPOINT_ID")

# RunPod RUNSYNC 엔드포인트 URL
url = f"https://api.runpod.ai/v2/{endpoint_id}/run"

# FastAPI의 /hello 엔드포인트로 요청하기 위한 데이터
payload = {
    "input": {
        "endpoint": "/get_script_summary",
        "method": "GET",
        "headers": {"x-session-id": "1234asdf"},
        "params": {"url": you_url},
    }
}

# 요청 헤더에 API 키 추가
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json"
}

# RUNSYNC 요청 보내기
response = requests.post(url, json=payload, headers=headers)

# 응답 확인
if response.status_code == 200:
    result = response.json()
    print("Initial Response:", result)
    
    if result.get('status') in ['IN_PROGRESS',"IN_QUEUE"]:
        job_id = result.get('id')
        status_url = f"https://api.runpod.ai/v2/{endpoint_id}/status/{job_id}"
        
        while True:
            status_response = requests.get(status_url, headers=headers)
            if status_response.status_code == 200:
                status_data = status_response.json()
                print(f"Current status: {status_data.get('status')}")
                
                if status_data.get('status') == 'COMPLETED':
                    print(f"결과값:{status_data}")
                    result_url = f"https://api.runpod.ai/v2/{endpoint_id}/result/{job_id}"
                    result_response = requests.get(result_url, headers=headers)
                    
                    if result_response.status_code == 200:
                        final_result = result_response.json()
                        print("Final Result:", final_result)
                        break
                    else:
                        print(f"Error fetching results: {result_response.status_code}")
                        print(f"Error message: {result_response.text}")
                        break
                elif status_data.get('status') == 'FAILED':
                    print("Job failed")
                    break
            else:
                print(f"Error checking status: {status_response.status_code}")
                print(f"Error message: {status_response.text}")
                break
            
            time.sleep(5)  # 5초 대기 후 다시 상태 확인
    else:
        print("Job completed immediately")
        print("Final Result:", result)
else:
    print(f"Error {response.status_code}: {response.text}")

Initial Response: {'id': 'c2cb7372-07b6-4146-b2a4-7c8ad4e0eb34-e1', 'status': 'IN_QUEUE'}
Current status: IN_QUEUE
Current status: IN_QUEUE
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_PROGRESS
Current status: IN_P

In [24]:
import requests
import os
from dotenv import load_dotenv
import time

# Load environment variables
load_dotenv()
api_key = os.getenv("RUNPOD_API_KEY")
endpoint_id = os.getenv("RUNPOD_ENDPOINT_ID")

# RunPod RUNSYNC 엔드포인트 URL
url = f"https://api.runpod.ai/v2/{endpoint_id}/status/{job_id}"
url2 = f"https://api.runpod.ai/v2/{endpoint_id}/result/{job_id}"

# 요청 헤더에 API 키 추가
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json"
}

while True:
    # RUNSYNC 요청 보내기
    response = requests.get(url,headers=headers)

    # 응답 확인
    if response.status_code == 200:
        if response.json().get("status") == "COMPLETED":
            response = requests.get(url2,headers=headers)
            break
        else:
            print(f"Job status: {response.json().get('status')}")
    else:
        print(f"Error {response.status_code}: {response.text}")
    time.sleep(5)


Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS


In [26]:
response.text

'404 page not found'

In [16]:
import json
RUNPOD_API_URL = f"https://api.runpod.ai/v2/{endpoint_id}/runsync"
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json",
}
payload = {
    "input": {
        "endpoint": "/rag_stream_chat",
        "method": "POST",
        "headers": {"x-session-id": "1234asdf"},
        "params": {"prompt": "영상의 주제가 뭔가요?"},
    }
}

response = requests.post(
    RUNPOD_API_URL, headers=headers, json=payload, stream=True
)

# for chunk in response.iter_content(chunk_size=None):
#     if chunk:
#         chunk_data = chunk.decode("utf-8").strip()
#         if chunk_data.startswith("data: "):
#             chunk_content = chunk_data[6:]
#             if chunk_content == "[DONE]":
#                 break
#             try:
#                 content = json.loads(chunk_content)
#                 print("Stream content:", content)
#             except json.JSONDecodeError:
#                 print("Invalid JSON:", chunk_content)


In [28]:
answer = ""
for chunk in response.json().get("output"):
    answer += chunk.get("content")
    print(answer)


영상
영상의
영상의 주
영상의 주제
영상의 주제는
영상의 주제는 까
영상의 주제는 까르
영상의 주제는 까르보
영상의 주제는 까르보나라
영상의 주제는 까르보나라 요
영상의 주제는 까르보나라 요리
영상의 주제는 까르보나라 요리 과정
영상의 주제는 까르보나라 요리 과정에
영상의 주제는 까르보나라 요리 과정에 대한
영상의 주제는 까르보나라 요리 과정에 대한 내용
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다.
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨,
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란,
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란, 면
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란, 면수
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란, 면수 등을
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란, 면수 등을 사용
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란, 면수 등을 사용하여
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란, 면수 등을 사용하여 맛
영상의 주제는 까르보나라 요리 과정에 대한 내용입니다. 영상에서는 베이컨, 계란, 면수 등

In [3]:
with open("script.txt","r") as f:
    lines = f.readlines()

In [5]:
context = "".join(lines)

In [7]:
print(context)

In the last chapter, you and I started to step through the internal workings of a transformer.
This is one of the key pieces of technology inside large language models, and a lot of
other tools in the modern wave of AI.
It first hit the scene in a now-famous 2017 paper called Attention is All You Need, and
in this chapter, you and I will dig into what this attention mechanism is, visualizing how
it processes data.
As a quick recap, here's the important context I want you to have in mind.
The goal of the model that you and I are studying is to take in a piece of text and predict
what word comes next.
The input text is broken up into little pieces that we call tokens, and these are very often
words or pieces of words, but just to make the examples in this video easier for you
and me to think about, let's simplify by pretending that tokens are always just words.
The first step in a transformer is to associate each token with a high-dimensional vector,
what we call its embedding.
Now the m

In [1]:
from pytubefix import YouTube
url = "https://www.youtube.com/watch?v=yF_YIxxjWU4"
yt = YouTube(url, use_po_token=True)

In [None]:
yt.title

You can use the tool: https://github.com/YunzheZJU/youtube-po-token-generator, to get the token
