In [1]:
# Additional imports will be needed to interact with Azure Blob Storage and Azure Search.

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents import SearchClient

from dotenv import load_dotenv
# import ast
# import html
import json
import os
import re
import requests
from openai import AzureOpenAI
import re
import tempfile
import time
from abc import ABC, abstractmethod
# from concurrent.futures import ProcessPoolExecutor
# from functools import partial
from typing import Callable, List, Dict, Optional, Generator, Tuple, Union
from dataclasses import dataclass, asdict
import shutil

import markdown
import tiktoken
# from azure.identity import DefaultAzureCredential
# from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from bs4 import BeautifulSoup
from langchain.text_splitter import TextSplitter, MarkdownTextSplitter, RecursiveCharacterTextSplitter, PythonCodeTextSplitter
from tqdm import tqdm
from typing import Any
load_dotenv()  # load variables from a local .env into os.environ; must run before os.getenv() defaults below are evaluated


# Maps a file extension (as it appears after the last ".") to a parser format name.
FILE_FORMAT_DICT = dict(
    md="markdown",
    txt="text",
    html="html",
    shtml="html",
    htm="html",
    py="python",
    pdf="pdf",
    docx="docx",
    pptx="pptx",
)

# Attempts made per chunk when requesting an embedding (see chunk_content).
RETRY_COUNT = 5

SENTENCE_ENDINGS = [".", "!", "?"]
# Word-break characters, stored in reverse of the listed order.
WORDS_BREAKS = [",", ";", ":", " ", "(", ")", "[", "]", "{", "}", "\t", "\n"][::-1]

HTML_TABLE_TAGS = dict(table_open="<table>", table_close="</table>", row_open="<tr>")

PDF_HEADERS = dict(title="h1", sectionHeading="h2")

class TokenEstimator:
    """Counts and truncates text in GPT-2 BPE tokens (via tiktoken)."""

    GPT2_TOKENIZER = tiktoken.get_encoding("gpt2")

    def estimate_tokens(self, text: Union[str, List]) -> int:
        """Return how many GPT-2 tokens ``text`` encodes to (special tokens allowed)."""
        token_ids = self.GPT2_TOKENIZER.encode(text, allowed_special="all")
        return len(token_ids)

    def construct_tokens_with_size(self, tokens: str, numofTokens: int) -> str:
        """Return the text decoded from the first ``numofTokens`` tokens of ``tokens``."""
        encoder = self.GPT2_TOKENIZER
        head_ids = encoder.encode(tokens, allowed_special="all")[:numofTokens]
        return encoder.decode(head_ids)

TOKEN_ESTIMATOR = TokenEstimator()


In [2]:
def downloadBlobUrlToLocalFolder(blob_name, destination_folder, container_name=None):
    """Downloads one blob into ``destination_folder`` and returns the local file path.

    The blob is read from the storage account given by the ``AZURE_BLOB_STORAGE_CS``
    connection string, in container ``container_name`` (default: the
    ``AZURE_BLOB_STORAGE_CONTAINER_NAME`` environment variable).

    Args:
        blob_name (str): Blob name. If it contains "/" separators, the matching
            sub-folders are created under ``destination_folder``.
        destination_folder (str): Local folder to write into (created if missing).
        container_name (Optional[str]): Optional container override.
    Returns:
        str: Path of the written file.
    """
    if container_name is None:
        container_name = os.getenv("AZURE_BLOB_STORAGE_CONTAINER_NAME")
    blob_service_client = BlobServiceClient.from_connection_string(os.getenv('AZURE_BLOB_STORAGE_CS'))
    blob_client = blob_service_client.get_container_client(container_name).get_blob_client(blob_name)

    local_path = os.path.join(destination_folder, blob_name)
    # Create the file's own parent folder (not just destination_folder) so nested blob names work.
    os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)

    # Download before opening the file so a failed download does not leave an empty file behind.
    data = blob_client.download_blob().readall()
    with open(local_path, mode='wb') as local_file:
        local_file.write(data)
    return local_path

In [63]:
def convert_escaped_to_posix(escaped_path):
    r"""Turn a Windows-style path into a "/"-separated one.

    Doubled (escaped) backslashes are collapsed to a single one first, so
    ``a\\b`` becomes ``a/b`` rather than ``a//b``.
    """
    single_backslashes = escaped_path.replace("\\\\", "\\")
    return single_backslashes.replace("\\", "/")

def _get_file_format(file_name: str, extensions_to_process: List[str]) -> Optional[str]:
    """Map a file name (or path) to its FILE_FORMAT_DICT format name.

    The extension is whatever follows the last "." of the base name and is matched
    case-sensitively.

    Args:
        file_name (str): A file name or path.
        extensions_to_process (List[str]): Extensions the caller is willing to handle.
    Returns:
        Optional[str]: The format name, or None when the extension is not in
        ``extensions_to_process`` or has no FILE_FORMAT_DICT entry.
    """
    extension = os.path.basename(file_name).rsplit(".", 1)[-1]
    if extension in extensions_to_process:
        return FILE_FORMAT_DICT.get(extension)
    return None

