In [None]:
# 音声文字起こしサービス (Google Colab版)

このノートブックは、ローカル音声ファイル（WAV、MP3、FLAC、M4A、OGG）をGoogle Speech-to-Textを使用して文字起こしを行います。

## 使用方法
1. 必要なパッケージをインストール
2. Google Cloudサービスアカウントキーをアップロード
3. 設定を入力
4. 音声ファイルをアップロード
5. 文字起こし実行
6. 結果をダウンロード

## 事前準備
- Google Cloud Platformでプロジェクト作成
- Speech-to-Text API、Cloud Storage APIを有効化
- サービスアカウント作成とJSONキーダウンロード
- Cloud Storageバケット作成


In [None]:
# 設定値を入力してください
GCS_BUCKET_NAME = "250728transcription-bucket"  # ← 実際のCloud Storageバケット名に変更

# 処理設定
CHUNK_LENGTH_MS = 300000  # 5分チャンク
MAX_CONCURRENT_TRANSCRIPTIONS = 3  # Colab用に並行数を制限
TRANSCRIPTION_TIMEOUT = 1800  # 30分（Colab制限に配慮）

print(f"バケット名: {GCS_BUCKET_NAME}")
print(f"チャンクサイズ: {CHUNK_LENGTH_MS}ms")
print("⚠️ GCS_BUCKET_NAMEを実際のバケット名に変更してください")


バケット名: 250728transcription-bucket
チャンクサイズ: 300000ms
⚠️ GCS_BUCKET_NAMEを実際のバケット名に変更してください


In [None]:
# 設定値を入力してください
GCS_BUCKET_NAME = "250728transcription-bucket"  # ← 実際のCloud Storageバケット名に変更

# 処理設定
CHUNK_LENGTH_MS = 300000  # 5分チャンク
MAX_CONCURRENT_TRANSCRIPTIONS = 3  # Colab用に並行数を制限
TRANSCRIPTION_TIMEOUT = 1800  # 30分（Colab制限に配慮）

print(f"バケット名: {GCS_BUCKET_NAME}")
print(f"チャンクサイズ: {CHUNK_LENGTH_MS}ms")
print("⚠️ GCS_BUCKET_NAMEを実際のバケット名に変更してください")


バケット名: 250728transcription-bucket
チャンクサイズ: 300000ms
⚠️ GCS_BUCKET_NAMEを実際のバケット名に変更してください


In [None]:
## 1. 必要なパッケージのインストール


In [6]:
# 必要なパッケージをインストール
!pip install -q google-cloud-speech google-cloud-storage pydub

# FFmpegをインストール（音声変換用）
!apt-get update -qq
!apt-get install -y ffmpeg

print("✅ インストール完了")


zsh:1: command not found: pip
zsh:1: command not found: apt-get
zsh:1: command not found: apt-get
✅ インストール完了


In [None]:
## 2. 設定


In [12]:
# 設定値を入力してください
GCS_BUCKET_NAME = "250728transcription-bucket"  # ← 実際のCloud Storageバケット名に変更

# 処理設定
CHUNK_LENGTH_MS = 300000  # 5分チャンク
MAX_CONCURRENT_TRANSCRIPTIONS = 3  # Colab用に並行数を制限
TRANSCRIPTION_TIMEOUT = 1800  # 30分（Colab制限に配慮）

print(f"バケット名: {GCS_BUCKET_NAME}")
print(f"チャンクサイズ: {CHUNK_LENGTH_MS}ms")
print("⚠️ GCS_BUCKET_NAMEを実際のバケット名に変更してください")


バケット名: 250728transcription-bucket
チャンクサイズ: 300000ms
⚠️ GCS_BUCKET_NAMEを実際のバケット名に変更してください


In [None]:
## 3. Google Cloudサービスアカウントキーのアップロード


In [None]:
from google.colab import files
import json

print("📁 Google Cloudサービスアカウントキーファイル（JSON）をアップロードしてください:")
uploaded = files.upload()

