In [0]:
# Install the notebook's Python dependencies into the notebook environment.
# NOTE(review): langchain-community, rapidocr-onnxruntime, pymupdf, pymupdf4llm, mlflow-skinny and
# mlflow[gateway] carry no version pin here, and langchain-community is listed twice (once unpinned,
# once as langchain_community==0.2.4) — consider pinning everything for reproducible runs.
%pip install --quiet -U transformers==4.41.1 pypdf==4.1.0 langchain-text-splitters==0.2.0 databricks-vectorsearch==0.41 mlflow==2.16.2 tiktoken==0.7.0 torch==2.3.0 llama-index==0.10.43 pdfservices-sdk==4.0.0 langchain-community rapidocr-onnxruntime pymupdf pymupdf4llm databricks-agents==0.7.0 mlflow-skinny mlflow[gateway] langchain==0.2.1 langchain_core==0.2.5 langchain_community==0.2.4 databricks-sdk==0.23.0 grandalf
# databricks-feature-engineering is (re)installed in its own step with --force-reinstall.
%pip install --force-reinstall --quiet databricks-feature-engineering
# Restart the Python process so the packages installed above are the ones that get imported.
dbutils.library.restartPython()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:445)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:460)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:577)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:527)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:631)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:651)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(Attr

In [0]:
import zipfile
import logging
import os
import io
import pandas as pd
import json
import warnings
import re
import time
import requests
import collections

from datetime import datetime

from pypdf import PdfReader

from adobe.pdfservices.operation.auth.service_principal_credentials import ServicePrincipalCredentials
from adobe.pdfservices.operation.exception.exceptions import ServiceApiException, ServiceUsageException, SdkException
from adobe.pdfservices.operation.io.cloud_asset import CloudAsset
from adobe.pdfservices.operation.io.stream_asset import StreamAsset
from adobe.pdfservices.operation.pdf_services import PDFServices
from adobe.pdfservices.operation.pdf_services_media_type import PDFServicesMediaType
from adobe.pdfservices.operation.pdfjobs.jobs.extract_pdf_job import ExtractPDFJob
from adobe.pdfservices.operation.pdfjobs.params.extract_pdf.extract_element_type import ExtractElementType
from adobe.pdfservices.operation.pdfjobs.params.extract_pdf.extract_pdf_params import ExtractPDFParams
from adobe.pdfservices.operation.pdfjobs.params.extract_pdf.extract_renditions_element_type import ExtractRenditionsElementType
from adobe.pdfservices.operation.pdfjobs.result.extract_pdf_result import ExtractPDFResult

from typing import Union

import pymupdf
import pymupdf4llm
from PIL import Image

from llama_index.core.node_parser import SentenceSplitter, MarkdownElementNodeParser, MarkdownNodeParser
from databricks.vector_search.client import VectorSearchClient
from databricks.feature_engineering.entities.feature_lookup import FeatureLookup
from databricks.feature_engineering import FeatureEngineeringClient, FeatureFunction
from databricks.feature_engineering.entities.feature_serving_endpoint import (
    EndpointCoreConfig,
    ServedEntity
)

import pyspark.sql.functions as F

In [0]:
# --- Chunking hyperparameters -------------------------------------------------
CHUNK_SIZE = 500
CHUNK_OVERLAP = 10
SPLIT_TYPE = 'markdown'

# Both parsers are instantiated up front; SPLIT_TYPE picks the one used as PARSER.
PARSERS = dict(
    sentence=SentenceSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP),
    markdown=MarkdownNodeParser(),
)
PARSER = PARSERS[SPLIT_TYPE]

# --- Source documents ---------------------------------------------------------
BASE_URL = 'https://www.bpw.de/fileadmin/user_upload/Service/Downloads/'
URLS = [
    BASE_URL + file_name
    for file_name in (
        'BPW-Aftermarketnews_75022201e_AM04_Conversion_of_brake_shoes_for_10-12_t_low_loader_trailer_axles.pdf',
        'Code_number_designations_BRO_BPW_en_2020_39342001.pdf',
        'Anhaengerachsen-5_5-t-Trailer-axles-5.5-tonnes-Essieux-jus-5_5-35031401def.pdf',
        'BPW-Reference_times_for_warranty_work_2022-en-_39052201.pdf',
        'Maintenance_instructions_BPW_Trailer_Axles_and_Suspensions_2024__33112401e.pdf',
        'Brake_Drum-HKN-Workshop_manual-BPW-2024-_35192401e.pdf',
        'HKN_trailer_axles_Spare_parts_list_BPW_31022101e_2021.pdf',
        'Trailer_Disc_brake_TSB-ECODisc-_Workshop_manual_2023_en-BPW-35292301e.pdf',
        'BPW-2023-service_manual_WH-TS2_35472302e.pdf',
        'ECO_Disc_Disc_brake_TS2_TSB_Original_spare_parts_BPW_2024_en__31232401e.pdf',
    )
]

