In [8]:
from pydantic import BaseModel, Field, computed_field, field_serializer
from tqdm.auto import tqdm

import instructor
import asyncio
import openai
import httpx
import rich

from typing import Generator
from pathlib import Path
import json
import os



In [2]:
class Recording(BaseModel):
    path: Path = Field(..., description="The name of the recording file")
    job_id: str | None = Field(None, description="The job id of the transcription job")
    raw_transcript: str | None = Field(None, description="The transcription of the recording")
    summary: str | None = Field(None, description="The summary of the recording")

    @field_serializer("path")
    def path_serializer(path: Path):
        return str(path)


audio_path = Path("/mnt/arrakis/meddibia/meddibia-demos/data/recordings")
audio_files = [Recording(path=str(p)) for p in audio_path.glob("*.mp3")]

audio_files

[Recording(path=PosixPath('/mnt/arrakis/meddibia/meddibia-demos/data/recordings/native-AUD-20240523-061346.mp3'), job_id=None, raw_transcript=None, summary=None),
 Recording(path=PosixPath('/mnt/arrakis/meddibia/meddibia-demos/data/recordings/english-AUD-20240423-WA0003.mp3'), job_id=None, raw_transcript=None, summary=None),
 Recording(path=PosixPath('/mnt/arrakis/meddibia/meddibia-demos/data/recordings/english-AUD-20240423-WA0002.mp3'), job_id=None, raw_transcript=None, summary=None),
 Recording(path=PosixPath('/mnt/arrakis/meddibia/meddibia-demos/data/recordings/native-AUD-20240523-080100.mp3'), job_id=None, raw_transcript=None, summary=None),
 Recording(path=PosixPath('/mnt/arrakis/meddibia/meddibia-demos/data/recordings/english-AUD-20240423-WA0004.mp3'), job_id=None, raw_transcript=None, summary=None),
 Recording(path=PosixPath('/mnt/arrakis/meddibia/meddibia-demos/data/recordings/native-AUD-20240524-013419.mp3'), job_id=None, raw_transcript=None, summary=None),
 Recording(path=Pos

In [21]:
def upload_file(audio_path: Path) -> Generator[bytes, None, None]:
    with open(audio_path, "rb") as f:
        file_size = os.path.getsize(audio_path)
        chunk_size = 1024

        with tqdm(
            total=file_size,
            unit="B",
            unit_scale=True,
            desc=f"Uploading {audio_path.name}",
        ) as pg:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield chunk
                pg.update(len(chunk))


class FileGenerator:
    def __init__(self, audio_path: Path):
        self.generator = upload_file(audio_path)

    def read(self, size=-1):
        try:
            return next(self.generator)
        except StopIteration:
            return b""


async def transcribe(recording: Recording):
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{os.environ['TRANSCRIPTION_SERVICE_URL']}/transcribe",
            headers={
                "Authorization": f"Bearer {os.environ['TRANSCRIPTION_SERVICE_KEY']}",
            },
            files={
                "file": (recording.path.name, FileGenerator(recording.path), "audio/mpeg")
            },
            timeout=15
        )

        if response.status_code == 200:
            recording.job_id = response.json()["call_id"]
        else:
            raise ValueError(f"Failed to transcribe {recording.path.name}")
        
        return recording


async def get_transcript(recording: Recording):
    assert recording.job_id is not None

    itr = 0
    while True:
        print(f"checking status of job {recording.job_id} for iteration {itr}")
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{os.environ['TRANSCRIPTION_SERVICE_URL']}/status/{recording.job_id}",
                timeout=15
            )

        if response.status_code == 200:
            status = response.json()["status"]
            if status == "complete":
                recording.raw_transcript = response.json()["transcript"]["text"].strip()
                break
            else:
                await asyncio.sleep(5)
                itr += 1
        else:
            print(f"Failed to get status of job {recording.job_id}")
            print(response.json())
            if itr > 1:
                break
    
    return recording


async def start_transcriptions(audio_files: list[Recording]) -> list[Recording]:
    semaphore = asyncio.Semaphore(4)  # Limit concurrency to 4
    async def process_recording(recording: Recording) -> Recording:
        async with semaphore:
            return await transcribe(recording)

    tasks = [process_recording(recording) for recording in audio_files]
    return await asyncio.gather(*tasks)


async def retrieve_transcriptions(transcribed_recordings: list[Recording]) -> list[Recording]:
    semaphore = asyncio.Semaphore(4)  # Limit concurrency to 4
    async def process_transcript(recording: Recording) -> Recording:
        async with semaphore:
            return await get_transcript(recording)

    tasks = [process_transcript(recording) for recording in transcribed_recordings if recording.job_id is not None]
    return await asyncio.gather(*tasks)

In [15]:
submitted_recordings = await start_transcriptions(audio_files)



Uploading native-AUD-20240523-061346.mp3:   0%|          | 0.00/1.05M [00:00<?, ?B/s]

Uploading english-AUD-20240423-WA0002.mp3:   0%|          | 0.00/1.20M [00:00<?, ?B/s]

Uploading native-AUD-20240523-080100.mp3:   0%|          | 0.00/1.24M [00:00<?, ?B/s]

