# Demonstration of using Spark to perform simple search queries over the inverted index

## A helper function that reads the hadoop mapreduce output file into a list of (keyword, [(file1, location), (file2, location)... (filen, location)])

In [99]:
def build_words_map(file_name):
    import re

    """
    Build the word_map.
    It's a dictionary of words with dictionaries of books with lists of occurrence locations
    :param file_name:
    :return: None. words_map is built
    """
    result = []
    f = open(file_name)

    # Foreach line in the file
    for l in f:
        # Split the line by tabs, the word is index 0,
        # The remaining indices are occurrences of the word in the format '<book_title, offset_value>'
        curr_data = l.split('\t')

        curr_word = curr_data[0].lower()
        
        # Foreach word occurrence
        lst = []
        for c in curr_data[1:]:
            # Split the occurrence into [book_title, occurrence_offset]
            curr_occurrence = re.sub('[<>]', '', c).split(',')
            if len(curr_occurrence) != 2:
                continue

            curr_book, curr_offset = curr_occurrence

            # Add the occurrences of this word's entries to the book's dictionary
            lst.append((curr_book, curr_offset))
        result.append((curr_word, lst))
    
    return result



### Use the helper function to load the inverted index file into a usable structure for Spark

In [111]:
fn = "/home/kris/Desktop/shakespeare/index/part-r-00000"
lst = build_words_map(fn)

### Load the list into an RDD

In [112]:
d = sc.parallelize(lst)

### Simple keyword search based on a single query term.  Using the filter function, we can look through the RDD and pull out values that contain that keyword

In [113]:
key = 'accuse'
d.filter(lambda x: x[0] == key).map(lambda x: x[1]).collect()

