In [1]:
from google.cloud import storage
from google.cloud.speech_v2.types import cloud_speech
import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech
from google.api_core.client_options import ClientOptions
from google.cloud import storage
from typing import List
import os
import sys
sys.path.append('..')
import creds 

# Cloud Storage client and the bucket handle used by the helpers below.
storage_client = storage.Client()

# The bucket name is read from the local `creds` module rather than written here.
BUCKET_NAME = creds.gcp_bucket_name
bucket = storage_client.bucket(BUCKET_NAME)

# Speech client pointed at the us-central1 endpoint; the recognizer paths built
# by the request helpers in this notebook name the same us-central1 location.
client = SpeechClient(
    client_options=ClientOptions(
        api_endpoint="us-central1-speech.googleapis.com",
    )
)
# Recognition settings shared by every request in this notebook: decoding is
# auto-detected, the language code is "pa-Guru-IN", the model is "chirp_2" and
# automatic punctuation is switched on.
config = cloud_speech.RecognitionConfig(
    auto_decoding_config=cloud_speech.AutoDetectDecodingConfig(),
    language_codes=["pa-Guru-IN"],
    model="chirp_2",
    features=cloud_speech.RecognitionFeatures(
        enable_automatic_punctuation=True,
    ),
)

def get_all_transcript_files(id, main_dir_prefix = creds.jt):
    """Return the bucket listing of transcript blobs stored for one recording.

    The listing is taken under ``<main_dir_prefix>/transcriptions/<id>/`` and
    the object returned by ``bucket.list_blobs`` is handed back unchanged.
    """
    transcript_prefix = f'{main_dir_prefix}/transcriptions/{id}/'
    return bucket.list_blobs(prefix=transcript_prefix)

def get_all_audio_split_files_uris(id, main_dir_prefix = creds.jt):
    """Return ``gs://`` URIs for every blob under one recording's audio folder.

    Blobs are listed under ``<main_dir_prefix>/audio_wavs/<id>/`` and each blob
    name is prefixed with ``gs://<BUCKET_NAME>/``; listing order is kept.
    """
    audio_prefix = f'{main_dir_prefix}/audio_wavs/{id}/'
    return [
        f'gs://{BUCKET_NAME}/{audio_blob.name}'
        for audio_blob in bucket.list_blobs(prefix=audio_prefix)
    ]

def send_batch_requests(uris_batch, id, main_dir_prefix = creds.jt):
    """Submit one batch-recognize request for a group of audio URIs and wait for it.

    Args:
        uris_batch: iterable of ``gs://`` audio URIs sent together in one request.
        id: recording id; output goes to
            ``gs://<BUCKET_NAME>/<main_dir_prefix>/transcriptions/<id>``.
        main_dir_prefix: top-level folder in the bucket (defaults to ``creds.jt``).

    Returns:
        Whatever the long-running operation's ``result()`` returns.
    """
    batch_files = [
        cloud_speech.BatchRecognizeFileMetadata(uri=audio_uri)
        for audio_uri in uris_batch
    ]
    output_config = cloud_speech.RecognitionOutputConfig(
        gcs_output_config=cloud_speech.GcsOutputConfig(
            uri=f"gs://{BUCKET_NAME}/{main_dir_prefix}/transcriptions/{id}",
        ),
    )
    # No processing_strategy is set here (DYNAMIC_BATCHING is deliberately left off).
    batch_request = cloud_speech.BatchRecognizeRequest(
        recognizer=f"projects/{creds.gcp_project_id}/locations/us-central1/recognizers/_",
        config=config,
        files=batch_files,
        recognition_output_config=output_config,
    )

    # Block the calling worker thread until the operation has finished.
    return client.batch_recognize(request=batch_request).result()

def get_all_ids(wav_dir='/mnt/sea/jt_wavs'):
    """Return the recording ids encoded in the ``.wav`` file names of a directory.

    The id is the second ``' - '``-separated field of each file name; entries
    that do not end in ``.wav`` are ignored.  File names are visited in sorted
    order so repeated runs process recordings in the same sequence.

    Args:
        wav_dir: directory to scan (defaults to the JT wav folder).

    Returns:
        List of id strings, one per ``.wav`` file.

    Raises:
        IndexError: if a ``.wav`` file name contains no ``' - '`` separator.
    """
    # NOTE(review): if the id is the last field of the name, the '.wav' suffix
    # stays attached to it -- confirm the naming scheme of the source files.
    return [
        file_name.split(' - ')[1]
        for file_name in sorted(os.listdir(wav_dir))
        if file_name.endswith('.wav')
    ]

