This repository has been archived by the owner on Nov 4, 2018. It is now read-only.
forked from s3tools/s3cmd
/
S3Uri.py
136 lines (112 loc) · 3.26 KB
/
S3Uri.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
## Amazon S3 manager
## Author: Michal Ludvig <michal@logix.cz>
## http://www.logix.cz/michal
## License: GPL Version 2
import re
import sys
from BidirMap import BidirMap
class S3Uri(object):
type = None
_subclasses = None
def __new__(self, string):
if not self._subclasses:
## Generate a list of all subclasses of S3Uri
self._subclasses = []
dict = sys.modules[__name__].__dict__
for something in dict:
if type(dict[something]) is not type(self):
continue
if issubclass(dict[something], self) and dict[something] != self:
self._subclasses.append(dict[something])
for subclass in self._subclasses:
try:
instance = object.__new__(subclass)
instance.__init__(string)
return instance
except ValueError, e:
continue
raise ValueError("%s: not a recognized URI" % string)
def __str__(self):
return self.uri()
def public_url(self):
raise ValueError("This S3 URI does not have Anonymous URL representation")
class S3UriS3(S3Uri):
type = "s3"
_re = re.compile("^s3://([^/]+)/?(.*)", re.IGNORECASE)
def __init__(self, string):
match = self._re.match(string)
if not match:
raise ValueError("%s: not a S3 URI" % string)
groups = match.groups()
self._bucket = groups[0]
self._object = groups[1]
def bucket(self):
return self._bucket
def object(self):
return self._object
def has_bucket(self):
return bool(self._bucket)
def has_object(self):
return bool(self._object)
def uri(self):
return "/".join(["s3:/", self._bucket, self._object])
def public_url(self):
return "http://s3.amazonaws.com/%s/%s" % (self._bucket, self._object)
@staticmethod
def compose_uri(bucket, object = ""):
return "s3://%s/%s" % (bucket, object)
class S3UriS3FS(S3Uri):
type = "s3fs"
_re = re.compile("^s3fs://([^/]*)/?(.*)", re.IGNORECASE)
def __init__(self, string):
match = self._re.match(string)
if not match:
raise ValueError("%s: not a S3fs URI" % string)
groups = match.groups()
self._fsname = groups[0]
self._path = groups[1].split("/")
def fsname(self):
return self._fsname
def path(self):
return "/".join(self._path)
def uri(self):
return "/".join(["s3fs:/", self._fsname, self.path()])
class S3UriFile(S3Uri):
type = "file"
_re = re.compile("^(\w+://)?(.*)")
def __init__(self, string):
match = self._re.match(string)
groups = match.groups()
if groups[0] not in (None, "file://"):
raise ValueError("%s: not a file:// URI" % string)
self._path = groups[1].split("/")
def path(self):
return "/".join(self._path)
def uri(self):
return "/".join(["file:/", self.path()])
if __name__ == "__main__":
uri = S3Uri("s3://bucket/object")
print "type() =", type(uri)
print "uri =", uri
print "uri.type=", uri.type
print "bucket =", uri.bucket()
print "object =", uri.object()
print
uri = S3Uri("s3://bucket")
print "type() =", type(uri)
print "uri =", uri
print "uri.type=", uri.type
print "bucket =", uri.bucket()
print
uri = S3Uri("s3fs://filesystem1/path/to/remote/file.txt")
print "type() =", type(uri)
print "uri =", uri
print "uri.type=", uri.type
print "path =", uri.path()
print
uri = S3Uri("/path/to/local/file.txt")
print "type() =", type(uri)
print "uri =", uri
print "uri.type=", uri.type
print "path =", uri.path()
print