In [None]:
%pip install pynput
%pip install screeninfo

## Data Collection

In [2]:
from pynput import mouse
import threading
import time
from screeninfo import get_monitors
from glob import glob

ACTION = "chess"
TIME = 10  # in minutes
NAME = "MS"

# Get monitor information
monitors = get_monitors()
for monitor in monitors:
    print(f"  Dimensions: {monitor.width}x{monitor.height}")
    print(f"  Position: ({monitor.x}, {monitor.y})\n")

count = len(glob(f"recordings/{ACTION}_mouse_events_{NAME}*.txt"))
filename = f"recordings/{ACTION}_mouse_events_{NAME}_{count}.txt"

with open(filename, "w") as f:
    for id, monitor in enumerate(monitors):
        f.write(str((id, monitor.width, monitor.height, monitor.x, monitor.y)) + "\n")

# List to store mouse events
event_list = []
# Lock to synchronize access to event_list
lock = threading.Lock()
# Global listener variable
listener = None

def get_monitor_for_position(x, y):
    for idx, monitor in enumerate(monitors):
        if (monitor.x <= x < monitor.x + monitor.width and
            monitor.y <= y < monitor.y + monitor.height):
            return idx
    return None  # If position is not within any monitor

def adjust_coordinates(x, y):
    monitor_idx = get_monitor_for_position(x, y)
    if monitor_idx is None:
        return x, y
    monitor = monitors[monitor_idx]
    x_adj = x - monitor.x
    y_adj = y - monitor.y
    if monitor_idx != 0:
        x_adj = x_adj % monitors[0].width
        y_adj = y_adj % monitors[0].height
    return x_adj, y_adj

def on_move(x, y):
    x_adj, y_adj = adjust_coordinates(x, y)
    with lock:
        event_list.append(("move", time.time(), x_adj, y_adj))

def on_click(x, y, button, pressed):
    x_adj, y_adj = adjust_coordinates(x, y)
    with lock:
        event_list.append(("click", time.time(), x_adj, y_adj, button.name, pressed))

def on_scroll(x, y, dx, dy):
    x_adj, y_adj = adjust_coordinates(x, y)
    with lock:
        event_list.append(("scroll", time.time(), x_adj, y_adj, dx, dy))

def flush_events():
    global listener
    interval = 0.5
    elapsed = 0
    while True:
        time.sleep(interval)
        elapsed += interval
        with lock:
            if event_list:
                # Write events to file
                with open(filename, "a") as f:
                    for event in event_list:
                        f.write(str(event) + "\n")
                event_list.clear()
        # print minutes, seconds remaining
        remain = int(TIME * 60 - elapsed)
        minutes = remain // 60
        seconds = remain % 60
        print(f"\r{minutes:02}:{seconds:02}", end="")
        if listener is not None and not listener.running:
            break

# Start the mouse listener and the flushing thread
def main():
    global listener
    flush_thread = threading.Thread(target=flush_events)
    flush_thread.daemon = True
    flush_thread.start()

    total = TIME * 60  # total seconds

    with mouse.Listener(
        on_move=on_move, on_click=on_click, on_scroll=on_scroll
    ) as listener:
        # Start timer to stop the listener after total seconds
        timer = threading.Timer(total, listener.stop)
        timer.start()
        listener.join()

    # After listener stops, flush any remaining events
    with lock:
        if event_list:
            with open(filename, "a") as f:
                for event in event_list:
                    f.write(str(event) + "\n")
            event_list.clear()
    print("\nRecording finished.")

if __name__ == "__main__":
    # Count down 3 seconds
    print("Recording starts in 3 seconds...")
    for i in range(3, 0, -1):
        print(i)
        time.sleep(1)
    print("Recording started.")
    main()

  Dimensions: 1728x1117
  Position: (0, 0)

Recording starts in 3 seconds...
3
2
1
Recording started.
00:05
Recording finished.


00:05

: 