def cleanup_content(content: str) -> str:
    """Normalize blank lines, whitespace runs and dash runs, then strip.

    Runs of two or more newlines become one newline. Runs of two or more
    non-newline whitespace characters become one space. Runs of two or more dashes
    become "--".

    Args:
        content (str): Text to clean.
    Returns:
        str: The cleaned, stripped text.
    """
    substitutions = (
        (r"\n{2,}", "\n"),       # collapse blank lines
        (r"[^\S\n]{2,}", " "),   # collapse horizontal whitespace runs
        (r"-{2,}", "--"),        # shorten dash rules
    )
    cleaned = content
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

@dataclass
class Document(object):
    """A data class for storing documents (whole files or individual chunks).

    Attributes:
        content (str): The content of the document.
        id (Optional[str]): The id of the document.
        title (Optional[str]): The title of the document.
        filepath (Optional[str]): The filepath of the document.
        url (Optional[str]): The url of the document.
        metadata (Optional[Dict]): The metadata of the document.
        contentVector (Optional[List[float]]): Embedding of ``content``, if one was computed.
    """

    content: str
    id: Optional[str] = None
    title: Optional[str] = None
    filepath: Optional[str] = None
    url: Optional[str] = None
    # NOTE(review): process_file stores a JSON *string* here (json.dumps), despite the Dict hint.
    metadata: Optional[Dict] = None
    # Left as None when embeddings are not requested; upload_documents_to_index drops the key when None.
    contentVector: Optional[List[float]] = None
    
@dataclass
class ChunkingResult:
    """Data model for chunking result

    Attributes:
        chunks (List[Document]): List of chunks.
        total_files (int): Total number of files.
        num_unsupported_format_files (int): Number of files with unsupported format.
        num_files_with_errors (int): Number of files with errors.
        skipped_chunks (int): Number of chunks skipped.
    """
    chunks: List[Document]
    total_files: int
    num_unsupported_format_files: int = 0
    num_files_with_errors: int = 0
    # chunks whose token count is below min_chunk_size are dropped and counted here
    skipped_chunks: int = 0


class BaseParser(ABC):
    """A parser turns raw file content into a Document.

    Subclasses implement ``parse``. ``parse_file`` and ``parse_directory`` are
    conveniences built on top of it.
    """

    @abstractmethod
    def parse(self, content: str, file_name: Optional[str] = None) -> Document:
        """Parses the given content.
        Args:
            content (str): The content to parse.
            file_name (str): The file name associated with the content.
        Returns:
            Document: The parsed document.
        """
        pass

    def parse_file(self, file_path: str, encoding: str = "utf8") -> Document:
        """Reads ``file_path`` as text and parses it.

        Args:
            file_path (str): The file to parse.
            encoding (str): Text encoding used to read the file. Defaults to UTF-8,
                matching chunk_file, instead of the machine-dependent locale encoding.
        Returns:
            Document: The parsed document (``file_name`` is the file's base name).
        """
        with open(file_path, "r", encoding=encoding) as f:
            return self.parse(f.read(), os.path.basename(file_path))

    def parse_directory(self, directory_path: str) -> List[Document]:
        """Parses every regular file directly inside ``directory_path`` (not recursive).

        Args:
            directory_path (str): The directory to parse.
        Returns:
            List[Document]: One parsed document per file, in os.listdir order.
        """
        documents = []
        for file_name in os.listdir(directory_path):
            file_path = os.path.join(directory_path, file_name)
            if os.path.isfile(file_path):
                documents.append(self.parse_file(file_path))
        return documents


class HTMLParser(BaseParser):
    """Detects a title in HTML and returns the cleaned-up raw markup as content."""
    TITLE_MAX_TOKENS = 128
    NEWLINE_TEMPL = "<NEWLINE_TEXT>"

    def __init__(self) -> None:
        super().__init__()
        self.token_estimator = TokenEstimator()

    def parse(self, content: str, file_name: Optional[str] = None) -> Document:
        """Builds a Document from HTML ``content``.

        The title is chosen, in order, from:
        1. the ``<title>`` string;
        2. the first ``<h1>``;
        3. the first ``<h2>``;
        4. the first non-blank text string, truncated to TITLE_MAX_TOKENS tokens;
        5. ``file_name``.

        The markup itself is not converted to text; it only goes through cleanup_content.

        Args:
            content (str): The HTML to parse.
            file_name (str): Fallback title when the document contains no text.
        Returns:
            Document: cleaned markup as content and the detected title ('' if none).
        """
        soup = BeautifulSoup(content, 'html.parser')

        title = ''
        if soup.title and soup.title.string:
            title = soup.title.string
        else:
            for heading_name in ('h1', 'h2'):
                heading = soup.find(heading_name)
                if heading:
                    title = heading.get_text(strip=True)
                    break

        if not title:
            # Fall back to the first visible text, or the file name for text-less input.
            first_text = next(soup.stripped_strings, None)
            if first_text is None:
                title = file_name
            else:
                title = self.token_estimator.construct_tokens_with_size(first_text, self.TITLE_MAX_TOKENS)

        # The content is kept as-is apart from whitespace/dash cleanup.
        return Document(content=cleanup_content(content), title='' if title is None else str(title))

