# Improving the log analyzer

Enhancing our second CLI application so that it can analyze multiple files in multiple formats and use JSON configuration instead of hard-coded values.

## Objective

To understand what **JSON** is and how to use it, along with another helpful module, **pprint**.

## JSON

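JSON (JavaScript Object Notation) is a lightweight, text-based data interchange format. The standard-library [`json`](https://docs.python.org/3/library/json.html#module-json) module encodes and decodes it, and exposes an API familiar to users of the standard library [`marshal`](https://docs.python.org/3/library/marshal.html#module-marshal) and [`pickle`](https://docs.python.org/3/library/pickle.html#module-pickle) modules.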

### xml.etree.ElementTree

The [`xml.etree.ElementTree`](https://docs.python.org/3/library/xml.etree.elementtree.html#module-xml.etree.ElementTree) module implements a simple and efficient API for parsing and creating XML data.

## pprint

The [`pprint`](https://docs.python.org/3.9/library/pprint.html?highlight=pprint#module-pprint) module provides a capability to “pretty-print” arbitrary Python data structures in a form which can be used as input to the interpreter.

## Hands-on

In [2]:
import argparse
import os
import re
import sys
import colorama


class Grep:
    """Minimal grep-like searcher for strings, files and directory trees.

    Args:
        is_regex: Treat the search term as a regular expression
            (``re.search``) instead of a plain substring.
        only_matching: In regex mode, return only the matched part of the
            line instead of the whole line. Has no effect in substring mode.
        with_filename: Prefix every result returned by ``search_in_path``
            with the base name of the file it was found in.
    """

    def __init__(self, is_regex=False, only_matching=False, with_filename=False):
        self._is_regex = is_regex
        self._only_matching = only_matching
        self._with_filename = with_filename

    def search_in_string(self, search, search_string, return_groups=False):
        """Search for ``search`` inside ``search_string``.

        Args:
            search: Substring or regular expression, depending on ``is_regex``.
            search_string: Text to search in.
            return_groups: When true, also return the named groups.

        Returns:
            The matching text (whole ``search_string``, or only the matched
            part in regex mode with ``only_matching``), or ``None`` when
            nothing matched. With ``return_groups`` a tuple
            ``(result, groups)`` is returned, where ``groups`` is the
            ``groupdict()`` of the regex match, or ``None`` in substring mode
            or when nothing matched.
        """
        search_result = None
        search_groups = None
        if self._is_regex:
            match = re.search(search, search_string)
            if match:
                search_groups = match.groupdict()
                search_result = match.group(0) if self._only_matching else search_string
        elif search in search_string:
            search_result = search_string
        if return_groups:
            return (search_result, search_groups)
        return search_result

    def search_in_path(self, search, input_path):
        """Search a file, or every file below a directory, line by line.

        Directories are always descended into recursively. Progress messages
        are printed to standard output.

        Args:
            search: Substring or regular expression to look for.
            input_path: Path of a file or a directory.

        Returns:
            List of results as produced by ``search_in_string``; each one is
            prefixed with ``"<file name>: "`` when ``with_filename`` is set.
        """
        search_results = []
        if os.path.isdir(input_path):
            print('Scanning path: {:25s}'.format(input_path))
            # Collect the entries first so the directory handle is released
            # before recursing instead of staying open for the whole subtree.
            with os.scandir(path=input_path) as input_dir_contents:
                entry_paths = [input_dir_element.path for input_dir_element in input_dir_contents]
            for entry_path in entry_paths:
                search_results.extend(self.search_in_path(search, entry_path))
        else:
            # The context manager guarantees the file is closed, also on errors.
            with open(input_path, 'r') as input_file:
                print('Opening file: {:25s}'.format(input_file.name))
                file_name = os.path.basename(input_file.name)
                for input_line in input_file:
                    search_result = self.search_in_string(search, input_line)
                    if search_result:
                        search_results.append('{}: {}'.format(file_name, search_result)
                                              if self._with_filename else search_result)
        return search_results


def main():
    """Command-line entry point: search the given paths and print the matches.

    Parses the arguments from ``sys.argv``, runs a ``Grep`` search over every
    input path and prints each result in green.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('search', type=str, help='Pattern to search for')
    parser.add_argument('input_paths', nargs='+', type=str, help='List of input file paths')
    parser.add_argument('-e', '--regex', dest='is_regexp', action='store_true', help='Use search as regexp')
    # NOTE(review): this option takes a value and is never read; directories
    # are always searched recursively. Kept as-is so existing invocations
    # keep working.
    parser.add_argument('-r', '--recursive', type=str, help='Search recursively in directories')
    parser.add_argument('-o', '--only-matching', dest='only_matching', action='store_true',
                        help='Show matched string only')
    parser.add_argument('-H', '--with-filename', dest='with_filename', action='store_true',
                        help='Show the file name for each match')
    args = parser.parse_args(sys.argv[1:])

    grep = Grep(args.is_regexp, args.only_matching, args.with_filename)
    for input_path in args.input_paths:
        search_results = grep.search_in_path(args.search, input_path)
        print(colorama.Style.RESET_ALL + 'Search results:')
        for search_result in search_results:
            print(colorama.Fore.GREEN + search_result)
    # Restore the default colour so the terminal is not left green on exit.
    print(colorama.Style.RESET_ALL, end='')


# Run the command-line interface only when executed as a script, not on import.
if __name__ == '__main__':
    main()

usage: ipykernel_launcher.py [-h] [-e] [-r RECURSIVE] [-o] [-H]
                             search input_paths [input_paths ...]
ipykernel_launcher.py: error: the following arguments are required: input_paths


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [1]:
import datetime
import json
import pprint
import Code.Chapter5.egrep as egrep


def filter_by_source(source, watched_entries):
    """Lazily yield the watched entries whose metadata names ``source``.

    ``watched_entries`` is an iterable of ``(match, meta)`` pairs, where
    ``meta`` is a mapping with a ``'source'`` key. Every pair whose source
    equals ``source`` is yielded as ``{'match': match, 'meta': meta}``.
    """
    for pair in watched_entries:
        matched_text, meta = pair
        if meta['source'] != source:
            continue
        yield dict(match=matched_text, meta=meta)


def main():
    """Report the earliest and latest event time per log source.

    Active step (6.3): reads ``config2.json``, which must provide
    ``files_to_scan`` (list of log file paths) and ``search_config`` (list of
    objects with a ``search_pattern`` regex and a ``date_pattern`` ``strptime``
    format). Each regex must define the named groups ``source`` and ``time``.
    The loaded configuration and the resulting ``{source: {'min', 'max'}}``
    mapping are pretty-printed.

    The earlier tutorial steps (6.1 and 6.2) are kept below as commented-out
    code for reference.
    """
    # 6.1 Scan multiple files
    # grep = egrep.Grep(is_regex=True, with_filename=False, only_matching=True)
    # files_to_scan = ["test_data/syslog", "test_data/syslog2"]
    # events = {}
    # for scanned_file in files_to_scan:
    #     input_file_lines = open(scanned_file, 'r').readlines()
    #     watched_entries = []
    #     for input_line in input_file_lines:
    #         search_result, search_groups = grep.search_in_string(
    #             "^\w+\W+\d+\W+(?P<time>\d{2}:\d{2}:\d{2})\W+\w+\W+\w+\W+(?P<source>[\w\-]+):", input_line, True)
    #         if search_result:
    #             watched_entries.append((search_result, search_groups))
    #     sources = (entry_meta['source'] for entry_match, entry_meta in watched_entries)
    #     for source in set(sources):
    #         events_per_source = filter_by_source(source, watched_entries)
    #         times = [datetime.datetime.strptime('2020-10-10 ' + event['meta']['time'], '%Y-%m-%d %H:%M:%S')
    #                  for event in events_per_source]
    #         if source in events:
    #             times.append(events[source]['min'])
    #             times.append(events[source]['max'])
    #         events[source] = {'min': min(times), 'max': max(times)}

    # 6.2 Scan config from JSON
    # grep = egrep.Grep(is_regex=True, with_filename=False, only_matching=True)
    # config = json.load(open('config.json', 'r'))
    # pprint.pprint(config)
    # events = {}
    # for scanned_file in config['files_to_scan']:
    #     input_file_lines = open(scanned_file, 'r').readlines()
    #     watched_entries = []
    #     for input_line in input_file_lines:
    #         for search_pattern in config['search_patterns']:
    #             search_result, search_groups = grep.search_in_string(search_pattern, input_line, True)
    #             if search_result:
    #                 watched_entries.append((search_result, search_groups))
    #     sources = (entry_meta['source'] for entry_match, entry_meta in watched_entries)
    #     for source in set(sources):
    #         events_per_source = filter_by_source(source, watched_entries)
    #         times = [datetime.datetime.strptime(event['meta']['time'], '%Y%m%d %H:%M:%S')
    #                  for event in events_per_source]
    #         if source in events:
    #             times.append(events[source]['min'])
    #             times.append(events[source]['max'])
    #         events[source] = {'min': min(times), 'max': max(times)}

    # 6.3 Scan config from more complex JSON (alternative would be date time group capturing)
    grep = egrep.Grep(is_regex=True, with_filename=False, only_matching=True)
    # Context managers make sure the config and log files are closed again.
    with open('config2.json', 'r') as config_file:
        config = json.load(config_file)
    pprint.pprint(config)
    events = {}
    for scanned_file in config['files_to_scan']:
        with open(scanned_file, 'r') as input_file:
            input_file_lines = input_file.readlines()
        for search_config in config['search_config']:
            watched_entries = []
            for input_line in input_file_lines:
                search_result, search_groups = grep.search_in_string(search_config['search_pattern'], input_line, True)
                if search_result:
                    watched_entries.append((search_result, search_groups))
            sources = (entry_meta['source'] for entry_match, entry_meta in watched_entries)
            for source in set(sources):
                events_per_source = filter_by_source(source, watched_entries)
                times = [datetime.datetime.strptime(event['meta']['time'], search_config['date_pattern'])
                         for event in events_per_source]
                # Fold in the range seen so far so min/max span all files
                # and all search configurations.
                if source in events:
                    times.append(events[source]['min'])
                    times.append(events[source]['max'])
                events[source] = {'min': min(times), 'max': max(times)}

    # Show pprint
    pprint.pprint(events)


# Run the log analysis only when executed as a script, not on import.
if __name__ == '__main__':
    main()

ModuleNotFoundError: No module named 'Code'

[Previous chapter - Chapter5](Chapter5.ipynb) | [Next chapter - Chapter7](Chapter7.ipynb)