In [1]:
import os
import base64
import pandas as pd
import asyncio
import tiktoken
import time
import re
import json
import asyncio
import nest_asyncio

from datetime import datetime
from dotenv import load_dotenv
from openai import RateLimitError, APIStatusError
from llama_index.llms.azure_openai import AzureOpenAI

from llama_index.core import (
    PromptTemplate
)
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from llama_index.core.query_pipeline import QueryPipeline, FnComponent
from mimetypes import guess_type
from pydantic import BaseModel, Field, conint, confloat
from llama_index.core.output_parsers import PydanticOutputParser

from typing import List, Optional, Literal

from dataclasses import dataclass, field

from helpers.helpers import read_json_to_str

[nltk_data] Downloading package punkt_tab to
[nltk_data]     c:\Users\LENOVO\anaconda3\envs\bp_digitalization\Lib\s
[nltk_data]     ite-packages\llama_index\core\_static/nltk_cache...
[nltk_data]   Package punkt_tab is already up-to-date!


In [2]:
from llama_index.core import Settings
from azure_authentication.customized_azure_login import CredentialFactory


In [3]:
os.chdir("../")
CWD = os.getcwd()

data_dir = os.path.join(CWD, 'data')

#Specify mode (working with a sample or all the files?)
sample_mode = False 
sample_size = 50

# specify file path
INPUT_FILE_PATH = os.path.join(CWD, "data", "proc", "building_plans_sample", "test_images", "bp_text.json")
METADATA_PATH = os.path.join(CWD, "data", "proc", "building_plans", "metadata","building_plans_metadata.csv")

# specify relevant column names
ID_COLUMN='filename'
TEXT_COLUMN='content'

# read in data
bp_text = pd.read_json(INPUT_FILE_PATH)
metadata_df = pd.read_csv(METADATA_PATH)

In [4]:
metadata_bps = metadata_df[metadata_df['Planart'].isin(['qualifizierter Bebauungsplan', 'einfacher Bebauungsplan', 'vorhabenbezogener Bebauungsplan'])]

In [5]:
bp_text['id'] = bp_text['filename'].str.extract(r'(\d+)_').astype(int)

In [6]:
#input_df = metadata_bps.merge(bp_text)

In [7]:
input_df = bp_text

In [8]:
# Recommendation: Configure your own authentication workflow with environment variables, see the description at
# https://github.com/soda-lmu/azure-auth-helper-python/blob/main/AuthenticationWorkflowSetup.md
credential = CredentialFactory().select_credential()
token_provider = credential.get_login_token_to_azure_cognitive_services()

print("Instantiate Azure OpenAI Client")

Instantiate Azure OpenAI Client


In [9]:
class Llm:
    """

    Contains LLM model

    """

    def __init__(self,
                 model_name="gpt-35-turbo-16k",
                 max_parallel_llm_prompts_running=8):
        
        self.model_name = model_name
        
        credential = CredentialFactory().select_credential()
        token_provider = credential.get_login_token_to_azure_cognitive_services()

        if self.model_name == "gpt-35-turbo-16k":

            self.llamaindex_llm = AzureOpenAI(
                engine="gpt-35-turbo-0301",
                model="gpt-35-turbo-16k",
                temperature=0.0,
                azure_endpoint=os.environ["AZURE_ENDPOINT_GIST_PROJECT_WESTEUROPE"],
                # use_azure_ad=True, # only useful for debugging purposes?
                api_version="2024-02-01",
                api_key=token_provider()
            )

            self.token_counter = TokenCountingHandler(
                # both gpt-3.5-turbo and gpt-4 are based on the same cl100k_base encoding -> doesn't matter which model we use here
                # tokenizer=tiktoken.encoding_for_model("gpt-3.5-turbo").encode
                tokenizer=tiktoken.encoding_for_model("gpt-4").encode
            )

            self.semaphore = asyncio.Semaphore(
                max_parallel_llm_prompts_running)

        elif self.model_name == "gpt-4-1106-preview":

            self.llamaindex_llm = AzureOpenAI(
                engine="gpt-4-1106-preview", model="gpt-4-1106-preview", temperature=0.0,
                azure_endpoint=os.environ["AZURE_ENDPOINT_GIST_PROJECT_NORWAYEAST"],
                # use_azure_ad=True, # only useful for debugging purposes?
                api_version="2024-02-01",
                api_key=token_provider(),
                max_retries=4,
                timeout=240.0,
                reuse_client=False)

            self.token_counter = TokenCountingHandler(
                # both gpt-3.5-turbo and gpt-4 are based on the same cl100k_base encoding -> doesn't matter which model we use here
                # tokenizer=tiktoken.encoding_for_model("gpt-3.5-turbo").encode
                tokenizer=tiktoken.encoding_for_model("gpt-4").encode
            )

            self.semaphore = asyncio.Semaphore(
                max_parallel_llm_prompts_running)

        Settings.llm = self.llamaindex_llm

    def calculate_llm_calling_price(self, input_tokens, output_tokens):
        """
        Cost calculator
        based on prices from https://azure.microsoft.com/en-us/pricing/details/cognitive-services/openai-service/#pricing
        """

        if self.model_name == "gpt-4-1106-preview":
            return input_tokens / 1000 * 0.010 + output_tokens / 1000 * 0.029
        elif self.model_name == "gpt-35-turbo-16k":
            return input_tokens / 1000 * 0.0005 + output_tokens / 1000 * 0.0015
        else:
            return -1.0