# --- Local working directories ------------------------------------------------
PDF_ROOT_PATH = './pdfs/'
PROCESSED_PDF_ROOT_PATH = './processed-pdfs/'

# Local path of every download (URL basename under PDF_ROOT_PATH), and the same path with '.pdf' removed.
PDF_PATHS = [os.path.join(PDF_ROOT_PATH, url.rsplit('/', 1)[-1]) for url in URLS]
PDF_NAMES = [pdf_path.replace('.pdf', '') for pdf_path in PDF_PATHS]

# --- Databricks clients and object names ---------------------------------------
VECTOR_SEARCH_ENDPOINT_NAME = "rag_endpoint"

VSC = VectorSearchClient()

FEC = FeatureEngineeringClient()

CATALOG_NAME = "`test-catalog`"
SCHEMA_NAME = "bronze"
# Template "<catalog>.<schema>.{table_name}"; the placeholder is filled in per object below.
TABLE_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME}." + "{table_name}"

# Where we want to store our index
VS_INDEX_FULLNAME = TABLE_PATH.format(table_name="hackathon_pdfs_self_managed_vs_index")
# Table containing the PDF's chunks
PDFS_TABLE_FULLNAME = TABLE_PATH.format(table_name="hackathon_pdf_chunks")

CHAIN_CONFIG_FILE = "rag_chain_config.yaml"

MODEL_NAME = "hackathon_rag_model"
MODEL_NAME_FQN = TABLE_PATH.format(table_name=MODEL_NAME)

In [0]:
# Create the catalog and schema named in the configuration cell if they are missing,
# then make them the session defaults. The statements are order-dependent: the schema
# is created inside the catalog, and USE SCHEMA relies on the preceding USE CATALOG.
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME}")
spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_NAME}")
spark.sql(f"USE SCHEMA {SCHEMA_NAME}")

In [0]:
# Initialize the logger
# logging.basicConfig(level=logging.INFO)

def extract_from_pdf(pdf_path, output_root_path):
    """Run an Adobe PDF Services extract job on a local PDF and save the result ZIP.

    Credentials are read from the ``PDF_SERVICES_CLIENT_ID`` and
    ``PDF_SERVICES_CLIENT_SECRET`` environment variables.

    Args:
        pdf_path: Path of the PDF to upload.
        output_root_path: Directory the result archive is written to. It is
            created if it does not exist yet.

    Returns:
        The path of the written ``.zip`` file, or ``None`` when the SDK raised
        ``ServiceApiException``, ``ServiceUsageException`` or ``SdkException``
        (the exception is logged, not re-raised).

    Raises:
        OSError: If the input PDF cannot be read or the archive cannot be written.
    """
    try:
        # Initial setup, create credentials instance
        credentials = ServicePrincipalCredentials(
            client_id=os.getenv('PDF_SERVICES_CLIENT_ID'),
            client_secret=os.getenv('PDF_SERVICES_CLIENT_SECRET')
        )

        # Creates a PDF Services instance
        pdf_services = PDFServices(credentials=credentials)

        # Read the PDF inside a context manager so the handle is released even
        # when the upload or the job below raises.
        with open(pdf_path, 'rb') as pdf_file:
            input_stream = pdf_file.read()

        # Creates an asset(s) from source file(s) and upload
        input_asset = pdf_services.upload(input_stream=input_stream, mime_type=PDFServicesMediaType.PDF)

        # Create parameters for the job
        extract_pdf_params = ExtractPDFParams(
            elements_to_extract=[ExtractElementType.TEXT, ExtractElementType.TABLES],
            elements_to_extract_renditions=[ExtractRenditionsElementType.TABLES, ExtractRenditionsElementType.FIGURES],
        )

        # Creates a new job instance
        extract_pdf_job = ExtractPDFJob(input_asset=input_asset, extract_pdf_params=extract_pdf_params)

        # Submit the job and gets the job result
        location = pdf_services.submit(extract_pdf_job)
        pdf_services_response = pdf_services.get_job_result(location, ExtractPDFResult)

        # Get content from the resulting asset(s)
        result_asset: CloudAsset = pdf_services_response.get_result().get_resource()
        stream_asset: StreamAsset = pdf_services.get_content(result_asset)

        # "<name>.pdf" -> "<name>.pdf.zip" -> "<name>.zip"
        filename = "{}.zip".format(pdf_path.split('/')[-1]).replace(".pdf","")
        output_file_path = os.path.join(output_root_path, filename)
        print(output_file_path)

        # Make sure the target directory exists so a finished job is not lost
        # to a FileNotFoundError at the very last step.
        if output_root_path:
            os.makedirs(output_root_path, exist_ok=True)
        with open(output_file_path, "wb") as file:
            file.write(stream_asset.get_input_stream())

        return output_file_path

    except (ServiceApiException, ServiceUsageException, SdkException) as e:
        # Best effort: SDK failures are logged and reported to the caller as None.
        logging.exception(f'Exception encountered while executing operation: {e}')
        return None


