# getting data ready and seetting up the environmnent


In [1]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import xml.etree.ElementTree as ET

# Initialize Spark session with more resources
spark = SparkSession.builder \
    .appName("WikipediaData") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

# Function to parse each line of the pseudo-XML format
def parse_wiki_line(line):
    try:
        root = ET.fromstring(line.strip())
        doc_id = root.attrib['id']
        url = root.attrib['url']
        title = root.attrib['title']
        content = root.text
        return (doc_id, url, title, content)
    except ET.ParseError:
        return None

# Define the schema
from pyspark.sql.types import StringType, StructType, StructField

schema = StructType([
    StructField("doc_id", StringType(), True),
    StructField("url", StringType(), True),
    StructField("title", StringType(), True),
    StructField("content", StringType(), True)
])

# Load the dataset into an RDD (limiting to the first 100 lines for testing)
file_path = 'WikipediaPagesOneDocPerLine1000LinesSmall.txt'
lines = spark.sparkContext.textFile(file_path).take(100)
lines_rdd = spark.sparkContext.parallelize(lines)

# Parse the lines
parsed_lines = lines_rdd.map(parse_wiki_line).filter(lambda x: x is not None)

# Convert to DataFrame
wiki_df = spark.createDataFrame(parsed_lines, schema=schema)

# Show the DataFrame
wiki_df.show(truncate=False)


Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
24/05/23 08:18:22 WARN Utils: Your hostname, dragon resolves to a loopback address: 127.0.1.1; using 192.168.100.3 instead (on interface eth0)
24/05/23 08:18:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 08:18:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+------+------------------------------------------+---------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [3]:
spark

In [4]:
wiki_df.columns

['doc_id', 'url', 'title', 'content']

# Task 1 - Generate the Top 20K dictionary and Create the TF-IDF Array (4 Points)

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, desc
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
import numpy as np

# Initialize Spark session (if not already initialized)
spark = SparkSession.builder \
    .appName("WikipediaTFIDF") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Assuming 'wiki_df' is your DataFrame with columns 'doc_id' and 'content'

# Tokenize words
tokenizer = Tokenizer(inputCol="content", outputCol="words")
words_df = tokenizer.transform(wiki_df)

# Explode words to get individual tokens
exploded_words_df = words_df.select("doc_id", explode("words").alias("word"))

# Count the occurrences of each word
word_counts = exploded_words_df.groupBy("word").count().orderBy(desc("count"))

# Show top words and their counts
word_counts.show(20, truncate=False)

# Remove stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(words_df)

# CountVectorizer to get term frequencies
cv = CountVectorizer(inputCol="filtered_words", outputCol="tf_features", vocabSize=20000)
cv_model = cv.fit(filtered_df)
tf_df = cv_model.transform(filtered_df)

# Get vocabulary and sort by descending frequency
vocab = cv_model.vocabulary

# Function to map word to index in vocabulary
word_to_index = {word: idx for idx, word in enumerate(vocab)}

# Function to create TF vector for each document
def create_tf_vector(row):
    doc_id = row['doc_id']
    tf_features = row['tf_features']
    tf_vector = np.zeros(len(vocab))
    for idx, freq in zip(tf_features.indices, tf_features.values):
        word = vocab[idx]
        if word in word_to_index:
            tf_vector[word_to_index[word]] = freq
    return doc_id, tf_vector.tolist()

# Create RDD with document ID and TF vector
tf_rdd = tf_df.select("doc_id", "tf_features").rdd.map(create_tf_vector)

# Print first few elements for verification
print(tf_rdd.take(3))



24/05/23 08:18:38 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/05/23 08:18:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+----+-----+
|word|count|
+----+-----+
|the |5182 |
|of  |2345 |
|and |1995 |
|in  |1930 |
|to  |1569 |
|a   |1458 |
|was |936  |
|is  |619  |
|as  |599  |
|for |591  |
|by  |530  |
|on  |493  |
|that|454  |
|with|447  |
|he  |371  |
|at  |368  |
|his |349  |
|from|341  |
|it  |339  |
|be  |254  |
+----+-----+
only showing top 20 rows



[Stage 10:>                                                         (0 + 1) / 1]

