# Voice AI Pipeline using Vertex AI

This notebook transforms the original voice analysis script into a Vertex AI Pipeline.

## Import Libraries

In [None]:
from kfp import dsl
from kfp.v2 import compiler
from google.cloud import aiplatform
from typing import List, Dict
import boto3
from botocore.exceptions import ClientError
import json, os, ast, re
from datetime import datetime
import pandas as pd
import numpy as np
from scipy.special import softmax
import scrubadub, scrubadub_spacy
import snowflake.connector as sc
from snowflake.connector.pandas_tools import write_pandas
import vertexai
import vertexai.preview.generative_models as generative_models
from vertexai.generative_models import GenerativeModel, Part

## Pipeline Components

### Component: List Transcript Files in S3

In [None]:
@dsl.component(
    base_image="python:3.9",
    packages_to_install=["boto3", "pandas"]
)
def list_transcripts(aws_access_key: str,
                     aws_secret_key: str,
                     source_bucket: str,
                     transcripts_location: str,
                     max_objects: int) -> List[List[str]]:
    """List up to ``max_objects`` JSON transcript objects under an S3 prefix.

    Args:
        aws_access_key: AWS access key id used to build the S3 client.
        aws_secret_key: AWS secret access key used to build the S3 client.
        source_bucket: Name of the bucket to list.
        transcripts_location: Key prefix to list under.
        max_objects: Maximum number of entries to return. A value of zero
            or less yields an empty list.

    Returns:
        A list of ``[key, last_modified]`` pairs, where ``last_modified`` is
        ``str(obj['LastModified'])`` for the matching object. Only keys
        ending in ``.json`` are included.
    """
    # A lightweight component executes only this function body in its
    # container, so the dependency has to be imported here rather than
    # relying on the notebook-level import.
    import boto3

    s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key
    )

    transcripts = []
    if max_objects <= 0:
        return transcripts

    # A single list_objects_v2 call returns at most one page of keys; the
    # paginator follows continuation tokens so large prefixes are not
    # silently truncated.
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=source_bucket, Prefix=transcripts_location)
    for page in pages:
        for obj in page.get('Contents', []):
            if not obj['Key'].endswith('.json'):
                continue
            transcripts.append([obj['Key'], str(obj['LastModified'])])
            if len(transcripts) >= max_objects:
                # Stop early so no further pages are requested.
                return transcripts

    return transcripts

### Component: Process Transcripts

In [None]:
@dsl.component(
    base_image="python:3.9",
    packages_to_install=["boto3", "pandas", "numpy", "scipy", "transformers", "torch"]
)
def create_intra_call_analysis(aws_access_key: str,
                              aws_secret_key: str,
                              source_bucket: str,
                              file_key: str,
                              contact_id: str) -> Dict:
    """Build the intra-call analysis table for one transcript stored in S3.

    Args:
        aws_access_key: AWS access key id used to build the S3 client.
        aws_secret_key: AWS secret access key used to build the S3 client.
        source_bucket: Bucket holding the transcript.
        file_key: Object key of the transcript JSON file.
        contact_id: Identifier forwarded to ``process_transcript``.

    Returns:
        The analysis DataFrame converted with ``DataFrame.to_dict()``
        (column name -> {row index -> value}).
    """
    # A lightweight component executes only this function body in its
    # container, so dependencies are imported here rather than relying on
    # the notebook-level imports.
    import json

    import boto3
    import pandas as pd

    # Read transcript
    s3_client = boto3.client('s3',
                             aws_access_key_id=aws_access_key,
                             aws_secret_access_key=aws_secret_key)

    response = s3_client.get_object(Bucket=source_bucket, Key=file_key)
    transcript_data = json.loads(response['Body'].read().decode('utf-8'))

    # NOTE(review): process_transcript, get_sentiment_scores and
    # get_different_times are not defined in the visible part of this
    # notebook. For the component to run in its own container they must be
    # defined or imported inside this function body -- TODO confirm where
    # they come from.
    df_intra = process_transcript(transcript_data, contact_id)
    df_sentiment = get_sentiment_scores(df_intra.caption.to_list())
    # Column-wise concat: sentiment columns are placed next to the captions
    # they were computed from (aligned on the row index).
    df_intra = pd.concat([df_intra, df_sentiment], axis=1)
    df_intra = get_different_times(df_intra)

    return df_intra.to_dict()

