Skip to content

Commit

Permalink
Aws helper functions (#92)
Browse files Browse the repository at this point in the history
* migrated over helper functions

* better function name

* comment out validators test function

* whitespace change

* added todo re: lists
  • Loading branch information
ngreenwald committed Jun 6, 2020
1 parent 9aae72a commit 23d9e2a
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 61 deletions.
57 changes: 13 additions & 44 deletions caliban_toolbox/aws_functions.py
Expand Up @@ -67,55 +67,24 @@ def connect_aws():
return s3


def aws_upload_files(aws_folder, stage, upload_folder, pixel_only, label_only, rgb_mode):
def aws_upload_files(local_paths, aws_paths):
"""Uploads files to AWS bucket for use in Figure 8
Args:
aws_folder: folder where uploaded files will be stored
stage: specifies stage in pipeline for jobs requiring multiple rounds of annotation
upload_folder: path to folder containing files that will be uploaded
pixel_only: boolean flag to set pixel_only mode
label_only: boolean flag to set label_only mode
rgb_mode: boolean flag to set rgb_mode
Args:
local_paths: list of paths to npz files
aws_paths: list of paths for saving npz files in AWS
"""

s3 = connect_aws()

# load the images from specified folder but not the json log file
files_to_upload = list_npzs_folder(upload_folder)

filename_list = []

# change slashes separating nested folders to underscores for URL generation
subfolders = re.split('/', aws_folder)
subfolders = '__'.join(subfolders)

url_dict = {'pixel_only': pixel_only, 'label_only': label_only, 'rgb': rgb_mode}
url_encoded_dict = urlencode(url_dict)

# upload images
for img in files_to_upload:

# full path to image
img_path = os.path.join(upload_folder, img)

# destination path
img_key = os.path.join(aws_folder, stage, img)

# upload
s3.upload_file(img_path, 'caliban-input', img_key, Callback=ProgressPercentage(img_path),
ExtraArgs={'ACL': 'public-read', 'Metadata': {'source_path': img_path}})
for i in range(len(local_paths)):
s3.upload_file(local_paths[i], 'caliban-input', aws_paths[i],
Callback=ProgressPercentage(local_paths[i]),
ExtraArgs={'ACL': 'public-read',
'Metadata': {'source_path': local_paths[i]}})
print('\n')

url = 'https://caliban.deepcell.org/{}__{}__{}__' \
'{}__{}?{}'.format('caliban-input', 'caliban-output', subfolders, stage, img,
url_encoded_dict)

# add caliban url to list
filename_list.append(url)

return files_to_upload, filename_list


def aws_transfer_file(s3, input_bucket, output_bucket, key_src, key_dst):
"""Helper function to transfer files from one bucket/key to another. Used
Expand Down Expand Up @@ -144,12 +113,12 @@ def aws_download_files(upload_log, output_dir):
stage = upload_log['stage'][0]

# download all images
for img in files_to_download:
for file in files_to_download:

# full path to save image
save_path = os.path.join(output_dir, img)
local_path = os.path.join(output_dir, file)

# path to file in aws
img_path = os.path.join(aws_folder, stage, img)
aws_path = os.path.join(aws_folder, stage, file)

s3.download_file(Bucket='caliban-output', Key=img_path, Filename=save_path)
s3.download_file(Bucket='caliban-output', Key=aws_path, Filename=local_path)
133 changes: 116 additions & 17 deletions caliban_toolbox/figure_eight_functions.py
Expand Up @@ -29,10 +29,82 @@
import zipfile
import pandas as pd
import urllib
import re

from getpass import getpass
from urllib.parse import urlencode

from caliban_toolbox.log_file import create_upload_log
from caliban_toolbox.aws_functions import aws_upload_files, aws_download_files
from caliban_toolbox.utils.misc_utils import list_npzs_folder


def _format_url(aws_folder, stage, npz, url_encoded_dict):
base_url = 'https://caliban.deepcell.org/caliban-input__caliban-output__{}__{}__{}?{}'
formatted_url = base_url.format(aws_folder, stage, npz, url_encoded_dict)

return formatted_url


def _create_next_log_name(previous_log_name, stage):
stage_num = previous_log_name.split('_')[1]
new_log = 'stage_{}_{}_upload_log.csv'.format(stage_num + 1, stage)

return new_log


def get_latest_log_file(log_dir):
"""Find the latest log file in the log directory
Args:
log_dir: full path to log directory
Returns:
string: name of the latest log file
"""
files = os.listdir(log_dir)
log_files = [file for file in files if 'upload_log.csv' in file]
log_files.sort()

return log_files[-1]


def create_job_urls(crop_dir, aws_folder, stage, pixel_only, label_only, rgb_mode):
"""Helper function to create relevant URLs for caliban log and AWS upload
Args:
crop_dir: full path to directory with the npz crops
aws_folder: path for images to be stored in AWS
stage: which stage of the correction process this job is for
pixel_only: boolean flag to determine if only pixel mode is available
label_only: boolean flag to determine if only label is available
rgb_mode: boolean flag to determine if rgb mode will be enabled
Returns:
list: list of paths to local NPZs to be uploaded
list: list of paths to desintation for NPZs
list: list of URLs to supply to figure8 to to display crops
list: list of NPZs that will be uploaded
Raises:
ValueError: If URLs are not valid
"""
# TODO: check that URLS don't contain invalid character
# load the images from specified folder but not the json log file
npzs_to_upload = list_npzs_folder(crop_dir)

# change slashes separating nested folders to underscores for URL generation
subfolders = re.split('/', aws_folder)
subfolders = '__'.join(subfolders)

# create dictionary to hold boolean flags
url_dict = {'pixel_only': pixel_only, 'label_only': label_only, 'rgb': rgb_mode}
url_encoded_dict = urlencode(url_dict)

# create path to npz, key to upload npz, and url path for figure8
npz_paths, npz_keys, url_paths = [], [], []
for npz in npzs_to_upload:
npz_paths.append(os.path.join(crop_dir, npz))
npz_keys.append(os.path.join(aws_folder, stage, npz))
url_paths.append(_format_url(subfolders, stage, npz, url_encoded_dict))

# TODO: think about better way to structure than than many lists
return npz_paths, npz_keys, url_paths, npzs_to_upload


def copy_job(job_id, key):
Expand All @@ -57,11 +129,11 @@ def copy_job(job_id, key):
return new_job_id


def upload_data(csv_path, job_id, key):
"""Add data to an existing Figure 8 job by uploading a CSV file
def upload_log_file(log_file, job_id, key):
"""Upload log file to populate a job for Figure8
Args:
csv_path: full path to csv
log_file: file specifying paths to NPZs included in this job
job_id: ID number of job to upload data to
key: API key to access Figure 8 account
"""
Expand All @@ -76,8 +148,8 @@ def upload_data(csv_path, job_id, key):
csv_data = csv_file.read()

headers = {"Content-Type": "text/csv"}
add_data = requests.put(url, data=log_file, headers=headers)

add_data = requests.put(url, data=csv_data, headers=headers)
if add_data.status_code != 200:
print("Upload_data not successful. Status code: ", add_data.status_code)
else:
Expand All @@ -87,16 +159,33 @@ def upload_data(csv_path, job_id, key):
def create_figure_eight_job(base_dir, job_id_to_copy, aws_folder, stage,
rgb_mode=False, label_only=False, pixel_only=False):
"""Create a Figure 8 job and upload data to it. New job ID printed out for convenience.
Args:
base_dir: full path to directory that contains CSV files
base_dir: full path to job directory
job_id_to_copy: ID number of Figure 8 job to use as template for new job
aws_folder: folder in aws bucket where files be stored
stage: specifies stage in pipeline for jobs requiring multiple rounds of annotation
pixel_only: flag specifying whether annotators will be restricted to pixel edit mode
label_only: flag specifying whether annotators will be restricted to label edit mode
rgb_mode: flag specifying whether annotators will view images in RGB mode
Raises:
ValueError: If invalid base_dir supplied
ValueError: If no crop directory found within base_dir
ValueError: If no NPZs found in crop directory
"""

if not os.path.isdir(base_dir):
raise ValueError('Invalid directory name')

upload_folder = os.path.join(base_dir, 'crop_dir')

if not os.path.isdir(upload_folder):
raise ValueError('No crop directory found within base directory')

if len(list_npzs_folder(upload_folder)) == 0:
raise ValueError('No NPZs found in crop dir')

key = str(getpass("Figure eight api key? "))

# copy job without data
Expand All @@ -105,19 +194,29 @@ def create_figure_eight_job(base_dir, job_id_to_copy, aws_folder, stage,
return
print('New job ID is: ' + str(new_job_id))

# get relevant paths
npz_paths, npz_keys, url_paths, npzs = create_job_urls(crop_dir=upload_folder,
aws_folder=aws_folder,
stage=stage, pixel_only=pixel_only,
label_only=label_only,
rgb_mode=rgb_mode)

# upload files to AWS bucket
upload_folder = os.path.join(base_dir, 'crop_dir')
filenames, filepaths = aws_upload_files(aws_folder=aws_folder, stage=stage,
upload_folder=upload_folder, pixel_only=pixel_only,
rgb_mode=rgb_mode, label_only=label_only)
aws_upload_files(local_paths=npz_paths, aws_paths=npz_keys)

log_name = 'stage_0_{}_upload_log.csv'.format(stage)

# Generate log file for current job
create_upload_log(base_dir=base_dir, stage=stage, aws_folder=aws_folder,
filenames=filenames, filepaths=filepaths, job_id=new_job_id,
pixel_only=pixel_only, rgb_mode=rgb_mode, label_only=label_only)
filenames=npzs, filepaths=url_paths, job_id=new_job_id,
pixel_only=pixel_only, rgb_mode=rgb_mode, label_only=label_only,
log_name=log_name)

# upload NPZs using log file
upload_data(os.path.join(base_dir, 'logs/stage_0_upload_log.csv'), new_job_id, key)
log_path = open(os.path.join(base_dir, 'logs', log_name), 'r')
log_file = log_path.read()

# upload log file
upload_log_file(log_file, new_job_id, key)


def download_report(job_id, log_dir):
Expand Down Expand Up @@ -179,8 +278,9 @@ def download_figure_eight_output(base_dir):
"""

# get information from job creation
# TODO: check for latest stage job report and use that one
log_file = pd.read_csv(os.path.join(base_dir, 'logs/stage_0_upload_log.csv'))
log_dir = os.path.join(base_dir, 'logs')
latest_log = get_latest_log_file(log_dir)
log_file = pd.read_csv(os.path.join(log_dir, latest_log))
job_id = log_file['job_id'][0]

# download Figure 8 report
Expand All @@ -193,5 +293,4 @@ def download_figure_eight_output(base_dir):
if not os.path.isdir(output_dir):
os.makedirs(output_dir)

upload_log = pd.read_csv(os.path.join(base_dir, 'logs/stage_0_upload_log.csv'))
aws_download_files(upload_log, output_dir)
aws_download_files(log_file, output_dir)
88 changes: 88 additions & 0 deletions caliban_toolbox/figure_eight_functions_test.py
@@ -0,0 +1,88 @@
# Copyright 2016-2020 The Van Valen Lab at the California Institute of
# Technology (Caltech), with support from the Paul Allen Family Foundation,
# Google, & National Institutes of Health (NIH) under Grant U24CA224309-01.
# All rights reserved.
#
# Licensed under a modified Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.github.com/vanvalenlab/caliban-toolbox/LICENSE
#
# The Work provided may be used for non-commercial academic purposes only.
# For any other use of the Work, including commercial use, please contact:
# vanvalenlab@gmail.com
#
# Neither the name of Caltech nor the names of its contributors may be used
# to endorse or promote products derived from this software without specific
# prior written permission.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import tempfile
import os
import pytest

from pathlib import Path

from caliban_toolbox import figure_eight_functions


def test_get_latest_log_file():
with tempfile.TemporaryDirectory() as temp_dir:
upload_logs = ['stage_0_upload_log.csv', 'stage_3_upload_log.csv',
'stage_8_upload_log.csv']

for log in upload_logs:
Path(os.path.join(temp_dir, log)).touch()

latest_file = figure_eight_functions.get_latest_log_file(temp_dir)
assert latest_file == 'stage_8_upload_log.csv'


def test_create_job_urls():
with tempfile.TemporaryDirectory() as temp_dir:
npz_files = ['test_0.npz', 'test_1.npz', 'test_2.npz']

for npz in npz_files:
Path(os.path.join(temp_dir, npz)).touch()

aws_folder = 'aws_main_folder/aws_sub_folder'
stage = 'test_stage'
pixel_only, label_only, rgb_mode = True, False, True

output_lists = figure_eight_functions.create_job_urls(crop_dir=temp_dir,
aws_folder=aws_folder,
stage=stage,
pixel_only=pixel_only,
label_only=label_only,
rgb_mode=rgb_mode)

npz_paths, npz_keys, url_paths, npzs_to_upload = output_lists

assert len(npz_paths) == len(npz_keys) == len(url_paths) == len(npzs_to_upload) == 3

# TODO: Figure out how we're going to validate inputs
# with tempfile.TemporaryDirectory() as temp_dir:
# # NPZ name with spaces leads to bad url
# npz_files = ['bad file name.npz']
#
# for npz in npz_files:
# Path(os.path.join(temp_dir, npz)).touch()
#
# aws_folder = 'aws_main_folder/aws_sub_folder'
# stage = 'test_stage'
# pixel_only, label_only, rgb_mode = True, False, True
#
# with pytest.raises(ValueError):
#
# output_lists = figure_eight_functions.create_job_urls(crop_dir=temp_dir,
# aws_folder=aws_folder,
# stage=stage,
# pixel_only=pixel_only,
# label_only=label_only,
# rgb_mode=rgb_mode)

0 comments on commit 23d9e2a

Please sign in to comment.