# アップロードされたファイル名を取得
service_account_file = list(uploaded.keys())[0]
SERVICE_ACCOUNT_PATH = service_account_file

print(f"✅ アップロード完了: {service_account_file}")

# ファイルの妥当性をチェック
try:
    with open(service_account_file, 'r') as f:
        key_data = json.load(f)
    print(f"プロジェクトID: {key_data.get('project_id', 'N/A')}")
    print(f"クライアントEmail: {key_data.get('client_email', 'N/A')}")
except Exception as e:
    print(f"❌ ファイル読み込みエラー: {e}")


In [None]:
## 4. 音声文字起こしサービスクラス


In [None]:
import os
import asyncio
import tempfile
from pathlib import Path
from typing import Optional
import logging

# Google Cloud関連
from google.cloud import speech
from google.cloud import storage

# 音声処理関連
from pydub import AudioSegment
from pydub.utils import make_chunks

# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class AudioTranscriptionService:
    def __init__(self, service_account_path: str, gcs_bucket_name: str):
        """音声文字起こしサービス（ローカルWAVファイル専用）"""
        self.service_account_path = service_account_path
        self.gcs_bucket_name = gcs_bucket_name
        
        # Google Cloud Speech クライアント初期化
        self.speech_client = speech.SpeechClient.from_service_account_file(service_account_path)
        
        # Google Cloud Storage クライアント初期化
        self.storage_client = storage.Client.from_service_account_json(service_account_path)
    
    def validate_audio_file(self, audio_path: str) -> bool:
        """ローカル音声ファイルの存在と形式を検証"""
        try:
            path = Path(audio_path)
            
            if not path.exists():
                logger.error(f"ファイルが見つかりません: {audio_path}")
                return False
            
            file_size = path.stat().st_size
            if file_size == 0:
                logger.error(f"ファイルが空です: {audio_path}")
                return False
            
            size_mb = file_size / (1024 * 1024)
            logger.info(f"音声ファイルサイズ: {size_mb:.2f}MB")
            
            supported_extensions = {'.wav', '.mp3', '.flac', '.m4a', '.ogg'}
            if path.suffix.lower() not in supported_extensions:
                logger.warning(f"未対応の可能性がある形式: {path.suffix}")
                logger.info("WAV形式への変換を試行します")
            
            return True
            
        except Exception as e:
            logger.error(f"ファイル検証エラー: {str(e)}")
            return False

print("✅ AudioTranscriptionService クラス（パート1）定義完了")


In [None]:
# AudioTranscriptionServiceクラスの続き（音声変換・分割メソッド）

async def convert_to_wav_if_needed(self, audio_path: str, output_path: str) -> bool:
    """音声ファイルをWAV形式に変換（必要な場合のみ）"""
    try:
        path = Path(audio_path)
        
        # すでにWAVファイルで適切な形式の場合はコピーのみ
        if path.suffix.lower() == '.wav':
            audio = AudioSegment.from_wav(audio_path)
            
            if audio.frame_rate == 16000 and audio.channels == 1:
                logger.info("音声ファイルは既に最適な形式です")
                if audio_path != output_path:
                    import shutil
                    shutil.copy2(audio_path, output_path)
                return True
        
        logger.info("音声ファイルを最適化中...")
        
        def convert_audio():
            try:
                # 音声ファイルを読み込み
                if path.suffix.lower() == '.wav':
                    audio = AudioSegment.from_wav(audio_path)
                elif path.suffix.lower() == '.mp3':
                    audio = AudioSegment.from_mp3(audio_path)
                elif path.suffix.lower() == '.flac':
                    audio = AudioSegment.from_file(audio_path, format="flac")
                elif path.suffix.lower() == '.m4a':
                    audio = AudioSegment.from_file(audio_path, format="m4a")
                elif path.suffix.lower() == '.ogg':
                    audio = AudioSegment.from_ogg(audio_path)
                else:
                    audio = AudioSegment.from_file(audio_path)
                
                logger.info(f"音声時間: {len(audio) / 1000:.2f}秒")
                logger.info(f"サンプリングレート: {audio.frame_rate}Hz")
                logger.info(f"チャンネル数: {audio.channels}")
                
                # Google Speech-to-Textに最適化（16kHz、モノラル）
                audio = audio.set_frame_rate(16000).set_channels(1)
                audio.export(output_path, format="wav")
                
            except Exception as e:
                logger.error(f"音声変換中にエラー: {str(e)}")
                raise
        
        await asyncio.to_thread(convert_audio)
        
        if not os.path.exists(output_path):
            raise Exception("音声ファイルの変換に失敗")
        
        audio_size = os.path.getsize(output_path) / (1024 * 1024)
        logger.info(f"音声最適化完了 - ファイルサイズ: {audio_size:.2f}MB")
        return True
        
    except Exception as e:
        logger.error(f"音声変換エラー: {str(e)}")
        return False

