In [1]:
import requests
from bs4 import BeautifulSoup
import fitz
from transformers import GPT2TokenizerFast
from langchain.text_splitter import RecursiveCharacterTextSplitter
import pprint

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the pretrained "gpt2" fast tokenizer once at module level; count_tokens() uses it to measure text length in tokens
tokenizer_gpt = GPT2TokenizerFast.from_pretrained("gpt2")

def count_tokens(text: str) -> int:
    """Return how many token ids the module-level GPT-2 tokenizer produces for `text`."""
    token_ids = tokenizer_gpt.encode(text)
    return len(token_ids)

# Step 1: Extract text from webpage
def read_webpage(url: str, class_type: str, class_name: str, timeout: float = 30.0) -> str:
    """Fetch a web page and return the text of one element as a single string.

    Args:
        url: Address of the page to download.
        class_type: Tag name of the element to extract (first argument of ``soup.find``).
        class_name: CSS class the element must carry.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The text of the first matching element, with text fragments stripped
        and joined by newlines, or "" when no element matches.

    Raises:
        Exception: If the response status code is not 200.
        requests.exceptions.RequestException: If the request itself fails
            (connection error, timeout, ...).
    """
    # Without an explicit timeout, requests waits indefinitely on an
    # unresponsive server, which would hang the notebook cell.
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch page: {url}")

    soup = BeautifulSoup(response.text, "html.parser")
    main_content = soup.find(class_type, class_=class_name)

    # No matching element: report "nothing extracted" rather than failing.
    if not main_content:
        return ""
    return main_content.get_text(separator="\n", strip=True)

def read_pdf(file_path: str) -> str:
    """Return the plain text of every page of a PDF, concatenated in page order.

    Any exception raised while opening or reading the file is reported on
    stdout and an empty string is returned instead.
    """
    try:
        with fitz.open(file_path) as pdf:
            # One text fragment per page, collected in page order
            page_texts = [
                pdf.load_page(page_index).get_text("text")
                for page_index in range(len(pdf))
            ]
    except Exception as e:
        print(f"❌ Error reading {file_path}: {e}")
        return ""

    return "".join(page_texts)

