335 changes: 318 additions & 17 deletions sonos/__init__.py

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions sonos/examples/sonos.yaml
Expand Up @@ -272,3 +272,34 @@ MySonos:
type: str
sonos_recv: uid

play_tts:
type: str
sonos_send: play_tts
enforce_updates: True

tts_language:
type: str
sonos_attrib: tts_language

tts_volume:
type: num
sonos_attrib: tts_volume

tts_fade_in:
type: bool
sonos_attrib: tts_fade_in

play_snippet:
type: str
sonos_send: play_snippet
enforce_updates: True

snippet_volume:
type: num
value: 25
sonos_attrib: snippet_volume

snippet_fade_in:
type: bool
value: True
sonos_attrib: snippet_fade_in
1 change: 1 addition & 0 deletions sonos/requirements.txt
@@ -1,3 +1,4 @@
requests>=2.9.1;python_version>'3.2'
requests==2.5.1;python_version=='3.2'
xmltodict>=0.11.0
tinytag>=0.18.0
246 changes: 246 additions & 0 deletions sonos/tts.py
@@ -0,0 +1,246 @@
# -*- coding: utf-8 -*-
import calendar
import math
import time
import requests
import re

class Token:
""" Token (Google Translate Token)
Generate the current token key and allows generation of tokens (tk) with it
Python version of `token-script.js` itself from translate.google.com
"""

SALT_1 = "+-a^+6"
SALT_2 = "+-3^+b+-f"

def __init__(self):
self.token_key = None

def calculate_token(self, text, seed=None):
""" Calculate the request token (`tk`) of a string
:param text: str The text to calculate a token for
:param seed: str The seed to use. By default this is the number of hours since epoch
"""

if seed is None:
seed = self._get_token_key()

[first_seed, second_seed] = seed.split(".")

try:
d = bytearray(text.encode('UTF-8'))
except UnicodeDecodeError:
# This will probably only occur when d is actually a str containing UTF-8 chars, which means we don't need
# to encode.
d = bytearray(text)

a = int(first_seed)
for value in d:
a += value
a = self._work_token(a, self.SALT_1)
a = self._work_token(a, self.SALT_2)
a ^= int(second_seed)
if 0 > a:
a = (a & 2147483647) + 2147483648
a %= 1E6
a = int(a)
return str(a) + "." + str(a ^ int(first_seed))

def _get_token_key(self):
if self.token_key is not None:
return self.token_key

timestamp = calendar.timegm(time.gmtime())
hours = int(math.floor(timestamp / 3600))

response = requests.get("https://translate.google.com/")
line = response.text.split('\n')[-1]

tkk_expr = re.search(".*?(TKK=.*?;)W.*?", line).group(1)
a = re.search("a\\\\x3d(-?\d+);", tkk_expr).group(1)
b = re.search("b\\\\x3d(-?\d+);", tkk_expr).group(1)

result = str(hours) + "." + str(int(a) + int(b))
self.token_key = result
return result

""" Functions used by the token calculation algorithm """

def _rshift(self, val, n):
return val >> n if val >= 0 else (val + 0x100000000) >> n

def _work_token(self, a, seed):
for i in range(0, len(seed) - 2, 3):
char = seed[i + 2]
d = ord(char[0]) - 87 if char >= "a" else int(char)
d = self._rshift(a, d) if seed[i + 1] == "+" else a << d
a = a + d & 4294967295 if seed[i] == "+" else a ^ d
return a


class gTTS:
""" gTTS (Google Text to Speech): an interface to Google's Text to Speech API """

GOOGLE_TTS_URL = 'https://translate.google.com/translate_tts'
MAX_CHARS = 100 # Max characters the Google TTS API takes at a time
LANGUAGES = {
'af': 'Afrikaans',
'sq': 'Albanian',
'ar': 'Arabic',
'hy': 'Armenian',
'ca': 'Catalan',
'zh': 'Chinese',
'zh-cn': 'Chinese (Mandarin/China)',
'zh-tw': 'Chinese (Mandarin/Taiwan)',
'zh-yue': 'Chinese (Cantonese)',
'hr': 'Croatian',
'cs': 'Czech',
'da': 'Danish',
'nl': 'Dutch',
'en': 'English',
'en-au': 'English (Australia)',
'en-uk': 'English (United Kingdom)',
'en-us': 'English (United States)',
'eo': 'Esperanto',
'fi': 'Finnish',
'fr': 'French',
'de': 'German',
'el': 'Greek',
'ht': 'Haitian Creole',
'hi': 'Hindi',
'hu': 'Hungarian',
'is': 'Icelandic',
'id': 'Indonesian',
'it': 'Italian',
'ja': 'Japanese',
'ko': 'Korean',
'la': 'Latin',
'lv': 'Latvian',
'mk': 'Macedonian',
'no': 'Norwegian',
'pl': 'Polish',
'pt': 'Portuguese',
'pt-br': 'Portuguese (Brazil)',
'ro': 'Romanian',
'ru': 'Russian',
'sr': 'Serbian',
'sk': 'Slovak',
'es': 'Spanish',
'es-es': 'Spanish (Spain)',
'es-us': 'Spanish (United States)',
'sw': 'Swahili',
'sv': 'Swedish',
'ta': 'Tamil',
'th': 'Thai',
'tr': 'Turkish',
'vi': 'Vietnamese',
'cy': 'Welsh'
}