def extract_zip(zip_path):
    """Unpack a ZIP archive next to itself, then delete the archive.

    The target directory is ``zip_path`` with every ``.zip`` and ``.pdf``
    substring removed.
    """
    with zipfile.ZipFile(zip_path, 'r') as archive:
        target_dir = zip_path.replace('.zip', '').replace(".pdf","")
        archive.extractall(target_dir)
    # Only reached when extraction succeeded.
    os.remove(zip_path)

def get_reader(reference: Union[str, bytes, bytearray]):
    """Build a ``PdfReader`` from a file path or from in-memory PDF content.

    Args:
        reference: A ``str`` is passed to ``PdfReader`` unchanged; anything else
            is wrapped in ``io.BytesIO`` first. The annotation accepts ``bytes``
            as well as ``bytearray`` so that it agrees with ``read_pdf`` and
            ``get_metadata``, which are annotated with ``bytes``.

    Returns:
        The ``PdfReader`` for the given reference.
    """
    if isinstance(reference, str):
        return PdfReader(reference)
    return PdfReader(io.BytesIO(reference))

def read_pdf(reference: Union[str, bytes]):
    """Extract the text of every page of a PDF.

    Returns:
        A list of ``[title, page_index, page_text]`` rows (``title`` is the
        ``/Title`` metadata entry), or ``None`` if anything fails; failures are
        reported through ``warnings.warn`` instead of being raised.
    """
    try:
        pdf = get_reader(reference)
        rows = []
        for page_index, page in enumerate(pdf.pages):
            rows.append([pdf.metadata['/Title'], page_index, page.extract_text()])
        return rows
    except Exception as e:
        warnings.warn(f"Exception {e} has been thrown during parsing")
        return None

def get_metadata(pdf_reference: Union[str, bytes]):
    """Return the ``/Subject`` metadata entry of a PDF.

    Returns:
        The subject value, or ``None`` if the PDF cannot be opened or has no
        such entry; failures are reported through ``warnings.warn``.
    """
    try:
        return get_reader(pdf_reference).metadata['/Subject']
    except Exception as e:
        warnings.warn(f"Exception {e} has been thrown during parsing")
        return None


def render_page_as_image(pdf_path: str, page_number: int) -> Image.Image:
    """Rasterize one page of a PDF at 150 dpi and return it as an RGB PIL image.

    Args:
        pdf_path: Path of the PDF to open.
        page_number: Index into the opened document (``doc[page_number]``).

    Returns:
        A ``PIL.Image.Image`` built from the page's pixmap samples.
    """
    # Open the document in a context manager so it is closed after rendering;
    # the image is built from the pixmap's sample bytes before the block exits.
    with pymupdf.open(pdf_path) as doc:
        pix = doc[page_number].get_pixmap(dpi=150)
        return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)


