In [18]:
# Path of the review dataset handed to the job scripts on the command line.
DATA_PATH = './data/reviews_devset.json'
# Interpreter used by the shell cells to launch the job scripts.
PYTHON = 'python3'

In [19]:
! pip3 install mr3px mrjob



In [142]:
%%file chiSquaredReduce.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import json

# Terms ignored by mapper_1. Empty on import; populated from
# ./data/stopwords.txt in the __main__ block before the job is run.
stopwords = set()

class ChiSquaredProcessor(MRJob):
    """
    Six step MapReduce job that computes, for every (category, term) pair, the
    chi squared statistic of the term with respect to the category and emits
    the 75 highest scoring terms per category.

    Each input line is parsed as a JSON object; the keys 'category' and
    'reviewText' are read. Terms contained in the module level `stopwords` set
    and terms of length 1 are dropped.

    NOTE(review): `stopwords` is filled by the __main__ block of this file; this
    presumably only reaches the mappers because the inline runner executes them
    in the launching process - confirm before switching to another runner.
    """

    def mapper_1(self, _, line):
        """
        This mapper returns the category and the set of words in the reviewText.
        All pairs are emitted under the key None so that a single reducer sees
        every document.
        Returns a key value pair of: None, (category, word_set)
        """

        data = json.loads(line)
        category = data.get('category', '')
        reviewText = data.get('reviewText', '')
        word_list = re.split('[^a-zA-Z<>^|]+', reviewText.lower())
        # Tokens consist only of characters kept by the pattern above, so the
        # length check already rules out empty and whitespace-only tokens.
        word_set = {
            word for word in word_list
            if len(word) > 1 and word not in stopwords
        }
        yield None, (category, list(word_set))

    def reducer_1(self, _, compromised_reviewText):
        """
        This reducer counts all documents and attaches that total to every review.
        Returns a key value pair of: category, (reviews, N)
        """

        # The total N is only known once every review has been seen, hence the
        # input is materialised before anything is emitted.
        compromised_reviews = list(compromised_reviewText)
        N = len(compromised_reviews)
        for category, reviews in compromised_reviews:
            yield category, (reviews, N)

    def mapper_2(self, category, reviews_N):
        """
        This mapper moves N (the number of all documents) into the key so that it is passed along per category.
        Returns a key value pair of: (category, N), reviews
        """

        reviews, N = reviews_N
        yield (category, N), reviews

    def reducer_2(self, category_N, reviews):
        """
        This reducer counts the number of documents per category and returns for each category the number of documents and all reviews.
        Returns a key value pair of: (category, N), (number of documents, reviews)
        """

        all_reviews = list(reviews)
        yield category_N, (len(all_reviews), all_reviews)

    def mapper_3(self, category_N, all_reviews):
        """
        This mapper emits one record per term of every review of a category to allow counting their occurences.
        Returns a key value pair of: (category, term), (1, count, N)
        """

        count, reviews = all_reviews
        category, N = category_N[0], category_N[1]
        for review in reviews:
            for term in review:
                yield (category, term), (1, count, N)

    def reducer_3(self, category_term, counts):
        """
        This reducer counts the documents of a category which contain the term.
        Returns a key value pair of: (category, term), (number, count, N)
        """

        documents = list(counts)
        # count and N are identical for all records of one category, so they
        # are taken from the first record.
        count = documents[0][1]
        N = documents[0][2]
        number = sum(n for n, _, _ in documents)
        yield category_term, (number, count, N)

    def mapper_4(self, category_term, number_count_N):
        """
        This mapper groups the occurences per category for each term.
        Returns a key value pair of: term, ((category, number), count, N)
        """

        number, count, N = number_count_N
        category, term = category_term
        yield term, ((category, number), count, N)

    def reducer_4(self, term, category_number_count_N):
        """
        This reducer sums the occurences of each term over all categories and adds it to the occurences per category for each term.
        Returns a key value pair of: term, ([((category, number), count, N)], occurence_overall)
        """

        categories = list(category_number_count_N)
        occurence_overall = sum(number for (_, number), _, _ in categories)

        yield term, (categories, occurence_overall)

    def mapper_5(self, term, categories_occurences):
        """
        This mapper adds the number of all occurences of each term to the occurences per category and groups them by category and term.
        Returns a key value pair of: (category, term), (number, occurence_overall, count_category, N)
        """

        categories, occurence_overall = categories_occurences
        for (category, number), count_category, N in categories:
            yield (category, term), (number, occurence_overall, count_category, N)

    def reducer_5(self, category_term, number_count_occurence_N):
        """
        This reducer calculates the chi squared value for each term and category.

        With A = documents of the category containing the term, B = documents
        of other categories containing the term, C = documents of the category
        without the term and D = documents of other categories without the term:
        chi_squared = N * (A*D - B*C)**2 / ((A+B)*(A+C)*(B+D)*(C+D))

        If one of the marginals is zero (e.g. the term occurs in every document
        or the category holds all documents) the term does not discriminate and
        0.0 is returned instead of dividing by zero.
        Returns a key value pair of: (category, term), chi_squared
        """

        for number, occurence, count, N in number_count_occurence_N:
            A = number
            B = occurence - A
            C = count - A
            D = N - count - B
            denominator = (A + B) * (A + C) * (B + D) * (C + D)
            if denominator == 0:
                chi_squared = 0.0
            else:
                chi_squared = N * (A*D - B*C)**2 / denominator
            yield category_term, chi_squared

    def mapper_6(self, category_term, chi_squared):
        """
        This mapper groups the chi squared values by category.
        Returns a key value pair of: category, (term, chi_squared)
        """

        category, term = category_term
        yield category, (term, chi_squared)

    def reducer_6(self, category, term_chi_squared):
        """
        This reducer sorts the chi squared values in descending order and returns the 75 highest values for each category
        as one comma separated string of term=chi_squared entries.
        Returns a key value pair of: category, "term=chi_squared, ..."
        """

        chi_squared_terms = list(term_chi_squared)
        chi_squared_terms.sort(key=lambda x: x[1], reverse=True)

        yield category, ', '.join(f"{x}={y}" for x, y in chi_squared_terms[:75])

    def steps(self):
        """
        Chains the six mapper/reducer pairs in the order of their numbering.
        """

        return [
            MRStep(
                mapper   = self.mapper_1,
                reducer  = self.reducer_1
            ),
            MRStep(
                mapper   = self.mapper_2,
                reducer  = self.reducer_2
            ),
            MRStep(
                mapper   = self.mapper_3,
                reducer  = self.reducer_3
            ),
            MRStep(
                mapper   = self.mapper_4,
                reducer  = self.reducer_4
            ),
            MRStep(
                mapper   = self.mapper_5,
                reducer  = self.reducer_5
            ),
            MRStep(
                mapper   = self.mapper_6,
                reducer  = self.reducer_6
            )
        ]
   