class MarkdownParser(BaseParser):
    """Pass-through Markdown parser: returns the content unchanged with an empty title."""

    def __init__(self) -> None:
        super().__init__()
        # Not used by parse(); kept so any code reaching for it still finds it.
        self._html_parser = HTMLParser()

    def parse(self, content: str, file_name: Optional[str] = None) -> Document:
        """Wraps Markdown ``content`` in a Document without transforming it.

        Args:
            content (str): The Markdown text.
            file_name (str): Ignored.
        Returns:
            Document: ``content`` verbatim, with ``title=""``.
        """
        # The markdown -> HTML conversion is disabled; raw markdown is returned.
        return Document(title="", content=content)
    

class ParserFactory:
    """Looks up the parser instance registered for a format name."""

    def __init__(self):
        # Only Markdown is registered; every other format name is rejected by __call__.
        self._parsers = {"markdown": MarkdownParser()}

    @property
    def supported_formats(self) -> List[str]:
        """Format names that have a registered parser."""
        return list(self._parsers)

    def __call__(self, file_format: str) -> BaseParser:
        """Return the parser for ``file_format``; raise ValueError if there is none."""
        parser = self._parsers.get(file_format)
        if parser is not None:
            return parser
        raise ValueError(f"{file_format} is not supported")

parser_factory = ParserFactory()

def merge_chunks_serially(chunked_content_list: List[str], num_tokens: int, url_dict: Optional[Dict[str, str]] = None) -> Generator[Tuple[str, int], None, None]:
    """Greedily concatenates consecutive text pieces into chunks of at most ``num_tokens`` tokens.

    Each piece is appended to the current chunk. If adding it would push a non-empty
    chunk past ``num_tokens``, the current chunk is yielded first. A single piece that is
    larger than ``num_tokens`` is yielded on its own; pieces are never split.

    Args:
        chunked_content_list (List[str]): Pieces to merge, in order.
        num_tokens (int): Token budget per merged chunk.
        url_dict (Optional[Dict[str, str]]): Placeholder -> URL replacements applied to
            pieces that contain "##URL". None (the default) means no replacements.
    Yields:
        Tuple[str, int]: (merged chunk text, its estimated token count).
    """
    # None instead of a shared mutable {} default.
    url_dict = url_dict or {}

    def unmask_urls(text: str) -> str:
        # Only pieces carrying a "##URL" placeholder need substitution.
        if "##URL" in text:
            for placeholder, original_url in url_dict.items():
                text = text.replace(placeholder, original_url)
        return text

    # TODO: solve for token overlap
    current_chunk = ""
    total_size = 0
    for chunked_content in chunked_content_list:
        chunked_content = unmask_urls(chunked_content)
        chunk_size = TOKEN_ESTIMATOR.estimate_tokens(chunked_content)
        if total_size > 0 and total_size + chunk_size > num_tokens:
            yield current_chunk, total_size
            current_chunk = ""
            total_size = 0
        total_size += chunk_size
        current_chunk += chunked_content
    if total_size > 0:
        yield current_chunk, total_size

def chunk_content_helper(
        content: str, file_format: str, file_name: Optional[str],
        token_overlap: int,
        num_tokens: int = 256
) -> Generator[Tuple[str, int, Document], None, None]:
    """Parses ``content`` and yields ``(chunk_text, token_count, chunk_document)`` tuples.

    If the parsed document is below the token budget, it is yielded whole. Otherwise,
    Markdown is split with MarkdownTextSplitter and re-packed by merge_chunks_serially.
    Each piece is re-parsed and given the whole document's title.

    Args:
        content (str): Raw file content.
        file_format (str): Format name; a "_pdf" suffix is dropped to pick the parser.
        file_name (Optional[str]): Passed through to the parser.
        token_overlap (int): Overlap handed to the text splitter.
        num_tokens (int): Token budget per chunk; None means effectively unlimited.
    """
    # None means "no limit": use a budget no real document reaches.
    token_budget = 1000000000 if num_tokens is None else num_tokens

    # e.g. "html_pdf" (cracked PDF rendered as HTML) is parsed with the "html" parser.
    parser = parser_factory(file_format.split("_pdf")[0])
    doc = parser.parse(content, file_name=file_name)

    doc_size = TOKEN_ESTIMATOR.estimate_tokens(doc.content)
    if doc_size < token_budget:
        # Small enough: the whole parsed document is a single chunk.
        yield doc.content, doc_size, doc
        return

    if file_format != "markdown":
        # NOTE(review): oversized non-markdown documents currently produce no chunks.
        return

    splitter = MarkdownTextSplitter.from_tiktoken_encoder(
        chunk_size=token_budget, chunk_overlap=token_overlap)
    # Split the original (unparsed) content, then re-pack pieces up to the budget.
    pieces = splitter.split_text(content)
    for piece, piece_size in merge_chunks_serially(pieces, token_budget):
        piece_doc = parser.parse(piece, file_name=file_name)
        piece_doc.title = doc.title
        yield piece_doc.content, piece_size, piece_doc

class UnsupportedFormatError(Exception):
    """Signals that a file's format cannot be handled by any available parser."""

