# DE Challenge
Create an efficient implementation to build an inverted index of a large collection of documents.

In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
import sys
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/dechallengedataset/41
/kaggle/input/dechallengedataset/6
/kaggle/input/dechallengedataset/4
/kaggle/input/dechallengedataset/9
/kaggle/input/dechallengedataset/39
/kaggle/input/dechallengedataset/17
/kaggle/input/dechallengedataset/33
/kaggle/input/dechallengedataset/36
/kaggle/input/dechallengedataset/32
/kaggle/input/dechallengedataset/34
/kaggle/input/dechallengedataset/13
/kaggle/input/dechallengedataset/38
/kaggle/input/dechallengedataset/29
/kaggle/input/dechallengedataset/2
/kaggle/input/dechallengedataset/18
/kaggle/input/dechallengedataset/8
/kaggle/input/dechallengedataset/1
/kaggle/input/dechallengedataset/20
/kaggle/input/dechallengedataset/7
/kaggle/input/dechallengedataset/35
/kaggle/input/dechallengedataset/40
/kaggle/input/dechallengedataset/21
/kaggle/input/dechallengedataset/10
/kaggle/input/dechallengedataset/31
/kaggle/input/dechallengedataset/12
/kaggle/input/dechallengedataset/24
/kaggle/input/dechallengedataset/5
/kaggle/input/dechallengedataset/27


# Install Spark

In [2]:
# Install Spark
!cd $HOME && wget http://apachemirror.wuchna.com/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz && tar -xzf spark-3.0.1-bin-hadoop2.7.tgz
!pip install pyspark

user_home = os.environ['HOME']
os.environ['SPARK_HOME'] = f'{user_home}/spark-3.0.1-bin-hadoop2.7'
os.environ['PYSPARK_PYTHON'] = sys.executable

--2020-09-27 19:19:26--  http://apachemirror.wuchna.com/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
Resolving apachemirror.wuchna.com (apachemirror.wuchna.com)... 159.65.154.237
Connecting to apachemirror.wuchna.com (apachemirror.wuchna.com)|159.65.154.237|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 219929956 (210M) [application/x-gzip]
Saving to: ‘spark-3.0.1-bin-hadoop2.7.tgz’


2020-09-27 19:19:47 (10.2 MB/s) - ‘spark-3.0.1-bin-hadoop2.7.tgz’ saved [219929956/219929956]

Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 20 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 63.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | / - \ | / - \ | / - \ | / - \ | / - \ | /

# Configure spark

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setMaster(os.getenv('SPARK_MASTER', 'local[*]'))\
    .setAppName("inverted_index_generator")\
    .setSparkHome(os.environ['SPARK_HOME'])
spark_context = SparkContext(conf=conf)
spark_session = SparkSession(spark_context)

# Configure Dataset locations

In [4]:
dataset_location = "/kaggle/input/dechallengedataset"
output_location = "/kaggle/id_files"

# Run the IndexGenerator

In [5]:
from pyspark import SparkContext
import os
import re