In [10]:
class GRZ(BaseModel):
    value: Optional[float] = Field(
        None,
        description="Der numerische Wert der Grundflächenzahl oder 'null', falls nicht vorhanden.",
        example=0.75
    )

class GFZ(BaseModel):
    value: Optional[float] = Field(
        None,
        description="Der numerische Wert der Geschoßflächenzahl oder 'null', falls nicht vorhanden.",
        example=1.0
    )

class BuildingMetrics(BaseModel):
    
    grz: Optional[GRZ] = Field(None, description="Grundflächenzahl (GRZ)")
    
    gfz: Optional[GFZ] = Field(None, description="Geschoßflächenzahl (GFZ)")
    
class PromptRoleAndTask:
    """Describes LLM role and task for prompt."""

    role: str = "You are a helpful enviromental city planner. \n Based on the excerpt from a building plan provided below, we would like to extract following information.\n"

class PromptKpiDefinitions:
    """Provides definitions to each KPI in prompt."""

    definitions_string : str = read_json_to_str('./query/definitions.json')


In [11]:
@dataclass
class Llm_Extraction_Prompt:
    """
    The dataclass contains a prompt (=query text).
    Strategy: We make a single query to extract relevant info from BP.
    """

    """The dataclass contains a prompt (=query text) and a parser method for this prompt.
        Strategy: We make a single query to extract relevant info from BP.
"""

    role: Optional[str] = field(default=PromptRoleAndTask.role)
    KPIDefinitions: Optional[str] = field(default=PromptKpiDefinitions().definitions_string)
    #specifications: Optional[str] = field(default=prompt_specifications.specifications)

    def __init__(self, role=None, KPIDefinitions=None#, specifications=None
                 ):
        """Optional parameters allow default values to be loaded from a file if None is provided."""
        if role is None:
            role = PromptRoleAndTask.role  # Default role definition
        if KPIDefinitions is None:
            KPIDefinitions = PromptKpiDefinitions().definitions_string  # Default KPI definitions
#        if specifications is None:
#            specifications = prompt_specifications.specifications  # Default specifications

        self.query = f'{role}\n{KPIDefinitions}\n\
        Here is the excerpt: \n {{context_str}}'

        self.parser = PydanticOutputParser(output_cls=BuildingMetrics)

    def parse_gpt_output(self, gpt_question_output) -> pd.DataFrame:
        """Extract year, scope, value, and unit gpt_question_output using regular expressions."""

        building_metrics = self.parser.parse(gpt_question_output.message.content)

        #parsed_output = pd.DataFrame([entry.dict()
        #                             for entry in building_metrics.BuildingMetrics])

        return(building_metrics)