def get_embedding(text, embedding_model_endpoint=None, embedding_model_key=None, azure_credential=None):
    """Returns the embedding vector of ``text`` from an Azure OpenAI embeddings deployment.

    The endpoint is a full deployment URL of the form
    ``<base>/openai/deployments/<deployment>/embeddings?api-version=<version>``. The base
    URL, deployment name and API version are parsed out of it.

    Args:
        text: Input passed to ``client.embeddings.create``.
        embedding_model_endpoint: Deployment URL. Defaults to ``EMBEDDING_MODEL_ENDPOINT``.
        embedding_model_key: API key. Defaults to ``EMBEDDING_MODEL_KEY``.
        azure_credential: Only relaxes the up-front argument check. Token-based auth is
            not implemented, so the client is always built with the key.
    Returns:
        The embedding of the (single) input, as returned by the SDK.
    Raises:
        Exception: if endpoint/key are missing and no credential was given, or if
            parsing the endpoint or calling the service fails. In the latter case the
            underlying error is chained as ``__cause__``.
    """
    endpoint = embedding_model_endpoint if embedding_model_endpoint else os.getenv("EMBEDDING_MODEL_ENDPOINT")
    key = embedding_model_key if embedding_model_key else os.getenv("EMBEDDING_MODEL_KEY")

    if azure_credential is None and (endpoint is None or key is None):
        raise Exception("EMBEDDING_MODEL_ENDPOINT and EMBEDDING_MODEL_KEY are required for embedding")

    try:
        endpoint_parts = endpoint.split("/openai/deployments/")
        base_url = endpoint_parts[0]
        deployment_id = endpoint_parts[1].split("/embeddings")[0]
        api_version = endpoint_parts[1].split("api-version=")[1].split("&")[0]

        client = AzureOpenAI(api_version=api_version, azure_endpoint=base_url, api_key=key)
        embeddings = client.embeddings.create(model=deployment_id, input=text)
        return embeddings.data[0].embedding

    except Exception as e:
        # Chain the original error so its type and traceback stay available to callers.
        raise Exception(f"Error getting embeddings with endpoint={endpoint} with error={e}") from e

In [64]:
def _embed_with_retry(text, azure_credential=None, embedding_endpoint=None, retry_delay_seconds=30):
    """Calls get_embedding up to RETRY_COUNT times, sleeping only between failed attempts.

    Raises:
        Exception: when every attempt fails; the last failure is chained as ``__cause__``.
    """
    last_error = None
    for attempt in range(RETRY_COUNT):
        try:
            return get_embedding(text, azure_credential=azure_credential, embedding_model_endpoint=embedding_endpoint)
        except Exception as e:  # not a bare except: let KeyboardInterrupt/SystemExit through
            last_error = e
            if attempt < RETRY_COUNT - 1:
                # No point in sleeping after the final attempt.
                time.sleep(retry_delay_seconds)
    raise Exception(f"Error getting embedding for chunk={text}") from last_error


def chunk_content(
    content: str,
    file_name: Optional[str] = None,
    url: Optional[str] = None,
    ignore_errors: bool = True,
    num_tokens: int = 256,
    min_chunk_size: int = 10,
    token_overlap: int = 0,
    extensions_to_process = FILE_FORMAT_DICT.keys(),
    cracked_pdf = False,
    use_layout = False,
    add_embeddings = False,
    azure_credential = None,
    embedding_endpoint = None
) -> ChunkingResult:
    """Chunks the given content and optionally embeds each chunk.

    Args:
        content (str): The content to chunk.
        file_name (str): The file name. Used for title and file format detection.
        url (str): URL stored on every produced chunk.
        ignore_errors (bool): If true, failures are reported through the returned
            ChunkingResult counters instead of being raised.
        num_tokens (int): The number of tokens in each chunk.
        min_chunk_size (int): Chunks with fewer tokens than this are skipped.
        token_overlap (int): The number of tokens to overlap between chunks.
        extensions_to_process: Extensions accepted by _get_file_format.
        cracked_pdf (bool): Select the "text" format (or "html_pdf" with use_layout)
            instead of detecting the format from the file name.
        use_layout (bool): See ``cracked_pdf``.
        add_embeddings (bool): Compute ``contentVector`` for every kept chunk.
        azure_credential, embedding_endpoint: Forwarded to get_embedding.
    Returns:
        ChunkingResult: The kept chunks plus counters. With ``ignore_errors``, a failure
        yields an empty result with ``num_unsupported_format_files`` or
        ``num_files_with_errors`` set to 1.
    Raises:
        UnsupportedFormatError: Unsupported extension and ``ignore_errors`` is false.
        Exception: Any other chunking/embedding failure when ``ignore_errors`` is false.
    """
    try:
        if file_name is None or (cracked_pdf and not use_layout):
            file_format = "text"
        elif cracked_pdf:
            file_format = "html_pdf" # differentiate it from native html
        else:
            file_format = _get_file_format(file_name, extensions_to_process)
            if file_format is None:
                # Use the dedicated error so the result counts this file as unsupported.
                raise UnsupportedFormatError(f"{file_name} is not supported")

        chunked_context = chunk_content_helper(
            content=content,
            file_name=file_name,
            file_format=file_format,
            num_tokens=num_tokens,
            token_overlap=token_overlap
        )
        chunks = []
        skipped_chunks = 0
        for chunk, chunk_size, doc in chunked_context:
            if chunk_size < min_chunk_size:
                skipped_chunks += 1
                continue
            if add_embeddings:
                doc.contentVector = _embed_with_retry(
                    chunk, azure_credential=azure_credential, embedding_endpoint=embedding_endpoint
                )
                if doc.contentVector is None:
                    raise Exception(f"Error getting embedding for chunk={chunk}")

            chunks.append(
                Document(
                    content=chunk,
                    title=doc.title,
                    url=url,
                    contentVector=doc.contentVector
                )
            )

    except UnsupportedFormatError:
        if ignore_errors:
            return ChunkingResult(
                chunks=[], total_files=1, num_unsupported_format_files=1
            )
        raise
    except Exception:
        if ignore_errors:
            return ChunkingResult(chunks=[], total_files=1, num_files_with_errors=1)
        raise
    return ChunkingResult(
        chunks=chunks,
        total_files=1,
        skipped_chunks=skipped_chunks,
    )

