In [88]:
# Input file locations, kept in variables so later cells can refer to them by name
articles = "articles.csv"
customers = "customers.csv"
transactions = "transactions.csv"
transactions_small = "transactions_small.csv"


For each garment group, show its most frequent product, its second most frequent section and its most frequent department in the articles.csv file; make sure the output has the following schema:

            garment_group_name, prod_name, section_name,  department_name

The product names are stored in "prod_name", the department name in "department_name", the garment group in "garment_group_name" and the section in "section_name". If multiple departments, garment groups or sections have the same number of occurrences, you may resolve these conflicts randomly, i.e. pick one of them arbitrarily. If there is only one section, or all sections appear with the same frequency, just pick the most frequent one, and resolve conflicts randomly.

Make sure that your program correctly deals with the header, and possible sparse values.

In [89]:
%%file mymrjob1.py
# This will create a local file to run your MapReduce program  

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
from mr3px.csvprotocol import CsvProtocol
import csv 
import logging

log = logging.getLogger(__name__)

class MyMRJob1(MRJob):
    """Summarise articles.csv per garment group.

    For every garment group the job outputs one CSV row

        garment_group_name, prod_name, section_name, department_name

    holding the most frequent product, the second most frequent section and
    the most frequent department of that group.

    Pipeline:
      1. mapper/combiner/reducer count each (group, field, value) triple.
      2. reducer2 picks the winning value per (group, field).
      3. reducer3 assembles the three winners of a group into one row.
    """

    OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV

    # Zero-based column positions used from articles.csv.
    _ARTICLE_ID_COL = 0
    _GARMENT_GROUP_COL = 23
    # (field label, column index) of the attributes counted per garment group.
    _FIELD_COLUMNS = (
        ('prod_name', 2),
        ('section_name', 21),
        ('department_name', 15),
    )

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        """Route 'mrjob' and '__main__' log records to `stream`.

        Declared as a classmethod to match the base-class hook it overrides;
        it still works when invoked on an instance.
        """
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

    def mapper(self, _, line):
        """Emit ((group, field, value), 1) for each populated field of a row.

        The header row, blank/truncated lines and rows without a garment
        group are skipped; empty (sparse) field values are not counted.
        """
        result = next(csv.reader([line]), [])

        # Guard before indexing: a blank or short line would raise IndexError.
        if len(result) <= self._GARMENT_GROUP_COL:
            return
        if result[self._ARTICLE_ID_COL] == 'article_id':
            return  # header row

        garment_group_name = result[self._GARMENT_GROUP_COL]
        if not garment_group_name.strip():
            return

        for field, column in self._FIELD_COLUMNS:
            value = result[column]
            if value.strip():
                yield (garment_group_name, field, value), 1

    def combiner(self, key, valuelist):
        """Pre-sum the counts of one (group, field, value) key."""
        yield key, sum(valuelist)

    def reducer(self, key, valuelist):
        """Total one value's count and re-key it by (group, field)."""
        garment_group_name, field, value = key
        yield (garment_group_name, field), (value, sum(valuelist))

    @staticmethod
    def _second_most_frequent(pairs):
        """Return the (value, count) pair with the second-highest count.

        Falls back to a most frequent pair when there is only one pair or
        when every pair has the same count. Ties are resolved arbitrarily.
        """
        ranked = sorted(pairs, key=lambda pair: pair[1], reverse=True)
        top_count = ranked[0][1]
        return next((pair for pair in ranked if pair[1] < top_count), ranked[0])

    def reducer2(self, key, valuelist):
        """Pick the winning value for one (group, field).

        Sections use the second most frequent value; products and
        departments use the most frequent one.
        """
        garment_group_name, field = key
        if field == 'section_name':
            chosen = self._second_most_frequent(valuelist)
        else:
            chosen = max(valuelist, key=lambda pair: pair[1])
        yield garment_group_name, (field, chosen[0])

    def reducer3(self, key, valuelist):
        """Combine the per-field winners of a garment group into one row."""
        # Values arrive in no guaranteed order, so look each field up by its
        # label instead of relying on arrival position.
        winners = dict(valuelist)
        yield None, (
            key,
            winners.get('prod_name', ''),
            winners.get('section_name', ''),
            winners.get('department_name', ''),
        )

    def steps(self):
        """Return the three steps: count, select per field, assemble row."""
        first_step = MRStep(
            mapper=self.mapper,
            combiner=self.combiner,
            reducer=self.reducer
        )

        second_step = MRStep(
            reducer=self.reducer2
        )

        third_step = MRStep(
            reducer=self.reducer3
        )

        return [ first_step, second_step, third_step ]

# Launch the job only when this file is executed as a script, not when imported
if __name__ == '__main__':
    MyMRJob1.run()


Overwriting mymrjob1.py


In [None]:
!python3.6 mymrjob1.py  $articles

