-
Notifications
You must be signed in to change notification settings - Fork 21
/
main.py
124 lines (103 loc) · 3.33 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# coding: utf8
# Version string of this application.
__version__ = '0.2'
from kivy.app import App
from kivy.lang import Builder
from kivy.clock import Clock
from kivy.utils import platform
from jnius import autoclass
from oscpy.client import OSCClient
from oscpy.server import OSCThreadServer
# Dotted Java class name of the Android service. start_service() passes it
# to jnius ``autoclass`` to look the class up.
_SERVICE_NAME_PARTS = {
    'packagename': u'org.kivy.oscservice',
    'servicename': u'Pong',
}
SERVICE_NAME = u'{packagename}.Service{servicename}'.format(
    **_SERVICE_NAME_PARTS
)
# Kivy-language description of the UI, loaded with Builder.load_string() in
# ClientServerApp.build(). The kv language derives the widget tree from
# indentation, so the nesting below is significant:
#   - top bar:    buttons calling app.start_service() / app.stop_service()
#   - middle:     scrollable Label (id: label) that accumulates messages
#   - bottom bar: 'ping' and 'clear' buttons plus a Label (id: date)
# The ids ``label`` and ``date`` are read through ``root.ids`` by the app's
# OSC callbacks.
KV = '''
BoxLayout:
    orientation: 'vertical'
    BoxLayout:
        size_hint_y: None
        height: '30sp'
        Button:
            text: 'start service'
            on_press: app.start_service()
        Button:
            text: 'stop service'
            on_press: app.stop_service()
    ScrollView:
        Label:
            id: label
            size_hint_y: None
            height: self.texture_size[1]
            text_size: self.size[0], None
    BoxLayout:
        size_hint_y: None
        height: '30sp'
        Button:
            text: 'ping'
            on_press: app.send()
        Button:
            text: 'clear'
            on_press: label.text = ''
        Label:
            id: date
'''
class ClientServerApp(App):
    """Kivy app that exchanges OSC messages with a background service.

    The app listens for OSC messages on localhost:3002 (``/message`` and
    ``/date``) and sends ``/ping`` to localhost:3000. The service itself is
    started and stopped on demand from the UI buttons.
    """

    # Values of ``kivy.utils.platform`` for which the service runs as a
    # thread inside this process. Kivy reports macOS as 'macosx'; the
    # original 'macos' and 'linux2' spellings are kept for compatibility.
    _DESKTOP_PLATFORMS = ('linux', 'linux2', 'macos', 'macosx', 'win')

    def build(self):
        """Create the OSC server/client pair and build the widget tree.

        Returns:
            The root widget loaded from ``KV``.
        """
        # The service is not started here; the 'start service' button in
        # the UI calls start_service().
        self.service = None

        self.server = server = OSCThreadServer()
        server.listen(
            address=b'localhost',
            port=3002,
            default=True,
        )
        server.bind(b'/message', self.display_message)
        server.bind(b'/date', self.date)

        # Port 3000 is presumably where the service listens -- it is not
        # visible from this file.
        self.client = OSCClient(b'localhost', 3000)
        self.root = Builder.load_string(KV)
        return self.root

    def start_service(self):
        """Start the background service for the current platform.

        On Android the Java service class named by ``SERVICE_NAME`` is
        started against the current activity. On desktop platforms
        ``src/service.py`` is executed as ``__main__`` in a daemon thread;
        if such a thread is already alive, nothing is started.

        Raises:
            NotImplementedError: if the platform is neither Android nor one
                of the supported desktop platforms.
        """
        if platform == 'android':
            service = autoclass(SERVICE_NAME)
            self.mActivity = autoclass(
                u'org.kivy.android.PythonActivity'
            ).mActivity
            argument = ''
            service.start(self.mActivity, argument)
            self.service = service

        elif platform in self._DESKTOP_PLATFORMS:
            from runpy import run_path
            from threading import Thread

            # Avoid launching a second copy of the service while the first
            # thread is still running.
            current = self.service
            if current is not None and current.is_alive():
                return

            self.service = Thread(
                target=run_path,
                args=['src/service.py'],
                kwargs={'run_name': '__main__'},
                daemon=True
            )
            self.service.start()

        else:
            raise NotImplementedError(
                "service start not implemented on this platform"
            )

    def stop_service(self):
        """Stop the background service if one was started.

        On Android the service class is asked to stop. On desktop the
        service is a ``threading.Thread``, which cannot be stopped from
        outside; a warning is logged and the thread reference is kept (the
        thread is a daemon, so it ends when the process exits).

        Raises:
            NotImplementedError: if a service is recorded but the platform
                is not supported.
        """
        service = self.service
        if not service:
            return

        if platform == 'android':
            service.stop(self.mActivity)

        elif platform in self._DESKTOP_PLATFORMS:
            stop = getattr(service, 'stop', None)
            if stop is None:
                # threading.Thread has no stop(); calling it would raise
                # AttributeError. Keep the reference so start_service()
                # can tell the service is still running.
                import logging
                logging.getLogger(__name__).warning(
                    "service thread cannot be stopped on this platform; "
                    "it keeps running until the application exits"
                )
                return
            stop()

        else:
            raise NotImplementedError(
                "service stop not implemented on this platform"
            )

        self.service = None

    def send(self, *args):
        """Send an empty ``/ping`` OSC message to the service."""
        self.client.send_message(b'/ping', [])

    def display_message(self, message):
        """Append a received ``/message`` payload to the scrolling label.

        Args:
            message: UTF-8 encoded bytes received over OSC.
        """
        # NOTE(review): presumably invoked from the OSC server's thread --
        # confirm whether the widget update should be moved to the main
        # thread.
        if self.root:
            self.root.ids.label.text += '{}\n'.format(message.decode('utf8'))

    def date(self, message):
        """Show a received ``/date`` payload in the date label.

        Args:
            message: UTF-8 encoded bytes received over OSC.
        """
        if self.root:
            self.root.ids.date.text = message.decode('utf8')
# Script entry point: build the application object and hand control to
# Kivy's main loop.
if __name__ == '__main__':
    application = ClientServerApp()
    application.run()