### Loading packages from config/init.py

In [None]:
# Execute the shared setup script inside this notebook's namespace.
# The cells below use names they never define themselves (os, pandas, gzip,
# json, plt, sns, RESULTS, DATASET, BIN, DATA, parse_gcp_json, ...); these
# presumably come from this script -- TODO confirm against config/init.py.
%run ../config/init.py

### Testing gcloud configuration

In [None]:
account = !gcloud config get-value account
account = ''.join(account)
project = !gcloud config get-value project
project = ''.join(project)
if account != '(unset)' and project != '(unset)':
    print('Using account: {} with project: {}'.format(account, project))
else:
    print('Please, configure Cloud SDK before running this notebook')
    print('Open a Terminal and run: gcloud init')

### Defining variables

Edit the GCP zone and region variables according to your geographical location.

In [None]:
# Output directory of the annotation step for this dataset.
result_dir = os.path.join(RESULTS, DATASET, 'annotation')
# makedirs(exist_ok=True) also creates missing parent directories and does not
# fail when the directory is already there, so no separate existence check
# (and no window between check and creation) is needed.
os.makedirs(result_dir, exist_ok=True)
# Make it the current working directory.
os.chdir(result_dir)
print('Using as output directory: {}'.format(result_dir))

In [None]:
# NOTE(review): 'us-east1' has the form of a GCP region name and 'us-east1-c'
# the form of a zone name, so these two values look swapped with respect to
# the variable names. Neither variable is read in the cells visible here --
# confirm how they are consumed before changing them.
ZONE = 'us-east1'
REGION = 'us-east1-c'
# Prefix of the query files listed in the input bucket ({QUERY_SIZE}_*.fsa.gz)
# and of the results downloaded from the output bucket; presumably the number
# of sequences per partition -- TODO confirm.
QUERY_SIZE = 10000
# Handed to the pipeline as the TAXID environment variable.
# Presumably the NCBI taxonomy id of the kingdom to annotate against -- verify.
KINGDOM_TAXID = 33090
# Handed to the pipeline as the BLASTDB_BUCKET environment variable.
BLASTDB_BUCKET = 'cbb-research-dl-blastdb-v5'
# Pipeline definition submitted once per sample by the submission cell.
PIPELINE_JSON = os.path.join(BIN, 'gcp', 'pipeline-transcriptome-annotation.json')
# Pipeline definitions that are uploaded to the input bucket rather than
# submitted directly from this notebook.
RPSTBLASTN_PIPELINE = os.path.join(BIN, 'gcp', 'pipeline-transcriptome-annotation-rpstblastn.json')
RPSBLAST_PIPELINE = os.path.join(BIN, 'gcp', 'pipeline-transcriptome-annotation-rpsblast.json')

# Prices from 06/10/2021. The cost plots multiply them by run time in hours
# (minutes / 60 * price), so they are per-hour prices; the plot labels use '$'.
# machine type n1-standard-96 Preemptible with 2 local SSD
PRICE_N96 = 0.96
# machine type n1-standard-8 Preemptible with 2 local SSD
PRICE_N8 = 0.08

### Create or retrieve GCP storage bucket

In [None]:
bucket_list = !gsutil ls
bucket = None

prefix = 'gs://{}-data-'.format(DATASET.lower())
for l in bucket_list:
    if prefix in l:
        bucket = l.replace('gs://{}-data-'.format(DATASET.lower()),'').replace('/','')
        break

if not bucket:
    print('ERROR!!!\nMust run notebook 06 - Vector Detection and data Partitioning notebook')
else:
    inbucket = '{}-data-{}'.format(DATASET.lower(), bucket)
    outbucket = '{}-res-{}'.format(DATASET.lower(), bucket)
    bucket_list = !gsutil ls gs://{outbucket}
    if ''.join(bucket_list).startswith('BucketNotFoundException'):
        !gsutil mb gs://{outbucket}

    tax_file = os.path.join(DATA, 'taxonomy', 'taxonomy_networkx.pickle')
    !gsutil -m cp {RPSBLAST_PIPELINE} {RPSTBLASTN_PIPELINE} {tax_file} gs://{inbucket}/

