### Start Up

In [96]:
import base64
import logging
import os
import pytz
import re
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents import IndexDocumentsBatch
from datetime import datetime
from dotenv import load_dotenv
from openai import AzureOpenAI

In [97]:
load_dotenv()

# Remove Jupyter's own logging handlers
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("transcript_indexing.log", mode='w'),
    ]
)

In [98]:
service_endpoint  = os.environ["AZURE_AI_SEARCH_SERVICE_ENDPOINT"]
key = os.environ["AZURE_AI_SEARCH_KEY"]
index_name = "seeking_alpha"

search_client = SearchClient(service_endpoint, index_name, AzureKeyCredential(key))

embed_client = AzureOpenAI(
  azure_endpoint = os.environ["AZURE_OPENAI_ENDPOINT"],
  api_key = os.environ["AZURE_OPENAI_API_KEY"],
  api_version = os.environ["API_VERSION"],
)

### Index Helper Functions

In [99]:
def extract_speakers(text):
    # Match either "Company Participants" or "Executives", and either "Conference Call Participants" or "Analysts"
    pattern = r'(?:Company Participants|Executives|Corporate Participants)(.*?)(?:Conference Call Participants|Analysts)\n(.*?)\n'

    relevant_chunk_match = re.search(pattern, text, re.DOTALL)

    if relevant_chunk_match:
        company_participants = relevant_chunk_match.group(1).strip()
        conference_call_participants = relevant_chunk_match.group(2).strip()

        company_name_title_pairs = company_participants.split('  ')
        conference_name_title_pairs = conference_call_participants.split('  ')

        names = []
        position_dict = {}
        
        for pair in company_name_title_pairs:
            name = pair.split(" - ")[0].strip()
            names.append(name)
            position_dict[name] = 'Insider'

        for pair in conference_name_title_pairs:
            name = pair.split(" - ")[0].strip()
            names.append(name)
            position_dict[name] = 'Outsider'
    else:
        raise ValueError("Failed to extract speakers")

    return names, position_dict

In [100]:
def split_content_by_speakers(content, speakers):
    # Create a regex pattern to match any of the speaker names    
    pattern = '|'.join([fr'{re.escape(speaker)}(?:\s*[-–]\s*.+)?' for speaker in speakers])

    
    # Split the content by the speaker names only if the line contains a single speaker name
    chunks = re.split(f'(?<=\\n)({pattern})(?=\\n)', content)
    
    # Combine the speaker names with their respective text chunks
    result = []
    for i in range(1, len(chunks), 2):
        speaker = chunks[i].split(" - ")[0].strip()
        text_chunk = chunks[i + 1].strip()
        if not text_chunk or text_chunk.strip() == '':
            continue
        if speaker != 'Operator':
            result.append((speaker, text_chunk))

    return result

In [101]:
def extract_date(line):
    date_pattern = r'(\w+ \d{1,2}, \d{4}) (\d{1,2}:\d{2} (?:AM|PM))'
    match = re.search(date_pattern, line)
    
    if match:
        date_str = match.group(1)
        time_str = match.group(2)
        datetime_str = f"{date_str} {time_str}"

        date_of_transcript = datetime.strptime(datetime_str, '%B %d, %Y %I:%M %p')

        et_timezone = pytz.timezone('US/Eastern')
        date_of_transcript = et_timezone.localize(date_of_transcript)

        sg_timezone = pytz.timezone('Asia/Singapore')
        date_of_transcript = date_of_transcript.astimezone(sg_timezone)

        return date_of_transcript.strftime('%B %d, %Y %I:%M %p %Z')
    else:
        logging.error("No date of transcript is extracted")
        return None

