In [None]:
# Mount Google Drive at /content/drive; later cells read audio from and write
# results to paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

# WhisperX transcription + LLM-based SHI extraction (Mistral-7B-Instruct; Mixtral-8x7B optional)

In [None]:
!pip install datasets evaluate jiwer librosa
!pip install --upgrade bitsandbytes transformers==4.50.0 accelerate
!pip install ctranslate2==4.4.0 whisperx
!apt-get install libcudnn8 libcudnn8-dev

In [None]:
import os
import glob
import json
import re
import torch
import whisperx
from tqdm import tqdm  # 進度條

def extract_number(f):
    """
    Return the first run of digits in the file name of ``f`` as an int.

    Only the base name is inspected, so digits in parent directories are
    ignored. When the name contains no digit at all, ``float('inf')`` is
    returned so that such files sort after every numbered file.
    """
    filename = os.path.basename(f)
    digits = re.search(r'\d+', filename)
    if digits is None:
        return float('inf')
    return int(digits.group())

# 1. Collect every .wav file, ordered by the first number in its file name.
audio_folder = "/content/drive/MyDrive/Colab Notebooks/private"
audio_files = sorted(
    glob.glob(os.path.join(audio_folder, "*.wav")),
    key=extract_number
)
print(f"找到 {len(audio_files)} 個 .wav 檔案。")

# 2. Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用裝置：{device}")

# 3. Load the WhisperX speech model.
# float16 is only appropriate on CUDA; when the notebook falls back to the CPU
# the WhisperX documentation recommends int8, and a hard-coded float16 would
# make the CPU fallback above unusable.
compute_type = "float16" if device == "cuda" else "int8"
whisperx_model = whisperx.load_model("large-v3", device, compute_type=compute_type)

# 4. Per-file records that are later dumped to JSON.
results = []

# Lines of the task-1 submission file ("<file_id>\t<text>").
task1_lines = []

# 5. Transcribe and word-align every audio file.
# Alignment models are cached per detected language so that each one is loaded
# once instead of once per file.
align_models = {}
for idx, audio_path in enumerate(tqdm(audio_files, desc="處理進度")):
    file_id = os.path.splitext(os.path.basename(audio_path))[0]
    print(f"\n>>> 正在處理檔案 {idx+1}/{len(audio_files)}：{file_id}.wav")

    # 5.1 Transcribe with WhisperX.
    raw_result = whisperx_model.transcribe(audio_path)
    segments = raw_result["segments"]

    # 5.2 Use the detected language to pick the alignment model.
    lang_code = raw_result["language"]
    print(f"偵測到語言: {lang_code}")

    if lang_code not in align_models:
        align_models[lang_code] = whisperx.load_align_model(lang_code, device)
    align_model, metadata = align_models[lang_code]

    # 5.3 Word-level forced alignment.
    aligned_result = whisperx.align(
        segments,
        align_model,
        metadata,
        audio_path,
        device
    )

    # 5.4 Build the per-word records.
    # WhisperX omits "start"/"end" for words it cannot align (for example
    # tokens made of characters outside the alignment model's vocabulary).
    # Keep such words, with None timestamps, so the transcript stays complete
    # and one untimed word does not abort the whole run.
    words_info = []
    for w in aligned_result["word_segments"]:
        start_time = w.get("start")
        end_time = w.get("end")
        words_info.append({
            "word": w["word"],
            "start": float(start_time) if start_time is not None else None,
            "end": float(end_time) if end_time is not None else None
        })

    # 5.5 Join the words into the full transcript (space separated).
    full_text = " ".join([w["word"] for w in words_info]).strip()

    # 5.6 Per-file statistics.
    print(f"　- 偵測語言：{lang_code}")
    print(f"　- 轉錄詞數：{len(words_info)}")
    print(f"　- 轉錄文字長度：{len(full_text)} 字元")

    # 5.7 Record for the JSON output.
    results.append({
        "file_id": file_id,
        "language": lang_code,
        "words": words_info,
        "text": full_text
    })

    # 5.8 Line for the task-1 text output.
    task1_lines.append(f"{file_id}\t{full_text}")

