In [1]:
import asyncio
import csv
import os
import re
from collections import Counter
from urllib.parse import quote, urlsplit

import aiohttp
import nest_asyncio
from bs4 import BeautifulSoup

from classes.progress_tracker import ProgressTracker

class WikipediaScraper:
    def __init__(self, api_url="https://ms.wikipedia.org/w/api.php", max_concurrent_requests=10):
        self.api_url = api_url
        self.session = None
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)  # Limit concurrency

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def fetch_article_titles(self, limit=50):
        """Fetch recent article titles from Wikipedia."""
        print("Fetching article titles...")
        search_term = "history"
        params = {
            "action": "query",         # We use the "query" action
            "list": "search",         # Search query
            "srsearch": search_term,  # The search term to query
            "srlimit": "max",         # Max number of results (you can adjust this as per your needs)
            "format": "json"          # Return the data in JSON format
        }
        try:
            async with self.semaphore:
                async with self.session.get(self.api_url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        articles = data['query']['search']
                        seen_titles = set()
                        article_info = []
                        for article in articles:
                            title = article['title']
                            if title not in seen_titles:
                                link = f"https://ms.wikipedia.org/wiki/{title.replace(' ', '_')}"
                                article_info.append((title, link))
                                seen_titles.add(title)
                            if len(article_info) >= limit:
                                break
                        print(f"Fetched {len(article_info)} article titles.")
                        return article_info
                    else:
                        print("Error fetching data from Wikipedia API.")
                        return []
        except Exception as e:
            print(f"Error fetching articles: {e}")
            return []

    async def fetch_article_content(self, title):
        """Fetch the content of a specific Wikipedia article."""
        url = f"https://ms.wikipedia.org/wiki/{title.replace(' ', '_')}"
        async with self.semaphore:
            async with self.session.get(url) as response:
                if response.status == 200:
                    soup = BeautifulSoup(await response.text(), 'html.parser')
                    content_div = soup.find(id="bodyContent")
                    if content_div:
                        paragraphs = content_div.find_all('p')
                        return ' '.join([para.get_text() for para in paragraphs])
        return ""

class ArticleProcessor:
    """Stateless helpers for cleaning article text and storing word counts."""

    @staticmethod
    def clean_text(text):
        """Replace every non-letter, non-whitespace character with a space.

        Only ASCII letters ``a-z``/``A-Z`` are kept; the result is lowercased
        and stripped of leading/trailing whitespace.
        """
        return re.sub(r'[^a-zA-Z\s]', ' ', text).lower().strip()

    @staticmethod
    def save_word_counts_to_csv(word_counts, filename="./raw_data/word_counts.csv"):
        """Write word counts to a CSV file, highest count first.

        Args:
            word_counts: Mapping of word -> count.
            filename: Destination path. Missing parent directories are
                created before the file is opened.
        """
        directory = os.path.dirname(filename)
        if directory:
            # exist_ok avoids the check-then-create race of a separate
            # os.path.exists() test.
            os.makedirs(directory, exist_ok=True)

        sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
        with open(filename, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(["word", "count"])
            writer.writerows(sorted_word_counts)

    @staticmethod
    def filter_word_counts(word_counts, min_count=3, min_length=2):
        """Keep words that meet both the minimum count and minimum length.

        Both thresholds are inclusive: a word is kept when
        ``count >= min_count`` and ``len(word) >= min_length``.
        """
        return {
            word: count
            for word, count in word_counts.items()
            if count >= min_count and len(word) >= min_length
        }

class WikipediaPipeline:
    """End-to-end job: search articles, download them, count and save words.

    Attributes:
        api_url: MediaWiki ``api.php`` endpoint handed to the scraper.
        limit: Maximum number of article titles to request.
        max_concurrent_requests: Concurrency cap handed to the scraper.
    """

    def __init__(self, api_url="https://ms.wikipedia.org/w/api.php", limit=200, max_concurrent_requests=10):
        self.api_url = api_url
        self.limit = limit
        self.max_concurrent_requests = max_concurrent_requests

    async def run(self):
        """Run the pipeline once.

        Fetches up to ``limit`` article titles, downloads every article
        concurrently, counts the words of the cleaned text across all
        articles, filters the counts and writes them to CSV using the
        ``ArticleProcessor`` defaults. Returns ``None``; returns early
        without writing anything when no titles were fetched.
        """
        async with WikipediaScraper(self.api_url, self.max_concurrent_requests) as scraper:
            articles = await scraper.fetch_article_titles(self.limit)

            if not articles:
                print("No articles fetched. Exiting...")
                return

            print(f"Fetched {len(articles)} articles. Fetching content...")

            # Initialize progress tracker
            progress_tracker = ProgressTracker(len(articles))

            async def fetch_content_with_progress(title):
                """Fetch one article; never raise, always advance progress."""
                try:
                    return await scraper.fetch_article_content(title)
                except Exception as exc:
                    # A single failing article must not make gather() raise
                    # and throw away the content of every other article.
                    print(f"Error fetching content for '{title}': {exc}")
                    return ""
                finally:
                    # Count the article as processed whether it succeeded or not.
                    progress_tracker.update()

            all_content = await asyncio.gather(
                *(fetch_content_with_progress(title) for title, _ in articles)
            )

            progress_tracker.complete()

            # Process and count words
            overall_word_counts = Counter()
            for content in all_content:
                if content:
                    cleaned_text = ArticleProcessor.clean_text(content)
                    overall_word_counts.update(cleaned_text.split())

            # Save filtered results
            filtered_word_counts = ArticleProcessor.filter_word_counts(overall_word_counts)
            ArticleProcessor.save_word_counts_to_csv(filtered_word_counts)
            print("Pipeline completed successfully.")

if __name__ == "__main__":
    # nest_asyncio is applied before the loop is driven; presumably so that
    # run_until_complete() also works when a loop is already running
    # (e.g. inside a notebook) — TODO confirm.
    nest_asyncio.apply()

    pipeline = WikipediaPipeline(
        api_url="https://ms.wikipedia.org/w/api.php",
        limit=500,
        max_concurrent_requests=10,
    )
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(pipeline.run())

Fetching article titles...
Fetched 500 article titles.
Fetched 500 articles. Fetching content...

|██████████████████████████████████████████████████| 100% Elapsed: 0m 43s ETA: 0m 0s



FileNotFoundError: [Errno 2] No such file or directory: './raw_data/word_counts.csv'

In [None]:
import aiohttp
import asyncio
import nest_asyncio
import nltk
import time
from nltk.corpus import words
from pyspark.sql import SparkSession
from bs4 import BeautifulSoup
from classes.progress_tracker import ProgressTracker

# Patch asyncio via nest_asyncio before any event loop is driven below.
nest_asyncio.apply()

# Load the nltk "words" corpus, downloading it on first use when the
# lookup fails, then keep it as a set for fast membership tests.
try:
    _corpus_words = words.words()
except LookupError:
    print("Downloading nltk 'words' corpus...")
    nltk.download('words')
    _corpus_words = words.words()

words_list = set(_corpus_words)

class WordValidator:
    """Classify words from a word-count CSV as English, Malay, or neither.

    Words are read through Spark, checked against the module-level
    ``words_list`` set and two online dictionaries, and the results are
    written to two CSV files: one for words valid in at least one
    language and one for words valid in neither.

    Attributes:
        csv_file_path: Input CSV; read with a header row and must provide
            ``word`` and ``count`` columns.
        output_csv_path: Destination for words that are English or Malay.
        neither_csv_path: Destination for words that are neither.
        batch_size: Number of words validated per batch.
        reduced_words_count: When truthy, caps the number of rows loaded.
        spark: The ``SparkSession`` used for loading and assembling data.
    """

    def __init__(self, csv_file_path, output_csv_path, neither_csv_path, batch_size=100, reduced_words_count=None):
        self.csv_file_path = csv_file_path
        self.output_csv_path = output_csv_path
        self.neither_csv_path = neither_csv_path
        self.batch_size = batch_size
        self.reduced_words_count = reduced_words_count  # Optional cap on rows loaded
        self.spark = SparkSession.builder.appName("WordValidator").getOrCreate()

    def load_csv(self):
        """Load the CSV file into a Spark DataFrame.

        When ``reduced_words_count`` is set, the rows with the highest
        ``count`` are kept; if there is no ``count`` column the first
        ``reduced_words_count`` rows are kept instead.
        """
        df = self.spark.read.csv(self.csv_file_path, header=True, inferSchema=True)
        if self.reduced_words_count:
            if "count" in df.columns:
                df = df.orderBy("count", ascending=False).limit(self.reduced_words_count)
            else:
                print(f"Limiting to the first {self.reduced_words_count} rows.")
                df = df.limit(self.reduced_words_count)
        return df

    @staticmethod
    def _quote_word(word):
        """Percent-encode ``word`` for safe use inside a URL."""
        # Imported locally so this class does not depend on imports made
        # in another notebook cell.
        from urllib.parse import quote

        return quote(str(word), safe="")

    async def check_english_word(self, word, session):
        """Return True if ``word`` is English.

        The lowercased word is looked up in ``words_list`` first; only on a
        miss is the online dictionary queried, where an HTTP 200 response
        counts as a match. Any request error counts as "not English".
        """
        if word.lower() in words_list:
            return True
        try:
            url = f"https://api.dictionaryapi.dev/api/v2/entries/en/{self._quote_word(word)}"
            async with session.get(url) as response:
                return response.status == 200
        except Exception:
            # Deliberate best-effort: an unreachable API means "unknown",
            # which is reported as False.
            return False

    async def check_malay_word(self, word, session):
        """Return True if the DBP search page lists ``word`` as Malay.

        A word counts as Malay when the element with id
        ``MainContent_panelresult`` exists and contains the
        ``<b>Kamus Bahasa Melayu</b>`` marker. Always returns a ``bool``;
        any request or parsing error counts as "not Malay".
        """
        try:
            url = f"https://prpm.dbp.gov.my/Cari1?keyword={self._quote_word(word)}"
            async with session.get(url) as response:
                soup = BeautifulSoup(await response.text(), 'html.parser')
            panel_result = soup.find(id="MainContent_panelresult")
            # Compare against None explicitly so a missing panel yields False
            # instead of None, which would put nulls in the result column.
            return panel_result is not None and "<b>Kamus Bahasa Melayu</b>" in str(panel_result)
        except Exception:
            # Deliberate best-effort, mirroring check_english_word.
            return False

    async def process_words(self, words, progress_tracker):
        """Validate one batch of words concurrently.

        Returns:
            ``(english_results, malay_results)``: two lists of booleans in
            the same order as ``words``.
        """
        async with aiohttp.ClientSession() as session:
            english_results = await asyncio.gather(
                *[self.check_english_word(word, session) for word in words]
            )
            malay_results = await asyncio.gather(
                *[self.check_malay_word(word, session) for word in words]
            )
        # Progress advances once per word after the whole batch finished.
        for _ in words:
            progress_tracker.update()
        return english_results, malay_results

    async def validate_words_in_batches(self, word_list, progress_tracker):
        """Validate ``word_list`` in consecutive slices of ``batch_size``.

        Returns:
            ``(english_results, malay_results)`` covering every word, in
            input order. Both lists are empty for an empty ``word_list``.
        """
        valid_english_results = []
        valid_malay_results = []

        for start in range(0, len(word_list), self.batch_size):
            batch_words = word_list[start:start + self.batch_size]
            english_results, malay_results = await self.process_words(batch_words, progress_tracker)
            valid_english_results.extend(english_results)
            valid_malay_results.extend(malay_results)

        return valid_english_results, valid_malay_results

    async def validate_and_save(self):
        """Load the words, validate them and write the two result CSVs.

        Returns early, writing nothing, when the input has no rows.
        """
        # Load words from CSV
        df = self.load_csv()
        word_list = df.select("word", "count").rdd.map(lambda row: (row['word'], int(row['count']))).collect()

        if not word_list:
            # zip(*[]) cannot be unpacked into two names, so stop here.
            print("No words to validate. Nothing saved.")
            return

        # Initialize progress tracker
        print(f"Validating {len(word_list)} words...")
        progress_tracker = ProgressTracker(len(word_list))

        # Separate words and counts; named to avoid shadowing nltk's `words`.
        word_texts, counts = zip(*word_list)

        # Validate words
        valid_english_results, valid_malay_results = await self.validate_words_in_batches(
            word_texts, progress_tracker
        )

        # Complete progress bar
        progress_tracker.complete()

        # Create a DataFrame with validation results
        validated_df = self.spark.createDataFrame(
            list(zip(word_texts, counts, valid_english_results, valid_malay_results)),
            ["word", "count", "is_english", "is_malay"]
        )

        # Convert Spark DataFrame to Pandas DataFrame
        pandas_df = validated_df.toPandas()

        # Split into "valid in at least one language" and "valid in neither".
        is_valid = pandas_df['is_english'] | pandas_df['is_malay']
        valid_words_df = pandas_df[is_valid]
        neither_words_df = pandas_df[~is_valid]

        # Save results directly to CSV files
        valid_words_df.to_csv(self.output_csv_path, index=False)
        neither_words_df.to_csv(self.neither_csv_path, index=False)

        print("All files saved successfully as single CSV files.")

if __name__ == "__main__":
    # Input produced by the scraping step, plus the two output files.
    csv_file_path = "./raw_data/word_counts.csv"
    output_csv_path = "./raw_data/valid_word_counts.csv"
    neither_csv_path = "./raw_data/neither_words.csv"

    validator = WordValidator(
        csv_file_path,
        output_csv_path,
        neither_csv_path,
        reduced_words_count=200,
    )

    async def main():
        """Run the validation for the configured files."""
        await validator.validate_and_save()

    asyncio.run(main())

Validating 150 words...



In [6]:
class DefinitionExampleEntry:
    """A dictionary definition together with its usage examples."""

    def __init__(self, definition: str, examples: list[str]):
        """Store the parsed parts of a dictionary entry.

        Args:
            definition (str): The definition text.
            examples (list[str]): The example strings; may be empty.
        """
        self.examples = examples
        self.definition = definition

    def __str__(self):
        """Render the definition and its examples, one example per line.

        The literal text ``None`` is shown when the joined examples are empty.
        """
        rendered_examples = "\n".join(self.examples) or "None"
        return f"Definition: {self.definition}\nExamples:\n{rendered_examples}"

    def __repr__(self):
        """Use the same text as ``__str__``."""
        return str(self)

def extract_definition_and_examples(text: str) -> DefinitionExampleEntry:
    """Split a dictionary entry into its definition and numbered examples.

    Everything before the first colon is the definition. The remainder,
    if any, is split on semicolons into examples; blank pieces are dropped
    but still consume a number, so numbering follows the position of each
    piece in the split.

    Args:
        text (str): Raw entry text, ``"<definition>: <ex>; <ex>; ..."``.

    Returns:
        DefinitionExampleEntry: The stripped definition and a list of
        ``"Example N: ..."`` strings (empty when ``text`` has no colon).
    """
    head, colon, tail = text.partition(":")

    examples = []
    if colon:
        # Number every piece first, then skip the ones that are blank.
        for number, piece in enumerate(tail.split(";"), start=1):
            stripped = piece.strip()
            if stripped:
                examples.append(f"Example {number}: {stripped}")

    return DefinitionExampleEntry(head.strip(), examples)

# Sample dictionary entries; the trailing comment on each is the note that
# accompanied the sample, apparently the headword the entry belongs to.
test_texts = [
    "organ berwarna perang kemerah-merahan di dlm tubuh yg mengeluarkan hempedu dan membersihkan darah: ~ kambing; ~ lembu.",  # hati
    "guru: dia ingin menjadi ~;",  # cikgu
    "enak (pd rasa, bau, dll): kuih itu sangat ~ rasanya;",  # sedap
    "balai atau ruang tempat diadakan majlis-majlis forum (tari-menari, perbahasan dll): Forum itu telah diadakan di ~ Canselor, Universiti Malaya.",  # dewan
    " air yg turun (menitik dgn banyaknya) yg terpeluwap drpd wap di atmosfera: pagi ini ~ turun dgn lebatnya; musim ~ = ketika ~ musim yg hujan selalu turun;",  # hujan
    "menyampaikan ilmu pengetahuan (kemahiran dsb) kpd orang lain, mendidik, melatih, memberikan petunjuk-petun­juk kpd: ~ orang menggunakan senjata;",  # ajar
    "tidak sempit, lapang (pakaian, lubang, dll): seluar baru itu ~ sedikit;",  # langgar (sic; the gloss reads like "longgar" — TODO confirm)
    "; ~ kurus sj tumbuhan (pokok), berasberas, kasir, kedondong matahari, kempas roman, pauh kijang, Trigonochlamys griffithii; rumput ~ sj tumbuhan (rumpai), getang-getang, ketumbi padang, tutup bumi paya, Synedrella nodiflora."  # babi2
]

# Parse every sample and print it followed by one blank line.
for text in test_texts:
    entry = extract_definition_and_examples(text)
    print(entry, end="\n\n")

Definition: organ berwarna perang kemerah-merahan di dlm tubuh yg mengeluarkan hempedu dan membersihkan darah
Examples:
Example 1: ~ kambing
Example 2: ~ lembu.

Definition: guru
Examples:
Example 1: dia ingin menjadi ~

Definition: enak (pd rasa, bau, dll)
Examples:
Example 1: kuih itu sangat ~ rasanya

Definition: balai atau ruang tempat diadakan majlis-majlis forum (tari-menari, perbahasan dll)
Examples:
Example 1: Forum itu telah diadakan di ~ Canselor, Universiti Malaya.

Definition: air yg turun (menitik dgn banyaknya) yg terpeluwap drpd wap di atmosfera
Examples:
Example 1: pagi ini ~ turun dgn lebatnya
Example 2: musim ~ = ketika ~ musim yg hujan selalu turun

Definition: menyampaikan ilmu pengetahuan (kemahiran dsb) kpd orang lain, mendidik, melatih, memberikan petunjuk-petun­juk kpd
Examples:
Example 1: ~ orang menggunakan senjata

Definition: tidak sempit, lapang (pakaian, lubang, dll)
Examples:
Example 1: seluar baru itu ~ sedikit

Definition: ; ~ kurus sj tumbuhan (pokok),