def transcribe_all(batch_size=15, max_workers=50):
    """Batch-transcribe every audio split that does not yet have a transcript.

    For each id from ``get_all_ids()`` the audio split URIs are listed, those
    that already have a transcript blob are dropped, and the remainder is sent
    in groups of ``batch_size`` through a thread pool.

    A batch that raises is reported and counted instead of being silently
    ignored; the remaining batches still run, and a later call picks up the
    files that are still missing a transcript.

    Args:
        batch_size: number of audio URIs per batch-recognize request.
        max_workers: size of the thread pool issuing the requests.
    """
    for audio_id in get_all_ids():
        print(f'Processing id: {audio_id}')
        uris = get_all_audio_split_files_uris(audio_id, creds.jt)
        transcript_uris = [
            f'gs://{BUCKET_NAME}/{blob.name}'
            for blob in get_all_transcript_files(audio_id, creds.jt)
        ]
        uris = remove_uris_where_transcript_already_exist(uris, transcript_uris)
        uris_batches = [uris[i:i + batch_size] for i in range(0, len(uris), batch_size)]

        failed_batches = 0
        with tqdm(total=len(uris_batches), desc="Processing batches") as pbar:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_batch = {
                    executor.submit(send_batch_requests, uris_batch, audio_id): uris_batch
                    for uris_batch in uris_batches
                }
                for future in as_completed(future_to_batch):
                    try:
                        # result() re-raises whatever the worker raised; without
                        # this call a failed batch would look like a success.
                        future.result()
                    except Exception as e:
                        failed_batches += 1
                        failed = future_to_batch[future]
                        print(f'Batch of {len(failed)} files starting at {failed[0]} failed for id {audio_id}: {e}')
                    pbar.update(1)

        if failed_batches:
            print(f'{failed_batches} of {len(uris_batches)} batches failed for id {audio_id}')

def transcribe_specific_uris(start_sr_list, id, output_id='test1', output_dir_prefix='testing',
                             batch_size=15, max_workers=50):
    """Re-transcribe selected audio splits of one recording into a separate folder.

    Splits are selected by the start offset encoded in their file name
    (``...__<start>-<end>__<sample rate>...``).  Output is written under
    ``<output_dir_prefix>/transcriptions/<output_id>``, not under the
    recording's regular transcript folder.

    Args:
        start_sr_list: start offsets (as strings) of the splits to transcribe.
        id: recording id whose audio splits are searched.
        output_id: id segment of the output location.
        output_dir_prefix: top-level bucket folder of the output location.
        batch_size: number of audio URIs per batch-recognize request.
        max_workers: size of the thread pool issuing the requests.

    Raises:
        ValueError: if any requested start offset matches no audio split.
    """
    uris = get_all_audio_split_files_uris(id, creds.jt)

    # Index the splits once by start offset.  The offset is read from the end
    # of the base name so ids that themselves contain '__' still parse.
    uri_by_start_sr = {}
    for uri in uris:
        name_parts = uri.rsplit('/', 1)[-1].split('__')
        if len(name_parts) < 3:
            continue  # not a split file name (e.g. a folder placeholder object)
        # setdefault keeps the first URI when two splits share a start offset.
        uri_by_start_sr.setdefault(name_parts[-2].split('-')[0], uri)

    missing = [start_sr for start_sr in start_sr_list if start_sr not in uri_by_start_sr]
    if missing:
        raise ValueError(f'No audio split of {id} starts at: {missing}')
    selected_uris = [uri_by_start_sr[start_sr] for start_sr in start_sr_list]

    uris_batches = [selected_uris[i:i + batch_size] for i in range(0, len(selected_uris), batch_size)]
    with tqdm(total=len(uris_batches), desc="Processing batches") as pbar:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_batch = {
                executor.submit(send_batch_requests, uris_batch, output_id, output_dir_prefix): uris_batch
                for uris_batch in uris_batches
            }
            for future in as_completed(future_to_batch):
                try:
                    # Surface worker failures instead of dropping them.
                    future.result()
                except Exception as e:
                    failed = future_to_batch[future]
                    print(f'Batch of {len(failed)} files starting at {failed[0]} failed: {e}')
                pbar.update(1)

