In [None]:
# Install third-party dependencies into the running environment (quiet output,
# pip's running-as-root warning suppressed).
# NOTE(review): no versions are pinned, so a re-run may pick up newer releases.
!pip install --quiet --root-user-action=ignore faker swifter coqui-tts pydub

# Imports

In [None]:
# Import python packages
import warnings
import json
# Silence all warnings. This runs before the imports below, so warnings emitted
# while those modules are imported are suppressed as well.
warnings.filterwarnings("ignore")
from snowflake.snowpark.context import get_active_session
from data_generation.data_generator import DataGenerator
# NOTE: a class with the same name (TextToSpeech) is defined in a later cell of
# this notebook and replaces this import once that cell has been executed.
from data_generation.text_to_speech import TextToSpeech

# Active Snowpark session; later cells use it for data generation, file uploads and SQL.
session = get_active_session()

# Data Generation

The classes `DataGenerator` and `TextToSpeech` generate all data required for this demo.

In [None]:
# Structured Data
# Build the generator on the active session, read its configuration and then
# generate the data for the date window below.
data_generator = DataGenerator(session)
data_generator.load_configuration()
date_window = {'start_date': '2024-01-01', 'end_date': '2025-07-31'}
data_generator.generate_data(**date_window)

In [None]:
# Unstructured Data 
# NOTE(review): the two-argument dict_to_speech_optimized call below matches the
# TextToSpeech imported from data_generation.text_to_speech; the same-named class
# defined later in this notebook expects (voices, recordings, output_folder).
tts_generator = TextToSpeech(
    model='tts_models/multilingual/multi-dataset/xtts_v2', 
    voices='data_generation/04_audio/configuration/voices.json'
)

# Load conversations from JSON; the context manager closes the file as soon as
# it has been parsed instead of leaving the handle open.
with open('data_generation/04_audio/configuration/call_center_recordings.json', encoding='utf-8') as recordings_file:
    recordings = json.load(recordings_file)
output_folder = '/call_center_recordings'

# Create audio files from conversations
tts_generator.dict_to_speech_optimized(recordings, output_folder)

# Upload audio files to Snowflake stage (uncompressed), then refresh the stage's
# directory so the new files are listed.
session.file.put(local_file_name=f'{output_folder}/*', stage_location='@AUDIO/call_center_recordings', auto_compress=False)
session.sql('ALTER STAGE AUDIO REFRESH').collect()

##  Structured Data
The data model of this demo consists of the tables and views previewed below.

In [None]:
# Preview the first rows of every table/view exposed by the data generator.
# Each object is shown exactly once (dim_dates used to be printed twice).
data_generator.dim_dates.show(n=5)
# dim_suppliers is shown without an explicit n, i.e. with show()'s default row count.
data_generator.dim_suppliers.show()
for table_name in (
    'dim_product_hierarchy',
    'dim_products',
    'dim_platforms',
    'dim_customers',
    'fact_transactions',
    'fact_supplier_deliveries',
    'fact_daily_stock_levels',
    'customer_reviews',
):
    getattr(data_generator, table_name).show(n=5)

## Unstructured Data

This demo provides multiple unstructured data sources, listed below.

In [None]:
-- Show up to 10 entries of the DOCUMENTS stage directory.
SELECT *
FROM DIRECTORY(@DOCUMENTS)
LIMIT 10;

In [None]:
-- Show up to 10 entries of the AUDIO stage directory.
SELECT *
FROM DIRECTORY(@AUDIO)
LIMIT 10;

# START CALL CENTER GENERATION

In [None]:
import json
import os
import tempfile
from pathlib import Path

import numpy as np
import torch
from pydub import AudioSegment
from TTS.api import TTS

