# Python Hadoop MapReduce: Analyzing AWS S3 Bucket Logs with mrjob


## Introduction


## Setup

From PyPI:

``pip install mrjob``

From source:

``python setup.py install``



## Processing S3 Logs

Sample mrjob code that processes log files on Amazon S3 based on the [S3 logging format](http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html):

In [0]:
%%file mr_s3_log_parser.py

import time
from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol
import re


class MrS3LogParser(MRJob):
    """Parses the logs from S3 based on the S3 logging format:
    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html
    
    Aggregates a user's daily requests by user agent and operation
    
    Outputs date_time, requester, user_agent, operation, count
    """

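    # One capturing group per field: a quoted field (request URI, referrer,
    # user agent) is captured together with its quotes, so that group
    # numbers line up with the 17 field indices defined below
    LOGPATS  = r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) ' \
               r'(\S+) (\S+) (\S+) ("[^"]+"|-) ' \
               r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) ' \
               r'("[^"]+"|-) ("[^"]+"|-)'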
    NUM_ENTRIES_PER_LINE = 17
    logpat = re.compile(LOGPATS)

    (S3_LOG_BUCKET_OWNER, 
     S3_LOG_BUCKET, 
     S3_LOG_DATE_TIME,
     S3_LOG_IP, 
     S3_LOG_REQUESTER_ID, 
     S3_LOG_REQUEST_ID,
     S3_LOG_OPERATION, 
     S3_LOG_KEY, 
     S3_LOG_HTTP_METHOD,
     S3_LOG_HTTP_STATUS, 
     S3_LOG_S3_ERROR, 
     S3_LOG_BYTES_SENT,
     S3_LOG_OBJECT_SIZE, 
     S3_LOG_TOTAL_TIME, 
     S3_LOG_TURN_AROUND_TIME,
     S3_LOG_REFERER, 
     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)

    DELIMITER = '\t'

    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def clean_date_time_zone(self, raw_date_time_zone):
        """Converts entry 22/Jul/2013:21:04:17 +0000 to the format
        'YYYY-MM-DD HH:MM:SS' which is more suitable for loading into
        a database such as Redshift or RDS

        Note: requires the chars "[ ]" to be stripped prior to input
        Returns the converted date, the converted datetime and the
        timezone; raises ValueError if the date cannot be parsed

        TODO: Needs to combine timezone with date as one field
        """
        date_time = None
        time_zone_parsed = None

        # TODO: Probably cleaner to parse this with a regex
        date_parsed = raw_date_time_zone[:raw_date_time_zone.find(":")]
        time_parsed = raw_date_time_zone[raw_date_time_zone.find(":") + 1:
                                         raw_date_time_zone.find("+") - 1]
        time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find("+"):]

        # time.strptime raises ValueError if the date is malformed; the
        # exception propagates to the calling function (the mapper),
        # which handles it
        date_struct = time.strptime(date_parsed, "%d/%b/%Y")
        converted_date = time.strftime("%Y-%m-%d", date_struct)
        date_time = converted_date + " " + time_parsed

        return converted_date, date_time, time_zone_parsed

    def mapper(self, _, line):
        line = line.strip()
        match = self.logpat.search(line)

        date_time = None
        requester = None
        user_agent = None
        operation = None

        try:
            for n in range(self.NUM_ENTRIES_PER_LINE):
                group = match.group(1 + n)

                if n == self.S3_LOG_DATE_TIME:
                    date, date_time, time_zone_parsed = \
                        self.clean_date_time_zone(group)
                    # Keep the following line to aggregate by day;
                    # remove it to aggregate by exact timestamp
                    date_time = date + " 00:00:00"
                elif n == self.S3_LOG_REQUESTER_ID:
                    requester = group
                elif n == self.S3_LOG_USER_AGENT:
                    user_agent = group
                elif n == self.S3_LOG_OPERATION:
                    operation = group
                else:
                    pass

        except Exception:
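            # Apply the format string and keep the key a tuple so the
            # reducer can join it like any other key
            yield (("Error while parsing line: %s" % line,), 1)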
        else:
            yield ((date_time, requester, user_agent, operation), 1)

    def reducer(self, key, values):
        output = list(key)
        output = self.DELIMITER.join(output) + \
                 self.DELIMITER + \
                 str(sum(values))

        yield None, output

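    # No steps() override is needed: for a single-step job, mrjob builds
    # the step from mapper() and reducer() automatically. (The older
    # self.mr() helper is deprecated in favor of mrjob.step.MRStep.)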


if __name__ == '__main__':
    MrS3LogParser.run()

Overwriting mr_s3_log_parser.py
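
To check the parsing and aggregation logic without Hadoop or mrjob, the same steps can be sketched in plain Python. This is a minimal sketch under stated assumptions, not the job itself: the sample log lines and the names ``make_line``, ``parse_line`` and ``aggregate`` are hypothetical, the pattern uses one capturing group per field, unparseable lines are skipped rather than counted as errors, and the quotes around the user agent are stripped for readability.

```python
import re
from collections import Counter
from datetime import datetime

# One capturing group per log field (17 in total); quoted fields keep their quotes.
LOG_PATTERN = re.compile(
    r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) '
    r'(\S+) (\S+) (\S+) ("[^"]+"|-) '
    r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) '
    r'("[^"]+"|-) ("[^"]+"|-)'
)

# Zero-based positions of the fields the job aggregates on.
DATE_TIME, REQUESTER, OPERATION, USER_AGENT = 2, 4, 6, 16


def parse_line(line):
    """Return (day, requester, user_agent, operation), or None if unparseable."""
    match = LOG_PATTERN.search(line.strip())
    if match is None:
        return None
    fields = match.groups()
    try:
        stamp = datetime.strptime(fields[DATE_TIME], "%d/%b/%Y:%H:%M:%S %z")
    except ValueError:
        return None
    # Truncate to midnight so that requests are grouped per day.
    day = stamp.strftime("%Y-%m-%d 00:00:00")
    return (day, fields[REQUESTER], fields[USER_AGENT].strip('"'), fields[OPERATION])


def aggregate(lines):
    """Count requests per key and return tab-delimited output rows."""
    counts = Counter()
    for line in lines:
        key = parse_line(line)
        if key is not None:
            counts[key] += 1
    return ["\t".join(key + (str(total),)) for key, total in sorted(counts.items())]


def make_line(when, who, verb, operation, agent):
    """Build a hypothetical log line in the S3 access log layout."""
    return (
        'owner123 mybucket [%s] 192.0.2.3 %s REQ1 %s photos/cat.jpg '
        '"%s /mybucket/photos/cat.jpg HTTP/1.1" 200 - 2662992 2662992 70 10 '
        '"-" "%s" -' % (when, who, operation, verb, agent)
    )


sample_lines = [
    make_line("22/Jul/2013:21:04:17 +0000", "user-a", "GET", "REST.GET.OBJECT", "S3Console/0.4"),
    make_line("22/Jul/2013:23:59:59 +0000", "user-a", "GET", "REST.GET.OBJECT", "S3Console/0.4"),
    make_line("22/Jul/2013:08:00:00 +0000", "user-b", "PUT", "REST.PUT.OBJECT", "aws-cli/1.0"),
    "not a log line",
]

rows = aggregate(sample_lines)
for row in rows:
    print(row)
```

Parsing the timestamp with ``%z`` also yields a timezone-aware ``datetime``, which is one possible way to address the TODO about combining the date and timezone into a single field.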


## Running Amazon Elastic MapReduce Jobs

Run an Amazon Elastic MapReduce (EMR) job on the given input, which must be a flat file hierarchy, and write the results to the given output directory, which must not already exist:

In [0]:
!python mr_s3_log_parser.py -r emr s3://bucket-source/ --output-dir=s3://bucket-dest/

Run a MapReduce job locally on the specified input file, sending the results to the specified output file:

In [0]:
!python mr_s3_log_parser.py input_data.txt > output_data.txt
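
Each output line is tab-delimited and holds ``date_time``, ``requester``, ``user_agent``, ``operation`` and ``count``, in that order. As a rough sketch of how such output could be post-processed, the snippet below totals requests per operation. The sample rows and the function name ``requests_per_operation`` are hypothetical; in practice the handle would come from ``open("output_data.txt")``.

```python
import csv
import io
from collections import defaultdict


def requests_per_operation(handle):
    """Sum the count column per operation from tab-delimited job output."""
    totals = defaultdict(int)
    # QUOTE_NONE keeps any quote characters in the user agent as literal text.
    reader = csv.reader(handle, delimiter="\t", quoting=csv.QUOTE_NONE)
    for row in reader:
        if len(row) != 5:
            continue  # skip rows that are not regular records, e.g. parse errors
        date_time, requester, user_agent, operation, count = row
        totals[operation] += int(count)
    return dict(totals)


# Hypothetical output rows standing in for output_data.txt.
sample_output = io.StringIO(
    "2013-07-22 00:00:00\tuser-a\tS3Console/0.4\tREST.GET.OBJECT\t2\n"
    "2013-07-22 00:00:00\tuser-b\taws-cli/1.0\tREST.PUT.OBJECT\t1\n"
    "2013-07-23 00:00:00\tuser-a\tS3Console/0.4\tREST.GET.OBJECT\t3\n"
)

totals = requests_per_operation(sample_output)
print(totals)
```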

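## Sample Config File

mrjob reads runner options from a YAML config file, by default ``~/.mrjob.conf``. The sample below configures the EMR runner; replace the placeholder credentials, bucket names and key pair with your own. Option names have changed between mrjob releases, so check them against the documentation for the version you installed.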

In [0]:
runners:
  emr:
    aws_access_key_id: __ACCESS_KEY__
    aws_secret_access_key: __SECRET_ACCESS_KEY__
    aws_region: us-east-1
    ec2_key_pair: EMR
    ec2_key_pair_file: ~/.ssh/EMR.pem
    ssh_tunnel_to_job_tracker: true
    ec2_master_instance_type: m3.xlarge
    ec2_instance_type: m3.xlarge
    num_ec2_instances: 5
    s3_scratch_uri: s3://bucket/tmp/
    s3_log_uri: s3://bucket/tmp/logs/
    enable_emr_debugging: true
    bootstrap:
    - sudo apt-get install -y python-pip
    - sudo pip install --upgrade simplejson
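
The ``__ACCESS_KEY__`` and ``__SECRET_ACCESS_KEY__`` placeholders must be replaced before the config can be used. One way to keep secrets out of a versioned file is to fill them in from environment variables when the config is generated. The following is a minimal sketch, assuming the conventional AWS variable names ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``; the ``render_config`` helper, the shortened template and the example values are illustrative and not part of mrjob.

```python
import os

# Shortened version of the sample config, keeping only the credential lines.
CONFIG_TEMPLATE = """\
runners:
  emr:
    aws_access_key_id: __ACCESS_KEY__
    aws_secret_access_key: __SECRET_ACCESS_KEY__
    aws_region: us-east-1
"""

REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def render_config(template, env=None):
    """Replace the credential placeholders with values taken from env."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    rendered = template.replace("__SECRET_ACCESS_KEY__", env["AWS_SECRET_ACCESS_KEY"])
    return rendered.replace("__ACCESS_KEY__", env["AWS_ACCESS_KEY_ID"])


# Hypothetical values; a real run would rely on os.environ instead.
example_env = {
    "AWS_ACCESS_KEY_ID": "AKIAEXAMPLE",
    "AWS_SECRET_ACCESS_KEY": "example-secret",
}
rendered = render_config(CONFIG_TEMPLATE, example_env)
print(rendered)
```

The rendered text can be written to a file outside version control and passed to the job with mrjob's ``--conf-path`` option.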