# **Instructions to Keep in Mind Before Running the Notebook:**

1. **Get the Subscription Key**:  
   Go to [dashboard.sarvam.ai](https://dashboard.sarvam.ai) and copy your **API-Subscription Key**.

2. **Upload Files**:  
   Use the **Upload** button in the Jupyter notebook to upload the files (e.g., audio files) you want to process.

3. **Set File Path**:  
   Update the file path in the code to match the path of your uploaded file, e.g., set the `local_files` list in `main()` to `['/path/to/your/file.wav']`.

4. **Set Download Directory**:  
   Change the directory where downloaded files are saved, e.g., `destination_dir = '/path/to/your/local/directory'`.

5. **Run the Code**:  
   Execute the code with the correct **API key**, **file paths**, and **download directory**.



### **Note**

The Batch API currently supports audio files of up to 10 minutes. To process a longer file, split it into chunks of at most 10 minutes each; code for splitting a file into chunks is provided at the end of the notebook.


## 1. Setup and Configuration

First, let's install the required packages:


In [None]:
! pip install -Uqq azure-storage-file-datalake aiofiles aiohttp requests pydub

Now import the necessary libraries and set up logging:


In [2]:
import asyncio
import aiofiles
import requests
import json
from urllib.parse import urlparse
from azure.storage.filedatalake.aio import DataLakeDirectoryClient, FileSystemClient
from azure.storage.filedatalake import ContentSettings
import mimetypes
import logging
from pprint import pprint
import os

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Configuration
API_SUBSCRIPTION_KEY = "YOUR_API_KEY"
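
To avoid pasting the key directly into the notebook, it could instead be read from an environment variable. The following is a minimal sketch; the variable name `SARVAM_API_KEY` and the helper `load_api_key` are assumptions made for illustration, not names defined by Sarvam.

```python
import os


def load_api_key(env_var: str = "SARVAM_API_KEY", default: str = "YOUR_API_KEY") -> str:
    """Return the key stored in `env_var`, or `default` if it is not set."""
    # SARVAM_API_KEY is a hypothetical variable name chosen for this sketch.
    return os.environ.get(env_var, default)


API_SUBSCRIPTION_KEY = load_api_key()
```

With this in place, the key can be set once in the shell (or in the Jupyter server's environment) and the notebook can be shared without exposing it.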

## 2. SarvamClient Class

The `SarvamClient` class handles the storage operations for a batch job: uploading input files, listing output files, and downloading results.

In [3]:
class SarvamClient:
    def __init__(self, url: str):
        self.account_url, self.file_system_name, self.directory_name, self.sas_token = (
            self._extract_url_components(url)
        )
        self.lock = asyncio.Lock()
        print(f"Initialized SarvamClient with directory: {self.directory_name}")

    def update_url(self, url: str):
        self.account_url, self.file_system_name, self.directory_name, self.sas_token = (
            self._extract_url_components(url)
        )
        print(f"Updated URL to directory: {self.directory_name}")

    def _extract_url_components(self, url: str):
        parsed_url = urlparse(url)
        account_url = f"{parsed_url.scheme}://{parsed_url.netloc}".replace(
            ".blob.", ".dfs."
        )
        path_components = parsed_url.path.strip("/").split("/")
        file_system_name = path_components[0]
        directory_name = "/".join(path_components[1:])
        sas_token = parsed_url.query
        return account_url, file_system_name, directory_name, sas_token

    async def upload_files(self, local_file_paths, overwrite=True):
        print(f"Starting upload of {len(local_file_paths)} files")
        async with DataLakeDirectoryClient(
            account_url=f"{self.account_url}?{self.sas_token}",
            file_system_name=self.file_system_name,
            directory_name=self.directory_name,
            credential=None,
        ) as directory_client:
            tasks = []
            for path in local_file_paths:
                file_name = os.path.basename(path)  # Works with any OS path separator
                tasks.append(
                    self._upload_file(directory_client, path, file_name, overwrite)
                )
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # _upload_file returns True on success and False on failure
            print(f"Upload completed for {sum(1 for r in results if r is True)} files")

    async def _upload_file(
        self, directory_client, local_file_path, file_name, overwrite=True
    ):
        try:
            async with aiofiles.open(local_file_path, mode="rb") as file_data:
                mime_type = mimetypes.guess_type(local_file_path)[0] or "audio/wav"
                file_client = directory_client.get_file_client(file_name)
                data = await file_data.read()
                await file_client.upload_data(
                    data,
                    overwrite=overwrite,
                    content_settings=ContentSettings(content_type=mime_type),
                )
                print(f"✅ File uploaded successfully: {file_name}")
                print(f"   Type: {mime_type}")
                return True
        except Exception as e:
            print(f"❌ Upload failed for {file_name}: {str(e)}")
            return False

    async def list_files(self):
        print("\n📂 Listing files in directory...")
        file_names = []
        async with FileSystemClient(
            account_url=f"{self.account_url}?{self.sas_token}",
            file_system_name=self.file_system_name,
            credential=None,
        ) as file_system_client:
            async for path in file_system_client.get_paths(self.directory_name):
                file_name = path.name.split("/")[-1]
                async with self.lock:
                    file_names.append(file_name)
        print(f"Found {len(file_names)} files:")
        for file in file_names:
            print(f"   📄 {file}")
        return file_names

    async def download_files(self, file_names, destination_dir):
        print(f"\n⬇️ Starting download of {len(file_names)} files to {destination_dir}")
        async with DataLakeDirectoryClient(
            account_url=f"{self.account_url}?{self.sas_token}",
            file_system_name=self.file_system_name,
            directory_name=self.directory_name,
            credential=None,
        ) as directory_client:
            tasks = []
            for file_name in file_names:
                tasks.append(
                    self._download_file(directory_client, file_name, destination_dir)
                )
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # _download_file returns True on success and False on failure
            print(f"Download completed for {sum(1 for r in results if r is True)} files")

    async def _download_file(self, directory_client, file_name, destination_dir):
        try:
            file_client = directory_client.get_file_client(file_name)
            download_path = os.path.join(destination_dir, file_name)
            async with aiofiles.open(download_path, mode="wb") as file_data:
                stream = await file_client.download_file()
                data = await stream.readall()
                await file_data.write(data)
            print(f"✅ Downloaded: {file_name} -> {download_path}")
            return True
        except Exception as e:
            print(f"❌ Download failed for {file_name}: {str(e)}")
            return False
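
To make the URL handling in `_extract_url_components` easier to follow, here is a standalone sketch of the same parsing logic applied to a made-up storage URL. The account name, path, and SAS query string below are hypothetical placeholders; real values come from the job initialization response.

```python
from urllib.parse import urlparse


def extract_url_components(url: str):
    """Split a storage URL into account URL, file system, directory, and SAS token."""
    parsed = urlparse(url)
    # The Data Lake client expects the ".dfs." endpoint rather than ".blob."
    account_url = f"{parsed.scheme}://{parsed.netloc}".replace(".blob.", ".dfs.")
    parts = parsed.path.strip("/").split("/")
    file_system_name = parts[0]
    directory_name = "/".join(parts[1:])
    return account_url, file_system_name, directory_name, parsed.query


# Hypothetical URL, for illustration only
example_url = "https://exampleaccount.blob.core.windows.net/jobs/job-123/inputs?sv=2024&sig=abc"
account_url, file_system, directory, sas_token = extract_url_components(example_url)
print(account_url)  # https://exampleaccount.dfs.core.windows.net
print(file_system)  # jobs
print(directory)    # job-123/inputs
print(sas_token)    # sv=2024&sig=abc
```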

## 3. Sarvam AI API Integration

These functions handle the Call-Analytics job lifecycle:

In [4]:
BASE_URL = "https://api.sarvam.ai/call-analytics/"


async def initialize_job():
    print("\n🚀 Initializing job...")
    url = BASE_URL + "job/init"
    headers = {"API-Subscription-Key": API_SUBSCRIPTION_KEY}
    response = requests.post(url, headers=headers)
    print("\nInitialize Job Response:")
    print(f"Status Code: {response.status_code}")
    print("Response Body:")
    pprint(response.json() if response.status_code == 202 else response.text)

    if response.status_code == 202:
        return response.json()
    return None


async def check_job_status(job_id):
    print(f"\n🔍 Checking status for job: {job_id}")
    url = BASE_URL + f"job/{job_id}/status"
    headers = {"API-Subscription-Key": API_SUBSCRIPTION_KEY}
    response = requests.get(url, headers=headers)
    print("\nJob Status Response:")
    print(f"Status Code: {response.status_code}")
    print("Response Body:")
    pprint(response.json() if response.status_code == 200 else response.text)

    if response.status_code == 200:
        return response.json()
    return None


async def start_job(job_id):
    print(f"\n▶️ Starting job: {job_id}")
    url = BASE_URL + "job"
    headers = {
        "API-Subscription-Key": API_SUBSCRIPTION_KEY,
        "Content-Type": "application/json",
    }

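    # JOB PARAMETERS
    data = {
        "job_id": job_id,
        "job_parameters": {
            "model": "saaras:v2",
            "with_diarization": True,
            # The question descriptions below refer to speaker_0 and speaker_1,
            # so two speakers are expected; adjust this to match your recording.
            "num_speakers": 2,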
            "questions": [
                {
                    "id": "1",
                    "type": "short answer",
                    "text": "what is the sentiment of the user?",
                    "description": "speaker_1 is agent and speaker_0 is user",
                },
                {
                    "id": "2",
                    "type": "short answer",
                    "text": "what is the summary of the call?",
                    "description": "give crisp executive summary",
                },
            ],
        },
    }
    print("\nRequest Body:")
    pprint(data)

    response = requests.post(url, headers=headers, data=json.dumps(data))
    print("\nStart Job Response:")
    print(f"Status Code: {response.status_code}")
    print("Response Body:")
    pprint(response.json() if response.status_code == 200 else response.text)

    if response.status_code == 200:
        return response.json()
    return None
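
If the questions change from job to job, the request body can be assembled by a small helper instead of being edited inline. This is a sketch only: `build_job_payload` is a hypothetical helper, and it simply mirrors the field names used in `start_job` above.

```python
import json


def build_job_payload(job_id, questions, num_speakers, model="saaras:v2", with_diarization=True):
    """Build the start-job request body.

    `questions` is a list of (text, description) pairs; ids are assigned
    in order, starting from "1".
    """
    return {
        "job_id": job_id,
        "job_parameters": {
            "model": model,
            "with_diarization": with_diarization,
            "num_speakers": num_speakers,
            "questions": [
                {
                    "id": str(i),
                    "type": "short answer",
                    "text": text,
                    "description": description,
                }
                for i, (text, description) in enumerate(questions, start=1)
            ],
        },
    }


# "example-job-id" is a placeholder, not a real job id
payload = build_job_payload(
    "example-job-id",
    [
        ("what is the sentiment of the user?", "speaker_1 is agent and speaker_0 is user"),
        ("what is the summary of the call?", "give crisp executive summary"),
    ],
    num_speakers=2,
)
print(json.dumps(payload, indent=2))
```

The resulting dictionary has the same shape as the `data` dictionary in `start_job`, so it could be passed to `json.dumps` in its place.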

## 4. Main Execution Flow

Here's the main function that orchestrates the entire process:

In [None]:
async def main():
    print("\n=== Starting Call-Analytics Processing ===")

    # Step 1: Initialize the job
    job_info = await initialize_job()
    if not job_info:
        print("❌ Job initialization failed")
        return

    job_id = job_info["job_id"]
    input_storage_path = job_info["input_storage_path"]
    output_storage_path = job_info["output_storage_path"]

    # Step 2: Upload files
    print(f"\n📤 Uploading files to input storage: {input_storage_path}")
    client = SarvamClient(input_storage_path)
    local_files = [
        "/path/to/your/file.wav"
    ]  # Replace with the paths of your audio files
    print(f"Files to upload: {local_files}")
    await client.upload_files(local_files)

    # Step 3: Start the job
    job_start_response = await start_job(job_id)
    if not job_start_response:
        print("❌ Failed to start job")
        return

    # Step 4: Monitor job status
    print("\n⏳ Monitoring job status...")
    attempt = 1
    status = None  # Stays None if the first status check fails
    while True:
        print(f"\nStatus check attempt {attempt}")
        job_status = await check_job_status(job_id)
        if not job_status:
            print("❌ Failed to get job status")
            break

        status = job_status["job_state"]
        if status == "Completed":
            print("✅ Job completed successfully!")
            break
        elif status == "Failed":
            print("❌ Job failed!")
            break
        else:
            print(f"⏳ Current status: {status}")
            await asyncio.sleep(10)
        attempt += 1

    # Step 5: Download results
    if status == "Completed":
        print(f"\n📥 Downloading results from: {output_storage_path}")
        client.update_url(output_storage_path)  # Update URL to the file path

        # List all the files you want to download
        files = await client.list_files()

        # Specify the local destination directory
        destination_dir = "sarvam-ai-cookbook/notebooks/call-analytics/output"  # Set this to the local path you want

        # Make sure the directory exists before downloading
        os.makedirs(destination_dir, exist_ok=True)

        # Download the files to the local directory
        await client.download_files(files, destination_dir=destination_dir)
        print(f"Files have been downloaded to: {destination_dir}")

        print("\n=== Processing Complete ===")


# Run the main function (top-level await works in a Jupyter cell, not in a plain script)
if __name__ == "__main__":
    await main()
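
The monitoring loop in `main()` polls until the job reports `Completed` or `Failed`, with no upper bound on the number of attempts. One possible variation, sketched below, caps the attempts. The helper `wait_for_job` and the `"TimedOut"` return value are assumptions introduced for this example; they are not part of the Sarvam API.

```python
import asyncio


async def wait_for_job(check_status, interval_s=10.0, max_attempts=60):
    """Poll `check_status` until the job finishes or attempts run out.

    `check_status` is an async callable returning a dict with a "job_state"
    key, or None on failure (the same contract as check_job_status above).
    Returns "Completed", "Failed", "TimedOut", or None if a check failed.
    """
    for _ in range(max_attempts):
        job_status = await check_status()
        if job_status is None:
            return None
        state = job_status.get("job_state")
        if state in ("Completed", "Failed"):
            return state
        await asyncio.sleep(interval_s)
    return "TimedOut"  # Label chosen for this sketch, not an API state


# In the notebook, this could be used as:
#   final_state = await wait_for_job(lambda: check_job_status(job_id))
```

With the defaults, the helper gives up after roughly ten minutes (60 attempts, 10 seconds apart); both values are arbitrary and should be tuned to the expected job duration.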

## **For files longer than 10 minutes**

### **Define the `split_audio` Function**

This function splits an audio file into smaller chunks of a specified duration. This is useful for processing long audio files that exceed the API's input length limit.

### **Note**
Please modify this code according to your use case.

In [15]:
from pydub import AudioSegment  # Requires pydub (and ffmpeg for non-WAV formats)


def split_audio(audio_path, chunk_duration_ms):
    """
    Splits an audio file into smaller chunks of specified duration.

    Args:
        audio_path (str): Path to the audio file to be split.
        chunk_duration_ms (int): Duration of each chunk in milliseconds.

    Returns:
        list: A list of AudioSegment objects representing the audio chunks.
    """
    audio = AudioSegment.from_file(audio_path)  # Load the audio file
    chunks = []
    if len(audio) > chunk_duration_ms:
        # Split the audio into chunks of the specified duration
        for i in range(0, len(audio), chunk_duration_ms):
            chunks.append(audio[i : i + chunk_duration_ms])
    else:
        # If the audio is shorter than the chunk duration, use the entire audio
        chunks.append(audio)
    return chunks
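
To check how a recording would be divided before loading any audio, the chunk boundaries can be computed from the duration alone. The sketch below uses a hypothetical helper, `chunk_ranges`, that mirrors the slicing done inside `split_audio` but works on plain millisecond values.

```python
def chunk_ranges(total_ms, chunk_ms):
    """Return (start_ms, end_ms) pairs covering a recording of `total_ms`."""
    if chunk_ms <= 0:
        raise ValueError("chunk_ms must be positive")
    return [
        (start, min(start + chunk_ms, total_ms))
        for start in range(0, total_ms, chunk_ms)
    ]


# A 25-minute recording split into 10-minute chunks
ranges = chunk_ranges(25 * 60 * 1000, 10 * 60 * 1000)
print(ranges)  # [(0, 600000), (600000, 1200000), (1200000, 1500000)]
```

The last chunk is shorter than the others whenever the total duration is not a multiple of the chunk duration.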

In [16]:
import io


def transcribe_audio_chunks(
    audio_file_path, api_url, headers, data, chunk_duration_ms=5 * 60 * 1000
):
    """
    Transcribes audio chunks using the Speech-to-Text API.

    Args:
        audio_file_path (str): Path to the audio file.
        api_url (str): The API endpoint URL for Speech-to-Text.
        headers (dict): Headers containing authentication information.
        data (dict): Data payload for the transcription API.
        chunk_duration_ms (int): Duration of each audio chunk in milliseconds.

    Returns:
        dict: Collated response containing the transcript.
    """
    # Split the audio into chunks
    chunks = split_audio(audio_file_path, chunk_duration_ms)
    responses = []  # List to store the transcription results

    # Process each chunk
    for idx, chunk in enumerate(chunks):
        # Export the chunk to a BytesIO object (in-memory binary stream)
        chunk_buffer = io.BytesIO()
        chunk.export(chunk_buffer, format="wav")
        chunk_buffer.seek(0)  # Reset the pointer to the start of the stream

        # Prepare the file for the API request
        files = {
            # The file name is only a label for the in-memory chunk
            "file": (f"chunk_{idx}.wav", chunk_buffer, "audio/wav")
        }

        try:
            # Make the POST request to the API
            response = requests.post(api_url, headers=headers, files=files, data=data)
            if response.status_code in (200, 201):
                print(f"Chunk {idx} POST Request Successful!")
                response_data = response.json()
                transcript = response_data.get("transcript", "")
                responses.append({"transcript": transcript})
            else:
                # Handle failed requests
                print(
                    f"Chunk {idx} POST Request failed with status code: {response.status_code}"
                )
                print("Response:", response.text)
        except Exception as e:
            # Handle any exceptions during the request
            print(f"Error processing chunk {idx}: {e}")
        finally:
            # Ensure the buffer is closed after processing
            chunk_buffer.close()

    # Collate the transcriptions from all chunks
    collated_responses = {
        "collated_transcript": " ".join([i["transcript"] for i in responses])
    }
    return collated_responses

## **Additional Resources**

For more details, refer to the official **Saaras API documentation** and join the community for support:

- **Documentation**: [docs.sarvam.ai](https://docs.sarvam.ai/)
- **Community**: [Join the Discord Community](https://discord.com/invite/T8BGY8TU)

---

## **Final Notes**

- Keep your API key secure.
- Use clear audio for best results.
- Explore advanced features like diarization and word-level timestamps.

**Keep Building!** 🚀