def extract_images(pdf_path: str):
    """Collect every embedded image of a PDF as PIL images.

    Every xref number from 1 up to ``doc.xref_length()`` is tried; xrefs that
    cannot be extracted or opened as an image are skipped.

    Args:
        pdf_path: Path of the PDF to scan.

    Returns:
        A list of PIL images, in xref order.
    """
    pil_imgs = []
    # Context manager closes the document once all xrefs have been scanned.
    with pymupdf.open(pdf_path) as doc:
        for xref in range(1, doc.xref_length()):
            try:
                img = doc.extract_image(xref)
                pil_imgs.append(Image.open(io.BytesIO(img['image'])))
            except Exception:
                # Deliberate best effort: skip anything that is not a usable image.
                # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit through.
                continue
    return pil_imgs

def extract_tables(text: str):
    """Split markdown text into its non-table text and its tables.

    Returns:
        A ``(page_text, tables)`` tuple: ``page_text`` is the concatenation of
        all non-table elements in order, ``tables`` is the list of table
        elements in order.
    """
    element_parser = MarkdownElementNodeParser()
    text_parts = []
    tables = []
    for element in element_parser.extract_elements(text):
        if element.type == 'table':
            ## TODO: elem.table contains a pd.DataFrame version of elem.element when elem.type == 'table'
            tables.append(element.element)
            continue
        text_parts.append(element.element)

    return ''.join(text_parts), tables

def _rows_to_frame(rows, cols, use_pandas):
    """Build one pandas or Spark DataFrame from the collected rows; None when there are no rows."""
    if not rows:
        return None
    if use_pandas:
        return pd.DataFrame(rows, columns=cols)
    return spark.createDataFrame(rows, schema=cols)


def preprocess_pdf(pdf_path, use_llama=False, isolate_tables=False, use_pandas=False):
    """Convert a PDF to per-page markdown and return it as DataFrames.

    Args:
        pdf_path: Path of the PDF to convert.
        use_llama: Use ``pymupdf4llm.LlamaMarkdownReader`` instead of
            ``pymupdf4llm.to_markdown``.
        isolate_tables: When true, tables are removed from the page text with
            ``extract_tables`` and returned separately.
        use_pandas: Return pandas DataFrames instead of Spark DataFrames.

    Returns:
        ``(contents_df, tables_df)`` with columns ``path``, ``title``,
        ``page_number`` and ``text``; one row per page and one row per table.
        Either element is ``None`` when it would have no rows (``tables_df`` is
        always ``None`` when ``isolate_tables`` is false).

    Note:
        Rows are collected in plain lists and each DataFrame is built once,
        instead of concatenating/unioning a one-row DataFrame per page. With
        ``use_pandas=True`` the result therefore has a regular 0..n-1 index
        rather than an index of repeated zeros.
    """
    # Process PDF
    if use_llama:
        llama_reader = pymupdf4llm.LlamaMarkdownReader()
        pages = llama_reader.load_data(pdf_path, margins=0, table_strategy='lines')
    else:
        pages = pymupdf4llm.to_markdown(
            pdf_path,
            page_chunks=True,
            extract_words=False,
            # Image export is currently disabled:
            # write_images=True,
            # dpi=150,
            # image_path=img_path,
            # image_format='png',
            # image_size_limit=0,
            margins=0,
            table_strategy='lines'
        )

    cols = ['path', 'title', 'page_number', 'text']
    content_rows = []
    table_rows = []
    for page in pages:
        if use_llama:
            page = page.dict()
        text = page['text'].replace('-----', '').strip()
        text, tables = extract_tables(text) if isolate_tables else (text, None)

        metadata = page['metadata']
        page_key = [metadata['file_path'], metadata['title'], metadata['page']]
        content_rows.append(page_key + [text])

        if tables is not None:
            table_rows.extend(page_key + [table] for table in tables)

    # TODO: optional content filter, currently disabled — drop pages whose text does not match
    # r"(?:.*\n+)?(#+\s.*\n+)(.*)" (i.e. pages without a markdown title), e.g. on Spark:
    # contents_df.withColumn('to_remove', (F.regexp_extract('text', pattern, 0) == '')).filter('to_remove == false').drop('to_remove')

    return _rows_to_frame(content_rows, cols, use_pandas), _rows_to_frame(table_rows, cols, use_pandas)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
