## Log Standardisation and Extraction System

In [None]:
import pandas as pd
import re
from datetime import datetime

In [None]:
#=== STEP 1: Load Files ===

#Load desired fields and manual input flags from Excel
field_df = pd.read_excel("Data Extraction Fields.xlsx", header=None)
field_df.columns = ["Field Name", "Manual Input"]
field_df["Field Name"] = field_df["Field Name"].str.strip()
field_df["Manual Input"] = field_df["Manual Input"].fillna("").astype(str).str.lower()

#Get field names and manual input fields
desired_fields = field_df["Field Name"].tolist()
manual_fields = field_df[field_df["Manual Input"].isin(["yes", "true", "manual"])]["Field Name"].tolist()

#Load planned test cases CSV
planned_df = pd.read_csv("refined_security_test_cases.csv")

#Read and split log file
with open("updated_logs.txt", "r", encoding="utf-8") as f:
    log_blocks = f.read().split("======================================================")

In [None]:
#=== STEP 2: Parse Logs in Pairs ===

parsed_logs = []
i = 0
while i < len(log_blocks) - 1:
    req = log_blocks[i].strip()
    res = log_blocks[i + 1].strip()
    i += 2

    entry = {
        "Date of Test Executed": "",
        "Project ID": "",
        "User ID": "",
        "Test Case ID": "",
        "User Agent": "",
        "Request Method": "",
        "Requested Source": "",
        "Test Case Outcome Message": "",
        "Error Message": "",
        "Response Size": ""
    }

    #Parse request block
    for line in req.splitlines():
        line = line.strip()
        if not line:
            continue
        if re.match(r"^(GET|POST|PUT|DELETE|HEAD|OPTIONS|PATCH)", line):
            parts = line.split()
            entry["Request Method"] = parts[0]
            entry["Requested Source"] = parts[1]
        elif "User-Agent:" in line:
            entry["User Agent"] = line.split("User-Agent:")[-1].strip()
        elif "Project ID:" in line:
            entry["Project ID"] = line.split("Project ID:")[-1].strip()
        elif "User ID:" in line:
            entry["User ID"] = line.split("User ID:")[-1].strip()
        elif "Test Case ID:" in line:
            entry["Test Case ID"] = line.split("Test Case ID:")[-1].strip()

    #Parse response block
    for line in res.splitlines():
        line = line.strip()
        if not line:
            continue
        if line.startswith("HTTP"):
            entry["Test Case Outcome Message"] = "Pass" if "200 OK" in line else "Fail"
            entry["Error Message"] = "" if "200 OK" in line else line
        elif line.lower().startswith("date:"):
            raw_date = line.split(":", 1)[-1].strip()
            try:
                parsed_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %Z")
                entry["Date of Test Executed"] = parsed_date.strftime("%d/%m/%Y")
            except Exception:
                entry["Date of Test Executed"] = raw_date
        elif "Content-Length:" in line:
            entry["Response Size"] = line.split("Content-Length:")[-1].strip()

    #Append only if key fields exist
    if entry["Project ID"] and entry["User ID"] and entry["Test Case ID"]:
        parsed_logs.append(entry)

logs_df = pd.DataFrame(parsed_logs)

In [None]:
#=== STEP 3: Merge with Planned Test Cases ===

merged_df = pd.merge(planned_df, logs_df, on=["Project ID", "User ID", "Test Case ID"], how="left")

#Format Deadline field if it exists
if "Deadline" in merged_df.columns:
    try:
        merged_df["Deadline"] = pd.to_datetime(merged_df["Deadline"], errors='coerce').dt.strftime("%d/%m/%Y")
    except Exception:
        pass

#Dynamically generate Test Case Status
merged_df["Test Case Status"] = merged_df.apply(
    lambda row: "Completed" if pd.notna(row["Date of Test Executed"]) else "Not Started", axis=1
)

#=== STEP 4: Add Blank Manual Input Columns ===
for col in manual_fields:
    if col not in merged_df.columns:
        merged_df[col] = ""

#=== STEP 5: Align Final Output Columns ===

#Normalise columns to lowercase for safe matching
merged_df.columns = [col.lower().strip() for col in merged_df.columns]
final_cols_lower = [col.lower().strip() for col in desired_fields]

#Ensure all desired fields exist
for col in final_cols_lower:
    if col not in merged_df.columns:
        merged_df[col] = ""

#Reorder and restore headers
final_output = merged_df[final_cols_lower]
final_output.columns = desired_fields  #Restore original header casing

In [None]:
#=== STEP 6: Export to CSV ===
final_output.to_csv("logs_system_output.csv", index=False)