def _split_into_chunks(text: str, chunk_size: int = 256, chunk_overlap: int = 24) -> list:
    """Split `text` into overlapping chunks and return them as a list of strings.

    Chunk length and overlap are measured with count_tokens, i.e. in tokens
    rather than characters.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,  # Size of chunks (in tokens)
        chunk_overlap=chunk_overlap,  # Tokens overlap between chunks
        length_function=count_tokens,  # Function to count tokens in each chunk
    )
    return [doc.page_content for doc in text_splitter.create_documents([text])]


def process_webpage(url: str, page_name: str, class_type: str, class_name: str,
                    chunk_size: int = 256, chunk_overlap: int = 24):
    """Extract text from a webpage, split it into chunks and store them.

    Args:
        url: Page to download.
        page_name: Key under which the chunks are stored in ``processed_web_data``.
        class_type: Tag name of the element holding the content.
        class_name: CSS class of that element.
        chunk_size: Maximum chunk size in tokens.
        chunk_overlap: Number of tokens shared by consecutive chunks.

    Returns:
        None. On success the list of chunks is written to
        ``processed_web_data[page_name]``; when no text is extracted a
        message is printed and nothing is stored.
    """
    text = read_webpage(url, class_type, class_name)

    if not text:
        print(f"No content extracted from {url}")
        return

    chunks = _split_into_chunks(text, chunk_size, chunk_overlap)

    print(f"Processed {page_name}: {len(chunks)} chunks")

    # Store processed data in the module-level dictionary
    processed_web_data[page_name] = chunks


def process_pdf(file_path: str, pdf_name: str,
                chunk_size: int = 256, chunk_overlap: int = 24):
    """Extract text from a PDF, split it into chunks and store them.

    Args:
        file_path: Path of the PDF file to read.
        pdf_name: Key under which the chunks are stored in ``processed_pdf_data``.
        chunk_size: Maximum chunk size in tokens.
        chunk_overlap: Number of tokens shared by consecutive chunks.

    Returns:
        None. On success the list of chunks is written to
        ``processed_pdf_data[pdf_name]``; when no text is extracted a
        message is printed and nothing is stored.
    """
    text = read_pdf(file_path)

    if not text:
        print(f"No content extracted from {file_path}")
        return

    chunks = _split_into_chunks(text, chunk_size, chunk_overlap)

    print(f"Processed {pdf_name}: {len(chunks)} chunks")

    # Store processed data in the module-level dictionary
    processed_pdf_data[pdf_name] = chunks

# Module-level stores filled by process_webpage() and process_pdf(): each maps
# a page/PDF name to its list of text chunks. Re-running this cell resets both.
processed_web_data = {}
processed_pdf_data = {}

# Example usage
# process_pdf("docs/sample.pdf", "sample_pdf")

In [3]:
# Pages to scrape: URL -> [name to store the chunks under, tag of the content element, CSS class of that element]
url_dict = {
    "https://ipgp.github.io/scientific_python_cheat_sheet/?utm_content=buffer7d821&utm_medium=social&utm_source=twitter.com&utm_campaign=buffer#numpy-import-numpy-as-np": [
        "numpy_cheatsheet",
        "section",
        "main-content",
    ],
}

# Fetch, chunk and store every configured page
for url, page_spec in url_dict.items():
    page_name, class_type, class_name = page_spec
    process_webpage(url, page_name, class_type, class_name)

Token indices sequence length is longer than the specified maximum sequence length for this model (1818 > 1024). Running this sequence through the model will result in indexing errors


Processed numpy_cheatsheet: 29 chunks


In [4]:
# Dump every stored chunk for a visual check; this prints the whole dict, so the output grows with the number of chunks
print(processed_web_data)

{'numpy_cheatsheet': ["Scientific Python Cheatsheet\nScientific Python Cheatsheet\nPure Python\nTypes\nLists\nDictionaries\nSets\nStrings\nOperators\nControl Flow\nFunctions, Classes, Generators, Decorators\nIPython\nconsole\ndebugger\ncommand line\nNumPy\narray initialization\nindexing\narray properties and operations\nboolean arrays\nelementwise operations and math functions\ninner/ outer products\nlinear algebra/ matrix math\nreading/ writing files\ninterpolation, integration, optimization\nfft\nrounding\nrandom variables\nMatplotlib\nfigures and axes\nfigures and axes properties\nplotting routines\nScipy\ninterpolation\nlinear algebra\nintegration\nPandas\ndata structures\nDataFrame\nPure Python\nTypes\na =\n2\n# integer\nb =\n5.0\n# float\nc =\n8.3e5\n# exponential\nd =\n1.5\n+\n0.5j\n# complex\ne =\n4\n>\n5\n# boolean\nf =\n'word'\n# string\nLists\na = [\n'red'\n,\n'blue'\n,", "# string\nLists\na = [\n'red'\n,\n'blue'\n,\n'green'\n]\n# manually initialization\nb =\nlist\n(\nrange

In [None]:
import pandas as pd

def read_csv(file_path: str):
    """Load a Q&A CSV file into a DataFrame after checking its columns.

    The file must provide the ``QuestionTitle``, ``QuestionBody`` and
    ``AnswerBody`` columns; any other columns are returned untouched.

    Raises:
        ValueError: If at least one of the required columns is absent.
    """
    required_columns = {"QuestionTitle", "QuestionBody", "AnswerBody"}
    frame = pd.read_csv(file_path)

    # A non-empty difference means some required column is missing
    if required_columns.difference(frame.columns):
        raise ValueError(f"CSV file must contain columns: {required_columns}")

    return frame

def process_csv(file_path: str, collection_name: str = "demo_collection",
                chunk_size: int = 256, chunk_overlap: int = 24):
    """Chunk the Q&A pairs of a CSV file, embed the chunks and insert them.

    Each row is rendered as "Title / Question / Answer" text, split into
    token-sized chunks, embedded with ``embedding_fn`` and inserted through
    ``client``.

    NOTE(review): ``embedding_fn`` and ``client`` are not defined in this
    cell; they must already exist in the kernel when this function runs.
    NOTE(review): ids are assigned from 0 on every call, so a second call
    against the same collection reuses ids -- confirm how the store handles that.

    Args:
        file_path: CSV file with QuestionTitle, QuestionBody and AnswerBody columns.
        collection_name: Collection the records are inserted into.
        chunk_size: Maximum chunk size in tokens.
        chunk_overlap: Number of tokens shared by consecutive chunks.

    Returns:
        None. Progress and the insert result are printed.

    Raises:
        ValueError: Propagated from read_csv when a required column is missing.
    """
    df = read_csv(file_path)

    # Initialize text splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,  # Size of chunks (in tokens)
        chunk_overlap=chunk_overlap,  # Overlap between chunks
        length_function=count_tokens,
    )

    # pandas reads empty cells as NaN, which an f-string would render as the
    # literal text "nan"; use an empty string for missing fields instead.
    qa_fields = df[["QuestionTitle", "QuestionBody", "AnswerBody"]].fillna("")

    all_chunks = []
    for title, question, answer in qa_fields.itertuples(index=False, name=None):
        # Combine Question and Answer
        combined_text = f"Title: {title}\n\nQuestion: {question}\n\nAnswer: {answer}"

        # Split the text into chunks
        documents = text_splitter.create_documents([combined_text])
        all_chunks.extend(document.page_content for document in documents)

    print(f"Processed {len(df)} Q&A pairs into {len(all_chunks)} chunks")

    # Nothing to embed (e.g. a CSV with a header only): indexing data[0]
    # below would fail, so stop here.
    if not all_chunks:
        print(f"No chunks produced from {file_path}; nothing inserted")
        return

    # Embed and insert into vector store
    vectors = embedding_fn.encode_documents(all_chunks)

    data = [
        {"id": i, "vector": vector, "text": chunk}
        for i, (vector, chunk) in enumerate(zip(vectors, all_chunks))
    ]

    print("Data has", len(data), "entities, each with fields:", data[0].keys())
    print("Vector dim:", len(data[0]["vector"]))

    res = client.insert(collection_name=collection_name, data=data)
    print(res)

# Example usage: run the full CSV pipeline (chunk, embed, insert) on the file below;
# the path is relative to the current working directory
csv_file_path = "./docs/sklearn_stackoverflow.csv"
process_csv(csv_file_path)


Processed 1494 Q&A pairs into 11917 chunks
