# BPE

In [1]:
!pip install wikipedia
!pip install pyspark
!pip install tqdm



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode, size, split, sum as _sum
from pyspark.sql.types import StringType, ArrayType, MapType, IntegerType
from collections import defaultdict, Counter
import wikipedia
import re
from tqdm import tqdm
import time
import random
import json
from typing import Dict, List, Tuple

In [3]:
class SimpleBPETokenizer:
    def __init__(self, vocab_size: int = 1000):
        self.vocab_size = vocab_size
        self.merges: Dict[Tuple[str, str], str] = {}
        self.vocab: Dict[str, int] = {}
        self.special_tokens = {'<pad>': 0, '<unk>': 1, '<bos>': 2, '<eos>': 3}
        self.vocab.update(self.special_tokens)

    def get_stats(self, words: List[List[str]]) -> Counter:
        pairs = Counter()
        for word in words:
            for i in range(len(word) - 1):
                pairs[tuple(word[i:i+2])] += 1
        return pairs

    def merge_vocab(self, words: List[List[str]], pair: Tuple[str, str]) -> List[List[str]]:
        first, second = pair
        new_words = []
        for word in words:
            i = 0
            new_word = []
            while i < len(word):
                if i < len(word) - 1 and word[i] == first and word[i + 1] == second:
                    new_word.append(first + second)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_words.append(new_word)
        return new_words

    def encode(self, text: str) -> List[int]:
        words = text.lower().split()
        result = []
        for word in words:
            word_tokens = [c for c in word]
            while len(word_tokens) > 1:
                pairs = [(word_tokens[i], word_tokens[i + 1])
                        for i in range(len(word_tokens) - 1)]
                mergeable_pairs = [p for p in pairs if p in self.merges]
                if not mergeable_pairs:
                    break
                for pair in pairs:
                    if pair in self.merges:
                        idx = word_tokens.index(pair[0])
                        word_tokens[idx:idx+2] = [self.merges[pair]]
                        break
            result.extend(word_tokens)
        return [self.vocab.get(token, self.vocab['<unk>']) for token in result]

    def decode(self, token_ids: List[int]) -> str:
        inv_vocab = {v: k for k, v in self.vocab.items()}
        return ' '.join(inv_vocab.get(id, '<unk>') for id in token_ids)

In [4]:
def get_large_wikipedia_dataset(initial_topics, target_articles=10000):
    texts = []
    titles_processed = set()

    def get_related_articles(title):
        try:
            page = wikipedia.page(title, auto_suggest=False)
            links = page.links
            return [link for link in links if link not in titles_processed]
        except:
            return []

    def fetch_article(title):
        try:
            page = wikipedia.page(title, auto_suggest=False)
            return page.content
        except:
            return None

    queue = list(initial_topics)

    with tqdm(total=target_articles) as pbar:
        while len(texts) < target_articles and queue:
            current_title = queue.pop(0)

            if current_title in titles_processed:
                continue

            titles_processed.add(current_title)
            content = fetch_article(current_title)

            if content:
                texts.append(content)
                pbar.update(1)
                print(f"\nFetched article: {current_title} ({len(texts)}/{target_articles})")

                related = get_related_articles(current_title)
                queue.extend(random.sample(related, min(len(related), 10)))

            time.sleep(0.5)

    return texts

In [5]:
def process_in_batches(df, batch_size=100):
    total_rows = df.count()
    num_batches = (total_rows + batch_size - 1) // batch_size
    processed_dfs = []

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = start_idx + batch_size

        batch_df = df.withColumn("row_id", monotonically_increasing_id()) \
                     .filter((col("row_id") >= start_idx) & (col("row_id") < end_idx)) \
                     .drop("row_id")

        def preprocess_text(text):
            text = text.lower()
            text = re.sub(r'\[.*?\]|\(.*?\)', '', text)
            text = re.sub(r'[^a-z0-9\s\.\,\!\?]', '', text)
            text = re.sub(r'\s+', ' ', text).strip()
            return text

        preprocess_text_udf = udf(preprocess_text, StringType())
        processed_batch = batch_df.withColumn('cleaned_text', preprocess_text_udf(col('value')))
        processed_dfs.append(processed_batch)

        print(f"Processed batch {i+1}/{num_batches}")

    return spark.createDataFrame(sc.union([df.rdd for df in processed_dfs]))


In [6]:
def train_bpe_tokenizer(spark: SparkSession, data_df, vocab_size: int = 1000) -> SimpleBPETokenizer:
    """Train BPE tokenizer on Spark DataFrame"""

    tokenizer = SimpleBPETokenizer(vocab_size)

    def preprocess_text(text: str) -> List[str]:
        text = text.lower()
        text = re.sub(r'[^a-z0-9\s]', '', text)
        words = text.split()
        return [[c for c in word] for word in words]

    preprocess_udf = udf(preprocess_text, ArrayType(ArrayType(StringType())))

    processed_df = data_df.withColumn("words", preprocess_udf(col("cleaned_text")))
    all_words = processed_df.select(explode("words").alias("word")).collect()
    words = [row.word for row in all_words]

    char_vocab = set()
    for word in words:
        char_vocab.update(word)

    for char in char_vocab:
        tokenizer.vocab[char] = len(tokenizer.vocab)

    num_merges = vocab_size - len(tokenizer.vocab)


    for i in range(num_merges):
        pairs = tokenizer.get_stats(words)
        if not pairs:
            break

        best_pair = max(pairs.items(), key=lambda x: x[1])[0]
        words = tokenizer.merge_vocab(words, best_pair)
        new_token = ''.join(best_pair)
        tokenizer.merges[best_pair] = new_token
        tokenizer.vocab[new_token] = len(tokenizer.vocab)

        if (i + 1) % 100 == 0:
            print(f"Completed {i + 1}/{num_merges} merges")

    return tokenizer