In [65]:
def chunk_file(
    file_path: str,
    ignore_errors: bool = True,
    num_tokens=256,
    min_chunk_size=10,
    url = None,
    token_overlap: int = 0,
    extensions_to_process = FILE_FORMAT_DICT.keys(),
    form_recognizer_client = None,
    use_layout = False,
    add_embeddings=False,
    azure_credential = None,
    embedding_endpoint = None
) -> ChunkingResult:
    """Reads a text file from disk and chunks it with chunk_content.

    Args:
        file_path (str): The file to chunk.
        ignore_errors (bool): If true, unsupported files yield an empty ChunkingResult
            with ``num_unsupported_format_files=1`` instead of raising.
        form_recognizer_client: Accepted for API compatibility; binary formats
            (pdf/docx/pptx) are not processed here.
        Remaining arguments are forwarded to chunk_content.
    Returns:
        ChunkingResult: The chunking outcome for this one file.
    Raises:
        ValueError: Unsupported extension, or a binary format, when ``ignore_errors`` is false.
    """
    file_name = os.path.basename(file_path)
    file_format = _get_file_format(file_name, extensions_to_process)
    if not file_format:
        if ignore_errors:
            return ChunkingResult(
                chunks=[], total_files=1, num_unsupported_format_files=1
            )
        raise ValueError(f"{file_name} is not supported")

    cracked_pdf = False
    if file_format in ["pdf", "docx", "pptx"]:
        # Binary formats would need Form Recognizer, which is not wired up. Bail out
        # here instead of falling through with `content` unbound.
        print(f"{file_name}: {file_format} not supported")
        if ignore_errors:
            return ChunkingResult(
                chunks=[], total_files=1, num_unsupported_format_files=1
            )
        raise ValueError(f"{file_name} is not supported")

    try:
        with open(file_path, "r", encoding="utf8") as f:
            content = f.read()
    except UnicodeDecodeError:
        from chardet import detect
        with open(file_path, "rb") as f:
            binary_content = f.read()
        # detect() reports {"encoding": None} when unsure, so a .get() default would never apply.
        encoding = detect(binary_content).get("encoding")
        if encoding:
            content = binary_content.decode(encoding)
        else:
            # UTF-8 already failed strictly; decode lossily rather than crash.
            content = binary_content.decode("utf8", errors="replace")

    return chunk_content(
        content=content,
        file_name=file_name,
        ignore_errors=ignore_errors,
        num_tokens=num_tokens,
        min_chunk_size=min_chunk_size,
        url=url,
        token_overlap=max(0, token_overlap),
        extensions_to_process=extensions_to_process,
        cracked_pdf=cracked_pdf,
        use_layout=use_layout,
        add_embeddings=add_embeddings,
        azure_credential=azure_credential,
        embedding_endpoint=embedding_endpoint
    )

In [66]:
def process_file(
        file_path: str, # !IMP: Please keep this as the first argument
        directory_path: str,
        ignore_errors: bool = True,
        num_tokens: int = 1024,
        min_chunk_size: int = 10,
        url_prefix = None,
        token_overlap: int = 0,
        extensions_to_process: List[str] = FILE_FORMAT_DICT.keys(),
        form_recognizer_client = None,
        use_layout = False,
        add_embeddings = False,
        azure_credential = None,
        embedding_endpoint = None
    ):
    """Chunks one file and stamps each chunk with its relative path and chunk index.

    Args:
        file_path (str): The file to chunk.
        directory_path (str): Base directory; chunks get
            ``filepath = relpath(file_path, directory_path)``.
        url_prefix (str): If set, each chunk's url is ``url_prefix + relpath``, converted
            to "/" separators.
        Remaining arguments are forwarded to chunk_file.
    Returns:
        Tuple[Optional[ChunkingResult], bool]: ``(result, is_error)``. If an exception
        escapes and ``ignore_errors`` is true, returns ``(None, True)``.
    """
    try:
        rel_file_path = os.path.relpath(file_path, directory_path)
        url_path = None
        if url_prefix:
            url_path = convert_escaped_to_posix(url_prefix + rel_file_path)

        result = chunk_file(
            file_path,
            ignore_errors=ignore_errors,
            num_tokens=num_tokens,
            min_chunk_size=min_chunk_size,
            url=url_path,
            token_overlap=token_overlap,
            extensions_to_process=extensions_to_process,
            form_recognizer_client=form_recognizer_client,
            use_layout=use_layout,
            add_embeddings=add_embeddings,
            azure_credential=azure_credential,
            embedding_endpoint=embedding_endpoint
        )
        for chunk_idx, chunk_doc in enumerate(result.chunks):
            chunk_doc.filepath = rel_file_path
            chunk_doc.metadata = json.dumps({"chunk_id": str(chunk_idx)})
    except Exception as e:
        if not ignore_errors:
            raise
        print(f"File ({file_path}) failed with ", e)
        return None, True
    return result, False

