-
Notifications
You must be signed in to change notification settings - Fork 10
/
dogma.py
250 lines (218 loc) · 9.66 KB
/
dogma.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# Copyright 2021 LINE Corporation
#
# LINE Corporation licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from typing import List, Optional, TypeVar, Generic, Callable
from centraldogma.base_client import BaseClient
from centraldogma.content_service import ContentService
from centraldogma.data import (
Change,
ChangeType,
Commit,
Content,
Project,
PushResult,
Repository,
)
from centraldogma.data.entry import Entry
from centraldogma.data.revision import Revision
from centraldogma.project_service import ProjectService
from centraldogma.query import Query
from centraldogma.repository_service import RepositoryService
from centraldogma.watcher import Watcher
# Generic type placeholders used by the query/watcher signatures in this module.
T = TypeVar("T")
U = TypeVar("U")

# Default timeout for a single watch request, in milliseconds (one minute).
_DEFAULT_WATCH_TIMEOUT_MILLIS = 60 * 1000
class Dogma:
    """Facade over the Central Dogma project, repository and content services.

    Every public method forwards to one of the three service objects created
    in ``__init__`` (``project_service``, ``repository_service`` and
    ``content_service``), which all share a single ``BaseClient``.
    """

    # Fallbacks used when neither an argument nor an environment variable
    # supplies a value.
    DEFAULT_BASE_URL = "http://localhost:36462"
    DEFAULT_TOKEN = "anonymous"

    def __init__(
        self,
        base_url: Optional[str] = None,
        token: Optional[str] = None,
        **configs,
    ) -> None:
        """A Central Dogma API client using requests.

        :param base_url: a base URL indicating Central Dogma server such as domain.
            If ``None``, the ``CENTRAL_DOGMA_HOST`` environment variable is used,
            falling back to ``DEFAULT_BASE_URL`` when it is unset or empty.
        :param token: a token for authorization. If ``None``, the
            ``CENTRAL_DOGMA_TOKEN`` environment variable is used, falling back
            to ``DEFAULT_TOKEN`` when it is unset or empty.
        :param configs: (optional) configurations for an HTTP client.
            For example, cert and timeout can be applied by using it.
        """
        # An explicitly passed value always wins; the environment is consulted
        # only when the argument is None. An empty environment value is
        # treated the same as an unset one.
        if base_url is None:
            base_url = os.getenv("CENTRAL_DOGMA_HOST") or self.DEFAULT_BASE_URL
        if token is None:
            token = os.getenv("CENTRAL_DOGMA_TOKEN") or self.DEFAULT_TOKEN

        self.base_client = BaseClient(base_url, token, **configs)
        self.project_service = ProjectService(self.base_client)
        self.repository_service = RepositoryService(self.base_client)
        self.content_service = ContentService(self.base_client)

    def list_projects(self, removed: bool = False) -> List[Project]:
        """Lists all projects, in the order that they were created on the Central Dogma server."""
        return self.project_service.list(removed)

    def create_project(self, name: str) -> Project:
        """Creates a project. The creator of the project will become the owner of the project."""
        return self.project_service.create(name)

    def remove_project(self, name: str) -> bool:
        """Removes a project. Only the owner and an admin can remove the project."""
        return self.project_service.remove(name)

    def unremove_project(self, name: str) -> Project:
        """Unremoves a project which is removed before. Only an admin can unremove the project."""
        return self.project_service.unremove(name)

    def purge_project(self, name: str) -> bool:
        """Purges a project. Only the owner and an admin can purge the project removed before."""
        return self.project_service.purge(name)

    def list_repositories(
        self, project_name: str, removed: bool = False
    ) -> List[Repository]:
        """Lists all repositories, in the order that they were created on the Central Dogma server."""
        return self.repository_service.list(project_name, removed)

    def create_repository(self, project_name: str, name: str) -> Repository:
        """Creates a repository. Only the owner and an admin can create."""
        return self.repository_service.create(project_name, name)

    def remove_repository(self, project_name: str, name: str) -> bool:
        """Removes a repository. Only the owner and an admin can remove."""
        return self.repository_service.remove(project_name, name)

    def unremove_repository(self, project_name: str, name: str) -> Repository:
        """Unremoves a repository. Only the owner and an admin can unremove."""
        return self.repository_service.unremove(project_name, name)

    def purge_repository(self, project_name: str, name: str) -> bool:
        """Purges a repository. Only the owner and an admin can purge a repository removed before."""
        return self.repository_service.purge(project_name, name)

    def normalize_repository_revision(
        self, project_name: str, name: str, revision: int
    ) -> int:
        """Normalizes the revision into an absolute revision."""
        return self.repository_service.normalize_revision(project_name, name, revision)

    def list_files(
        self,
        project_name: str,
        repo_name: str,
        path_pattern: Optional[str] = None,
        revision: Optional[int] = None,
    ) -> List[Content]:
        """Lists files. The user should have read permission at least.

        :param path_pattern: A path pattern is a variant of glob as follows.
            "/**" - find all files recursively
            "*.json" - find all JSON files recursively
            "/foo/*.json" - find all JSON files under the directory /foo
            "/*/foo.txt" - find all files named foo.txt at the second depth level
            "*.json,/bar/*.txt" - use comma to match any patterns
            This will bring all of the files in the repository, if unspecified.
        :param revision: The revision of the list to get. If not specified, gets the list of
            the latest revision.
        """
        # Same service call as get_files(), but without the file contents.
        return self.content_service.get_files(
            project_name, repo_name, path_pattern, revision, include_content=False
        )

    def get_files(
        self,
        project_name: str,
        repo_name: str,
        path_pattern: Optional[str] = None,
        revision: Optional[int] = None,
    ) -> List[Content]:
        """Gets files. The user should have read permission at least. The difference from
        the API List files is that this includes the content of the files.

        :param path_pattern: A path pattern is a variant of glob as follows.
            "/**" - find all files recursively
            "*.json" - find all JSON files recursively
            "/foo/*.json" - find all JSON files under the directory /foo
            "/*/foo.txt" - find all files named foo.txt at the second depth level
            "*.json,/bar/*.txt" - use comma to match any patterns
            This will bring all of the files in the repository, if unspecified.
        :param revision: The revision of the list to get. If not specified, gets the list of
            the latest revision.
        """
        return self.content_service.get_files(
            project_name, repo_name, path_pattern, revision, include_content=True
        )

    def get_file(
        self,
        project_name: str,
        repo_name: str,
        file_path: str,
        revision: Optional[int] = None,
        json_path: Optional[str] = None,
    ) -> Content:
        """Gets a file. The user should have read permission at least.

        :param revision: The revision of the file to get. If not specified, gets the file of
            the latest revision.
        :param json_path: The JSON path expressions.
        """
        return self.content_service.get_file(
            project_name, repo_name, file_path, revision, json_path
        )

    def push(
        self,
        project_name: str,
        repo_name: str,
        commit: Commit,
        changes: List[Change],
    ) -> PushResult:
        """Creates, replaces, renames or deletes files. The user should have write permission.

        :param commit: A commit message for changes.
        :param changes: Detailed changes including path, type and content.
            If the type is REMOVE, the content should be empty. If the type is RENAME,
            the content is supposed to be the new name.
        """
        return self.content_service.push(project_name, repo_name, commit, changes)

    def watch_repository(
        self,
        project_name: str,
        repo_name: str,
        last_known_revision: Revision,
        path_pattern: str,
        timeout_millis: int = _DEFAULT_WATCH_TIMEOUT_MILLIS,
    ) -> Optional[Revision]:
        """Forwards a repository watch request to the content service.

        NOTE(review): the exact waiting semantics live in
        ``ContentService.watch_repository``; the descriptions below are
        inferred from the parameter names and should be confirmed there.

        :param last_known_revision: presumably the revision the caller has
            already seen.
        :param path_pattern: presumably a path pattern restricting which files
            are watched.
        :param timeout_millis: timeout passed through to the content service,
            in milliseconds. Defaults to one minute.
        :return: whatever the content service returns: a ``Revision`` or ``None``.
        """
        return self.content_service.watch_repository(
            project_name, repo_name, last_known_revision, path_pattern, timeout_millis
        )

    def watch_file(
        self,
        project_name: str,
        repo_name: str,
        last_known_revision: Revision,
        query: Query[T],
        timeout_millis: int = _DEFAULT_WATCH_TIMEOUT_MILLIS,
    ) -> Optional[Entry[T]]:
        """Forwards a file watch request to the content service.

        NOTE(review): the exact waiting semantics live in
        ``ContentService.watch_file``; the descriptions below are inferred
        from the parameter names and should be confirmed there.

        :param last_known_revision: presumably the revision the caller has
            already seen.
        :param query: the query handed to the content service.
        :param timeout_millis: timeout passed through to the content service,
            in milliseconds. Defaults to one minute.
        :return: whatever the content service returns: an ``Entry`` or ``None``.
        """
        return self.content_service.watch_file(
            project_name, repo_name, last_known_revision, query, timeout_millis
        )

    def repository_watcher(
        self,
        project_name: str,
        repo_name: str,
        path_pattern: str,
        function: Callable[[Revision], T] = lambda x: x,
    ) -> Watcher[T]:
        """Creates a ``RepositoryWatcher`` for the repository, starts it and returns it.

        :param path_pattern: the path pattern handed to the watcher.
        :param function: callable handed to the watcher; the default is the
            identity function.
        :return: the watcher, on which ``start()`` has already been called.
        """
        # Imported at call time rather than at module level, presumably to
        # avoid a circular import with this module - confirm before moving it.
        from centraldogma.repository_watcher import RepositoryWatcher

        watcher = RepositoryWatcher(
            self, project_name, repo_name, path_pattern, function
        )
        watcher.start()
        return watcher

    def file_watcher(
        self,
        project_name: str,
        repo_name: str,
        query: Query[T],
        function: Callable[[T], U] = lambda x: x,
    ) -> Watcher[U]:
        """Creates a ``FileWatcher`` for the query, starts it and returns it.

        :param query: the query handed to the watcher.
        :param function: callable handed to the watcher; the default is the
            identity function.
        :return: the watcher, on which ``start()`` has already been called.
        """
        # Imported at call time rather than at module level, presumably to
        # avoid a circular import with this module - confirm before moving it.
        from centraldogma.repository_watcher import FileWatcher

        watcher = FileWatcher(self, project_name, repo_name, query, function)
        watcher.start()
        return watcher