In [200]:
from typing import Optional, Iterator, NamedTuple

from csv import writer as csvwriter
from datetime import datetime, timezone
from dataclasses import dataclass, field
from enum import Enum, auto
from pathlib import Path
import re
import time

import cv2 as cv
from dateutil.tz import gettz
from matplotlib import pyplot as plt
import numpy as np
from PIL import Image
import pyautogui
import pytesseract

In [201]:
class Coord(NamedTuple):
    x: int
    y: int
    
    def __add__(self, other):
        return self.__class__(
            self.x + other.x,
            self.y + other.y,
        )

In [202]:
def show_image(img):
    plt.figure(figsize = (15,15))
    plt.imshow(cv.cvtColor(img, cv.COLOR_BGR2RGB))
    plt.show()
    
def save_image(arr, path):
    converted = cv.cvtColor(arr, cv.COLOR_BGR2RGB)
    img = Image.fromarray(converted)
    img.save(path)

In [211]:
def next_screen(location, delay=1):
    click_loc = location + Coord(1322, 690+11)
    pyautogui.moveTo(*click_loc, delay)
    pyautogui.click()
    
def take_screenshot():
    screenshot = pyautogui.screenshot()
    screenshot = cv.cvtColor(np.array(screenshot), cv.COLOR_RGB2BGR)
    
    return screenshot

def find_guild_log(search):
    log_offset = Coord(19, 126)
    log_dims = Coord(1293, 575)

    method = cv.TM_CCOEFF
    target = cv.imread('guild_log_corner.png')
    
    result = cv.matchTemplate(search, target, cv.TM_CCOEFF)
    max_loc = Coord(*cv.minMaxLoc(result)[3])
    
    top_left = max_loc + log_offset
    
    bottom_right = top_left + log_dims
    cropped = search[top_left.y:bottom_right.y, top_left.x:bottom_right.x]
    
    # show_image(cropped)
    
    return cropped, max_loc

def extract_event_lines(cropped):
    text = pytesseract.image_to_string(cropped)
    lines = [
        line.strip()
        for line in text.split("\n")
        if len(line) > 0
    ]

    RECORD_COUNT = 20

    records = [
        " ".join(lines[r::RECORD_COUNT])
        for r in range(RECORD_COUNT)
    ]
    
    return [r for r in records if r]

def find_events(screenshot_path = ""):
    screenshot = take_screenshot()
    guild_log, guild_log_loc = find_guild_log(screenshot)
    lines = extract_event_lines(guild_log)
    events = parse_events(lines)
    
    if screenshot_path:
        for e in events:
            e.screenshot_filename = str(screenshot_path)
    
    return events, guild_log_loc, guild_log

def find_all_events(limit=1025, dump=True):
    all_events = []
    screenshot_count = 0
    
    FILENAME_BASE = datetime.now(gettz("EST")).strftime(r"%Y%m%d_%H%M")
    DEDUPLICATE = 1
    
    output_path = Path("output")
    if not output_path.exists():
        output_path.mkdir()
    
    def get_screenshot_path(index):
        return output_path / Path(f"{FILENAME_BASE}_{DEDUPLICATE:02d}_{index:03d}.png")
    
    while get_screenshot_path(1).exists():
        DEDUPLICATE += 1
    
    screenshot_count += 1
    screenshot_path = get_screenshot_path(screenshot_count)
    new_events, window_loc, screenshot = find_events(screenshot_path)
    all_events.extend(new_events)
    
    if len(all_events) <= 0:
        print("Unable to scrape events")
        return []
    
    if dump:
        
        save_image(screenshot, screenshot_path)
    
    while True:
        if len(all_events) > limit:
            print("Reached watchdog limit")
            break
            
        next_screen(window_loc, delay=0.5)
        time.sleep(0.5)
        screenshot_count += 1
        screenshot_path = get_screenshot_path(screenshot_count)
        new_events, window_loc, screenshot = find_events(screenshot_path)
        
        if new_events[-1].date <= all_events[-1].date and new_events[-1].target == all_events[-1].target:
            print("Reached the end of the window")
            print(f"  {all_events[-1]}")
            print(f"  {new_events[-1]}")
            break
            
        if dump:
            save_image(screenshot, screenshot_path)
        all_events.extend(new_events)
        
    if dump:
        with open(output_path / Path(f"{FILENAME_BASE}_{DEDUPLICATE:02d}.csv"), "w", encoding="utf-8") as csv_fh:
            csv_file = csvwriter(csv_fh)
            
            csv_file.writerow(["Error", "Raw", "Date", "Action", "Actor", "Target", "Rank", "Image Filename"])
            for e in all_events:
                csv_file.writerow([
                    str(e.error),
                    e.raw,
                    e.date.strftime(r"%Y-%b-%d %H:%M"),
                    e.action.value.text,
                    e.actor,
                    e.target,
                    e.rank,
                    e.screenshot_filename,
                ])
    
    return all_events