def __init__(self, text, logger, lang='en'):

self._logger = logger
if lang.lower() not in self.LANGUAGES:
raise Exception('Language not supported: %s' % lang)
else:
self.lang = lang.lower()

if not text:
raise Exception('No text to speak')
else:
self.text = text

# Split text in parts
if len(text) <= self.MAX_CHARS:
text_parts = [text]
else:
text_parts = self._tokenize(text, self.MAX_CHARS)

# Clean

def strip(x):
return x.replace('\n', '').strip()

text_parts = [strip(x) for x in text_parts]
text_parts = [x for x in text_parts if len(x) > 0]
self.text_parts = text_parts

# Google Translate token
self.token = Token()

def save(self, savefile):
""" Do the Web request and save to `savefile` """
with open(savefile, 'wb') as f:
self._write_to_fp(f)
f.close()

def stream_url(self):
req = self._prepare_request()
params = req.params
prep_req = req.prepare()
prep_req.prepare_url(req.url, params)
return prep_req.url

def _prepare_request(self):
""" Do the Web request and save to a file-like object """
for idx, part in enumerate(self.text_parts):
payload = {'ie': 'UTF-8',
'q': part,
'tl': self.lang,
'total': len(self.text_parts),
'idx': idx,
'client': 'tw-ob',
'textlen': len(part),
'tk': self.token.calculate_token(part)}
headers = {
"Referer": "http://translate.google.com/",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/47.0.2526.106 Safari/537.36"
}

self._logger.debug("Sonos: GoogleTTS: headers parameter: {param}".format(param=headers))
self._logger.debug("Sonos: GoogleTTS: request parameter: {param}".format(param=payload))
return requests.Request(method='GET', url=self.GOOGLE_TTS_URL, headers=headers, params=payload)

def _write_to_fp(self, fp):
try:
prepared_request = self._prepare_request().prepare()
s = requests.Session()
r = s.send(prepared_request)
self._logger.debug("Sonos: GoogleTTS: Headers: {}".format(r.request.headers))
self._logger.debug("Sonos: GoogleTTS: Reponse: {}, Redirects: {}".format(r.status_code, r.history))

r.raise_for_status()
for chunk in r.iter_content(chunk_size=1024):
fp.write(chunk)
except Exception as err:
self._logger.error("Sonos: {err}".format(err=err))
raise err

def _tokenize(self, text, max_size):
""" Tokenizer on basic roman punctuation """

punc = "¡!()[]¿?.,;:—«»\n"
punc_list = [re.escape(c) for c in punc]
pattern = '|'.join(punc_list)
parts = re.split(pattern, text)

min_parts = []
for p in parts:
min_parts += self._minimize(p, " ", max_size)
return min_parts

def _minimize(self, thestring, delim, max_size):
""" Recursive function that splits `thestring` in chunks
of maximum `max_size` chars delimited by `delim`. Returns list. """

if len(thestring) > max_size:
idx = thestring.rfind(delim, 0, max_size)
return [thestring[:idx]] + self._minimize(thestring[idx:], delim, max_size)
else:
return [thestring]


if __name__ == "__main__":
pass
57 changes: 57 additions & 0 deletions sonos/utils.py
@@ -0,0 +1,57 @@
import hashlib
import os
import socket
import re

def is_valid_port(port):
valid_port = re.compile(
"^(102[4-9]|10[3-9]\d|1[1-9]\d{2}|[2-9]\d{3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$")
if valid_port.match(port):
return True
return False

def is_open_port(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind(("127.0.0.1", port))
except socket.error:
return False
return True

def get_local_ip_address():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("10.10.10.10", 80))
return s.getsockname()[0]


def file_size(size):
_suffixes = ['bytes', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']
# determine binary order in steps of size 10
# (coerce to int, // still returns a float)
from math import log2
order = int(log2(size) / 10) if size else 0
# format file size
# (.4g results in rounded numbers for exact matches and max 3 decimals,
# should never resort to exponent values)
return '{:.4g} {}'.format(size / (1 << (order * 10)), _suffixes[order])


def get_free_diskspace(folder):
statvfs = os.statvfs(folder)
return statvfs.f_frsize * statvfs.f_bfree


def get_folder_size(folder):
total_size = 0
for dir_path, dir_names, file_names in os.walk(folder):
for f in file_names:
fp = os.path.join(dir_path, f)
total_size += os.path.getsize(fp)
return total_size


def get_tts_local_file_path(local_directory, tts_string, tts_language):
m = hashlib.md5()
m.update('{}_{}'.format(tts_language, tts_string).encode('utf-8'))
file_name = '{}.mp3'.format(m.hexdigest())
return os.path.join(local_directory, file_name)