In [7]:
from pyspark.sql.functions import monotonically_increasing_id, col, udf
from pyspark.sql.types import ArrayType, IntegerType

def tokenize_in_batches(df, tokenizer, batch_size=100):
    # Broadcast the tokenizer state
    tokenizer_state = {
        'vocab': tokenizer.vocab,
        'merges': tokenizer.merges,
        'special_tokens': tokenizer.special_tokens
    }
    tokenizer_broadcast = sc.broadcast(tokenizer_state)

    def tokenize_text(text):
        # Reconstruct tokenizer state in worker
        state = tokenizer_broadcast.value
        vocab = state['vocab']
        merges = state['merges']

        # Tokenization logic
        words = text.lower().split()
        result = []
        for word in words:
            word_tokens = [c for c in word]
            while len(word_tokens) > 1:
                pairs = [(word_tokens[i], word_tokens[i + 1])
                        for i in range(len(word_tokens) - 1)]
                mergeable_pairs = [p for p in pairs if p in merges]
                if not mergeable_pairs:
                    break
                for pair in pairs:
                    if pair in merges:
                        idx = word_tokens.index(pair[0])
                        word_tokens[idx:idx+2] = [merges[pair]]
                        break
            result.extend(word_tokens)
        return [vocab.get(token, vocab['<unk>']) for token in result]

    tokenize_text_udf = udf(tokenize_text, ArrayType(IntegerType()))

    total_rows = df.count()
    num_batches = (total_rows + batch_size - 1) // batch_size
    processed_dfs = []

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = start_idx + batch_size

        # Add row_id and filter for current batch
        batch_df = df.withColumn("row_id", monotonically_increasing_id()) \
                     .filter((col("row_id") >= start_idx) & (col("row_id") < end_idx)) \
                     .drop("row_id")

        # Apply tokenization
        processed_batch = batch_df.withColumn('token_ids', tokenize_text_udf(col('cleaned_text')))
        processed_dfs.append(processed_batch)
        print(f"Tokenized batch {i+1}/{num_batches}")

    # Combine all processed batches
    return spark.createDataFrame(sc.union([df.rdd for df in processed_dfs]))


In [8]:
from pyspark.sql.functions import monotonically_increasing_id, col

def save_results_in_batches(df, batch_size=100):
    total_rows = df.count()
    num_batches = (total_rows + batch_size - 1) // batch_size
    all_data = []

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = start_idx + batch_size
        batch_df = df.withColumn("row_id", monotonically_increasing_id()) \
                     .filter((col("row_id") >= start_idx) & (col("row_id") < end_idx)) \
                     .drop("row_id")

        batch_data = batch_df.select('cleaned_text', 'token_ids').collect()
        all_data.extend([{
            "cleaned_text": row.cleaned_text,
            "token_ids": row.token_ids,
        } for row in batch_data])

        print(f"Saved batch {i+1}/{num_batches}")

        if (i + 1) % 10 == 0:
            with open(f'tokenized_data_batch_{i // 10}.json', 'w') as f:
                json.dump(all_data, f)
            all_data = []

    if all_data:
        with open(f'tokenized_data_batch_final.json', 'w') as f:
            json.dump(all_data, f)


In [9]:
# Initialize Spark
spark = SparkSession.builder.appName("WikipediaBPE").getOrCreate()
sc = spark.sparkContext

In [10]:
# Initial topics
initial_topics = [
    "Artificial Intelligence",
    "History of Science",
    "World War II",
    "Climate Change",
    "Philosophy",
    "Biology",
    "Literature",
    "Technology",
    "Art History",
    "Mathematics"
]

# Fetch Wikipedia articles
print("Fetching Wikipedia articles...")
wiki_texts = get_large_wikipedia_dataset(initial_topics, target_articles=5)

Fetching Wikipedia articles...


 20%|██        | 1/5 [00:01<00:05,  1.26s/it]


Fetched article: Artificial Intelligence (1/5)


 40%|████      | 2/5 [00:05<00:08,  2.95s/it]


Fetched article: History of Science (2/5)


 60%|██████    | 3/5 [00:08<00:06,  3.11s/it]


Fetched article: World War II (3/5)


 80%|████████  | 4/5 [00:12<00:03,  3.28s/it]


Fetched article: Climate Change (4/5)


100%|██████████| 5/5 [00:14<00:00,  3.07s/it]


Fetched article: Philosophy (5/5)


100%|██████████| 5/5 [00:16<00:00,  3.28s/it]


In [11]:
# Create DataFrame
wiki_df = spark.createDataFrame([(text,) for text in wiki_texts], ['value'])
print(f"Created DataFrame with {wiki_df.count()} articles")

Created DataFrame with 5 articles


In [12]:
# Preprocess texts
print("Preprocessing texts...")
preprocessed_df = process_in_batches(wiki_df)

Preprocessing texts...
Processed batch 1/1


In [13]:
# Train BPE tokenizer
print("Training BPE tokenizer...")
bpe_tokenizer = train_bpe_tokenizer(spark, preprocessed_df, vocab_size=1000)

Training BPE tokenizer...
Completed 100/960 merges
Completed 200/960 merges
Completed 300/960 merges
Completed 400/960 merges
Completed 500/960 merges
Completed 600/960 merges
Completed 700/960 merges
Completed 800/960 merges
Completed 900/960 merges


In [14]:
# Tokenize texts
print("Tokenizing texts...")
tokenized_df = tokenize_in_batches(preprocessed_df, bpe_tokenizer)

Tokenizing texts...
Tokenized batch 1/1


In [15]:
# Calculate statistics
token_count_df = tokenized_df.withColumn('token_count', size(col('token_ids')))
total_token_count = token_count_df.agg(_sum('token_count').alias('total_tokens')).collect()[0]['total_tokens']
print(f"Total number of tokens in the articles: {total_token_count}")

