In [None]:
# Colab environment setup: install the packages used below.
# TODO(review): versions are unpinned; pin them (the install log below shows
# ir_datasets 0.5.10) so a fresh runtime reproduces the same environment.
!pip install ir-datasets
!pip install pyspark
!pip install mysql-connector-python

# NOTE(review): nothing in the visible notebook uses MySQL or mysql-connector-python;
# confirm this server install is still needed before keeping it.
!apt-get install -y mysql-server
!/etc/init.d/mysql start

from google.colab import drive

# Mount Google Drive at /content/drive; the project files live under /content/drive/MyDrive.
drive.mount('/content/drive')

Collecting ir-datasets
  Downloading ir_datasets-0.5.10-py3-none-any.whl.metadata (12 kB)
Collecting inscriptis>=2.2.0 (from ir-datasets)
  Downloading inscriptis-2.6.0-py3-none-any.whl.metadata (25 kB)
Collecting lxml>=4.5.2 (from ir-datasets)
  Downloading lxml-5.4.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.5 kB)
Collecting trec-car-tools>=2.5.4 (from ir-datasets)
  Downloading trec_car_tools-2.6-py3-none-any.whl.metadata (640 bytes)
Collecting lz4>=3.1.10 (from ir-datasets)
  Downloading lz4-4.4.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.8 kB)
Collecting warc3-wet>=0.2.3 (from ir-datasets)
  Downloading warc3_wet-0.2.5-py3-none-any.whl.metadata (2.2 kB)
Collecting warc3-wet-clueweb09>=0.2.5 (from ir-datasets)
  Downloading warc3-wet-clueweb09-0.2.5.tar.gz (17 kB)
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting zl

In [None]:
import ir_datasets
from pyspark import SparkContext, StorageLevel
from pyspark import SparkConf
from collections import Counter

import re
import csv
import os
import time
import pickle

# Project root on the mounted Google Drive; the data and checkpoint paths hang off it.
# For a local run, point this at a local checkout instead of the Drive mount.
root_directory = "/content/drive/MyDrive/CS532-FinalProject"
dataset_directory = "/".join((root_directory, "data", "ir_datasets"))
checkpoint_directory = "/".join((root_directory, "rdd_checkpoints"))

# ir_datasets identifier of the training split (the full en78k set);
# "wikir/en1k/training" is a much smaller split for quick iterations.
training_dataset_name = "wikir/en78k/training"
# File-name prefix for the CSV exports: the identifier with "/" replaced by "_".
training_dataset_filename = "_".join(training_dataset_name.split("/"))

# Export the training split (docs, queries, qrels) to CSV files on Google Drive.
# Building the RDDs from these files with SparkContext.textFile() avoids the memory
# issues that occur when initialising them from the ir_datasets iterables with
# SparkContext.parallelize().
# NOTE(review): the Spark steps read these files line by line, so they assume no text
# field contains a newline (the csv writer keeps it inside the quoted field, which would
# split one record across two lines); confirm for the chosen dataset.

os.makedirs(dataset_directory, exist_ok=True)


def _export_csv(path, rows):
    """Write ``rows`` (an iterable of field lists) to ``path`` as a 'unix'-dialect CSV.

    The 'unix' dialect quotes every field and ends each row with a bare newline.
    Rows go to ``<path>.tmp`` first, and that file is renamed over ``path`` only
    after every row has been written. An interrupted export therefore never leaves
    a truncated file behind that the existence check below would accept as complete.
    """
    tmp_path = f"{path}.tmp"
    # newline="" is what the csv module requires for files it writes. The encoding
    # is pinned because Spark's textFile() decodes the files as UTF-8, whatever the
    # locale's default encoding is.
    with open(tmp_path, "w", newline="", encoding="utf-8") as csv_file:
        csv.writer(csv_file, dialect="unix").writerows(rows)
    # Atomic rename: `path` is either absent or complete.
    os.replace(tmp_path, path)


