# Application Log Monitoring and Parsing using Python

Inspired by Jackson Chan's article, this notebook demonstrates how to use Python and regular expressions to parse application log files, identify errors and warnings, and extract meaningful information.

We will:
1. Set up necessary libraries.
2. Create sample log files (based on the article's examples).
3. Implement the Python parsing script.
4. Run the script on the sample logs.
5. Examine the output.
6. Briefly discuss next steps for `log2.txt` and `log3.txt`.

In [1]:
import re
import csv  # For future CSV output enhancements
import os   # For file operations

print("Libraries imported successfully.")

Libraries imported successfully.


## 1. Preparing Sample Log Data

For this demonstration, we'll create `log1.txt` based on the content and structure described in the article. In a real scenario, you would point the script to your actual log files.

The article also mentions that `log2.txt` and `log3.txt` are available on GitHub: [https://github.com/jacksonsiachan/sample_logs](https://github.com/jacksonsiachan/sample_logs)

For self-containment, we will create `log1.txt` here. You can download `log2.txt` and `log3.txt` and place them in the same directory as this notebook to analyze them later.

In [2]:
log1_content = """
time="2020-03-18T14:40:29Z" level=info msg="Starting application services" file="main.go:30" func="main.main" kind=application
time="2020-03-18T14:40:30Z" level=info msg="Initialising database connection" file="corp/application1/database.go:55" func="corp/application1.ConnectDB" kind=database
time="2020-03-18T14:40:30Z" level=error msg="Database stream initialisation failed. Have you tried installing the WAL2JASON plugin?" file="corp/application1/task1.go:96" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:31Z" level=info msg="Attempting to connect to external services" file="corp/application1/task1.go:98" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:100" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:102" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostB.local failed: Could not resolve host" file="corp/application1/task1.go:104" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:35Z" level=info msg="Successfully connected to hostC.local" file="corp/application1/task1.go:106" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:38Z" level=warning msg="Failed to write record, duplicate key violation for primary key" file="corp/interviews/application1/task1.go:114" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:39Z" level=info msg="User 'jchan' logged in successfully" file="auth.go:120" func="auth.Login" kind=security
time="2020-03-18T14:40:40Z" level=warning msg="Archiving data source: old_records_2019" file="archive.go:70" func="archive.Run" kind=system
"""

with open("log1.txt", "w") as f:
    f.write(log1_content.strip())

print("log1.txt created successfully.")

log1.txt created successfully.


In [3]:
# Regex from the article, compiled once instead of on every line.
# Group 1 captures the level. The character classes only allow word characters,
# spaces, `=`, `"` and `.` between `level=` and "failed"/"Failed".
FAILURE_PATTERN = re.compile(r"\w*level=(warning|error)*[\w ]*[\=\"\w \.]*[fF]ailed")


def parse_log_file(source_file_path):
    """
    Parses a log file to find error and warning lines that indicate a failure.

    Args:
        source_file_path (str): The path to the log file.

    Returns:
        tuple: A tuple containing two lists: (errors, warnings)
    """
    errors = []
    warnings = []

    if not os.path.exists(source_file_path):
        print(f"Error: Source file '{source_file_path}' not found.")
        return errors, warnings

    try:
        with open(source_file_path, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                result = FAILURE_PATTERN.search(line)
                if not result:
                    continue

                # group(1) is None when "failed" follows a level other than
                # warning or error (e.g. info); such lines are skipped.
                level_type = result.group(1)
                if level_type == "error":
                    errors.append(line)
                elif level_type == "warning":
                    warnings.append(line)

    except (OSError, UnicodeDecodeError) as e:
        print(f"An error occurred while processing {source_file_path}: {e}")

    return errors, warnings

def save_messages(messages, output_filename):
    """Saves a list of messages to a file, one message per line."""
    with open(output_filename, 'w', newline='') as outfile:
        for message in messages:
            outfile.write(f"{message}\n")
    print(f"Saved {len(messages)} messages to {output_filename}")

print("Log parsing functions defined.")

Log parsing functions defined.


In [4]:
source_log_file = "log1.txt"
error_output_file = "error_messages_from_log1.txt"
warning_output_file = "warning_messages_from_log1.txt"

print(f"Processing {source_log_file}...")
errors_found, warnings_found = parse_log_file(source_log_file)

if errors_found:
    save_messages(errors_found, error_output_file)
else:
    print(f"No 'failed' errors found in {source_log_file} matching the criteria.")
    if os.path.exists(error_output_file): os.remove(error_output_file)

if warnings_found:
    save_messages(warnings_found, warning_output_file)
else:
    print(f"No 'failed' warnings found in {source_log_file} matching the criteria.")
    if os.path.exists(warning_output_file): os.remove(warning_output_file)

print("\n--- Script Execution Complete ---")

Processing log1.txt...
Saved 1 messages to error_messages_from_log1.txt
Saved 4 messages to warning_messages_from_log1.txt

--- Script Execution Complete ---


## 2. Examining Output from `log1.txt`

Let's display the contents of the generated files.

In [5]:
print(f"--- Contents of {error_output_file} ---")
if os.path.exists(error_output_file):
    with open(error_output_file, "r") as f:
        print(f.read())
else:
    print(f"{error_output_file} not created (no relevant errors found or error during processing).")

--- Contents of error_messages_from_log1.txt ---
time="2020-03-18T14:40:30Z" level=error msg="Database stream initialisation failed. Have you tried installing the WAL2JASON plugin?" file="corp/application1/task1.go:96" func="corp/application1/task1.Dblogger" kind=application



In [6]:
print(f"\n--- Contents of {warning_output_file} ---")
if os.path.exists(warning_output_file):
    with open(warning_output_file, "r") as f:
        print(f.read())
else:
    print(f"{warning_output_file} not created (no relevant warnings found or error during processing).")

--- Contents of warning_messages_from_log1.txt ---
time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:100" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:102" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostB.local failed: Could not resolve host" file="corp/application1/task1.go:104" func="corp/application1/task1.Dblogger" kind=application
time="2020-03-18T14:40:38Z" level=warning msg="Failed to write record, duplicate key violation for primary key" file="corp/interviews/application1/task1.go:114" func="corp/application1/task1.Dblogger" kind=application






## 3. Analysis of `log1.txt` Output

Compare the output above with the article's expected parsed results for `log1.txt`:

1.  `time="2020-03-18T14:40:30Z" level=error msg="Database stream initialisation failed. Have you tried installing the WAL2JASON plugin?" file="corp/application1/task1.go:96" func=corp/application1/task1.Dblogger kind=application`
2.  `time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:100" func=corp/application1/task1.Dblogger kind=application`
3.  `time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:102" func=corp/application1/task1.Dblogger kind=application`
4.  `time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostB.local failed: Could not resolve host" file="corp/application1/task1.go:104" func=corp/application1/task1.Dblogger kind=application`
5.  `time="2020-03-18T14:40:38Z" level=warning msg="Failed to write record, duplicate key violation for primary key" file="corp/interviews/application1/task1.go:114" func=corp/application1/task1.Dblogger kind=application`

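The script looks specifically for "failed" or "Failed" on `warning` and `error` lines, so the line `level=warning msg="Archiving data source: old_records_2019"` is not caught. This is intended by the article's script logic ("not all 'Warning' throws a failure message"). Note that the pattern only reaches "failed" through word characters, spaces, `=`, `"`, and `.`, so a message containing other punctuation (such as `-` or `:`) before "failed" would be missed.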
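To see this behaviour in isolation, the sketch below runs the article's pattern against four lines. The first three are copied from `log1.txt`; the fourth is a hypothetical line invented here to show that a hyphen before "failed" stops the match.

```python
import re

# The article's pattern, copied unchanged from the parsing cell.
ARTICLE_PATTERN = re.compile(r"\w*level=(warning|error)*[\w ]*[\=\"\w \.]*[fF]ailed")

samples = {
    # Copied from log1.txt
    "hostA warning": 'time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:100" func="corp/application1/task1.Dblogger" kind=application',
    "duplicate key warning": 'time="2020-03-18T14:40:38Z" level=warning msg="Failed to write record, duplicate key violation for primary key" file="corp/interviews/application1/task1.go:114" func="corp/application1/task1.Dblogger" kind=application',
    "archive warning": 'time="2020-03-18T14:40:40Z" level=warning msg="Archiving data source: old_records_2019" file="archive.go:70" func="archive.Run" kind=system',
    # Hypothetical line (not in log1.txt): the hyphen is outside the character classes
    "hyphenated host": 'level=warning msg="Connection to host-a.local failed"',
}

matched = {}
for name, line in samples.items():
    result = ARTICLE_PATTERN.search(line)
    # Store the captured level, or None when the pattern does not match
    matched[name] = result.group(1) if result else None
    print(f"{name}: {matched[name]}")
```

The first two lines report `warning`, while the archive line and the hyphenated line report `None`.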

## 4. Next Steps: Analyzing `log2.txt` and `log3.txt`

The article challenges the reader: "Can you figure out any discrepancies in `log2.txt` and `log3.txt`? Feel free to share the methods you’ve used to identify the issues."

**To do this:**
1. Download `log2.txt` and `log3.txt` from [https://github.com/jacksonsiachan/sample_logs](https://github.com/jacksonsiachan/sample_logs) and place them in the same directory as this notebook.
2. Modify and re-run the code cell for parsing, changing `source_log_file` to `"log2.txt"` and then to `"log3.txt"`.
3. Examine the generated `error_messages...txt` and `warning_messages...txt` files.
4. Look for patterns, recurring issues, or anything unusual.
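
Rather than editing `source_log_file` by hand for each file, the steps above could be wrapped in a loop. This is a minimal sketch, not the article's method: `split_failures` and `process_logs` are hypothetical helper names, the output file names follow the `..._messages_from_<log>.txt` convention used earlier, and `log2.txt` and `log3.txt` are assumed to have been downloaded already (missing files are skipped).

```python
import re
from pathlib import Path

# The article's pattern, copied unchanged from the parsing cell.
ARTICLE_PATTERN = re.compile(r"\w*level=(warning|error)*[\w ]*[\=\"\w \.]*[fF]ailed")


def split_failures(path):
    """Return (errors, warnings) for one log file using the article's pattern."""
    errors, warnings = [], []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            result = ARTICLE_PATTERN.search(line)
            if not result:
                continue
            if result.group(1) == "error":
                errors.append(line)
            elif result.group(1) == "warning":
                warnings.append(line)
    return errors, warnings


def process_logs(paths, output_dir="."):
    """Parse each existing log file and return {file name: (n_errors, n_warnings)}."""
    summary = {}
    for path in map(Path, paths):
        if not path.exists():
            print(f"Skipping {path}: not found")
            continue
        errors, warnings = split_failures(path)
        for label, lines in (("error", errors), ("warning", warnings)):
            if lines:
                out = Path(output_dir) / f"{label}_messages_from_{path.stem}.txt"
                out.write_text("\n".join(lines) + "\n", encoding="utf-8")
        summary[path.name] = (len(errors), len(warnings))
    return summary


summary = process_logs(["log1.txt", "log2.txt", "log3.txt"])
print(summary)
```

Comparing the counts in `summary` across the three files is a quick first check before reading the individual output files.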

**Possible Methods to Identify Issues (beyond the current script):**
* Refine Regex: The current regex only matches `warning` and `error` lines that contain "failed" or "Failed". You might need different or broader patterns for other kinds of errors/warnings, such as timeouts or refused connections.
* Count Occurrences: Count how many times specific error or warning messages appear.
* Timestamp Analysis: Look for errors/warnings clustered around specific times.
* Extract More Fields: Modify the regex to capture not just the line, but specific fields like timestamp, message, file, function, etc., and then perhaps output to CSV for easier sorting and filtering in a spreadsheet program or pandas.
* Look for "INFO" logs around errors/warnings: These can provide context about what the application was doing before an issue occurred.
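
Several of these ideas can be combined in a few lines. The sketch below is one possible approach, not something taken from the article: it splits each line into `key=value` fields, counts repeated messages, and groups problems by minute. `FIELD_PATTERN`, `parse_fields`, and `minute_bucket` are hypothetical names, and the pattern assumes that quoted values never contain an escaped double quote.

```python
import re
from collections import Counter
from datetime import datetime

# key=value pairs whose value is either double-quoted or a bare token.
# Assumption: quoted values never contain an escaped double quote.
FIELD_PATTERN = re.compile(r'(\w+)=(?:"([^"]*)"|(\S+))')


def parse_fields(line):
    """Turn one log line into a dict of field name -> value."""
    return {key: quoted or bare for key, quoted, bare in FIELD_PATTERN.findall(line)}


def minute_bucket(timestamp):
    """Parse a timestamp such as 2020-03-18T14:40:31Z and drop the seconds."""
    parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(second=0)


# Five lines copied from log1.txt.
sample_lines = [
    'time="2020-03-18T14:40:30Z" level=error msg="Database stream initialisation failed. Have you tried installing the WAL2JASON plugin?" file="corp/application1/task1.go:96" func="corp/application1/task1.Dblogger" kind=application',
    'time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:100" func="corp/application1/task1.Dblogger" kind=application',
    'time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostA.local failed: host unreachable" file="corp/application1/task1.go:102" func="corp/application1/task1.Dblogger" kind=application',
    'time="2020-03-18T14:40:31Z" level=warning msg="Connection to hostB.local failed: Could not resolve host" file="corp/application1/task1.go:104" func="corp/application1/task1.Dblogger" kind=application',
    'time="2020-03-18T14:40:35Z" level=info msg="Successfully connected to hostC.local" file="corp/application1/task1.go:106" func="corp/application1/task1.Dblogger" kind=application',
]

records = [parse_fields(line) for line in sample_lines]
problems = [r for r in records if r.get("level") in ("error", "warning")]

# Count occurrences: which messages repeat?
message_counts = Counter(r["msg"] for r in problems)

# Timestamp analysis: how many problems fall into each minute?
problems_per_minute = Counter(minute_bucket(r["time"]) for r in problems)

for message, count in message_counts.most_common():
    print(f"{count}x {message}")
for minute, count in sorted(problems_per_minute.items()):
    print(f"{minute:%Y-%m-%d %H:%M} -> {count} problem(s)")
```

Because every record is a plain dict, the same `records` list could also be filtered for `info` entries that share a timestamp with an error, which covers the last suggestion in the list.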

## 5. Further Enhancements 

The article suggests several improvements and alternative approaches:

* CSV Report: Modify the script to parse out individual fields (Timestamp, Level, Description, File, Function, Type) and write them to a structured CSV file. This would involve more complex regex with multiple capture groups.
* Missing Information: The article notes that User IDs, Computer IDs, and specific problematic datasource names could be valuable additions to log messages for better debugging.
* Cloud Logging Solutions:
    * Stream logs to AWS CloudWatch or GCP Cloud Logging (formerly Stackdriver).
    * Use logging agents.
    * Utilize cloud-native analytics (e.g., BigQuery) or built-in alerting.
* Regular Expression Tool: Use online tools like `regex101.com` to build and test regex patterns.
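
As an illustration of the CSV report idea, here is a minimal sketch using the standard `csv` module. It is an assumption-laden example rather than the article's implementation: `FIELD_PATTERN`, `COLUMNS`, and `write_csv_report` are hypothetical names, the mapping from log fields to column headers is a guess based on the column list above, and the report keeps every `error` and `warning` line rather than only those containing "failed".

```python
import csv
import io
import re

# key=value pairs whose value is either double-quoted or a bare token.
# Assumption: quoted values never contain an escaped double quote.
FIELD_PATTERN = re.compile(r'(\w+)=(?:"([^"]*)"|(\S+))')

# Log field name -> CSV column header (assumed mapping).
COLUMNS = {
    "time": "Timestamp",
    "level": "Level",
    "msg": "Description",
    "file": "File",
    "func": "Function",
    "kind": "Type",
}


def write_csv_report(lines, out):
    """Write error and warning lines to `out` as CSV; return the number of rows."""
    writer = csv.DictWriter(out, fieldnames=list(COLUMNS.values()))
    writer.writeheader()
    rows_written = 0
    for line in lines:
        fields = {key: quoted or bare for key, quoted, bare in FIELD_PATTERN.findall(line)}
        if fields.get("level") not in ("error", "warning"):
            continue
        writer.writerow({header: fields.get(key, "") for key, header in COLUMNS.items()})
        rows_written += 1
    return rows_written


# Three lines copied from log1.txt (one error, one info, one warning).
sample_lines = [
    'time="2020-03-18T14:40:30Z" level=error msg="Database stream initialisation failed. Have you tried installing the WAL2JASON plugin?" file="corp/application1/task1.go:96" func="corp/application1/task1.Dblogger" kind=application',
    'time="2020-03-18T14:40:35Z" level=info msg="Successfully connected to hostC.local" file="corp/application1/task1.go:106" func="corp/application1/task1.Dblogger" kind=application',
    'time="2020-03-18T14:40:38Z" level=warning msg="Failed to write record, duplicate key violation for primary key" file="corp/interviews/application1/task1.go:114" func="corp/application1/task1.Dblogger" kind=application',
]

# An in-memory buffer keeps the example free of side effects.
buffer = io.StringIO()
row_count = write_csv_report(sample_lines, buffer)
print(buffer.getvalue())
```

To write a real file instead, pass a handle opened with `open("report.csv", "w", newline="")` (the file name is only an example). The `newline=""` argument is what the `csv` module documentation recommends for files handed to a writer.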