@dsl.component(
    base_image="python:3.9",
    # pandas is used below to rebuild the DataFrame, so it must be installed
    # in the component image alongside the other dependencies.
    packages_to_install=["google-cloud-aiplatform", "vertexai", "scrubadub", "scrubadub-spacy", "pandas"]
)
def create_inter_call_analysis(project_id: str,
                              location: str,
                              intra_call_data: Dict,
                              transcript_data: Dict,
                              last_modified_date: str) -> Dict:
    """Build the inter-call (per-call summary) record from intra-call data.

    Args:
        project_id: Passed to ``KPIExtractor``.
        location: Passed to ``KPIExtractor``.
        intra_call_data: Dict form of the intra-call DataFrame; must contain
            a ``caption`` column.
        transcript_data: Forwarded unchanged to ``create_inter_call_dict``.
        last_modified_date: Forwarded unchanged to ``create_inter_call_dict``.

    Returns:
        The dictionary produced by ``create_inter_call_dict``.
    """
    # A lightweight component executes only this function body in its
    # container, so dependencies are imported here rather than relying on
    # the notebook-level imports.
    import pandas as pd
    import scrubadub
    import scrubadub_spacy

    df_intra_call = pd.DataFrame.from_dict(intra_call_data)

    # Redact PII before any caption text is sent on for KPI extraction.
    scrubber = scrubadub.Scrubber()
    scrubber.add_detector(scrubadub_spacy.detectors.SpacyEntityDetector)
    df_intra_call.caption = df_intra_call.caption.apply(scrubber.clean)

    # NOTE(review): KPIExtractor and create_inter_call_dict are not defined
    # in the visible part of this notebook. For the component to run in its
    # own container they must be defined or imported inside this function
    # body -- TODO confirm where they come from.
    extractor = KPIExtractor(project_id, location)
    # The whole (redacted) call is joined into one string for extraction.
    transcript = " ".join(df_intra_call.caption)
    call_gen_kpis = extractor.extract_genai_kpis(transcript)

    inter_call_dict = create_inter_call_dict(df_intra_call, call_gen_kpis, transcript_data, last_modified_date)

    return inter_call_dict

### Component: Write Data to Snowflake

In [None]:
@dsl.component(
    base_image="python:3.9",
    # write_pandas needs the connector's pandas extra (it pulls in pyarrow).
    packages_to_install=["snowflake-connector-python[pandas]", "pandas"]
)
def save_to_snowflake(conn_params: Dict,
                      intra_call_data: Dict,
                      inter_call_data: Dict):
    """Write the inter-call and intra-call results to Snowflake.

    Args:
        conn_params: Keyword arguments passed to
            ``snowflake.connector.connect``.
        intra_call_data: Dict form of the intra-call DataFrame; written to
            ``SRC_GCP_INTRA_CALLS``.
        inter_call_data: Dict form of the inter-call data; written to
            ``SRC_GCP_INTER_CALLS``.
    """
    # A lightweight component executes only this function body in its
    # container, so dependencies are imported here rather than relying on
    # the notebook-level imports.
    import pandas as pd
    import snowflake.connector as sc
    from snowflake.connector.pandas_tools import write_pandas

    # Convert dictionaries back to dataframes before opening the connection,
    # so a malformed payload fails without holding a session open.
    # NOTE(review): DataFrame.from_dict on a dict of plain scalars raises;
    # confirm inter_call_data holds list-like or dict values per column.
    df_intra = pd.DataFrame.from_dict(intra_call_data)
    df_inter = pd.DataFrame.from_dict(inter_call_data)

    conn = sc.connect(**conn_params)
    try:
        # Same write order as before: inter-call first, then intra-call.
        write_pandas(conn, df_inter, 'SRC_GCP_INTER_CALLS')
        write_pandas(conn, df_intra, 'SRC_GCP_INTRA_CALLS')
    finally:
        # Always release the session, even when a write fails.
        conn.close()

## Define Pipeline

