-
Notifications
You must be signed in to change notification settings - Fork 2
/
tasks.py
125 lines (104 loc) · 3.66 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import gzip
import os
import shutil
from io import BytesIO
from subprocess import CalledProcessError
from tempfile import NamedTemporaryFile
from celery.exceptions import Ignore
from wort.app import create_celery_app
# Module-level Celery application; the @celery.task decorators below register
# the compute tasks against it.
celery = create_celery_app()
@celery.task
def compute(sra_id):
    """Compute a sourmash signature for an SRA dataset and upload it to S3.

    The signature is stored gzip-compressed in the ``wort-sra`` bucket under
    ``sigs/<sra_id>.sig``. If that key already exists the task returns
    immediately without recomputing.
    """
    import boto3
    import botocore
    from snakemake import shell

    s3_client = boto3.client("s3")
    s3_resource = boto3.resource("s3")

    key_path = os.path.join("sigs", sra_id + ".sig")

    # Short-circuit when the signature is already in the bucket.
    try:
        s3_resource.Object("wort-sra", key_path).load()
    except botocore.exceptions.ClientError as e:
        # A 404 just means it hasn't been computed yet; anything else is a
        # genuine S3 error and must propagate.
        if e.response["Error"]["Code"] != "404":
            raise
    else:
        return

    with NamedTemporaryFile("w+b") as sig_file:
        # Stream the reads out of fastq-dump straight into sourmash, writing
        # the signature to the temporary file.
        command = (
            "fastq-dump --disable-multithreading --fasta 0 --skip-technical --readids --read-filter pass --dumpbase --split-spot --clip -Z {sra_id} | "
            "sourmash compute -k 21,31,51 "
            " --scaled 1000 "
            " --track-abundance "
            " --name {sra_id} "
            " -o {output} "
            " - "
        )
        try:
            shell(command.format(sra_id=sra_id, output=sig_file.name))
        except CalledProcessError as e:
            # Exit code 141 is SIGPIPE — the downstream reader closed the pipe
            # early, which is informational rather than a failure. More details:
            # http://www.pixelbeat.org/programming/sigpipe_handling.html
            if e.returncode != 141:
                raise e

        # Gzip the signature in memory before uploading.
        sig_file.seek(0)
        gz_buffer = BytesIO()
        with gzip.GzipFile(fileobj=gz_buffer, mode="wb") as gz:
            shutil.copyfileobj(sig_file, gz)

        s3_client.put_object(
            Body=gz_buffer.getvalue(),
            Bucket="wort-sra",
            Key=key_path,
            ContentType="application/json",
            ContentEncoding="gzip",
        )
@celery.task
def compute_genomes(accession, path, name):
    """Compute a sourmash signature for a genome assembly and upload it to S3.

    Args:
        accession: assembly accession; the signature is stored gzip-compressed
            in the ``wort-genomes`` bucket under ``sigs/<accession>.sig``.
        path: URL of the sequence file, fetched with ``curl``.
        name: human-readable signature name passed to ``sourmash --name``.

    If the key already exists in the bucket the task returns immediately
    without recomputing.
    """
    import boto3
    import botocore
    from snakemake import shell

    conn = boto3.client("s3")
    s3 = boto3.resource("s3")

    key_path = os.path.join("sigs", accession + ".sig")

    try:
        s3.Object("wort-genomes", key_path).load()
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            pass  # Object does not exist, let's compute it later
        else:
            # Something else has gone wrong
            raise
    else:
        # The key already exists
        return

    with NamedTemporaryFile("w+b") as f:
        try:
            # The {name:q}, {f.name} and {path} placeholders are expanded by
            # snakemake's shell(), which formats the command with the caller's
            # local variables (":q" applies shell quoting). The string must be
            # a plain literal: the original had a stray f-string prefix on the
            # first fragment, which was inert only because that fragment
            # contains no braces — removed to avoid Python ever expanding (or
            # choking on) placeholders meant for snakemake.
            shell(
                "sourmash compute -k 21,31,51 "
                " --scaled 1000 "
                " --track-abundance "
                " --name {name:q} "
                " -o {f.name} "
                " <(curl {path})"
            )
        except CalledProcessError as e:
            # Exit code 141 is SIGPIPE — the downstream reader closed the pipe
            # early, which is informational rather than a failure here. More:
            # http://www.pixelbeat.org/programming/sigpipe_handling.html
            if e.returncode != 141:
                raise e

        # Gzip the signature in memory before uploading.
        f.seek(0)
        compressed_fp = BytesIO()
        with gzip.GzipFile(fileobj=compressed_fp, mode="wb") as gz:
            shutil.copyfileobj(f, gz)

        conn.put_object(
            Body=compressed_fp.getvalue(),
            Bucket="wort-genomes",
            Key=key_path,
            ContentType="application/json",
            ContentEncoding="gzip",
        )