def transcribe_uri(uri, id):
    """Batch-transcribe a single audio URI and wait for the operation to finish.

    Output is written under ``gs://<BUCKET_NAME>/manual/transcriptions/<id>``
    and the request asks for the DYNAMIC_BATCHING processing strategy.  The
    wait on the long-running operation is given a timeout of 1200.
    """
    strategy = cloud_speech.BatchRecognizeRequest.ProcessingStrategy.DYNAMIC_BATCHING
    output_config = cloud_speech.RecognitionOutputConfig(
        gcs_output_config=cloud_speech.GcsOutputConfig(
            uri=f"gs://{BUCKET_NAME}/manual/transcriptions/{id}",
        ),
    )
    single_file_request = cloud_speech.BatchRecognizeRequest(
        recognizer=f"projects/{creds.gcp_project_id}/locations/us-central1/recognizers/_",
        config=config,
        files=[cloud_speech.BatchRecognizeFileMetadata(uri=uri)],
        recognition_output_config=output_config,
        processing_strategy=strategy,
    )

    pending = client.batch_recognize(request=single_file_request)
    return pending.result(timeout=1200)

def save_transcript(save_dir, id, main_dir):
    """Download every transcript blob of one recording and write them to a CSV.

    Blob base names are parsed as
    ``<clip id>__<chunk index>__<start>-<end>__<sample rate>_...``; the fields
    are read from the end of the name so a clip id may itself contain ``__``.
    Rows are sorted by chunk index and written to ``<save_dir>/<id>.csv``,
    where ``id`` is always the argument passed in, never a value parsed from a
    blob name.

    Args:
        save_dir: local directory that receives the CSV.
        id: id segment of the transcript folder; also names the CSV.
        main_dir: top-level bucket folder holding ``transcriptions/<id>/``.

    Raises:
        ValueError: if a blob name does not have the expected ``__`` fields.
    """
    rows = []
    for blob in tqdm(get_all_transcript_files(id, main_dir)):
        file_name = blob.name.rsplit('/', 1)[-1]
        parts = file_name.split('__')
        if len(parts) < 4:
            raise ValueError(f'Unexpected transcript blob name: {blob.name}')

        result = cloud_speech.BatchRecognizeResults.from_json(
            blob.download_as_bytes(), ignore_unknown_fields=True
        )
        # Concatenate the top alternative of every result.  Results without
        # alternatives are skipped, so an empty response (e.g. a silent chunk)
        # yields '' instead of an IndexError.
        transcript = ' '.join(
            piece.alternatives[0].transcript
            for piece in result.results
            if piece.alternatives
        )

        clip_id = '__'.join(parts[:-3])
        timedata = parts[-2].split('_')[0].split('-')
        rows.append((
            clip_id,
            int(parts[-3]),                # chunk index
            int(timedata[0]),              # start offset
            int(timedata[1]),              # end offset
            int(parts[-1].split('_')[0]),  # sample rate
            transcript,
        ))

    print(f'Len of transcripts of {id}: {len(rows)}')
    df = pd.DataFrame(rows, columns=['id', 'chunk_index', 'start_sr', 'end_sr', 'sample_rate', 'transcript'])
    df = df.sort_values(by=['chunk_index'])
    df.to_csv(f'{save_dir}/{id}.csv', index=False)

def save_all_transcripts(ids=None, save_dir='/mnt/sea/jt_transcripts'):
    """Write the transcript CSV of each requested recording.

    Args:
        ids: recording ids to export.  ``None`` (the default) exports every id
            returned by ``get_all_ids()``; pass an explicit list such as
            ``['21m4IDjf-dc']`` to export a subset.
        save_dir: local directory that receives one ``<id>.csv`` per recording.
    """
    if ids is None:
        ids = get_all_ids()
    for recording_id in ids:
        save_transcript(save_dir, recording_id, creds.jt)