class TextToSpeech:
    """Turn scripted two-party conversations into WAV recordings with a TTS model.

    Attributes:
        tts_model: The ``TTS`` model, moved to CUDA when available, otherwise CPU.
        voices: Voice configuration parsed from the JSON file given to ``__init__``.
            ``dict_to_speech_optimized`` reads its ``'male_voices'`` and
            ``'female_voices'`` entries when no explicit configuration is passed.
    """

    def __init__(self, model, voices):
        """Load the TTS model and the voice configuration.

        Args:
            model: Model identifier passed to ``TTS(...)``.
            voices: Path of a JSON file holding the voice configuration.
        """
        # Presumably pre-accepts the Coqui terms of service so loading the model
        # does not stop for confirmation -- TODO confirm.
        os.environ["COQUI_TOS_AGREED"] = "1"
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.tts_model = TTS(model, progress_bar=True).to(device)
        # Context manager so the configuration file is closed right after parsing.
        with open(voices, encoding="utf-8") as voices_file:
            self.voices = json.load(voices_file)

    @staticmethod
    def _float_to_int16_pcm(wav_data):
        """Scale float samples to int16 PCM, clipping to [-1.0, 1.0] first."""
        samples = np.asarray(wav_data, dtype=np.float32)
        # Without clipping, a sample outside [-1, 1] would overflow int16 when cast.
        samples = np.clip(samples, -1.0, 1.0)
        return (samples * 32767).astype(np.int16)

    @staticmethod
    def _assign_voices(voices):
        """Randomly pick one male and one female voice and assign them to customer/agent."""
        if np.random.random() > 0.5:
            # customer is male, agent is female
            customer_voice = np.random.choice(voices['male_voices'])
            agent_voice = np.random.choice(voices['female_voices'])
        else:
            # customer is female, agent is male
            customer_voice = np.random.choice(voices['female_voices'])
            agent_voice = np.random.choice(voices['male_voices'])
        return {'customer': customer_voice, 'agent': agent_voice}

    def tts_to_audiosegment(self, text, speaker):
        """Synthesize ``text`` with ``speaker`` and return it as a mono 16-bit AudioSegment.

        Args:
            text: Text to synthesize (language is fixed to ``'en'``).
            speaker: Speaker name passed to the TTS model.

        Returns:
            An ``AudioSegment`` at the model's output sample rate.
        """
        sample_rate = self.tts_model.synthesizer.output_sample_rate
        wav_data = self.tts_model.tts(text=text, speaker=speaker, language='en')

        # Create AudioSegment directly from the raw int16 sample bytes
        return AudioSegment(
            self._float_to_int16_pcm(wav_data).tobytes(),
            frame_rate=sample_rate,
            sample_width=2,  # 2 bytes for int16
            channels=1
        )

    def dict_to_speech_optimized(self, voices, recordings, output_folder):
        """Generate one WAV file per recording and report which voice spoke which text.

        Args:
            voices: Mapping with ``'male_voices'`` and ``'female_voices'`` lists.
                When ``None``, the configuration loaded in ``__init__``
                (``self.voices``) is used.
            recordings: Mapping whose ``'recordings'`` list holds conversations;
                each conversation has a ``'segments'`` list of dicts with
                ``'text'`` and ``'speaker'`` (``'customer'`` or ``'agent'``).
            output_folder: Directory for the WAV files; created if missing.

        Returns:
            Dict mapping the zero-padded recording index (e.g. ``'00000'``) to a
            list of ``{"text": ..., "speaker": ...}`` entries in spoken order.
        """
        if voices is None:
            voices = self.voices

        conversations = recordings['recordings']
        total_recordings = len(conversations)
        Path(output_folder).mkdir(parents=True, exist_ok=True)

        used_speakers = {}
        for recording_id, recording in enumerate(conversations):
            print(f"[Unstructured Data] Generating recording {recording_id+1}/{total_recordings}...", end='\r', flush=True)

            # Customer and agent always get voices of opposite gender.
            speaker_mapping = self._assign_voices(voices)

            recording_key = f'{recording_id:05d}'
            used_speakers[recording_key] = []

            # Generate all audio segments
            audio_segments = []
            for segment in recording['segments']:
                text = segment['text']
                speaker = speaker_mapping[segment['speaker']]
                used_speakers[recording_key].append({"text": text, "speaker": speaker})
                audio_segments.append(self.tts_to_audiosegment(text, speaker))

            # Combine all segments at once
            combined = sum(audio_segments, AudioSegment.empty())
            combined.export(f"{output_folder}/call_center_recording_{recording_id:05d}.wav", format='wav')

        return used_speakers

# Instantiate the generator defined above; this loads the TTS model and the voice configuration.
tts_generator = TextToSpeech(
    model='tts_models/multilingual/multi-dataset/xtts_v2', 
    voices='data_generation/04_audio/configuration/voices.json'
)

# load recordings (the context manager closes the file once it has been parsed)
with open('data_generation/04_audio/configuration/call_center_recordings.json', encoding='utf-8') as recordings_file:
    recordings = json.load(recordings_file)
output_folder = '/call_center_recordings15'

# No notebook-level `voices` variable is assigned in the visible cells, so passing
# it raised NameError on a fresh kernel; use the configuration the generator loaded.
used_speakers = tts_generator.dict_to_speech_optimized(tts_generator.voices, recordings, output_folder)