def endpoint_exists(vsc, vs_endpoint_name):
    """Tell whether a vector search endpoint with the given name is listed by the client.

    A listing failure whose message contains ``REQUEST_LIMIT_EXCEEDED`` is
    treated as "exists" (a warning is printed); any other failure is re-raised.
    """
    try:
        listing = vsc.list_endpoints()
        known_names = []
        for entry in listing.get('endpoints', []):
            known_names.append(entry['name'])
        return vs_endpoint_name in known_names
    except Exception as e:
        #Temp fix for potential REQUEST_LIMIT_EXCEEDED issue
        if "REQUEST_LIMIT_EXCEEDED" not in str(e):
            raise e
        print("WARN: couldn't get endpoint status due to REQUEST_LIMIT_EXCEEDED error. The demo will consider it exists")
        return True

def wait_for_vs_endpoint_to_be_ready(vsc, vs_endpoint_name):
    """Poll a vector search endpoint until its state contains ``ONLINE``.

    Polls up to 180 times with a 10 second pause between polls.

    Returns:
        The endpoint description once online, or ``None`` when the status call
        fails with a ``REQUEST_LIMIT_EXCEEDED`` message (a warning is printed).

    Raises:
        Exception: If the state is neither online nor provisioning after the
            first six polls, or if the endpoint is not online after all polls.
            Other status-call failures are re-raised unchanged.
    """
    for attempt in range(180):
        try:
            endpoint = vsc.get_endpoint(vs_endpoint_name)
        except Exception as e:
            #Temp fix for potential REQUEST_LIMIT_EXCEEDED issue
            if "REQUEST_LIMIT_EXCEEDED" not in str(e):
                raise e
            print("WARN: couldn't get endpoint status due to REQUEST_LIMIT_EXCEEDED error. Please manually check your endpoint status")
            return

        state_info = endpoint.get("endpoint_status", endpoint.get("status"))
        status = state_info["state"].upper()
        if "ONLINE" in status:
            return endpoint

        # Unexpected states are tolerated during the first six polls only.
        still_starting = "PROVISIONING" in status or attempt < 6
        if not still_starting:
            raise Exception(f'''Error with the endpoint {vs_endpoint_name}. - this shouldn't happen: {endpoint}.\n Please delete it and re-run the previous cell: vsc.delete_endpoint("{vs_endpoint_name}")''')

        if attempt % 20 == 0:
            print(f"Waiting for endpoint to be ready, this can take a few min... {endpoint}")
        time.sleep(10)

    raise Exception(f"Timeout, your endpoint isn't ready yet: {vsc.get_endpoint(vs_endpoint_name)}")

def index_exists(vsc, endpoint_name, index_full_name):
    """Tell whether the index can be described on the given endpoint.

    Returns ``False`` when describing fails with a message containing
    ``RESOURCE_DOES_NOT_EXIST``; any other failure is printed and re-raised.
    """
    try:
        index = vsc.get_index(endpoint_name, index_full_name)
        index.describe()
    except Exception as e:
        if 'RESOURCE_DOES_NOT_EXIST' in str(e):
            return False
        print(f'Unexpected error describing the index. This could be a permission issue.')
        raise e
    return True
    
def wait_for_index_to_be_ready(vsc, vs_endpoint_name, index_name):
    """Poll a vector search index until its state contains ``ONLINE``.

    Polls up to 180 times with a 10 second pause while the state contains
    ``PROVISIONING``. If no state can be read from the description the index is
    assumed to be ready.

    Args:
        vsc: Client exposing ``get_index(endpoint_name, index_name)``.
        vs_endpoint_name: Name of the endpoint hosting the index.
        index_name: Full name of the index.

    Raises:
        Exception: If the index reports any other state, or is still
            provisioning after all polls.
    """
    for i in range(180):
        idx = vsc.get_index(vs_endpoint_name, index_name).describe()
        # The status block and its fields are looked up under two alternative keys each.
        index_status = idx.get('status', idx.get('index_status', {}))
        status = index_status.get('detailed_state', index_status.get('status', 'UNKNOWN')).upper()
        url = index_status.get('index_url', index_status.get('url', 'UNKNOWN'))
        if "ONLINE" in status:
            return
        if "UNKNOWN" in status:
            print(f"Can't get the status - will assume index is ready {idx} - url: {url}")
            return
        if "PROVISIONING" not in status:
            # Fixed: the suggested call now passes two separate, correctly ordered arguments.
            raise Exception(f'''Error with the index - this shouldn't happen. DLT pipeline might have been killed.\n Please delete it and re-run the previous cell: vsc.delete_index("{vs_endpoint_name}", "{index_name}") \nIndex details: {idx}''')
        if i % 40 == 0:
            print(f"Waiting for index to be ready, this can take a few min... {index_status} - pipeline url:{url}")
        time.sleep(10)
    # Fixed: endpoint name first, index name second — same order as the polling call above.
    raise Exception(f"Timeout, your index isn't ready yet: {vsc.get_index(vs_endpoint_name, index_name)}")