Uploading english-AUD-20240423-WA0003.mp3:   0%|          | 0.00/344k [00:00<?, ?B/s]

Uploading english-AUD-20240423-WA0004.mp3:   0%|          | 0.00/81.9k [00:00<?, ?B/s]

Uploading native-AUD-20240524-013419.mp3:   0%|          | 0.00/1.18M [00:00<?, ?B/s]

Uploading english-AUD-20240423-WA0000.mp3:   0%|          | 0.00/906k [00:00<?, ?B/s]

Uploading native-AUD-20240523-063027.mp3:   0%|          | 0.00/787k [00:00<?, ?B/s]

Uploading native-AUD-20240523-063744.mp3:   0%|          | 0.00/1.54M [00:00<?, ?B/s]

Uploading english-AUD-20240423-WA0001.mp3:   0%|          | 0.00/1.06M [00:00<?, ?B/s]

Uploading native-AUD-20240524-013401.mp3:   0%|          | 0.00/541k [00:00<?, ?B/s]

Uploading english-AUD-20240423-WA0005.mp3:   0%|          | 0.00/748k [00:00<?, ?B/s]

Uploading native-AUD-20240524-013418.mp3:   0%|          | 0.00/1.18M [00:00<?, ?B/s]

In [29]:
transcribed_recordings = await retrieve_transcriptions(submitted_recordings)



checking status of job fc-01HZFJJMHWDP2XTX4C14N0FRMG for iteration 0
checking status of job fc-01HZFJJNT6KVZMK4P4YH493GNP for iteration 0
checking status of job fc-01HZFJJKRYHENZD3HMTV9B0SWP for iteration 0
checking status of job fc-01HZFJJK3HY0PABHMD8BGPKNMR for iteration 0
checking status of job fc-01HZFJJMQPBEYGWZVET6WYWDZ4 for iteration 0
checking status of job fc-01HZFJJNKXWR4130C4VYE7XHEC for iteration 0
checking status of job fc-01HZFJJPKMBAKGPTXDPJ5DWMKK for iteration 0
checking status of job fc-01HZFJJPDRANXCNZY1KAA8BX02 for iteration 0
checking status of job fc-01HZFJJQDA4VHXCXS8X5BDEE1M for iteration 0
checking status of job fc-01HZFJJSHMXN9Y4MKZE8XCTSDN for iteration 0
checking status of job fc-01HZFJJRAB4Q8T09VFQT0PRYA6 for iteration 0
checking status of job fc-01HZFJJR766X39RP8BDNXKDXGE for iteration 0
checking status of job fc-01HZFJJVHE8ZAB76RZCY82327K for iteration 0


In [32]:
data = []

for r in transcribed_recordings:
    data.append(Recording(**r.model_dump()))



In [39]:
with open(audio_path/'data.json', 'w') as f:
    json.dump([d.model_dump() for d in data], f)

In [11]:
with open(audio_path/'data.json', 'r') as f:
    data = [Recording(**d) for d in json.load(f)]



In [14]:
client = instructor.from_openai(openai.AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"]), mode=instructor.Mode.MD_JSON)

class Summarization(BaseModel):
    summary: str = Field(..., description="A summary of findings and relevant clinical information from the recording")

def get_messages(recording: Recording) -> dict:
    return [
        {
            "role": "system",
            "content": """\
                You are a medical expert. You are given a transcript from a consultation between a doctor and a patient.
                Review the transcript then write out a summary of the conversation that includes all the key points and relevant clinical information.
            """
        },
        {
            "role": "user",
            "content": f"Here is the transcript:\n\n{recording.raw_transcript}"
        }
    ]


async def summarize(recording: Recording):
    res = await client.chat.completions.create(
        model="gpt-4o",
        response_model=Summarization,
        temperature=0.0,
        messages=get_messages(recording)
    )
    recording.summary = res.summary
    return recording


async def summarize_recordings(recordings: list[Recording]) -> list[Recording]:
    semaphore = asyncio.Semaphore(2)  # Limit concurrency to 2
    async def process_recording(recording: Recording) -> Recording:
        print("summarizing", recording.path.name)
        async with semaphore:
            return await summarize(recording)

    tasks = [process_recording(recording) for recording in recordings]
    return await asyncio.gather(*tasks)

In [15]:
summarized_recordings = await summarize_recordings(data)

summarizing native-AUD-20240523-061346.mp3
summarizing english-AUD-20240423-WA0003.mp3
summarizing english-AUD-20240423-WA0002.mp3
summarizing native-AUD-20240523-080100.mp3
summarizing english-AUD-20240423-WA0004.mp3
summarizing native-AUD-20240524-013419.mp3
summarizing english-AUD-20240423-WA0000.mp3
summarizing native-AUD-20240523-063027.mp3
summarizing native-AUD-20240523-063744.mp3
summarizing english-AUD-20240423-WA0001.mp3
summarizing native-AUD-20240524-013401.mp3
summarizing english-AUD-20240423-WA0005.mp3
summarizing native-AUD-20240524-013418.mp3


In [17]:
with open(audio_path/'data.json', 'w') as f:
    json.dump([d.model_dump() for d in summarized_recordings], f)

