In [46]:
# Import all necessary libraries

import os
import time
import httpx
import logging
import subprocess
from openai import OpenAI, OpenAIError
import difflib
import threading
from queue import Queue


In [44]:
# Definition of get_llms_from_api: returns the list of all LLMs available from KISSKI
def get_llms_from_api(api_key=None, base_url=None):
    """
    Get all LLMs from the API.

    Args:
        api_key (str | None): API key to use. When None, the KISSKI_API_KEY
            environment variable is used, falling back to the key that was
            previously hard-coded in this notebook.
        base_url (str | None): Base URL of the OpenAI-compatible endpoint.
            When None, the KISSKI_BASE_URL environment variable is used,
            falling back to the Chat AI endpoint.

    Returns:
        list[dict]: One dict per model with the keys "id_name" (the model id)
        and "short_name" (the display name, or the id if the server does not
        send a name).
    """
    # SECURITY: the fallback key below is committed in the notebook. It is kept
    # only so existing runs keep working; rotate it and supply the key through
    # the KISSKI_API_KEY environment variable instead.
    if api_key is None:
        api_key = os.environ.get("KISSKI_API_KEY") or "f00b07316f6f7f5e2f1519e7be703dba"
    if base_url is None:
        base_url = os.environ.get("KISSKI_BASE_URL") or "https://chat-ai.academiccloud.de/v1/"

    client = OpenAI(api_key=api_key, base_url=base_url)
    response = client.models.list()

    # "name" is an extra field sent by this server, not a standard attribute of
    # the model object, so fall back to the id when it is absent.
    return [
        {
            "id_name": model.id,
            "short_name": getattr(model, "name", model.id),
        }
        for model in response.data
    ]

# Run the function; as the last expression of the cell, its return value is shown as the cell output
get_llms_from_api()

[{'id_name': 'meta-llama-3.1-8b-instruct',
  'short_name': 'Meta Llama 3.1 8B Instruct'},
 {'id_name': 'openai-gpt-oss-120b', 'short_name': 'OpenAI GPT OSS 120B'},
 {'id_name': 'gemma-3-27b-it', 'short_name': 'Gemma 3 27B Instruct'},
 {'id_name': 'qwen3-32b', 'short_name': 'Qwen 3 32B'},
 {'id_name': 'qwen3-30b-a3b-thinking-2507',
  'short_name': 'Qwen 3 30B A3B Thinking 2507'},
 {'id_name': 'qwen3-235b-a22b', 'short_name': 'Qwen 3 235B A22B 2507'},
 {'id_name': 'llama-3.3-70b-instruct',
  'short_name': 'Meta Llama 3.3 70B Instruct'},
 {'id_name': 'qwen2.5-vl-72b-instruct',
  'short_name': 'Qwen 2.5 VL 72B Instruct'},
 {'id_name': 'medgemma-27b-it', 'short_name': 'MedGemma 27B Instruct'},
 {'id_name': 'qwq-32b', 'short_name': 'Qwen QwQ 32B'},
 {'id_name': 'deepseek-r1', 'short_name': 'DeepSeek R1 0528'},
 {'id_name': 'deepseek-r1-distill-llama-70b',
  'short_name': 'DeepSeek R1 Distill Llama 70B'},
 {'id_name': 'mistral-large-instruct', 'short_name': 'Mistral Large Instruct'},
 {'id_na

In [7]:
# Configure logger and get_llm_response function
logger = logging.getLogger('test_logger')
logger.setLevel(logging.DEBUG)  # Let DEBUG and above pass through this logger
# NOTE(review): no handler is attached in this cell; unless one is configured
# elsewhere, records below WARNING will most likely not be displayed.


# Definition of get_llm_response function
def get_llm_response(
    messages: list,
    model="meta-llama-3.1-8b-instruct",
    temperature=0.7,
    key=None,
    max_tokens=1000,
    url=None,
):
    """
    Send a chat-completion request and return the answer with request metadata.

    Args:
        messages (list): Chat messages, passed unchanged to the API.
        model (str): Id of the model to query.
        temperature (float): Sampling temperature passed to the API.
        key (str | None): API key. When None, the KISSKI_API_KEY environment
            variable is used, falling back to the key that was previously
            hard-coded in this notebook.
        max_tokens (int): Upper bound for generated tokens passed to the API.
        url (str | None): Base URL of the API. When None, the KISSKI_BASE_URL
            environment variable is used, falling back to the Chat AI endpoint.

    Returns:
        tuple[str, dict]: (response text, metadata). On success the metadata
        holds "prompt_tokens", "generated_tokens" and "duration" (seconds).
        On any failure the text is an apology message and the metadata is an
        empty dict; no exception is raised to the caller.
    """
    logger.debug("Starting get_llm_response function.")

    # Resolve API credentials: explicit argument > environment > legacy default.
    logger.debug("Attempting to get API key and URL.")
    if key is None:
        key = os.environ.get("KISSKI_API_KEY")
        if key:
            logger.debug("API key taken from the KISSKI_API_KEY environment variable.")
        else:
            # SECURITY: this key is committed in the notebook. It is kept only
            # so existing runs keep working; rotate it and set KISSKI_API_KEY.
            key = "f00b07316f6f7f5e2f1519e7be703dba"
            logger.warning(
                "KISSKI_API_KEY is not set; falling back to the API key embedded in the notebook."
            )
    else:
        logger.debug("Key provided directly.")

    if url is None:
        url = os.environ.get("KISSKI_BASE_URL") or "https://chat-ai.academiccloud.de/v1/"
        logger.debug(f"Using default URL: {url}")
    else:
        logger.debug(f"URL provided directly: {url}")

    # Reached when the caller passes an empty key explicitly.
    if not key:
        logger.error("API key is missing.")
        return "I'm sorry, but the API key is not set.", {}

    logger.debug(f"API Key status: {'Set' if key else 'Not Set'}")
    logger.debug(f"Base URL: {url}")

    # Initialize client
    try:
        logger.debug("Initializing OpenAI client.")
        client = OpenAI(
            api_key=key,
            base_url=url,
            timeout=httpx.Timeout(60.0, connect=10.0)
        )
        logger.debug("OpenAI client initialized successfully.")
    except Exception as e:
        logger.error(f"Error initializing OpenAI client: {e}")
        return "I'm sorry, there was an error initializing the API client.", {}

    # Stays empty on every failure path so callers can detect a failed call.
    metadata = {}

    # Make API request
    try:
        logger.debug("Making API request.")
        start_time = time.perf_counter()
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        end_time = time.perf_counter()
        duration = end_time - start_time
        logger.debug("API request completed.")

        # Extract response data; usage may be missing from the response
        response_content = response.choices[0].message.content
        prompt_tokens = response.usage.prompt_tokens if response.usage else 'N/A'
        generated_tokens = response.usage.completion_tokens if response.usage else 'N/A'

        # Log information
        logger.info(f"LLM response time: {duration:.4f} seconds")
        logger.debug(f"Prompt tokens: {prompt_tokens}, Generated tokens: {generated_tokens}")

        # Create metadata dictionary
        metadata = {
            "prompt_tokens": prompt_tokens,
            "generated_tokens": generated_tokens,
            "duration": duration,
        }
        logger.debug("Metadata created.")

        return response_content, metadata
    except OpenAIError as e:
        logger.error(f"An OpenAIError occurred while calling the LLM API: {e}")
        return "I'm sorry, but I couldn't process your request due to an API error.", metadata
    except Exception as e:
        logger.error(f"An unexpected error occurred during the API call: {e}")
        return "I'm sorry, an unexpected error occurred.", metadata

In [8]:
#Definition of record_audio (once) function 

def record_audio(output_filename, duration=15):
    """
    Capture microphone audio with ffmpeg and store it in a file.

    Progress, ffmpeg's captured output and any error are printed; nothing is
    returned and no exception is raised for a failed or missing ffmpeg.

    Args:
        output_filename (str): Path of the audio file to write (overwritten if present).
        duration (int): Length of the recording in seconds.
    """
    print(f"Starting recording for {duration} seconds...")

    # Build the ffmpeg invocation piece by piece:
    ffmpeg_args = ['ffmpeg']
    ffmpeg_args += ['-f', 'avfoundation']   # AVFoundation input (macOS)
    ffmpeg_args += ['-i', ':0']             # default audio device, no video
    ffmpeg_args += ['-t', str(duration)]    # stop after `duration` seconds
    ffmpeg_args += ['-y', output_filename]  # overwrite the target without asking

    try:
        completed = subprocess.run(ffmpeg_args, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as failure:
        # ffmpeg ran but exited with a non-zero status
        print(f"An error occurred during recording: {failure}")
        print("FFmpeg output (stderr):")
        print(failure.stderr)
    except FileNotFoundError:
        # the ffmpeg executable itself could not be started
        print("ffmpeg not found. Please ensure ffmpeg is installed and in your PATH.")
    else:
        print(f"Recording saved successfully as {output_filename}")
        for chunk in ("FFmpeg output:", completed.stdout, completed.stderr):
            print(chunk)

# Record 15 seconds of audio and save it as 'meeting.wav'
record_audio('meeting.wav', duration=15)

Starting recording for 15 seconds...
Recording saved successfully as meeting.wav
FFmpeg output:

ffmpeg version 7.1.1 Copyright (c) 2000-2025 the FFmpeg developers
  built with Apple clang version 16.0.0 (clang-1600.0.26.6)
  configuration: --prefix=/opt/homebrew/Cellar/ffmpeg/7.1.1_1 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags='-Wl,-ld_classic' --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libharfbuzz --enable-libjxl --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libssh --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-li

In [9]:
#Transcribe Audio with STT-Service with Docker Container, definition and execution
def transcribe_audio(file_path):
    """
    Upload an audio file to the local STT service and return its answer.

    Args:
        file_path (str): Location of the audio file to upload.

    Returns:
        The decoded JSON body of the service response on success; otherwise a
        string describing what went wrong (missing file, connection problem,
        HTTP error status or any other exception). No exception is raised.
    """
    logger.info(f"Attempting to transcribe audio file: {file_path}")

    # Endpoint of the transcription service running in the Docker container
    endpoint = "http://localhost:8080/transcribe/file"

    try:
        # Guard clause: nothing to upload
        if not os.path.exists(file_path):
            logger.error(f"Audio file not found at: {file_path}")
            return "Error: Audio file not found."

        with open(file_path, "rb") as wav_handle:
            upload_name = os.path.basename(file_path)
            multipart = {"file": (upload_name, wav_handle, "audio/wav")}

            logger.debug(f"Sending POST request to {endpoint}")
            with httpx.Client() as http:
                reply = http.post(endpoint, files=multipart, timeout=60)

        # Non-2xx answers are turned into HTTPStatusError here
        reply.raise_for_status()

        result = reply.json()
        logger.info("Successfully received transcription.")
        logger.debug(f"Transcription result: {result}")
        return result

    except httpx.RequestError as request_problem:
        logger.error(f"An error occurred while requesting transcription: {request_problem}")
        return f"Error connecting to the transcription service: {request_problem}"
    except httpx.HTTPStatusError as status_problem:
        bad_reply = status_problem.response
        logger.error(f"Received an HTTP error: {bad_reply.status_code} - {bad_reply.text}")
        return f"HTTP Error: {bad_reply.status_code}"
    except Exception as other_problem:
        logger.error(f"An unexpected error occurred: {other_problem}")
        return f"An unexpected error occurred: {other_problem}"

# Transcribe the audio recording and print whatever transcribe_audio returned
# (the service's JSON answer on success, an error string otherwise)
audio_to_transcribe = 'meeting.wav'
transcription_result = transcribe_audio(audio_to_transcribe)

print("Transcription Result:")
print(transcription_result)

Transcription Result:
{'transcription': ' über den Airports?  Okay.  Legen eine Airports in das Labelkäse.  Sie ist in der Kühlenfahrt und 30 Sekunden.  2.2.  Auf einem iPhone-Liefer.  Das sind deinen Airports-Gegoppel.  Das ist die Option.  Ein Stellung.  Nun schluss auf.', 'language': 'de', 'confidence': 0.945986270904541}


In [None]:
# Original text that was transcribed
original_text = '''Um besser zu verstehen, welche Vorteile dir Django bietet, werfen wir einen Blick auf Server im Allgemeinen. 
Als Erstes muss der Server wissen, dass er eine Webseite ausliefern soll. Der Server hat mehrere "Ports". 
Ein Port ist vergleichbar mit einem Briefkasten, der auf eingehende Briefe antwortet.'''




In [None]:
# Build the messages for the LLM API.
# transcribe_audio returns a dict on success and a plain error string on
# failure; only the transcribed text is put into the prompt, not the repr of
# the whole dict (which would also contain 'language' and 'confidence').
if isinstance(transcription_result, dict):
    transcript_text = transcription_result.get('transcription', '')
else:
    transcript_text = str(transcription_result)

messages = [
    {"role": "system", "content": "Sie sind ein hilfreicher Assistent, der Texte zusammenfasst."},
    {"role": "user", "content": f"Fassen Sie den folgenden Text zusammen:\n\n {transcript_text} "}
]

# Call get_llm_response (defined in an earlier cell)
summary, metadata = get_llm_response(messages)

# Print the summary followed by the request metadata
print("Zusammenfassung:")
print(summary)
print(metadata)

Zusammenfassung:
Ein Server benötigt Informationen, um eine Website auszuliefern. Er hat verschiedene Ports, die wie Briefkästen funktionieren, die auf bestimmte Anfragen reagieren.
{'prompt_tokens': 162, 'generated_tokens': 41, 'duration': 1.1010920840017207}


In [40]:
# Initialize rolling summary
rolling_summary = ""

# After each new transcription, update the rolling summary
def update_rolling_summary(new_transcription, previous_summary):
    """
    Merge a new transcription into the running summary using the LLM.

    Args:
        new_transcription (str): Newly transcribed text to incorporate.
        previous_summary (str): Summary accumulated so far ("" at the start).

    Returns:
        str: The updated summary, or `previous_summary` unchanged when the
        LLM call failed.
    """
    messages = [
        {"role": "system", "content": "Du bist ein hilfreicher Assistent, der fortlaufende Zusammenfassungen erstellt."},
        {"role": "user", "content": f"Vorherige Zusammenfassung:\n{previous_summary}\n\nNeue Nachricht:\n{new_transcription}\n\nFasse alles zusammen."}
    ]
    summary, metadata = get_llm_response(messages)

    # get_llm_response reports failures by returning an apology text together
    # with an empty metadata dict. Do not let that text replace the summary.
    if not metadata:
        return previous_summary
    return summary

# Example usage after getting a new transcription.
# NOTE: transcribe_audio returns a plain string on its error paths; in that
# case the .get() call below raises AttributeError.
new_transcription = transcription_result.get('transcription', '')
# The current rolling_summary is passed back in, so re-running this cell
# builds on the summary from the previous run.
rolling_summary = update_rolling_summary(new_transcription, rolling_summary)

print("Rolling Summary:")
print(rolling_summary)

Rolling Summary:
Zusammenfassung:

Ein Webserver ist für die Verbindung zwischen Benutzer und Website verantwortlich. Er nimmt Anfragen (Request) entgegen und antwortet mit der Website (Response). Um die Website bereitzustellen, benötigt der Webserver jedoch Inhalte, die durch eine Framework wie Django erstellt werden können. Django hilft bei der Erstellung von Inhalten, um die Website nutzbar zu machen.


In [None]:
#Pseudoaudiostreaming with proper logging
def record_chunk(filename, duration):
    """
    Records a short chunk of audio using ffmpeg.

    Args:
        filename (str): Output filename for the audio chunk
        duration (int): Duration in seconds to record

    Raises:
        subprocess.SubprocessError: If ffmpeg fails (e.g. exits with a
            non-zero status). The error is logged, including the tail of
            ffmpeg's stderr, and re-raised.
        FileNotFoundError: If the ffmpeg executable cannot be found. The
            error is logged and re-raised.
    """
    logger.info(f"Starting recording of {duration}s audio chunk to {filename}")
    command = [
        'ffmpeg',
        '-f', 'avfoundation',  # macOS
        '-i', ':0',
        '-t', str(duration),
        '-y',
        filename
    ]
    try:
        logger.debug(f"Executing ffmpeg command: {' '.join(command)}")
        subprocess.run(command, check=True, capture_output=True)
        logger.info(f"Successfully recorded audio chunk to {filename}")
    except subprocess.CalledProcessError as e:
        logger.error(f"Error recording audio chunk: {e}")
        # The exception text only carries the exit status; the actual reason
        # is in the captured stderr (bytes, because text mode is not enabled).
        stderr = e.stderr
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        stderr_tail = "\n".join((stderr or "").strip().splitlines()[-5:])
        if stderr_tail:
            logger.error(f"ffmpeg stderr (last lines):\n{stderr_tail}")
        raise
    except subprocess.SubprocessError as e:
        logger.error(f"Error recording audio chunk: {e}")
        raise
    except FileNotFoundError:
        # Raised when the ffmpeg binary is missing; it is an OSError, not a
        # SubprocessError, so it needs its own handler to be logged.
        logger.error("ffmpeg not found. Please ensure ffmpeg is installed and in your PATH.")
        raise

def transcribe_chunk(file_path):
    """
    Sends an audio chunk to the transcription service.

    Args:
        file_path (str): Path to the audio file to transcribe

    Returns:
        str: The transcribed text ("" if the response carries none). On
        failure (missing file, connection problem, HTTP error status or any
        other exception) a string describing the error is returned instead;
        no exception is raised.
    """
    logger.info(f"Transcribing audio chunk: {file_path}")
    url = "http://localhost:8080/transcribe/file"

    try:
        if not os.path.exists(file_path):
            logger.error(f"Audio file not found: {file_path}")
            return "Error: Audio file not found."

        with open(file_path, "rb") as audio_file:
            files = {"file": (os.path.basename(file_path), audio_file, "audio/wav")}

            logger.debug(f"Sending POST request to {url}")
            with httpx.Client() as client:
                response = client.post(url, files=files, timeout=60)

            # Check if the request was successful
            response.raise_for_status()

            # `or ''` also covers a response whose 'transcription' value is null
            transcription = response.json().get('transcription') or ''
            logger.info(f"Successfully transcribed chunk: {file_path}")
            # Build the preview first so the "Transcription result:" prefix is
            # logged for short texts as well as truncated ones.
            preview = f"{transcription[:50]}..." if len(transcription) > 50 else transcription
            logger.debug(f"Transcription result: {preview}")
            return transcription

    except httpx.RequestError as e:
        logger.error(f"Request error during transcription: {e}")
        return f"Error connecting to the transcription service: {e}"
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error during transcription: {e.response.status_code} - {e.response.text}")
        return f"HTTP Error: {e.response.status_code}"
    except Exception as e:
        logger.error(f"Unexpected error during transcription: {e}")
        return f"An unexpected error occurred: {e}"

def pseudo_streaming(max_chunks=None, chunk_duration=10):
    """
    Record audio in fixed-length chunks and keep a rolling LLM summary of them.

    A recorder thread records chunk files (chunk_<idx>.wav) with record_chunk
    and puts their names on a queue. A worker thread takes each file off the
    queue, transcribes it with transcribe_chunk, appends the text to
    full_transcription.txt, asks get_llm_response for an updated summary and
    deletes the chunk file. The calling thread waits until the worker has
    finished or the user interrupts with Ctrl-C / kernel interrupt.

    Args:
        max_chunks (int | None): Number of chunks to record and process.
            None keeps going until interrupted or a recording error occurs.
        chunk_duration (int): Length of each recorded chunk in seconds.

    Returns:
        str: The latest rolling summary ("" if none was produced). A failed
        LLM call leaves the previous summary in place.
    """
    # Local import: the notebook's import cell only brings in Queue.
    from queue import Empty

    logger.info("Starting pseudo-streaming audio transcription service")
    chunk_queue = Queue()
    stop_event = threading.Event()
    rolling_summary = ""
    recorder_exception = None
    worker_exception = None

    def recorder():
        """Record chunks one after another and hand their filenames to the worker."""
        nonlocal recorder_exception
        try:
            idx = 0
            while not stop_event.is_set() and (max_chunks is None or idx < max_chunks):
                chunk_file = f"chunk_{idx}.wav"
                logger.info(f"Processing chunk {chunk_file}")
                try:
                    record_chunk(chunk_file, duration=chunk_duration)
                    chunk_queue.put(chunk_file)
                    logger.debug(f"[recorder] Put {chunk_file} in queue")
                except Exception as e:
                    # A failed recording ends the whole session.
                    logger.error(f"[recorder] Error recording chunk {idx}: {e}")
                    recorder_exception = e
                    stop_event.set()
                    break
                idx += 1
            logger.info("[recorder] Finished recording loop")
        except Exception as e:
            recorder_exception = e
            stop_event.set()
            logger.exception("[recorder] Unhandled exception")

    def worker():
        """Transcribe queued chunks and fold each one into the rolling summary."""
        nonlocal rolling_summary, worker_exception
        processed = 0
        try:
            while not stop_event.is_set():
                try:
                    # The timeout lets the loop re-check stop_event regularly.
                    chunk_file = chunk_queue.get(timeout=2)
                except Empty:
                    # timeout, check termination condition
                    if max_chunks is not None and processed >= max_chunks:
                        logger.debug("[worker] No more expected chunks and processed reached max_chunks")
                        break
                    continue

                logger.info(f"[worker] Transcribing {chunk_file}")
                try:
                    transcription = transcribe_chunk(chunk_file)
                    # append to full transcription
                    with open("full_transcription.txt", "a", encoding="utf-8") as f:
                        f.write(transcription + "\n")
                    # update rolling summary with LLM
                    messages = [
                        {"role": "system", "content": "Fasse den erhaltenen Text zusammen. Falls eine Vorherige Zusammenfassung existiert, ergänze sie um die Neue Nachricht. Falls keine Vorherige Zusammenfassung existiert, beginne eine Neue. Fasse den Inhalt so kurz wie möglich zusammen"},
                        {"role": "user", "content": f"Vorherige Zusammenfassung:\n{rolling_summary}\n\nNeue Nachricht:\n{transcription}\n\n."}
                    ]
                    summary, metadata = get_llm_response(messages)
                    if metadata:
                        rolling_summary = summary
                        logger.info(f"[worker] Updated rolling summary (chunks processed: {processed+1})")
                    else:
                        # get_llm_response signals failure with an apology text
                        # and empty metadata. Storing that text would replace
                        # the summary and be fed back as the "previous summary"
                        # for the next chunk.
                        logger.warning(f"[worker] LLM call failed for {chunk_file}; keeping previous rolling summary")
                except Exception as e:
                    logger.error(f"[worker] Error processing {chunk_file}: {e}")
                    worker_exception = e
                finally:
                    # cleanup: the chunk counts as processed even if it failed
                    try:
                        if os.path.exists(chunk_file):
                            os.remove(chunk_file)
                            logger.debug(f"[worker] Removed temporary chunk file {chunk_file}")
                    except Exception as e:
                        logger.error(f"[worker] Error cleaning up {chunk_file}: {e}")
                    processed += 1
                    chunk_queue.task_done()

                if max_chunks is not None and processed >= max_chunks:
                    logger.debug("[worker] Reached max_chunks, stopping")
                    stop_event.set()
                    break
            logger.info("[worker] Exiting worker loop")
        except Exception as e:
            worker_exception = e
            stop_event.set()
            logger.exception("[worker] Unhandled exception")

    # Start threads inside the function
    rec_thread = threading.Thread(target=recorder, name="recorder_thread", daemon=True)
    work_thread = threading.Thread(target=worker, name="worker_thread", daemon=True)
    rec_thread.start()
    work_thread.start()

    try:
        # Join in short slices so a KeyboardInterrupt is noticed promptly.
        while work_thread.is_alive():
            work_thread.join(timeout=1)
    except KeyboardInterrupt:
        logger.info("Pseudo_streaming stopped by user via KeyboardInterrupt")
        stop_event.set()

    stop_event.set()
    # Bounded joins: a thread still busy (e.g. inside ffmpeg) is not waited
    # for beyond the timeout.
    rec_thread.join(timeout=2)
    work_thread.join(timeout=2)

    if recorder_exception:
        logger.error(f"Recorder error occured: {recorder_exception}")
    if worker_exception:
        logger.error(f"Worker error occured: {worker_exception}")

    logger.info("Pseudo_streaming ended")
    return rolling_summary


# Run the pipeline for two 10-second chunks and print the final rolling summary
result = pseudo_streaming(max_chunks=2, chunk_duration=10)
print(result)


Neue Nachricht:
 Es gibt Anzeichen dafür, dass Menschen auf dem Wohnungsmarkt in Göttingen rassistisch diskriminiert werden. Eine Studie hat festgestellt, dass Menschen mit fiktiven Namen, die unterschiedliche Herkunftsrückgründe haben, unterschiedlich behandelt werden. Während Josef Eilgen, ein Name mit möglicherweise nicht deutscher Herkunft, seltener zu Wohnungsbesichtigungen eingeladen wird, wird Jakob Schütte, ein Name mit deutscher Herkunft, häufiger eingeladen. Diese Ergebnisse deuten darauf hin, dass rassistische Diskriminierung vorliegt, was illegal ist.


In [None]:
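# NOTE(review): this cell appears to be a leftover fragment of an earlier,
# single-threaded pseudo-streaming loop. It is not valid Python on its own
# (the enclosing def/try is missing and `chunk_idx +=` below is incomplete)
# and its steps are covered by pseudo_streaming(); consider deleting the cell.
# Record audio chunk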
                record_chunk(chunk_file, duration=10)
                
                # Transcribe the chunk
                transcription = transcribe_chunk(chunk_file)
            
                with open("full_transcription.txt", "a", encoding="utf-8") as f:
                f.write(transcription + "\n")    

                # Clean up
                logger.debug(f"Removing temporary file: {chunk_file}")
                os.remove(chunk_file)
                
            
                messages = [
                    {"role": "system", "content": "Fasse den erhaltenen Text zusammen. Falls eine Vorherige Zusammenfassung existiert, ergänze sie um die Neue Nachricht. Falls keine Vorherige Zusammenfassung existiert, beginne eine Neue."},
                    {"role": "user", "content": f"Vorherige Zusammenfassung:\n{rolling_summary}\n\nNeue Nachricht:\n{transcription}\n\n."}
                ]
                rolling_summary, metadata = get_llm_response(messages)
                #print("Zusammenfassung")
                print(rolling_summary)
                #print(metadata)

                # Increment for next iteration
                chunk_idx += 


        except KeyboardInterrupt:
            logger.info("Pseudo-streaming stopped by user")
        except Exception as e:
            logger.error(f"Error in pseudo-streaming: {e}")
        finally:
            logger.info(f"Pseudo-streaming ended after processing {chunk_idx} chunks")
            return None #rolling_summary