In [1]:
from pyspark.sql import SparkSession
from langchain_text_splitters import RecursiveCharacterTextSplitter
import json
import os

In [2]:
print(os.getenv('ESG_REPORTS_FOLDER'))

C:/Zhenjie/University/Y3S2/dsa3101-ay2425s2-team6/data-pipelines/data/esg-pdf/


In [3]:
os.getcwd()

'c:\\Zhenjie\\University\\Y3S2\\dsa3101-ay2425s2-team6\\data-pipelines'

In [4]:
os.chdir(r'C:\Zhenjie\University\Y3S2\dsa3101-ay2425s2-team6\data-pipelines\esg-pdf-to-json')

In [5]:
%run ./esg_pdf_to_json.py

In [6]:
os.chdir(r'C:\Zhenjie\University\Y3S2\dsa3101-ay2425s2-team6\data-pipelines')

---

**CODE CHUNKS BELOW ARE OUTDATED RUN `test2.ipynb` instead**

In [7]:
from pyspark.sql import SparkSession
from langchain_text_splitters import RecursiveCharacterTextSplitter
import json
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf

In [8]:
# Initialize Spark NLP session which is built on Apache Spark for NLP tasks
spark = sparknlp.start()

In [9]:
# Load the JSON data
# When doing for loop, need check if the file is a json file before loading
with open('data/esg-json/2022_Citigroup_ESG_Report.json', 'r') as f:
    data = json.load(f)

In [10]:
# Loaded JSON data becomes a dictionary with key-value pairs
# The key is the filepath with page number and the value is the text content of the page
# We then convert this dictionary to a Spark DataFrame with two columns: `id`` and `text`
# Each key-value pair in the dictionary becomes a row in the DataFrame
spark_df = spark.createDataFrame(data.items(), ["id", "text"])

In [None]:
# Define the NLP Pipeline

# Converts the `text` column to `Document` type so that Spark NLP can process
documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

# Splits the `document` into smaller chunks of text
# `.setchunkSize()` parameter specifies the maximum number of characters in each chunk
# `.chunkOverlap()` parameter specifies the number of characters to overlap between chunks
# to avoid losing context.
documentSplitter = DocumentCharacterTextSplitter() \
    .setInputCols("document") \
    .setOutputCol("chunks") \
    .setChunkSize(200) \
    .setChunkOverlap(10)

# Tokenizes the `chunks` into individual words or tokens
tokenizer = Tokenizer() \
    .setInputCols("chunks") \
    .setOutputCol("tokens")

# Extracts keywords form the tokens using YAKE (Yet Another Keyword Extractor) algorithm
# `.setNKeywords(10)` specifies the top 10 keywords should be extracted
keywordExtractor = YakeKeywordExtraction() \
    .setInputCols("tokens") \
    .setOutputCol("keywords") \
    .setNKeywords(10)

# Converts the
finisher = Finisher() \
    .setInputCols("keywords") \
    #.setOutputAsArray(True) \
    # .setIncludeMetadata(True)

# Create the Spark NLP Pipeline with the defined stages
pipeline = Pipeline().setStages([
    documentAssembler,
    documentSplitter,
    tokenizer,
    keywordExtractor,
    finisher
])

# Fit the pipeline to the Spark DataFrame and transform the data, applying the NLP stages
output_df = pipeline.fit(spark_df).transform(spark_df)

# Displays the transformed DataFrame with the extracted keywords
output_df.show()