### Submitting jobs for annotation workflow

After running this cell you should go to the [Google Cloud Console](https://console.cloud.google.com/compute) to visualize all running instances.

In [None]:
os.chdir(result_dir)
operations = {
    'operations': pandas.DataFrame(columns=['sample', 'id', 'status']),
    'logs': {},
    'logs_rpsblast': {},
    'logs_rpstblastn': {}
}

samples = []
ls_files = !gsutil ls gs://{inbucket}/{QUERY_SIZE}_*.fsa.gz
for f in ls_files:
    samples.append(f.replace('gs://{}/'.format(inbucket),'').replace('.fsa.gz',''))

op_dir = os.path.join(result_dir, 'gcp')
if not os.path.exists(op_dir):
    os.mkdir(op_dir)
os.chdir(op_dir)

if os.path.exists('operations.tsv'):
    operations['operations'] = pandas.read_csv('operations.tsv', sep='\t')

if len(operations['operations']) != len(samples):
    for sample in samples:  
        if operations['operations'][operations['operations']['sample'] == sample].empty:
            print('Submitting sample: {}'.format(sample))
            a = !gcloud beta lifesciences pipelines run --pipeline-file={PIPELINE_JSON} --env-vars=INBUCKET={inbucket},OUTBUCKET={outbucket},SAMPLE={sample},BLASTDB_BUCKET={BLASTDB_BUCKET},TAXID={KINGDOM_TAXID}
            df_length = len(operations['operations'])
            if len(a) == 1 and a[0].startswith('Running'):
                a = a[0].replace('].','').split('/')[5]
                operations['operations'].loc[df_length] = [sample, a, 'running']
            else:
                operations['operations'].loc[df_length] = [sample, None, a]
    operations['operations'].to_csv('operations.tsv', sep='\t', index=None)

display(operations['operations'])

### Downloading results from the GCP storage bucket

In [None]:
# Local directory that receives the pipeline outputs.
blast_dir = os.path.join(result_dir, 'blasts')
if not os.path.exists(blast_dir):
    os.mkdir(blast_dir) 
# The copy below targets "./", so the working directory has to be blast_dir.
os.chdir(blast_dir)
# gsutil flags: -m parallel transfer, -R recursive, -n no-clobber (objects
# already downloaded by an earlier run of this cell are skipped).
!gsutil -m cp -Rn gs://{outbucket}/{QUERY_SIZE}* ./


### GCP log retrieval for plotting
This cell downloads the GCP logs for completed operations (jobs), creating a `<sample>.json.gz` file for each sample.

You should execute it multiple times until all operations are completed.

In [None]:
# For every recorded sample, load (or fetch and cache) the operation logs of
# the three pipelines -- main, RPSBlast and RPSTBlastN -- and collect one
# timing row per log. Logs are cached in op_dir as gzip-compressed JSON files
# named <sample>.json.gz, <sample>_rpsblast.json.gz and
# <sample>_rpstblastn.json.gz.
os.chdir(op_dir)
    
# dropna() removes rows with a missing value, e.g. submissions that failed and
# were recorded with id None.
df = operations['operations'].dropna()
data = []  
data_rpsblast = []
data_rpstblastn = []
# Set as soon as one operation is not usable; the timing tables are then not
# built and the user is asked to resubmit.
rerun = False
for i, r in df.iterrows():
    # NOTE(review): `id` shadows the Python builtin of the same name.
    id = r['id']
    sample = r['sample']
    # --- main pipeline log ---
    if os.path.exists('{}.json.gz'.format(r['sample'])):
        # Log already cached on disk by an earlier run of this cell.
        with gzip.GzipFile('{}.json.gz'.format(r['sample']), 'r') as fin:  
            operations['logs'][r['sample']] = json.loads(fin.read().decode('utf-8'))
    else:
        if sample not in operations['logs']:
            a = !gcloud beta lifesciences operations describe --format=json {id}
            l = json.loads(''.join(a))
            if 'done' in l and 'error' not in l:
                operations['logs'][sample] = l
                with gzip.GzipFile('{}.json.gz'.format(sample), 'w') as fout:   # gzip-compressed copy of the log
                    fout.write(json.dumps(l, indent=2).encode('utf-8'))
                # A successful log supersedes the error file of a previous attempt.
                if os.path.exists('{}_error.json.gz'.format(sample)):
                    print('Removing error file {}_error.json.gz'.format(sample))
                    os.remove('{}_error.json.gz'.format(sample))
            else:   
                # NOTE(review): this branch is also taken when 'done' is absent
                # from the response (presumably an operation that has not
                # finished yet), not only when 'error' is present. It drops
                # the sample from the table and deletes its output in the
                # bucket -- confirm this is intended for unfinished jobs.
                rerun = True
                print('Sample {0} with error. Check file: {1}/{0}_error.json.gz'.format(sample, op_dir))
                with gzip.GzipFile('{}_error.json.gz'.format(sample), 'w') as fout:   # gzip-compressed copy of the response
                    fout.write(json.dumps(l, indent=2).encode('utf-8'))
                # Forget the sample so that the submission cell submits it again.
                operations['operations'] = operations['operations'][operations['operations']['sample'] != sample]
                !gsutil -m rm -r gs://{outbucket}/{sample}
    # --- RPSBlast log ---
    if os.path.exists('{}_rpsblast.json.gz'.format(r['sample'])):
        with gzip.GzipFile('{}_rpsblast.json.gz'.format(r['sample']), 'r') as fin:  
            operations['logs_rpsblast'][r['sample']] = json.loads(fin.read().decode('utf-8'))
    elif os.path.exists(os.path.join(blast_dir, sample, 'rpsblast_operation.txt')):
        # The operation id is read from a file in the downloaded results.
        with open(os.path.join(blast_dir, sample, 'rpsblast_operation.txt')) as fin:
            line = fin.read().strip()
            # Text after the last '/', minus the final two characters
            # (presumably a trailing "]." -- TODO confirm the file format).
            id = line[line.rindex('/') + 1:-2]
            a = !gcloud beta lifesciences operations describe --format=json {id}
            l = json.loads(''.join(a))
            if 'done' in l and 'error' not in l:
                operations['logs_rpsblast'][sample] = l
                with gzip.GzipFile('{}_rpsblast.json.gz'.format(sample), 'w') as fout:   # gzip-compressed copy of the log
                    fout.write(json.dumps(l, indent=2).encode('utf-8'))
                if os.path.exists('{}_rpsblast_error.json.gz'.format(sample)):
                    print('Removing error file {}_rpsblast_error.json.gz'.format(sample))
                    os.remove('{}_rpsblast_error.json.gz'.format(sample))
            else:
                # Same handling as for the main pipeline: record the response,
                # drop the sample from the table and delete its bucket output.
                rerun = True
                print('Sample {0} with error in RPSBlast. Check file: {1}/{0}_rpsblast_error.json.gz'.format(sample, op_dir))
                with gzip.GzipFile('{}_rpsblast_error.json.gz'.format(sample), 'w') as fout:   # gzip-compressed copy of the response
                    fout.write(json.dumps(l, indent=2).encode('utf-8'))
                operations['operations'] = operations['operations'][operations['operations']['sample'] != sample]
                !gsutil -m rm -r gs://{outbucket}/{sample}
    else:
        print('Sample {0} with no RPSBlast opration file.'.format(sample))
        
    # --- RPSTBlastN log ---
    if os.path.exists('{}_rpstblastn.json.gz'.format(r['sample'])):
        with gzip.GzipFile('{}_rpstblastn.json.gz'.format(r['sample']), 'r') as fin:  
            operations['logs_rpstblastn'][r['sample']] = json.loads(fin.read().decode('utf-8'))
    elif os.path.exists(os.path.join(blast_dir, sample, 'rpstblastn_operation.txt')):
        with open(os.path.join(blast_dir, sample, 'rpstblastn_operation.txt')) as fin:
            line = fin.read().strip()
            # Same id extraction as for the RPSBlast operation file.
            id = line[line.rindex('/') + 1:-2]
            a = !gcloud beta lifesciences operations describe --format=json {id}
            l = json.loads(''.join(a))
            if 'done' in l and 'error' not in l:
                operations['logs_rpstblastn'][sample] = l
                with gzip.GzipFile('{}_rpstblastn.json.gz'.format(sample), 'w') as fout:   # gzip-compressed copy of the log
                    fout.write(json.dumps(l, indent=2).encode('utf-8'))
                if os.path.exists('{}_rpstblastn_error.json.gz'.format(sample)):
                    print('Removing error file {}_rpstblastn_error.json.gz'.format(sample))
                    os.remove('{}_rpstblastn_error.json.gz'.format(sample))
            else:
                # NOTE(review): unlike the two branches above, this one neither
                # removes the sample from the operations table nor deletes its
                # bucket output -- confirm whether the difference is deliberate.
                rerun = True
                print('Sample {0} with error in RPSTBlastN. Check file: {1}/{0}_rpstblastn_error.json.gz'.format(sample, op_dir))
                with gzip.GzipFile('{}_rpstblastn_error.json.gz'.format(sample), 'w') as fout:   # gzip-compressed copy of the response
                    fout.write(json.dumps(l, indent=2).encode('utf-8'))
    else:
        print('Sample {0} with no RPSTBlastN operation file.'.format(sample))
                
    # Turn every available log into one row for the timing tables.
    # parse_gcp_json is defined outside this cell; the meaning of its two
    # numeric arguments (3, 6 vs. 2, 5) is not visible here -- TODO document.
    # Falsy results are skipped.
    if r['sample'] in operations['logs']:
        d = parse_gcp_json(operations['logs'][r['sample']], r['sample'], 3, 6)
        if d:
            data.append(d)
    if r['sample'] in operations['logs_rpsblast']:
        d = parse_gcp_json(operations['logs_rpsblast'][r['sample']], r['sample'], 2, 5)
        if d:
            data_rpsblast.append(d)
            
    if r['sample'] in operations['logs_rpstblastn']:
        d = parse_gcp_json(operations['logs_rpstblastn'][r['sample']], r['sample'], 2, 5)
        if d:
            data_rpstblastn.append(d)

if rerun:
    # Persist the pruned table: the submission cell reloads operations.tsv and
    # submits every sample that is missing from it.
    print('\n\nERROR: Rerun previous cell to re-process failing samples')
    operations['operations'].to_csv('operations.tsv', sep='\t', index=None)
else:            
    # One table per pipeline. Dividing by a one-minute Timedelta expresses the
    # three duration columns in minutes (assumes parse_gcp_json returns
    # Timedelta-like durations -- TODO confirm).
    operations['gcp'] = pandas.DataFrame(data, columns=['Sample', 'GCP', 'BlastDB', 'CWL'])
    operations['gcp']['GCP'] = operations['gcp']['GCP']/pandas.Timedelta('1 minute')
    operations['gcp']['BlastDB'] = operations['gcp']['BlastDB']/pandas.Timedelta('1 minute')
    operations['gcp']['CWL'] = operations['gcp']['CWL']/pandas.Timedelta('1 minute')
    display(operations['gcp'])
    
    operations['gcp_rpsblast'] = pandas.DataFrame(data_rpsblast, columns=['Sample', 'GCP', 'BlastDB', 'CWL'])
    operations['gcp_rpsblast']['GCP'] = operations['gcp_rpsblast']['GCP']/pandas.Timedelta('1 minute')
    operations['gcp_rpsblast']['BlastDB'] = operations['gcp_rpsblast']['BlastDB']/pandas.Timedelta('1 minute')
    operations['gcp_rpsblast']['CWL'] = operations['gcp_rpsblast']['CWL']/pandas.Timedelta('1 minute')
    display(operations['gcp_rpsblast'])
    
    operations['gcp_rpstblastn'] = pandas.DataFrame(data_rpstblastn, columns=['Sample', 'GCP', 'BlastDB', 'CWL'])
    operations['gcp_rpstblastn']['GCP'] = operations['gcp_rpstblastn']['GCP']/pandas.Timedelta('1 minute')
    operations['gcp_rpstblastn']['BlastDB'] = operations['gcp_rpstblastn']['BlastDB']/pandas.Timedelta('1 minute')
    operations['gcp_rpstblastn']['CWL'] = operations['gcp_rpstblastn']['CWL']/pandas.Timedelta('1 minute')
    display(operations['gcp_rpstblastn'])

In [None]:
def plot_cost(df, title, price, top_tick, low_tick, n_queries=20000 * 20):
    """Plot run time per step as box and strip plots, annotated with costs.

    A new 10x8 figure is created. Above each box the total billed amount of
    that step is written ('$ <amount>'); below each box the same amount
    expressed in cents and divided by ``n_queries``.

    Args:
        df: Long-format DataFrame with the columns 'Time' (minutes), 'Cat'
            (step name, one box per distinct value) and 'Sample' (used to
            colour the individual points).
        title: Title of the plot.
        price: Price per hour of the machine the step ran on.
        top_tick: Vertical offset of the billed-amount label above the
            largest time of a step.
        low_tick: Vertical offset of the per-unit label below the smallest
            time of a step.
        n_queries: Divisor of the per-unit cost label. Defaults to the value
            that used to be hard-coded (20000 * 20); presumably the total
            number of query sequences -- pass the real number when known.

    Returns:
        None. The plot is drawn on the current matplotlib figure.
    """
    # Only the side effect is needed: the new figure becomes the current one
    # and seaborn draws into it.
    plt.figure(figsize=(10, 8), constrained_layout=True)

    box_plot = sns.boxplot(x="Cat", y="Time", data=df)

    top = []
    low = []
    billed = []
    cost = []
    for category in df['Cat'].unique():
        # Slice once per category instead of re-filtering for every statistic.
        times = df.loc[df['Cat'] == category, 'Time']
        amount = times.sum()/60 * price  # minutes -> hours -> price
        top.append(times.max())
        low.append(times.min())
        billed.append('$ {}'.format(round(amount, 2)))
        cost.append('{:.2f}¢'.format(amount * 100/n_queries))

    for xtick in box_plot.get_xticks():
        # Tick locations come back as a numpy array; convert to a plain int
        # so that they are valid list indices even if the array is float.
        pos = int(xtick)
        box_plot.text(xtick, top[pos] + top_tick, billed[pos],
                      horizontalalignment='center', size='small', weight='normal')
        box_plot.text(xtick, low[pos] - low_tick, cost[pos],
                      horizontalalignment='center', size='small', weight='normal')

    g = sns.stripplot(hue="Sample", x="Cat", y="Time", data=df, palette='tab20', size=10, color=".3")
    g.set_ylabel("Time (Minutes)", fontsize=12)
    g.set_xlabel("Steps")
    g.set_title(title, fontsize=16)
    # get_legend() returns None when the axes carry no legend.
    legend = g.get_legend()
    if legend is not None:
        legend.remove()


In [None]:
# One entry per pipeline: key of its timing table in `operations`, plot title,
# hourly machine price, and the offsets of the upper / lower cost labels.
plots = (
    ('gcp', 'Cost for BlastN and BlastP', PRICE_N96, 3, 8),
    ('gcp_rpsblast', 'Cost for RPSBlast', PRICE_N8, 0.5, 1),
    ('gcp_rpstblastn', 'Cost for RPSTBlastN', PRICE_N8, 5, 10),
)
# Timing column -> category label shown on the x axis, in plotting order.
steps = (('GCP', 'Total'), ('CWL', 'CWL'), ('BlastDB', 'BlastDB'))

cost = 0
for key, title, price, top_tick, low_tick in plots:
    timings = operations[key]
    # Stack the three timing columns into long format (Time, Cat, Sample).
    df = pandas.concat([
        pandas.DataFrame({'Time': timings[column], 'Cat': label, 'Sample': timings['Sample']})
        for column, label in steps
    ])
    df['Time'] = df.Time.astype(float)

    # Only the 'Total' rows are billed: minutes -> hours -> price.
    cost += round(df[df['Cat'] == 'Total']['Time'].sum()/60 * price, 2)
    plot_cost(df, title, price, top_tick, low_tick)

print('Total cost of annotation: $ {:.2f}'.format(cost))