In [None]:
# Pipeline definition: lists transcripts, then for each listed transcript runs
# intra-call analysis, inter-call analysis, and a Snowflake write inside a
# dsl.ParallelFor fan-out.
#
# Parameters:
#   project_id, location      -- forwarded to create_inter_call_analysis.
#   aws_access_key/secret_key -- forwarded to the S3-reading components.
#   source_bucket             -- bucket that is listed and read.
#   transcripts_location      -- key prefix passed to list_transcripts.
#   max_objects               -- cap passed to list_transcripts.
#   snowflake_conn_params     -- forwarded to save_to_snowflake as conn_params.
@dsl.pipeline(
    name='voice-ai-pipeline',
    description='Pipeline for processing voice transcripts'
)
def voice_ai_pipeline(
    project_id: str,
    location: str,
    aws_access_key: str,
    aws_secret_key: str,
    source_bucket: str,
    transcripts_location: str,
    max_objects: int,
    snowflake_conn_params: Dict
):
    # List available transcripts
    list_task = list_transcripts(
        aws_access_key=aws_access_key,
        aws_secret_key=aws_secret_key,
        source_bucket=source_bucket,
        transcripts_location=transcripts_location,
        max_objects=max_objects
    )
    
    # Process each transcript
    with dsl.ParallelFor(items=list_task.output) as transcript:
        # Extract contact_id and last_modified_date
        # NOTE(review): this f-string is evaluated once, when the pipeline
        # function body is executed to build the pipeline, not once per loop
        # item at run time. `transcript` is the loop placeholder at that
        # point, so the split/strip calls presumably do not see a real S3
        # key -- TODO confirm; deriving contact_id inside a component would
        # avoid this.
        # NOTE(review): confirm `dsl.RawArg` exists in the installed kfp
        # version, and that the loop placeholder supports `[0]` / `[1]`
        # subscripting for list-of-list items.
        contact_id = dsl.RawArg(f"{transcript[0].split('/')[-1].split('.')[0].split('analysis')[0].strip('_')}")
        last_modified = transcript[1]
        
        # Create intra-call analysis
        intra_task = create_intra_call_analysis(
            aws_access_key=aws_access_key,
            aws_secret_key=aws_secret_key,
            source_bucket=source_bucket,
            file_key=transcript[0],
            contact_id=contact_id
        )
        
        # Create inter-call analysis
        # NOTE(review): create_intra_call_analysis, as declared in this
        # notebook, has a single Dict return value; confirm that an output
        # named 'transcript_data' actually exists on intra_task.
        inter_task = create_inter_call_analysis(
            project_id=project_id,
            location=location,
            intra_call_data=intra_task.output,
            transcript_data=intra_task.outputs['transcript_data'],
            last_modified_date=last_modified
        )
        
        # Save results to Snowflake
        # Runs once per loop iteration, after both analysis tasks it consumes.
        save_to_snowflake(
            conn_params=snowflake_conn_params,
            intra_call_data=intra_task.output,
            inter_call_data=inter_task.output
        )

## Compile and Run Pipeline

In [None]:
# Serialize the pipeline definition to a local JSON template; the same path
# is handed to the PipelineJob below.
pipeline_package = 'voice_ai_pipeline.json'
compiler.Compiler().compile(pipeline_func=voice_ai_pipeline, package_path=pipeline_package)

# Point the Vertex AI SDK at the target project and region.
# NOTE(review): project_id, location and the other values referenced below
# are not assigned in this cell; presumably they are defined earlier in the
# notebook session -- confirm before running.
aiplatform.init(project=project_id, location=location)

# Runtime values for every parameter declared on voice_ai_pipeline.
run_parameters = {
    'project_id': project_id,
    'location': location,
    'aws_access_key': aws_access_key,
    'aws_secret_key': aws_secret_key,
    'source_bucket': source_bucket,
    'transcripts_location': transcripts_location,
    'max_objects': max_objects,
    'snowflake_conn_params': conn_params,
}

# Build the job from the compiled template, then submit it.
job = aiplatform.PipelineJob(
    display_name='voice-ai-pipeline-job',
    template_path=pipeline_package,
    parameter_values=run_parameters,
)
job.run()