Total number of tokens in the articles: 71885


In [16]:
# Save results
print("Saving results...")
save_results_in_batches(tokenized_df)

Saving results...
Saved batch 1/1


# Hugging Face Tokeniser and Analytics

In [17]:
!pip install wikipedia
!pip install transformers
!pip install tqdm
!pip install plotly
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

--2024-12-12 07:44:05--  http://setup.johnsnowlabs.com/colab.sh
Resolving setup.johnsnowlabs.com (setup.johnsnowlabs.com)... 3.86.22.73
Connecting to setup.johnsnowlabs.com (setup.johnsnowlabs.com)|3.86.22.73|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh [following]
--2024-12-12 07:44:06--  https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1191 (1.2K) [text/plain]
Saving to: ‘STDOUT’


2024-12-12 07:44:06 (71.9 MB/s) - written to stdout [1191/1191]

Installing PySpark 3.2.3 and Spark NLP 5.5.1
setup Colab for PySpark 3.2.3 and Spark NLP 5.5.1


In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode, size, split, sum as _sum
from pyspark.sql.types import StringType, ArrayType, IntegerType
from transformers import AutoTokenizer
import wikipedia
import re
from tqdm import tqdm
import time
import random
import json

In [19]:
# Initialize Spark
spark = SparkSession.builder.appName("WikipediaTokenization").getOrCreate()
sc = spark.sparkContext

In [20]:
def get_large_wikipedia_dataset(initial_topics, target_articles=10000):
    texts = []
    titles_processed = set()

    def get_related_articles(title):
        try:
            page = wikipedia.page(title, auto_suggest=False)
            # Get links from the current page
            links = page.links
            # Filter out already processed titles
            return [link for link in links if link not in titles_processed]
        except:
            return []

    def fetch_article(title):
        try:
            page = wikipedia.page(title, auto_suggest=False)
            return page.content
        except:
            return None

    # Start with initial topics
    queue = list(initial_topics)

    with tqdm(total=target_articles) as pbar:
        while len(texts) < target_articles and queue:
            # Get next title from queue
            current_title = queue.pop(0)

            if current_title in titles_processed:
                continue

            # Add to processed set
            titles_processed.add(current_title)

            # Fetch article
            content = fetch_article(current_title)
            if content:
                texts.append(content)
                pbar.update(1)
                print(f"\nFetched article: {current_title} ({len(texts)}/{target_articles})")

                # Get related articles and add to queue
                related = get_related_articles(current_title)
                queue.extend(random.sample(related, min(len(related), 10)))  # Add up to 10 random related articles

            # Sleep briefly to avoid hitting rate limits
            time.sleep(0.5)

            if len(texts) % 100 == 0:
                print(f"\nProgress: {len(texts)}/{target_articles} articles collected")

    return texts

In [21]:
# Initial seed topics covering various domains
initial_topics = [
    "Artificial Intelligence",
    "History of Science",
    "World War II",
    "Climate Change",
    "Philosophy",
    "Biology",
    "Literature",
    "Technology",
    "Art History",
    "Mathematics"
]
print("Fetching Wikipedia articles...")
wiki_texts = get_large_wikipedia_dataset(initial_topics, target_articles=100)

Fetching Wikipedia articles...


  1%|          | 1/100 [00:00<01:19,  1.24it/s]


Fetched article: Artificial Intelligence (1/100)


  2%|▏         | 2/100 [00:05<04:38,  2.84s/it]


Fetched article: History of Science (2/100)


  3%|▎         | 3/100 [00:08<04:59,  3.09s/it]


Fetched article: World War II (3/100)


  4%|▍         | 4/100 [00:12<05:18,  3.32s/it]


Fetched article: Climate Change (4/100)


  5%|▌         | 5/100 [00:14<04:54,  3.10s/it]


Fetched article: Philosophy (5/100)


  6%|▌         | 6/100 [00:17<04:32,  2.89s/it]


Fetched article: Biology (6/100)


  7%|▋         | 7/100 [00:19<04:15,  2.75s/it]


Fetched article: Literature (7/100)


  8%|▊         | 8/100 [00:22<03:58,  2.60s/it]


Fetched article: Technology (8/100)


  9%|▉         | 9/100 [00:24<03:58,  2.62s/it]


Fetched article: Art History (9/100)


 10%|█         | 10/100 [00:27<03:48,  2.54s/it]


Fetched article: Mathematics (10/100)


 11%|█         | 11/100 [00:29<03:32,  2.39s/it]


Fetched article: Solid modeling (11/100)


 12%|█▏        | 12/100 [00:30<03:12,  2.18s/it]


Fetched article: Photograph manipulation (12/100)


 13%|█▎        | 13/100 [00:32<02:52,  1.99s/it]


Fetched article: Palisades Nuclear Generating Station (13/100)


 14%|█▍        | 14/100 [00:33<02:39,  1.85s/it]


Fetched article: Moral blindness (14/100)


 15%|█▌        | 15/100 [00:36<02:46,  1.96s/it]


Fetched article: Terrorist (15/100)


 16%|█▌        | 16/100 [00:38<02:56,  2.10s/it]


Fetched article: Consequentialism (16/100)


 17%|█▋        | 17/100 [00:42<03:28,  2.51s/it]


Fetched article: Kalman filter (17/100)


 18%|█▊        | 18/100 [00:43<03:02,  2.22s/it]


Fetched article: Daniel Crevier (18/100)


 19%|█▉        | 19/100 [00:45<02:54,  2.16s/it]


Fetched article: Fifth generation computer (19/100)


 20%|██        | 20/100 [00:47<02:58,  2.23s/it]


Fetched article: Algorithmic fairness (20/100)


 21%|██        | 21/100 [00:49<02:50,  2.15s/it]