In [67]:
def chunk_directory(
        file_path: str,
        ignore_errors: bool = False,
        num_tokens= None, #int = 1024,
        min_chunk_size: int = 10,
        url_prefix = None,
        token_overlap: int = 0,
        extensions_to_process: List[str] = list(FILE_FORMAT_DICT.keys()),
        form_recognizer_client = None,
        use_layout = False,
        add_embeddings = True,
        azure_credential = None,
        embedding_endpoint = None
):
    """Chunks the single file at ``file_path`` and returns a ChunkingResult.

    Despite the name, no directory walk happens here (that code is commented out). The
    file's parent directory is only used as the base for relative ``filepath``/URL values.

    Args:
        file_path (str): The file to chunk.
        ignore_errors (bool): If true, failures are counted in the result instead of raised.
        num_tokens (int): Max tokens per chunk; None means effectively no limit.
        min_chunk_size (int): The minimum chunk size.
        url_prefix (str): If set, each chunk's url is ``url_prefix + relpath``. For
            example, with url_prefix ``https://example.com/data/`` the file
            ``/home/user/data/file1.txt`` gets ``https://example.com/data/file1.txt``.
        token_overlap (int): The number of tokens to overlap between chunks.
        extensions_to_process (List[str]): The list of extensions to process.
        form_recognizer_client: Passed through; binary formats are not processed.
        use_layout (bool): Passed through to chunk_content.
        add_embeddings (bool): If true, adds a vector embedding to each chunk.
        azure_credential, embedding_endpoint: Forwarded to the embedding call.

    Returns:
        ChunkingResult: The chunks and counters for this file (``total_files`` is 1).
    """
    print("Single process to chunk and parse the files. --njobs > 1 can help performance.")
    directory_path = os.path.dirname(file_path)
    result, is_error = process_file(file_path=file_path, directory_path=directory_path, ignore_errors=ignore_errors,
                                    num_tokens=num_tokens,
                                    min_chunk_size=min_chunk_size, url_prefix=url_prefix,
                                    token_overlap=token_overlap,
                                    extensions_to_process=extensions_to_process,
                                    form_recognizer_client=form_recognizer_client, use_layout=use_layout, add_embeddings=add_embeddings,
                                    azure_credential=azure_credential, embedding_endpoint=embedding_endpoint)
    num_files_with_errors = 1 if is_error else 0

    if result is None:
        # process_file returns (None, True) when it swallowed an exception; there is
        # nothing to aggregate, so report the error instead of dereferencing None.
        return ChunkingResult(chunks=[], total_files=1, num_files_with_errors=num_files_with_errors)

    return ChunkingResult(
            chunks=list(result.chunks),
            total_files=1,
            num_unsupported_format_files=result.num_unsupported_format_files,
            num_files_with_errors=num_files_with_errors + result.num_files_with_errors,
            skipped_chunks=result.skipped_chunks,
        )

In [100]:
def upload_documents_to_index(blob_page, service_name, index_name, docs, credential=None, upload_batch_size = 50, admin_key=None):
    """Uploads the chunk documents of one page blob to an Azure AI Search index.

    Document keys come from the page number in ``blob_page`` (``page_010.md`` -> 10).
    The first chunk keeps the bare number (``"10"``), so re-uploading a single-chunk page
    overwrites its previous entry. Further chunks get ``"10_1"``, ``"10_2"``, ... so they
    cannot collide with the next page's key.

    Args:
        blob_page (str): Blob name containing ``page_<number>``.
        service_name (str): Search service name.
        index_name (str): Target index.
        docs: Document dataclass instances or dicts (dicts are copied, not mutated).
        credential: Credential used when ``admin_key`` is not given.
        upload_batch_size (int): Documents per upload request.
        admin_key (str): Admin API key; takes precedence over ``credential``.
    Raises:
        ValueError: If neither ``credential`` nor ``admin_key`` is given.
        Exception: If any document in a batch fails to index.
    """
    if credential is None and admin_key is None:
        raise ValueError("credential and admin_key cannot be None")

    page_number = int(blob_page.split("page_")[-1].split(".")[0])

    to_upload_dicts = []
    for chunk_idx, d in enumerate(docs):
        # Copy so a caller-supplied dict is not mutated in place.
        d = dict(d) if isinstance(d, dict) else asdict(d)
        doc_key = str(page_number) if chunk_idx == 0 else f"{page_number}_{chunk_idx}"
        d.update({"@search.action": "upload", "id": doc_key})
        if "contentVector" in d and d["contentVector"] is None:
            del d["contentVector"]
        to_upload_dicts.append(d)

    endpoint = "https://{}.search.windows.net/".format(service_name)
    # Honour `credential` when no admin key is given (previously AzureKeyCredential(None)).
    search_credential = AzureKeyCredential(admin_key) if admin_key is not None else credential

    search_client = SearchClient(
        endpoint=endpoint,
        index_name=index_name,
        credential=search_credential,
    )
    # Upload the documents in batches of upload_batch_size
    for i in tqdm(range(0, len(to_upload_dicts), upload_batch_size), desc="Indexing Chunks..."):
        batch = to_upload_dicts[i: i + upload_batch_size]
        results = search_client.upload_documents(documents=batch)
        num_failures = 0
        errors = set()
        for result in results:
            if not result.succeeded:
                print(f"Indexing Failed for {result.key} with ERROR: {result.error_message}")
                num_failures += 1
                errors.add(result.error_message)
        if num_failures > 0:
            raise Exception(f"INDEXING FAILED for {num_failures} documents. Please recreate the index."
                            f"To Debug: PLEASE CHECK chunk_size and upload_batch_size. \n Error Messages: {list(errors)}")
        