class IndexGenerator:
    def __init__(self, spark_context: SparkContext):
        self.spark_context = spark_context

    def generate_word_ids(self, input_files_path: str, output_location: str):
        """Generate Word ID mapping for each word in the files located in the input files location and create id files in the output_location"""
        pattern = re.compile('^[0-9]+$')
        files = [file_name for file_name in os.listdir(input_files_path) if pattern.match(file_name)]
        for file_name in files:
            self._generate_word_id_for_file(os.path.join(input_files_path, file_name), output_location)

    @staticmethod
    def _generate_dictionary_tuple(row, file_base_name):
        """This will remove any special characters from the words and
        return a tuple containing the word, unique index, file name"""
        word = re.sub("[,!*().\\[\\]+:;\"'/]", "", row)
        return word, IndexGenerator.get_hash(word), file_base_name

    @staticmethod
    def get_hash(word):
        """Return 8 character long hash for a given word"""
        return abs(hash(word)) % (10 ** 8)

    def _generate_word_id_for_file(self, file_path: str, output_directory_path: str) -> None:
        """Generate Word ID mapping for each word in the file located in the input files location and create id files in the output_location"""
        file_base_name = int(os.path.basename(file_path))
        self.spark_context.textFile(file_path) \
            .flatMap(lambda x: x.split(' ')) \
            .map(lambda x: IndexGenerator._generate_dictionary_tuple(x, file_base_name)) \
            .distinct() \
            .sortBy(lambda x: x[0])\
            .saveAsPickleFile(os.path.join(output_directory_path, os.path.basename(file_path) + '_id'))

    @staticmethod
    def _concatenate(x, y):
        """Concatenate two items if they are lists, else convert them to a list before concatenation"""
        if not isinstance(x, list):
            x = [x]

        if not isinstance(y, list):
            y = [y]

        return x + y

    @staticmethod
    def _sort(x):
        """Sort the rdd based on the Unique ID"""
        if isinstance(x[1], list):
            return x[0], sorted(x[1])
        else:
            return x[0], [x[1]]

    def collate_documents(self, input_files_path: str, output_directory_path: str):
        """Collate documents created by generate word id function and then created a collated list mapping of each word id and file name
        Create a dictionary that contains the word and its respective id"""
        collated_pickle = self.spark_context.pickleFile(
            os.path.join(input_files_path, '*_id', '*'),
            minPartitions=2)
        collated_pickle.map(lambda x: (x[1], x[2])).distinct()\
            .saveAsPickleFile(os.path.join(output_directory_path, 'collated'))
        collated_pickle.map(lambda x: (x[0], x[1])).distinct()\
            .saveAsTextFile(os.path.join(output_directory_path, 'dictionary'))

    def create_inverted_index(self, input_files_path: str, output_directory_path: str):
        """Create inverted index"""
        self.spark_context.pickleFile(os.path.join(input_files_path, 'collated')) \
            .reduceByKey(IndexGenerator._concatenate).map(IndexGenerator._sort).sortByKey()\
            .saveAsTextFile(os.path.join(output_directory_path, 'inverted-index'))

    def run(self, dataset_location, output_location):
        """Run the pipeline to generate inverted indexes"""
        # Verify if results directory exists
        if os.path.exists(output_location) and len(os.listdir(output_location)):
            raise FileExistsError('Results directory not empty, initiating exit...')

        if not os.path.exists(output_location):
            os.mkdir(output_location)

        self.generate_word_ids(dataset_location, output_location)
        self.collate_documents(output_location, output_location)
        self.create_inverted_index(output_location, output_location)


In [6]:
IndexGenerator(spark_context).run(dataset_location, output_location)

# Dictionary
A dictionary that matches every word from the documents with a unique id.

In [7]:
spark_context.textFile('/kaggle/id_files/dictionary').collect()