Fetched article: Wayback Machine (21/100)


 22%|██▏       | 22/100 [00:51<02:33,  1.96s/it]


Fetched article: History of engineering (22/100)


 23%|██▎       | 23/100 [00:53<02:30,  1.95s/it]


Fetched article: Women in engineering (23/100)


 24%|██▍       | 24/100 [00:55<02:31,  2.00s/it]


Fetched article: History of algebra (24/100)


 25%|██▌       | 25/100 [00:57<02:34,  2.06s/it]


Fetched article: Hulagu Khan (25/100)


 26%|██▌       | 26/100 [00:59<02:31,  2.04s/it]


Fetched article: Diffusion of innovations (26/100)


 27%|██▋       | 27/100 [01:01<02:32,  2.08s/it]


Fetched article: List of Greek philosophers (27/100)


 28%|██▊       | 28/100 [01:04<02:40,  2.22s/it]


Fetched article: Celestial globe (28/100)


 29%|██▉       | 29/100 [01:05<02:23,  2.02s/it]


Fetched article: Renaissance (29/100)


 30%|███       | 30/100 [01:10<03:03,  2.62s/it]


Fetched article: Industrial Revolution (30/100)


 31%|███       | 31/100 [01:13<03:19,  2.88s/it]


Fetched article: Syria–Lebanon campaign (31/100)


 32%|███▏      | 32/100 [01:15<03:02,  2.68s/it]


Fetched article: Japanese occupation of Burma (32/100)


 33%|███▎      | 33/100 [01:17<02:40,  2.40s/it]


Fetched article: Resistance in Lithuania during World War II (33/100)


 34%|███▍      | 34/100 [01:19<02:29,  2.26s/it]


Fetched article: Korean Volunteer Army (34/100)


 35%|███▌      | 35/100 [01:21<02:24,  2.22s/it]


Fetched article: Sweden during World War II (35/100)


 36%|███▌      | 36/100 [01:23<02:21,  2.21s/it]


Fetched article: British Empire in World War II (36/100)


 37%|███▋      | 37/100 [01:25<02:10,  2.08s/it]


Fetched article: Stalin's ten blows (37/100)


 38%|███▊      | 38/100 [01:27<02:06,  2.04s/it]


Fetched article: Territory of the Military Commander in Serbia (38/100)


 39%|███▉      | 39/100 [01:29<02:08,  2.10s/it]


Fetched article: Soviet re-occupation of the Baltic states (1944) (39/100)


 40%|████      | 40/100 [01:31<02:00,  2.00s/it]


Fetched article: Azali Bábism (40/100)


 41%|████      | 41/100 [01:34<02:18,  2.34s/it]


Fetched article: Flowering (41/100)


 42%|████▏     | 42/100 [01:37<02:21,  2.44s/it]


Fetched article: Ecological collapse (42/100)


 43%|████▎     | 43/100 [01:39<02:11,  2.30s/it]


Fetched article: Ocean current (43/100)


 44%|████▍     | 44/100 [01:42<02:18,  2.48s/it]


Fetched article: Geodetic astronomy (44/100)


 45%|████▌     | 45/100 [01:43<02:04,  2.27s/it]


Fetched article: Arctic amplification (45/100)


 46%|████▌     | 46/100 [01:46<02:01,  2.25s/it]


Fetched article: Carbon footprint (46/100)


 47%|████▋     | 47/100 [01:48<01:55,  2.17s/it]


Fetched article: United Nations Environment Programme (47/100)


 48%|████▊     | 48/100 [01:49<01:48,  2.10s/it]


Fetched article: Environmental impact of reservoirs (48/100)


 49%|████▉     | 49/100 [01:52<01:48,  2.12s/it]


Fetched article: Climate change and wildfires (49/100)


 50%|█████     | 50/100 [01:55<01:56,  2.34s/it]


Fetched article: Wildfire (50/100)


 51%|█████     | 51/100 [01:57<01:55,  2.35s/it]


Fetched article: Epiphenomenalism (51/100)


 52%|█████▏    | 52/100 [01:59<01:44,  2.17s/it]


Fetched article: Feminist philosophy (52/100)


 53%|█████▎    | 53/100 [02:01<01:42,  2.18s/it]


Fetched article: Women in philosophy (53/100)


 54%|█████▍    | 54/100 [02:03<01:43,  2.26s/it]


Fetched article: Philosophy of mathematics (54/100)


 55%|█████▌    | 55/100 [02:06<01:44,  2.32s/it]


Fetched article: Humanitas (55/100)


 56%|█████▌    | 56/100 [02:08<01:36,  2.20s/it]


Fetched article: Western philosophy (56/100)


 57%|█████▋    | 57/100 [02:11<01:42,  2.39s/it]


Fetched article: Ontology (57/100)


 58%|█████▊    | 58/100 [02:13<01:44,  2.50s/it]


Fetched article: Archaeology (58/100)


 59%|█████▉    | 59/100 [02:16<01:40,  2.45s/it]


Fetched article: Rule of inference (59/100)


 60%|██████    | 60/100 [02:17<01:27,  2.19s/it]


Fetched article: Stanford Encyclopedia of Philosophy (60/100)


 61%|██████    | 61/100 [02:19<01:19,  2.05s/it]


Fetched article: Carbohydrate (61/100)


 62%|██████▏   | 62/100 [02:21<01:14,  1.95s/it]


Fetched article: Plant stem (62/100)


 63%|██████▎   | 63/100 [02:22<01:07,  1.83s/it]


Fetched article: Particle (63/100)


 64%|██████▍   | 64/100 [02:24<01:04,  1.80s/it]


Fetched article: Adhesion (64/100)


 65%|██████▌   | 65/100 [02:26<01:04,  1.83s/it]


Fetched article: X-ray (65/100)


 66%|██████▌   | 66/100 [02:28<01:07,  1.99s/it]


