-
Notifications
You must be signed in to change notification settings - Fork 2
/
kudu.py
74 lines (61 loc) · 3.09 KB
/
kudu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import os.path
from requests import Session, RequestException
from xml.etree import ElementTree
from isodate import parse_datetime
class KuduSession(object):
def __init__(self, publishsettingspath):
try:
profile = ElementTree.parse(publishsettingspath)
data = profile.getroot().find('./publishProfile[@publishMethod="MSDeploy"]')
except OSError:
print("Unable to read the profile at {}.".format(publishsettingspath), file=sys.stderr)
print("Please restart deployment and provide the path to your publishing profile.", file=sys.stderr)
raise
self.session = Session()
self.session.auth = data.get('userName'), data.get('userPWD')
self.session.headers['If-Match'] = '*' # allow replacing files on upload
self.api_url = 'https://{}/api/'.format(data.get('publishUrl'))
def _request(self, operation_function, kudupath, **kwarg):
r = operation_function(self.api_url+kudupath, **kwarg)
r.raise_for_status()
return r
def get(self, kudupath, **kwarg):
return self._request(self.session.get, kudupath, **kwarg)
def head(self, kudupath, **kwarg):
return self._request(self.session.head, kudupath, **kwarg)
def delete(self, kudupath, **kwarg):
return self._request(self.session.delete, kudupath, **kwarg)
def put(self, kudupath, **kwarg):
return self._request(self.session.put, kudupath, **kwarg)
def post(self, kudupath, **kwarg):
return self._request(self.session.post, kudupath, **kwarg)
def command(self, command, workingdir=r'D:\home\site\wwwroot'):
'''Execute a command.
:param str command: The command to execute
:param str workingdir: The current working dir while executing the command. Default wwwroot
:return: The json
'''
cmd = {
'command': command,
'dir': workingdir
}
return self.post('command', json=cmd)
def get_deployed_files(self, vfs_basepath='site/wwwroot/'):
"""Return a list of deployed files in this Webapp as a generator.
This method also:
- Parse mtime and crtime as datetime
- Add a urlpath key with path fragment for future URL callable
- Keys are normalized using normcase for future comparison with the system.
"""
# Kudu accepts a // ended path. Let's keep thing simply and always append an ending slash.
kudu_folderpath = 'vfs/{}/'.format(vfs_basepath)
for element in self.get(kudu_folderpath).json():
if element['mime'] == "inode/directory":
for subelement, submeta in self.get_deployed_files(vfs_basepath+element['name']+'/'):
submeta['urlpath'] = "/".join([element['name'], submeta['urlpath']])
yield os.path.normcase(submeta['urlpath']), submeta
else:
for key in ["mtime", "crtime"]:
element[key] = parse_datetime(element[key])
element['urlpath'] = element['name']
yield os.path.normcase(element['name']), element