The first part covers data preparation and Spark setup.

# **Prepare & Set up**

## **Data collection**

---



In [1]:
# Create a folder only when it does not exist.
%mkdir -p wikipedia-ml-raw

In [2]:
%%file main.sh
#!/bin/bash
# Download one revision of the Machine learning article for each ID in the input file.

# Give the path of the file as first argument to the script:
input="$1"

while IFS= read -r var
do
  # Get each ID (echo trims surrounding whitespace)
  id=$(echo $var)
  link="https://en.wikipedia.org/w/index.php?title=Machine_learning&oldid=$id"
  downloadname="index.php?title=Machine_learning&oldid=$id.html"
  filename="wikipedia-ml-raw/machine-learning-$id.html"
  echo "$filename"
  # Download the file from Wikipedia (-E appends the .html extension)
  wget -E "$link"
  # Move the downloaded file to the target directory and rename it
  mv "$downloadname" "$filename"
done <"$input"

Writing main.sh


In [4]:
# Run the script
!bash main.sh article-ids.txt

wikipedia-ml-raw/machine-learning-530966344.html
--2023-04-30 10:09:14--  https://en.wikipedia.org/w/index.php?title=Machine_learning&oldid=530966344
Resolving en.wikipedia.org (en.wikipedia.org)... 208.80.153.224, 2620:0:860:ed1a::1
Connecting to en.wikipedia.org (en.wikipedia.org)|208.80.153.224|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘index.php?title=Machine_learning&oldid=530966344.html’

index.php?title=Mac     [ <=>                ] 127.78K  --.-KB/s    in 0.1s    

2023-04-30 10:09:14 (1.15 MB/s) - ‘index.php?title=Machine_learning&oldid=530966344.html’ saved [130843]

wikipedia-ml-raw/machine-learning-561799120.html
--2023-04-30 10:09:14--  https://en.wikipedia.org/w/index.php?title=Machine_learning&oldid=561799120
Resolving en.wikipedia.org (en.wikipedia.org)... 208.80.153.224, 2620:0:860:ed1a::1
Connecting to en.wikipedia.org (en.wikipedia.org)|208.80.153.224|:443... connected.
HTTP request sent, awaiting re

In [5]:
# Run the parse_article script
!bash parse_article.sh

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## **Set up Spark & PySpark**

In [6]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!rm spark-3.3.1-bin-hadoop3.tgz   # Tidying up
# Set up environment variables:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

--2023-04-30 10:10:01--  https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299350810 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.1-bin-hadoop3.tgz’


2023-04-30 10:10:54 (5.42 MB/s) - ‘spark-3.3.1-bin-hadoop3.tgz’ saved [299350810/299350810]



In [7]:
# Install the findspark library to locate Spark
!pip install -q findspark
import findspark
findspark.init()

In [8]:
# Import SparkSession from pyspark.sql to create the entry point to Spark.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Render DataFrames as formatted tables even when show() is not called
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [9]:
# Create SparkContext:
sc = spark.sparkContext

# **Data Processing with Spark**

## **Co-occurrence MapReduce with Spark**

In [10]:
# Read stopwords into a set for fast membership tests
stopwords = set(sc.textFile("stopwords.txt").flatMap(lambda line: line.split()).collect())

In [11]:
# Define a predicate that is True for article files (used to filter out refs)
def is_article(title):
  return "article" in title

In [12]:
# Read all files in the folder "wikipedia-ml"
all_file_rdd = sc.wholeTextFiles("wikipedia-ml")

In [13]:
# Remove refs
all_article_rdd = all_file_rdd.filter(lambda x: is_article(x[0]))

In [14]:
# Get all articles' paths
all_article_paths_rdd = all_article_rdd.map(lambda x: x[0])

In [15]:
print(all_article_paths_rdd.collect())

['file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2016_1_1.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2014_6_4.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2017_6_29.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2017_1_1.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2019_1_2.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2014_1_4.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2023_4_29.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2013_6_27.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2013_1_2.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2016_6_30.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2021_6_19.txt', 'file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2022_6_28.txt', 'file:/content/wikipedia-ml/artic

In [16]:
# Use a dictionary to store RDDs of every year
year_rdd_dict = {}

In [17]:
# Define a method to get the years of the articles
def get_year(title):
  date_info = title.split("-")[2]
  year = date_info.split("_")[1]
  return year
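
The split-based lookup above relies on the path containing exactly two hyphens before the date. As a hedged alternative, the year can be read from the `_<year>_<month>_<day>.txt` suffix seen in the collected paths. The helper name `get_year_from_path` and the regular expression are assumptions for illustration and are not part of the original notebook.

```python
import re

# Hypothetical helper (not in the original notebook): read the year from
# paths that end in "_<year>_<month>_<day>.txt".
_DATE_SUFFIX = re.compile(r"_(\d{4})_\d{1,2}_\d{1,2}\.txt$")

def get_year_from_path(path):
    match = _DATE_SUFFIX.search(path)
    if match is None:
        raise ValueError(f"no date suffix in path: {path!r}")
    return match.group(1)

example_path = "file:/content/wikipedia-ml/article_Machine learning - Wikipedia_2016_1_1.txt"
print(get_year_from_path(example_path))
```

Because the pattern is anchored at the end of the path, extra hyphens in a directory name would not change the result.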

In [18]:
# Extract every year for which articles were collected
all_year = all_article_rdd.map(lambda x: get_year(x[0])).distinct()

In [19]:
# Define a function that lowercases a string and removes punctuation
def lower_clean_str(x):
  punc='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~'
  lowercased_str = x.lower()
  for ch in punc:
    lowercased_str = lowercased_str.replace(ch, '')
  return lowercased_str

# Define a function for filtering stopwords
def not_stopwords(word):
  return word not in stopwords

In [20]:
import itertools
for article_path in all_article_paths_rdd.collect():
  # Read one article as an RDD:
  text_rdd = sc.textFile(article_path)
  # Compute the co-occurrence RDD: every pair of words on the same line
  # (in order of appearance), excluding stopwords, identical words,
  # and single-character tokens
  co_occur_rdd = text_rdd.map(lambda line: lower_clean_str(line).split()) \
                  .flatMap(lambda words: itertools.combinations(words, 2)) \
                  .filter(lambda comb: not_stopwords(comb[0]) and not_stopwords(comb[1])) \
                  .filter(lambda comb: comb[0] != comb[1]) \
                  .filter(lambda comb: len(comb[0]) > 1 and len(comb[1]) > 1) \
                  .map(lambda comb: (comb, 1))
                  
  # Store RDDs in the dictionary {year: union of that year's RDDs}
  article_year = get_year(article_path)
  if article_year not in year_rdd_dict:
    year_rdd_dict[article_year] = co_occur_rdd
  else:
    year_rdd_dict[article_year] = year_rdd_dict[article_year].union(co_occur_rdd)
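
To see what the pipeline above counts, here is a minimal pure-Python sketch of the same steps on two toy lines. The stopword set, the sample sentences, and the names `toy_stopwords`, `toy_lines`, `clean`, and `pair_counts` are invented for illustration; a `Counter` stands in for the `(pair, 1)` tuples that are later summed with `reduceByKey`.

```python
import itertools
from collections import Counter

# Toy inputs, invented for illustration (not taken from the Wikipedia data).
toy_stopwords = {"is", "a", "of"}
toy_lines = ["Machine learning is a field of study.", "Machine learning uses data."]

def clean(line):
    # Same idea as lower_clean_str: lowercase, then drop punctuation.
    punc = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~'
    return "".join(ch for ch in line.lower() if ch not in punc)

pair_counts = Counter()
for line in toy_lines:
    words = clean(line).split()
    # combinations() keeps input order, so each pair is (earlier word, later word).
    for first, second in itertools.combinations(words, 2):
        if first in toy_stopwords or second in toy_stopwords:
            continue
        if first == second or len(first) < 2 or len(second) < 2:
            continue
        pair_counts[(first, second)] += 1

print(pair_counts.most_common(3))
```

Pairs are formed per line and keep the order in which the words appear, which is why `(machine, learning)` and `(learning, machine)` are counted as separate keys.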


In [21]:
# Create a DataFrame for each year and show the top ten co-occurrences
for year in year_rdd_dict:
  print(year)
  rdd = year_rdd_dict[year].reduceByKey(lambda x, y: x + y)
  # Create dataframe
  columns = ["word_0", "word_1", "count"]
  df = rdd.map(lambda l: (l[0][0],l[0][1],l[1])).toDF(columns)
  df.orderBy("count",ascending=False).show(10)

2016
+----------+----------+-----+
|    word_0|    word_1|count|
+----------+----------+-----+
|   machine|  learning|  129|
|  learning|      data|   83|
|  learning|algorithms|   72|
|   machine|      data|   50|
|  learning|   machine|   45|
|  learning|  training|   44|
|  learning|       can|   41|
|algorithms|  learning|   38|
|  learning| knowledge|   36|
|  learning|      time|   36|
+----------+----------+-----+
only showing top 10 rows

2014
+----------+--------------+-----+
|    word_0|        word_1|count|
+----------+--------------+-----+
|  learning|          data|   68|
|   machine|      learning|   67|
|  learning|    algorithms|   55|
|  learning|representation|   40|
|  learning|           can|   39|
|algorithms|representation|   38|
|  learning|     knowledge|   36|
|algorithms|      learning|   35|
|   machine|          data|   34|
|algorithms|      features|   33|
+----------+--------------+-----+
only showing top 10 rows

2017
+--------+----------+-----+
|  word_0

## **n-gram MapReduce with Spark**

In [22]:
# n-gram function: return every run of n consecutive words as a tuple
def generate_ngram(word_list, n):
  ngram_list = []
  for i in range(0, len(word_list)-n+1):
    n_gram = tuple(word_list[i:i+n])
    ngram_list.append(n_gram)
  return ngram_list
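
As a quick check of the sliding-window idea, the sketch below builds n-grams for an arbitrary `n` by slicing the token list. The name `sliding_ngrams` and the sample sentence are assumptions for illustration and do not come from the notebook.

```python
def sliding_ngrams(words, n):
    # One tuple per window of n consecutive tokens; empty if the list is shorter than n.
    return [tuple(words[i:i + n]) for i in range(len(words) - n + 1)]

tokens = "machine learning is a field".split()
print(sliding_ngrams(tokens, 3))
print(sliding_ngrams(tokens, 2))
```

A list of `k` tokens yields `k - n + 1` windows, so lines shorter than `n` words contribute nothing to the counts.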

In [23]:
year_ngram_rdd_dict = {}

In [24]:
for article_path in all_article_paths_rdd.collect():
  # Read one article as an RDD:
  text_rdd = sc.textFile(article_path)
  # compute the 3-gram RDD
  ngram_rdd = text_rdd.map(lambda line: lower_clean_str(line).split()) \
                       .flatMap(lambda word_list: generate_ngram(word_list, 3)) \
                       .map(lambda ngram: (ngram, 1))
                  
  # Store RDDs in the dictionary {year: union of that year's RDDs}
  article_year = get_year(article_path)
  if article_year not in year_ngram_rdd_dict:
    year_ngram_rdd_dict[article_year] = ngram_rdd
  else:
    year_ngram_rdd_dict[article_year] = year_ngram_rdd_dict[article_year].union(ngram_rdd)

In [25]:
for year in year_ngram_rdd_dict:
  print(year)
  rdd = year_ngram_rdd_dict[year].reduceByKey(lambda x, y: x + y)
  # Create dataframe
  columns = ["word_0", "word_1", "word_2", "count"]
  df = rdd.map(lambda l: (l[0][0],l[0][1],l[0][2],l[1])).toDF(columns)
  df.orderBy("count",ascending=False).show(10)

2016
+-------------+----------+----------+-----+
|       word_0|    word_1|    word_2|count|
+-------------+----------+----------+-----+
|           of|   machine|  learning|   13|
|      machine|  learning|        is|   11|
|            a|       set|        of|   10|
|       sparse|dictionary|  learning|    8|
|      machine|  learning|algorithms|    6|
|           in|polynomial|      time|    6|
|         been|   applied|        in|    6|
|         with|   respect|        to|    6|
|   algorithms|   attempt|        to|    6|
|computational|  learning|    theory|    6|
+-------------+----------+----------+-----+
only showing top 10 rows

2014
+-------------+----------+----------+-----+
|       word_0|    word_1|    word_2|count|
+-------------+----------+----------+-----+
|           of|   machine|  learning|   12|
|            a|       set|        of|   10|
|      machine|  learning|algorithms|    7|
|       sparse|dictionary|  learning|    7|
|           in|polynomial|      time|   