-
Notifications
You must be signed in to change notification settings - Fork 1
/
ls_connection.py
103 lines (81 loc) · 2.25 KB
/
ls_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""
Throughout this module, 'ls' refers to LiveSplit.
All communication with LiveSplit goes through its server component.
"""
import socket
from threading import Thread
import config
import select # used for checking if socket has data pending
def ls_connect(ls_socket, call_func, window, server_port):
    """
    Start a background thread that tries to connect ls_socket to ls.

    The thread runs try_connection with the same four arguments, so this
    function returns as soon as the thread has been started.
    """
    worker_args = (ls_socket, call_func, window, server_port)
    Thread(target=try_connection, args=worker_args).start()
def init_socket():
    """Create a new socket with the default settings and return it."""
    fresh_socket = socket.socket()
    return fresh_socket
def try_connection(ls_socket, call_func, window, server_port):
    """
    Try to connect the given socket to ls at (config.HOST, server_port).

    If the connection succeeds, call_func is called with window as its
    only argument and None is returned.
    If the connection attempt raises, call_func is not called and False
    is returned.
    (made to be run in a separate thread)
    """
    try:
        ls_socket.connect((config.HOST, server_port))
    except Exception:
        # Best-effort connect: any failure (refused connection, bad port
        # value, ...) is reported as False. Unlike a bare ``except:``, this
        # does not swallow KeyboardInterrupt or SystemExit.
        return False
    call_func(window)
def close_socket(com_socket):
    """Close the given socket by calling its close() method."""
    com_socket.close()
    return None
def check_connection(ls_socket):
    """
    Report whether the connection between the socket and livesplit
    still works, by sending the "best_possible" command through
    send_to_ls.

    Returns True when send_to_ls gives back a truthy response,
    otherwise False.
    """
    reply = send_to_ls(ls_socket, "best_possible")
    return bool(reply)
def send_to_ls(ls_socket, command):
    """
    Send the given command to ls over the given socket and return the reply.

    command is a key of config.LS_COMMANDS; the string stored under that
    key is what gets sent. After sending, the socket is polled with
    select (timeout config.COM_TIMEOUT) and, if it is readable, up to
    1000 bytes are read and decoded as UTF-8.

    Returns the decoded response, or False if the command is unknown,
    sending or receiving fails, or no data is ready before the timeout.
    Check config.LS_COMMANDS for available commands.
    """
    try:
        # sendall keeps writing until the whole command has been sent,
        # whereas send may transmit only part of it.
        ls_socket.sendall(str.encode(config.LS_COMMANDS[command]))
    except Exception:
        # Unknown command key or socket failure: report as False.
        return False

    try:
        readable, _, _ = select.select(
            [ls_socket], [], [], config.COM_TIMEOUT
        )
    except (OSError, ValueError):
        # The socket was closed or became invalid before it could be polled.
        return False
    if not readable:
        # Nothing arrived within the timeout.
        return False

    try:
        return ls_socket.recv(1000).decode("utf-8")
    except Exception:
        # Receive error or a response that is not valid UTF-8.
        return False
def get_split_index(ls_socket):
    """
    Returns the index of the active split in livesplit.
    Returns -1 if timer is not yet started.
    (First split is 0)
    Returns False on Error, including when the response cannot be
    parsed as an integer.

    Note: False == 0 in Python, so callers that need to tell an error
    apart from the first split must compare with ``is False``.
    """
    ls_data = send_to_ls(ls_socket, "cur_split_index")
    if isinstance(ls_data, bool):
        return False
    try:
        return int(ls_data)
    except (TypeError, ValueError):
        # A non-numeric reply (for example an empty string) is an error,
        # not a reason to raise out of this function.
        return False
def get_split_name(ls_socket):
    """
    Ask livesplit for the name of the active split via the
    "cur_split_name" command.

    Returns the response from send_to_ls when it is truthy,
    otherwise False (no split active or an error occurred).
    """
    split_name = send_to_ls(ls_socket, "cur_split_name")
    return split_name if split_name else False