# ESGReveal: An LLM-based approach for extracting structured data from ESG reports

## 1. Setting up

#### I used **LangChain** as the main framework to implement the functionalities described in the paper. LangChain provides a modular approach to working with Large Language Models (LLMs), making it highly adaptable for tasks such as document preprocessing, metadata tagging, and retrieval-augmented generation (RAG).


#### Why LangChain?

1. **Ease of Integration with LLMs**:
   - The paper relies heavily on LLMs for extracting and structuring ESG data. LangChain simplifies the integration process with APIs like OpenAI or other LLM providers.

2. **RAG Framework**:
   - LangChain provides native support for **Retrieval-Augmented Generation (RAG)** workflows, which align closely with the paper's emphasis on metadata tagging, data retrieval, and structured response generation.

3. **Document Preprocessing**:
   - The framework supports easy preprocessing of unstructured documents, like PDF files, using loaders such as `PyPDFLoader`.

4. **Flexibility and Scalability**:
   - LangChain is modular, allowing me to add custom logic for metadata tagging, question answering, and document segmentation as per the paper's requirements.




In [1]:
import sys
import os
import time
import json
import pandas as pd
# add parent path as sys path (to import python files)
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(parent_dir)

In [2]:
import openai
from getpass import getpass

# Prompt for the key instead of hardcoding it in the notebook
os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")

In [3]:
import os

# Check the current working directory
current_directory = os.getcwd()
print(f"Current directory: {current_directory}")

Current directory: /home/jovyan/samchully/jhn


In [4]:
new_directory = "/home/jovyan/samchully/jhn/esg_inc"
os.chdir(new_directory)

# Check the directory after the change
print(f"Changed directory: {os.getcwd()}")

Changed directory: /home/jovyan/samchully/jhn/esg_inc


In [5]:
from langchain_community.document_loaders import PyPDFLoader
import pytesseract
import os

directory_path = new_directory

pdf_files = [
    os.path.join(directory_path, file) 
    for file in os.listdir(directory_path) 
    if file.endswith(".pdf")
]

def clean_invalid_unicode(text):
    """
    Remove invalid Unicode surrogates from a string.
    """
    return text.encode('utf-8', 'ignore').decode('utf-8', 'ignore')
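
To see what `clean_invalid_unicode` does, here is a small self-contained check. The function body is copied from the cell above; the sample strings are made up for illustration.

```python
def clean_invalid_unicode(text: str) -> str:
    """Drop code points that cannot be encoded as UTF-8 (e.g. lone surrogates)."""
    return text.encode("utf-8", "ignore").decode("utf-8", "ignore")


# A lone surrogate (\ud800) makes a plain .encode("utf-8") raise UnicodeEncodeError;
# with errors="ignore" it is silently dropped instead.
broken = "ESG\ud800 report"
cleaned = clean_invalid_unicode(broken)

# Valid non-ASCII text such as Korean is left untouched.
korean = clean_invalid_unicode("지속가능경영보고서")

print(repr(cleaned))
```

The round trip only removes what UTF-8 cannot represent, so it is safe to apply to every page before splitting.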


In [6]:
from langchain_community.document_loaders import PyPDFLoader
import fitz  # PyMuPDF
import re
from pdf2image import convert_from_path
import pytesseract

# Define default values
DEFAULT_CHUNK_SIZE = 1000
DEFAULT_CHUNK_OVERLAP = 200

## 2. Parse Text

#### LangChain’s PyPDFLoader is used to extract text efficiently from PDFs with standard text encoding, preserving metadata such as page numbers and sources. For scanned or complex PDFs where text extraction fails, OCR serves as a fallback to ensure complete data retrieval. This hybrid approach validates content using custom functions to clean irrelevant symbols and assess quality, determining whether to use direct extraction or OCR. Text is processed page by page, cleaned, and combined with metadata for usability. Future improvements include optimizing OCR for multilingual text, enhancing scalability for large datasets, and enriching metadata with advanced features like table detection. This ensures reliable text extraction across diverse document types.

In [7]:
def remove_non_korean_english_symbols(text):
    return re.sub(r"[^가-힣a-zA-Z0-9\s]", "", text)

def cid_ratio(text):
    if len(text) <= 0:
        return -1
    else:
        return len(re.findall(r"\(cid:\d+\)", text)) / len(text)

def extract_partial_text(pdf_path, max_pages=None, max_chars=None):
    doc = fitz.open(pdf_path)
    total_pages = len(doc)
    max_pages = min(max_pages or total_pages, total_pages)
    extracted_text = ""
    for page_num in range(max_pages):
        page = doc[page_num]
        extracted_text += page.get_text()
        if max_chars and len(extracted_text) >= max_chars:
            return extracted_text[:max_chars].strip()
    return extracted_text.strip()

def pdf_cid_ratio(file_path):
    try:
        text = extract_partial_text(file_path, max_chars=10240)
    except Exception as e:
        print(f"Error extracting text: {e}")
        return None
    return cid_ratio(text) if text.strip() else -1

def pdf_clean_text_ratio(pdf_path):
    doc = fitz.open(pdf_path)
    orig_text = ""
    clean_text = ""
    for page_num in range(len(doc)):
        page = doc[page_num]
        text = page.get_text()
        orig_text += text
        clean_text += remove_non_korean_english_symbols(text)
        if len(orig_text) > 10240:
            break
    return len(clean_text) / len(orig_text) if len(orig_text) > 0 else -1

def pdf_is_valid(filename):
    retval1 = pdf_cid_ratio(filename)
    if retval1 is None:
        return False
    retval2 = pdf_clean_text_ratio(filename)
    return retval1 < 0.1 and retval2 > 0.5

def extract_text_from_pdf(pdf_path):
    text_with_pages = []
    if pdf_is_valid(pdf_path):
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            page = doc[page_num]
            text = remove_non_korean_english_symbols(page.get_text())
            text_with_pages.append((page_num + 1, text))
    else:
        pages = convert_from_path(pdf_path)
        for page_num, page in enumerate(pages):
            text = pytesseract.image_to_string(page, lang="eng+kor")
            text_with_pages.append((page_num + 1, text))
    return text_with_pages
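
The two ratios behind `pdf_is_valid` can be tried on plain strings without opening a PDF. The sketch below re-implements them on text only; `clean_ratio` and `looks_extractable` are hypothetical names I introduce here, and the sample strings are invented.

```python
import re


def cid_ratio(text: str) -> float:
    """Number of "(cid:N)" placeholders per character; -1 for empty text."""
    if not text:
        return -1
    return len(re.findall(r"\(cid:\d+\)", text)) / len(text)


def clean_ratio(text: str) -> float:
    """Share of characters that survive the Korean/English/digit/whitespace filter."""
    if not text:
        return -1
    return len(re.sub(r"[^가-힣a-zA-Z0-9\s]", "", text)) / len(text)


def looks_extractable(text: str) -> bool:
    """Same thresholds as pdf_is_valid: few cid markers, mostly readable characters."""
    return cid_ratio(text) < 0.1 and clean_ratio(text) > 0.5


good = "2023 온실가스 배출량 1500 tCO2eq"
garbled = "(cid:12)(cid:45)(cid:7)(cid:88)"
print(looks_extractable(good), looks_extractable(garbled), looks_extractable(""))
# prints: True False False
```

Because the cid ratio is counted per character, a text made only of `(cid:N)` markers scores roughly 0.13, which is just above the 0.1 cutoff.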

In [8]:
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0, model="gpt-4o") 

In [9]:
from typing import Literal, List, Optional

import json
from pydantic import BaseModel, Field
from langchain import hub


class ESGProperties(BaseModel):
    # category
    aspect: Literal["Environmental", "Social", "Governance"]
    # performance indicator
    kpi: str = Field(description="The key performance indicator")
    # topic
    topic: Optional[str] = Field(description="The topic under the KPI")
    # metric quantity
    quantity: Optional[dict] = Field(
        description=(
            "A dictionary containing a numeric value and its corresponding unit. "
            "Common metrics include environmental, social, financial, and operational data. "
            "Values should be numeric, and units should reflect the measurement context "
            "such as tons, percentages, monetary units, or hours."
        ),
        example=[
            {"value": 1500, "unit": "metric tons"},  # mass
            {"value": 85, "unit": "percent"},  # percentage
            {"value": 200000, "unit": "USD"},  # currency
            {"value": 1000, "unit": "m³"},  # volume
            {"value": 40, "unit": "hours"},  # time
            {"value": 0, "unit": "cases"},  # count
        ]
    )
    # additional information
    search_terms: Optional[List[str]] = Field(
        description="Search terms related to the document"
    )
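
Since `quantity` is typed as a plain `dict`, nothing in the schema enforces the value/unit shape that its description asks for. A minimal standard-library check could look like the sketch below; `is_valid_quantity` is a hypothetical helper and is not part of the notebook or of LangChain.

```python
from numbers import Real
from typing import Any, Optional


def is_valid_quantity(quantity: Optional[Any]) -> bool:
    """Return True for None or a dict like {"value": 1500, "unit": "metric tons"}."""
    if quantity is None:  # the field is optional
        return True
    if not isinstance(quantity, dict):
        return False
    value, unit = quantity.get("value"), quantity.get("unit")
    # bool is a subclass of int, so exclude it explicitly
    is_number = isinstance(value, Real) and not isinstance(value, bool)
    return is_number and isinstance(unit, str) and bool(unit.strip())


print(is_valid_quantity({"value": 85, "unit": "percent"}))
print(is_valid_quantity({"value": "85%", "unit": None}))
```

A check like this could be run over the tagged metadata to flag pages where the model returned a quantity in an unexpected shape.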

In [10]:
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
from langchain_community.document_transformers.openai_functions import (
    create_metadata_tagger,
)
# Configure the OpenAI model
llm = ChatOpenAI(temperature=0, model="gpt-4")

# Create the metadata tagger
esg_tagger = create_metadata_tagger(ESGProperties, llm)



In [11]:
# Load every PDF; fall back to extract_text_from_pdf for pages PyPDFLoader returns empty
all_documents = []

for pdf_file in pdf_files:
    try:
        loader = PyPDFLoader(file_path=pdf_file)
        documents = loader.load()
        fallback_pages = None  # computed lazily, at most once per PDF

        for doc in documents:
            # Keep pages that already have extractable text
            if doc.page_content and doc.page_content.strip():
                all_documents.append(doc)
                continue

            # Empty page: run the fallback extraction (OCR if the PDF fails validation)
            if fallback_pages is None:
                fallback_pages = dict(extract_text_from_pdf(pdf_file))

            # PyPDFLoader numbers pages from 0; extract_text_from_pdf numbers them from 1
            page_index = doc.metadata.get("page")
            text = fallback_pages.get(page_index + 1) if page_index is not None else None
            if text is not None:
                all_documents.append(
                    Document(page_content=text, metadata={"page": page_index, "source": pdf_file})
                )

    except Exception as e:
        print(f"Error processing {pdf_file}: {e}")

## 3. Metadata Tagger

#### Tag each page with ESG metadata (aspect, KPI, topic, quantity, and search terms) using the `ESGProperties` schema defined above.

In [12]:
tagged_documents = []
for doc in all_documents:
    try:
        result = esg_tagger.transform_documents([doc])
        tagged_documents.extend(result)
    except Exception as e:
        print(f"Error tagging document: {doc.metadata}")
        print(e)

# Check the final result
print(f"Tagged {len(tagged_documents)} documents successfully.")

Tagged 106 documents successfully.


In [13]:
#tagged_documents

In [14]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Chosen because it keeps token usage efficient while preserving the text structure as much as possible

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size = 1000,
    chunk_overlap = 50
)
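
To make `chunk_size` and `chunk_overlap` concrete, here is a deliberately simplified sketch: a fixed-width character window. It is not how `RecursiveCharacterTextSplitter` works internally (that class prefers to break on separators such as paragraphs and lines); `sliding_chunks` is a hypothetical function I use only to show how the two parameters interact.

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 50) -> List[str]:
    """Fixed-width character windows where neighbours share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):  # the window already reached the end
            break
    return chunks


sample = "".join(str(i % 10) for i in range(2500))  # 2,500 dummy characters
chunks = sliding_chunks(sample)
print([len(c) for c in chunks])
# prints: [1000, 1000, 600]
```

Each chunk starts 950 characters after the previous one, so the last 50 characters of one chunk are repeated at the start of the next.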

In [15]:
from langchain.schema import Document

split_documents = []

for document in tagged_documents:
    # Normalize each item to a Document object before splitting
    if isinstance(document, dict):  # OCR result stored as a dict
        doc = Document(page_content=document["content"], metadata={"page": document["page"]})
    elif isinstance(document, Document):  # already a Document: use it as-is
        doc = document
        doc.page_content = clean_invalid_unicode(doc.page_content)
    else:  # plain string
        doc = Document(page_content=clean_invalid_unicode(document), metadata={})
    # split_documents() expects a list of Documents
    split_documents.extend(text_splitter.split_documents([doc]))

print(len(split_documents))

221


In [16]:
import json

# Save the metadata
with open("esg_metadata.json", "w", encoding="utf-8") as file:
    json.dump([doc.metadata for doc in tagged_documents], file, ensure_ascii=False, indent=4)
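
Once the metadata is on disk, a quick sanity check is to count pages per ESG aspect. The sketch below works on a list of metadata dicts; `count_by_aspect` is a hypothetical helper and the sample records are invented, shaped like the tagger output plus the loader's `page` key.

```python
from collections import Counter
from typing import Dict, List


def count_by_aspect(metadata_list: List[Dict]) -> Counter:
    """Count pages per ESG aspect; pages without an aspect fall under "untagged"."""
    return Counter(meta.get("aspect") or "untagged" for meta in metadata_list)


# Made-up records for illustration only
sample_metadata = [
    {"aspect": "Environmental", "kpi": "GHG emissions", "page": 3},
    {"aspect": "Governance", "kpi": "ESG Management", "page": 13},
    {"aspect": "Environmental", "kpi": "Water use", "page": 20},
    {"page": 41},  # a page the tagger left untagged
]
aspect_counts = count_by_aspect(sample_metadata)
print(aspect_counts)
```

For the real file, the list would come from `json.load` on `esg_metadata.json` opened with `encoding="utf-8"`.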

## 4. Vector Store

In [17]:
import faiss
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_openai import OpenAIEmbeddings

In [18]:
embeddings = OpenAIEmbeddings()

In [19]:
# Create the vector DB
db = FAISS.from_documents(documents=split_documents, embedding=embeddings)

In [20]:
# Save the FAISS index to a file (vectors only; the LangChain docstore is not included)
faiss.write_index(db.index, "faiss_index.index")

In [21]:
# Load the FAISS index from the file
index = faiss.read_index("faiss_index.index")

In [22]:
# Inspect the docstore IDs
#db.index_to_docstore_id

# Inspect the stored documents
#db.docstore._dict

In [32]:
from langchain_community.document_loaders.csv_loader import CSVLoader

directory_path = "/home/jovyan/samchully/jhn/output"

# Collect every CSV file in the directory
csv_files = [os.path.join(directory_path, file) for file in os.listdir(directory_path) if file.endswith('.csv')]

all_data = []

for csv_file in csv_files:
    loader = CSVLoader(file_path = csv_file)
    data = loader.load()
    all_data.append(data) 


In [35]:
print(type(all_data))

<class 'list'>


In [38]:
tagged_tables = []
for docs in all_data:  # each entry is the list of row Documents from one CSV file
    try:
        result = esg_tagger.transform_documents(docs)
        tagged_tables.extend(result)  # collect the tagged rows
    except Exception as e:
        print(f"Error tagging CSV documents: {e}")

# Check the final result
print(f"Tagged {len(tagged_tables)} documents successfully.")


In [39]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# all_data holds one list of Documents per CSV file, so flatten it into a single string
# (str() keeps each Document's repr, metadata included)
all_text = "\n".join(str(data) for data in all_data)  # convert and merge into one string

# Create the text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=100,
)

# Split the text
texts = text_splitter.split_text(all_text)

In [41]:
db.add_texts(texts)
# Save the FAISS index to a file
faiss.write_index(db.index, "final_faiss_index.index")

In [42]:
# Load the FAISS index from the file
index = faiss.read_index("final_faiss_index.index")

## 5. Retriever & Reranker

In [43]:
retriever = FAISS.from_documents(split_documents, embeddings).as_retriever(search_kwargs={"k": 1})
# Query (Korean): "Tell me about ESG performance indicators, in JSON format"
query = "ESG 성과 지표에 대해 알려줘 json 형식으로"
docs = retriever.invoke(query)
print(docs)

[Document(metadata={'aspect': 'Governance', 'kpi': 'ESG Management', 'topic': 'ESG Strategy', 'quantity': None, 'search_terms': ['ESG', 'Governance', 'Strategy', 'Jeju Air'], 'source': '/home/jovyan/samchully/jhn/esg_inc/(주) 제주항공 2024 지속가능경영보고서.pdf', 'page': 13}, page_content='ESG\n전략방향\n내∙외부 이해관계자 행복 추구\n 경영 활동 내 위험 요소 최소화\n 친환경 비즈니스 포트폴리오 강화\n내∙외부 이해관계자 행복 추구 투명한 거버넌스  생태계 구축 환경경영체계 구축 UN SDGs\n연계')]


In [44]:
def pretty_print_docs(docs):
    print(
        f"\n{'-' * 100}\n".join(
            [f"Document {i+1}:\n\n" + d.page_content for i, d in enumerate(docs)]
        )
    )

In [45]:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

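# Initialize the cross-encoder model
model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")

# Keep only the top-ranked document after reranking
# (the base retriever above returns k=1, so raise k to give the reranker candidates to reorder)
compressor = CrossEncoderReranker(model=model, top_n=1)

# Initialize the contextual compression retriever
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)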
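
The retrieve-then-rerank pattern can be sketched without any model: fetch several candidates, score each one against the query, and keep the best `top_n`. In the sketch below `token_overlap` is a toy stand-in for the cross-encoder score, and both function names and the candidate passages are hypothetical.

```python
from typing import Callable, List


def token_overlap(query: str, passage: str) -> float:
    """Toy relevance score: fraction of query tokens that appear in the passage."""
    query_tokens = set(query.lower().split())
    passage_tokens = set(passage.lower().split())
    return len(query_tokens & passage_tokens) / len(query_tokens) if query_tokens else 0.0


def rerank(
    query: str,
    candidates: List[str],
    score: Callable[[str, str], float],
    top_n: int,
) -> List[str]:
    """Score every candidate against the query and keep the top_n best."""
    ranked = sorted(candidates, key=lambda passage: score(query, passage), reverse=True)
    return ranked[:top_n]


candidates = [
    "board composition and audit committee",
    "scope 1 greenhouse gas emissions in 2023",
    "employee training hours per person",
]
top = rerank("greenhouse gas emissions", candidates, token_overlap, top_n=2)
print(top)
```

Reranking only changes the result when the base retriever returns more candidates than `top_n`, which is why the retriever's `k` and the reranker's `top_n` are usually chosen together.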