if __name__ == '__main__':
    # Fill the module level stop word set (one entry per line of the file)
    # before the job starts, as mapper_1 filters against it.
    with open('./data/stopwords.txt', 'r') as stopword_file:
        stopwords.update(stopword_file.read().splitlines())

    # Run the job and echo every output pair as "key value " on its own line.
    myjob1 = ChiSquaredProcessor()
    with myjob1.make_runner() as runner:
        runner.run()

        for out_key, out_value in myjob1.parse_output(runner.cat_output()):
            print(f"{out_key} {out_value} ")


Overwriting chiSquaredReduce.py


Running a local MRjob 

In [67]:
! $PYTHON ./chiSquaredReduce.py $DATA_PATH > ./result/output.txt

No configs specified for inline runner


This approach works, but sorting and merging all the lists is still missing. I didn't add this because I am not fully happy with the overall architecture.
My first thought for improving it is to compute the number of documents per category and the total number of documents in a separate first job, whose results we can reuse at a later point.

In [143]:
%%file chiSquaredReduceOptimized.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import json

# Terms ignored by ChiSquaredProcessor.mapper_1. Empty on import; populated
# from ./data/stopwords.txt in the __main__ block.
stopwords = set()
# Number of reviews per category, plus the total over all categories under the
# key "N". Filled in the __main__ block from the CountingProcessor output and
# read by ChiSquaredProcessor.reducer_3.
category_counts = {}