def wait_for_model_serving_endpoint_to_be_ready(ep_name, secs=10):
    """Poll a model serving endpoint until it reports ready.

    Polls up to 200 times, sleeping ``secs`` seconds while a config update is
    in progress. Returns ``None`` once the endpoint is ready.

    Raises:
        Exception: If the endpoint is neither updating nor ready, or is still
            updating after all polls (the same "timeout" message is used for both).
    """
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import EndpointStateReady, EndpointStateConfigUpdate
    import time

    # Wait for it to be ready
    workspace = WorkspaceClient()
    state = ""
    for attempt in range(200):
        state = workspace.serving_endpoints.get(ep_name).state
        if state.config_update == EndpointStateConfigUpdate.IN_PROGRESS:
            if attempt % 40 == 0:
                print(f"Waiting for endpoint to deploy {ep_name}. Current state: {state}")
            time.sleep(secs)
            continue
        if state.ready == EndpointStateReady.READY:
            print('endpoint ready.')
            return
        # Not updating and not ready: stop polling and report the last state.
        break
    raise Exception(f"Couldn't start the endpoint, timeout, please check your endpoint for more details: {state}")

In [0]:
def deploy_feature_table(catalog_table_path: str, feature_table_name: str, endpoint_name: str, lookup_key: str, workload_size: str = "Small", scale_to_zero: bool = True):
    """Create a feature spec for a table and a feature serving endpoint for it.

    Backticks are stripped from ``catalog_table_path`` and
    ``feature_table_name`` before use. Both creation steps ignore errors whose
    message contains "already exists"; every other error is re-raised.
    """
    # Create a lookup to fetch features by key
    lookups = [
        FeatureLookup(table_name=catalog_table_path.replace('`', ''), lookup_key=lookup_key)
    ]

    # Create feature spec with the lookup for features
    spec_name = feature_table_name.replace('`', '')
    try:
        FEC.create_feature_spec(name=spec_name, features=lookups)
    except Exception as e:
        if "already exists" not in str(e):
            raise e

    # Create endpoint for serving the table
    try:
        status = FEC.create_feature_serving_endpoint(
            name=endpoint_name,
            config=EndpointCoreConfig(
                served_entities=ServedEntity(
                    feature_spec_name=spec_name,
                    workload_size=workload_size,
                    scale_to_zero_enabled=scale_to_zero,
                )
            ),
        )
    except Exception as e:
        if "already exists" not in str(e):
            raise e
    else:
        # Print endpoint creation status
        print(status)

In [0]:
def get_warehouse(name=None):
    """Pick a SQL warehouse from the workspace.

    Candidates are tried in this order, and the first match wins:

    1. name equal to ``name``
    2. name equal to "shared endpoint" (case-insensitive)
    3. name equal to "dbdemos-shared-endpoint" (case-insensitive)
    4. name containing "dbdemos" (case-insensitive)
    5. name containing "shared" (case-insensitive)
    6. any warehouse with ``num_clusters > 0``

    Args:
        name: Preferred warehouse name.

    Returns:
        The first warehouse matching the highest-priority rule.

    Raises:
        Exception: If no warehouse matches any rule.
    """
    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()
    # Materialize the listing: it is scanned once per rule below, and a one-shot
    # iterator would be empty after the first scan, silently disabling every fallback.
    warehouses = list(w.warehouses.list())

    def lowered_name(wh):
        # Tolerate warehouses without a name.
        return (wh.name or "").lower()

    rules = (
        lambda wh: wh.name == name,
        lambda wh: lowered_name(wh) == "shared endpoint",
        lambda wh: lowered_name(wh) == "dbdemos-shared-endpoint",
        #Try to fallback to an existing shared endpoint.
        lambda wh: "dbdemos" in lowered_name(wh),
        lambda wh: "shared" in lowered_name(wh),
        lambda wh: (wh.num_clusters or 0) > 0,
    )
    for matches in rules:
        for wh in warehouses:
            if matches(wh):
                return wh
    raise Exception("Couldn't find any Warehouse to use. Please create a wh first to run the demo and add the id here")