Fetched article: Homeostasis (66/100)


 67%|██████▋   | 67/100 [02:30<01:05,  1.99s/it]


Fetched article: Spencer G. Lucas (67/100)


 68%|██████▊   | 68/100 [02:32<01:00,  1.91s/it]


Fetched article: Muscle cell (68/100)


 69%|██████▉   | 69/100 [02:34<00:57,  1.85s/it]


Fetched article: Sexual reproduction (69/100)


 70%|███████   | 70/100 [02:35<00:54,  1.81s/it]


Fetched article: Metabolic pathway (70/100)


 71%|███████   | 71/100 [02:37<00:52,  1.80s/it]


Fetched article: Obscenity trial of Ulysses in The Little Review (71/100)


 72%|███████▏  | 72/100 [02:39<00:47,  1.71s/it]


Fetched article: Epic of Manas (72/100)


 73%|███████▎  | 73/100 [02:40<00:46,  1.72s/it]


Fetched article: Torah (73/100)


 74%|███████▍  | 74/100 [02:43<00:52,  2.00s/it]


Fetched article: The Art of War (74/100)


 75%|███████▌  | 75/100 [02:45<00:50,  2.00s/it]


Fetched article: Maria Nikolajeva (75/100)


 76%|███████▌  | 76/100 [02:46<00:44,  1.85s/it]


Fetched article: Value judgment (76/100)


 77%|███████▋  | 77/100 [02:48<00:41,  1.82s/it]


Fetched article: List of epic poems (77/100)


 78%|███████▊  | 78/100 [02:51<00:44,  2.01s/it]


Fetched article: Stalin (78/100)


 79%|███████▉  | 79/100 [02:53<00:46,  2.23s/it]


Fetched article: Anecdote (79/100)


 80%|████████  | 80/100 [02:55<00:42,  2.15s/it]


Fetched article: King James Version (80/100)


 81%|████████  | 81/100 [02:58<00:42,  2.26s/it]


Fetched article: Vaccine (81/100)


 82%|████████▏ | 82/100 [03:00<00:39,  2.18s/it]


Fetched article: Boundary-work (82/100)


 83%|████████▎ | 83/100 [03:01<00:33,  1.99s/it]


Fetched article: Localism (politics) (83/100)


 84%|████████▍ | 84/100 [03:03<00:31,  2.00s/it]


Fetched article: Metallurgy (84/100)


 85%|████████▌ | 85/100 [03:06<00:31,  2.11s/it]


Fetched article: Energy storage (85/100)


 86%|████████▌ | 86/100 [03:08<00:30,  2.20s/it]


Fetched article: Nuclear weapon (86/100)


 87%|████████▋ | 87/100 [03:11<00:29,  2.27s/it]


Fetched article: Hall–Héroult process (87/100)


 88%|████████▊ | 88/100 [03:12<00:25,  2.11s/it]


Fetched article: Sociology of knowledge (88/100)


 89%|████████▉ | 89/100 [03:14<00:22,  2.00s/it]


Fetched article: Strategy of Technology (89/100)


 90%|█████████ | 90/100 [03:16<00:19,  1.92s/it]


Fetched article: Abstract art (90/100)


 91%|█████████ | 91/100 [03:18<00:16,  1.86s/it]


Fetched article: Institute for Advanced Study (91/100)


 92%|█████████▏| 92/100 [03:20<00:15,  1.91s/it]


Fetched article: Encyclopaedia (92/100)


 93%|█████████▎| 93/100 [03:22<00:13,  1.93s/it]


Fetched article: Celtic art (93/100)


 94%|█████████▍| 94/100 [03:24<00:11,  1.93s/it]


Fetched article: Jackson Pollock (94/100)


 95%|█████████▌| 95/100 [03:25<00:09,  1.82s/it]


Fetched article: Eclecticism in art (95/100)


 96%|█████████▌| 96/100 [03:27<00:07,  1.86s/it]


Fetched article: Analytical psychology (96/100)


 97%|█████████▋| 97/100 [03:29<00:05,  1.91s/it]


Fetched article: Sociology of art (97/100)


 98%|█████████▊| 98/100 [03:31<00:03,  1.80s/it]


Fetched article: Arts festival (98/100)


 99%|█████████▉| 99/100 [03:32<00:01,  1.78s/it]


Fetched article: Anna Freud (99/100)


100%|██████████| 100/100 [03:34<00:00,  1.87s/it]


Fetched article: Pseudoscience (100/100)


100%|██████████| 100/100 [03:36<00:00,  2.17s/it]


Progress: 100/100 articles collected





In [22]:
# Create Spark DataFrame
wiki_df = spark.createDataFrame([(text,) for text in wiki_texts], ['value'])
print(f"Created DataFrame with {wiki_df.count()} articles")

Created DataFrame with 100 articles


In [23]:
# Process in batches to handle large data
from pyspark.sql.functions import udf, col, monotonically_increasing_id, row_number
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
import re

BATCH_SIZE = 100

# Example preprocessing function
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'\[.*?\]|\(.*?\)', '', text)  # remove bracketed text
    text = re.sub(r'[^a-z0-9\s\.\,\!\?]', '', text)  # keep only certain chars
    text = re.sub(r'\s+', ' ', text).strip()         # normalize whitespace
    return text

preprocess_text_udf = udf(preprocess_text, StringType())