training_export_prefix = f"{dataset_directory}/{training_dataset_filename}"
if not all(os.path.isfile(f"{training_export_prefix}_{part}")
           for part in ("docs", "queries", "qrels")):
    train_dataset = ir_datasets.load(training_dataset_name)
    _export_csv(f"{training_export_prefix}_docs",
                ([doc.doc_id, doc.text] for doc in train_dataset.docs_iter()))
    _export_csv(f"{training_export_prefix}_queries",
                ([query.query_id, query.text] for query in train_dataset.queries_iter()))
    _export_csv(f"{training_export_prefix}_qrels",
                ([qrel.query_id, qrel.doc_id, qrel.relevance, qrel.iteration]
                 for qrel in train_dataset.qrels_iter()))

# Start the Spark context, or reuse the running one when this cell is re-executed.
# A second plain SparkContext(...) in the same kernel raises an error; getOrCreate()
# returns the existing context instead. The conf below only takes effect when a new
# context is actually created.
# NOTE(review): with no master set this runs in local mode, where the executor.*
# settings are presumably ignored; they matter for a cluster, e.g. after adding
# .setMaster("spark://<host>:7077") to the chain below.
conf = (
    SparkConf()
    .set("spark.cores.max", "96")
    .set("spark.executor.cores", "1")
    .set("spark.executor.instances", "96")
    .set("spark.executor.memory", "3g")
    .set("spark.executor.pyspark.memory", "2g")
)
spark = SparkContext.getOrCreate(conf=conf)

# StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication):
# disk only, replicated 3 ways (same level as StorageLevel.DISK_ONLY_3 in PySpark >= 3.1).
# NOTE(review): a single local block manager cannot hold 3 replicas; confirm this is
# intended outside a multi-node cluster.
storage_level = StorageLevel(True, False, False, False, 3)

# Read the exported docs CSV as an RDD of raw text lines (presumably one document per
# line) and keep it at the shared storage level; persist() returns the RDD itself.
train_docs_index_rdd = spark.textFile(
    f"{dataset_directory}/{training_dataset_filename}_docs"
).persist(storage_level)

def inverted_index_map_function(csv_file_line):
    """Map one docs-CSV line to ``(term, [[doc_id, term_count]])`` pairs.

    ``csv_file_line`` is one line of the fully-quoted docs export, i.e.
    ``"<doc_id>","<text>"``. The text keeps only ASCII letters, digits and spaces,
    is lower-cased and split on runs of whitespace. Every distinct term is emitted
    once with a one-entry postings list holding the doc id and the term's count in
    this document (as a string). Wrapping each posting in its own list lets the
    reduce step merge postings by list concatenation.

    A line without a '","' separator is treated as a document with empty text and
    yields no pairs.
    """
    # partition() splits at the FIRST separator only. The csv writer escapes an
    # embedded quote as "", so the text itself may contain '","' and must stay whole.
    doc_id_field, _, text_field = csv_file_line.partition('","')
    doc_id = re.sub("[^A-Za-z0-9 ]", "", doc_id_field)
    # split() rather than split(' '): stripping punctuation often leaves runs of
    # spaces, and split(' ') turns each run into an empty-string "term".
    term_counts = Counter(re.sub("[^A-Za-z0-9 ]", "", text_field).lower().split())
    return [(term, [[doc_id, str(count)]]) for term, count in term_counts.items()]

# Map every document line to (term, [[doc_id, term_count]]) pairs, flattened across
# documents. Transformations are lazy: flatMap() only records the step, and take(10)
# computes only as many partitions as it needs for ten records. Timing count(), an
# action over every partition, measures the full map. Persisting first lets the reduce
# step reuse these materialised pairs instead of re-reading and re-mapping the text file.
train_docs_index_rdd = train_docs_index_rdd.flatMap(inverted_index_map_function)
train_docs_index_rdd.persist(storage_level)

index_map_start_time = time.time()
index_pair_count = train_docs_index_rdd.count()
index_map_end_time = time.time()