async def split_audio_for_processing(self, audio_path: str, chunk_length_ms: int = 300000) -> list:
    """長時間音声を処理可能なチャンクに分割"""
    try:
        logger.info("音声ファイルを分割中...")
        
        def split_audio():
            audio = AudioSegment.from_wav(audio_path)
            chunks = make_chunks(audio, chunk_length_ms)
            
            chunk_files = []
            temp_dir = tempfile.mkdtemp()
            
            for i, chunk in enumerate(chunks):
                chunk_path = os.path.join(temp_dir, f"chunk_{i:04d}.wav")
                chunk.export(chunk_path, format="wav")
                chunk_files.append(chunk_path)
            
            return chunk_files
        
        chunk_files = await asyncio.to_thread(split_audio)
        logger.info(f"音声を{len(chunk_files)}個のチャンクに分割完了")
        return chunk_files
        
    except Exception as e:
        logger.error(f"音声分割エラー: {str(e)}")
        return []

# クラスにメソッドを追加
AudioTranscriptionService.convert_to_wav_if_needed = convert_to_wav_if_needed
AudioTranscriptionService.split_audio_for_processing = split_audio_for_processing

print("✅ AudioTranscriptionService クラス（パート2）定義完了")


In [None]:
# AudioTranscriptionServiceクラスの続き（文字起こし処理メソッド）

async def upload_to_gcs(self, local_path: str, gcs_path: str) -> bool:
    """ファイルをGoogle Cloud Storageにアップロード"""
    try:
        bucket = self.storage_client.bucket(self.gcs_bucket_name)
        blob = bucket.blob(gcs_path)
        blob.upload_from_filename(local_path)
        logger.info(f"GCSにアップロード完了: {gcs_path}")
        return True
        
    except Exception as e:
        logger.error(f"GCSアップロードエラー: {str(e)}")
        return False

async def transcribe_audio_chunk(self, gcs_uri: str, chunk_index: int) -> Optional[str]:
    """音声チャンクを文字起こし"""
    try:
        logger.info(f"チャンク {chunk_index} の文字起こし開始")
        
        audio = speech.RecognitionAudio(uri=gcs_uri)
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
            sample_rate_hertz=16000,
            language_code="ja-JP",
            enable_automatic_punctuation=True,
            enable_word_time_offsets=True,
            model="latest_long",
        )
        
        operation = await asyncio.to_thread(
            self.speech_client.long_running_recognize,
            config=config,
            audio=audio
        )

        logger.info(f"チャンク {chunk_index} の認識処理を待機中...")
        response = await asyncio.to_thread(operation.result, timeout=TRANSCRIPTION_TIMEOUT)
        
        transcript = ""
        for result in response.results:
            transcript += result.alternatives[0].transcript + " "
        
        logger.info(f"チャンク {chunk_index} の文字起こし完了")
        return transcript.strip()
        
    except Exception as e:
        logger.error(f"チャンク {chunk_index} の文字起こしエラー: {str(e)}")
        return None

