In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Connecting to security.ubuntu.com (91.189.91.83)] [Connecting to cloud.r-project.org] [Connectin                                                                                                    Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:3 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:6 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:8 http://archive.ubuntu.com/ubuntu jammy-updates/restricted amd64 Packages [3,614 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,517 kB]
Get:10 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 Packag

In [None]:
import os
import time
from pyspark.sql import SparkSession

In [None]:
!unzip pagecounts-20160101-000000_parsed.out.zip

Archive:  pagecounts-20160101-000000_parsed.out.zip
  inflating: pagecounts-20160101-000000_parsed.out  


In [None]:
# Mount Google Drive at /content/drive so files stored there can be read
# through the regular filesystem.

from google.colab import drive
drive.mount("/content/drive")


Mounted at /content/drive


In [None]:
# Location of the extracted page-count dump on the mounted Google Drive.
PAGECOUNTS_PATH = "/content/drive/MyDrive/pagecounts/pagecounts-20160101-000000_parsed.out"


def to_record(fields):
    """Convert the split fields of one line into a typed record.

    Args:
        fields: list of whitespace-separated tokens from one input line,
            e.g. ["ace", "Asia", "1", "25859"].

    Returns:
        (project_code, page_title, page_hits, page_size) with the last two
        converted to int, or None when the line does not have exactly four
        fields or the numeric fields are not valid integers. Returning None
        lets a malformed line be skipped instead of failing the whole job.
    """
    if len(fields) != 4:
        return None
    try:
        return (fields[0], fields[1], int(fields[2]), int(fields[3]))
    except ValueError:
        return None


# Create (or reuse) the Spark session
spark = SparkSession.builder \
    .appName("WikimediaPageViewAnalysis") \
    .getOrCreate()

# Read the dump as lines of text, dropping blank lines
rdd = spark.sparkContext \
    .textFile(PAGECOUNTS_PATH) \
    .filter(lambda line: line.strip() != "")

# Each line => ["ace", "Asia", "1", "25859"] etc.
parsed_rdd = rdd.map(lambda line: line.split())

# RDD of tuples (project_code, page_title, page_hits, page_size);
# lines that cannot be parsed are dropped.
data_rdd = parsed_rdd.map(to_record) \
                     .filter(lambda record: record is not None)

In [None]:
# Query 1 (MapReduce): minimum, maximum and average page size.
#
# All statistics are computed in ONE pass with RDD.aggregate. The previous
# version ran three reduce() calls plus a count(), i.e. four full scans of the
# uncached RDD, and reduce() raises on an empty RDD, which made the
# "count_sizes > 0" guard unreachable.


def _fold_size(acc, size):
    """Fold one page size into a (min, max, total, count) accumulator."""
    lowest, highest, total, count = acc
    if count == 0:
        # First value seen in this partition: it is both min and max.
        return (size, size, size, 1)
    return (min(lowest, size), max(highest, size), total + size, count + 1)


def _merge_stats(left, right):
    """Merge two (min, max, total, count) accumulators from different partitions."""
    if left[3] == 0:
        return right
    if right[3] == 0:
        return left
    return (
        min(left[0], right[0]),
        max(left[1], right[1]),
        left[2] + right[2],
        left[3] + right[3],
    )


# -- Start timing --
start_time_mr_q1 = time.time()

page_sizes = data_rdd.map(lambda row: row[3])  # row[3] is page_size

# An empty RDD yields the zero value, so min/max/avg all fall back to 0.
min_size, max_size, sum_sizes, count_sizes = page_sizes.aggregate(
    (0, 0, 0, 0), _fold_size, _merge_stats
)
avg_size = sum_sizes / count_sizes if count_sizes > 0 else 0

end_time_mr_q1 = time.time()

print("=== Query 1 (MapReduce) ===")
print(f"Min Page Size: {min_size}")
print(f"Max Page Size: {max_size}")
print(f"Average Page Size: {avg_size}")
print(f"Time taken: {end_time_mr_q1 - start_time_mr_q1:.4f} seconds\n")

=== Query 1 (MapReduce) ===
Min Page Size: 0
Max Page Size: 141180155987
Average Page Size: 132215.79814237313
Time taken: 66.5936 seconds



In [None]:
# Query 2 (MapReduce): how many page titles start with "The"
# (case-insensitive), and how many of those belong to a project whose code
# does not start with "en".
start_time_mr_q2 = time.time()

# Records whose title (row[1]) begins with "the" once lower-cased.
titles_start_the = data_rdd.filter(
    lambda rec: rec[1].lower().startswith("the")
)
total_the = titles_start_the.count()

# Of those, keep the records whose project code (row[0]) is not an "en*" code.
non_en_titles = titles_start_the.filter(lambda rec: rec[0][:2] != "en")
non_en_the = non_en_titles.count()

end_time_mr_q2 = time.time()

print(
    "=== Query 2 (MapReduce) ===",
    f"Number of page titles starting with 'The': {total_the}",
    f"Number of those not in English project: {non_en_the}",
    f"Time taken: {end_time_mr_q2 - start_time_mr_q2:.4f} seconds\n",
    sep="\n",
)

=== Query 2 (MapReduce) ===
Number of page titles starting with 'The': 45210
Number of those not in English project: 9239
Time taken: 35.0459 seconds



In [None]:
# Query 3 (MapReduce): number of distinct terms in page titles, where a term
# is a lower-cased piece of the title split on "_".
start_time_mr_q3 = time.time()

# One output element per term of every title.
title_terms = data_rdd.flatMap(lambda rec: rec[1].lower().split("_"))
unique_terms = title_terms.distinct().count()

end_time_mr_q3 = time.time()

print(
    "=== Query 3 (MapReduce) ===",
    f"Number of unique terms in page titles: {unique_terms}",
    f"Time taken: {end_time_mr_q3 - start_time_mr_q3:.4f} seconds\n",
    sep="\n",
)

=== Query 3 (MapReduce) ===
Number of unique terms in page titles: 1793107
Time taken: 41.4229 seconds



In [None]:
# Query 4 (MapReduce): how many records share each page title.
start_time_mr_q4 = time.time()

# Emit (title, 1) per record, then sum the ones per title.
title_ones = data_rdd.map(lambda rec: (rec[1], 1))
title_counts = title_ones.reduceByKey(lambda left, right: left + right)

# Bring every (title, count) pair back to the driver.
title_counts_result = title_counts.collect()

end_time_mr_q4 = time.time()

# Ten most frequent titles; the stable sort keeps ties in collected order.
# This ranking happens after the timer has stopped.
top_titles = sorted(title_counts_result, key=lambda pair: -pair[1])[:10]

print("=== Query 4 (MapReduce) ===")
print("Sample of title frequencies:")
for t, c in title_counts_result[:5]:  # only the first five pairs
    print(f"{t}: {c}")
print(f"Time taken: {end_time_mr_q4 - start_time_mr_q4:.4f} seconds\n")
print("=== Top 10 ===")
for t, c in top_titles:
    print(f"{t}: {c}")

=== Query 4 (MapReduce) ===
Sample of title frequencies:
E.Desv: 6
Special:Contributions/5.232.61.79: 1
Special:ListFiles/Nyttend: 1
Special:WhatLinksHere/Main_Page: 8
Time_Inc: 4
Time taken: 42.1990 seconds

=== Top 10 ===
water: 118
1863: 106
Google: 101
Berlin: 101
Linux: 98
Main_Page: 90
ISO_3166-1: 88
Microsoft_Windows: 87
Index.php: 86
HTML: 86


In [None]:
# Query 5 (MapReduce): gather all records that share a page title.
start_time_mr_q5 = time.time()

# Key every record by its title, then group => RDD of (title, iterable_of_records)
keyed_rows = data_rdd.keyBy(lambda rec: rec[1])
title_groups = keyed_rows.groupByKey()

# Materialise each group as a list and pull everything to the driver.
combined_results = title_groups.mapValues(list).collect()

end_time_mr_q5 = time.time()

print("=== Query 5 (MapReduce) ===")
print("Sample combined data for the first few titles:")
for title, records in combined_results[:3]:  # only the first three groups
    print(f"Title: {title}")
    for r in records:
        print(f"    {r}")
print(f"Time taken: {end_time_mr_q5 - start_time_mr_q5:.4f} seconds\n")

=== Query 5 (MapReduce) ===
Sample combined data for the first few titles:
Title: E.Desv
    ('aa', 'E.Desv', 1, 4662)
    ('arc', 'E.Desv', 1, 5210)
    ('ast', 'E.Desv', 1, 4825)
    ('fiu-vro', 'E.Desv', 1, 5237)
    ('fr', 'E.Desv', 1, 7057)
    ('ik', 'E.Desv', 1, 4548)
Title: Special:Contributions/5.232.61.79
    ('aa', 'Special:Contributions/5.232.61.79', 1, 5805)
Title: Special:ListFiles/Nyttend
    ('aa', 'Special:ListFiles/Nyttend', 1, 5032)
Time taken: 81.1058 seconds



In [None]:
# Write the MapReduce results of queries 1-5 to Output.txt.
# Run this cell right after the MapReduce query cells: the "Spark Loops"
# cells further down reuse the same variable names and overwrite them.
# The encoding is explicit because page titles may contain non-ASCII text.
with open("Output.txt", "w", encoding="utf-8") as output_file:

    # Query 1 (MapReduce)
    output_file.write("=== Query 1 (MapReduce) ===\n")
    output_file.write(f"Min Page Size: {min_size}\n")
    output_file.write(f"Max Page Size: {max_size}\n")
    output_file.write(f"Average Page Size: {avg_size}\n")
    output_file.write(f"Time taken: {end_time_mr_q1 - start_time_mr_q1:.4f} seconds\n\n")

    # Query 2 (MapReduce)
    # non_en_the is already an integer (result of RDD.count()), so it is
    # written directly; wrapping it in len() raised TypeError.
    output_file.write("=== Query 2 (MapReduce) ===\n")
    output_file.write(f"Number of page titles starting with 'The': {total_the}\n")
    output_file.write(f"Number of those not in English project: {non_en_the}\n")
    output_file.write(f"Time taken: {end_time_mr_q2 - start_time_mr_q2:.4f} seconds\n\n")

    # Query 3 (MapReduce)
    # unique_terms is likewise the integer returned by distinct().count().
    output_file.write("=== Query 3 (MapReduce) ===\n")
    output_file.write(f"Number of unique terms in page titles: {unique_terms}\n")
    output_file.write(f"Time taken: {end_time_mr_q3 - start_time_mr_q3:.4f} seconds\n\n")

    # Query 4 (MapReduce)
    output_file.write("=== Query 4 (MapReduce) ===\n")
    output_file.write("Sample of title frequencies:\n")
    for t, c in title_counts_result[:5]:
        output_file.write(f"{t}: {c}\n")
    output_file.write(f"Time taken: {end_time_mr_q4 - start_time_mr_q4:.4f} seconds\n")
    output_file.write("=== Top 10 ===\n")
    for t, c in top_titles:
        output_file.write(f"{t}: {c}\n")
    output_file.write("\n")

    # Query 5 (MapReduce)
    output_file.write("=== Query 5 (MapReduce) ===\n")
    output_file.write("Sample combined data for the first few titles:\n")
    for title, records in combined_results[:3]:
        output_file.write(f"Title: {title}\n")
        for r in records:
            output_file.write(f"    {r}\n")
    output_file.write(f"Time taken: {end_time_mr_q5 - start_time_mr_q5:.4f} seconds\n\n")

In [None]:
# Query 1 (Spark Loops): minimum, maximum and average page size, computed on
# the driver with plain Python after collecting the values.
start_time_loops_q1 = time.time()

# Pull every page size (row[3]) into a Python list.
page_sizes_list = data_rdd.map(lambda rec: rec[3]).collect()

if page_sizes_list:
    min_size = min(page_sizes_list)
    max_size = max(page_sizes_list)
    avg_size = sum(page_sizes_list) / len(page_sizes_list)
else:
    # No data: report zero for every statistic.
    min_size = max_size = avg_size = 0

end_time_loops_q1 = time.time()

print(
    "=== Query 1 (Spark Loops) ===",
    f"Min Page Size: {min_size}",
    f"Max Page Size: {max_size}",
    f"Average Page Size: {avg_size}",
    f"Time taken: {end_time_loops_q1 - start_time_loops_q1:.4f} seconds\n",
    sep="\n",
)

=== Query 1 (Spark Loops) ===
Min Page Size: 0
Max Page Size: 141180155987
Average Page Size: 132215.79814237313
Time taken: 15.0592 seconds



In [None]:
# Query 2 (Spark Loops): count titles starting with "The" (case-insensitive)
# and how many of them are outside the "en*" projects, using Python loops on
# the driver.

# -- Start timing --
# The start time must be stored under the name that the elapsed-time
# expressions read (here and in the output-file cell). It was previously
# assigned to `start_time_loops_q2_2`, so `start_time_loops_q2` was either
# undefined or a stale value from an earlier run.
start_time_loops_q2 = time.time()

# Collect all rows into a Python list
all_rows = data_rdd.collect()

# Filter titles that start with "The" (case-insensitive)
titles_start_the = [row for row in all_rows if row[1].lower().startswith("the")]

# Total number of such titles
total_the = len(titles_start_the)

# Filter further for titles not in English project (not starting with "en").
# This stays a list: the report lines below and the output-file cell call len() on it.
non_en_the = [row for row in titles_start_the if not row[0].startswith("en")]

end_time_loops_q2 = time.time()

print("=== Query 2 (Spark Loops) ===")
print(f"Number of page titles starting with 'The': {total_the}")
print(f"Number of those not in English project: {len(non_en_the)}")
print(f"Time taken: {end_time_loops_q2 - start_time_loops_q2:.4f} seconds\n")

=== Query 2 (Spark Loops) ===
Number of page titles starting with 'The': 45210
Number of those not in English project: 9239
Time taken: 950.9160 seconds



In [None]:
# Query 3 (Spark Loops): number of distinct "_"-separated terms across all
# lower-cased page titles, deduplicated on the driver.
start_time_loops_q3 = time.time()

# Lower-cased titles, collected into a Python list.
titles_list = data_rdd.map(lambda rec: rec[1].lower()).collect()

# Every term of every title goes into one set, which removes duplicates.
unique_terms = {
    term
    for lowered_title in titles_list
    for term in lowered_title.split("_")
}

end_time_loops_q3 = time.time()

print(
    "=== Query 3 (Spark Loops) ===",
    f"Number of unique terms in page titles: {len(unique_terms)}",
    f"Time taken: {end_time_loops_q3 - start_time_loops_q3:.4f} seconds\n",
    sep="\n",
)

=== Query 3 (Spark Loops) ===
Number of unique terms in page titles: 1793107
Time taken: 20.2332 seconds



In [None]:
# Query 4 (Spark Loops): how many records share each page title, tallied in a
# Python dictionary on the driver.
start_time_loops_q4 = time.time()

# Titles (row[1]) collected into a Python list.
titles_list = data_rdd.map(lambda rec: rec[1]).collect()

# Tally occurrences; keys keep first-seen order.
title_counts = {}
for name in titles_list:
    title_counts[name] = title_counts.get(name, 0) + 1

# (title, count) pairs, plus the ten most frequent (ties keep first-seen order).
title_counts_result = list(title_counts.items())
top_titles = sorted(title_counts_result, key=lambda pair: -pair[1])[:10]

end_time_loops_q4 = time.time()

print("=== Query 4 (Spark Loops) ===")
print("Sample of title frequencies:")
for t, c in title_counts_result[:5]:  # only the first five pairs
    print(f"{t}: {c}")
print(f"Time taken: {end_time_loops_q4 - start_time_loops_q4:.4f} seconds\n")
print("=== Top 10 ===")
for t, c in top_titles:
    print(f"{t}: {c}")

=== Query 4 (Spark Loops) ===
Sample of title frequencies:
271_a.C: 4
Category:User_th: 2
Chiron_Elias_Krase: 6
Dassault_rafaele: 3
E.Desv: 6
Time taken: 21.7222 seconds

=== Top 10 ===
water: 118
1863: 106
Berlin: 101
Google: 101
Linux: 98
Main_Page: 90
ISO_3166-1: 88
Microsoft_Windows: 87
Index.php: 86
HTML: 86


In [None]:
# Query 5 (Spark Loops): gather all records that share a page title, grouped
# in a Python dictionary on the driver.
start_time_loops_q5 = time.time()

# Every record, collected into a Python list.
all_rows = data_rdd.collect()

# title -> list of records with that title (row[1] is page_title);
# keys keep first-seen order.
title_groups = {}
for rec in all_rows:
    title_groups.setdefault(rec[1], []).append(rec)

# (title, records) pairs for reporting.
combined_results = list(title_groups.items())

end_time_loops_q5 = time.time()

print("=== Query 5 (Spark Loops) ===")
print("Sample combined data for the first few titles:")
for title, records in combined_results[:3]:  # only the first three groups
    print(f"Title: {title}")
    for r in records:
        print(f"    {r}")
print(f"Time taken: {end_time_loops_q5 - start_time_loops_q5:.4f} seconds\n")

=== Query 5 (Spark Loops) ===
Sample combined data for the first few titles:
Title: 271_a.C
    ('aa', '271_a.C', 1, 4675)
    ('az', '271_a.C', 1, 6356)
    ('bcl', '271_a.C', 1, 5068)
    ('be', '271_a.C', 1, 6287)
Title: Category:User_th
    ('aa', 'Category:User_th', 1, 4770)
    ('commons.m', 'Category:User_th', 1, 0)
Title: Chiron_Elias_Krase
    ('aa', 'Chiron_Elias_Krase', 1, 4694)
    ('az', 'Chiron_Elias_Krase', 1, 6374)
    ('bg', 'Chiron_Elias_Krase', 1, 7468)
    ('cho', 'Chiron_Elias_Krase', 1, 4684)
    ('dz', 'Chiron_Elias_Krase', 1, 5435)
    ('it', 'Chiron_Elias_Krase', 1, 5929)
Time taken: 38.0224 seconds



In [None]:
# Write the "Spark Loops" results of queries 1-5 to output_spark.txt,
# one section per query.
with open("output_spark.txt", "w") as output_file:

    # Query 1 (Spark Loops): page-size statistics
    output_file.writelines([
        "=== Query 1 (Spark Loops) ===\n",
        f"Min Page Size: {min_size}\n",
        f"Max Page Size: {max_size}\n",
        f"Average Page Size: {avg_size}\n",
        f"Time taken: {end_time_loops_q1 - start_time_loops_q1:.4f} seconds\n\n",
    ])

    # Query 2 (Spark Loops): titles starting with "The"
    output_file.writelines([
        "=== Query 2 (Spark Loops) ===\n",
        f"Number of page titles starting with 'The': {total_the}\n",
        f"Number of those not in English project: {len(non_en_the)}\n",
        f"Time taken: {end_time_loops_q2 - start_time_loops_q2:.4f} seconds\n\n",
    ])

    # Query 3 (Spark Loops): unique title terms
    output_file.writelines([
        "=== Query 3 (Spark Loops) ===\n",
        f"Number of unique terms in page titles: {len(unique_terms)}\n",
        f"Time taken: {end_time_loops_q3 - start_time_loops_q3:.4f} seconds\n\n",
    ])

    # Query 4 (Spark Loops): title frequencies (sample, then top ten)
    output_file.writelines([
        "=== Query 4 (Spark Loops) ===\n",
        "Sample of title frequencies:\n",
    ])
    output_file.writelines(
        f"{name}: {hits}\n" for name, hits in title_counts_result[:5]
    )
    output_file.writelines([
        f"Time taken: {end_time_loops_q4 - start_time_loops_q4:.4f} seconds\n",
        "=== Top 10 ===\n",
    ])
    output_file.writelines(f"{name}: {hits}\n" for name, hits in top_titles)
    output_file.write("\n")

    # Query 5 (Spark Loops): records grouped by title (first three groups)
    output_file.writelines([
        "=== Query 5 (Spark Loops) ===\n",
        "Sample combined data for the first few titles:\n",
    ])
    for group_title, group_rows in combined_results[:3]:
        output_file.write(f"Title: {group_title}\n")
        output_file.writelines(f"    {rec}\n" for rec in group_rows)
    output_file.write(f"Time taken: {end_time_loops_q5 - start_time_loops_q5:.4f} seconds\n\n")