def process_in_batches(df, batch_size=BATCH_SIZE):
    # Add a unique row ID to each row
    # We use a window with no partition but order by a unique column (monotonically_increasing_id())
    df_with_id = df.withColumn(
        "row_id",
        row_number().over(Window.orderBy(monotonically_increasing_id()))
    )

    total_rows = df_with_id.count()
    num_batches = (total_rows + batch_size - 1) // batch_size

    processed_dfs = []

    for i in range(num_batches):
        start_idx = i * batch_size + 1
        end_idx = start_idx + batch_size - 1

        # Filter rows for this batch
        batch_df = df_with_id.filter((col("row_id") >= start_idx) & (col("row_id") <= end_idx))

        # Preprocess this batch
        processed_batch = batch_df.withColumn('cleaned_text', preprocess_text_udf(col('value')))

        processed_dfs.append(processed_batch.select(df.columns + ['cleaned_text']))

        print(f"Processed batch {i+1}/{num_batches}")

    # Union all processed batches
    return processed_dfs[0].unionAll(processed_dfs[1]) if len(processed_dfs) > 1 else processed_dfs[0]


In [24]:
# Process the data in batches
preprocessed_df = process_in_batches(wiki_df)

Processed batch 1/1


In [25]:
# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained('gpt2')
tokenizer_broadcast = sc.broadcast(tokenizer)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [26]:
# Tokenization function
def tokenize_text(text):
    tokenizer = tokenizer_broadcast.value
    token_ids = tokenizer.encode(text, add_special_tokens=False)
    return token_ids

tokenize_text_udf = udf(tokenize_text, ArrayType(IntegerType()))

In [27]:
# Process tokenization in batches
def tokenize_in_batches(df, batch_size=BATCH_SIZE):
    # Assign a unique row number to each row
    df_with_id = df.withColumn(
        "row_id",
        row_number().over(Window.orderBy(monotonically_increasing_id()))
    )

    total_rows = df_with_id.count()
    num_batches = (total_rows + batch_size - 1) // batch_size

    processed_dfs = []

    for i in range(num_batches):
        start_idx = i * batch_size + 1
        end_idx = start_idx + batch_size - 1

        # Filter rows for this batch
        batch_df = df_with_id.filter((col("row_id") >= start_idx) & (col("row_id") <= end_idx))

        # Apply tokenization
        processed_batch = batch_df.withColumn('token_ids', tokenize_text_udf(col('cleaned_text')))

        # Select original columns plus token_ids (excluding row_id if not needed)
        selected_cols = [c for c in df.columns] + ['token_ids']
        processed_dfs.append(processed_batch.select(*selected_cols))

        print(f"Tokenized batch {i+1}/{num_batches}")

    # Union all processed batches into a single DataFrame
    # If there's only one batch, just return it
    if len(processed_dfs) == 1:
        return processed_dfs[0]
    else:
        unioned_df = processed_dfs[0]
        for pdf in processed_dfs[1:]:
            unioned_df = unioned_df.union(pdf)
        return unioned_df

In [28]:
tokenized_df = tokenize_in_batches(preprocessed_df)

Tokenized batch 1/1


In [29]:
# word count
word_count_df = preprocessed_df.withColumn('word_count', size(split(col('cleaned_text'), ' ')))
total_word_count = word_count_df.agg(_sum('word_count').alias('total_words')).collect()[0]['total_words']
print(f"Total number of words across articles: {total_word_count}")

# Token count
token_count_df = tokenized_df.withColumn('token_count', size(col('token_ids')))
total_token_count = token_count_df.agg(_sum('token_count').alias('total_tokens')).collect()[0]['total_tokens']
print(f"Total number of tokens in the articles: {total_token_count}")

Total number of words across articles: 465028
Total number of tokens in the articles: 604508


In [30]:
# Save results in batches
def save_results_in_batches(df, batch_size=BATCH_SIZE):
    # Assign a unique row number to each row
    df_with_id = df.withColumn(
        "row_id",
        row_number().over(Window.orderBy(monotonically_increasing_id()))
    )

    total_rows = df_with_id.count()
    num_batches = (total_rows + batch_size - 1) // batch_size

    all_data = []

    for i in range(num_batches):
        start_idx = i * batch_size + 1
        end_idx = start_idx + batch_size - 1

        # Filter the rows for this batch
        batch_df = df_with_id.filter((col("row_id") >= start_idx) & (col("row_id") <= end_idx))

        # Collect the batch data
        batch_data = batch_df.select('cleaned_text', 'token_ids').collect()
        all_data.extend([{
            "cleaned_text": row.cleaned_text,
            "token_ids": row.token_ids
        } for row in batch_data])

        print(f"Saved batch {i+1}/{num_batches}")

        # Periodically write intermediate results to file
        if (i + 1) % 10 == 0:
            with open(f'tokenized_data_batch_{(i+1)//10}.json', 'w') as f:
                json.dump(all_data, f)
            all_data = []

    # Save any remaining data
    if all_data:
        with open('tokenized_data_batch_final.json', 'w') as f:
            json.dump(all_data, f)

save_results_in_batches(tokenized_df)

print("\nProcessing complete!")

Saved batch 1/1

Processing complete!


# Analytics on the words

* Removal of stop words

In [31]:
import plotly.express as px
import pandas as pd
import torch
import re
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, col, split, pandas_udf, expr, count
from pyspark.sql.types import StringType, ArrayType, DoubleType
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF, NGram
from pyspark.ml.clustering import LDA
from transformers import pipeline


# UDF to preprocess text
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'\[.*?\]|\(.*?\)', '', text)
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

preprocess_text_udf = udf(preprocess_text, StringType())

# Preprocess and tokenize
cleaned_df = wiki_df.withColumn('cleaned_text', preprocess_text_udf(col('value')))
words_df = cleaned_df.withColumn('words', split(col('cleaned_text'), ' '))

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
filtered_df = remover.transform(words_df)
filtered_df.cache()

filtered_df.show(5)

