# Voice Recording Transcription on Colab
Whisperを使用してGoogle Drive上の音声ファイルを文字起こしし、結果をSpreadsheetに記録します。

In [None]:
# ライブラリのインストール
!pip install git+https://github.com/openai/whisper.git
!pip install gspread oauth2client google-api-python-client
!sudo apt update && sudo apt install ffmpeg

In [None]:
import os
import torch
import whisper
import gspread
from google.colab import auth
from google.auth import default
from google.colab import drive
import datetime
import io
import traceback
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload

In [None]:
# Settings
# The trailing "# @param" markers are Colab form annotations; keep them on the
# same line as the assignment.

# ID of the spreadsheet opened with gc.open_by_key() in the main cell.
SPREADSHEET_ID = 'YOUR_SPREADSHEET_ID' # @param {type:"string"}
# Name of the worksheet (tab) looked up with wb.worksheet() in the main cell.
SHEET_NAME = 'シート1' # @param {type:"string"}
# Directory passed to drive.mount() when mounting Google Drive.
DRIVE_MOUNT_PATH = '/content/drive'

In [None]:
# Mount Google Drive and authenticate.
# The mount is skipped when the mount point already exists on disk.
mount_point_missing = not os.path.exists(DRIVE_MOUNT_PATH)
if mount_point_missing:
    drive.mount(DRIVE_MOUNT_PATH)

# Authenticate the Colab user first, then fetch the default credentials and
# hand them to gspread; `creds` is reused later for the Drive API client.
auth.authenticate_user()
creds, _ = default()
gc = gspread.authorize(creds)

In [None]:
# Transcription helper
def run_transcription(model, file_id, file_name):
    """Download one Drive file and return its timestamped transcript.

    The file is fetched through the Drive v3 API (using the notebook-level
    ``creds`` created in the authentication cell) into a private temporary
    file, transcribed with ``model.transcribe(..., language='ja')``, and the
    temporary file is always removed afterwards.

    Args:
        model: Object exposing ``transcribe(path, verbose=..., language=...)``
            that returns a dict with a ``"segments"`` list whose items have
            ``'start'`` (seconds) and ``'text'``; the notebook passes the
            model returned by ``whisper.load_model``.
        file_id: Drive file ID passed to ``files().get_media``.
        file_name: Name of the recording. Used for log output and to keep
            the original file extension on the temporary file.

    Returns:
        The transcript as one ``[H:MM:SS] text`` line per segment, or ``None``
        if any step fails (the error and traceback are printed).
    """
    # Local import so this cell does not depend on editing the import cell.
    import tempfile

    print(f"Transcribing {file_name}...")

    downloaded_file_path = None
    try:
        # Download to a unique temp file instead of /content/<file_name>:
        # spreadsheet-supplied names may contain path separators, and the
        # cleanup below must never delete an unrelated file of the same name.
        suffix = os.path.splitext(os.path.basename(file_name))[1]
        fd, downloaded_file_path = tempfile.mkstemp(suffix=suffix)

        # The context manager closes (and flushes) the file before Whisper
        # reads it, even when the download fails part-way.
        with os.fdopen(fd, 'wb') as fh:
            # Reuse the already-authenticated credentials.
            drive_service = build('drive', 'v3', credentials=creds)
            request = drive_service.files().get_media(fileId=file_id)
            downloader = MediaIoBaseDownload(fh, request)

            done = False
            while not done:
                _, done = downloader.next_chunk()

        # Run Whisper on the downloaded file.
        result = model.transcribe(downloaded_file_path, verbose=False, language='ja')

        # Format each segment as "[H:MM:SS] text".
        lines = []
        for segment in result["segments"]:
            start = str(datetime.timedelta(seconds=int(segment['start'])))
            lines.append(f"[{start}] {segment['text']}\n")
        return "".join(lines)

    except Exception as e:
        # Best effort per file: report and let the caller move on to the next row.
        print(f"Transcription failed: {e}")
        traceback.print_exc()
        return None
    finally:
        if downloaded_file_path is not None and os.path.exists(downloaded_file_path):
            os.remove(downloaded_file_path)

In [None]:
# Load the Whisper model, choosing the GPU automatically when one is available.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")
model = whisper.load_model("medium", device=device)

In [None]:
# Main processing
def find_columns(header):
    """Locate the columns used by the main loop by scanning the header row.

    Matching is by substring/equality on the header text, exactly one pass;
    when several headers match the same rule, the last one wins.

    Args:
        header: List of header cell strings (first spreadsheet row).

    Returns:
        Dict mapping 'status', 'ts_id', 'category', 'date', 'file_id' and
        'file_name' to a 0-based column index, or -1 when not found.
    """
    columns = {
        "status": -1,
        "ts_id": -1,
        "category": -1,
        "date": -1,
        "file_id": -1,
        "file_name": -1,
    }
    for i, col_name in enumerate(header):
        if "ステータス" in col_name:
            columns["status"] = i
        if "文字起こし" in col_name and "ID" in col_name:
            columns["ts_id"] = i
        if "科目" in col_name or "カテゴリ" in col_name:
            columns["category"] = i
        if "日付" in col_name:
            columns["date"] = i
        if col_name == "FileID" or "ファイルID" in col_name:
            columns["file_id"] = i
        if "ファイル名" in col_name or col_name == "Name":
            columns["file_name"] = i
    return columns