In [19]:
class BP_Metrics_Getter:
    """Retrieve emissions from a single document."""

    def __init__(self, llm, llm_single_prompt):
        self.llamaindex_llm = llm.llamaindex_llm
        self.llm_semaphore = llm.semaphore
        self.token_counter = llm.token_counter
        self.llm_single_prompt = llm_single_prompt
        self.start_time = time.perf_counter()

    async def _bound_get_emissions_from_raw_text(self, doc_text):
        """Getter function with semaphore."""

        async with self.llm_semaphore:
            current_time = datetime.now()
            cur_time = time.perf_counter()
            elapsed_time = cur_time - self.start_time
            print("Current Time:", current_time.strftime("%H:%M:%S"), "(", elapsed_time / 60,
                  "minutes since process started.)")
            
            res = await self._run_llm(doc_text)

            duration_time = time.perf_counter() - cur_time
            total_duration_time = time.perf_counter() - self.start_time
            print("LLM query execution time: " + str(duration_time) + " seconds (" +
                  str(total_duration_time) + " seconds since initiating ValueRetrieverPipeline).")
        return res


    async def _run_llm(self, doc_text: str):
        """Extract emissions from a single page of a document.

        Example code to extract emissions from a single PDF document

        embed_model = config.EmbeddingModel().embed_model
        llm = config.Llm(model_name="gpt-35-turbo-16k")
        llm_single_prompt = prompts_with_prompt_parsers.LlmSinglePromptQueryScope123()
        start_time = time.perf_counter()
        emission_getter = EmissionsGetter(
            llm=llm, llm_single_prompt=llm_single_prompt, start_time=start_time)

        pathname_list = glob.glob(os.path.join(
            path_to_data, "input-data/*.pdf"))
        doc = semantic_search.Pdfdoc(filename=filename)
        relevant_pages = doc.retrieve_relevant_pages(embed_model)

        output = await asyncio.gather(*(emission_getter.get_emissions_from_raw_text(doc_text=page.text) for page in relevant_pages))
        """

        prompt = self.llm_single_prompt.query
        parsing_instruction = self.llm_single_prompt.parser
        prompt_tmpl = PromptTemplate(template=f"{prompt}",
                                     output_parser=parsing_instruction)

        p = QueryPipeline(modules={"llm_prompt": prompt_tmpl,
                                   "llm": self.llamaindex_llm},
                          verbose=False)
        
        p.add_chain(["llm_prompt", "llm"])

        try:
            results = await p.arun_multi({"llm_prompt": {"context_str": doc_text}})
                                         
            raw_response = results["llm"]["output"]

        except RateLimitError as e:
            print(
                "A 429 status code (Rate Limit error) was received; we should back off a bit.")
            raw_response = "Create empty output table"

        except APIStatusError as e:

            raw_response = "Create empty output table"

            print("Another non-200-range status code was received")
            print(e.status_code)
            print(e.response)

        return raw_response
    
    def _parse_to_table_llm_output(self, llm_output):

        output_string = str(llm_output)

        parsed_output = self.llm_single_prompt.parse_gpt_output(output_string)

        return parsed_output

    def _parse_to_classes_llm_output(self, llm_output):

        parsed_output = self.llm_single_prompt.parse_gpt_output(llm_output)

        return parsed_output
        


In [13]:
if sample_mode:
    
    run_data = input_df.sample(sample_size, random_state=15)

else: 
 
    run_data = input_df

In [15]:
llm = Llm(model_name="gpt-4-1106-preview")

In [20]:
getter = BP_Metrics_Getter(llm = llm, llm_single_prompt = Llm_Extraction_Prompt())

In [21]:
nest_asyncio.apply()

extraction_results = await getter._bound_get_emissions_from_raw_text(run_data['content'][0])

Current Time: 10:33:45 ( 0.05705804166694482 minutes since process started.)
LLM query execution time: 5.4869639000389725 seconds (8.910447800066322 seconds since initiating ValueRetrieverPipeline).


In [25]:
parsed_extractions = getter._parse_to_classes_llm_output(extraction_results)

In [26]:
extraction_df = pd.concat(parsed_extractions)

TypeError: cannot concatenate object of type '<class 'tuple'>'; only Series and DataFrame objs are valid

In [17]:
OUTPUT_FLOODING_FILE_PATH = os.path.join("data", "proc", "building_plans_sample", "features",  "test_images_flooding_data_extraction.csv")
OUTPUT_EXTRACTIONS_FILE_PATH = os.path.join("data", "proc", "building_plans_sample", "features",  "test_images_info_data_extraction.csv")


extraction_df.to_csv(OUTPUT_EXTRACTIONS_FILE_PATH)
flooding_df.to_csv(OUTPUT_FLOODING_FILE_PATH)