Skip to content

Commit

Permalink
Merge pull request #28 from lilab-bcb/fastq_upload
Browse files Browse the repository at this point in the history
Improve FASTQ file uploading

Former-commit-id: 64ba4ac
  • Loading branch information
bli25 committed Aug 5, 2022
2 parents fd38ec5 + b045be5 commit 67cf9b5
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 142 deletions.
4 changes: 2 additions & 2 deletions alto/utils/bcl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ def get_lanes(self) -> List[str]:
return res


def path_is_flowcell(path: str) -> bool:
def path_is_bcl(path: str) -> bool:
"""If path represents BCL files of one sequencing flowcell.
"""
return os.path.isdir(path) and os.path.exists(f'{path}/RunInfo.xml')
return os.path.exists(f'{path}/RunInfo.xml')


def transfer_flowcell(
Expand Down
75 changes: 30 additions & 45 deletions alto/utils/fastq_utils.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,44 @@
import glob, os
from alto.utils import run_command
from typing import Optional

def folder_is_fastq(
    path: str,
    prefix: Optional[str] = None,
) -> Optional[str]:
    """Locate the FASTQ files of one sample under a folder.

    Two layouts are recognized, checked in this order:

    1. ``<path>/<prefix>_*.fastq.gz``
    2. ``<path>/<prefix>/<prefix>_*.fastq.gz``

    Args:
        path: Folder to inspect.
        prefix: Sample name that the FASTQ file names start with.

    Returns:
        The absolute wildcard pattern (ending in ``<prefix>_*.fastq.gz``) of
        the first layout that has at least one matching file, or ``None`` when
        ``prefix`` is ``None``, ``path`` is not a folder, or nothing matches.

    Raises:
        PermissionError: If a searched folder lacks execute permission, or a
            matching FASTQ file lacks read permission.
    """
    if (prefix is None) or (not os.path.isdir(path)):
        return None

    # The flat layout is tried first, then the per-sample subfolder.
    for folder in (path, f"{path}/{prefix}"):
        if not os.path.isdir(folder):
            # Only the optional per-sample subfolder can be missing here.
            return None

        # Check folder permission.
        if not os.access(folder, os.X_OK):
            raise PermissionError(f"Need execution access to folder '{folder}'!")

        # Escape the literal parts so that glob metacharacters occurring in a
        # folder or sample name ('[', ']', '*', '?') are not read as a pattern.
        fastq_files = glob.glob(
            f"{glob.escape(folder)}/{glob.escape(prefix)}_*.fastq.gz"
        )
        if not fastq_files:
            continue

        # Check fastq file permission.
        for fastq_file in fastq_files:
            if not os.access(fastq_file, os.R_OK):
                raise PermissionError(f"Need read access to '{fastq_file}'!")

        # The returned pattern keeps its historical, unescaped form.
        return os.path.abspath(f"{folder}/{prefix}_*.fastq.gz")

    return None
from typing import List, Optional

class sample_manager:
    """Collect unique sample names and reject duplicates.

    Names are handed back in the order they were registered, so callers that
    iterate over them behave the same from one run to the next (a plain set of
    strings iterates in a hash-dependent order).
    """

    def __init__(self):
        # Set used for constant-time duplicate detection.
        self.samples = set()
        # Same names in registration order; backs ``get_samples``.
        self._ordered = []

    def update_samples(self, sample_name: str):
        """Register ``sample_name``.

        Raises:
            ValueError: If ``sample_name`` has already been registered.
        """
        if sample_name in self.samples:
            raise ValueError(f"{sample_name} is duplicated!")
        self.samples.add(sample_name)
        self._ordered.append(sample_name)

    def get_samples(self) -> List[str]:
        """Return a new list of the registered names, in registration order."""
        return list(self._ordered)


def path_is_fastq(path: str) -> bool:
    """Return whether ``path`` is a folder holding FASTQ files.

    A folder qualifies when at least one ``*.fastq.gz`` file sits directly in
    it or in one of its immediate subfolders.
    """
    # Escape the folder name so glob metacharacters in it ('[', ']', '*', '?')
    # are matched literally instead of being read as a pattern.
    root = glob.escape(path)
    # The nested layout is only searched when the flat one has no match.
    return any(
        glob.glob(f"{root}/{pattern}")
        for pattern in ("*.fastq.gz", "*/*.fastq.gz")
    )


def transfer_fastq(
source: str,
dest: str,
backend: str,
samples: List[str],
dry_run: bool,
profile: Optional[str] = None,
verbose: bool = True,
) -> None:
if os.path.isdir(source):
strato_cmd = ['strato', 'sync', '--backend', backend, '--ionice', '-m', '--quiet', source, dest]
else:
strato_cmd = ['strato', 'cp', '--backend', backend, '--ionice', '-m', '--quiet', source, os.path.dirname(dest) + '/']
for sample in samples:
if len(glob.glob(f"{source}/{sample}_*.fastq.gz")) > 0:
strato_cmd = ['strato', 'cp', '--backend', backend, '--ionice', '-m', '--quiet', f"{source}/{sample}_*.fastq.gz", dest + '/']
elif len(glob.glob(f"{source}/{sample}/{sample}_*.fastq.gz")) > 0:
strato_cmd = ['strato', 'sync', '--backend', backend, '--ionice', '-m', '--quiet', f"{source}/{sample}", f"{dest}/{sample}"]
else:
raise ValueError(f"'{sample}' doesn't have any corresponding FASTQ file!")

if profile is not None:
strato_cmd.extend(['--profile', profile])
if profile is not None:
strato_cmd.extend(['--profile', profile])

run_command(strato_cmd, dry_run, suppress_stdout=not verbose)
run_command(strato_cmd, dry_run, suppress_stdout=not verbose)
177 changes: 82 additions & 95 deletions alto/utils/io_utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from typing import Optional
import os
import re
import json
import os, re, json, tempfile
import numpy as np
import pandas as pd
import tempfile
from collections import defaultdict
from typing import Tuple, Dict
from glob import glob
from collections import namedtuple
from typing import Dict, Tuple, Optional

from alto.utils import prefix_float, run_command
from .bcl_utils import lane_manager, path_is_flowcell, transfer_flowcell
from .fastq_utils import folder_is_fastq, transfer_fastq
from .bcl_utils import lane_manager, path_is_bcl, transfer_flowcell
from .fastq_utils import sample_manager, path_is_fastq, transfer_fastq


FlowcellType = namedtuple('FlowcellType', ['type', 'manager'])


def read_wdl_inputs(input_json: str) -> dict:
Expand Down Expand Up @@ -51,47 +49,16 @@ def __init__(self, backend, bucket): # here bucket should also include bucket fo
self.scheme = 'gs' if backend == 'gcp' else 's3'
self.bucket = bucket
self.unique_urls = set()
self.unique_urls_d = dict()
self.lastBaseDir = ""
self.parentDirCounter = 1

def get_unique_url(self, input_path: str):
counter = 1
is_fastq_dir = False
if os.path.isdir(input_path):
listOfFiles = []
for (dirpath, dirnames, filenames) in os.walk(input_path):
listOfFiles += [os.path.join(dirpath, file) for file in filenames if file.endswith("fastq.gz")]
if listOfFiles:
is_fastq_dir = True
if glob(input_path+"_*.fastq.gz"):
uniq_url = f'{self.scheme}://{self.bucket}/{os.path.basename(os.path.dirname(input_path))}/'
pathdir = os.path.basename(os.path.dirname(input_path))
input_dirname = self.unique_urls_d.get(uniq_url,"")
while uniq_url in self.unique_urls and input_dirname != os.path.dirname(input_path):
counter += 1
uniq_url = f'{self.scheme}://{self.bucket}/{pathdir}_{counter}/'
elif is_fastq_dir:
uniq_url = f'{self.scheme}://{self.bucket}/{os.path.basename(os.path.dirname(input_path))}/{os.path.basename(input_path)}/'
input_dirname = self.unique_urls_d.get(uniq_url,"")

if uniq_url in self.unique_urls and self.lastBaseDir == os.path.dirname(os.path.dirname(input_path)):
uniq_url = f'{self.scheme}://{self.bucket}/{os.path.basename(os.path.dirname(input_path))}_{self.parentDirCounter}/{os.path.basename(input_path)}/'
elif uniq_url in self.unique_urls and input_dirname != os.path.dirname(input_path):
self.parentDirCounter += 1
uniq_url = f'{self.scheme}://{self.bucket}/{os.path.basename(os.path.dirname(input_path))}_{self.parentDirCounter}/{os.path.basename(input_path)}/'
elif uniq_url not in self.unique_urls and self.lastBaseDir == os.path.dirname(os.path.dirname(input_path)) and self.parentDirCounter > 1:
uniq_url = f'{self.scheme}://{self.bucket}/{os.path.basename(os.path.dirname(input_path))}_{self.parentDirCounter}/{os.path.basename(input_path)}/'

self.lastBaseDir = os.path.dirname(os.path.dirname(input_path))
else:
uniq_url = f'{self.scheme}://{self.bucket}/{os.path.basename(input_path)}'
root, ext = os.path.splitext(uniq_url)
while uniq_url in self.unique_urls:
counter += 1
uniq_url = f'{root}_{counter}{ext}'
uniq_url = f'{self.scheme}://{self.bucket}/{os.path.basename(input_path)}'
root, ext = os.path.splitext(uniq_url)
while uniq_url in self.unique_urls:
counter += 1
uniq_url = f'{root}_{counter}{ext}'
self.unique_urls.add(uniq_url)
self.unique_urls_d[uniq_url]=os.path.dirname(input_path)

return uniq_url


Expand All @@ -100,10 +67,9 @@ def transfer_data(
dest: str,
backend: str,
dry_run: bool,
flowcells: Dict[str, lane_manager] = None,
verbose: bool = True,
flowcells: Dict[str, FlowcellType] = None,
profile: Optional[str] = None,
is_fastq_dir: bool = False,
verbose: bool = True,
) -> None:
"""Transfer source to dest (cloud destination).
backend, choosing from gcp and aws.
Expand All @@ -112,25 +78,29 @@ def transfer_data(
if verbose:
print(f'{"Dry run: " if dry_run else ""}Uploading {source} to {dest}.')

if (not is_fastq_dir) and path_is_flowcell(source):
lanes = flowcells[source].get_lanes() if flowcells is not None else ['*']
transfer_flowcell(
source=source,
dest=dest,
backend=backend,
lanes=lanes,
dry_run=dry_run,
profile=profile,
)
elif is_fastq_dir:
transfer_fastq(
source=source,
dest=dest,
backend=backend,
dry_run=dry_run,
verbose=verbose,
profile=profile,
)
if flowcells != None and source in flowcells:
flowcell = flowcells[source]
if flowcell.type == 'bcl':
transfer_flowcell(
source=source,
dest=dest,
backend=backend,
lanes=flowcell.manager.get_lanes(),
dry_run=dry_run,
profile=profile,
verbose=verbose,
)
else:
assert flowcell.type == 'fastq'
transfer_fastq(
source=source,
dest=dest,
backend=backend,
samples=flowcell.manager.get_samples(),
dry_run=dry_run,
profile=profile,
verbose=verbose,
)
else:
if os.path.isdir(source):
strato_cmd = ['strato', 'sync', '--backend', backend, '--ionice', '-m', '--quiet', source, dest]
Expand All @@ -148,8 +118,8 @@ def transfer_sample_sheet(
input_file_to_output_url: dict,
url_gen: cloud_url_factory,
dry_run: bool,
verbose: bool = True,
profile: Optional[str] = None,
verbose: bool = True,
) -> Tuple[str, bool]:
"""Check sample sheet and upload files inside it.
input_file: sample sheet
Expand All @@ -174,49 +144,65 @@ def transfer_sample_sheet(
except Exception:
return input_file, is_changed

flowcells = defaultdict(lane_manager)
flowcells = {}
col_names = np.char.array(df.iloc[0,:], unicode = True).lower()

if ('flowcell' in col_names) and ('lane' in col_names):
if ('flowcell' in col_names) or ('location' in col_names):
flowcell_keyword = 'flowcell' if 'flowcell' in col_names else 'location'
df.columns = col_names
for idx, row in df[1:].iterrows():
flowcells[row['flowcell']].update_lanes(row['lane'])

sample_keyword = 'library' if 'library' in col_names else 'sample'
idx_sample = np.where(col_names==sample_keyword)[0][0] if np.where(col_names==sample_keyword)[0].size > 0 else None
sample_keyword = None
if 'library' in col_names:
sample_keyword = 'library'
elif 'sample' in col_names:
sample_keyword = 'sample'
else:
raise ValueError("Cannot detect either Library or Sample column in the sample sheet!")

for _, row in df[1:].iterrows():
path = os.path.abspath(row[flowcell_keyword])
if not os.path.isdir(path):
raise ValueError(f"{path} is not a folder!")
if not os.access(path, os.X_OK):
raise PermissionError(f"Need execution access to folder '{path}'!")

flowcell = None
if path in flowcells:
flowcell = flowcells[path]
else:
if path_is_bcl(path):
flowcell = FlowcellType(type='bcl', manager=lane_manager())
elif path_is_fastq(path):
flowcell = FlowcellType(type='fastq', manager=sample_manager())
else:
raise ValueError(f"{path} is neither a BCL folder nor a FASTQ folder!")
flowcells[path] = flowcell

if flowcell.type == 'bcl':
flowcell.manager.update_lanes(row['lane'] if 'lane' in row else '*')
else:
flowcell.manager.update_samples(row[sample_keyword])

for _, row in df[1:].iterrows():
sample_name = row[idx_sample] if idx_sample is not None else None
for idxc, value in row.iteritems():
if isinstance(value, str) and os.path.exists(value):
is_fastq_dir = False
file_pattern = folder_is_fastq(value, sample_name)
if file_pattern is not None:
# Fastq folder.
is_fastq_dir = True
source = file_pattern
local_path = os.path.abspath(value) + '/' + sample_name
sub_url = input_file_to_output_url.get(local_path, None)
else:
# BCL folder, or file.
source = os.path.abspath(value)
sub_url = input_file_to_output_url.get(source, None)
source = os.path.abspath(value)
sub_url = input_file_to_output_url.get(source, None)

if sub_url is None:
path_key = source if not is_fastq_dir else local_path
sub_url = url_gen.get_unique_url(path_key)
sub_url = url_gen.get_unique_url(source)
transfer_data(
source=source,
dest=sub_url,
backend=backend,
dry_run=dry_run,
flowcells=flowcells,
verbose=verbose,
profile=profile,
is_fastq_dir=is_fastq_dir,
verbose=verbose,
)
input_file_to_output_url[path_key] = sub_url
row[idxc] = sub_url if not is_fastq_dir else os.path.dirname(sub_url)
input_file_to_output_url[source] = sub_url

row[idxc] = sub_url
is_changed = True

if is_changed:
Expand Down Expand Up @@ -310,6 +296,7 @@ def upload_to_cloud_bucket(
verbose=verbose,
profile=profile,
)

inputs[k] = input_url
if is_changed: # delete temporary file after uploading
os.remove(input_path)
Expand Down

0 comments on commit 67cf9b5

Please sign in to comment.