def create_or_update_search_index(
        service_name, 
        index_name="dynamic-idx", 
        semantic_config_name="default", 
        credential=None, 
        language="en",
        vector_config_name="default",
        admin_key=None,
        vector_dimensions=1536):
    """Creates (HTTP 201) or updates (HTTP 204) an Azure AI Search index via the REST API.

    The index has id/content/title/filepath/url/metadata string fields and a semantic
    configuration. When ``vector_config_name`` is truthy, it also gets a ``contentVector``
    field with an HNSW vector-search configuration.

    Args:
        service_name (str): Search service name (``https://<service_name>.search.windows.net``).
        index_name (str): Index to create or update.
        semantic_config_name (str): Name of the semantic configuration.
        credential: Token credential, used only when ``admin_key`` is not given.
        language (str): Lucene analyzer prefix (``"<language>.lucene"``) for content and
            title; falsy disables the analyzer.
        vector_config_name (str): Vector-search configuration name; falsy omits the
            vector field entirely.
        admin_key (str): Admin API key; takes precedence over ``credential``.
        vector_dimensions (int): Length of ``contentVector``; must match the embedding
            model's output size.
    Returns:
        bool: True on success.
    Raises:
        ValueError: If neither ``credential`` nor ``admin_key`` is given.
        Exception: If the service answers with anything other than 201/204.
    """
    if credential is None and admin_key is None:
        raise ValueError("credential and admin key cannot be None")

    url = f"https://{service_name}.search.windows.net/indexes/{index_name}?api-version=2023-07-01-Preview"
    headers = {"Content-Type": "application/json"}
    if admin_key is not None:
        headers["api-key"] = admin_key
    else:
        # Role-based auth. Assumes `credential` follows the azure-core TokenCredential
        # protocol (get_token(scope).token) -- TODO confirm for the credential type used.
        token = credential.get_token("https://search.azure.com/.default").token
        headers["Authorization"] = f"Bearer {token}"

    body = {
        "fields": [
            {
                "name": "id",
                "type": "Edm.String",
                "searchable": True,
                "key": True,
            },
            {
                "name": "content",
                "type": "Edm.String",
                "searchable": True,
                "sortable": False,
                "facetable": False,
                "filterable": False,
                "analyzer": f"{language}.lucene" if language else None,
            },
            {
                "name": "title",
                "type": "Edm.String",
                "searchable": True,
                "sortable": False,
                "facetable": False,
                "filterable": False,
                "analyzer": f"{language}.lucene" if language else None,
            },
            {
                "name": "filepath",
                "type": "Edm.String",
                "searchable": True,
                "sortable": False,
                "facetable": False,
                "filterable": True,
            },
            {
                "name": "url",
                "type": "Edm.String",
                "searchable": True,
            },
            {
                "name": "metadata",
                "type": "Edm.String",
                "searchable": True,
            },
        ],
        "suggesters": [],
        "scoringProfiles": [],
        "semantic": {
            "configurations": [
                {
                    "name": semantic_config_name,
                    "prioritizedFields": {
                        "titleField": {"fieldName": "title"},
                        "prioritizedContentFields": [{"fieldName": "content"}],
                        "prioritizedKeywordsFields": [],
                    },
                }
            ]
        },
    }

    if vector_config_name:
        body["fields"].append({
            "name": "contentVector",
            "type": "Collection(Edm.Single)",
            "searchable": True,
            "retrievable": True,
            "dimensions": vector_dimensions,
            "vectorSearchConfiguration": vector_config_name
        })

        body["vectorSearch"] = {
            "algorithmConfigurations": [
                {
                    "name": vector_config_name,
                    "kind": "hnsw"
                }
            ]
        }

    response = requests.put(url, json=body, headers=headers)
    if response.status_code == 201:
        print(f"Created search index {index_name}")
    elif response.status_code == 204:
        print(f"Updated existing search index {index_name}")
    else:
        raise Exception(f"Failed to create search index. Error: {response.text}")

    return True


