-
Notifications
You must be signed in to change notification settings - Fork 46
/
Copy pathclient.py
315 lines (232 loc) · 8.84 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
"""Speak to parent process
_____________ ___________
| | | |
| pyblish-qml | | e.g. Maya |
| | | |
| stdout o--------> |
| | | |
| stdin <--------o |
| | | |
| | | |
|_____________| |___________|
"""
import os
import sys
import json
import threading
import pyblish.api
import pyblish.plugin
from ..vendor import six
from ..vendor.six.moves import queue
class Proxy(object):
"""Messages sent from QML to parent process"""
channels = {
"response": queue.Queue(),
"parent": queue.Queue(),
}
def __init__(self):
self.cached_context = list()
self.cached_discover = list()
self._self_destruct()
self._listen()
def stats(self):
return self._dispatch("stats")
def reset(self):
return self._dispatch("reset")
def test(self, **vars):
"""Vars can only be passed as a non-keyword argument"""
return self._dispatch("test", kwargs=vars)
def ping(self):
self._dispatch("ping", args=[])
return True
def process(self, plugin, context, instance=None, action=None):
"""Transmit a `process` request to host
Arguments:
plugin (PluginProxy): Plug-in to process
context (ContextProxy): Filtered context
instance (InstanceProxy, optional): Instance to process
action (str, optional): Action to process
"""
plugin = plugin.to_json()
instance = instance.to_json() if instance is not None else None
return self._dispatch("process", args=[plugin, instance, action])
def repair(self, plugin, context, instance=None):
plugin = plugin.to_json()
instance = instance.to_json() if instance is not None else None
return self._dispatch("repair", args=[plugin, instance])
def context(self):
self.cached_context = ContextProxy.from_json(self._dispatch("context"))
return self.cached_context
def discover(self):
self.cached_discover[:] = list()
for plugin in self._dispatch("discover"):
self.cached_discover.append(PluginProxy.from_json(plugin))
return self.cached_discover
def emit(self, signal, **kwargs):
self._dispatch("emit", args=[signal, kwargs])
def update(self, key, value):
self._dispatch("update", args=[key, value])
def _self_destruct(self):
"""Auto quit exec if parent process failed
"""
# This will give parent process 15 seconds to reset.
self._kill = threading.Timer(15, lambda: os._exit(0))
self._kill.start()
def _listen(self):
"""Listen for messages passed from parent
This method distributes messages received via stdin to their
corresponding channel. Based on the format of the incoming
message, the message is forwarded to its corresponding channel
to be processed by its corresponding handler.
"""
def _listen():
"""This runs in a thread"""
for line in iter(sys.stdin.readline, b""):
try:
response = json.loads(line)
except Exception as e:
# The parent has passed on a message that
# isn't formatted in any particular way.
# This is likely a bug.
raise e
else:
if response.get("header") == "pyblish-qml:popen.response":
self.channels["response"].put(line)
elif response.get("header") == "pyblish-qml:popen.parent":
self.channels["parent"].put(line)
elif response.get("header") == "pyblish-qml:server.pulse":
self._kill.cancel() # reset timer
self._self_destruct()
else:
# The parent has passed on a message that
# is JSON, but not in any format we recognise.
# This is likely a bug.
raise Exception("Unhandled message "
"passed to Popen, '%s'" % line)
thread = threading.Thread(target=_listen)
thread.daemon = True
thread.start()
def _dispatch(self, func, args=None):
"""Send message to parent process
Arguments:
func (str): Name of function for parent to call
args (list, optional): Arguments passed to function when called
"""
data = json.dumps(
{
"header": "pyblish-qml:popen.request",
"payload": {
"name": func,
"args": args or list(),
}
}
)
# This should never happen. Each request is immediately
# responded to, always. If it isn't the next line will block.
# If multiple responses were made, then this will fail.
# Both scenarios are bugs.
assert self.channels["response"].empty(), (
"There were pending messages in the response channel")
# To ensure successful IPC message parsing, the message and the
# surrounding delimiters must be passed to the stream object at once.
# See https://github.com/pyblish/pyblish-qml/pull/325 for more info.
sys.stdout.write("\n" + data + "\n")
sys.stdout.flush()
try:
message = self.channels["response"].get()
if six.PY3:
response = json.loads(message)
else:
response = _byteify(json.loads(message, object_hook=_byteify))
except TypeError as e:
raise e
else:
assert response["header"] == "pyblish-qml:popen.response", response
return response["payload"]
def _byteify(data):
"""Convert unicode to bytes"""
# Unicode
if isinstance(data, six.text_type):
return data.encode("utf-8")
# Members of lists
if isinstance(data, list):
return [_byteify(item) for item in data]
# Members of dicts
if isinstance(data, dict):
return {
_byteify(key): _byteify(value) for key, value in data.items()
}
# Anything else, return the original form
return data
# Object Proxies
class ContextProxy(pyblish.api.Context):
"""Context Proxy
Given a JSON-representation of a Context, emulate its interface.
"""
def create_instance(self, name, **kwargs):
instance = InstanceProxy(name, parent=self)
instance.data.update(kwargs)
return instance
@classmethod
def from_json(cls, context):
self = cls()
self._id = context["id"]
self._data = context["data"]
self[:] = list(InstanceProxy.from_json(i)
for i in context["children"])
# Attach metadata
self._data["pyblishClientVersion"] = pyblish.api.version
return self
def to_json(self):
return {
"name": self.name,
"id": self.id,
"data": dict(self.data), # Convert pyblish.plugin._Dict object
"children": list(self),
}
class InstanceProxy(pyblish.api.Instance):
"""Instance Proxy
Given a JSON-representation of an Instance, emulate its interface.
"""
@classmethod
def from_json(cls, instance):
self = cls(instance["name"])
self._id = instance["id"]
self._data = instance["data"]
self[:] = instance["children"]
return self
def to_json(self):
return {
"name": self.name,
"id": self.id,
"data": dict(self.data),
"children": list(self),
}
class PluginProxy(object):
"""Plug-in Proxy
Given a JSON-representation of an Plug-in, emulate its interface.
"""
@classmethod
def from_json(cls, plugin):
"""Build PluginProxy object from incoming dictionary
Emulate a plug-in by providing access to attributes
in the same way they are accessed using the remote object.
This allows for it to be used by members of :mod:`pyblish.logic`.
"""
process = None
repair = None
name = plugin["name"] + "Proxy"
cls = type(name, (cls,), plugin)
# Emulate function
for name in ("process", "repair"):
args = ", ".join(plugin["process"]["args"])
func = "def {name}({args}): pass".format(name=name,
args=args)
exec(func)
cls.process = process
cls.repair = repair
cls.__orig__ = plugin
return cls
@classmethod
def to_json(cls):
return cls.__orig__.copy()