async def process_audio_chunks_parallel(self, chunk_files: list) -> list:
    """複数の音声チャンクを並行処理で文字起こし"""
    tasks = []
    gcs_uris = []
    
    # 各チャンクをGCSにアップロード
    for i, chunk_file in enumerate(chunk_files):
        gcs_path = f"audio_chunks/chunk_{i:04d}.wav"
        await self.upload_to_gcs(chunk_file, gcs_path)
        gcs_uri = f"gs://{self.gcs_bucket_name}/{gcs_path}"
        gcs_uris.append(gcs_uri)
    
    # 並行処理で文字起こし実行
    for i, gcs_uri in enumerate(gcs_uris):
        task = self.transcribe_audio_chunk(gcs_uri, i)
        tasks.append(task)
    
    # 同時実行数を制限（Colab用）
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_TRANSCRIPTIONS)
    
    async def limited_transcribe(task):
        async with semaphore:
            return await task
    
    results = await asyncio.gather(*[limited_transcribe(task) for task in tasks])
    return results

# クラスにメソッドを追加
AudioTranscriptionService.upload_to_gcs = upload_to_gcs
AudioTranscriptionService.transcribe_audio_chunk = transcribe_audio_chunk
AudioTranscriptionService.process_audio_chunks_parallel = process_audio_chunks_parallel

print("✅ AudioTranscriptionService クラス（パート3）定義完了")


In [None]:
# AudioTranscriptionServiceクラスの続き（メイン処理メソッド）

async def save_transcript_locally(self, transcript: str, output_path: str) -> bool:
    """文字起こし結果をローカルファイルに保存"""
    try:
        logger.info(f"文字起こし結果をローカルに保存中: {output_path}")
        
        output_dir = Path(output_path).parent
        output_dir.mkdir(parents=True, exist_ok=True)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(transcript)
        
        file_size = os.path.getsize(output_path) / 1024
        logger.info(f"ローカル保存完了 - ファイルサイズ: {file_size:.2f}KB")
        return True
        
    except Exception as e:
        logger.error(f"ローカル保存エラー: {str(e)}")
        return False

async def process_audio_transcription(self, audio_path: str, output_path: str, chunk_length_ms: int = 300000) -> bool:
    """ローカル音声ファイルの文字起こし処理"""
    if not self.validate_audio_file(audio_path):
        raise Exception("入力音声ファイルの検証に失敗")
    
    temp_dir = tempfile.mkdtemp()
    wav_path = os.path.join(temp_dir, "optimized_audio.wav")
    chunk_files = []
    
    try:
        logger.info("音声ファイルの文字起こし処理を開始")
        logger.info(f"入力ファイル: {audio_path}")
        logger.info(f"出力ファイル: {output_path}")
        
        # 1. 音声ファイルをWAV形式に変換・最適化
        if not await self.convert_to_wav_if_needed(audio_path, wav_path):
            raise Exception("音声ファイルの最適化に失敗")
        
        # 2. 音声を処理可能なチャンクに分割
        chunk_files = await self.split_audio_for_processing(wav_path, chunk_length_ms)
        if not chunk_files:
            raise Exception("音声分割に失敗")
        
        # 3. 並行処理で文字起こし実行
        transcripts = await self.process_audio_chunks_parallel(chunk_files)
        
        # 4. 結果を結合
        final_transcript = "\\n".join([t for t in transcripts if t])
        
        if not final_transcript.strip():
            raise Exception("文字起こし結果が空です")
        
        # 5. 結果をローカルに保存
        success = await self.save_transcript_locally(final_transcript, output_path)
        if not success:
            raise Exception("ローカル保存に失敗")
        
        logger.info("音声ファイルの文字起こし処理完了")
        return True
        
    except Exception as e:
        logger.error(f"処理エラー: {str(e)}")
        raise
    
    finally:
        # クリーンアップ
        import shutil
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
        
        for chunk_file in chunk_files:
            if os.path.exists(chunk_file):
                os.unlink(chunk_file)

# クラスにメソッドを追加
AudioTranscriptionService.save_transcript_locally = save_transcript_locally
AudioTranscriptionService.process_audio_transcription = process_audio_transcription

print("✅ AudioTranscriptionService クラス（完成）定義完了")


In [None]:
## 5. 音声ファイルのアップロード