def cell_value(row, index, default=""):
    """Return ``row[index]``, or ``default`` if the column is absent (-1) or the row is too short."""
    if index < 0 or index >= len(row):
        return default
    return row[index]


def safe_path_component(value):
    """Make a spreadsheet value usable as a single path component.

    Path separators are replaced with '-' (so a date like ``2024/01/15`` does
    not fan out into nested directories) and empty or dot-only values become
    ``"Unknown"``.
    """
    cleaned = str(value).strip()
    for separator in ("/", "\\"):
        cleaned = cleaned.replace(separator, "-")
    if cleaned in ("", ".", ".."):
        return "Unknown"
    return cleaned


def build_transcript_path(category, date_str, archive_root=None):
    """Return the path of the transcript file for one recording.

    Args:
        category: Category/subject cell value; used as the sub-directory name.
        date_str: Date cell value; used as the file-name prefix.
        archive_root: Directory holding the per-category folders. Defaults to
            ``<DRIVE_MOUNT_PATH>/MyDrive/録音_ARCHIVE``.

    Returns:
        ``<archive_root>/<category>/<date>_<category>_transcript.txt`` with
        both components sanitised by ``safe_path_component``.
    """
    if archive_root is None:
        # Derive the location from the configured mount point rather than a
        # second hard-coded copy of it.
        archive_root = os.path.join(DRIVE_MOUNT_PATH, "MyDrive", "録音_ARCHIVE")
    category = safe_path_component(category)
    date_str = safe_path_component(date_str)
    return os.path.join(archive_root, category, f"{date_str}_{category}_transcript.txt")


def process_pending_rows(sheet, model):
    """Transcribe every row whose status is "未実行" and record the results.

    For each pending row the recording is transcribed with
    ``run_transcription``, the transcript is written under the archive folder,
    and the status cell (plus the transcript column, when present) is queued
    for update. Queued updates are sent in a single ``sheet.batch_update``
    call, which also runs when the loop is interrupted or raises, so rows that
    were already finished are not transcribed again on the next run.

    Args:
        sheet: Worksheet providing ``get_all_values()`` and ``batch_update()``.
        model: Transcription model forwarded to ``run_transcription``.
    """
    rows = sheet.get_all_values()
    if not rows:
        print("No data found in spreadsheet.")
        return

    header, data = rows[0], rows[1:]
    columns = find_columns(header)

    # Required-column check.
    required = (("status", 'ステータス'), ("file_id", 'FileID'), ("file_name", 'ファイル名'))
    missing = [label for key, label in required if columns[key] == -1]
    if missing:
        print(f"Error: Required column(s) not found: {', '.join(missing)}")
        return

    updates = []
    try:
        # Data starts on spreadsheet row 2 (row 1 is the header).
        for actual_row_num, row in enumerate(data, start=2):
            if cell_value(row, columns["status"]) != "未実行":
                continue

            file_id = cell_value(row, columns["file_id"])
            file_name = cell_value(row, columns["file_name"])
            category = cell_value(row, columns["category"], "Unknown")
            date_str = cell_value(row, columns["date"], "Unknown")

            print(f"Processing Row {actual_row_num}: {category} ({date_str})")

            if not file_id:
                print(f"Skipping row {actual_row_num}: FileID is empty")
                continue

            transcript_text = run_transcription(model, file_id, file_name)
            if not transcript_text:
                # Failure details were already printed by run_transcription;
                # the row stays "未実行" so it is retried on the next run.
                continue

            save_path = build_transcript_path(category, date_str)
            try:
                os.makedirs(os.path.dirname(save_path), exist_ok=True)
                with open(save_path, "w", encoding="utf-8") as f:
                    f.write(transcript_text)
            except Exception as e:
                # Best effort per row: report and continue with the next one.
                print(f"Error saving/updating: {e}")
                continue

            print(f"Saved transcript to: {save_path}")

            # Queue the cell updates for the batch write.
            status_cell = gspread.utils.rowcol_to_a1(actual_row_num, columns["status"] + 1)
            updates.append({'range': status_cell, 'values': [['完了']]})
            if columns["ts_id"] != -1:
                ts_cell = gspread.utils.rowcol_to_a1(actual_row_num, columns["ts_id"] + 1)
                updates.append({'range': ts_cell, 'values': [[save_path]]})
    finally:
        # Flush even on interruption so completed work is always recorded.
        if updates:
            try:
                sheet.batch_update(updates)
                print(f"Spreadsheet updated: {len(updates)} cells")
            except Exception as e:
                print(f"Error in batch update: {e}")


try:
    wb = gc.open_by_key(SPREADSHEET_ID)
    sheet = wb.worksheet(SHEET_NAME)
except Exception as e:
    print(f"Error opening spreadsheet: {e}")
else:
    process_pending_rows(sheet, model)