In [106]:
import os
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from abc import ABC, abstractmethod

In [107]:
docx_loader = DirectoryLoader("../../../preprocessing/DATA", glob="**/*.docx")

In [108]:
docs = docx_loader.load()

In [109]:
import re


def extract_info_from_filename(filename):
    """
    Input: filename ("MSFTTranscriptFY23Q4")
    Output: Extract stock symbol, year and quarter from filename
    """
    pattern = r"([A-Z]+)TranscriptFY(\d{2})Q(\d)"
    match = re.search(pattern, filename)

    if match:
        symbol = match.group(1)
        fiscal_year = match.group(2)
        fiscal_quarter = match.group(3)
        return symbol, fiscal_year, fiscal_quarter
    else:
        return None

In [110]:
doc_chunks = []

for doc in docs:
    source = doc.metadata["source"]
    symbol, fiscal_year, fiscal_quarter = extract_info_from_filename(source)

    text_splitter = RecursiveCharacterTextSplitter(
        separators=[
            "\n## ",
            "\n### ",
            "\n#### ",
            "\n##### ",
            "\n###### ",
            "```\n\n",
            "\n\n***\n\n",
            "\n\n---\n\n",
            "\n\n___\n\n",
            "\n\n",
            "\n",
            " ",
            "",
        ]
    )

    chunks = text_splitter.split_text(doc.page_content)
    for i, chunk in enumerate(chunks):
        doc = Document(
            page_content=chunk,
            metadata={
                "source": source,
                "symbol": symbol,
                "fiscal_year": fiscal_year,
                "fiscal_quarter": fiscal_quarter,
                "chunk": i,
            },
        )
        doc_chunks.append(doc)

In [111]:
len(doc_chunks)

60

In [112]:
from dotenv import dotenv_values
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
import openai
import pandas as pd
import numpy as np
import time
import requests

# specify the name of the .env file name
env_name = "../../../../.env"  # change to your own .env file name
config = dotenv_values(env_name)

if config["KEYS_FROM"] == "KEYVAULT":
    print("keyvault was selected.")
    keyVaultName = config["KEY_VAULT_NAME"]
    KVUri = f"https://{keyVaultName}.vault.azure.net"

    credential = DefaultAzureCredential()
    client = SecretClient(vault_url=KVUri, credential=credential)

    openai.api_type = client.get_secret("OPENAI-API-TYPE").value
    openai.api_key = client.get_secret("OPENAI-API-KEY").value
    openai.api_base = client.get_secret("OPENAI-API-BASE").value
    openai.api_version = client.get_secret("OPENAI-API-VERSION").value
    deployment_embedding = client.get_secret("OPENAI-DEPLOYMENT-EMBEDDING").value
else:
    openai.api_type = config["OPENAI_API_TYPE"]
    openai.api_key = config["OPENAI_API_KEY"]
    openai.api_base = config["OPENAI_API_BASE"]
    openai.api_version = config["OPENAI_API_VERSION"]
    deployment_embedding = config["OPENAI_DEPLOYMENT_EMBEDDING"]


def createEmbeddings(text, endpoint, api_key, api_version, embedding_model_deployment):
    request_url = f"{endpoint}/openai/deployments/{embedding_model_deployment}/embeddings?api-version={api_version}"
    headers = {"Content-Type": "application/json", "api-key": api_key}
    request_payload = {"input": text}
    embedding_response = requests.post(
        request_url, json=request_payload, headers=headers, timeout=None
    )
    if embedding_response.status_code == 200:
        data_values = embedding_response.json()["data"]
        embeddings_vectors = [data_value["embedding"] for data_value in data_values]
        return embeddings_vectors
    else:
        raise Exception(f"failed to get embedding: {embedding_response.json()}")

keyvault was selected.


In [113]:
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from dotenv import dotenv_values

config = dotenv_values(env_name)


if config["KEYS_FROM"] == "KEYVAULT":
    print("keyvault was selected.")
    keyVaultName = config["KEY_VAULT_NAME"]
    KVUri = f"https://{keyVaultName}.vault.azure.net"

    credential = DefaultAzureCredential()
    client = SecretClient(vault_url=KVUri, credential=credential)
    POSTGRES_CONN_STRING = (client.get_secret("COSMOSDB-POSTGRES-CONN-STRING").value,)
