In [None]:
# Data scraping relevant information from XML files

import xml.etree.ElementTree as ET
import re
from nltk.tokenize import sent_tokenize
import nltk

# Download the required NLTK data
# NOTE(review): `sent_tokenize` (used in create_chunks) depends on this
# tokenizer data; newer NLTK releases reportedly look up 'punkt_tab' instead
# of 'punkt' — confirm against the installed NLTK version.
nltk.download('punkt')

# Namespace map for ElementTree lookups: the "mw" prefix is bound to the
# MediaWiki XML export schema (version 0.11), so queries can be written as
# ".//mw:text".
ns = {'mw': 'http://www.mediawiki.org/xml/export-0.11/'}

# Parse the XML file
tree = ET.parse("valorant_pages.xml")  # Replace with your file name
root = tree.getroot()


def clean_text(text):
    """Strip wiki markup from *text* and collapse its whitespace.

    Removes ``[[...]]`` links, ``{{...}}`` templates and ``<...>`` tags
    (the enclosed text is dropped as well), then squeezes every run of
    whitespace into a single space and trims both ends.

    :param text: Raw wikitext; ``None`` or an empty string is accepted.
    :return: The cleaned, single-line string (``""`` for empty input).
    """
    if not text:
        return ""
    text = re.sub(r'\[\[.*?\]\]', '', text)
    # Templates (infoboxes, quotes, ...) routinely span several lines, so "."
    # must be allowed to cross newlines or they survive the clean-up.
    text = re.sub(r'\{\{.*?\}\}', '', text, flags=re.DOTALL)
    text = re.sub(r'<.*?>', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


def create_chunks(text, max_chunk_size=300):
    """Group the sentences of *text* into chunks for embedding.

    Sentences are never split: they are appended to the current chunk until
    adding the next one would push the chunk past ``max_chunk_size`` words,
    at which point the chunk is closed and a new one is started.  A single
    sentence longer than ``max_chunk_size`` becomes a chunk of its own.

    :param text: Cleaned text to split.
    :param max_chunk_size: Maximum number of whitespace-separated words per chunk.
    :return: List of chunk strings (sentences joined by a single space);
        never contains empty strings.
    """
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0
    for sentence in sentences:
        sentence_length = len(sentence.split())
        # Only flush a chunk that actually holds sentences; otherwise an
        # oversized first sentence would emit an empty "" chunk.
        if current_chunk and current_length + sentence_length > max_chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_length = 0
        current_chunk.append(sentence)
        current_length += sentence_length
    if current_chunk:  # Add the last chunk
        chunks.append(" ".join(current_chunk))
    return chunks


def extract_section(text, start, end=None):
    """Return the text that follows the ``start`` marker line.

    The section begins right after ``start`` plus a newline.  It stops just
    before ``end`` when one is given, otherwise just before the next ``==``
    heading marker; if neither is found it runs to the end of ``text``.

    Both markers are matched literally, so headings containing regex
    metacharacters such as ``(``, ``+`` or ``|`` are handled correctly.

    :param text: Full wikitext of a page.
    :param start: Literal marker that opens the section.
    :param end: Optional literal marker that closes the section.
    :return: The section body, or ``""`` when ``start`` is not present.
    """
    if not text or start not in text:
        return ""
    if end:
        stop = '(?=' + re.escape(end) + '|$)'
    else:
        stop = '(?===|$)'
    # DOTALL lets the lazy body span multiple lines.
    match = re.search(re.escape(start) + r'\n(.*?)' + stop, text, re.DOTALL)
    return match.group(1) if match else ""


# List to hold all chunks from all sections
all_chunks = []

# (start marker, optional end marker) of every section to index, listed in
# the order in which their chunks are appended for each page.
SECTION_MARKERS = [
    ("==Summary==", None),
    ("==Description==", None),
    ("Official description}}", "==Agent Cosmetics=="),
    ("==List of Weapons==", "== Primary =="),
    ("===Pro Play===", None),
    ("== Official Terminology ==", "==Navigation=="),
]

# Iterate through <text> tags within the namespace
for text_element in root.findall(".//mw:text", ns):
    text_content = text_element.text
    if not text_content:
        continue  # <text> element without any content
    for start_marker, end_marker in SECTION_MARKERS:
        section_text = extract_section(text_content, start_marker, end_marker)
        if section_text:
            # Clean the extracted wikitext, then split it into chunks
            all_chunks.extend(create_chunks(clean_text(section_text)))

# all_chunks contains all the cleaned and chunked text data from the XML file
print(all_chunks)


In [None]:
# Chunking and embedding the text data into vectors for our Knowledge Base

from sagemaker.huggingface import HuggingFaceModel
import boto3
import json


def chunk_data(parsed_data, chunk_size=200):
    """
    Re-pack parsed text into segments of a fixed number of words.
    Words from consecutive sections are pooled, so a segment may span the
    boundary between two sections; only the final segment can be shorter.
    :param parsed_data: Iterable of strings parsed from the XML.
    :param chunk_size: Number of whitespace-separated words per segment.
    :return: List of segments, each a string of words joined by single spaces.
    """
    segments = []
    buffer = []
    word_stream = (token for section in parsed_data for token in section.split())
    for token in word_stream:
        buffer.append(token)
        if len(buffer) < chunk_size:
            continue
        # Buffer is full: emit it and start over
        segments.append(' '.join(buffer))
        buffer = []
    # Whatever is left over forms the last, possibly shorter, segment
    if buffer:
        segments.append(' '.join(buffer))
    return segments

# Re-pack the chunks produced by the previous cell into word-count segments
# (chunk_data's default of 200 words per segment is used).
final_chunks = chunk_data(all_chunks)

### Step 2 ###
# Embed Chunks with Sagemaker

# Hugging Face model details
# These entries are passed to the model as environment variables (see env=hub below).
hub = {
    'HF_MODEL_ID':'sentence-transformers/all-MiniLM-L12-v2',  # You can replace with other text embedding models
    'HF_TASK':'feature-extraction'
}

# Create Hugging Face Model
# NOTE(review): the IAM role ARN (including the account id) is hard-coded;
# consider loading it from configuration instead of committing it.
huggingface_model = HuggingFaceModel(
    transformers_version='4.26.0',
    pytorch_version='1.13.1',
    py_version='py39',
    env=hub,
    role='arn:aws:iam::345594564754:role/HuggingFaceRole',  # Replace with your IAM role
)

# Deploy the model to a SageMaker endpoint
# NOTE(review): presumably this provisions a billable endpoint and fails if an
# endpoint with the same name already exists — confirm before re-running the cell.
predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",  # Choose a different instance type
    endpoint_name = "huggingface-embed-model-endpoint-eight"
)

# Runtime client that embed_chunk_hf uses to invoke the deployed endpoint
sagemaker_runtime = boto3.client('runtime.sagemaker')


def embed_chunk_hf(chunk, endpoint_name):
    """
    Request an embedding for a piece of text from a SageMaker endpoint.
    The text is wrapped as {"inputs": chunk}, posted as JSON through the
    module-level ``sagemaker_runtime`` client, and the JSON reply is decoded.
    :param chunk: The text to embed.
    :param endpoint_name: Name of the SageMaker endpoint serving the Hugging Face model.
    :return: The decoded JSON response returned by the endpoint.
    """
    request_body = json.dumps({"inputs": chunk})
    reply = sagemaker_runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=request_body,
    )
    # The response body is a stream of UTF-8 encoded JSON
    raw_bytes = reply['Body'].read()
    return json.loads(raw_bytes.decode('utf-8'))


# Embed every chunk on its own: pass the loop variable `chunk`, not the whole
# `final_chunks` list, so each request carries exactly one text.
embeddings = [embed_chunk_hf(chunk, "huggingface-embed-model-endpoint-eight") for chunk in final_chunks]