+--------------------+--------------------+--------------------+
|                  id|                text|   finished_keywords|
+--------------------+--------------------+--------------------+
|C:/Zhenjie/Univer...|What’s Inside\n2 ...|[o ur, o ur, clim...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[esg report, fina...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[sustainable fina...|
|C:/Zhenjie/Univer...|2022 Highlights\n...|[sustainable fina...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[across citi, cit...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[risk management,...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[net zero, net ze...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[esg report, esg ...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[esg governance, ...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[citi citi, stake...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[esg report, esg ...|
|C:/Zhenjie/Univer...|Citi 2022 ESG Rep...|[sustainable fina...|
|C:/Zhenjie/Univer...|Cit

In [20]:
from pyspark.sql.functions import concat_ws

# Renames the `text` column to `text_chunk` and `finished_keywords` to `tags`
# This is so that we don't confuse the `text` column with `text` 
# and `finished_keywords` with `keyword` data type in Elasticsearch later on
output_df = output_df.withColumnRenamed("text", "text_chunk") \
                     .withColumnRenamed("finished_keywords", "tags") 

# Converts the array of strings in the `tags` column to a single string
# with each keyword separated by a comma
output_df = output_df.withColumn("tags", concat_ws(", ", "tags"))

# Drop any rows with null values in the `tags` column
output_df = output_df.na.drop(subset=["tags"])

In [21]:
output_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- text_chunk: string (nullable = true)
 |-- tags: string (nullable = false)



In [24]:
# Save the Spark DataFrame as a CSV
output_df = output_df.toPandas()
output_df.to_csv("data/esg-csv/2022_Citigroup_ESG_Report_keywords.csv", index=False)

---

In [None]:
# Run this in command prompt to start running ElasticSearch locally
!docker run -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.12.1

In [1]:
import pandas as pd

In [2]:
SAMPLE_CSV_FILEPATH = './data/esg-csv/2022_Citigroup_ESG_Report_keywords.csv'

# Load the CSV file
df = pd.read_csv(SAMPLE_CSV_FILEPATH)
df.head()

Unnamed: 0,id,text_chunk,tags
0,C:/Zhenjie/University/Y3S2/dsa3101-ay2425s2-te...,What’s Inside\n2 A bout This Report\n3 L etter...,"o ur, o ur, climate risk, o ur, o ur, climate ..."
1,C:/Zhenjie/University/Y3S2/dsa3101-ay2425s2-te...,Citi 2022 ESG Report\nPage 2\nESG at Citi Sust...,"esg report, financial disclosures, financial d..."
2,C:/Zhenjie/University/Y3S2/dsa3101-ay2425s2-te...,Citi 2022 ESG Report\nPage 3\nESG at Citi Sust...,"sustainable finance, sustainable finance, sust..."
3,C:/Zhenjie/University/Y3S2/dsa3101-ay2425s2-te...,2022 Highlights\nCiti 2022 ESG Report\nPage 4\...,"sustainable finance, sustainable finance, citi..."
4,C:/Zhenjie/University/Y3S2/dsa3101-ay2425s2-te...,Citi 2022 ESG Report\nPage 5\nSustainable Fina...,"across citi, citi esg, esg priorities, esg pri..."


In [3]:
# For `tags` we use `text`` data type insted of 'keyword' data type
# because according to official Elasticsearch documentation,
# 'keyword' datatype is used when you require and exact value search like zip codes etc
# But in our case, our tags are a string of keywords separated by commas.
# The embeddings field is a dense vector of 384 dimensions because we
# are using 'all-MiniLM-L6-v2' emebedding model which produces vectors of 384 dimensions
# The embedding is obtained by passing the `text_chunk` through the embedding model

index_mapping = {
    "properties": {
        "text_chunk" : {"type" : "text", "analyzer": "standard"},
        "tags": {"type": "text", "analyzer": "standard"},
        "embedding": {
            "type": "dense_vector",
            "dims": 384,
            "similarity": "cosine"
        }
    }
}

In [4]:
from elasticsearch import Elasticsearch

# Initialize local Elasticsearch on port 9200
# Can also check manually that service is running by typing http://localhost:9200/ in browser
try:
    es = Elasticsearch('http://localhost:9200/')
except Exception as e:
    raise Exception(
        status_code=500, detail=f"Failed to connect to Elasticsearch: {str(e)}"
    )

In [5]:
######## INDEXING ########

from sentence_transformers import SentenceTransformer

# Load a pretrained embeddings model for RAG
# Used this instead of the OLlama embeddings as this one produces dense embeddings
# which are more suitable for the task of semantic search
# Initialize S-BERT model
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

# Function to get embeddings using S-BERT model
def get_embeddings(text):
    try:
        # Create embeddings and convert to list from as needed by Elasticsearch
        return embedding_model.encode(text).tolist()
    except Exception as e:
        print(f"Error fetching embeddings for text: {text}. Error: {str(e)}")
        return None

In [6]:
# Sample text to encode
sample_text = "This is a sample text"

# Encode the sample text
sample_text_embedding = get_embeddings(sample_text)

# Print the shape of vector embedding
# Remember this as it will be used to create index mapping for ElasticSearch later on
print(sample_text_embedding) # can see that the vector is of shape (384,)

[0.01002943329513073, 0.09550528228282928, -0.02096635289490223, 0.025162313133478165, 0.042095545679330826, 0.02539600059390068, 0.03265978768467903, 0.057843681424856186, 0.04338080435991287, 0.007828288711607456, 0.039830051362514496, -0.012325837276875973, 0.011496693827211857, -0.061468105763196945, 0.023514479398727417, 0.050312090665102005, 0.04488016292452812, -0.038764793425798416, -0.014595728367567062, 0.007335948292165995, 0.04716603830456734, 0.08016199618577957, 0.03173888474702835, 0.015792690217494965, -0.0031506221275776625, 0.04468901455402374, -0.023292778059840202, 0.08901121467351913, 0.11823391914367676, -0.007691589649766684, -0.04131343215703964, 0.07963008433580399, 0.17859436571598053, 0.026123905554413795, 0.06597477942705154, 0.021756844595074654, -0.05122711881995201, 0.06648340076208115, 0.03905222564935684, 0.048735518008470535, 0.008930870331823826, -0.08045852184295654, 0.043473612517118454, 0.014143373817205429, 0.023846764117479324, -0.036777436733245

  attn_output = torch.nn.functional.scaled_dot_product_attention(


In [7]:
index_name = 'esg_reports_demo'

# Create Elasticsearch index with mappings
def create_index():
    try:
        # Delete index if it already exists
        if es.indices.exists(index=index_name):
            es.indices.delete(index=index_name)
        # Create index with mapping
        es.indices.create(index=index_name, mappings = index_mapping)
        print(f"Index '{index_name}' created successfully!")

    except Exception as e:
        print(f"Error creating index '{index_name}': {str(e)}")

In [None]:
# es.indices.delete(index=index_name)

ObjectApiResponse({'acknowledged': True})

In [8]:
# Convert data into Elasticsearch format

actions = [
    {
        "_index": index_name,
        "id": row['id'],
        "_source": {
            "text_chunk": row['text_chunk'],
            "tags": row['tags'],
            "embedding": get_embeddings(row['text_chunk'])
        }
    }
    for _, row in df.iterrows()
]

print(actions)

[{'_index': 'esg_reports_demo', 'id': 'C:/Zhenjie/University/Y3S2/dsa3101-ay2425s2-team6/data-pipelines/data/esg-pdf/2022_Citigroup_ESG_Report.pdf_0', '_source': {'text_chunk': 'What’s Inside\n2 A bout This Report\n3 L etter from Our CEO\n4 2 022 Highlights\nESG at Citi\n5 E SG Across Citi\n6 E SG Governance at Citi\n8 O ur Material ESG Issues\n10 S takeholder Engagement at Citi\nSustainable Finance\n12 O ur $1 Trillion Goal\n22 F inancing the Low-Carbon Transition\n23 F inancing Social Impact\nClimate Risk  \nand Net Zero\n25 O ur Net Zero Commitment\n30 O ur Approach to Managing  \nClimate Risk\n32 R educing Climate Risk in  \nOur Financing.\nSustainable Operations\n34 O perational Footprint Goals\n36 Sus tainable and Healthy Buildings\n37 E fficient Travel\n37 M anaging Climate Risk in  \nOur Operations\n38 En vironmental Performance  \nfor Operations\nBuilding Equitable and \nResilient Communities\n43 Action for Racial Equity\n45 C iti Impact Fund\n46 S trategic Philanthropy:  \nTh

In [27]:
from elasticsearch import helpers

# Bulk index the data into Elasticsearch
helpers.bulk(es, actions)
print("Data indexed successfully!") 

Data indexed successfully!


In [4]:
# ============= SEARCHING =============

# Lexical Search Function
def lexical_search(query: str, top_k: int):
    '''Returns the top-k lexical search results for the given query'''
    # Search for query in 'text_chunk' and 'tags' fields
    lexical_results = es.search(
        index=index_name,
        body={
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["text_chunk", "tags"]
                }
            },
            "size": top_k
        },
        source_excludes=["embedding"],
    )

    lexical_hits = lexical_results['hits']['hits']
    max_bm25_score = max([hit["_score"] for hit in lexical_hits], default=1.0)

    # Normalize lexical scores
    for hit in lexical_hits:
        hit["_normalized_score"] = hit["_score"] / max_bm25_score

    return lexical_hits

In [5]:
# Semantic Search Function
def semantic_search(query: str, top_k: int):
    # Generate embeddings for the query using S-BERT
    query_embedding = get_embeddings(query)

    # Perform a cosine similarity search using the query embedding
    script_query = {
        "script_score": {
            "query": {"match_all": {}},
            "script": {
                "source": "cosineSimilarity(params.query_embedding, 'embedding') + 1.0",
                "params": {"query_embedding": query_embedding},
            },
        }
    }

    semantic_results = es.search(
        index=index_name,
        body={
            "query": script_query,
            "_source": {"excludes": ["embedding"]},
            "size": top_k,
        },
        source_excludes=["embedding"],
    )

    semantic_hits = semantic_results['hits']['hits']
    max_semantic_score = max([hit["_score"] for hit in semantic_hits], default=1.0)

    # Normalize semantic scores
    for hit in semantic_hits:
        hit["_normalized_score"] = hit["_score"] / max_semantic_score

    return semantic_hits

In [6]:
# Combine lexical and semantic search results using Reciprocal Rank Fusion (RRF)
def reciprocal_rank_fusion(lexical_hits, semantic_hits, k=60):
    '''
    k: The rank bias parameter (higher values reduce the impact of rank).
    '''
    rrf_scores = {}

    # Process lexical search results
    for rank, hit, in enumerate(lexical_hits, start=1):
        doc_id = hit["_id"]
        score = 1 / (k + rank) # Reciprocal Rank Fusion (RRF) score
        if doc_id in rrf_scores:
            rrf_scores[doc_id]["rrf_score"] += score
        else:
            rrf_scores[doc_id] = {
                "text_chunk": hit["_source"]["text_chunk"],
                "tags": hit["_source"]["tags"],
                "lexical_score": hit["_normalized_score"],
                "semantic_score": 0,
                "rrf_score": score,
            }

    # Process semantic search results
    for rank, hit in enumerate(semantic_hits, start=1):
        doc_id = hit["_id"]
        score = 1 / (k + rank) # RRF formula
        if doc_id in rrf_scores:
            rrf_scores[doc_id]["rrf_score"] += score
            rrf_scores[doc_id]["semantic_score"] = hit["_normalized_score"]
        else:
            rrf_scores[doc_id] = {
                "text_chunk": hit["_source"]["text_chunk"],
                "tags": hit["_source"]["tags"],
                "lexical_score": 0,
                "semantic_score": hit["_normalized_score"],
                "rrf_score": score,
            }

    # Sort by the RRF score in descending order
    sorted_results = sorted(
        rrf_scores.values(), key=lambda x: x["rrf_score"], reverse=True
    )

    return sorted_results

In [None]:
def remove_duplicates_and_rerank(lexical_hits, semantic_hits, rerank=False):
    combined_results = {}

    # Process lexical search results
    for hit in lexical_hits:
        doc_id = hit["_id"]
        combined_results[doc_id] ={
            "text_chunk": hit["_source"]["text_chunk"],
            "tags": hit["_source"]["tags"],
            "_normalized_score": hit["_normalized_score"], # store lexical normalized score
            "semantic_score": 0, # Default for semantic score
        }

    # Process semantic search results, checking for duplicates
    for hit in semantic_hits:
        doc_id = hit["_id"]
        if doc_id in combined_results:
            # If the document exists in both lexical and semantic results,
            # combine
            combined_results[doc_id]["semantic_score"] = hit["_normalized_score"]
            combined_results[doc_id]["_normalized_score"] += hit["_normalized_score"] # combine scores
        else:
            # If it's not a duplicate, add it with only semantic score
            combined_results[doc_id]  = {
                "text_chunk": hit["_source"]["text_chunk"],
                "tags": hit["_source"]["tags"],
                "_normalized_score": hit["_normalized_score"], # store semantic normalized score
                "semantic_score": hit["_normalized_score"],
            }
    
    # Convert the dictionary to a list of results
    result_list = list(combined_results.values())

    # Sort by combined _normalized_score if rerank is True
    if rerank:
        result_list = sorted(
            result_list, key=lambda x: x["_normalized_score"], reverse=True
        )

    return result_list

In [7]:
# Hybrid Search Function
def hybrid_search(query: str, lexical_top_k: int, semantic_top_k: int):
    # Get lexical and semantic search results
    lexical_hits = lexical_search(query, lexical_top_k)
    semantic_hits = semantic_search(query, semantic_top_k)
    # Combine using RRF
    combined_results = reciprocal_rank_fusion(lexical_hits, semantic_hits, k=60)
    return combined_results

In [8]:
# input_query = "What green objective does JPMorgan have?"
# input_query = "How much is allocated for JPMorgan's green objectives?"
# input_query = "What are the green objectives of JPMorgan?"
input_query = "What is Nubank's total tCO2 emissions?"

In [9]:
lexical_search(input_query, 3)

[{'_index': 'esg_reports_demo',
  '_id': 'EEllQpUBRcqPRASMy7HW',
  '_score': 5.89406,
  '_ignored': ['tags.keyword', 'text_chunk.keyword'],
  '_source': {'text_chunk': "29\nPeoplePlanet\nGovernance\nProsperity\nIntroduction\nAppendix2023 ESG Report\nClimate \nChange\nWe are committed to be carbon neutral forever \nand we have historically offset all of our \nemissions since our foundation. We measure \nour Greenhouse Gas (GHG) emissions based \non the Brazilian GHG Protocol Program. \nOur emissions inventory undergoes external \naudit and is published through the Registro \nPúblico de Emissões (Public Emissions Registry \nin Brazil), earning the gold seal since our \ncommitment and publication began in 2020. \nAfter measuring our emissions, we offset them \nthrough the purchase of carbon credits.\nOur Scope 1 and 2 emissions totaled 244.3 \ntCO2e (tons of carbon dioxide equivalent), \nreflecting a 52.9% increase compared to 2022. \nThey are primarily composed of the electricity \nconsu

In [10]:
semantic_search(input_query, 3)

  attn_output = torch.nn.functional.scaled_dot_product_attention(


[{'_index': 'esg_reports_demo',
  '_id': 'EUllQpUBRcqPRASMy7HW',
  '_score': 1.5033833,
  '_ignored': ['tags.keyword', 'text_chunk.keyword'],
  '_source': {'text_chunk': '30\nPeoplePlanet\nGovernance\nProsperity\nIntroduction\nAppendix2023 ESG Report\nT otal emissions  \n(tCO2e)\nIn this reporting cycle, we were \ninspired by the recommendations \nof the TCFD – Task Force on \nClimate-related Financial \nDisclosures to disclose information \nrelated to the management of \nclimate change and increase \ntransparency on how we are \nmonitoring our impact on this topic. \nGovernance\nWe have two forums in our \ngovernance, as described below, \ncomprised of our senior leadership \nto ensure that matters related to \nclimate risks and opportunities are \nreported and properly discussed \ninternally. The Board of Directors of \nNu Holdings serves as the ultimate \ngovernance instance, advised by \nthe Audit and Risk Committee, \nwhich assesses environmental risks \ncomprehensively, as well a

In [11]:
hybrid_search(input_query, 3, 3)

[{'text_chunk': "29\nPeoplePlanet\nGovernance\nProsperity\nIntroduction\nAppendix2023 ESG Report\nClimate \nChange\nWe are committed to be carbon neutral forever \nand we have historically offset all of our \nemissions since our foundation. We measure \nour Greenhouse Gas (GHG) emissions based \non the Brazilian GHG Protocol Program. \nOur emissions inventory undergoes external \naudit and is published through the Registro \nPúblico de Emissões (Public Emissions Registry \nin Brazil), earning the gold seal since our \ncommitment and publication began in 2020. \nAfter measuring our emissions, we offset them \nthrough the purchase of carbon credits.\nOur Scope 1 and 2 emissions totaled 244.3 \ntCO2e (tons of carbon dioxide equivalent), \nreflecting a 52.9% increase compared to 2022. \nThey are primarily composed of the electricity \nconsumption of our offices, air conditioning, \nand diesel fuel for the generators. The most \nsignificant increase is in scope 2, attributed to \nthe 'Nu Wa

In [13]:
retrieved_context = hybrid_search(input_query, 3, 3)[0]['text_chunk']

In [14]:
print(retrieved_context)

29
PeoplePlanet
Governance
Prosperity
Introduction
Appendix2023 ESG Report
Climate 
Change
We are committed to be carbon neutral forever 
and we have historically offset all of our 
emissions since our foundation. We measure 
our Greenhouse Gas (GHG) emissions based 
on the Brazilian GHG Protocol Program. 
Our emissions inventory undergoes external 
audit and is published through the Registro 
Público de Emissões (Public Emissions Registry 
in Brazil), earning the gold seal since our 
commitment and publication began in 2020. 
After measuring our emissions, we offset them 
through the purchase of carbon credits.
Our Scope 1 and 2 emissions totaled 244.3 
tCO2e (tons of carbon dioxide equivalent), 
reflecting a 52.9% increase compared to 2022. 
They are primarily composed of the electricity 
consumption of our offices, air conditioning, 
and diesel fuel for the generators. The most 
significant increase is in scope 2, attributed to 
the 'Nu Way of Working,' the hybrid work model 
adopte

In [15]:
from langchain_core.prompts import PromptTemplate

template = """
You are an AI assistant that helps users find information in ESG reports and answer questions about them.
If the answer is not in the context, say "I don't know".

Context: {context}
Question: {question}
Answer: 
"""

prompt = PromptTemplate(template=template, input_variables=["context", "question"])

In [16]:
from langchain_ollama.llms import OllamaLLM

llm = OllamaLLM(model='llama3.2')


In [17]:
from langchain_core.output_parsers import StrOutputParser

rag_chain = prompt | llm | StrOutputParser()

response = rag_chain.invoke({"context": retrieved_context, "question": input_query})
print(response)

Nubank's total tCO2e emissions for 2023 were 256.6tCO2e (244.3 Scope 1 and 2 emissions + 12,345.5 scope 3 emissions).
