John Stockton
CS-351
Wang
Winter 2022

Due to errors with the botocore package, I was unable to run the MRJob on EMR. I emailed you, and you told me not to worry about EMR. My output files are included. I spent approximately 2-3 hours working on the MapReduce Python files, but close to 5-6 hours trying to debug the botocore error, which I eventually gave up on. As a result, I am unable to give you statistics for the EMR cluster performance.

In [None]:
%%file cond_prob.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

class MRConditionalProbability(MRJob):
    """Estimate P(word2 | word1) for every adjacent word pair in the input.

    Step 1 counts how often each bigram occurs. Step 2 gathers the counts
    by first word and divides each bigram count by the total number of
    bigrams that start with that word.
    """

    # A word is a run of ASCII letters, optionally two runs joined by one
    # apostrophe (e.g. "don't"). Compiled once rather than on every line.
    WORD_RE = re.compile(r"([a-zA-Z]+'[a-zA-Z]+|[a-zA-Z]+)")

    def steps(self):
        """Return the two-step pipeline: count bigrams, then normalise."""
        return [
            # Step 1: Count bigrams
            MRStep(mapper=self.mapper_counts,
                   reducer=self.reducer_counts),
            # Step 2: Calculate probabilities
            MRStep(reducer=self.reducer_prob),
        ]

    # Step 1a: Split out all bigrams
    def mapper_counts(self, _, line):
        """Yield ``((word1, word2), 1)`` for each adjacent word pair in *line*.

        Words are upper-cased so that counting is case-insensitive. A line
        with fewer than two words yields nothing.
        """
        words = [word.upper() for word in self.WORD_RE.findall(line)]
        # zip() stops at the shorter sequence, so it pairs every word with
        # its successor and produces nothing for 0- or 1-word lines.
        for bigram in zip(words, words[1:]):
            yield bigram, 1

    # Step 1b: Reduce bigram counts
    def reducer_counts(self, words, count):
        """Total one bigram's occurrences and re-key the result by word1.

        Yields ``word1, (total, word2)`` so the next step is keyed on the
        first word of the bigram.
        """
        word1, word2 = words
        yield word1, (sum(count), word2)

    # Step 2: Calculate probabilities for each bigram
    def reducer_prob(self, word1, bigrams_gen):
        """Yield the conditional probability of each word following *word1*.

        *bigrams_gen* produces ``(bigram_count, word2)`` pairs. Each output
        is keyed ``'P( word2 | word1 ):'`` with the probability rounded to
        three decimal places.
        """
        # Two passes are needed (total, then ratios), so the one-shot
        # generator has to be materialised first.
        bigrams = list(bigrams_gen)
        word1_count = sum(bigram_count for bigram_count, _ in bigrams)

        for bigram_count, word2 in bigrams:
            bigram_prob = round(bigram_count / word1_count, 3)
            yield f'P( {word2} | {word1} ):', bigram_prob

# Script entry point: hand control to the job's command-line runner.
if __name__ == "__main__":
    MRConditionalProbability.run()

In [None]:
!python cond_prob.py < shortjokes.csv > probabilities.txt

In [None]:
%%file top_words.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

class MRWordFrequencyCount(MRJob):
    """List the words most likely to follow 'my' in the input text.

    Step 1 counts every bigram whose first word is 'MY' (case-insensitive).
    Step 2 turns those counts into probabilities and emits the top entries,
    ranked from most to least probable.
    """

    # A word is a run of ASCII letters, optionally two runs joined by one
    # apostrophe (e.g. "don't"). Compiled once rather than on every line.
    WORD_RE = re.compile(r"([a-zA-Z]+'[a-zA-Z]+|[a-zA-Z]+)")

    # Upper-cased first word whose followers are ranked.
    TARGET_WORD = 'MY'

    # Maximum number of ranked followers to emit.
    TOP_N = 10

    def steps(self):
        """Return the two-step pipeline: count 'MY' bigrams, then rank."""
        return [
            # Step 1: Count bigrams that start with 'MY'
            MRStep(mapper=self.mapper_counts,
                   reducer=self.reducer_counts),
            # Step 2: Calculate probabilities and pick the top followers
            MRStep(reducer=self.reducer_top),
        ]

    # Step 1a: Split out all bigrams that start with 'MY'
    def mapper_counts(self, _, line):
        """Yield ``(('MY', word2), 1)`` for each word that follows 'my'.

        Words are upper-cased so that matching and counting are
        case-insensitive. A line with fewer than two words yields nothing.
        """
        words = [word.upper() for word in self.WORD_RE.findall(line)]
        # zip() pairs every word with its successor and produces nothing
        # for 0- or 1-word lines.
        for word1, word2 in zip(words, words[1:]):
            if word1 == self.TARGET_WORD:
                yield (word1, word2), 1

    # Step 1b: Reduce bigram counts
    def reducer_counts(self, words, count):
        """Total one bigram's occurrences and re-key the result by word1.

        Yields ``word1, (total, word2)`` so the next step is keyed on the
        first word of the bigram.
        """
        word1, word2 = words
        yield word1, (sum(count), word2)

    # Step 2: Calculate probabilities for each bigram
    #         and get top words that come after 'my'
    def reducer_top(self, _, bigrams_gen):
        """Yield the most probable followers of 'my', best first.

        *bigrams_gen* produces ``(bigram_count, word2)`` pairs. The key is
        ignored because the mapper only emits bigrams starting with 'MY'.
        Each output is keyed ``'<rank>. <word2>'`` with rank 1 being the
        most probable follower; the value is the probability rounded to
        three decimal places. At most ``TOP_N`` entries are emitted.
        """
        # Two passes are needed (total, then ranking), so the one-shot
        # generator has to be materialised first.
        bigrams = list(bigrams_gen)
        my_count = sum(bigram_count for bigram_count, _ in bigrams)

        # Rank on the exact count rather than the rounded probability:
        # rounding to three decimals can make different counts compare
        # equal and scramble the order. The sort is stable, so bigrams
        # with equal counts keep their incoming order.
        ranked = sorted(bigrams, key=lambda bigram: bigram[0], reverse=True)

        # Number from 1 so the label matches the position in the ranking.
        for rank, (bigram_count, word2) in enumerate(ranked[:self.TOP_N],
                                                     start=1):
            yield f'{rank}. {word2}', round(bigram_count / my_count, 3)

# Script entry point: hand control to the job's command-line runner.
if __name__ == "__main__":
    MRWordFrequencyCount.run()

In [None]:
!python top_words.py < shortjokes.csv > topTenWords.txt