In [9]:
import io
import json
import logging
import os
import shutil
import subprocess
import sys
import tarfile
import time

import pandas as pd
import requests
import yaml
from dotenv import load_dotenv
from pandas import DataFrame


In [3]:
# Pull SAMBASTUDIO_KEY (and any other settings) from the repository's .env file
load_dotenv(dotenv_path='../.env')

# Configure the root logger; the helpers in this notebook call logging.info/debug/error directly
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)

# API calls

In [110]:
# Request headers for the SambaStudio REST API.
# The key is generated in SambaStudio and read from the SAMBASTUDIO_KEY environment variable.
headers = {}
headers['content-type'] = 'application/json'
headers['key'] = os.getenv('SAMBASTUDIO_KEY')

In [111]:
# Root URL of the SambaStudio instance; every API endpoint path below is appended to it.
base_url = "https://sjc1-demo1.sambanova.net"


In [109]:
# useful methods

def get_call(url, headers):
    """Send a GET request to ``url`` and log whether it succeeded.

    Failures are only logged (status code at ERROR, body at DEBUG), never raised,
    so the caller always receives the response object to inspect.
    """
    resp = requests.get(url, headers=headers)
    if resp.status_code != 200:
        logging.error(f'GET request failed with status code: {resp.status_code}')
        logging.debug(f'Error message: {resp.text}')
        return resp
    logging.info('GET request successful!')
    logging.debug(f'Response: {resp.text}')
    return resp

def post_call(url, params, headers):
    """Send ``params`` as the JSON body of a POST request and log the outcome.

    Non-200 responses are logged (status code at ERROR, body at DEBUG) but not
    raised; the response is returned either way.
    """
    resp = requests.post(url, json=params, headers=headers)
    ok = resp.status_code == 200
    if ok:
        logging.info('POST request successful!')
        logging.debug(f'Response: {resp.text}')
    else:
        logging.error(f'POST request failed with status code: {resp.status_code}')
        logging.debug(f'Error message: {resp.text}')
    return resp

def time_to_seconds(time_str):
    """Convert a ``MM:SS`` or ``HH:MM:SS`` timestamp string to whole seconds.

    Args:
        time_str: colon-separated integer fields, e.g. ``'01:05'`` or ``'1:00:05'``.

    Returns:
        Total number of seconds as an int.

    Raises:
        ValueError: if the string does not have 2 or 3 integer fields.
    """
    parts = time_str.split(':')
    if len(parts) not in (2, 3):
        raise ValueError(f'Expected MM:SS or HH:MM:SS, got {time_str!r}')
    total_seconds = 0
    for part in parts:
        # Horner-style accumulation: each field is worth 60x the next one
        total_seconds = total_seconds * 60 + int(part)
    return total_seconds

def get_df_output(response_content: bytes, output_member='results/output.csv', sample_rate=16000) -> DataFrame:
    """Parse the pipeline's gzipped results tarball into a transcript DataFrame.

    Args:
        response_content: raw bytes of the ``results/download`` response
            (``response.content``, not ``response.text``).
        output_member: path of the CSV inside the archive.
        sample_rate: divisor that converts the CSV's per-segment sample count
            into seconds (the notebook has always used 16000).

    Returns:
        DataFrame with columns ``start_time`` (whole seconds), ``end_time``
        (start plus sample count / ``sample_rate``), ``speaker`` and ``text``
        (the formatted transcript).

    Raises:
        KeyError: if ``output_member`` is not in the archive.
        ValueError: if ``output_member`` is not a regular file.
    """
    with tarfile.open(fileobj=io.BytesIO(response_content), mode="r:gz") as tar:
        output_file = tar.extractfile(tar.getmember(output_member))
        if output_file is None:
            raise ValueError(f'{output_member} is not a regular file in the results archive')
        # Read with explicit names, so every row of the CSV is treated as data.
        raw_df = pd.read_csv(
            io.BytesIO(output_file.read()),
            names=['audio_path', 'results_path', 'speaker', 'start_time', 'sample_duration',
                   'unformatted_transcript', 'formatted_transcript'],
        )

    start_seconds = raw_df['start_time'].astype(str).map(time_to_seconds)
    end_seconds = start_seconds + raw_df['sample_duration'].astype(int) / sample_rate
    return pd.DataFrame({
        'start_time': start_seconds,
        'end_time': end_seconds,
        'speaker': raw_df['speaker'],
        'text': raw_df['formatted_transcript'],
    })

