-
Notifications
You must be signed in to change notification settings - Fork 1
/
mediastream
executable file
·433 lines (307 loc) · 14.1 KB
/
mediastream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
#!/usr/bin/env python3
#
# Copyright (c) 2020 Gareth Palmer <gareth.palmer3@gmail.com>
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2.
import sys
import os
import os.path
import re
import getopt
import traceback
import socket
import ipaddress
import concurrent.futures
from lxml import etree
import requests
import requests.auth
import requests.adapters
import gi; gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
# Descriptions for the 'Number' attribute of a CiscoIPPhoneError reply,
# keyed by the number as it appears in the XML (a string).
CGI_ERRORS = {
    '1': 'Error parsing CiscoIPPhone object',
    '2': 'Error framing CiscoIPPhone object',
    '3': 'Internal file error',
    '4': 'Authentication error',
}
class ProgramError(Exception):
    """Expected failure whose message is shown to the user without a traceback."""
class HTTPSAdapter(requests.adapters.HTTPAdapter):
    """HTTP adapter that always passes ``assert_hostname=False`` to its pool manager."""

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager exactly as the base class does, minus hostname assertion."""
        # NOTE(review): presumably the phones' certificates do not name the host
        # they are reached by, hence the hostname check is switched off - confirm.
        pool_options = dict(kwargs, assert_hostname = False)
        super().init_poolmanager(*args, **pool_options)
def cgi_execute(target_hostname, timeout, username, password, certificate_file, xml):
    """POST an XML command to a phone's /CGI/Execute URL and return the reply body.

    The request uses HTTPS when ``certificate_file`` is set (the value is handed
    to requests as ``verify``) and plain HTTP otherwise. HTTP basic
    authentication is sent only when ``username`` is not empty. ``xml`` is
    submitted as the form field ``XML``.

    Returns the content of the response.

    Raises ProgramError when the request fails, the phone answers with an HTTP
    error status, or the reply is not served as text/xml (including a reply
    with no Content-Type header at all).
    """
    auth = requests.auth.HTTPBasicAuth(username, password) if username != '' else None
    scheme = 'https' if certificate_file else 'http'

    try:
        # The session is only needed for this one request; closing it releases
        # the pooled connection instead of leaving it to the garbage collector.
        with requests.Session() as session:
            session.mount('https://', HTTPSAdapter())
            response = session.post(f'{scheme}://{target_hostname}/CGI/Execute',
                                    timeout = timeout, auth = auth, verify = certificate_file,
                                    data = {'XML': xml})
            response.raise_for_status()
    except requests.RequestException as error:
        raise ProgramError(error) from error

    # A missing header must be reported like any other non-XML reply rather
    # than escaping as a bare KeyError.
    content_type = response.headers.get('Content-Type', '')
    if not content_type.startswith('text/xml'):
        raise ProgramError('Unexpected Content-Type: ' + (content_type or '(missing)'))

    return response.content
def start_media(target_hostnames, multicast_address, port, timeout, username, password, certificate_file, volume, codec):
    """Tell every target phone to start receiving the audio stream.

    A ``startMedia`` request is sent to all hosts concurrently via
    cgi_execute(). The address placed in the request is ``multicast_address``
    when given, otherwise the local address used to reach the first target.
    ``volume`` is added as ``receiveVolume`` unless it is -1.

    Any host whose request fails, or whose reply is not a ``mediaStream``
    element, is printed together with the error and removed from
    ``target_hostnames`` in place.
    """
    if multicast_address is None:
        # Find our source address using the first target host
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect((target_hostnames[0], 0))
            source_address = probe.getsockname()[0]
    else:
        source_address = multicast_address

    volume_attribute = '' if volume == -1 else ' receiveVolume="' + str(volume) + '"'

    # NOTE(review): the first line is not a well-formed XML declaration
    # ('charset' instead of 'encoding', no closing '?>'). It is kept unchanged;
    # confirm how the phones treat it before correcting it.
    xml = ''.join((
        '<?xml version="1.0" charset="UTF-8">',
        '<startMedia>',
        '<mediaStream', volume_attribute, '>',
        '<type>audio</type>',
        '<codec>G.', codec[1:], '</codec>',
        '<mode>receive</mode>',
        '<address>', source_address, '</address>',
        '<port>', str(port), '</port>',
        '</mediaStream>',
        '</startMedia>',
    ))

    with concurrent.futures.ThreadPoolExecutor(max_workers = 32) as executor:
        # Map each pending request back to the host it was sent to
        pending = {
            executor.submit(cgi_execute, hostname, timeout, username, password, certificate_file, xml): hostname
            for hostname in target_hostnames
        }

        for finished in concurrent.futures.as_completed(pending):
            hostname = pending[finished]

            try:
                document = etree.fromstring(finished.result())
                tag = document.tag

                if tag == 'CiscoIPPhoneError':
                    number = document.get('Number', '')
                    raise ProgramError('Error: ' + CGI_ERRORS.get(number, number))
                elif tag == 'errorResponse':
                    error_type = document.findtext('type', '')
                    error_data = document.findtext('data', '')
                    raise ProgramError(f'Error {error_type}: {error_data}')
                elif tag != 'mediaStream':
                    raise ProgramError(f'Unexpected XML: {tag}')
            except Exception as error:
                # Remove address so we don't try and send it audio
                target_hostnames.remove(hostname)
                print(f'{hostname} {error}')
def stream_media(target_hostnames, multicast_address, port, codec, wav_file):
    """Stream a .wav file over RTP and block until playback ends.

    Builds the GStreamer pipeline
    filesrc -> wavparse -> audioconvert -> audioresample -> encoder -> RTP payloader -> udpsink
    where the encoder/payloader pair is mulawenc/rtppcmupay at 8000 Hz for
    'g711' and avenc_g722/rtpg722pay at 16000 Hz for 'g722'. Packets go to
    ``multicast_address`` when it is set, otherwise to every host in
    ``target_hostnames``, always on ``port``.

    Returns when the pipeline reports end-of-stream or an error; a pipeline
    error is printed to stderr.

    Raises ProgramError when a required GStreamer plugin is missing or the
    caps string cannot be parsed.
    """
    Gst.init(None)

    def make_element(factory_name):
        # A None result from the factory is treated as a missing plugin
        element = Gst.ElementFactory.make(factory_name, None)
        if element is None:
            raise ProgramError(f"No '{factory_name}' plugin")
        return element

    file_src = make_element('filesrc')
    file_src.set_property('location', wav_file)
    wav_parse = make_element('wavparse')
    audio_convert = make_element('audioconvert')
    audio_resample = make_element('audioresample')

    # codec -> (raw audio caps, encoder plugin, RTP payloader plugin)
    codec_settings = {
        'g711': ('audio/x-raw,channels=1,rate=8000', 'mulawenc', 'rtppcmupay'),
        'g722': ('audio/x-raw,channels=1,rate=16000', 'avenc_g722', 'rtpg722pay'),
    }.get(codec)

    caps_filter = encoder = payloader = None

    if codec_settings is not None:
        caps_string, encoder_name, payloader_name = codec_settings

        caps_filter = Gst.Caps.from_string(caps_string)
        if caps_filter is None:
            raise ProgramError('Unable to parse caps')

        encoder = make_element(encoder_name)
        payloader = make_element(payloader_name)

        # NOTE(review): payload type 0 is used for both codecs. RFC 3551 assigns
        # 0 to PCMU and 9 to G722 - confirm the phones expect 0 for G.722.
        payloader.set_property('pt', 0)
        # Both ptime limits are the same value, 20000000
        payloader.set_property('min-ptime', 20000000)
        payloader.set_property('max-ptime', 20000000)

    udp_sink = make_element('udpsink')

    if multicast_address is None:
        clients = ','.join(f'{hostname}:{port}' for hostname in target_hostnames)
    else:
        clients = f'{multicast_address}:{port}'

    udp_sink.set_property('clients', clients)
    udp_sink.set_property('sync', True)

    def on_bus_message(bus, message, main_loop):
        # Leave the main loop on end-of-stream or on the first error
        if message.type == Gst.MessageType.EOS:
            main_loop.quit()
        elif message.type == Gst.MessageType.ERROR:
            error, reason = message.parse_error()
            main_loop.quit()
            print(f'Error {error}: {reason}', file = sys.stderr)

    main_loop = GLib.MainLoop()
    pipeline = Gst.Pipeline.new('pipeline')

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect('message', on_bus_message, main_loop)

    elements = [file_src, wav_parse, audio_convert, audio_resample]
    if encoder is not None:
        elements += [encoder, payloader]
    elements.append(udp_sink)

    for element in elements:
        pipeline.add(element)

    # Only link elements after they have been added to the pipeline
    for upstream, downstream in ((file_src, wav_parse),
                                 (wav_parse, audio_convert),
                                 (audio_convert, audio_resample)):
        upstream.link(downstream)

    if encoder is not None:
        audio_resample.link_filtered(encoder, caps_filter)
        encoder.link(payloader)
        payloader.link(udp_sink)

    if multicast_address is None:
        print(f'Streaming {wav_file}: ' + ', '.join(target_hostnames))
    else:
        print(f'Streaming {wav_file}: {multicast_address}')

    pipeline.set_state(Gst.State.PLAYING)
    main_loop.run()
    pipeline.set_state(Gst.State.NULL)
def stop_media(target_hostnames, timeout, username, password, certificate_file):
    """Tell every target phone to stop receiving the audio stream.

    A ``stopMedia`` request is sent to all hosts concurrently via
    cgi_execute(). Failures are printed per host and otherwise ignored;
    ``target_hostnames`` is not modified.
    """
    # NOTE(review): the first line is not a well-formed XML declaration
    # ('charset' instead of 'encoding', no closing '?>'). It is kept unchanged;
    # confirm how the phones treat it before correcting it.
    xml = ''.join((
        '<?xml version="1.0" charset="UTF-8">',
        '<stopMedia>',
        '<mediaStream />',
        '</stopMedia>',
    ))

    with concurrent.futures.ThreadPoolExecutor(max_workers = 32) as executor:
        # Map each pending request back to the host it was sent to
        pending = {
            executor.submit(cgi_execute, hostname, timeout, username, password, certificate_file, xml): hostname
            for hostname in target_hostnames
        }

        for finished in concurrent.futures.as_completed(pending):
            hostname = pending[finished]

            try:
                reply = etree.fromstring(finished.result())

                if reply.tag == 'CiscoIPPhoneError':
                    number = reply.get('Number', '')
                    raise ProgramError('Error: ' + CGI_ERRORS.get(number, number))
            except Exception as error:
                print(f'{hostname} {error}')
def main():
    """Command-line entry point: parse options, then start, stream and stop media.

    Options and their defaults: timeout 3 seconds, port 20480, codec 'g711',
    no volume override, no authentication, plain HTTP unless a certificate
    file is given. At least one target host and a .wav file are required.

    With --help the usage text is printed and the function returns. Otherwise
    the process exits with status 0 on success and 1 on error; a ProgramError
    is reported as a single message on stderr and any other exception as a
    traceback.
    """
    try:
        short_options = 'f:t:m:P:v:C:u:p:c:H'
        long_options = ['file=', 'timeout=', 'multicast=', 'port=', 'volume=', 'codec=',
                        'username=', 'password=', 'certificate=', 'help']

        try:
            options, arguments = getopt.gnu_getopt(sys.argv[1:], short_options, long_options)
        except getopt.GetoptError as error:
            raise ProgramError(error.msg[0].upper() + error.msg[1:] +
                               '. Try \'' + os.path.basename(sys.argv[0]) + ' --help\' for more information') from None

        wav_file = None
        timeout = 3
        multicast_address = None
        port = 20480
        volume = -1  # -1 means no receiveVolume is requested
        codec = 'g711'
        username = ''
        password = ''
        certificate_file = None
        show_help = False

        for option, argument in options:
            if option in ('-f', '--file'):
                wav_file = argument

            elif option in ('-t', '--timeout'):
                try:
                    timeout = int(argument)
                    if timeout < 1:
                        raise ValueError
                except ValueError:
                    raise ProgramError(f'Invalid timeout: {argument}') from None

            elif option in ('-m', '--multicast'):
                try:
                    address = ipaddress.IPv4Address(argument)
                    if not address.is_multicast:
                        raise ipaddress.AddressValueError
                except ipaddress.AddressValueError:
                    raise ProgramError(f'Invalid multicast IP address: {argument}') from None

                multicast_address = address.compressed

            elif option in ('-P', '--port'):
                try:
                    port = int(argument)
                    # Only even ports from 20480 to 65535 are accepted
                    if port < 20480 or port > 65535 or port % 2:
                        raise ValueError
                except ValueError:
                    raise ProgramError(f'Invalid port: {argument}') from None

            elif option in ('-v', '--volume'):
                try:
                    volume = int(argument)
                    if volume < 0 or volume > 100:
                        raise ValueError
                except ValueError:
                    raise ProgramError(f'Invalid volume: {argument}') from None

            elif option in ('-C', '--codec'):
                codec = argument
                if codec not in ('g711', 'g722'):
                    raise ProgramError(f'Invalid codec: {codec}')

            elif option in ('-u', '--username'):
                username = argument

            elif option in ('-p', '--password'):
                password = argument

            elif option in ('-c', '--certificate'):
                certificate_file = argument

            elif option in ('-H', '--help'):
                show_help = True

        if show_help:
            # The volume range shown here matches the validation above (0 is accepted)
            print('Usage: ' + os.path.basename(sys.argv[0]) + ' -f FILE [OPTIONS] TARGET-HOST...\n'
                  'Stream media to one or more Cisco IP Phones.\n'
                  '\n'
                  ' -f, --file FILE .wav file to stream\n'
                  ' -t, --timeout TIMEOUT request timeout in seconds (default 3)\n'
                  ' -m, --multicast ADDRESS multicast the stream instead of using multiple unicast streams\n'
                  ' -P, --port PORT destination port on phone (default 20480)\n'
                  ' -v, --volume VOLUME volume percent (0-100) on phone\n'
                  ' -C, --codec CODEC g711 or g722 (default g711)\n'
                  ' -u, --username USERNAME authentication username\n'
                  ' -p, --password PASSWORD authentication password\n'
                  ' -c, --certificate CERT-FILE connect using SSL and verify using certificate\n'
                  ' -H, --help print this help and exit\n')
            return

        if not arguments:
            raise ProgramError('No target hosts specified')

        # Dot-separated labels made of letters, digits and hyphens
        hostname_pattern = re.compile(r'(?xi) ^ (?: [a-z0-9\-]+ \.)* [a-z0-9\-]+ $')
        target_hostnames = []

        for target_hostname in arguments:
            if not hostname_pattern.search(target_hostname):
                raise ProgramError(f'Invalid target host: {target_hostname}')

            target_hostnames.append(target_hostname)

        if wav_file is None:
            raise ProgramError('No .wav file specified')

        # Invalid hostnames will be removed from the list
        start_media(target_hostnames, multicast_address, port, timeout, username, password, certificate_file, volume, codec)

        if target_hostnames:
            try:
                stream_media(target_hostnames, multicast_address, port, codec, wav_file)
            except Exception as error:
                print(f'{error}', file = sys.stderr)
            finally:
                # Always tell the phones to stop, even when streaming is
                # interrupted (e.g. Ctrl-C), so they are not left in receive mode
                stop_media(target_hostnames, timeout, username, password, certificate_file)

    except ProgramError as error:
        print(str(error), file = sys.stderr)
        sys.exit(1)

    except Exception:
        traceback.print_exc(file = sys.stderr)
        sys.exit(1)

    # sys.exit is used because the bare exit() helper is only installed by the site module
    sys.exit(0)
# Run the command-line interface only when this file is executed as a script
if __name__ == '__main__':
    main()