+--------------------+--------------------+--------------------+--------------------+
|               value|        cleaned_text|               words|      filtered_words|
+--------------------+--------------------+--------------------+--------------------+
|Artificial intell...|artificial intell...|[artificial, inte...|[artificial, inte...|
|The history of sc...|the history of sc...|[the, history, of...|[history, science...|
|World War II or t...|world war ii or t...|[world, war, ii, ...|[world, war, ii, ...|
|Present-day clima...|presentday climat...|[presentday, clim...|[presentday, clim...|
|Philosophy ('love...|philosophy is a s...|[philosophy, is, ...|[philosophy, syst...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



a) Frequency of top 10 words

In [32]:
word_freq_df = (filtered_df
                .withColumn('word', explode(col('filtered_words')))
                .groupBy('word')
                .agg(count('*').alias('frequency'))
                .orderBy(col('frequency').desc()))

word_freq_df.show(10)


# visualization
word_freq_pd = word_freq_df.limit(10).toPandas()
fig = px.bar(word_freq_pd, x='word', y='frequency', title='Top 10 Most Frequent Words')
fig.show()

+----------+---------+
|      word|frequency|
+----------+---------+
|      also|     1193|
|         k|     1084|
|       one|      843|
|      used|      832|
|       new|      702|
|       war|      700|
|philosophy|      698|
|       may|      687|
|     first|      676|
|      many|      640|
+----------+---------+
only showing top 10 rows



b) Frequency of tokens

In [33]:
# Example only if tokenized_df exists and has token_ids
token_freq_df = (tokenized_df
                 .withColumn('token', explode(col('token_ids')))
                 .groupBy('token')
                 .agg(count('*').alias('frequency'))
                 .orderBy(col('frequency').desc()))

token_freq_df.show(10)

# Visualization for Top 10 Tokens
token_freq_pd = token_freq_df.limit(10).toPandas()
token_freq_pd["token"] = token_freq_pd["token"].astype(str)
fig = px.bar(token_freq_pd, x='token', y='frequency', title='Top 10 Most Frequent Tokens')
fig.show()

+-----+---------+
|token|frequency|
+-----+---------+
|  262|    32051|
|   11|    27280|
|   13|    21691|
|  286|    19082|
|  290|    14992|
|  287|    11773|
|  284|    10565|
|  257|     9288|
|  318|     5178|
|  355|     4451|
+-----+---------+
only showing top 10 rows



c) TF-IDF

In [34]:
# we want to maintain mapping between the term and index
cv = CountVectorizer(inputCol="filtered_words", outputCol="raw_features", vocabSize=10000)
cv_model = cv.fit(filtered_df)
tf = cv_model.transform(filtered_df)

idf = IDF(inputCol="raw_features", outputCol="features")
tfidf_model = idf.fit(tf)
tfidf_df = tfidf_model.transform(tf)
tfidf_df.show(5)


# visualization for tf-idf features for a single document
vocabulary = cv_model.vocabulary

doc_tfidf = tfidf_df.select("filtered_words", "features").limit(1).toPandas()
features_arr = doc_tfidf['features'][0].toArray()
nonzero_indices = [(i, w) for i, w in enumerate(features_arr) if w > 0]
top_features = sorted(nonzero_indices, key=lambda x: x[1], reverse=True)[:10]

top_features_pd = pd.DataFrame(top_features, columns=['FeatureIndex', 'TF-IDF Weight'])

# Map the feature index to the corresponding term
top_features_pd['Term'] = top_features_pd['FeatureIndex'].apply(lambda idx: vocabulary[idx])

fig = px.bar(top_features_pd, x='Term', y='TF-IDF Weight', title='Top 10 TF-IDF Terms')
fig.show()


+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               value|        cleaned_text|               words|      filtered_words|        raw_features|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Artificial intell...|artificial intell...|[artificial, inte...|[artificial, inte...|(10000,[0,1,2,3,4...|(10000,[0,1,2,3,4...|
|The history of sc...|the history of sc...|[the, history, of...|[history, science...|(10000,[0,2,3,4,5...|(10000,[0,2,3,4,5...|
|World War II or t...|world war ii or t...|[world, war, ii, ...|[world, war, ii, ...|(10000,[0,2,3,4,5...|(10000,[0,2,3,4,5...|
|Present-day clima...|presentday climat...|[presentday, clim...|[presentday, clim...|(10000,[0,2,3,4,7...|(10000,[0,2,3,4,7...|
|Philosophy ('love...|philosophy is a s...|[philosophy, is, ...|[philosophy, syst...|(10000,[0,2,3,4,6..

d) Bigram analysis

In [35]:
ngram = NGram(n=2, inputCol="filtered_words", outputCol="bigrams")
bigrams_df = ngram.transform(filtered_df)
bigrams_df.show(5, truncate=False)

# Frequency of bigrams
bigrams_exploded_df = bigrams_df.withColumn('bigram', explode(col('bigrams')))
bigram_freq_df = bigrams_exploded_df.groupBy('bigram').agg(count('*').alias('frequency')).orderBy(col('frequency').desc())
bigram_freq_pd = bigram_freq_df.limit(10).toPandas()

# visualization for bigram
fig = px.bar(bigram_freq_pd, x="bigram", y="frequency", title="Top 10 Bigrams by Frequency")
fig.show()

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

e) Sentiment Analysis

In [36]:
@pandas_udf(StringType())
def batch_sentiment_udf(texts: pd.Series) -> pd.Series:
    # Load the sentiment pipeline once per batch
    local_device = 0 if torch.cuda.is_available() else -1
    local_sentiment = pipeline("sentiment-analysis", device=local_device, truncation=True)

    results = []
    for txt in texts:
        truncated_txt = txt[:512] if txt else ""
        pred = local_sentiment(truncated_txt)[0]['label']
        results.append(pred)
    return pd.Series(results)

# Apply the Pandas UDF in Spark
sentiment_df = preprocessed_df.withColumn('sentiment', batch_sentiment_udf(col('cleaned_text')))
sentiment_df.show(5)

# Now do the aggregation in Spark as before
from pyspark.sql.functions import col, count
sentiment_counts_df = (sentiment_df
    .filter(col("sentiment").isNotNull())
    .groupBy("sentiment")
    .agg(count("*").alias("Count"))
)

sentiment_counts_pd = sentiment_counts_df.toPandas()
fig = px.pie(sentiment_counts_pd, values='Count', names='sentiment', title='Sentiment Distribution')
fig.show()

+--------------------+--------------------+---------+
|               value|        cleaned_text|sentiment|
+--------------------+--------------------+---------+
|Artificial intell...|artificial intell...| POSITIVE|
|The history of sc...|the history of sc...| POSITIVE|
|World War II or t...|world war ii or t...| NEGATIVE|
|Present-day clima...|presentday climat...| POSITIVE|
|Philosophy ('love...|philosophy is a s...| POSITIVE|
+--------------------+--------------------+---------+
only showing top 5 rows



f) Topic Modelling

In [37]:
lda = LDA(k=10, maxIter=10, featuresCol="raw_features")
lda_model = lda.fit(tfidf_df)
topics = lda_model.describeTopics()
topics.show(truncate=False)

# Visualization: TermIndices and TermWeights
topics_pd = topics.limit(10).toPandas()

topics_flat = []
for i, row in topics_pd.iterrows():
    termIndices = row['termIndices']
    termWeights = row['termWeights']
    for t, w in zip(termIndices, termWeights):
        # Map term index to the actual term using the vocabulary
        term = vocabulary[t]  # Convert index t to its corresponding term
        topics_flat.append({'Topic': i, 'Term': term, 'Weight': w})

topics_flat_pd = pd.DataFrame(topics_flat)
fig = px.bar(topics_flat_pd, x='Term', y='Weight', color='Topic',
             title='Top Terms by Topic')
fig.show()


+-----+-----------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic|termIndices                                                |termWeights                                                                                                                                                                                                                        |
+-----+-----------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |[49, 151, 82, 202, 0, 135, 11, 50, 9, 471]                 |[0.004119744214921579, 0.00408699037738881, 0


Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.



g) Cosine similarity between documents

In [38]:
def cosine_similarity(vec1, vec2):
    arr1 = vec1.toArray() if isinstance(vec1, SparseVector) else vec1
    arr2 = vec2.toArray() if isinstance(vec2, SparseVector) else vec2
    dot_product = float(Vectors.dense(arr1).dot(Vectors.dense(arr2)))
    mag1 = float(Vectors.dense(arr1).norm(2))
    mag2 = float(Vectors.dense(arr2).norm(2))
    if mag1 == 0.0 or mag2 == 0.0:
        return 0.0
    return dot_product / (mag1 * mag2)

cosine_similarity_udf = udf(cosine_similarity, DoubleType())

tfidf_df_with_id = tfidf_df.withColumn(
    "id", row_number().over(Window.orderBy(monotonically_increasing_id()))
)

# sampling for efficiency
sample_tfidf = tfidf_df_with_id.sample(False, 0.25)

similarity_df = (sample_tfidf.alias('df1')
    .join(sample_tfidf.alias('df2'), expr('df1.id < df2.id'))
    .withColumn('similarity', cosine_similarity_udf(col('df1.features'), col('df2.features'))))

similarity_df.show(5)

# Visualization of Cosine Similarity
sim_pd = (similarity_df
    .select(col('df1.id').alias('Document1'),
            col('df2.id').alias('Document2'),
            col('similarity'))
    .limit(50)
    .toPandas()
)

fig = px.scatter(sim_pd, x='Document1', y='Document2', size='similarity',
                 title='Cosine Similarity Between Documents')
fig.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+
|               value|        cleaned_text|               words|      filtered_words|        raw_features|            features| id|               value|        cleaned_text|               words|      filtered_words|        raw_features|            features| id|          similarity|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+
|Artificial intell...|artificial intell...|[artificial, inte...|[artificial, inte...|(10000,[0,1,2,3,4...|(10000,[0,1,2,3,4...|  1|Present-day clima...

h) [Future Scope] KMeans clustering
* Hasn't been implemented due to platform issues in Google Colab (PySpark crash)

In [39]:
# hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=2000)  # fewer features
# tf = hashing_tf.transform(filtered_df)

# idf = IDF(inputCol="raw_features", outputCol="features")
# tfidf_model = idf.fit(tf)
# tfidf_df = tfidf_model.transform(tf)

# # Sample your data if it's large
# # For example, take only 10% of the data to reduce memory pressure
# tfidf_small = tfidf_df.sample(False, 0.1, seed=42)
# print("Count after sampling:", tfidf_small.count())

# # Apply KMeans to the smaller dataset
# from pyspark.ml.clustering import KMeans
# kmeans = KMeans(k=5, seed=1, featuresCol="features", maxIter=10)  # Reduced maxIter for faster run
# kmeans_model = kmeans.fit(tfidf_small)
# clusters = kmeans_model.transform(tfidf_small)

# # PCA for visualization: Also do it on the smaller dataset
# from pyspark.ml.feature import PCA
# pca = PCA(k=3, inputCol="features", outputCol="pca_features")
# pca_model = pca.fit(clusters)
# pca_df = pca_model.transform(clusters)

# # Limit the data before converting to Pandas to avoid memory overflow
# pca_pd = pca_df.select("pca_features", "prediction").limit(100).toPandas()

# import pandas as pd
# import plotly.express as px

# pca_pd[['PCA1', 'PCA2', 'PCA3']] = pd.DataFrame(pca_pd['pca_features'].tolist(), index=pca_pd.index)

# # Smaller, more manageable visualization
# fig = px.scatter_3d(pca_pd, x='PCA1', y='PCA2', z='PCA3', color='prediction',
#                     title='KMeans Clustering Visualization (PCA) - Reduced Data')
# fig.update_layout(width=800, height=600)
# fig.show()