def remove_content(path: Optional[str] = None, service_name=None, index_name=None, admin_key=None):
    """Deletes index entries whose ``filepath`` equals the base name of ``path``.

    With ``path=None`` every document is targeted. At most 1000 matching documents are
    removed per call (``top=1000``).

    Args:
        path (Optional[str]): File whose chunks should be removed; only its base name is used.
        service_name (str): Search service name. Defaults to ``AZURE_SEARCH_SERVICE_NAME``,
            read at call time.
        index_name (str): Index name. Defaults to ``AZURE_SEARCH_INDEX_NAME``.
        admin_key (str): Admin key. Defaults to ``AZURE_SEARCH_ADMIN_KEY``.
    """
    # Resolve defaults per call rather than freezing the environment at definition time.
    if service_name is None:
        service_name = os.getenv("AZURE_SEARCH_SERVICE_NAME")
    if index_name is None:
        index_name = os.getenv("AZURE_SEARCH_INDEX_NAME")
    if admin_key is None:
        admin_key = os.getenv("AZURE_SEARCH_ADMIN_KEY")

    endpoint = "https://{}.search.windows.net/".format(service_name)

    search_client = SearchClient(
        endpoint=endpoint,
        index_name=index_name,
        credential=AzureKeyCredential(admin_key),
    )
    print(f"Removing sections from '{path or '<all>'}' from search index '{index_name}'")
    if path is None:
        search_filter = None
    else:
        # OData string literals escape a single quote by doubling it.
        file_name = os.path.basename(path).replace("'", "''")
        search_filter = f"filepath eq '{file_name}'"
    results = search_client.search("", filter=search_filter, top=1000, include_total_count=True)
    doc_keys = [{"id": document["id"]} for document in results]
    if not doc_keys:
        # Nothing matched; skip the delete call rather than send an empty batch.
        print("\tRemoved 0 sections from index")
        return
    removed_docs = search_client.delete_documents(documents=doc_keys)
    print(f"\tRemoved {len(removed_docs)} sections from index")
    # Search results can take a few seconds to reflect the deletion.
    # It can take a few seconds for search results to reflect changes, so wait a bit


In [101]:
# create_or_update_search_index(
#         "eaom-manual-search", 
#         index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"), 
#         semantic_config_name="default", 
#         credential=None, 
#         language="en",
#         vector_config_name="default",
#         admin_key=os.getenv("AZURE_SEARCH_ADMIN_KEY"))

In [103]:
connect_str = os.getenv('AZURE_BLOB_STORAGE_CS')

# List the page blobs to index from the same container that downloadBlobUrlToLocalFolder
# reads (AZURE_BLOB_STORAGE_CONTAINER_NAME). Fall back to "dynamic" when it is unset.
container_name = os.getenv("AZURE_BLOB_STORAGE_CONTAINER_NAME", "dynamic")

# Create the BlobServiceClient object
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client = blob_service_client.get_container_client(container_name)
blob_list = [blob.name for blob in container_client.list_blobs()]

In [105]:
# Download, chunk and index every page blob, one at a time, inside a scratch directory.
with tempfile.TemporaryDirectory() as local_data_folder:
    for blob_page in blob_list:
        print(f"downloading {blob_page}")
        # Use the temp path as-is: prefixing "." would turn the absolute temp path into a
        # folder relative to the current working directory.
        downloadBlobUrlToLocalFolder(blob_page, local_data_folder)
        print("downloaded")
        file_path = os.path.join(local_data_folder, blob_page)

        result = chunk_directory(
            file_path, num_tokens=None
        )

        upload_documents_to_index(blob_page=blob_page, service_name=os.getenv("AZURE_SEARCH_SERVICE_NAME"), index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"), docs=result.chunks, admin_key=os.getenv("AZURE_SEARCH_ADMIN_KEY"))
        print(f"completed {blob_page}")
        # Free the space now; the temp directory itself is removed when the with-block exits.
        os.remove(file_path)

downloading page_010.md
downloaded
Single process to chunk and parse the files. --njobs > 1 can help performance.


Indexing Chunks...: 100%|██████████| 1/1 [00:00<00:00,  1.13it/s]


completed page_010.md
downloading page_011.md
downloaded
Single process to chunk and parse the files. --njobs > 1 can help performance.


Indexing Chunks...: 100%|██████████| 1/1 [00:00<00:00,  1.13it/s]


completed page_011.md
downloading page_089.md
downloaded
Single process to chunk and parse the files. --njobs > 1 can help performance.


Indexing Chunks...: 100%|██████████| 1/1 [00:00<00:00,  1.14it/s]

completed page_089.md





In [102]:
remove_content("page_089.md")  # delete the indexed chunks whose filepath is page_089.md

Removing sections from 'page_089.md' from search index 'dynamic-idx'
	Removed 1 sections from index


In [None]:
def update_search_index(blob_page):
    """Downloads one page blob, chunks it, and uploads its chunks to the search index.

    Args:
        blob_page (str): Blob name of the form ``page_<number>.md``; the number becomes
            the document key in upload_documents_to_index.
    """
    with tempfile.TemporaryDirectory() as local_data_folder:
        # Use the temp path as-is: prefixing "." would turn the absolute temp path into a
        # folder relative to the current working directory.
        downloadBlobUrlToLocalFolder(blob_page, local_data_folder)
        print("downloaded")
        file_path = os.path.join(local_data_folder, blob_page)

        result = chunk_directory(
            file_path, num_tokens=None
        )

        upload_documents_to_index(blob_page=blob_page, service_name=os.getenv("AZURE_SEARCH_SERVICE_NAME"), index_name=os.getenv("AZURE_SEARCH_INDEX_NAME"), docs=result.chunks, admin_key=os.getenv("AZURE_SEARCH_ADMIN_KEY"))
        print(f"completed {blob_page}")
    # Leaving the with-block deletes the temporary directory and the downloaded file.