Skip to content
This repository
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 137 lines (112 sloc) 3.335 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
## Amazon S3 manager
## Author: Michal Ludvig <michal@logix.cz>
## http://www.logix.cz/michal
## License: GPL Version 2

import re
import sys
from BidirMap import BidirMap

class S3Uri(object):
    """Factory base class for the URI flavours handled by this module.

    Calling ``S3Uri(string)`` does not produce a plain ``S3Uri``.  Every
    subclass of ``S3Uri`` found in this module's namespace is tried in turn
    and the first one whose ``__init__`` accepts ``string`` (i.e. does not
    raise ``ValueError``) is returned.

    Subclasses are expected to override ``type``, to raise ``ValueError``
    from ``__init__`` for strings they do not understand and to provide a
    ``uri()`` method, which ``__str__`` relies on.
    """

    ## Name of the URI flavour; overridden by every subclass.
    type = None
    ## Lazily built list of candidate subclasses used by the factory.
    _subclasses = None

    def __new__(cls, string):
        """Return an instance of the subclass that recognises ``string``.

        When called on a concrete subclass a bare instance of exactly that
        subclass is returned, so its own ``__init__`` parses ``string`` and
        reports errors.

        Raises:
            ValueError: if no subclass accepts ``string``.
        """
        ## Direct construction of a concrete subclass must not go through
        ## the dispatcher: the cache lookup below would otherwise either
        ## build an empty per-subclass list or reuse the base class list
        ## and hand back an instance of an unrelated sibling class.
        if cls is not S3Uri:
            return object.__new__(cls)

        if not cls._subclasses:
            ## Generate a list of all subclasses of S3Uri in this module
            namespace = sys.modules[__name__].__dict__
            cls._subclasses = [
                candidate
                for candidate in list(namespace.values())
                if isinstance(candidate, type)
                and issubclass(candidate, cls)
                and candidate is not cls
            ]

        for subclass in cls._subclasses:
            instance = object.__new__(subclass)
            try:
                ## Run the parser by hand: a ValueError means "not mine".
                ## Python calls __init__ once more on the returned object
                ## because it is an instance of cls.
                instance.__init__(string)
            except ValueError:
                continue
            return instance
        raise ValueError("%s: not a recognized URI" % string)

    def __str__(self):
        """Return the textual form produced by the subclass' ``uri()``."""
        return self.uri()

    def public_url(self):
        """Return an anonymous URL; unsupported unless a subclass overrides it.

        Raises:
            ValueError: always, in this base implementation.
        """
        raise ValueError("This S3 URI does not have Anonymous URL representation")

class S3UriS3(S3Uri):
    """``s3://bucket/object`` style URI.

    The scheme is matched case-insensitively.  The bucket is everything up
    to the first ``/`` after the scheme and must be non-empty; the object is
    whatever follows and may be empty.
    """

    type = "s3"
    _re = re.compile("^s3://([^/]+)/?(.*)", re.IGNORECASE)

    def __init__(self, string):
        """Split ``string`` into bucket and object.

        Raises:
            ValueError: if ``string`` does not match the ``s3://`` pattern.
        """
        parsed = self._re.match(string)
        if parsed is None:
            raise ValueError("%s: not a S3 URI" % string)
        self._bucket, self._object = parsed.groups()

    def bucket(self):
        """Return the bucket part of the URI."""
        return self._bucket

    def object(self):
        """Return the object part of the URI (may be an empty string)."""
        return self._object

    def has_bucket(self):
        """Return True when the bucket part is non-empty."""
        return True if self._bucket else False

    def has_object(self):
        """Return True when the object part is non-empty."""
        return True if self._object else False

    def uri(self):
        """Return the URI rebuilt as ``s3://<bucket>/<object>``."""
        pieces = ["s3:/", self._bucket, self._object]
        return "/".join(pieces)

    def public_url(self):
        """Return the plain-HTTP s3.amazonaws.com URL of bucket/object."""
        location = (self._bucket, self._object)
        return "http://s3.amazonaws.com/%s/%s" % location

    @staticmethod
    def compose_uri(bucket, object = ""):
        """Build an ``s3://`` URI string from ``bucket`` and ``object``."""
        return "s3://%s/%s" % (bucket, object)

class S3UriS3FS(S3Uri):
    """``s3fs://fsname/path`` style URI.

    The scheme is matched case-insensitively.  The filesystem name is
    everything up to the first ``/`` after the scheme (it may be empty);
    the rest is kept as a list of ``/``-separated path components.
    """

    type = "s3fs"
    _re = re.compile("^s3fs://([^/]*)/?(.*)", re.IGNORECASE)

    def __init__(self, string):
        """Split ``string`` into filesystem name and path components.

        Raises:
            ValueError: if ``string`` does not match the ``s3fs://`` pattern.
        """
        parsed = self._re.match(string)
        if parsed is None:
            raise ValueError("%s: not a S3fs URI" % string)
        fsname, remainder = parsed.groups()
        self._fsname = fsname
        self._path = remainder.split("/")

    def fsname(self):
        """Return the filesystem name part of the URI."""
        return self._fsname

    def path(self):
        """Return the path with its components joined by ``/``."""
        separator = "/"
        return separator.join(self._path)

    def uri(self):
        """Return the URI rebuilt as ``s3fs://<fsname>/<path>``."""
        pieces = ["s3fs:/", self._fsname, self.path()]
        return "/".join(pieces)

class S3UriFile(S3Uri):
    """Local file reference: a bare path or a ``file://`` URI.

    A string with no ``<word>://`` prefix is taken as a path as-is; a string
    with the exact prefix ``file://`` has that prefix stripped.  Any other
    scheme prefix is rejected.
    """

    type = "file"
    ## Raw string: "\w" in a plain literal is an invalid escape sequence
    ## and triggers a warning on current Python versions.
    _re = re.compile(r"^(\w+://)?(.*)")

    def __init__(self, string):
        """Store the path of ``string`` as a list of ``/``-separated parts.

        Raises:
            ValueError: if ``string`` carries a scheme other than ``file://``.
        """
        ## Both groups are optional / may be empty, so match() always
        ## succeeds here and never returns None.
        scheme, location = self._re.match(string).groups()
        if scheme not in (None, "file://"):
            raise ValueError("%s: not a file:// URI" % string)
        self._path = location.split("/")

    def path(self):
        """Return the path with its components joined by ``/``."""
        return "/".join(self._path)

    def uri(self):
        """Return the path prefixed with ``file://``."""
        return "/".join(["file:/", self.path()])

if __name__ == "__main__":
    ## Self-test / demo: parse a few sample URIs and show what was detected.
    ## Each entry is (URI string, accessor methods to call on the result).
    samples = [
        ("s3://bucket/object", ("bucket", "object")),
        ("s3://bucket", ("bucket",)),
        ("s3fs://filesystem1/path/to/remote/file.txt", ("path",)),
        ("/path/to/local/file.txt", ("path",)),
    ]
    for text, accessors in samples:
        uri = S3Uri(text)
        ## Single-argument print(...) calls emit the same text under both
        ## the Python 2 print statement and the Python 3 print function.
        print("type() = %s" % (type(uri),))
        print("uri = %s" % (uri,))
        print("uri.type= %s" % (uri.type,))
        for accessor in accessors:
            print("%s = %s" % (accessor, getattr(uri, accessor)()))
        print("")
Something went wrong with that request. Please try again.