In [102]:
def extract_prefix(line):
    date_pattern = r'(\w+ \d{1,2}, \d{4}) (\d{1,2}:\d{2} [APM]{2})'
    date_match = re.search(date_pattern, line)

    if date_match:
        date_str = date_match.group(1)
        date_format = "%B %d, %Y"
        date_obj = datetime.strptime(date_str, date_format)
        year = date_obj.year
        month = date_obj.month
        date_prefix = f"{year}{month:02d}"
    else:
        logging.error("No date found")

    if "Earning" in line or "Earnings" in line:
        quarter_pattern = r'Q(\d) (\d{4})'
        quarter_match = re.search(quarter_pattern, line)

        if quarter_match:
            quarter = quarter_match.group(1)
            year = quarter_match.group(2)
            quarter_prefix = f"{year}Q{quarter}"
        else:
            logging.error("No fiscal quarter found")
            return None
        
        return f"{date_prefix} - {quarter_prefix} - "
    
    return f"{date_prefix} - "

In [103]:
def extract_ticker(file_name):
    # Define a regex pattern to match the ticker inside parentheses
    ticker_pattern = r'\((.*?)\)'
    match = re.search(ticker_pattern, file_name)
    
    if match:
        ticker = match.group(1)
        return ticker
    else:
        logging.error("No stock ticker is extracted")
        return None

In [104]:
def extract_quarter(file_name):
    pattern = r'Q\d 2\d{3}'
    match = re.search(pattern, file_name)
    if match:
        return match.group()
    else:
        logging.error("No fiscal quarter is extracted")
        return None

In [105]:
def generate_embeddings(text, model="text-embedding-3-large"): 
    return embed_client.embeddings.create(input = [text], model=model).data[0].embedding

In [106]:
def generate_primary_key(ticker, date, chunk_number):
    timestamp = datetime.now(pytz.UTC).strftime('%Y%m%d%H%M%S%f')
    
    raw_id = f"{ticker}_{date}_{chunk_number}_{timestamp}"
    
    # Sanitize ID: replace non-alphanumeric characters with underscores
    sanitized_id = re.sub(r'[^a-zA-Z0-9_-]', '_', raw_id)
    
    # Truncate to ensure ID is within Azure Search limits (512 bytes)
    return sanitized_id[:512]

In [114]:
def preprocess_transcript(file_path):
    logging.info(f"Indexing: {file_path}")

    file_name = os.path.basename(file_path)

    try:
        with open(file_path, 'r') as file:
            file_content = file.read()

        speakers, position_dict = extract_speakers(file_content)
        speakers.append('Operator')

        chunks = split_content_by_speakers(file_content, speakers)

        date_of_transcript = extract_date(file_content)

        stock_ticker = extract_ticker(file_name)

        transcript_type = 'Earnings call' if 'Earnings' in file_name else 'Others'

        if transcript_type == 'Earnings call':
            fiscal_quarter = extract_quarter(file_name)

        processed_chunks = []
        for i, (speaker, text_chunk) in enumerate(chunks, start=1):
            lines = text_chunk.split('\n')
            content = ' '.join(line.strip() for line in lines)
            position = position_dict[speaker]
            chunk_number =  f"{i}"
            chunk_id = generate_primary_key(stock_ticker, date_of_transcript, chunk_number)
            processed_chunks.append({
                'id': chunk_id,
                'content': content,
                'speaker': speaker,
                'speaker_background': position,
                'date_of_transcript': date_of_transcript,
                'fiscal_quarter': fiscal_quarter,            
                'transcript_name': f"{extract_prefix(file_content)}{file_name}",            
                'transcript_type': transcript_type,
                'stock_ticker': stock_ticker,
                'chunk_number': chunk_number,
                'text_vector': generate_embeddings(content)
            })

        return processed_chunks
    
    except ValueError as e:
        # Log error and flag for manual review
        error_msg = f"[{datetime.now(pytz.UTC).strftime('%Y-%m-%d %H:%M:%S %Z')}] File {file_name}: {str(e)}"
        logging.error(error_msg)
        
        # Append to manual review log
        with open('manual_review.log', 'a') as f:
            f.write(f"{error_msg}\n")
        
        return []  # Return empty list to skip the file


In [108]:
# Create a batch to upload documents
def upload_db(processed_transcript):
    batch = IndexDocumentsBatch()
    batch.add_upload_actions(processed_transcript)

    # Upload the batch to Azure Search
    search_client.index_documents(batch)