["('10000', 88236572)",
 "('1811', 70494890)",
 "('5', 87747005)",
 "('53���305648264�4�80648�8�60851-8-83885�', 80240815)",
 "('76', 57180415)",
 "('Aidenn', 30216139)",
 "('American', 86600820)",
 "('As', 30944137)",
 "('Besides', 7260548)",
 "('By', 46132051)",
 "('C', 29342982)",
 "('Director', 83900267)",
 "('Doctors', 87873033)",
 "('Donations', 27761038)",
 "('Dumas_', 72794670)",
 "('England-in', 94179929)",
 "('Etienne_', 29014323)",
 "('German', 86644722)",
 "('Him', 97192296)",
 "('IT', 82625259)",
 "('Jupiter', 15767201)",
 "('Let', 5877266)",
 "('Livre', 66949084)",
 "('MY', 54464599)",
 "('Morgue', 84753575)",
 "('Mos', 58597321)",
 "('NUMBER', 73460886)",
 "('Ourang-Outang', 38015961)",
 "('P', 26301487)",
 "('Puss', 95164756)",
 "('Reihe', 58747161)",
 "('Seleucus', 10372098)",
 "('Special', 43267560)",
 "('Truth', 81799107)",
 "('W', 74071206)",
 "('_mustachio_', 15760704)",
 "('_par', 63970718)",
 "('_secret_', 58431806)",
 "('_sole_', 74136763)",
 "('_that', 57510145

For every wordID, we have a pairs so we have names of documents

In [8]:
spark_context.pickleFile('/kaggle/id_files/*_id').collect()

[('', 0, 30),
 ('#2147', 99527366, 30),
 ('#6', 37442374, 30),
 ('$10', 78690626, 30),
 ('$100', 95262244, 30),
 ('$2', 78234333, 30),
 ('$20', 59333543, 30),
 ('$600', 30520998, 30),
 ('&c', 87978494, 30),
 ('&sect', 27580810, 30),
 ('-', 83551111, 30),
 ('--', 6440857, 30),
 ('----', 68701415, 30),
 ('-----', 79267615, 30),
 ('--_Crebillons', 81047816, 30),
 ('--_Sir', 86015799, 30),
 ('-It', 57970343, 30),
 ('-Mr', 97815407, 30),
 ('-Oh', 7041498, 30),
 ('-and', 8467929, 30),
 ('-many', 39525801, 30),
 ('-of', 83139747, 30),
 ('-satisfying', 4925604, 30),
 ('-succeed', 38488146, 30),
 ('0', 57227702, 30),
 ('005484', 9022284, 30),
 ('1', 13308882, 30),
 ('10', 53643200, 30),
 ('10%', 58722943, 30),
 ('1000', 78622195, 30),
 ('10000', 88236572, 30),
 ('100000000', 80298879, 30),
 ('10600', 20059079, 30),
 ('1080', 13810063, 30),
 ('10th', 21195485, 30),
 ('10}', 60904041, 30),
 ('11', 94144933, 30),
 ('11th', 80532459, 30),
 ('12', 90825616, 30),
 ('1200', 59979665, 30),
 ('12th', 44

The collated file containing the wordId and Document IDs

In [9]:
spark_context.pickleFile('/kaggle/id_files/collated').collect()

[(43995554, 30),
 (73850654, 30),
 (13272432, 30),
 (61994306, 30),
 (51181818, 30),
 (82479846, 30),
 (38517356, 30),
 (40627318, 30),
 (7141530, 30),
 (46773194, 30),
 (10578706, 30),
 (45809768, 30),
 (40338026, 30),
 (27761038, 30),
 (18090558, 30),
 (88932756, 30),
 (58952778, 30),
 (91181372, 30),
 (87104182, 30),
 (1276940, 30),
 (12916818, 30),
 (19077610, 30),
 (23877448, 30),
 (66949084, 30),
 (58678662, 30),
 (28959134, 30),
 (72051698, 30),
 (80095226, 30),
 (44332048, 30),
 (19410922, 30),
 (33298510, 30),
 (25766118, 30),
 (44173590, 30),
 (13344858, 30),
 (35805860, 30),
 (82462270, 30),
 (37838756, 30),
 (11752302, 30),
 (16684724, 30),
 (18081744, 30),
 (1619974, 30),
 (21697612, 30),
 (24097448, 30),
 (35214586, 30),
 (66770752, 30),
 (81260242, 30),
 (52639480, 30),
 (59633454, 30),
 (81046924, 30),
 (19108810, 30),
 (94660, 30),
 (70972096, 30),
 (50302134, 30),
 (51680450, 30),
 (76133012, 30),
 (23142002, 30),
 (98483384, 30),
 (90683590, 30),
 (76340926, 30),
 (7

# Inverted Index

In [10]:
spark_context.textFile('/kaggle/id_files/inverted-index').collect()

['(63728051, [9])',
 '(63730592, [1, 2, 3, 4, 5, 6, 7, 9, 11, 12, 16, 17, 18, 19, 21, 22, 23, 24, 26, 27, 30, 32, 34, 40, 42, 44])',
 '(63731198, [0, 1, 2, 4, 7, 8, 9, 10, 12, 13, 18, 19, 20, 23, 24, 30, 31, 32, 33, 37, 41, 42])',
 '(63731392, [0])',
 '(63731607, [6, 44])',
 '(63731669, [32])',
 '(63731805, [43])',
 '(63732457, [6])',
 '(63732975, [19])',
 '(63734085, [6])',
 '(63734642, [0, 2, 10, 24])',
 '(63735575, [16, 17])',
 '(63735877, [41])',
 '(63736201, [1, 2, 3, 4, 7, 9, 11, 13, 21, 31, 33, 34, 36, 37, 41, 43, 44])',
 '(63736468, [1])',
 '(63736800, [0, 4, 15])',
 '(63737243, [2, 6, 17, 43, 44])',
 '(63737307, [18, 23, 30])',
 '(63738357, [23])',
 '(63739344, [9, 11, 22])',
 '(63740035, [0])',
 '(63741330, [2])',
 '(63741909, [14])',
 '(63741980, [43])',
 '(63743062, [12])',
 '(63743825, [0])',
 '(63743931, [0, 1, 3, 4, 5, 9, 10, 11, 15, 16, 17, 18, 21, 22, 26, 29, 30, 31, 32, 35, 37, 39])',
 '(63745571, [0])',
 '(63746063, [8])',
 '(63746737, [23])',
 '(63748019, [9])',
 '(