# 6. Write the full transcription + alignment results as JSON.
output_json_path = "/content/drive/MyDrive/Colab Notebooks/aicup0607/transcription_results_whisperx.json"
# Create the target folder first: a missing directory would otherwise raise
# only here, after every audio file has already been processed.
os.makedirs(os.path.dirname(output_json_path), exist_ok=True)
with open(output_json_path, "w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=2)
print(f"\n✅ 已儲存所有轉錄與對齊結果到：{output_json_path}")

# 7. Write the task-1 submission file: one "<file_id>\t<text>" line per audio file.
output_txt_path = "/content/drive/MyDrive/Colab Notebooks/aicup0607/task1_output_whisperx.txt"
os.makedirs(os.path.dirname(output_txt_path), exist_ok=True)
with open(output_txt_path, "w", encoding="utf-8") as f:
    f.write("\n".join(task1_lines))
print(f"✅ 已輸出比賽格式 TXT 檔至：{output_txt_path}")

In [None]:
import os

from huggingface_hub import login

# Do not paste the access token into the notebook, where it would be saved and
# shared with the file. Read it from the HF_TOKEN environment variable instead;
# when the variable is unset, token=None makes login() ask for it interactively.
login(token=os.environ.get("HF_TOKEN"))

In [None]:
# 1️⃣ Install the packages needed for LLM inference.
# NOTE(review): an earlier cell already installs transformers==4.50.0 and
# accelerate; without --upgrade this line presumably leaves those installed
# versions untouched — confirm that is the intent.
!pip install --upgrade pip
!pip install torch transformers accelerate

import json
import re
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

# 2️⃣ Load the tokenizer and the causal language model.
# Alternative checkpoint: "mistralai/Mixtral-8x7B-Instruct-v0.1"
model_name = "mistralai/Mistral-7B-Instruct-v0.2"

# Pick the GPU when available, otherwise fall back to the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype="auto", device_map="auto")

In [None]:
# 3️⃣ Prompt construction
def build_prompt(text):
    """
    Build the (system_prompt, user_prompt) pair for SHI entity extraction.

    The system prompt lists the only labels the model may use. The user prompt
    holds the instructions, one worked example (input sentence plus expected
    JSON output) and finally ``text``. Both prompts are returned with leading
    and trailing whitespace stripped, so trailing whitespace of ``text`` is
    removed as well.
    """
    allowed_labels = (
        "PATIENT", "DOCTOR", "USERNAME", "FAMILYNAME", "PERSONALNAME", "PROFESSION",
        "ROOM", "DEPARTMENT", "HOSPITAL", "ORGANIZATION", "STREET", "CITY",
        "DISTRICT", "COUNTY", "STATE", "COUNTRY", "ZIP", "LOCATION-OTHER", "AGE",
        "DATE", "TIME", "DURATION", "SET", "PHONE", "FAX", "EMAIL", "URL",
        "IPADDRESS", "SOCIAL_SECURITY_NUMBER", "MEDICAL_RECORD_NUMBER",
        "HEALTH_PLAN_NUMBER", "ACCOUNT_NUMBER", "LICENSE_NUMBER", "VEHICLE_ID",
        "DEVICE_ID", "BIOMETRIC_ID", "ID_NUMBER", "OTHER",
    )
    system_prompt = (
        "你是一個醫療病例報告SHI檢測模型，請從以下輸入文本中抽取SHI類別的實體，並且只使用以下的label：\n"
        + ", ".join(allowed_labels)
        + "。"
    ).strip()

    # Expected output of the worked example, rendered one JSON object per line.
    example_entities = [
        ("AGE", "69"),
        ("PATIENT", "Jack Bryant"),
        ("MEDICAL_RECORD_NUMBER", "2755374.ARU"),
        ("STREET", "Sandering"),
        ("CITY", "Barwon Heads"),
        ("STATE", "Western Australia"),
    ]
    example_json = ",\n".join(
        f'  {{"label": "{label}", "entity_text": "{entity}"}}'
        for label, entity in example_entities
    )

    instruction = '請找出文本中的SHI，並只能輸出 JSON 格式，每個label對應的值放在JSON中，每個元素包含"label"與"entity_text"兩個欄位。若在文字中只要有可能符合，請務必標註，不要輸出空陣列。以下是範例'
    example_input = '輸入:"A 69-year-old patient Jack Bryant, identified by Episode Number 27O537406U and Medical Record 2755374.ARU, resides on Sandering Street in Barwon Heads, Western Australia, with a ZIP code of 6906."'

    # NOTE(review): the "：" after the example's closing bracket and the missing
    # "輸入:" label before the real text look unintentional — confirm before
    # changing, since any edit here alters what the model sees.
    user_prompt = "\n".join(
        [instruction, example_input, "輸出:", "[", example_json, "]：", f"{text}"]
    ).strip()
    return system_prompt, user_prompt

# 4️⃣ Load the WhisperX transcription results written by the transcription cell.
with open(
    '/content/drive/MyDrive/Colab Notebooks/aicup0607/transcription_results_whisperx.json',
    encoding='utf-8',
) as whisper_file:
    whisper_data = json.load(whisper_file)

# 5️⃣ LLM-based NER
def _parse_entity_json(result_text):
    """Return the last bracketed span of ``result_text`` that parses as JSON, or []."""
    json_candidates = re.findall(r'\[[\s\S]*?\]', result_text)
    if not json_candidates:
        print("⚠️ 找不到 JSON 區塊，以下為原始輸出：")
        print(result_text)
        return []

    # Prefer the last candidate, but fall back to earlier ones so that trailing
    # bracketed text after the real answer does not discard a valid array.
    last_error = None
    for json_str in reversed(json_candidates):
        try:
            parsed_output = json.loads(json_str)
        except ValueError as e:
            last_error = e
            continue
        print(f"✅ JSON解析成功")
        return parsed_output

    print(f"⚠️ JSON解析失敗：{last_error}")
    print(f"模型輸出：{json_candidates[-1]}")
    return []


def run_ner(text):
    """
    Ask the LLM to extract SHI entities from ``text``.

    Args:
        text: Transcript to analyse.

    Returns:
        The parsed JSON array produced by the model (normally a list of
        ``{"label": ..., "entity_text": ...}`` dicts), or ``[]`` when the
        answer contains no parseable JSON array.
    """
    system_prompt, user_prompt = build_prompt(text)
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    input_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    # The chat template has already inserted the special tokens it needs, so
    # do not let the tokenizer add them a second time.
    inputs = tokenizer([input_text], return_tensors="pt", add_special_tokens=False).to(model.device)

    outputs = model.generate(
        **inputs,
        max_new_tokens=512,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id
    )
    # generate() returns prompt + completion. Decode only the new tokens so the
    # few-shot example JSON inside the prompt can never be mistaken for the
    # model's answer.
    prompt_length = inputs["input_ids"].shape[1]
    result_text = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

    # Show the model's answer.
    print("📢 模型回答：")
    print(result_text)
    print("=" * 28)

    return _parse_entity_json(result_text)

# 6️⃣ Run NER on every transcript, keyed by its file id.
ner_results = {}
for record in whisper_data:
    record_id = record.get('file_id') or record.get('id') or 'unknown'
    ner_results[record_id] = run_ner(record['text'])

# 7️⃣ Save the results as ner_results.json.
with open(
    '/content/drive/MyDrive/Colab Notebooks/aicup0607/ner_results.json',
    'w',
    encoding='utf-8',
) as fout:
    json.dump(ner_results, fout, indent=2, ensure_ascii=False)

print("✅ 已完成 NER 辨識，結果已存至 ner_results.json")

In [None]:
import json
import re

# Reload the transcription and NER artifacts from Drive.
with open(
    '/content/drive/MyDrive/Colab Notebooks/aicup0607/transcription_results_whisperx.json',
    encoding='utf-8',
) as whisper_file:
    whisper_data = json.load(whisper_file)

with open(
    '/content/drive/MyDrive/Colab Notebooks/aicup0607/ner_results.json',
    encoding='utf-8',
) as ner_file:
    ner_results = json.load(ner_file)

# 8️⃣ Timestamp alignment
def align_entity_to_time(entity_text, words):
    """
    Locate ``entity_text`` in the word list and return its time span.

    Matching ignores case and every character that is neither a word
    character nor whitespace, and requires the entity's tokens to appear as
    consecutive words.

    Args:
        entity_text: Entity string to look for.
        words: Sequence of dicts with "word", "start" and "end" keys.

    Returns:
        ``(start, end)`` taken from the first and last word of the first
        match, or ``(None, None)`` when there is no match or the entity
        contains no matchable token.
    """
    def normalize(value):
        return re.sub(r'[^\w\s]', '', value).lower()

    entity_tokens = normalize(entity_text).split()
    # An entity made only of punctuation/whitespace has no tokens. Without
    # this guard the empty pattern "matches" at index 0 and yields the span of
    # the whole recording (or an IndexError when ``words`` is empty).
    if not entity_tokens:
        return None, None

    target = ' '.join(entity_tokens)
    span = len(entity_tokens)
    # Normalise each word once instead of once per candidate window.
    normalized_words = [normalize(w['word']) for w in words]

    for i in range(len(words) - span + 1):
        if ' '.join(normalized_words[i:i + span]) == target:
            return words[i]['start'], words[i + span - 1]['end']
    return None, None

# 9️⃣ Write the task-2 submission file (task2_output.txt):
# one "<file_id>\t<label>\t<start>\t<end>\t<entity_text>" line per aligned entity.
output_lines = []
for entry in whisper_data:
    file_id = entry.get('file_id') or entry.get('id') or 'unknown'
    words = entry['words']
    entities = ner_results.get(file_id, [])

    for entity in entities:
        # The LLM output is not guaranteed to be a list of dicts.
        if not isinstance(entity, dict):
            print(f"⚠️ file_id: {file_id} 中的實體不是預期的字典格式 ({entity}, type: {type(entity)})，已略過")
            continue

        label = entity.get('label')
        entity_text = entity.get('entity_text')

        # The model may emit a bare JSON number (e.g. an age); treat it as
        # text instead of crashing on .strip().
        if isinstance(entity_text, (int, float)) and not isinstance(entity_text, bool):
            entity_text = str(entity_text)

        # Skip entries that lack a label or a non-empty textual entity.
        if label is None or not isinstance(entity_text, str) or not entity_text.strip():
            print(f"⚠️ file_id: {file_id} 中的實體資料不完整 ({entity})，已略過")
            continue

        start_time, end_time = align_entity_to_time(entity_text, words)
        if start_time is None or end_time is None:
            print(f"⚠️ 在 {file_id} 找不到對應時間戳，略過 -> {entity_text}")
            continue

        output_lines.append(
            f"{file_id}\t{label}\t{start_time:.3f}\t{end_time:.3f}\t{entity_text}"
        )


with open('/content/drive/MyDrive/Colab Notebooks/aicup0607/task2_output.txt', 'w', encoding='utf-8') as fout:
    fout.write('\n'.join(output_lines))

# Report the file name that was actually written.
print("✅ 已完成 task2_output.txt 的生成！")