class EventActionDef(NamedTuple):
    text: str
    parse: Optional[re.Pattern] = None
    extra_fields: tuple[str, ...] = ()
    
class EventAction(EventActionDef, Enum):
    UNKNOWN  = ("UNKNOWN")
    INVITED  = ("INVITED",  re.compile(r"(?P<actor>.+) has invited (?P<target>.+) to join the guild", re.I), ())
    JOINED   = ("JOINED",   re.compile(r"(?P<actor>.+) has joined the guild", re.I),                         ())
    PROMOTED = ("PROMOTED", re.compile(r"(?P<actor>.+) has promoted (?P<target>.+) to (?P<rank>.+)", re.I),  ("rank",))
    DEMOTED  = ("DEMOTED",  re.compile(r"(?P<actor>.+) has demoted (?P<target>.+) to (?P<rank>.+)", re.I),   ("rank",))
    LEFT     = ("LEFT",     re.compile(r"(?P<actor>.+) has left the guild", re.I),                           ())
    REMOVED  = ("REMOVED",  re.compile(r"(?P<actor>.+) has removed (?P<target>.+) from the guild", re.I),    ())
    BANNED   = ("BANNED",   re.compile(r"(?P<actor>.+) has banned (?P<target>.+) from the guild", re.I),     ())
    
    def __repr__(self):
        return f"<{self.__class__.__qualname__}:{self.name}>"

RE_DATETIME = re.compile(
    r"""
        \s*(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<year>[0-9]+)
        \s+(?P<hour>[0-9]+):(?P<minute>[0-9]+)
    """,
    re.IGNORECASE + re.VERBOSE,
)
    
@dataclass
class Event:
    date: datetime = field(default_factory=lambda: datetime.utcfromtimestamp(0))
    action: EventAction = EventAction.UNKNOWN
    actor: str = ""
    target: str = ""
    rank: str = ""
    raw: str = ""
    error: bool = False
    screenshot_filename: str = ""
    
    @classmethod
    def parse(cls, raw: str):
    
        DEBUG = False
        
        event_fields = {
            "raw": raw,
            "error": False,
        }
        
        m = RE_DATETIME.match(raw)
        
        if not m:
            DEBUG and print(f"Unable to parse date from '{raw}'")
            event_fields["error"] = True
        
        else:
            event_fields["date"] = datetime(
                year = int(m['year']),
                month = int(m['month']),
                day = int(m['day']),
                hour = int(m['hour']),
                minute = int(m['minute']),
            )
        
        
        action = EventAction.UNKNOWN
        for a in EventAction:
            if a.text in raw and a is not EventAction.UNKNOWN:
                action = a
                break
                
        if action is EventAction.UNKNOWN:
            DEBUG and print(f"Unable to parse event action from '{raw}'")
            event_fields["error"] = True
        
        event_fields["action"] = action
        
        
        if action.value.parse is None:
            DEBUG and print(f"Unable to parse event details: {action.value.text} has no rule")
            event_fields["error"] = True
        
        else:
            m = action.value.parse.search(
                raw.split(action.value.text)[-1].strip()
            )  
            if m is None:
                DEBUG and print(f"Unable to parse {action.value.text} details from '{raw}'")
                event_fields["error"] = True
            
            else:
                fields = m.groupdict()
        
                event_fields["actor"] = fields["actor"]
                event_fields["target"] = fields.get("target", fields["actor"])
                
                for fn in action.value.extra_fields:
                    event_fields[fn] = fields[fn]
        
        return cls(**event_fields)

    def __str__(self):
        return f"{self.date} : actor={self.actor}, action={self.action.value.text}, target={self.target}, rank={self.rank}, error={self.error}"

def parse_events(lines):
    return [
        Event.parse(line)
        for line in lines
    ]

def print_events(events):
    for e in events:
        print(e)

In [212]:
start = time.time()
events = find_all_events()
duration = time.time() - start
print(f"Found {len(events)} events in {duration:.0f} seconds")

Reached the end of the window
  2023-01-13 20:30:00 : actor=Exiledraider, action=PROMOTED, target=Zi'kich, rank=Luminary., error=False
  2023-01-13 20:30:00 : actor=Exiledraider, action=PROMOTED, target=Zi'kich, rank=Luminary., error=False
Found 1020 events in 160 seconds