# Spot-check the first few pairs.
elements = train_docs_index_rdd.take(10)
print(f"index map execution time: {index_map_end_time - index_map_start_time}")
print(f"(term, postings) pairs: {index_pair_count}")
print(f"{str(elements)[0:1000]}")

# Reduce step of the inverted index: merge the postings lists that share a term, turning
# RDD[(term, [[doc_id, term_count]])] into RDD[(term, [[doc_id, term_count], ...])].

def inverted_index_reduce_function(list_of_doc_ids_for_term_instance_1, list_of_doc_ids_for_term_instance_2):
    """Combine two postings lists for one term; used as the reduceByKey function.

    The second list's entries are appended to the first list IN PLACE, and that same
    first list object is returned. Extending in place keeps each merge proportional to
    the size of the second list instead of copying both lists.
    """
    list_of_doc_ids_for_term_instance_1.extend(list_of_doc_ids_for_term_instance_2)
    return list_of_doc_ids_for_term_instance_1

# Merge the per-document postings of each term into a single postings list per term.
# reduceByKey() is lazy, and take(10) afterwards only runs the reduce tasks needed for
# ten records. Timing count() runs every reduce task and so measures the whole step.
# Persisting keeps later uses of the index from redoing the shuffle.
train_docs_index_rdd = train_docs_index_rdd.reduceByKey(inverted_index_reduce_function)
train_docs_index_rdd.persist(storage_level)

index_reduce_start_time = time.time()
index_term_count = train_docs_index_rdd.count()
index_reduce_end_time = time.time()

# Spot-check the first few (term, postings) entries.
elements = train_docs_index_rdd.take(10)
print(f"reduce execution time: {index_reduce_end_time - index_reduce_start_time}")
print(f"distinct terms: {index_term_count}")
print(f"{str(elements)[0:1000]}")

# TODO: apply sortByKey() (optionally after repartition()) if a term-ordered index is needed.

# Fresh RDD of raw docs-CSV lines for the corpus statistics used in BM25 scoring,
# kept at the shared storage level (persist() returns the RDD itself).
train_docs_rdd = spark.textFile(
    f"{dataset_directory}/{training_dataset_filename}_docs"
).persist(storage_level)

# N: number of lines (documents) in the corpus; k and b: BM25 free parameters.
N = train_docs_rdd.count()
k, b = 1.5, 0.75

def compute_doc_lengths(csv_file_line):
    """Return the length, in terms, of the document on one docs-CSV line.

    BM25's document length |D|, and the corpus average built from it, counts every
    term occurrence. Repeated words are therefore counted each time rather than
    collapsed with a set. Tokens come from the text column after keeping only ASCII
    letters, digits and spaces, split on runs of whitespace. Keep this tokenisation
    in sync with the one used to build the inverted index.

    A line without a '","' separator counts as an empty document (length 0).
    """
    # Split at the first separator only; the text may itself contain '","'.
    _, _, text_field = csv_file_line.partition('","')
    # Lower-casing is skipped: it cannot change the number of tokens.
    return len(re.sub("[^A-Za-z0-9 ]", "", text_field).split())

# Mean and maximum document length in one pass: stats() returns a StatCounter holding
# both. Calling mean() and max() separately runs two jobs over the (unpersisted) mapped RDD.
doc_len_mean_start_time = time.time()
doc_len_stats = train_docs_rdd.map(compute_doc_lengths).stats()
doc_len_mean_end_time = time.time()

doc_len_mean = doc_len_stats.mean()
# StatCounter reports the max as -inf for an empty RDD, so guard before converting to int.
doc_len_max = int(doc_len_stats.max()) if doc_len_stats.count() else 0

# The raw docs RDD is no longer needed: release its persisted blocks, not just the name.
train_docs_rdd.unpersist()
del train_docs_rdd

print(f"doc length mean execution time: {doc_len_mean_end_time - doc_len_mean_start_time}")
print(f"doc length mean: {doc_len_mean} doc length max: {doc_len_max}")

