Skip to content

Commit

Permalink
(feat) Download from S3 on start
Browse files Browse the repository at this point in the history
  • Loading branch information
michalc committed Jul 25, 2019
1 parent bc7bfe6 commit 69c5e27
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 9 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ repos:
- --disable=too-many-locals
- --disable=too-many-public-methods
- --disable=too-many-statements
- --disable=try-except-raise
- --include-naming-hint=yes
- --max-args=10
- --max-line-length=99
Expand Down
2 changes: 0 additions & 2 deletions Dockerfile-minio
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ RUN \
apk add --no-cache \
openssl

RUN \
mkdir -p /test-data/my-bucket
COPY minio-entrypoint.sh /

ENTRYPOINT ["/minio-entrypoint.sh"]
Expand Down
13 changes: 12 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: "3"
version: "3.7"
services:
test:
build:
Expand All @@ -9,6 +9,10 @@ services:
environment:
- AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
- AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
volumes:
- type: volume
source: minio-buckets
target: /test-data
minio:
build:
context: .
Expand All @@ -19,3 +23,10 @@ services:
- MINIO_REGION=us-east-1
ports:
- "9000:9000"
volumes:
- type: volume
source: minio-buckets
target: /test-data

volumes:
minio-buckets:
1 change: 1 addition & 0 deletions minio-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
set -e

echo $MINIO_ACCESS_KEY
mkdir -p /test-data/my-bucket

mkdir -p /root/.minio/certs
openssl req -new -newkey rsa:2048 -days 3650 -nodes -x509 -subj /CN=selfsigned \
Expand Down
72 changes: 72 additions & 0 deletions mobius3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from weakref import (
WeakValueDictionary,
)
from xml.etree import (
ElementTree as ET,
)

from aiodnsresolver import (
Resolver,
Expand Down Expand Up @@ -187,6 +190,7 @@ async def start():
asyncio.create_task(process_jobs())
for i in range(0, concurrent_uploads)
]
await download()
start_inotify()

def start_inotify():
Expand Down Expand Up @@ -439,6 +443,74 @@ async def locked_request(method, path, headers=(), body=empty_async_iterator):
if code not in [b'200', b'204']:
raise Exception(code, body_bytes)

async def download():
    """Fetch every object listed under the prefix and write it below `directory`.

    For each relative key yielded by list_keys_relative_to_prefix(), a signed
    GET is made for bucket + prefix + key. A response whose status is not
    b'200' is skipped; otherwise the body is streamed chunk by chunk into
    directory / key, creating the parent directories first.

    Cancellation is propagated. Any other exception is logged and swallowed,
    which stops the remaining downloads but does not raise to the caller.
    """
    try:
        async for path in list_keys_relative_to_prefix():
            code, _, body = await signed_request(b'GET', bucket + prefix + path)
            if code != b'200':
                # Read the unwanted body to the end instead of abandoning it half-way,
                # as the listing code does for every response.
                # NOTE(review): presumably this is what lets the HTTP client reuse the
                # connection - confirm against the client's documentation.
                await buffered(body)
                continue

            # exist_ok avoids failing when an earlier key already created this directory
            os.makedirs(directory / PurePosixPath(path).parent, exist_ok=True)
            with open(directory / path, 'wb') as file:
                async for chunk in body:
                    file.write(chunk)

    except asyncio.CancelledError:
        # Before Python 3.8 CancelledError derives from Exception, so it must be
        # re-raised explicitly to keep the handler below from swallowing it.
        raise
    except Exception:
        logger.exception('Exception downloading original files')

async def list_keys_relative_to_prefix():
    """Asynchronously yield each key found under `prefix`, with the prefix removed.

    Requests the bucket listing one page at a time (list-type 2, max-keys
    1000) and keeps requesting pages while the response carries a non-empty
    NextContinuationToken.

    Raises:
        Exception: with (code, body_bytes) when a listing response is not b'200'.
    """
    namespace = '{http://s3.amazonaws.com/doc/2006-03-01/}'
    contents_tag = f'{namespace}Contents'
    key_tag = f'{namespace}Key'
    token_tag = f'{namespace}NextContinuationToken'

    def first_child_text(element, tag):
        # Text of the first direct child carrying the tag, or None when there is none
        return next((child.text for child in element if child.tag == tag), None)

    async def fetch_page(extra_query_items=()):
        # Returns (continuation token or '', keys of this page relative to prefix)
        query = (
            ('max-keys', '1000'),
            ('list-type', '2'),
            ('prefix', prefix),
        ) + extra_query_items
        code, _, body = await signed_request(b'GET', bucket, params=query)
        body_bytes = await buffered(body)
        if code != b'200':
            raise Exception(code, body_bytes)

        continuation = ''
        relative_keys = []
        for element in ET.fromstring(body_bytes):
            if element.tag == contents_tag:
                full_key = first_child_text(element, key_tag)
                relative_keys.append(full_key[len(prefix):])

            if element.tag == token_tag:
                continuation = element.text

        return (continuation, relative_keys)

    # The first request carries no continuation token; each later one passes on the
    # token returned by the page before it.
    extra_items = ()
    while True:
        token, keys_page = await fetch_page(extra_items)
        for key in keys_page:
            yield key

        if not token:
            break
        extra_items = (('continuation-token', token),)

parent_locals = locals()

return start, stop
Expand Down
Loading

0 comments on commit 69c5e27

Please sign in to comment.