/
timer.py
97 lines (77 loc) · 2.59 KB
/
timer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import asyncio
from typing import Union
import uuid
# =================================================
# do not change import order for *thread*
# patching threading.Thread
import aio.gthread
# patched module
from threading import Thread
import pygame
import pygame.event
import pygame.time
# ====================================================================
# Replace the non-working native pygame.time.set_timer function.
# Import-time notice: runs once when this module is imported and tells the
# user that pygame.time.set_timer is being patched (see the linked issue).
print(
"""\
https://github.com/pygame-web/pygbag/issues/16
applying: use aio green thread for pygame.time.set_timer
"""
)
# Build the event and post it directly to the queue.
# Caveats:
# - events may be delivered very late
# - the delay cannot be shorter than one frame time at the device refresh rate.
# For local testing, wrap patch_set_timer in a function and apply it
# only on the emscripten platform, so that running vanilla pygame doesn't break:
#
# import platform
#
# def patch_timer():
# THREADS = {}
#
# def patch_set_timer(
# event: Union[int, pygame.event.Event], millis: int, loops: int = 0):
# ...
# async def fire_event(thread_uuid):
# ...
#
# pygame.time.set_timer = patch_set_timer
#
# if platform.system().lower() == "emscripten":
# patch_timer()
# Module-level registry of the timers started through patch_set_timer.
# - key: event type the timer was registered for
# - value: uuid of the most recently started timer thread for that key
# A timer thread stops itself once its uuid no longer matches the stored
# value, or once its key has been removed (timer cancelled).
THREADS = {}
def patch_set_timer(event: Union[int, pygame.event.Event], millis: int, loops: int = 0):
    """Replacement for pygame.time.set_timer driven by an aio green thread.

    Args:
        event: Either an integer event type or an already-built pygame event
            object. An event object is posted as-is; an integer is wrapped in
            a new ``pygame.event.Event`` of that type.
        millis: Delay between two posts, in milliseconds. A value of 0 or
            less cancels the timer registered for this event type.
        loops: Number of events to post. 0 (the default) keeps posting until
            the timer is cancelled or replaced.

    Only one timer per event type is tracked in THREADS: registering a new
    one makes the previous worker stop at its next wake-up.
    """
    dlay = float(millis) / 1000

    # `event` may be an event type or an Event object. pygame.event.Event()
    # takes an integer type, so an Event object must not be passed through it
    # again. Event objects expose their integer type as `.type`; plain event
    # types have no such attribute. Keying THREADS on the integer type gives
    # one timer per event type for both spellings.
    if hasattr(event, "type"):
        event_type = event.type
        cevent = event
    else:
        event_type = event
        cevent = pygame.event.Event(event)

    event_loop = asyncio.get_event_loop()

    async def fire_event(thread_uuid):
        """The thread's target function to handle the timer.

        Early exit conditions, checked after every sleep:
        - event loop is closed
        - event type is no longer in THREADS (timer cancelled)
        - the thread's uuid is not the latest one (timer replaced)
        - max loop iterations reached, if the loops param is not zero
        """
        loop_counter = 0
        while True:
            await asyncio.sleep(dlay)
            if (
                event_loop.is_closed()
                # .get() covers both "entry removed" and "entry replaced":
                # a missing key yields None, which never equals a uuid.
                or THREADS.get(event_type) != thread_uuid
                or (loops and loop_counter >= loops)
            ):
                break
            pygame.event.post(cevent)
            # Only count iterations when a finite number of loops was asked for.
            loop_counter += 1 if loops else 0

    if dlay > 0:
        # uuid is used to track the latest thread,
        # stale threads will be terminated
        thread_uuid = uuid.uuid4()
        Thread(target=fire_event, args=[thread_uuid]).start()
        THREADS[event_type] = thread_uuid
    else:
        # This cancels the timer for the event; a no-op when none is registered.
        THREADS.pop(event_type, None)
# Install the patch: rebind the module attribute so that code looking up
# pygame.time.set_timer after this import gets patch_set_timer instead.
pygame.time.set_timer = patch_set_timer