## Datasets

In [162]:
# GET: status of a single dataset, looked up by name

datasets_url = '/api/datasets'
dataset_name = '/PCA_dataset'
url = f'{base_url}{datasets_url}{dataset_name}'

get_call(url, headers)

ERROR:root:GET request failed with status code: 500


<Response [500]>

In [43]:
# POST: create a new dataset
# NOTE: this request is rejected with 400 {"detail":"field required"} (see output);
# local datasets are uploaded with snapi instead (see the Snapi section).

url = f'{base_url}{datasets_url}'

params = dict(
    dataset_name='pca_test_v2',
    application_field='speech',
    job_type=['batch_predict'],
    language='english',
    task='ASR With Diarization',
    dataset_storage='LOCAL',
    dataset_path='default/cap-engagements/datasets/local-dataset-42cf8587-6ce1-41bb-b7a4-2a069ae146bc',
    description='test',
    file_type='test',
    url='',
)

post_call(url, params, headers)

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): sjc1-demo1.sambanova.net:443
DEBUG:urllib3.connectionpool:https://sjc1-demo1.sambanova.net:443 "POST /api/datasets HTTP/1.1" 400 27
ERROR:root:POST request failed with status code: 400
DEBUG:root:Error message: {"detail":"field required"}


<Response [400]>

After talking with Varun Malyala (Feb 26, 2024): "Uploading a Dataset is not supported via snsdk directly", so supporting it would require adding a new feature. "Right now, Uploading a local Dataset is only possible via snapi or UI", so we are going to try snapi instead (see the Snapi section below).

## Projects

In [107]:
# POST: create a new project

projects_url = '/api/projects'
url = f'{base_url}{projects_url}'

params = dict(project_name='pca_project_test', description='test project for pca')

post_call(url, params, headers)

In [113]:
# GET: look up the project by name and keep its id for the job calls below

project_name = '/pca_project_test'
url = f'{base_url}{projects_url}{project_name}'

response = get_call(url, headers)
parsed_response = json.loads(response.text)
project_id = parsed_response['data']['project_id']
project_id

INFO:root:GET request successful!


'5399bf56-201e-49d1-b663-dbf4678cebc4'

## Batch inference

In [47]:
# POST: launch a batch inference (batch_predict) job in the project

jobs_url = f'/{project_id}/jobs'
url = f'{base_url}{projects_url}{jobs_url}'

params = dict(
    task='ASR With Diarization',
    job_type='batch_predict',
    job_name='pca_test_pipeline',
    project=project_id,
    model_checkpoint='Diarization_ASR_Pipeline_V2',
    description='test diarization pipeline',
    dataset='PCA_test',
)

response = post_call(url, params, headers)
job_id = json.loads(response.text)['data']['job_id']

In [131]:
# GET: poll the status of the batch inference job

url = f'{base_url}{projects_url}{jobs_url}/{job_id}'