def remove_uris_where_transcript_already_exist(audio_uris, transcript_uris):
    """Return the audio URIs that have no matching transcript, in input order.

    An audio URI is identified by its base name up to the first ``.``.  A
    transcript URI is identified by its base name with the last two
    ``_``-separated fields removed.  An audio URI is kept only when its
    identifier is not among the transcript identifiers.

    Args:
        audio_uris: list of audio file URIs.
        transcript_uris: list of transcript file URIs.

    Returns:
        The sub-list of ``audio_uris`` still lacking a transcript.
    """
    transcribed_ids = {
        '_'.join(uri.split('_')[:-2]).split('/')[-1] for uri in transcript_uris
    }

    # The identifier is derived per URI, so each name stays tied to its own URI.
    result = [
        audio_uri
        for audio_uri in audio_uris
        if audio_uri.split('/')[-1].split('.')[0] not in transcribed_ids
    ]

    if len(result) == 0:
        print("All audio files have been transcribed.")
    elif len(result) != len(audio_uris):
        print(f"Removed {len(audio_uris) - len(result)} audio files where transcript already exists.")

    return result




In [None]:
import concurrent.futures
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech
from tqdm import tqdm
import os
import pandas as pd

# Folder holding one sub-folder of audio splits per recording, and the folder
# that receives one transcript CSV per recording.
d = '/mnt/sea/yt_splits/singh_brar'
td = '/mnt/sea/yt_transcripts/singh_brar'

# exist_ok makes the call safe to re-run and removes the gap between checking
# for the folder and creating it.
os.makedirs(td, exist_ok=True)

def transcribe_gcs_v2(
    audio_file: str,
) -> cloud_speech.RecognizeResponse:
    """Send the bytes of a local audio file to the synchronous recognize call.

    Despite the name, ``audio_file`` is opened from the local filesystem; its
    whole content is read into memory and passed inline in the request.

    Args:
        audio_file: path of the audio file to read.

    Returns:
        The response returned by ``client.recognize``.
    """
    with open(audio_file, "rb") as audio_handle:
        audio_bytes = audio_handle.read()

    recognizer_path = f"projects/{creds.gcp_project_id}/locations/us-central1/recognizers/_"
    inline_request = cloud_speech.RecognizeRequest(
        recognizer=recognizer_path,
        config=config,
        content=audio_bytes,
    )
    return client.recognize(request=inline_request)

def process_audio_file(audio_file):
    """Thread-pool task: transcribe one local audio file via ``transcribe_gcs_v2``."""
    response = transcribe_gcs_v2(audio_file)
    return response


def process_audio_files(audio_files, id):
    """Transcribe a recording's audio splits in parallel and save them as one CSV.

    File base names are parsed as
    ``<clip id>__<chunk index>__<start>-<end>__<sample rate>...`` where the
    clip id may contain one ``__`` itself (4 or 5 fields in total).  Rows are
    sorted by chunk index and written to ``<td>/<id>.csv``; ``id`` is always
    the argument passed in, so the file name matches what the caller checks
    for when deciding whether a recording is already done.

    Any failure (API error, unparsable name) is reported and no CSV is
    written, which leaves the recording to be retried on a later run.

    Args:
        audio_files: paths of the audio split files of one recording.
        id: recording id used for the CSV name and in error messages.
    """
    results = []

    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
            future_to_file = {
                executor.submit(process_audio_file, audio_file): audio_file
                for audio_file in audio_files
            }
            with tqdm(total=len(audio_files)) as pbar:
                for future in concurrent.futures.as_completed(future_to_file):
                    audio_file = future_to_file[future]
                    response = future.result()
                    # Concatenate the top alternative of every result.  Results
                    # without alternatives are skipped, so a response with no
                    # text (e.g. a silent split) gives '' instead of an
                    # IndexError that would discard the whole recording.
                    transcript = ' '.join(
                        piece.alternatives[0].transcript
                        for piece in response.results
                        if piece.alternatives
                    )

                    metadata = audio_file.split('/')[-1].split('__')
                    if len(metadata) not in (4, 5):
                        raise Exception('Invalid name - multiple __ in it')
                    # Everything before the last three fields is the clip id.
                    clip_id = '__'.join(metadata[:-3])
                    chunk_index = metadata[-3]
                    timedata = metadata[-2].split('_')[0]
                    start_sr = timedata.split('-')[0]
                    end_sr = timedata.split('-')[1]
                    sample_rate = metadata[-1].split('_')[0].split('.')[0]
                    results.append((clip_id, chunk_index, start_sr, end_sr, sample_rate, transcript))

                    pbar.update(1)

        print(len(results))

        df = pd.DataFrame(results, columns=['id', 'chunk_index', 'start_sr', 'end_sr', 'sample_rate', 'transcript'])
        df = df.astype({'chunk_index': int, 'start_sr': int, 'end_sr': int, 'sample_rate': int})
        df = df.sort_values(by=['chunk_index'])
        df.to_csv(f'{td}/{id}.csv', index=False)
    except Exception as e:
        print(f'Error occured for id: {id}')
        print('Exception occured: ', e)