class CountingProcessor(MRJob):
    """
    Single step MapReduce job which counts the number of reviews per category.

    Each input line is parsed as a JSON object; only the key 'category' is
    read (a missing key is counted under the empty string).
    """

    def mapper(self, _, line):
        """
        This mapper returns for each review of a category 1
        Returns a key value pair of: category, 1
        """

        data = json.loads(line)
        yield data.get('category', ''), 1

    def combiner(self, category, counts):
        """
        This combiner pre-sums the counts emitted by one mapper so that fewer records have to be passed to the reducer.
        Returns a key value pair of: category, partial number of reviews
        """

        yield category, sum(counts)

    def reducer(self, category, counts):
        """
        This reducer sums the number of reviews per category
        Returns a key value pair of: category, number of reviews
        """

        yield category, sum(counts)

    def steps(self):
        """
        Declares the single mapper/combiner/reducer step of this job.
        """

        return [
            MRStep(
                mapper   = self.mapper,
                combiner = self.combiner,
                reducer  = self.reducer
            )
        ]

class ChiSquaredProcessor(MRJob):
    """
    Four step MapReduce job that computes the chi squared statistic for every
    (category, term) pair and emits the 75 highest scoring terms per category
    plus one record holding all remaining terms in sorted order.

    The job reads two module level globals which have to be filled before it
    is run: `stopwords` (terms to ignore) and `category_counts` (number of
    reviews per category and the overall total under the key "N").

    NOTE(review): both globals are set by the __main__ block of this file; this
    presumably only reaches the tasks because the inline runner executes them
    in the launching process - confirm before switching to another runner.
    """

    def mapper_1(self, _, line):
        """
        This mapper returns all terms per category for each document occurence
        Returns a key value pair of: (category, word), 1
        """

        data = json.loads(line)
        category = data.get('category', '')
        reviewText = data.get('reviewText', '')
        word_list = re.split('[^a-zA-Z<>^|]+', reviewText.lower())

        # Tokens consist only of characters kept by the pattern above, so the
        # length check already rules out empty and whitespace-only tokens.
        # Using a set counts every term at most once per document.
        word_set = {
            word for word in word_list
            if len(word) > 1 and word not in stopwords
        }
        for word in word_set:
            yield (category, word), 1

    def combiner_1(self, category_term, counts):
        """
        This combiner pre-sums the occurences emitted by one mapper so that fewer records have to be passed to reducer_1.
        Returns a key value pair of: (category, term), partial number
        """

        yield category_term, sum(counts)

    def reducer_1(self, category_term, compromised_reviewText):
        """
        This reducer sums the occurences of each term per category
        Returns a key value pair of: (category, term), number
        """

        yield category_term, sum(compromised_reviewText)

    def mapper_2(self, category_term, count_term):
        """
        This mapper returns all categories which a term is occuring in and the respective number of occurences per category
        Returns a key value pair of: term, (category, count_term)
        """

        category, term = category_term
        yield term, (category, count_term)

    def reducer_2(self, term, category_count):
        """
        This reducer adds the number of occurences of a term over all categories to the occurences per category
        Returns a key value pair of: term, [(category, count_term, number_of_occurences)]
        """

        all_categories_count = list(category_count)
        number_of_occurences = sum(count for _, count in all_categories_count)
        yield term, [
            (category, count_term, number_of_occurences)
            for category, count_term in all_categories_count
        ]

    def mapper_3(self, term, list_category_count):
        """
        This mapper groups the occurences of each term per category
        Returns a key value pair of: (category, term), (count_term, number_of_occurences)
        """

        for category, count_term, number_of_occurences in list_category_count:
            yield (category, term), (count_term, number_of_occurences)

    def reducer_3(self, category_term, list_category_count):
        """
        This reducer calculates the chi squared value for each term per category.

        With A = documents of the category containing the term, B = documents
        of other categories containing the term, C = documents of the category
        without the term and D = documents of other categories without the term:
        chi_squared = N * (A*D - B*C)**2 / ((A+B)*(A+C)*(B+D)*(C+D))

        If one of the marginals is zero (e.g. the term occurs in every document
        or the category holds all documents) the term does not discriminate and
        0.0 is returned instead of dividing by zero.
        Returns a key value pair of: (category, term), chi_squared
        """

        category, _ = category_term
        for count_term, number_of_occurences in list_category_count:
            # Both totals come from the counting job run beforehand.
            documents_in_category = category_counts[category]
            N = category_counts["N"]
            A = count_term
            B = number_of_occurences - count_term
            C = documents_in_category - count_term
            D = N - documents_in_category - B
            denominator = (A + B) * (A + C) * (B + D) * (C + D)
            if denominator == 0:
                chi_squared = 0.0
            else:
                chi_squared = N * (A*D - B*C)**2 / denominator
            yield category_term, chi_squared

    def mapper_4(self, category_term, chi_squared):
        """
        This mapper groups the chi squared values for each term by category. Additionally it returns all terms under the key None.
        Returns a key value pair of: category, (term, chi_squared)
        and a key value pair of: None, term
        """

        category, term = category_term
        yield category, (term, chi_squared)
        yield None, term

    def reducer_4(self, category, term_chi_squared):
        """
        This reducer sorts the chi squared values and returns the 75 highest values for each category. If no category is given, all terms are returned.
        Returns a key value pair of: category, "term=chi_squared, ..."
        or, for the key None: "term, term, ...", ""
        """

        if category is None:
            # The same term arrives once per category it occurs in, hence the
            # set; the distinct terms are emitted as the key in sorted order.
            all_words = sorted(set(term_chi_squared))
            yield ', '.join(all_words), ""
        else:
            chi_squared_terms = list(term_chi_squared)
            chi_squared_terms.sort(key=lambda x: x[1], reverse=True)
            yield category, ', '.join(f"{x}={y}" for x, y in chi_squared_terms[:75])

    def steps(self):
        """
        Chains the four steps in the order of their numbering.
        """

        return [
            MRStep(
                mapper   = self.mapper_1,
                combiner = self.combiner_1,
                reducer  = self.reducer_1
            ),
            MRStep(
                mapper   = self.mapper_2,
                reducer  = self.reducer_2
            ),
            MRStep(
                mapper   = self.mapper_3,
                reducer  = self.reducer_3
            ),
            MRStep(
                mapper   = self.mapper_4,
                reducer  = self.reducer_4
            )
        ]
   
if __name__ == '__main__':
    # First job: collect the number of reviews per category, then store the
    # overall number of reviews under the key "N".
    myjob = CountingProcessor()
    with myjob.make_runner() as runner:
        runner.run()
        category_counts.update(myjob.parse_output(runner.cat_output()))
        category_counts["N"] = sum(category_counts.values())

    # Fill the module level stop word set (one entry per line of the file).
    with open('./data/stopwords.txt', 'r') as stopword_file:
        stopwords.update(stopword_file.read().splitlines())

    # Second job: compute the chi squared values and echo every output pair
    # as "key value " on its own line.
    mrjob = ChiSquaredProcessor()
    with mrjob.make_runner() as runner:
        runner.run()

        for out_key, out_value in mrjob.parse_output(runner.cat_output()):
            print(f"{out_key} {out_value} ")


Overwriting chiSquaredReduceOptimized.py


In [139]:
! $PYTHON ./chiSquaredReduceOptimized.py $DATA_PATH > ./result/output.txt

No configs specified for inline runner
No configs specified for inline runner