[[('alls_well_that_ends_well.txt', '7335'),
  ('antony_and_cleopatra.txt', '80909'),
  ('antony_and_cleopatra.txt', '119006'),
  ('cymbeline.txt', '50279'),
  ('cymbeline.txt', '67245'),
  ('cymbeline.txt', '79323'),
  ('cymbeline.txt', '143586'),
  ('first_part_henry_sixth.txt', '60327'),
  ('first_part_henry_sixth.txt', '134505'),
  ('hamlet.txt', '80226'),
  ('king_henry_eighth.txt', '67121'),
  ('king_henry_eighth.txt', '139404'),
  ('king_henry_eighth.txt', '139706'),
  ('king_lear.txt', '99511'),
  ('king_richard_the_third.txt', '13861'),
  ('king_richard_the_third.txt', '14112'),
  ('king_richard_the_third.txt', '50512'),
  ('measure_for_measure.txt', '104050'),
  ('measure_for_measure.txt', '108558'),
  ('measure_for_measure.txt', '118855'),
  ('measure_for_measure.txt', '124086'),
  ('measure_for_measure.txt', '124278'),
  ('merry_wives_of_windsor.txt', '36421'),
  ('much_ado_about_nothing.txt', '100158'),
  ('much_ado_about_nothing.txt', '110158'),
  ('pericles.txt', '74620')

### Using the same phrase as was used to demonstrate the command line query program we should expect to see the same results

In [133]:
keys = "five wits went halting off".split()

### Convert the phrase into an rdd with each keyword as the value.  We're going to use this to do a join on the inverted index

In [139]:
krdd = sc.parallelize(keys).map(lambda x: (x, 0))

### We store the results into an RDD for further processing.  We also save the files and offset information for use later

In [194]:
results = krdd.join(d).map(lambda x: (x[0], x[1][1]))
results.collect()

[('halting',
  [('cymbeline.txt', '91744'),
   ('hamlet.txt', '378458'),
   ('history_of_troilus_and_cressida.txt', '272858'),
   ('king_john.txt', '116202'),
   ('much_ado_about_nothing.txt', '4628'),
   ('much_ado_about_nothing.txt', '143396'),
   ('two_noble_kinsmen.txt', '133947')]),
 ('wits',
  [('as_you_like_it.txt', '11982'),
   ('as_you_like_it.txt', '12120'),
   ('as_you_like_it.txt', '112154'),
   ('comedy_of_errors.txt', '71611'),
   ('comedy_of_errors.txt', '73602'),
   ('comedy_of_errors.txt', '74083'),
   ('first_part_henry_sixth.txt', '130419'),
   ('hamlet.txt', '98720'),
   ('hamlet.txt', '134212'),
   ('hamlet.txt', '134599'),
   ('hamlet.txt', '154428'),
   ('hamlet.txt', '154732'),
   ('hamlet.txt', '265656'),
   ('hamlet.txt', '365875'),
   ('hamlet.txt', '387865'),
   ('hamlet.txt', '482465'),
   ('history_of_troilus_and_cressida.txt', '175375'),
   ('history_of_troilus_and_cressida.txt', '233577'),
   ('history_of_troilus_and_cressida.txt', '258574'),
   ('histor

In [191]:
filesAndOffset = results.flatMapValues(lambda x: x).map(lambda x: x[1])
filesAndOffset.collect()

[('cymbeline.txt', '91744'),
 ('hamlet.txt', '378458'),
 ('history_of_troilus_and_cressida.txt', '272858'),
 ('king_john.txt', '116202'),
 ('much_ado_about_nothing.txt', '4628'),
 ('much_ado_about_nothing.txt', '143396'),
 ('two_noble_kinsmen.txt', '133947'),
 ('as_you_like_it.txt', '11982'),
 ('as_you_like_it.txt', '12120'),
 ('as_you_like_it.txt', '112154'),
 ('comedy_of_errors.txt', '71611'),
 ('comedy_of_errors.txt', '73602'),
 ('comedy_of_errors.txt', '74083'),
 ('first_part_henry_sixth.txt', '130419'),
 ('hamlet.txt', '98720'),
 ('hamlet.txt', '134212'),
 ('hamlet.txt', '134599'),
 ('hamlet.txt', '154428'),
 ('hamlet.txt', '154732'),
 ('hamlet.txt', '265656'),
 ('hamlet.txt', '365875'),
 ('hamlet.txt', '387865'),
 ('hamlet.txt', '482465'),
 ('history_of_troilus_and_cressida.txt', '175375'),
 ('history_of_troilus_and_cressida.txt', '233577'),
 ('history_of_troilus_and_cressida.txt', '258574'),
 ('history_of_troilus_and_cressida.txt', '259829'),
 ('history_of_troilus_and_cressida.t

In [195]:
fileToWord = results.flatMapValues(lambda x: x).map(lambda x: (x[1][0], x[0]))
fileToWord.collect()

[('cymbeline.txt', 'halting'),
 ('hamlet.txt', 'halting'),
 ('history_of_troilus_and_cressida.txt', 'halting'),
 ('king_john.txt', 'halting'),
 ('much_ado_about_nothing.txt', 'halting'),
 ('much_ado_about_nothing.txt', 'halting'),
 ('two_noble_kinsmen.txt', 'halting'),
 ('as_you_like_it.txt', 'wits'),
 ('as_you_like_it.txt', 'wits'),
 ('as_you_like_it.txt', 'wits'),
 ('comedy_of_errors.txt', 'wits'),
 ('comedy_of_errors.txt', 'wits'),
 ('comedy_of_errors.txt', 'wits'),
 ('first_part_henry_sixth.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('hamlet.txt', 'wits'),
 ('history_of_troilus_and_cressida.txt', 'wits'),
 ('history_of_troilus_and_cressida.txt', 'wits'),
 ('history_of_troilus_and_cressida.txt', 'wits'),
 ('history_of_troilus_and_cressida.txt', 'wits'),
 ('history_of_troilus_and_cressida.txt', 'wits'),
 ('history

### Here, we take the file to word mapping from above, group by key (file), and map the list of keywords for each file into a set.  Then, we filter out files that don't contain the full set of keywords we queried.  

In [196]:
filesIntersection = fileToWord.groupByKey().mapValues(set).filter(lambda x: len(x[1]) == len(keys)).map(lambda x: (x[0],0))
filesIntersection.collect()

[('two_noble_kinsmen.txt', 0),
 ('history_of_troilus_and_cressida.txt', 0),
 ('much_ado_about_nothing.txt', 0),
 ('hamlet.txt', 0)]

### With that done, we can join again on the file and offset mapping from above.  This way, we only report the set of files that contains all the keywords.  After performing another grouping and mapping, we can get the filename and offsets in a sorted list.  This output should match that of the command line interface.  

In [202]:
fileOffsetIntersection = filesIntersection.join(filesAndOffset).map(lambda x: (x[0], int(x[1][1])))
fileOffsetIntersection.groupByKey().map(list).map(lambda x: (x[0], sorted(x[1]))).collect()

[('much_ado_about_nothing.txt',
  [1600,
   4628,
   4628,
   4628,
   4628,
   4628,
   14564,
   16303,
   16699,
   37034,
   77058,
   80787,
   81230,
   84851,
   87780,
   87851,
   111757,
   118218,
   119025,
   119271,
   123725,
   143396,
   150949,
   179950,
   190216,
   199125,
   211894,
   222772,
   224708,
   227402,
   235056,
   235734,
   253571,
   256022,
   271364,
   280719,
   296632,
   316121]),
 ('two_noble_kinsmen.txt',
  [1337,
   7036,
   7330,
   8002,
   14209,
   16874,
   17408,
   20886,
   50053,
   53505,
   56555,
   59046,
   62241,
   65007,
   65669,
   79386,
   80893,
   85204,
   95186,
   103743,
   112810,
   116029,
   119121,
   124654,
   127556,
   131293,
   133947,
   136513,
   137530,
   139568,
   140168,
   143878,
   146801,
   161436,
   164917,
   167771,
   173475,
   175668,
   182724,
   185237,
   198617,
   219697,
   220448,
   220601,
   234336,
   236558,
   237648,
   238288,
   246116,
   253193,
   255465,
   26