### Index

In [115]:
spx_tickers = ['AAPL', 'MSFT', 'NVDA', 'GOOGL', 'AMZN', 'GOOG', 'META', 'BRK/B', 'LLY', 'AVGO', 'TSLA', 'JPM', 'V', 'UNH', 'XOM', 'PG', 'MA', 'COST', 'JNJ', 'HD', 'ABBV', 'WMT', 'NFLX', 'MRK', 'BAC', 'ORCL', 'AMD', 'KO', 'CVX', 'CRM', 'PEP', 'TMO', 'ADBE', 'LIN', 'MCD', 'ACN', 'IBM', 'GE', 'ABT', 'NOW', 'WFC', 'BX', 'CSCO', 'QCOM', 'PM', 'TXN', 'AMGN', 'DHR', 'INTU', 'CAT', 'ISRG', 'VZ', 'DIS', 'NEE', 'SPGI']
tickers = ['AAPL', 'MSFT']
valid_extensions = {'.txt', '.pdf'}

for ticker in tickers:
    base_folder = f"..\Data Scraping\{ticker}"
    
    for subfolder in ['Earnings Call Transcripts', 'Other Transcripts']:
        folder_path = os.path.join(base_folder, subfolder)

        for filename in sorted(os.listdir(folder_path)):
            file_path = os.path.join(folder_path, filename)
            _, ext = os.path.splitext(filename)
            if os.path.isfile(file_path) and ext.lower() in valid_extensions:
                print(filename)
                processed_transcript = preprocess_transcript(file_path)
                if processed_transcript:
                    upload_db(processed_transcript)


Apple (AAPL) Q2 2018 Results - Earnings Call Transcript.txt
Apple (AAPL) Q2 2023 Earnings Call Transcript.txt
Apple (AAPL) Q3 2018 Results - Earnings Call Transcript.txt
Apple (AAPL) Q4 2018 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q1 2019 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q1 2020 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q1 2021 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q1 2022 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q2 2020 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q2 2021 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q2 2022 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q3 2020 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q3 2021 Results - Earnings Call Transcript.txt
Apple Inc. (AAPL) CEO Tim Cook on Q4 2019

UnboundLocalError: local variable 'fiscal_quarter' referenced before assignment

In [65]:
results = search_client.search(search_text="Q3 2022", include_total_count=True)


for result in results:
    print(result)


{'speaker': 'Krish Sankar', 'speaker_background': 'Outsider', 'fiscal_quarter': 'Q3 2022', 'date_of_transcript': 'July 29, 2022 05:00 AM +08', 'transcript_name': '202207 - 2022Q3 - Apple, Inc. (AAPL) CEO Tim Cook on Q3 2022 Results - Earnings Call Transcript.txt', 'stock_ticker': 'AAPL', 'id': '730', 'chunk_number': '43', 'transcript_type': 'Earnings call', 'content': "And Tim, I apologize, it's also macro-related. You mentioned that it impacted digital advertising within Services. I'm just kind of curious, if the macro does worsen, do you worry about subscriber growth, App Store purchases, et cetera? And conversely, are there any parts of the Service business that you consider recession-proof, like maybe a buy now, pay later or something else? And then I have a quick follow-up for Luca.", '@search.score': 8.766413, '@search.reranker_score': None, '@search.highlights': None, '@search.captions': None}
{'speaker': 'Tim Cook', 'speaker_background': 'Insider', 'fiscal_quarter': 'Q3 2021', 

In [88]:
result = search_client.get_document(key='687')
result

{'id': '687',
 'content': 'Conference Call Participants',
 'speaker': 'Tejas Gala',
 'speaker_background': 'Insider',
 'date_of_transcript': 'February 03, 2023 06:00 AM +08',
 'fiscal_quarter': 'Q1 2023',
 'transcript_name': '202302 - 2023Q1 - Apple, Inc. (AAPL) Q1 2023 Earnings Call Transcript.txt',
 'transcript_type': 'Earnings call',
 'stock_ticker': 'AAPL',
 'chunk_number': '0'}