-
Notifications
You must be signed in to change notification settings - Fork 119
/
__init__.py
192 lines (149 loc) · 7.32 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
"""
Provides public API.
"""
import os
import inspect
import contextlib
from distutils.version import LooseVersion
from . import managers, exceptions, engines, utilities, resources
from .version import __version__
class Redmine(object):
    """
    Entry point for all requests.

    Holds the connection settings and the engine used to talk to Redmine. Any public attribute
    that isn't defined on the class is resolved to a ResourceManager for the resource of that name.
    """
    def __init__(self, url, **kwargs):
        """
        :param string url: (required). Redmine location.
        :param string key: (optional). API key used for authentication.
        :param string version: (optional). Redmine version.
        :param string username: (optional). Username used for authentication.
        :param string password: (optional). Password used for authentication.
        :param dict requests: (optional). Connection options.
        :param string impersonate: (optional). Username to impersonate.
        :param string date_format: (optional). Formatting directives for date format.
        :param string datetime_format: (optional). Formatting directives for datetime format.
        :param raise_attr_exception: (optional). Control over resource attribute access exception raising.
        :type raise_attr_exception: bool or tuple
        :param resource_paths: (optional). Paths to modules which contain additional resources.
        :type resource_paths: list or tuple
        :param cls engine: (optional). Engine that will be used to make requests to Redmine.
        :raises exceptions.EngineClassError: if engine isn't a subclass of engines.BaseEngine.
        """
        self.url = url.rstrip('/')
        self.ver = kwargs.get('version', None)
        self.date_format = kwargs.get('date_format', '%Y-%m-%d')
        self.datetime_format = kwargs.get('datetime_format', '%Y-%m-%dT%H:%M:%SZ')
        self.raise_attr_exception = kwargs.get('raise_attr_exception', True)
        self.resource_paths = kwargs.get('resource_paths', ())

        engine = kwargs.get('engine', engines.DefaultEngine)

        # issubclass() raises TypeError for non-classes, hence the isclass() guard first
        if not inspect.isclass(engine) or not issubclass(engine, engines.BaseEngine):
            raise exceptions.EngineClassError

        # The engine receives all keyword arguments and picks the ones it needs
        self.engine = engine(**kwargs)

    def __getattr__(self, resource_name):
        """
        Returns either ResourceSet or Resource object depending on the method used on the ResourceManager.

        :param string resource_name: (required). Resource name.
        :raises AttributeError: if resource_name starts with an underscore.
        """
        # Underscore-prefixed names are never treated as resources, they fail like a regular
        # missing attribute would
        if resource_name.startswith('_'):
            raise AttributeError(
                "'{0}' object has no attribute '{1}'".format(type(self).__name__, resource_name))

        return managers.ResourceManager(self, resource_name)

    @contextlib.contextmanager
    def session(self, **options):
        """
        Initiates a temporary session with a copy of the current engine but with new options.

        The original engine is put back when the ``with`` block is left, including
        when the block is left because of an exception.

        :param dict options: (optional). Engine's options for a session.
        """
        engine = self.engine
        self.engine = engine.__class__(
            requests=utilities.merge_dicts(engine.requests, options.pop('requests', {})), **options)

        try:
            yield self
        finally:
            # Without the finally clause an exception raised inside the with block would
            # leave the temporary engine installed on this object forever
            self.engine = engine

    def upload(self, filepath_obj):
        """
        Uploads file from filepath to Redmine and returns an assigned token.

        :param string filepath_obj: (required). Path to the file or filestream that will be uploaded.
        :returns: value found under ['upload']['token'] in the engine's response.
        :raises exceptions.VersionMismatchError: if a version was given and it is lower than 1.4.0.
        :raises exceptions.NoFileError: if filepath_obj is a path which isn't a file or the file is empty.
        """
        if self.ver is not None and LooseVersion(str(self.ver)) < LooseVersion('1.4.0'):
            raise exceptions.VersionMismatchError('File uploading')

        url = '{0}/uploads.json'.format(self.url)
        headers = {'Content-Type': 'application/octet-stream'}

        # Python 2 has the file builtin and the StringIO module, if either of them
        # is missing we fall back to the io.IOBase hierarchy
        try:
            from StringIO import StringIO
            filetype = (file, StringIO)
        except (NameError, ImportError):
            from io import IOBase
            filetype = IOBase

        if isinstance(filepath_obj, filetype):
            response = self.engine.request('post', url, data=filepath_obj, headers=headers)
        else:
            if not os.path.isfile(filepath_obj) or os.path.getsize(filepath_obj) == 0:
                raise exceptions.NoFileError

            with open(filepath_obj, 'rb') as stream:
                response = self.engine.request('post', url, data=stream, headers=headers)

        return response['upload']['token']

    def download(self, url, savepath=None, filename=None, params=None):
        """
        Downloads file from Redmine and saves it to savepath or returns a response directly
        for maximum control over file processing.

        :param string url: (required). URL of the file that will be downloaded.
        :param string savepath: (optional). Path where to save the file.
        :param string filename: (optional). Name that will be used for the file.
        :param dict params: (optional). Params to send in the query string.
        :returns: raw engine response if savepath is None, otherwise full path of the saved file.
        :raises exceptions.FileUrlError: if filename wasn't provided and can't be derived from url.
        """
        # The stream flag is always forced on, whatever the caller passed in params
        response = self.engine.request('get', url, params=dict(params or {}, stream=True), return_raw=True)

        # If a savepath wasn't provided we return a response directly
        # so a user can have maximum control over response data
        if savepath is None:
            return response

        try:
            from urlparse import urlsplit
        except ImportError:
            from urllib.parse import urlsplit

        if filename is None:
            # Last segment of the path component of the url
            filename = urlsplit(url)[2].split('/')[-1]

            if not filename:
                raise exceptions.FileUrlError

        savepath = os.path.join(savepath, filename)

        with open(savepath, 'wb') as f:
            for chunk in response.iter_content(1024):
                f.write(chunk)

        return savepath

    def auth(self):
        """
        Shortcut for the case if we just want to check if user provided valid auth credentials.

        :returns: result of requesting the 'current' user.
        """
        return self.user.get('current')

    def search(self, query, **options):
        """
        Interface to Redmine Search API

        :param string query: (required). What to search.
        :param dict options: (optional). Dictionary of search options.
        :returns: dict of results grouped by resource container, results of a type that no
                  registered resource claims are grouped by type under the 'unknown' key,
                  None if nothing was found.
        :raises exceptions.VersionMismatchError: if a version was given and it is lower than 3.0.0.
        """
        if self.ver is not None and LooseVersion(str(self.ver)) < LooseVersion('3.0.0'):
            raise exceptions.VersionMismatchError('Search functionality')

        container_map, manager_map, results = {}, {}, {'unknown': {}}

        # Every requested resource name is sent as its own option set to True
        for resource in options.pop('resources', []):
            options[resource] = True

        options['q'] = query

        # container_map: search result type -> container name
        # manager_map: container name -> manager that can build a resource set for it
        for name, details in resources.registry.items():
            resource_class = details['class']

            if resource_class.search_hints is not None:
                for hint in resource_class.search_hints:
                    container_map[hint] = resource_class.container_many

                manager_map[resource_class.container_many] = getattr(self, name)

        raw_resources, _ = self.engine.bulk_request('get', '{0}/search.json'.format(self.url), 'results', **options)

        for resource in raw_resources:
            resource_type = resource['type']

            if resource_type in container_map:
                results.setdefault(container_map[resource_type], []).append(resource)
            else:
                results['unknown'].setdefault(resource_type, []).append(resource)

            del resource['type']  # all resources are already sorted by type so we don't need it

        if not results['unknown']:
            del results['unknown']

        # Only values of existing keys are replaced here, so iterating over the dict is safe
        for container in results:
            if container in manager_map:
                results[container] = manager_map[container].to_resource_set(results[container])

        return results or None