# Transcribe every recording folder under `d` that has no CSV in `td` yet.
dirs = os.listdir(d)
t_files = os.listdir(td)
done_csvs = set(t_files)

count = 0
print(f'Processing {len(dirs)} directories')
# The loop variable is not called `dir`, so the builtin stays usable in later cells.
for split_dir in dirs:
    count += 1
    if f'{split_dir}.csv' in done_csvs:
        continue
    split_path = f'{d}/{split_dir}'
    if not os.path.isdir(split_path):
        # A stray file next to the recording folders would make os.listdir raise.
        continue
    print(f'Processing {split_dir} ... {count}')
    files = [f'{split_path}/{f}' for f in os.listdir(split_path)]
    process_audio_files(files, split_dir)

In [None]:
# JT DATA SAVE

from datasets import Dataset, DatasetDict, Audio
import os
import pandas as pd

jt_transcripts = '/mnt/sea/jt_transcripts'
jt_splits = '/mnt/sea/jt_splits'

# Only CSV files are transcripts; sorting keeps the row order reproducible.
transcript_files = sorted(x for x in os.listdir(jt_transcripts) if x.endswith('.csv'))
dirs = [x[:-4] for x in transcript_files]

rows = []  # not used in this cell; kept in case another cell reads it
frames = []
for rec_id in dirs:
    df_transcript = pd.read_csv(os.path.join(jt_transcripts, rec_id + '.csv'))
    # The audio file name carries the chunk index stored in the CSV.  Using the
    # row position instead would point at the wrong file as soon as a chunk is
    # missing from the transcript.
    df_transcript['audio'] = [
        f'{jt_splits}/{rec_id}/{rec_id}__{chunk_index}__{start_sr}-{end_sr}__{sample_rate}.wav'
        for chunk_index, start_sr, end_sr, sample_rate in zip(
            df_transcript['chunk_index'],
            df_transcript['start_sr'],
            df_transcript['end_sr'],
            df_transcript['sample_rate'],
        )
    ]
    df_transcript = df_transcript.rename(columns={'transcript': 'text'})
    df_transcript['duration'] = (df_transcript['end_sr'] - df_transcript['start_sr']) / df_transcript['sample_rate']
    frames.append(df_transcript)

# Concatenate once at the end instead of growing the frame inside the loop.
df = pd.concat(frames) if frames else pd.DataFrame()

print(df.shape)
df.head()

In [None]:
# First value: the summed 'duration' column divided by 3600 (presumably
# seconds converted to hours -- 'duration' is (end_sr - start_sr) / sample_rate).
# NOTE(review): the second value, 13000/(150 * 60), is an unexplained ratio -- confirm what it measures.
sum(df['duration']) / 3600, 13000/(150 * 60)

(149.81596521049855, 1.4444444444444444)

In [None]:
# Imported under an alias so it does not replace the `Audio` feature class
# imported from `datasets`.
from IPython.display import Audio as show_audio
# Play the first row's audio file inline as a spot check.
# NOTE(review): `rate` is presumably only relevant for raw array data, not for a file path -- confirm.
show_audio(df.iloc[0]['audio'], autoplay=True, rate=48000)