# Another pass over the docs CSV, this time for a doc_id -> length lookup.
# persist() with no argument uses PySpark's default level, not storage_level.
train_docs_len_lookup_rdd = spark.textFile(
    f"{dataset_directory}/{training_dataset_filename}_docs"
).persist()

def map_doc_lengths(csv_file_line):
    """Map one docs-CSV line to ``(doc_id, document_length)``.

    The doc id is the first field with everything but ASCII letters, digits and
    spaces removed. The length is BM25's |D|: the number of term occurrences in the
    text, so repeated words count each time. Tokens come from the text after keeping
    only ASCII letters, digits and spaces, split on runs of whitespace. Keep this
    tokenisation in sync with the one used to build the inverted index.

    A line without a '","' separator is treated as empty text (length 0).
    """
    # Split at the first separator only; the text may itself contain '","'.
    doc_id_field, _, text_field = csv_file_line.partition('","')
    doc_id = re.sub("[^A-Za-z0-9 ]", "", doc_id_field)
    return (doc_id, len(re.sub("[^A-Za-z0-9 ]", "", text_field).split()))

# Map each line to (doc_id, document_length) and persist the MAPPED pairs: those are
# what later lookups read, so caching only the raw lines would redo the mapping each time.
# map() is lazy and take(10) stops after ten records. Timing count() maps every
# partition and so measures the whole step.
train_docs_len_lookup_rdd = train_docs_len_lookup_rdd.map(map_doc_lengths)
train_docs_len_lookup_rdd.persist(storage_level)

doc_len_lookup_start_time = time.time()
doc_len_lookup_count = train_docs_len_lookup_rdd.count()
doc_len_lookup_end_time = time.time()

# Spot-check the first few (doc_id, length) pairs.
elements = train_docs_len_lookup_rdd.take(10)
print(f"doc length lookup execution time: {doc_len_lookup_end_time - doc_len_lookup_start_time}")
print(f"documents in lookup: {doc_len_lookup_count}")
print(f"{str(elements)[0:1000]}")

# TODO: BM25 scoring. Document lengths cannot be fetched with
# train_docs_len_lookup_rdd.lookup() from inside another RDD's map(): RDD actions and
# transformations may only be invoked on the driver, not inside other transformations.
# Instead, either join the postings with train_docs_len_lookup_rdd on doc_id, or
# collectAsMap() the lookup and share it with the workers via spark.broadcast().
# TODO: save the finished inverted index (not train_docs_rdd, which has been deleted
# by this point), e.g.
#   train_docs_index_rdd.saveAsTextFile(
#       f"{dataset_directory}/{training_dataset_filename}_index",
#       compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

print("program complete")


index map execution time: 0.44065213203430176
[('these', [['0', '8']]), ('institutions', [['0', '5']]), ('are', [['0', '22']]), ('often', [['0', '8']]), ('described', [['0', '2']]), ('as', [['0', '92']]), ('stateless', [['0', '2']]), ('societies', [['0', '2']]), ('although', [['0', '4']]), ('several', [['0', '3']])]
reduce execution time: 319.04465913772583
[('civilization', [['0', '1'], ['3', '1'], ['16', '4'], ['18', '1'], ['24', '2'], ['26', '1'], ['34', '2'], ['45', '1'], ['47', '1'], ['64', '1'], ['68', '2'], ['81', '3'], ['87', '1'], ['96', '3'], ['109', '3'], ['117', '9'], ['128', '1'], ['132', '1'], ['133', '7'], ['165', '3'], ['187', '1'], ['201', '8'], ['207', '3'], ['215', '1'], ['235', '1'], ['253', '1'], ['255', '1'], ['265', '4'], ['274', '4'], ['290', '1'], ['323', '2'], ['402', '1'], ['415', '1'], ['485', '1'], ['566', '1'], ['599', '4'], ['605', '1'], ['636', '1'], ['638', '1'], ['649', '1'], ['662', '1'], ['665', '2'], ['669', '1'], ['695', '2'], ['713', '4'], ['728',