[('431949', [10.0, 4.0, 11.0, 1.0, 3.0, 0.0, 10.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 54.0, 4.0, 1.0, 0.0, 0.0, 5.0, 0.0, 4.0, 6.0, 1.0, 0.0, 45.0, 2.0, 0.0, 2.0, 1.0, 1.0, 0.0, 28.0, 0.0, 3.0, 0.0, 0.0, 5.0, 1.0, 5.0, 0.0, 0.0, 2.0, 16.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 3.0, 5.0, 0.0, 1.0, 1.0, 0.0, 0.0, 7.0, 0.0, 0.0, 0.0, 1.0, 2.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 2.0, 1.0, 1.0, 0.0, 1.0, 0.0, 3.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 13.0, 2.0, 30.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 27.0, 2.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 8.0, 0.0, 1.0, 2.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 1.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 2.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 5.0, 2.0, 4.0, 1.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0, 0.0, 3.0, 5.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 12.0, 0.0, 1.0, 0.0, 0.0, 0.0, 2.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1

                                                                                

# Task 2 - Implement the getPrediction function (8 Points)


In [6]:
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.sql.functions import col
from scipy.spatial import distance
from collections import Counter

# Initialize Spark session (if not already initialized)
spark = SparkSession.builder \
    .appName("WikipediaTFIDF") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Assuming 'wiki_df' is your DataFrame with columns 'doc_id', 'content', and 'category'
# Tokenize words
tokenizer = Tokenizer(inputCol="content", outputCol="words")
words_df = tokenizer.transform(wiki_df)

# Remove stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(words_df)

# CountVectorizer to get term frequencies
cv = CountVectorizer(inputCol="filtered_words", outputCol="tf_features", vocabSize=20000)
cv_model = cv.fit(filtered_df)
tf_df = cv_model.transform(filtered_df)

# Compute IDF
idf = IDF(inputCol="tf_features", outputCol="tfidf_features")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)

# Function to transform text input into TF-IDF vector
def transform_text(text):
    # Tokenize and remove stopwords
    words = text.split()
    filtered_words = [w.lower() for w in words if w.lower() not in remover.getStopWords()]

    # Create a temporary DataFrame
    temp_df = spark.createDataFrame([(filtered_words,)], ["filtered_words"])

    # Convert to TF-IDF vector
    tf_vector = cv_model.transform(temp_df)
    tfidf_vector = idf_model.transform(tf_vector)
    return tfidf_vector.select("tfidf_features").first().tfidf_features.toArray()

# Function to calculate cosine similarity
def cosine_similarity(vec1, vec2):
    return 1 - distance.cosine(vec1, vec2)

# Function to get prediction for text input
def getPrediction(textInput, k):
    # Transform text input into TF-IDF vector
    input_vector = transform_text(textInput)

    # Calculate similarity with all documents
    def calculate_similarity(row):
        doc_id = row['doc_id']
        tfidf_vector = row['tfidf_features'].toArray()
        sim_score = cosine_similarity(input_vector, tfidf_vector)
        return Row(doc_id=doc_id, similarity=sim_score)

    # Apply calculate_similarity function to tfidf_df's RDD representation
    similarity_rdd = tfidf_df.select("doc_id", "tfidf_features").rdd.map(calculate_similarity)

    # Get top k nearest neighbors
    top_k_neighbors = similarity_rdd.takeOrdered(k, key=lambda x: -x['similarity'])

    # Gather categories of top k neighbors
    neighbor_categories = [wiki_df.filter(col('doc_id') == neighbor['doc_id']).select('category').first().category for neighbor in top_k_neighbors]

    # Predict category based on majority vote
    category_counts = Counter(neighbor_categories)
    predicted_category = category_counts.most_common(1)[0][0]

    return predicted_category

# Example usage
print(getPrediction('Sport Basketball Volleyball Soccer', 10))
print(getPrediction('What is the capital city of Australia?', 10))
print(getPrediction('How many goals Vancouver score last year?', 10))


                                                                                

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `category` cannot be resolved. Did you mean one of the following? [`content`, `title`, `doc_id`, `url`].;
'Project ['category]
+- Filter (doc_id#0 = 432240)
   +- LogicalRDD [doc_id#0, url#1, title#2, content#3], false


# Task 3 - Using Dataframes (6 points)

## Task 3.1 (3 points)
Use Spark Dataframe to provide summary statistics (max, average, median, std) about the
number of Wikipedia categories used for Wikipedia pages. Print the results on the output console,
or store them on the cloud storage.


In [7]:
import pandas as pd
categories_data = pd.read_csv('wiki-categorylinks-small.csv')  # Adjust the path as per your setup
categories_data.columns
# Assuming there's a common identifier (e.g., doc_id) to merge datasets

Index(['434042', '1987_debut_albums'], dtype='object')

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, expr
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("WikipediaCategoriesSummary") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Define schema for the CSV file
schema = StructType([
    StructField("cl_from", IntegerType(), True),
    StructField("cl_to", StringType(), True)  # Adjust as per actual columns in your CSV
    # Add more StructField entries as needed for other columns
])

# Load the dataset with schema and without headers
file_path = 'wiki-categorylinks-small.csv'
df = spark.read.csv(file_path, schema=schema)

# Ensure 'cl_from' column is treated as IntegerType
df = df.withColumn("cl_from", col("cl_from").cast(IntegerType()))

# Group by 'cl_from' and count distinct categories per page
category_counts = df.groupBy("cl_from").agg(count("*").alias("category_count"))

# Calculate summary statistics
summary_stats = category_counts.select(
    expr("max(category_count)").alias("max_categories"),
    expr("avg(category_count)").alias("avg_categories"),
    expr("percentile_approx(category_count, 0.5)").alias("median_categories"),
    expr("stddev_pop(category_count)").alias("std_categories")
).collect()

# Display results
for row in summary_stats:
    max_categories = row["max_categories"]
    avg_categories = row["avg_categories"]
    median_categories = row["median_categories"]
    std_categories = row["std_categories"]

    print(f"Max categories: {max_categories}")
    print(f"Avg categories: {avg_categories}")
    print(f"Median categories: {median_categories}")
    print(f"Std categories: {std_categories}")




24/05/23 08:19:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 21:>                                                         (0 + 1) / 1]

Max categories: 76
Avg categories: 13.793793793793794
Median categories: 10
Std categories: 11.056480782274724


                                                                                

## task 3.2
Use Spark Dataframe to find the top 10 most used Wikipedia categories. Print the results on the
output console, or store them on the cloud storage

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Top10WikipediaCategories") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load the dataset (assuming 'wiki-categorylinks-small.csv' without headers)
file_path = 'wiki-categorylinks-small.csv'

# Define schema for the CSV file
schema = "cl_from INT, cl_to STRING"

# Read CSV into DataFrame
df = spark.read.csv(file_path, schema=schema)

# Group by 'cl_to' and count occurrences
category_counts = df.groupBy("cl_to").agg(count("*").alias("category_count"))

# Find top 10 most used categories
top_10_categories = category_counts.orderBy(col("category_count").desc()).limit(10)

# Show results (print to console)
top_10_categories.show(truncate=False)




24/05/23 08:19:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------------------------------------------------+--------------+
|cl_to                                            |category_count|
+-------------------------------------------------+--------------+
|Articles_with_short_description                  |273           |
|All_articles_with_unsourced_statements           |222           |
|Wikipedia_articles_with_VIAF_identifiers         |212           |
|Wikipedia_articles_with_WorldCat-VIAF_identifiers|212           |
|Wikipedia_articles_with_LCCN_identifiers         |196           |
|Commons_category_link_is_on_Wikidata             |178           |
|Wikipedia_articles_with_ISNI_identifiers         |157           |
|Wikipedia_articles_with_GND_identifiers          |155           |
|All_articles_needing_additional_references       |154           |
|Webarchive_template_wayback_links                |140           |
+-------------------------------------------------+--------------+



# Task 4 - Removing Stop Words, do Stemming, and redo task 2 (2 points)

## Task 4.1 - Remove Stop Words (1 point)
Describe if removing the English Stop words (most common words like ”a”, “the”, “is”, “are”,
“i”, “you”, ...”) would change the final kNN results here.
Would your result be changed heavily after removing the stop words? Why? Provide reasons.
You do not need to implement this task, only discuss your expected outcome results.

Removing English stop words like "a", "the", "is", "are", "I", "you", etc., can potentially impact the final kNN (k-Nearest Neighbors) results in text classification tasks. Here’s how it could influence the outcomes:

1. **Impact on Text Representation**: Stop words are very common and appear frequently in almost all documents. When performing kNN classification using TF-IDF (Term Frequency-Inverse Document Frequency) or other text representation methods, these stop words contribute significantly to the term frequencies. Removing them can change the vector representations of documents. Without stop words, the remaining content words may have more weight in determining document similarity.

2. **Effect on Similarity Measures**: TF-IDF calculates similarity based on the presence and frequency of terms. If stop words are included, they can dominate the similarity calculation without contributing much to the semantic meaning of the text. Removing stop words can improve the relevance of the top k nearest neighbors by focusing on content-bearing words that better represent the document's theme or topic.

3. **Classification Accuracy**: The removal of stop words can lead to more accurate classifications in some cases. This is because irrelevant words (stop words) that do not contribute to the document's semantics are eliminated. Consequently, the algorithm may find more relevant documents based on the actual content words that remain after stop word removal.

4. **Data Sparsity and Noise Reduction**: In large datasets, including stop words can contribute to noise and increase data sparsity. Removing them can mitigate this issue by focusing on meaningful terms, potentially improving the overall robustness and efficiency of the kNN algorithm.

**Expected Outcome**:
- The result of kNN classification is likely to change after removing stop words, albeit the extent of change can vary. In scenarios where documents rely heavily on content words for classification (e.g., distinguishing between topics or themes), removing stop words could lead to more accurate and relevant nearest neighbors being identified.
  


## Task 4.2 - Do English word stemming (1 point)
We can stem the words [”game”,”gaming”,”gamed”,”games”] to their root word ”game”. Read
more about stemming here https://en.wikipedia.org/wiki/Stemming.
Would stemming change your result heavily? Why? Provide reasons and describe them.
You do not need to implement this task, only discuss your expected outcome results.

In summary, while stemming can enhance the effectiveness of kNN by simplifying and normalizing text representations, its impact on the final results depends on the context and objectives of the text classification task.