else:
    print(".env was selected.")
    POSTGRES_CONN_STRING = config["COSMOSDB_POSTGRES_CONN_STRING"]

keyvault was selected.


In [114]:
docs

[Document(page_content="Microsoft FY23 First Quarter Earnings Conference Call\n\nBrett Iversen, Satya Nadella, Amy Hood\n\nTuesday, October 25, 2022\n\nBRETT IVERSEN: \n\nGood afternoon and thank you for joining us today. On the call with me are Satya Nadella, chairman and chief executive officer, Amy Hood, chief financial officer, Alice Jolla, chief accounting officer, and Keith Dolliver, deputy general counsel.\n\nOn the Microsoft Investor Relations website, you can find our earnings press release and financial summary slide deck, which is intended to supplement our prepared remarks during today’s call and provides the reconciliation of differences between GAAP and non-GAAP financial measures. \n\nOn this call we will discuss certain non-GAAP items. The non-GAAP financial measures provided should not be considered as a\xa0substitute for or superior to the measures of financial performance prepared in accordance with GAAP.\xa0They are included as additional clarifying items to aid inv

In [115]:
data = []
for i, doc in enumerate(doc_chunks):
    # Create embeddings using the provided function
    embeddings = createEmbeddings(
        doc.page_content,
        openai.api_base,
        openai.api_key,
        openai.api_version,
        deployment_embedding,
    )[0]
    data.append(
        {
            "id": i,
            "content": doc.page_content,
            "embedding": embeddings,
            "symbol": doc.metadata["symbol"],
            "fiscal_year": doc.metadata["fiscal_year"],
            "fiscal_quarter": doc.metadata["fiscal_quarter"],
            "source": doc.metadata["source"],
            "chunkid": doc.metadata["chunk"],
        }
    )

In [116]:
import pandas as pd

df = pd.DataFrame(data)

In [117]:
df.columns

Index(['id', 'content', 'embedding', 'symbol', 'fiscal_year', 'fiscal_quarter',
       'source', 'chunkid'],
      dtype='object')

In [118]:
data[0]

{'id': 0,
 'content': "Microsoft FY23 First Quarter Earnings Conference Call\n\nBrett Iversen, Satya Nadella, Amy Hood\n\nTuesday, October 25, 2022\n\nBRETT IVERSEN: \n\nGood afternoon and thank you for joining us today. On the call with me are Satya Nadella, chairman and chief executive officer, Amy Hood, chief financial officer, Alice Jolla, chief accounting officer, and Keith Dolliver, deputy general counsel.\n\nOn the Microsoft Investor Relations website, you can find our earnings press release and financial summary slide deck, which is intended to supplement our prepared remarks during today’s call and provides the reconciliation of differences between GAAP and non-GAAP financial measures. \n\nOn this call we will discuss certain non-GAAP items. The non-GAAP financial measures provided should not be considered as a\xa0substitute for or superior to the measures of financial performance prepared in accordance with GAAP.\xa0They are included as additional clarifying items to aid inve

In [119]:
import pymongo
import psycopg2
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from abc import ABC, abstractmethod
from pgvector.psycopg2 import register_vector
from psycopg2 import pool
from psycopg2 import Error
from psycopg2 import sql


class DatabaseService(ABC):
    @abstractmethod
    def store_data(self, data):
        pass

    @abstractmethod
    def retrieve_data(self, query, num_results):
        pass


class PostgresDBService(DatabaseService):
    def __init__(self, table_name):
        self.table_name = table_name
        self._connect_db()
        self._create_pg_extention()

    def _connect_db(self):
        postgreSQL_pool = pool.SimpleConnectionPool(1, 20, POSTGRES_CONN_STRING[0])
        if postgreSQL_pool:
            print("Connection pool created successfully")
        # Use getconn() to get a connection from the connection pool
        self.connection = postgreSQL_pool.getconn()
        self.cursor = self.connection.cursor()

    def _create_pg_extention(self):
        try:
            # Start a new transaction
            self.cursor.execute("BEGIN;")

            # Previous transaction statements
            # ...

            # Check if the extension already exists
            extension_query = "SELECT * FROM pg_extension WHERE extname = 'vector';"
            self.cursor.execute(extension_query)
            extension_exists = self.cursor.fetchone()

            if not extension_exists:
                # Extension does not exist, create it
                create_extension_query = "CREATE EXTENSION vector;"
                self.cursor.execute(create_extension_query)
                self.connection.commit()
            else:
                # Extension already exists, pass through
                pass

            # Commit the transaction
            self.cursor.execute("COMMIT;")
        except Exception as e:
            # An error occurred, rollback the transaction
            self.cursor.execute("ROLLBACK;")
            raise e
        finally:
            # Close the cursor
            self.cursor.close()

    def check_pgvector_connection(self):
        try:
            self.cursor = self.connection.cursor()
            # Define the SHOW EXTENSIONS query
            show_extensions_query = "SHOW azure.extensions;"

            # Execute the SHOW EXTENSIONS query
            self.cursor.execute(show_extensions_query)

            self.connection.commit()
            # Fetch and print the results
            results = self.cursor.fetchall()
            for row in results:
                print(row)
            self.cursor.close()
        except:
            print("Warning: could not check the pg vector extension")
        finally:
            self.cursor.close()

    def create_schema(self, doc_chunks):
        import pandas as pd

        data = []
        for i, doc in enumerate(doc_chunks):
            # Create embeddings using the provided function
            embeddings = createEmbeddings(
                doc.page_content,
                openai.api_base,
                openai.api_key,
                openai.api_version,
                deployment_embedding,
            )[0]
            data.append(
                {
                    "id": i,
                    "content": doc.page_content,
                    "embedding": embeddings,
                    "symbol": doc.metadata["symbol"],
                    "fiscal_year": doc.metadata["fiscal_year"],
                    "fiscal_quarter": doc.metadata["fiscal_quarter"],
                    "source": doc.metadata["source"],
                    "chunkid": doc.metadata["chunk"],
                }
            )
        return pd.DataFrame(data)

    def store_data(self, df):
        # Convert the DataFrame to a list of tuples for bulk insertion
        records = df.to_records(index=False)
        records_list = records.tolist()

        # Open a cursor to perform database operations
        cursor = self.connection.cursor()

        # Define the table name
        table_name = self.table_name
        batch_size = 10

        # Execute the query to check if the table exists
        cursor.execute(
            f"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '{table_name}');"
        )

        # Fetch the result
        exists = cursor.fetchone()[0]

        if exists:
            print(f"The table '{table_name}' exists in the database.")
            print(
                "You may drop previous table (see commented code above) if you want to re-insert reviews."
            )
        else:
            print(
                f"The table '{table_name}' does not exist in the database. Creating it now and inserting data ..."
            )

            # Use getconn() to get a connection from the connection pool
            with self.connection as connection:
                # Define the CREATE TABLE query
                create_table_query = f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    Id INTEGER PRIMARY KEY,
                    Content TEXT,
                    Embedding VECTOR,
                    Symbol TEXT,
                    FiscalYear TEXT,
                    FiscalQuarter INTEGER,
                    Source TEXT,
                    ChunkId TEXT
                );
                """
                # Execute the CREATE TABLE query
                cursor.execute(create_table_query)
                connection.commit()

                # Define the INSERT INTO query
                insert_query = (
                    f"INSERT INTO {table_name} (Id, Content, Embedding, Symbol, FiscalYear, FiscalQuarter, Source, ChunkId) "
                    f"VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
                )

                # Execute the INSERT INTO query for each row
                cursor.executemany(insert_query, records_list)
                connection.commit()

                # Execute the CREATE TABLE query
                try:
                    with connection.cursor() as cursor:
                        cursor.execute(create_table_query)
                        connection.commit()
                        print(f"Table {table_name} created successfully!")
                except (Exception, Error) as e:
                    print(f"Error creating table {table_name}: {e}")
                    connection.rollback()

                # Convert numpy.int32 to int in each row
                records_list = [
                    tuple(
                        int(value) if isinstance(value, np.int32) else value
                        for value in record
                    )
                    for record in records_list
                ]

                # Split the records list into batches
                batches = [
                    records_list[i : i + batch_size]
                    for i in range(0, len(records_list), batch_size)
                ]

                # Iterate over each batch and perform bulk insert
                count = 0
                for batch in batches:
                    count += 1
                    print(f"Inserting batch {count} into the table")
                    try:
                        insert_query = sql.SQL(
                            f"INSERT INTO {table_name} (Id, Content, Embedding, Symbol, FiscalYear, FiscalQuarter, Source, ChunkId) "
                            f"VALUES ({', '.join(['%s'] * len(batch[0]))})"
                        )

                        with connection.cursor() as cursor:
                            cursor.executemany(insert_query, batch)
                            connection.commit()
                    except (Exception, Error) as e:
                        print(f"Error inserting batch into the table: {e}")
                        connection.rollback()

    def retrieve_data(self, query, num_results=3):
        # Register 'pgvector' type for the 'embedding' column
        register_vector(self.connection)
        queryEmbedding = createEmbeddings(
            query,
            openai.api_base,
            openai.api_key,
            openai.api_version,
            deployment_embedding,
        )[0]

        select_query = f"SELECT id FROM {self.table_name} ORDER BY embedding <-> %s LIMIT {num_results}"
        cursor = self.connection.cursor()
        cursor.execute(select_query, (np.array(queryEmbedding),))
        results = cursor.fetchall()

        # Use the top k ids to retrieve the actual text from the database
        top_ids = []
        for i in range(len(results)):
            top_ids.append(int(results[i][0]))

        self.connection.rollback()

        format_ids = ", ".join(["%s"] * len(top_ids))

        sql = f"SELECT CONCAT('Content: ', Content, 'Symbol:', Symbol, ' ', 'FiscalYear: ', FiscalYear, ' ', 'FiscalQuarter: ', FiscalQuarter, ' ', 'Source: ', Source) AS concat FROM {self.table_name} WHERE id IN ({format_ids})"

        # Execute the SELECT statement
        try:
            cursor.execute(sql, top_ids)
            top_rows = cursor.fetchall()
            output = []
            for row in top_rows:
                output.append(row)
        except (Exception, Error) as e:
            print(f"Error executing SELECT statement: {e}")

        return output

In [126]:
testdb = PostgresDBService(table_name="msft_transcript")

Connection pool created successfully


In [127]:
testdb.check_pgvector_connection()



In [128]:
df = testdb.create_schema(doc_chunks)

In [129]:
testdb.store_data(df)

The table 'msft_transcript' does not exist in the database. Creating it now and inserting data ...
Table msft_transcript created successfully!
Inserting batch 1 into the table
Error inserting batch into the table: duplicate key value violates unique constraint "msft_transcript_pkey"
DETAIL:  Key (id)=(0) already exists.

Inserting batch 2 into the table
Error inserting batch into the table: duplicate key value violates unique constraint "msft_transcript_pkey"
DETAIL:  Key (id)=(10) already exists.

Inserting batch 3 into the table
Error inserting batch into the table: duplicate key value violates unique constraint "msft_transcript_pkey"
DETAIL:  Key (id)=(20) already exists.

Inserting batch 4 into the table
Error inserting batch into the table: duplicate key value violates unique constraint "msft_transcript_pkey"
DETAIL:  Key (id)=(30) already exists.

Inserting batch 5 into the table
Error inserting batch into the table: duplicate key value violates unique constraint "msft_transcript

In [130]:
results = testdb.retrieve_data("what is the growth rate for azure ml revenue?")

In [131]:
results[1]

('Content: And finally, we returned $9.7 billion to shareholders through share repurchases and dividends.\n\nNow, moving to our Q4 outlook, which unless specifically noted otherwise, is on a US dollar basis. \n\nMy commentary, for next quarter and FY24, does not include any impact from Activision, which we continue to work towards closing in fiscal year 2023, subject to obtaining required regulatory approvals.\n\nNow to FX. Based on current rates, we expect FX to decrease total revenue growth by approximately 2 points with no impact to COGS or operating expense growth. Within the segments, we anticipate roughly 2 points of negative FX impact on revenue growth in Productivity and Business Processes and Intelligent Cloud and roughly 1 point in More Personal Computing.\n\n\n\nOverall, our outlook has many of the trends we saw in Q3 continue thru Q4. In our largest quarter of the year, we expect customer demand for our differentiated solutions including our AI platform and consistent execu