def display_tools(tools):
    """Display the attributes of each tool as one table row, with every value stringified."""
    rows = []
    for tool in tools:
        rows.append({attr: str(value) for attr, value in vars(tool).items()})
    display(pd.DataFrame(rows))

In [0]:
def download_pdf(dest_path, urls, timeout=60):
    """Download every URL into ``dest_path`` using up to 10 worker threads.

    Each file is stored under the last path segment of its URL.

    Args:
        dest_path: Target directory; created if missing.
        urls: Iterable of URLs to download.
        timeout: Seconds passed to ``requests.get`` as its ``timeout`` so a
            stalled server cannot block a worker forever.

    Raises:
        Whatever the first failing download raised (HTTP error status,
        connection error or timeout from ``requests``, or ``OSError`` on write).
    """
    from concurrent.futures import ThreadPoolExecutor

    def download_file(url, destination):
        local_filename = url.split('/')[-1]
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            print('saving '+destination+'/'+local_filename)
            with open(destination+'/'+local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return local_filename

    def download_to_dest(url):
        download_file(url, dest_path)

    # exist_ok avoids the check-then-create race of os.path.exists + os.makedirs.
    os.makedirs(dest_path, exist_ok=True)
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Drain the lazy map so every download runs and the first error is re-raised here.
        collections.deque(executor.map(download_to_dest, urls), maxlen=0)

def upload_pdfs_to_volume(volume_path, urls):
    """Download ``urls`` into ``volume_path``; any failure is printed instead of raised."""
    try:
        download_pdf(volume_path, urls)
    except Exception as err:
        # Best effort: report the problem and carry on.
        message = str(err)
        print(message)

In [None]:
def deduplicate_assessments_table(assessment_table):
    """De-duplicate an assessments table and merge response and retrieval assessments.

    Response assessments: for each ``request_id``, only the row with the latest
    ``timestamp`` among rows with a non-null ``text_assessment`` is kept.
    Retrieval assessments: rows with a non-null ``retrieval_assessment`` are
    grouped by ``request_id``, ``source.id`` and ``step_id`` and collected into
    a ``retrieval_assessments`` list.

    Returns:
        A full outer join of both results on request id and source id, with the
        response columns plus ``retrieval_assessments``.
    """
    # De-dup response assessments
    latest_response_df = spark.sql(f"""select * except(row_number)
                                        from ( select *, row_number() over (
                                                partition by request_id
                                                order by
                                                timestamp desc
                                            ) as row_number from {assessment_table} where text_assessment is not NULL
                                        ) where row_number = 1""")
    # De-dup the retrieval assessments
    grouped_retrieval_df = spark.sql(f"""select * except( retrieval_assessment, source, timestamp, text_assessment, schema_version),
        any_value(timestamp) as timestamp,
        any_value(source) as source,
        collect_list(retrieval_assessment) as retrieval_assessments
      from {assessment_table} where retrieval_assessment is not NULL group by request_id, source.id, step_id""")

    # Merge together
    response_df = latest_response_df.drop("retrieval_assessment", "step_id")
    # Join keys on the retrieval side are renamed so they do not clash with the response side.
    retrieval_df = (
        grouped_retrieval_df
        .withColumnRenamed("request_id", "request_id2")
        .withColumnRenamed("source", "source2")
        .drop("step_id", "timestamp")
    )

    same_request = response_df.request_id == retrieval_df.request_id2
    same_source = response_df.source.id == retrieval_df.source2.id
    joined_df = response_df.join(retrieval_df, same_request & same_source, "full")

    output_columns = [str(col) for col in response_df.columns] + [retrieval_df.retrieval_assessments]
    return joined_df.select(output_columns)

In [None]:
def get_latest_model(model_name):
    """Return the registered model version with the highest version number.

    Versions are looked up through an ``MlflowClient`` created with
    ``registry_uri="databricks-uc"``. Returns ``None`` when the search yields
    no versions.
    """
    from mlflow.tracking import MlflowClient

    client = MlflowClient(registry_uri="databricks-uc")
    newest = None
    for candidate in client.search_model_versions(f"name='{model_name}'"):
        # Versions are compared numerically, not as strings.
        candidate_number = int(candidate.version)
        if not newest:
            newest = candidate
        elif candidate_number > int(newest.version):
            newest = candidate
    return newest