response = get_call(url, headers)

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): sjc1-demo1.sambanova.net:443
DEBUG:urllib3.connectionpool:https://sjc1-demo1.sambanova.net:443 "GET /api/projects/5399bf56-201e-49d1-b663-dbf4678cebc4/jobs/458c3b1b-7deb-4d9f-b0ad-12fd04b47f2a HTTP/1.1" 200 1251
INFO:root:GET request successful!
DEBUG:root:Response: {"data":{"job_name":"pca_test_pipeline","job_id":"458c3b1b-7deb-4d9f-b0ad-12fd04b47f2a","job_type":"batch_predict","project_id":"5399bf56-201e-49d1-b663-dbf4678cebc4","status":"EXIT_WITH_0","dataset_id":"42cf8587-6ce1-41bb-b7a4-2a069ae146bc","input_data_path":"default/cap-engagements/datasets/local-dataset-42cf8587-6ce1-41bb-b7a4-2a069ae146bc","model_checkpoint":"Diarization_ASR_Pipeline_V2","checkpoint_id":null,"hyperparams":[{"param_name":"sockets","value":3,"description":"Number of sockets each instance of the model uses"}],"user_id":"rodrigo.maldonado","time_created":"2024-02-23T20:45:37.866825+00:00","time_updated":"2024-02-23T20:47:44.084972+00:00","descr

## Retrieve results

In [138]:
# GET: download the job's results archive and parse it into a transcript DataFrame

download_results_url = '/results/download'
url = f'{base_url}{projects_url}{jobs_url}/{job_id}{download_results_url}'

response = get_call(url, headers)
df = get_df_output(response.content)
df.head()

INFO:root:GET request successful!


Unnamed: 0,start_time,end_time,speaker,text
0,0,2.5,SPEAKER_01,Our Primeti 33. What is yet as emergency?
1,3,11.9,SPEAKER_00,"Yes, sir, I need to, uh, uh. I need an ambulan..."
2,11,12.3,SPEAKER_01,A car.
3,13,14.7,SPEAKER_00,Carol Wood Drive. Yes.
4,14,14.6,SPEAKER_01,


# Snapi

In [1]:
# List the SambaStudio apps; the "ASR With Diarization" id is what gets stored in app_id
!snapi app list 

ASR With Diarization
Name                : ASR With Diarization
ID                  : b6aefdf7-02a4-4384-9c3c-8a81d735a54e
Playground          : False
Prediction Input    : text

ASR Without Diarization
Name                : ASR Without Diarization
ID                  : a36cc322-dd36-40e3-9641-d87ac48fe2c4
Playground          : False
Prediction Input    : file

CLIP
Name                : CLIP
ID                  : 6c14325a-1be7-4e48-b38f-19b33745fc3b
Playground          : False
Prediction Input    : text

Databox
Name                : Databox
ID                  : 199e9684-785c-4df0-8dc3-49e808d8eba5
Playground          : False
Prediction Input    : text

Deepseek 6.7B single socket
Name                : Deepseek 6.7B single socket
ID                  : 2eeb4b7f-bc56-48c4-8814-ef9d1e8806b8
Playground          : True
Prediction Input    : text

DePlot
Name                : DePlot
ID                  : 40f16b58-72a9-404f-a7c3-afc0d27a2343
Playground          : False
Prediction Input    :

In [6]:
# App id of "ASR With Diarization", copied from the `snapi app list` output

app_id = "b6aefdf7-02a4-4384-9c3c-8a81d735a54e"

In [7]:
# Show the options of `snapi dataset add`. A local upload has to reference a JSON
# source file (--source_file) that holds the path of the dataset folder to upload.

!snapi dataset add --help

[1m                                                                                [0m
[1m [0m[1;33mUsage: [0m[1msnapi dataset add [OPTIONS][0m[1m                                            [0m[1m [0m
[1m                                                                                [0m
 Add a new dataset                                                              
                                                                                
[2m╭─[0m[2m Options [0m[2m───────────────────────────────────────────────────────────────────[0m[2m─╮[0m
[2m│[0m    [1;36m-[0m[1;36m-file[0m                          [1;33mTEXT[0m                                      [2m│[0m
[2m│[0m [31m*[0m  [1;36m-[0m[1;36m-dataset[0m[1;36m-name[0m       [1;32m-n[0m         [1;33mTEXT[0m  Dataset name [2m[default: None][0m        [2m│[0m
[2m│[0m                                          [2;31m[required]  [0m                        [2m│[0m
[2m│[0m [31m*

In [10]:
# Upload a local dataset with the snapi CLI. Arguments are passed as a list (no shell),
# and snapi's yes/no confirmation prompt is answered on stdin instead of piping `echo yes`.
command = [
    'snapi', 'dataset', 'add',
    '--dataset-name', 'local_file_test_3',
    '--job_type', 'batch_predict',
    '--apps', app_id,  # "ASR With Diarization" app id set in an earlier cell
    '--source_type', 'localMachine',
    '--source_file', '../data/datasets/source.json',
    '--application_field', 'language',
    '--language', 'english',
    '--description', 'test_description_3',
]
upload = subprocess.run(command, input='yes\n', text=True)
upload.returncode  # 0 means snapi exited successfully


Folder Information:
  - Number of Files: 1
  - Total Size: 19.62 MB

Are you sure you want to proceed? ([33myes[0m/no)
: Uploading files
Dataset folder upload complete: ../test_data/
Dataset added successfully.
Time taken to upload the dataset: 5.327938079833984 seconds


0

# ASR Pipeline

Progress so far on the ASR pipeline, followed by the tasks that still need to be done:
- There's an error with the dataset API search and GET endpoints (more details [here](https://sjc1-demo1.sambanova.net/api/docs#/)): they return a 500 error every time they fetch info about a previously deleted dataset. I was checking this issue with Sharad Venkateswaran on the help-sambastudio channel.
- Integration tests covering all methods. So far every method has been tested individually; integration tests are the next step. The process will be: dataset creation, project creation, job creation, job monitoring, and results retrieval. Also, this pipeline takes as input a path to the audio file and outputs a CSV, so keep that in mind. Contact Jorge Piedrahita or Rodrigo Maldonado for more details.
- Documentation for the class and its methods.

In [151]:
def load_config(file_path):
    """Load the project's YAML configuration file.

    Args:
        file_path: path to the YAML file (read as UTF-8).

    Returns:
        The parsed configuration (``yaml.safe_load`` result).

    Raises:
        ValueError: if the file is empty, so callers fail here with a clear
            message instead of a ``TypeError`` on the first ``config[...]`` lookup.
    """
    # Explicit encoding: the platform default is not guaranteed to be UTF-8.
    with open(file_path, 'r', encoding='utf-8') as config_file:
        config = yaml.safe_load(config_file)
    if config is None:
        raise ValueError(f'Config file {file_path} is empty')
    return config

config = load_config(file_path='../config.yaml')

# Job statuses reported by SambaStudio that the polling code compares against
PENDING_RDU_JOB_STATUS, SUCCESS_JOB_STATUS = 'PENDING_RDU', 'EXIT_WITH_0'

In [155]:
class BatchASRProcessor:
    """Drive the SambaStudio "ASR With Diarization" batch pipeline end to end.

    Typical flow: ``create_dataset`` (upload a local audio file with the ``snapi``
    CLI) -> ``create_load_project`` -> ``run_job`` -> ``check_job_progress`` ->
    ``retrieve_results`` (returns a DataFrame of diarized transcript segments).

    All settings come from the ``asr`` section of the YAML config; the API key is
    read from the ``SAMBASTUDIO_KEY`` environment variable.
    """

    # Divisor that turns the pipeline's per-segment sample counts into seconds.
    # NOTE(review): 16 kHz is the value the notebook has always used — confirm it
    # matches the sample rate the pipeline reports.
    SAMPLE_RATE = 16000

    def __init__(self, config) -> None:
        """Read every pipeline setting from ``config['asr']``.

        Args:
            config: parsed YAML config with the ``asr`` sub-sections ``datasets``,
                ``apps``, ``urls``, ``projects``, ``jobs`` and ``output``.

        Raises:
            KeyError: if an expected config key is missing.
        """
        asr_config = config['asr']
        datasets_config = asr_config['datasets']
        apps_config = asr_config['apps']
        urls_config = asr_config['urls']
        projects_config = asr_config['projects']
        jobs_config = asr_config['jobs']
        output_config = asr_config['output']

        api_key = os.getenv('SAMBASTUDIO_KEY')
        if api_key is None:
            # requests omits headers whose value is None, so calls would go out without a key.
            logging.warning('SAMBASTUDIO_KEY is not set; SambaStudio API calls are likely to be rejected')
        self.headers = {
            'content-type': 'application/json',
            'key': api_key,
        }

        self.datasets_path = datasets_config['datasets_path']
        self.dataset_id = None
        self.dataset_name = datasets_config['dataset_name']
        self.dataset_description = datasets_config['dataset_description']
        self.dataset_source_type = datasets_config['dataset_source_type']
        self.dataset_source_file = datasets_config['dataset_source_file']
        self.dataset_language = datasets_config['dataset_language']

        self.asr_with_diarization_app_id = apps_config['asr_with_diarization_app_id']
        self.application_field = apps_config['application_field']

        self.base_url = urls_config['base_url']
        self.datasets_url = urls_config['datasets_url']
        self.projects_url = urls_config['projects_url']
        # Format string containing a ``{project_id}`` placeholder.
        self.jobs_url = urls_config['jobs_url']
        self.download_results_url = urls_config['download_results_url']

        self.project_name = projects_config['project_name']
        self.project_description = projects_config['project_description']

        self.job_name = jobs_config['job_name']
        self.job_task = jobs_config['job_task']
        self.job_type = jobs_config['job_type']
        self.job_description = jobs_config['job_description']
        self.model_checkpoint = jobs_config['model_checkpoint']

        # Path of the results CSV inside the downloaded tarball.
        self.output_path = output_config['output_path']

    def _check_response(self, method, response, success_message=None):
        """Log the outcome of an HTTP call and return the response.

        Args:
            method: HTTP verb used only in log messages (``'GET'``, ``'POST'``).
            response: the ``requests`` response to check.
            success_message: extra INFO line logged on success, if given.

        Raises:
            Exception: if the status code is not 200 (message carries the body).
        """
        if response.status_code != 200:
            logging.error(f'{method} request failed with status code: {response.status_code}')
            raise Exception(f'Error message: {response.text}')
        logging.info(f'{method} request successful!')
        if success_message is not None:
            logging.info(success_message)
        logging.debug(f'Response: {response.text}')
        return response

    def _get_call(self, url, params=None, success_message=None):
        """GET ``url`` (with optional query ``params``); raise ``Exception`` unless 200."""
        response = requests.get(url, params=params, headers=self.headers)
        return self._check_response('GET', response, success_message)

    def _post_call(self, url, params, success_message=None):
        """POST ``params`` as JSON to ``url``; raise ``Exception`` unless 200."""
        response = requests.post(url, json=params, headers=self.headers)
        return self._check_response('POST', response, success_message)

    def _delete_call(self, url):
        """DELETE ``url``; raise ``Exception`` unless 200 (log text assumes a dataset)."""
        response = requests.delete(url, headers=self.headers)
        if response.status_code == 200:
            logging.info(f'Dataset {self.dataset_name} deleted successfully.')
            logging.debug(f'Response: {response.text}')
        else:
            logging.error(f'Failed to delete the resource. Status code: {response.status_code}')
            raise Exception(f'Error message: {response.text}')
        return response

    def _time_to_seconds(self, time_str):
        """Convert a ``MM:SS`` or ``HH:MM:SS`` string to whole seconds.

        Raises:
            ValueError: if the string does not have 2 or 3 integer fields.
        """
        parts = time_str.split(':')
        if len(parts) not in (2, 3):
            raise ValueError(f'Expected MM:SS or HH:MM:SS, got {time_str!r}')
        total_seconds = 0
        for part in parts:
            total_seconds = total_seconds * 60 + int(part)
        return total_seconds

    def _get_df_output(self, response_content: bytes) -> DataFrame:
        """Extract ``self.output_path`` from a gzipped results tarball as a transcript table.

        Args:
            response_content: raw bytes of the ``results/download`` response.

        Returns:
            DataFrame with columns ``start_time`` (whole seconds), ``end_time``
            (start plus the segment's sample count / ``SAMPLE_RATE``), ``speaker``
            and ``text`` (the formatted transcript).

        Raises:
            KeyError: if the archive has no member named ``self.output_path``.
            ValueError: if that member is not a regular file.
        """
        with tarfile.open(fileobj=io.BytesIO(response_content), mode="r:gz") as tar:
            output_file = tar.extractfile(tar.getmember(self.output_path))
            if output_file is None:
                raise ValueError(f'{self.output_path} is not a regular file in the results archive')
            # Read with explicit names, so every row of the CSV is treated as data.
            raw_df = pd.read_csv(
                io.BytesIO(output_file.read()),
                names=['audio_path', 'results_path', 'speaker', 'start_time', 'sample_duration',
                       'unformatted_transcript', 'formatted_transcript'],
            )

        start_seconds = raw_df['start_time'].astype(str).map(self._time_to_seconds)
        end_seconds = start_seconds + raw_df['sample_duration'].astype(int) / self.SAMPLE_RATE
        return pd.DataFrame({
            'start_time': start_seconds,
            'end_time': end_seconds,
            'speaker': raw_df['speaker'],
            'text': raw_df['formatted_transcript'],
        })

    def _jobs_endpoint(self, project_id):
        """Return the jobs collection URL of ``project_id``."""
        return self.base_url + self.projects_url + self.jobs_url.format(project_id=project_id)

    def search_dataset(self):
        """Return the id of the dataset named ``self.dataset_name``.

        Raises:
            Exception: if the search request does not return 200.
        """
        url = self.base_url + self.datasets_url + '/search'
        response = self._get_call(
            url,
            {'dataset_name': self.dataset_name},
            f'Dataset {self.dataset_name} found in SambaStudio',
        )
        return json.loads(response.text)['data']['dataset_id']

    def delete_dataset(self):
        """Look up ``self.dataset_name`` by name and delete it.

        Raises:
            Exception: if the search or the delete request does not return 200.
        """
        dataset_id = self.search_dataset()
        url = self.base_url + self.datasets_url + '/' + dataset_id
        response = self._delete_call(url)
        logging.info(response.text)

    def create_dataset(self, path):
        """Stage a local audio file in the dataset folder and upload it with ``snapi``.

        The file is copied to ``<datasets_path>/<dataset_name>/pca_file.<ext>`` and
        ``dataset_source_file`` is (re)written with ``{"source_path": <that folder>}``,
        which ``snapi dataset add --source_file`` reads.

        Args:
            path: path to an ``.mp3`` or ``.wav`` audio file (extension is case-insensitive).

        Raises:
            Exception: if the file is not mp3 or wav.
            RuntimeError: if ``snapi dataset add`` exits with a non-zero status.
        """
        # Validate before touching the filesystem.
        audio_format = os.path.splitext(path)[1].lstrip('.').lower()
        if audio_format not in ('mp3', 'wav'):
            raise Exception('Only mp3 and wav audio files supported')

        pca_directory = f'{self.datasets_path}/{self.dataset_name}'
        if not os.path.isdir(pca_directory):
            logging.info(f'Datasets path: {pca_directory} wasn\'t found')
            os.makedirs(pca_directory)
            logging.info(f'PCA Directory: {pca_directory} created')

        # Always rewrite the source file so it points at this folder, even when the
        # folder already existed from an earlier run.
        with open(self.dataset_source_file, 'w', encoding='utf-8') as json_file:
            json.dump({"source_path": pca_directory}, json_file)

        # Drop audio staged by a previous run; otherwise switching between mp3 and wav
        # would leave both files in the folder and upload both.
        for staged_format in ('mp3', 'wav'):
            staged_file = os.path.join(pca_directory, f'pca_file.{staged_format}')
            if os.path.isfile(staged_file):
                os.remove(staged_file)
        shutil.copyfile(path, os.path.join(pca_directory, f'pca_file.{audio_format}'))

        # Arguments go in as a list (no shell), so config values cannot inject shell syntax.
        command = [
            'snapi', 'dataset', 'add',
            '--dataset-name', self.dataset_name,
            '--job_type', self.job_type,
            '--apps', self.asr_with_diarization_app_id,
            '--source_type', self.dataset_source_type,
            '--source_file', self.dataset_source_file,
            '--application_field', self.application_field,
            '--language', self.dataset_language,
            '--description', self.dataset_description,
        ]
        logging.info(f'Creating dataset: {self.dataset_name}')
        # snapi asks for a yes/no confirmation before uploading; answer it on stdin.
        # str() because YAML may parse some config values as non-strings.
        completed = subprocess.run([str(arg) for arg in command], input='yes\n', text=True)
        if completed.returncode != 0:
            raise RuntimeError(f'snapi dataset add exited with status {completed.returncode}')

    def create_load_project(self):
        """Return the id of ``self.project_name``, creating the project if it is missing.

        The project counts as missing when the lookup response (successful or not)
        contains ``"<project_name> not found"``.

        Raises:
            Exception: if the lookup fails for any other reason, or the creation fails.
        """
        url = self.base_url + self.projects_url + '/' + self.project_name
        not_found_error_message = f"{self.project_name} not found"

        try:
            response = self._get_call(url, success_message=f'Project {self.project_name} found in SambaStudio')
            project_missing = not_found_error_message in response.text
        except Exception as error:
            # _get_call raises on non-200, so a "not found" error lands here.
            if not_found_error_message not in str(error):
                raise
            project_missing = True

        if project_missing:
            logging.info(f'Project {self.project_name} wasn\'t found in SambaStudio')
            params = {
                'project_name': self.project_name,
                'description': self.project_description,
            }
            response = self._post_call(
                self.base_url + self.projects_url,
                params,
                success_message=f'Project {self.project_name} created!',
            )

        return json.loads(response.text)['data']['project_id']

    def run_job(self, project_id):
        """Launch the batch job on ``self.dataset_name`` and return its job id.

        Raises:
            Exception: if the job creation request does not return 200.
        """
        params = {
            'task': self.job_task,
            'job_type': self.job_type,
            'job_name': self.job_name,
            'project': project_id,
            'model_checkpoint': self.model_checkpoint,
            'description': self.job_description,
            'dataset': self.dataset_name,
        }
        response = self._post_call(self._jobs_endpoint(project_id), params, success_message='Job running')
        return json.loads(response.text)['data']['job_id']

    def check_job_progress(self, project_id, job_id, poll_interval=10, timeout=None):
        """Poll the job every ``poll_interval`` seconds until it reaches ``SUCCESS_JOB_STATUS``.

        Args:
            project_id: id of the project that owns the job.
            job_id: id returned by ``run_job``.
            poll_interval: seconds to sleep between polls.
            timeout: maximum seconds to wait; ``None`` waits indefinitely.

        Returns:
            True once the job has finished successfully.

        Raises:
            RuntimeError: if the job reports an ``EXIT_WITH_*`` status other than success.
            TimeoutError: if ``timeout`` elapses first.
            Exception: if a status request does not return 200.
        """
        url = self._jobs_endpoint(project_id) + '/' + job_id
        started = time.monotonic()
        while True:
            response = self._get_call(url, success_message='Still waiting for job to finish')
            status = str(json.loads(response.text)['data']['status'])
            logging.info(f'Job status: {status}')
            if status == SUCCESS_JOB_STATUS:
                logging.info('Job finished!')
                return True
            # NOTE(review): assumes any other EXIT_WITH_<code> status is a finished job with a
            # non-zero exit code, which will never become SUCCESS_JOB_STATUS — confirm.
            if status.startswith('EXIT_WITH_'):
                raise RuntimeError(f'Job {job_id} finished with status {status}')
            if timeout is not None and time.monotonic() - started >= timeout:
                raise TimeoutError(f'Job {job_id} still in status {status} after {timeout} seconds')
            time.sleep(poll_interval)

    def check_dataset_creation_progress(self):
        """Fetch the dataset by name and log the response; returns True if the GET succeeds.

        TO-DO: check if dataset creation is async. If so, this method should poll until it
        finishes before continuing with the pipeline (take ``check_job_progress`` as reference).

        Raises:
            Exception: if the GET does not return 200.
        """
        url = self.base_url + self.datasets_url + '/' + self.dataset_name
        logging.info(f'Checking dataset: {url}')
        response = self._get_call(url)
        logging.info(response.text)
        return True

    def retrieve_results(self, project_id, job_id):
        """Download the job's results archive and return it as a transcript DataFrame.

        Raises:
            Exception: if the download request does not return 200.
        """
        url = self._jobs_endpoint(project_id) + '/' + job_id + self.download_results_url
        response = self._get_call(url, success_message='Results downloaded!')
        return self._get_df_output(response.content)

In [156]:
# Build the pipeline driver from the loaded YAML config
asr = BatchASRProcessor(config=config)

In [161]:
# Look up the configured dataset's id by name.
# NOTE: this currently fails with HTTP 500 (see output); tracked under "ASR Pipeline".
asr.search_dataset()

ERROR:root:GET request failed with status code: 500


Exception: Error message: {"detail":"Something went wrong"}

In [159]:
# Delete the dataset named in the config: its id is looked up with search_dataset(),
# then a DELETE request is sent for that id.
asr.delete_dataset()

INFO:root:GET request successful!
INFO:root:Dataset PCA_dataset found in SambaStudio
INFO:root:Dataset PCA_dataset deleted successfully.


{"detail":"The Dataset: f335c6f9-dfdb-4f41-a5b8-00b9e0784abe was successfully marked for deletion from the Dataset Hub."}


In [160]:
# Time the upload and the follow-up status check separately, to find out whether
# dataset creation is asynchronous. Durations are printed as soon as each step ends,
# so the upload time is shown even if the status check raises.
upload_start = time.perf_counter()
asr.create_dataset(path='../test_data/911_test.wav')
print(f'create_dataset took {time.perf_counter() - upload_start:.2f} s')

check_start = time.perf_counter()
asr.check_dataset_creation_progress()
print(f'check_dataset_creation_progress took {time.perf_counter() - check_start:.2f} s')


Folder Information:
  - Number of Files: 1
  - Total Size: 19.62 MB

Are you sure you want to proceed? ([33myes[0m/no)
: Uploading files


INFO:root:Creating dataset: PCA_dataset


Dataset folder upload complete: ../data/datasets/PCA_dataset
Dataset added successfully.
Time taken to upload the dataset: 3.572451114654541 seconds
https://sjc1-demo1.sambanova.net/api/datasets/PCA_dataset


ERROR:root:GET request failed with status code: 500


Exception: Error message: {"detail":"Failed to get dataset"}

In [196]:
# Load the configured project (or create it if SambaStudio reports it as not found)
# and keep its id for the job calls below.
project_id = asr.create_load_project()

INFO:root:GET request successful!
INFO:root:Project PostCallAnalysis_Project found in SambaStudio


In [262]:
# Launch the batch job in the project; its id is used for polling and downloading results
job_id = asr.run_job(project_id=project_id)

INFO:root:POST request successful!
INFO:root:Job running


In [263]:
# Block until the job reports success, polling its status periodically
result = asr.check_job_progress(project_id=project_id, job_id=job_id)

INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PENDING_RDU
INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PENDING_RDU
INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PENDING_RDU
INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PREDICTING
INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PREDICTING
INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PREDICTING
INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PREDICTING
INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PREDICTING
INFO:root:GET request successful!
INFO:root:Still waiting for job to finish
INFO:root:Job status: PREDICTING
INFO:root:GET re

In [264]:
# Download the job's results and display the transcript segments
df = asr.retrieve_results(project_id=project_id, job_id=job_id)
df

INFO:root:GET request successful!
INFO:root:Results downloaded!


Unnamed: 0,start_time,end_time,speaker,text
0,0,2.5,SPEAKER_01,Our Primeti 33. What is yet as emergency?
1,3,11.9,SPEAKER_00,"Yes, sir, I need to, uh, uh. I need an ambulan..."
2,11,12.3,SPEAKER_01,A car.
3,13,14.7,SPEAKER_00,Carol Wood Drive. Yes.
4,14,14.6,SPEAKER_01,
5,15,16.6,SPEAKER_00,"Yeah, I."
6,17,21.4,SPEAKER_01,"Okay, sir. What's the phone number you calling..."
7,22,31.6,SPEAKER_00,"Uh, sir? Oh, I have a. We have a gentleman her..."
8,32,37.2,SPEAKER_01,"Okay, how does he. He's a 50 years old. Ser 50..."
9,38,39.5,SPEAKER_00,"Yes, he's not breathing, sir."