In [None]:
print("🎵 音声ファイル（WAV、MP3、FLAC、M4A、OGG）をアップロードしてください:")
audio_files = files.upload()

# アップロードされたファイル名を取得
audio_file_name = list(audio_files.keys())[0]
output_file_name = "transcription_result.txt"

print(f"✅ アップロード完了: {audio_file_name}")
print(f"出力ファイル名: {output_file_name}")

# ファイルサイズを表示
file_size = Path(audio_file_name).stat().st_size / (1024 * 1024)
print(f"ファイルサイズ: {file_size:.2f}MB")


In [None]:
## 6. 文字起こし実行


In [None]:
# サービス初期化
print("🚀 文字起こしサービスを初期化中...")
service = AudioTranscriptionService(
    service_account_path=SERVICE_ACCOUNT_PATH,
    gcs_bucket_name=GCS_BUCKET_NAME
)

# 文字起こし実行
print("📝 文字起こし処理を開始します...")
print("※長時間音声の場合、処理完了まで時間がかかる場合があります。")
print("※処理中はブラウザを閉じないでください。")

try:
    success = await service.process_audio_transcription(
        audio_path=audio_file_name,
        output_path=output_file_name,
        chunk_length_ms=CHUNK_LENGTH_MS
    )
    
    if success:
        print("\\n" + "="*60)
        print("✅ 文字起こし処理が完了しました！")
        
        # 結果ファイルのサイズを表示
        if Path(output_file_name).exists():
            result_size = Path(output_file_name).stat().st_size / 1024
            print(f"結果ファイルサイズ: {result_size:.2f}KB")
            
            # 結果の一部を表示
            with open(output_file_name, 'r', encoding='utf-8') as f:
                content = f.read()
                preview = content[:500] + "..." if len(content) > 500 else content
                print(f"\\n📄 結果プレビュー:\\n{preview}")
        
        print("="*60)
    else:
        print("❌ 文字起こし処理に失敗しました")
        
except Exception as e:
    print(f"\\n❌ エラーが発生しました: {str(e)}")
    import traceback
    traceback.print_exc()


In [None]:
## 7. 結果ファイルのダウンロード


In [None]:
# 結果ファイルが存在する場合はダウンロード
if Path(output_file_name).exists():
    print(f"📥 結果ファイル '{output_file_name}' をダウンロードします...")
    files.download(output_file_name)
    print("✅ ダウンロード完了")
else:
    print("❌ 結果ファイルが見つかりません")


In [None]:
## 8. クリーンアップ（オプション）


In [None]:
# アップロードしたファイルを削除（オプション）
import os

files_to_cleanup = [SERVICE_ACCOUNT_PATH, audio_file_name, output_file_name]

print("🧹 アップロードしたファイルをクリーンアップしますか？")
cleanup = input("y/N: ").lower().strip()

if cleanup == 'y':
    for file_path in files_to_cleanup:
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"削除: {file_path}")
    print("✅ クリーンアップ完了")
else:
    print("ファイルは保持されます")


In [None]:
## トラブルシューティング

### よくあるエラーと対処法

1. **認証エラー**
   - サービスアカウントキーファイルが正しいか確認
   - Speech-to-Text APIが有効化されているか確認

2. **バケットエラー**
   - `GCS_BUCKET_NAME`が正しく設定されているか確認
   - バケットが存在し、サービスアカウントに権限があるか確認

3. **音声ファイルエラー**
   - 対応形式（WAV、MP3、FLAC、M4A、OGG）か確認
   - ファイルが破損していないか確認

4. **メモリエラー**
   - 大きな音声ファイルの場合、`CHUNK_LENGTH_MS`を小さくする
   - Colab Proの使用を検討

5. **タイムアウトエラー**
   - `TRANSCRIPTION_TIMEOUT`の値を増やす
   - 音声ファイルを短く分割して処理

### 制限事項
- 音声言語：日本語のみ
- Google Cloud APIの利用料金が発生
- インターネット接続が必要
- Colabの実行時間制限（無料版：12時間、Pro版：24時間）