In [None]:
# Build the JT dataset from the frame, with the 'audio' column cast to an
# Audio feature at 16000.
# NOTE(review): the preview cell passes rate=48000 while this cast uses 16000 -- confirm the intended rate.
train_split = Dataset.from_pandas(df.reset_index(drop=True)).cast_column(
    'audio', Audio(sampling_rate=16000)
)
print(train_split)

# Wrap the single split, then persist it locally and push it to the hub.
ds = DatasetDict({'train': train_split})
ds.save_to_disk('/mnt/sea/speech/punjabi_asr_datasets/jt_dataset')
ds.push_to_hub('codingninja/jt_dataset', token=creds.hf_c_token)

In [None]:
# YT DATASET SAVE

from datasets import Dataset, DatasetDict, Audio
import os
import pandas as pd

transcripts = '/mnt/sea/yt_transcripts'
splits = '/mnt/sea/yt_splits'

# One sub-folder per source; stray files at this level are ignored and the
# listing is sorted so the row order is reproducible.
source_dirs = sorted(
    name for name in os.listdir(transcripts)
    if os.path.isdir(os.path.join(transcripts, name))
)

rows = []  # not used in this cell; kept in case another cell reads it
frames = []

for source_dir in source_dirs:
    source_path = os.path.join(transcripts, source_dir)
    transcript_files = sorted(x for x in os.listdir(source_path) if x.endswith('.csv'))

    for transcript_file in transcript_files:
        basename = transcript_file[:-4]
        df_transcript = pd.read_csv(os.path.join(source_path, transcript_file))
        # The audio file name carries the chunk index stored in the CSV.  Using
        # the row position instead would point at the wrong file as soon as a
        # chunk is missing from the transcript.
        df_transcript['audio'] = [
            f'{splits}/{source_dir}/{basename}/{basename}__{chunk_index}__{start_sr}-{end_sr}__{sample_rate}.wav'
            for chunk_index, start_sr, end_sr, sample_rate in zip(
                df_transcript['chunk_index'],
                df_transcript['start_sr'],
                df_transcript['end_sr'],
                df_transcript['sample_rate'],
            )
        ]
        df_transcript = df_transcript.rename(columns={'transcript': 'text'})
        df_transcript['duration'] = (df_transcript['end_sr'] - df_transcript['start_sr']) / df_transcript['sample_rate']
        df_transcript['source'] = source_dir
        frames.append(df_transcript)

# Concatenate once at the end instead of growing the frame inside the loop.
df = pd.concat(frames) if frames else pd.DataFrame()

print(df.shape)
df.head()


In [None]:
# Summed 'duration' column divided by 3600 (presumably seconds converted to
# hours -- 'duration' is (end_sr - start_sr) / sample_rate).
sum(df['duration']) / 3600


In [None]:
# Imported under an alias so it does not replace the `Audio` feature class
# imported from `datasets`.
from IPython.display import Audio as show_audio
# Play the first row's audio file inline as a spot check.
# NOTE(review): `rate` is presumably only relevant for raw array data, not for a file path -- confirm.
show_audio(df.iloc[0]['audio'], autoplay=True, rate=48000)

In [None]:
# Count rows per distinct 'sample_rate' value before a single rate is chosen
# for the Audio cast in the dataset-save cell.
df['sample_rate'].value_counts()

In [None]:
# Build the YT dataset from the frame, with the 'audio' column cast to an
# Audio feature at 48000.
train_split = Dataset.from_pandas(df.reset_index(drop=True)).cast_column(
    'audio', Audio(sampling_rate=48000)
)
print(train_split)

# Wrap the single split, then persist it locally and push it to the hub
# repository named in `creds`.
ds = DatasetDict({'train': train_split})
ds.save_to_disk('/mnt/pi/datasets/speech/yt_dataset')
ds.push_to_hub(creds.yt_ds_hub, token=creds.hf_c_token)

In [None]:
# transcribe_all()

# save_all_transcripts()


# start_sr_test_list = ['409', '1513310', '2055563', '6532832', '39980477', '40872491', '44207098', '44546211', '46477679', '53141979', '54744982', '55656655', '65524505']

# transcribe_specific_uris(start_sr_test_list, '21m4IDjf-dc')
# save_transcript('/mnt/sea/jt_transcripts', 'test1', 'testing')
