> Israel Llorens <br/>
> Lead Data Science

In [1]:
import nest_asyncio
nest_asyncio.apply()

import os

os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES" # if you are running in macos Apple chip
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import numpy as np
import openai
import init

from Framework.spark import spark
import pyspark.sql.functions as F

# Suppress the specific FutureWarning
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

from dotenv import load_dotenv

load_dotenv()

from pinecone import Pinecone, ServerlessSpec
import hashlib

from datetime import datetime

25/06/02 15:13:09 WARN Utils: Your hostname, Airon.local resolves to a loopback address: 127.0.0.1; using 192.168.100.57 instead (on interface en0)
25/06/02 15:13:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/02 15:13:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## RAG – Retrieval part

### Load the text and prepare the embeddings

In [2]:
# Read every .txt file under data/ (one row per line) and keep only the file's
# base name. input_file_name() returns a URI (e.g. file:///.../data/163.txt),
# whose separator is always "/", so splitting on os.sep breaks on Windows.
text_df = (
    spark.read.text('data/*.txt')
    .withColumn("filename", F.element_at(F.split(F.input_file_name(), "/"), -1))
    # Drop empty and whitespace-only lines; they would become junk "sentences".
    .filter(F.trim(F.col('value')) != '')
)

# Split each line into sentences at the whitespace that follows '.', '!' or '?'.
sentence_df = text_df.withColumn(
    "sentence", F.explode(F.split(F.col("value"), r'(?<=[\.\!\?])\s+'))
).drop('value')

In [3]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType
from transformers import AutoTokenizer, AutoModel
import torch

# Output schema: a struct with a single "embedding" field (array<float>).
# Later cells read vectors back as `row.embedding`, so the struct shape is kept.
schema = StructType([StructField("embedding", ArrayType(FloatType()))])

# Embedding checkpoint, and the number of sentences per forward pass inside one
# Arrow batch (bounds peak memory independently of Arrow's batch size).
EMBED_MODEL_NAME = "BAAI/bge-large-en-v1.5"
EMBED_BATCH_SIZE = 64

# Worker-side cache for the tokenizer/model. The dict is shipped empty with the
# UDF and filled on first use, so the model is loaded once per worker-side copy
# of the UDF instead of once per Arrow batch.
_embed_model_cache = {}


@pandas_udf(schema)
def embed_text(text_series: pd.Series) -> pd.DataFrame:
    """Embed a batch of sentences with BGE, using the [CLS] token vector.

    Args:
        text_series: sentences of one Arrow batch; nulls are embedded as "".

    Returns:
        A DataFrame with one "embedding" column holding one float32 vector
        (as a list) per input row, in input order. If inference fails, the
        error is printed and every row gets a zero vector of the model's
        hidden size so the Spark job keeps running.
    """
    import warnings
    warnings.filterwarnings("ignore", category=FutureWarning)

    device = torch.device("cpu")
    if "model" not in _embed_model_cache:
        _embed_model_cache["tokenizer"] = AutoTokenizer.from_pretrained(EMBED_MODEL_NAME)
        loaded_model = AutoModel.from_pretrained(EMBED_MODEL_NAME)
        loaded_model.eval()
        loaded_model.to(device)
        _embed_model_cache["model"] = loaded_model

    local_tokenizer = _embed_model_cache["tokenizer"]
    local_model = _embed_model_cache["model"]

    try:
        texts = text_series.fillna("").astype(str).tolist()
        vectors = []
        with torch.no_grad():
            # Encode in small chunks: padding a whole Arrow batch (by default
            # up to 10,000 rows in Spark) in one pass can exhaust memory.
            for start in range(0, len(texts), EMBED_BATCH_SIZE):
                inputs = local_tokenizer(
                    texts[start:start + EMBED_BATCH_SIZE],
                    padding=True,
                    truncation=True,
                    return_tensors="pt",
                    max_length=512,
                )
                inputs = {k: v.to(device) for k, v in inputs.items()}
                outputs = local_model(**inputs)
                cls_embeddings = outputs.last_hidden_state[:, 0, :]  # [CLS] token
                vectors.extend(cls_embeddings.cpu().numpy().astype('float32').tolist())

        return pd.DataFrame({"embedding": vectors})

    except Exception as e:
        print(f"Error in embed_text: {e}")
        dim = local_model.config.hidden_size
        return pd.DataFrame({
            "embedding": [[0.0] * dim for _ in range(len(text_series))]
        })

In [None]:
# Apply the embedding UDF to every sentence and cache the result. The UDF runs
# BGE-large on CPU; without caching, every later action on sentence_df
# (show, collect, count, write) would recompute all embeddings.
sentence_df = sentence_df.withColumn('embedding', embed_text("sentence")).cache()

In [4]:
# Preview a few rows; "embedding" prints as a struct because the UDF returns a StructType.
sentence_df.show(n=4)

+--------+--------------------+--------------------+
|filename|            sentence|           embedding|
+--------+--------------------+--------------------+
| 163.txt|Further growth in...|{[0.026753383, 0....|
| 163.txt|Germany: strong c...|{[-0.8266075, 0.4...|
| 163.txt|T-Mobile US: syne...|{[0.23296036, 0.4...|
| 163.txt|Europe: growth de...|{[-0.20402703, -0...|
+--------+--------------------+--------------------+
only showing top 4 rows



### Store the embeddings in a local FAISS vector index

In [None]:
# Collect the embedding vectors to the driver. The UDF output column is a
# struct<embedding: array<float>>, so flatMap yields one Row(embedding=[...])
# per sentence; unwrap the inner list so the matrix is (n_sentences, dim).
embeddings_list = sentence_df.select("embedding").rdd.flatMap(lambda x: x).collect()
embeddings_np = np.array([row.embedding for row in embeddings_list], dtype='float32')

In [10]:
# Stack the per-sentence vectors (unwrapped from their struct Rows) into a float32 matrix for FAISS.
embeddings = np.asarray([record.embedding for record in embeddings_list], dtype=np.float32)

In [11]:
import faiss

# HNSW hyper-parameters.
M = 32                 # graph degree: higher = more accurate but more memory
EF_CONSTRUCTION = 40   # build-time beam width (typical: 40-100)
EF_SEARCH = 64         # query-time beam width: higher = more accurate but slower

dimension = embeddings.shape[1]  # Should be 1024 for BGE-large
print(f"Embedding dimension: {dimension}, Total vectors: {embeddings.shape[0]}")

# On unit-length vectors, inner product equals cosine similarity: normalise the
# matrix in place, then build an inner-product HNSW index over it.
faiss.normalize_L2(embeddings)
index = faiss.IndexHNSWFlat(dimension, M, faiss.METRIC_INNER_PRODUCT)
index.hnsw.efConstruction = EF_CONSTRUCTION

print("Adding vectors to index...")
index.add(embeddings)

index.hnsw.efSearch = EF_SEARCH
print(f"Index total vectors: {index.ntotal}")

# Persist the index so it can be queried later without rebuilding it.
faiss.write_index(index, "embedding_vector_database.faiss")

Embedding dimension: 1024, Total vectors: 12355
Adding vectors to index...
Index total vectors: 12355


In [12]:
# Row count; should equal the number of vectors added to the FAISS index above.
sentence_df.count()

12355

In [17]:
# Collect only the metadata columns. Leaving out "embedding" avoids shipping
# every vector to the driver a second time and, when sentence_df is not cached,
# lets Spark prune the embedding UDF from the plan instead of recomputing it.
collected_data = sentence_df.select("filename", "sentence").collect()

# Extract metadata; position i describes FAISS vector id i.
filenames = [row.filename for row in collected_data]
sentences = [row.sentence for row in collected_data]

assert len(sentences) == index.ntotal, "metadata is not aligned with the FAISS index"

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)


In [19]:
import pickle

# Persist the id -> (filename, sentence) lookup next to the FAISS index, so the
# index can be queried later without Spark.
metadata = {"filenames": filenames, "sentences": sentences}
with open("embedding_metadata.pkl", mode="wb") as metadata_file:
    pickle.dump(metadata, metadata_file)

### Query the embeddings from the local index files

In [14]:
# Load the BGE tokenizer/model once on the driver for query-time embeddings.
# This is the same checkpoint the embed_text UDF uses, so queries and sentences
# share one vector space.
from transformers import AutoTokenizer, AutoModel
import torch

model_name = "BAAI/bge-large-en-v1.5"
device = torch.device("cpu")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
model.eval()
model.to(device)


@torch.no_grad()
def get_embedding(query):
    """Embed a query with the pre-loaded BGE model.

    Returns the [CLS] vector of the first input as a float32 array of shape
    (1, hidden_size), ready for faiss.normalize_L2 and index.search.
    """
    encoded = tokenizer(query, padding=True, truncation=True, return_tensors="pt", max_length=512)
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
    cls_vector = model(**encoded).last_hidden_state[0][0]
    return cls_vector.cpu().numpy().astype('float32')[None, :]

In [18]:
import pickle

import faiss

# Load index and metadata from disk so this section does not depend on
# in-memory variables left over from the Spark cells above.
index = faiss.read_index("embedding_vector_database.faiss")
# NOTE: pickle.load can execute arbitrary code; only load files this notebook wrote.
with open("embedding_metadata.pkl", "rb") as metadata_file:
    metadata = pickle.load(metadata_file)
filenames = metadata["filenames"]
sentences = metadata["sentences"]

TOP_K = 5

# Search example
query = "What does Claudia Nemat says"
query_embedding = get_embedding(query)  # Use same model as the indexed sentences
faiss.normalize_L2(query_embedding)     # unit length, so inner product = cosine similarity
D, I = index.search(query_embedding, k=TOP_K)

for rank, (score, idx) in enumerate(zip(D[0], I[0]), start=1):
    # FAISS reports missing neighbours with id -1, which Python would silently
    # turn into "the last sentence"; skip them instead.
    if idx < 0:
        continue
    print(f"{rank}. Score: {score:.4f}")
    print(f"   File: {filenames[idx]}")
    print(f"   Text: {sentences[idx]}\n")

1. Score: 0.6574
   File: 165.txt
   Text: Claudia Nemat, Member of the Board of Management for Technology, and Innovation will be reporting live from Barcelona on February 27 at 2:00 p.m.

2. Score: 0.6352
   File: 165.txt
   Text: Schneier will discuss this in a talk with Telekom board member Claudia Nemat.

3. Score: 0.6257
   File: 74.txt
   Text: Statements

4. Score: 0.6233
   File: 203.txt
   Text: in their daily lives," explains Claudia Nemat, Board Member for Technology and Innovation at Telekom.

5. Score: 0.6168
   File: 158.txt
   Text: Claudia Nemat will be reporting live from Barcelona with the "Magenta Keynote" on February 27, at 2:00 p.m.



## Another technique: use the Pinecone online vector database instead of FAISS

There are many ways to store and search embeddings: we can use the FAISS library, as we did above, or the free tier of an online vector database named Pinecone.

In this scenario, I use the OpenAI API (with an OpenAI API key) to generate the embeddings and upload them to the vector database. All the required code lives in the helper framework, in the `Framework.utils` namespace.

### Pinecone access tests

In [7]:
# Import the Pinecone helpers used in this section explicitly instead of `import *`,
# so it is clear where each name comes from and nothing else leaks into the namespace.
from Framework.utils.pinecone_helper import (
    create_index,
    delete_all,
    prepare_for_pinecone,
    query_from_pinecone,
    upload_texts_to_pinecone,
)

In [8]:
# NOTE: this rebinds `index` from the FAISS index to the Pinecone index returned
# by create_index(); the cells below operate on Pinecone.
index = create_index()

In [9]:
# Start the tests from an empty Pinecone index. This is best-effort: delete_all
# may raise (presumably e.g. when there is nothing to delete — confirm), which
# should not stop the notebook, but the reason is reported instead of being
# swallowed, and KeyboardInterrupt/SystemExit still propagate.
try:
    delete_all(index)
except Exception as exc:
    print(f"delete_all skipped: {exc!r}")

In [10]:
# Inspect the Pinecone index (dimension, metric, namespaces, vector count).
index.describe_index_stats()

{'dimension': 1536,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {},
 'total_vector_count': 0,
 'vector_type': 'dense'}

In [None]:
# Prepare a single test text and unpack the (id, embedding, metadata) record the
# helper returns for it.
# NOTE: this rebinds `metadata`, which earlier held the FAISS filename/sentence lookup.
_id, embedding, metadata = prepare_for_pinecone(['This is the first text uploaded to pinecone'])[0]

In [5]:
# Show the prepared record: its id, the embedding length (should match the index
# dimension reported by describe_index_stats) and the attached metadata.
print('ID:  ',_id, '\nLEN: ', len(embedding), '\nMETA:', metadata)

ID:   c07cc86f9c050186c54b14b81cdc6988 
LEN:  1536 
META: {'text': 'This is the first text uploaded to pinecone', 'date_uploaded': '2025-06-02 11:05:54.827826+00:00'}


In [None]:
# Upload the same test text; the helper returns the number of upserted vectors.
upload_texts_to_pinecone(['This is the first text uploaded to pinecone'], index)

1

In [None]:
# Query Pinecone with a related sentence; returns the matches with ids, scores and metadata.
query_from_pinecone("This is a test", index)

[{'id': 'c07cc86f9c050186c54b14b81cdc6988',
  'metadata': {'date_uploaded': '2025-06-02 11:08:25.570838+00:00',
               'text': 'This is the first text uploaded to pinecone'},
  'score': 0.419925719,
  'values': []}]

### Upload every sentence to Pinecone from Spark

In [5]:
import pyspark.sql.types as T

# Worker-side cache of Pinecone index handles, keyed by API key. The dict is
# shipped (empty) with the UDF, so create_index runs once per worker-side copy
# of the UDF instead of once for every sentence.
_pinecone_index_cache = {}


@F.udf(T.IntegerType())
def text_to_pinecone(text, filename, api_key):
    """Upload one sentence to Pinecone, tagged with its source document.

    Args:
        text: the sentence to embed and upload.
        filename: source document name, forwarded to the helper as ``document``.
        api_key: Pinecone API key forwarded to ``create_index``.

    Returns:
        The count returned by ``upload_texts_to_pinecone``, or 0 when ``text``
        is null/empty and nothing is uploaded.

    NOTE(review): this UDF has a side effect, while Spark treats UDFs as
    deterministic by default, so a row may be uploaded more than once across
    actions (e.g. a ``show`` followed by a ``write``).
    """
    if not text:
        return 0

    from Framework.utils.pinecone_helper import create_index, upload_texts_to_pinecone

    index = _pinecone_index_cache.get(api_key)
    if index is None:
        index = create_index(api_key)
        _pinecone_index_cache[api_key] = index
    upsert_count = upload_texts_to_pinecone([text], index, document=filename)
    return upsert_count


# NOTE(review): F.lit embeds the API key as a literal in the query plan, so it is
# visible in explain() output and the Spark UI.
sentence_df = sentence_df.withColumn(
    'n_uploaded',
    text_to_pinecone('sentence', 'filename', F.lit(os.environ.get('PINECONE_API_KEY'))),
)

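Because Spark evaluates lazily, nothing is uploaded until an action runs on `sentence_df`. Once one does (e.g. the `show` below), the Pinecone console at pinecone.io shows each processed sentence being uploaded, with the document it belongs to stored as metadata.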

In [6]:
# Preview a few rows; this action also runs the upload UDF for the rows it evaluates.
sentence_df.show(n=3)

+--------+--------------------+--------------------+----------+
|filename|            sentence|           embedding|n_uploaded|
+--------+--------------------+--------------------+----------+
| 163.txt|Further growth in...|{[0.026753383, 0....|         1|
| 163.txt|Germany: strong c...|{[-0.8266075, 0.4...|         1|
| 163.txt|T-Mobile US: syne...|{[0.23296036, 0.4...|         1|
+--------+--------------------+--------------------+----------+
only showing top 3 rows



In [7]:
from Framework.utils.pinecone_helper import create_index, delete_all

# Clear the Pinecone index again: the preview above already uploaded some rows.
# Best-effort, but failures are reported rather than silently swallowed, and
# KeyboardInterrupt/SystemExit still propagate.
try:
    index = create_index()
    delete_all(index)
except Exception as exc:
    print(f"Could not reset the Pinecone index: {exc!r}")

In [8]:
# Writing evaluates every column, including n_uploaded, so this action is
# presumably what uploads the written sentences to Pinecone. Fragments of
# 5 characters or fewer are dropped.
long_sentences_df = sentence_df.filter(F.length('sentence') > 5)
long_sentences_df.write.mode('overwrite').parquet('db_output')