Running a local MRjob 

Running a Hadoop job

In [None]:
! HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/  python3.6 mymrjob1.py -r hadoop hdfs://$articles > output.csv

---
For all customers older than 30 years, show the number of transaction items they were involved in with articles from the department named 'Jersey Basic' or 'Shirt'. 


Make sure to have the following format in your final output:

            customer_id,count_transactions


In [187]:
%%file mymrjob2.py
# This will create a local file to run your MapReduce program  

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
from mr3px.csvprotocol import CsvProtocol
import csv 
import logging
  
log = logging.getLogger(__name__)

class MyMRJob2(MRJob):
    """Count 'Jersey Basic'/'Shirt' transaction items per customer over 30.

    The job reads articles, transactions and customers together and tells
    the three row types apart by their column count (25, 5 and 7). It is a
    two-step reduce-side join:

      1. Join transactions with the qualifying articles on article_id and
         pass eligible customers through unchanged.
      2. Group by customer_id and sum the matching transaction items of
         every eligible customer.

    Output rows: customer_id, count_transactions. Customers older than 30
    without any matching item are reported with a count of 0.
    """

    OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV

    # Departments whose articles are counted (exact name match).
    _TARGET_DEPARTMENTS = frozenset(('Jersey Basic', 'Shirt'))
    # Customers must be strictly older than this.
    _AGE_THRESHOLD = 30

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        """Route 'mrjob' and '__main__' log records to `stream`.

        Declared as a classmethod to match the base-class hook it overrides;
        it still works when invoked on an instance.
        """
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

    def mapper(self, _, line):
        """Tag each input row for the join.

        Emits (kind, id) keys with (tag, customer_id, count) values:
          * qualifying article -> ('article', article_id),   ('A', None, 1)
          * transaction        -> ('article', article_id),   ('T', customer_id, 1)
          * customer over 30   -> ('customer', customer_id), ('C', None, 1)
        Header rows and rows with a missing or non-numeric age are skipped.
        """
        result = next(csv.reader([line]), [])
        input_size = len(result)

        if input_size == 25:
            # Article row; the header's 'department_name' never matches.
            if result[15] in self._TARGET_DEPARTMENTS:
                yield ('article', result[0]), ('A', None, 1)

        elif input_size == 7:
            # Customer row; float() also accepts ages written like "31.0".
            try:
                age = float(result[5])
            except ValueError:
                return  # header row or missing age
            if age > self._AGE_THRESHOLD:
                yield ('customer', result[0]), ('C', None, 1)

        elif input_size == 5:
            # Transaction row.
            customer_id = result[1]
            article_id = result[2]
            if customer_id != 'customer_id':  # skip the header row
                yield ('article', article_id), ('T', customer_id, 1)

    def combiner(self, key, valuelist):
        """Pre-aggregate counts per (tag, customer_id) without changing keys.

        This is only an optimisation: the result is the same whether the
        combiner runs zero, one or several times.
        """
        totals = {}
        for tag, customer_id, count in valuelist:
            slot = (tag, customer_id)
            totals[slot] = totals.get(slot, 0) + count
        for (tag, customer_id), count in totals.items():
            yield key, (tag, customer_id, count)

    def reducer(self, key, valuelist):
        """Join step: keep only transactions of qualifying articles.

        Customer keys are forwarded as an eligibility marker. For article
        keys, the per-customer item counts are emitted only if the article
        belongs to a target department.
        """
        kind, ident = key

        if kind == 'customer':
            yield ident, ('C', 0)
            return

        qualifies = False
        items_per_customer = {}
        for tag, customer_id, count in valuelist:
            if tag == 'A':
                qualifies = True
            else:
                items_per_customer[customer_id] = (
                    items_per_customer.get(customer_id, 0) + count
                )

        if qualifies:
            for customer_id, count in items_per_customer.items():
                yield customer_id, ('T', count)

    def reducer_count(self, customer_id, valuelist):
        """Sum the matching items of one customer; emit only if eligible."""
        eligible = False
        total = 0
        for tag, count in valuelist:
            if tag == 'C':
                eligible = True
            else:
                total += count

        if eligible:
            yield None, (customer_id, total)

    def steps(self):
        """Return the join step followed by the per-customer count step."""
        first_step = MRStep(
            mapper=self.mapper,
            combiner=self.combiner,
            reducer=self.reducer
        )

        second_step = MRStep(
            reducer=self.reducer_count
        )

        return [ first_step, second_step ]

# Launch the job only when this file is executed as a script, not when imported
if __name__ == '__main__':
    MyMRJob2.run()

Overwriting mymrjob2.py


In [None]:
! python3.6 mymrjob2.py  $articles $transactions_small $customers > output2.csv

In [None]:
! HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/  python3.6  mymrjob2.py -r hadoop hdfs://$articles hdfs://$transactions hdfs://$customers > output2.csv