In [None]:
-- (Re)create the AUDIO2 stage with its directory enabled and SNOWFLAKE_SSE encryption.
CREATE OR REPLACE STAGE AUDIO2
    DIRECTORY = (ENABLE = TRUE)
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

In [None]:
# Same import and call as in the notebook's first import cell; repeating them
# here rebinds `session` before the upload cell that follows.
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
# Upload the generated WAV files uncompressed into the AUDIO2 stage ...
session.file.put(
    local_file_name='/call_center_recordings15/*',
    stage_location='@AUDIO2/call_center_recordings',
    auto_compress=False,
)
# ... then refresh the stage so the uploaded files are picked up.
session.sql('ALTER STAGE AUDIO2 REFRESH').collect()

In [None]:
-- Run AI_TRANSCRIBE with speaker-level timestamps on every AUDIO2 file whose
-- path starts with call_center_recordings/ and keep the result per file.
CREATE OR REPLACE TABLE TMP_SPEAKER_RECOGNITION AS
SELECT
    RELATIVE_PATH,
    AI_TRANSCRIBE(
        TO_FILE('@AUDIO2', RELATIVE_PATH),
        {'timestamp_granularity': 'speaker'}
    ) AS T_OUTPUT
FROM DIRECTORY('@AUDIO2')
WHERE STARTSWITH(RELATIVE_PATH, 'call_center_recordings/');

-- Inspect the stored transcription output.
SELECT *
FROM TMP_SPEAKER_RECOGNITION;

In [None]:
-- Build one diarized transcript per recording: consecutive segments of the same
-- speaker are merged into a turn, and turns are concatenated in time order.
WITH transcript_segments AS (
    -- One row per element of T_OUTPUT['segments'].
    SELECT
        src.RELATIVE_PATH                       AS RELATIVE_PATH,
        src.T_OUTPUT['audio_duration']::FLOAT   AS AUDIO_DURATION,
        seg.value['start']::FLOAT               AS TRANSCRIBE_START,
        seg.value['end']::FLOAT                 AS TRANSCRIBE_END,
        seg.value['speaker_label']::TEXT        AS SPEAKER_ID,
        seg.value['text']::TEXT                 AS TRANSCRIPTION
    FROM TMP_SPEAKER_RECOGNITION AS src,
        LATERAL FLATTEN(INPUT => src.T_OUTPUT['segments']) AS seg
    ORDER BY
        src.RELATIVE_PATH,
        TRANSCRIBE_START
),

with_previous_speaker AS (
    -- Attach the speaker of the preceding segment within the same file.
    -- rn is computed here but not referenced by the later steps.
    SELECT
        RELATIVE_PATH,
        TRANSCRIBE_START,
        SPEAKER_ID,
        TRANSCRIPTION,
        LAG(SPEAKER_ID) OVER (
            PARTITION BY RELATIVE_PATH
            ORDER BY TRANSCRIBE_START
        ) AS prev_speaker,
        ROW_NUMBER() OVER (
            PARTITION BY RELATIVE_PATH
            ORDER BY TRANSCRIBE_START
        ) AS rn
    FROM transcript_segments
),

numbered_turns AS (
    -- Running count of speaker changes: every segment of one uninterrupted
    -- turn ends up with the same speaker_group number.
    SELECT
        RELATIVE_PATH,
        TRANSCRIBE_START,
        SPEAKER_ID,
        TRANSCRIPTION,
        SUM(
            CASE
                WHEN prev_speaker IS NULL OR SPEAKER_ID <> prev_speaker THEN 1
                ELSE 0
            END
        ) OVER (
            PARTITION BY RELATIVE_PATH
            ORDER BY TRANSCRIBE_START
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS speaker_group
    FROM with_previous_speaker
),

merged_turns AS (
    -- Collapse each turn into a single row with its text joined by spaces.
    SELECT
        RELATIVE_PATH,
        MIN(TRANSCRIBE_START) AS group_start,
        SPEAKER_ID,
        speaker_group,
        LISTAGG(TRANSCRIPTION, ' ') WITHIN GROUP (ORDER BY TRANSCRIBE_START) AS combined_transcription
    FROM numbered_turns
    GROUP BY
        RELATIVE_PATH,
        SPEAKER_ID,
        speaker_group
)

-- One row per file: "<speaker>\n<text>" blocks separated by "\n---\n".
SELECT
    RELATIVE_PATH,
    LISTAGG(SPEAKER_ID || '\n' || combined_transcription, '\n---\n')
        WITHIN GROUP (ORDER BY group_start) AS TRANSCRIPTION_DIARIZED
FROM merged_turns
GROUP BY RELATIVE_PATH;