# Neo4J v057 test2 v01 - Trellis : Job based Analysis
================

## Set up the environment


### Install py2neo for querying Neo4J 

In [132]:
#!pip3 install -U py2neo

# add python path of py2neo in system

#!pip3 install -U neotime
#!pip3 install -U neobolt
#!pip3 install -U pandas-gbq

### Import Packages

In [133]:
from py2neo import Graph
from google.cloud import storage
import yaml

import pandas as pd
import pandas_gbq

import numpy as np
import subprocess
import matplotlib 
import matplotlib.pyplot as plt
import seaborn as sns

#plt.style.use('fivethirtyeight')
#plt.style.use('ggplot')
pd.set_option('display.float_format', lambda x: '%.2f' % x)

### Load Neo4J DB

In [134]:
## Option 1 : Read DB and Account Information in Google Storage (YAML)

# create storage client
storage_client = storage.Client()
# get bucket with name
bucket = storage_client.get_bucket('gbsc-gcp-project-mvp-{env}-trellis')
# get bucket data as blob
blob = bucket.get_blob('credentials/{env}-wgs35.yaml')
# convert to string
yaml_data = blob.download_as_string()

account = yaml.load(yaml_data, Loader=yaml.FullLoader)

## Main Account
graph = Graph(account['NEO4J_SCHEME']+'://'+account['NEO4J_HOST']+":"+str(account['NEO4J_PORT']), auth=(account['NEO4J_USER'],account['NEO4J_PASSPHRASE']))

-------
## FQ2U Job

### FQ2U table

In [135]:
## Query
query = "Match (fu:Job:Dsub {name:'fastq-to-ubam'})-[:STATUS]->(s:Dstat) RETURN fu.sample AS sample, fu.readGroup AS fq2urg_gatkid, fu.duplicate AS dup, fu.machineType AS VMtype, fu.durationMinutes as runtime, s.status as dstat_status, s.statusMessage as dstat_msg, s.logging as dstat_log"
job_fq2u = graph.run(query).to_data_frame()
job_fq2u.set_index('sample')

## Variable
num_fq2u_sample=len(job_fq2u['sample'].unique())
num_fq2u_job=len(job_fq2u)

## Print (Info)
print("The number of samples with FQ2U jobs : " + str(num_fq2u_sample))
print("The number of FQ2U jobs : " + str(num_fq2u_job))

## Bigquery Table Format
job_fq2u['exp_vm']=1
job_fq2u['unit_cost']=0
job_fq2u['cost']=0
job_fq2u['job']='FQ2U'
job_fq2u['attempts']=None
job_fq2u['unit_runtime']=None
columnlist=['sample','job','fq2urg_gatkid','dup','attempts','exp_vm','runtime','unit_runtime','VMtype','unit_cost','cost','dstat_status','dstat_msg','dstat_log']
job_fq2u=job_fq2u[columnlist]

#display(job_fq2u.head())

The number of samples with FQ2U jobs : 288
The number of FQ2U jobs : 1156


### FQ2U Duplication Check

In [136]:
## Query
fq2u_dup=job_fq2u.loc[job_fq2u['dup']==True,:]

## Variable
num_dup_fq2u_sample=len(fq2u_dup['sample'].unique())
num_dup_fq2u_job=len(fq2u_dup)

print("The number(percentage) of samples with duplicated FQ2U jobs : " + str(len(fq2u_dup['sample'].unique()))+" ("+'{:2f}'.format((num_dup_fq2u_sample/num_fq2u_sample)*100)+"%)")
print("The number(percentage) of FQ2U duplicated jobs : " + str(num_dup_fq2u_job)+" ("+'{:2f}'.format((num_dup_fq2u_job/num_fq2u_job)*100)+"%)")

#display(test)

The number(percentage) of samples with duplicated FQ2U jobs : 1 (0.347222%)
The number(percentage) of FQ2U duplicated jobs : 1 (0.086505%)


-------
## GATK Job

### GATK table

In [137]:
## Query
query = "MATCH (j:Job:CromwellWorkflow)-[:STATUS]->(s:Dstat) RETURN j.sample AS sample, j.cromwellWorkflowId AS fq2urg_gatkid, \
j.duplicate AS dup, j.durationMinutes as runtime, j.machineType as VMtype, s.status as dstat_status, s.statusMessage as dstat_msg, s.logging as dstat_log"
job_gatk = graph.run(query).to_data_frame()
job_gatk.set_index('sample')

## Variable
num_gatk_sample=len(job_gatk['sample'].unique())
num_gatk_job=len(job_gatk)

## Print (Info)
print("The number of samples with GATK jobs : " + str(num_gatk_sample))
print("The number of GATK jobs : " + str(num_gatk_job))

## Bigquery Table Format
job_gatk['job']='GATK'
job_gatk['attempts']=None
job_gatk['unit_runtime']=None
columnlist=['sample','job','fq2urg_gatkid','dup','attempts','runtime','unit_runtime','VMtype','dstat_status','dstat_msg','dstat_log']
job_gatk=job_gatk[columnlist]

display(job_gatk.head())

The number of samples with GATK jobs : 288
The number of GATK jobs : 360


Unnamed: 0,sample,job,fq2urg_gatkid,dup,attempts,runtime,unit_runtime,VMtype,dstat_status,dstat_msg,dstat_log
0,SHIP4962328,GATK,,,,3,,custom-2-12288,FAILURE,Workflow fdbf619e-f1e2-4693-9283-e312bbe53e06 ...,gs://gbsc-gcp-project-mvp-test-from-personalis...
1,SHIP5119477,GATK,,,,3,,custom-2-12288,FAILURE,Workflow c6fcc01a-0806-4a2d-8ca4-69730213cc00 ...,gs://gbsc-gcp-project-mvp-test-from-personalis...
2,SHIP5119479,GATK,,,,3,,custom-2-12288,FAILURE,Workflow 8a7ae6ab-5cb3-4c87-93a6-afbf8e43d4cb ...,gs://gbsc-gcp-project-mvp-test-from-personalis...
3,SHIP5141884,GATK,,,,3,,custom-2-12288,FAILURE,Workflow 33cd21bf-96a2-4e46-9ba7-d316d43f274a ...,gs://gbsc-gcp-project-mvp-test-from-personalis...
4,SHIP5141929,GATK,,,,3,,custom-2-12288,FAILURE,Workflow 637727fa-f9f7-4a84-ab37-55bf4dca4673 ...,gs://gbsc-gcp-project-mvp-test-from-personalis...


### GATK Duplication Check

In [138]:
## Query
gatk_dup=job_gatk.loc[job_gatk['dup']==True,:]

## Variable
num_dup_gatk_sample=len(gatk_dup['sample'].unique())
num_dup_gatk_job=len(gatk_dup)

print("The number(percentage) of samples with duplicated GATK jobs : " + str(len(gatk_dup['sample'].unique()))+" ("+'{:2f}'.format((num_dup_gatk_sample/num_gatk_sample)*100)+"%)")
print("The number(percentage) of GATK duplicated jobs : " + str(num_dup_gatk_job)+" ("+'{:2f}'.format((num_dup_gatk_job/num_gatk_job)*100)+"%)")

#display(test)

The number(percentage) of samples with duplicated GATK jobs : 0 (0.000000%)
The number(percentage) of GATK duplicated jobs : 0 (0.000000%)


### GATK exp_vm and add_vm

In [139]:
## expected vm data frame
exp_vm_df=pd.read_excel("./GATKstep_expected_vm.xlsx")

In [140]:
merged_attemps_df=pd.merge(exp_vm_df,job_gatk,left_on=['job'],right_on=['job'],how='right')
#merged_attemps_df['added_vm']=job_gatk['attempts']-merged_attemps_df['exp_vm']

-------
## GATK substeps

### Attempts table

In [141]:
## Query
query = "MATCH (g:Job:CromwellWorkflow)-[:LED_TO*]->(s:CromwellStep)-[:HAS_ATTEMPT]-()-[*0..100]->(j:Job) \
WHERE g.cromwellWorkflowId=s.cromwellWorkflowId RETURN g.sample as sample, s.cromwellWorkflowId as fq2urg_gatkid, \
s.wdlCallAlias as job, count(distinct j) as attempts, (max(j.stopTimeEpoch)-min(j.startTimeEpoch))/60 as runtime, avg(j.durationMinutes) as unit_runtime, j.machineType as VMtype"
#query = "MATCH (j:Job:CromwellWorkflow)-[:STATUS]->(s:Dstat) RETURN j.sample AS sample, j.duplicate AS dup, j.durationMinutes as runtime, s.status as dstat_status, s.statusMessage as dstat_msg, s.logging as dstat_log"
job_gatk_step = graph.run(query).to_data_frame()
job_gatk_step.set_index('sample')

## Variable
num_gatk_sample=len(job_gatk_step['sample'].unique())
num_gatk_subjobs=len(job_gatk_step)

## Print (Info)
print("The number of samples with GATK steps : " + str(num_gatk_sample))
print("The number of GATK subjobs : " + str(num_gatk_subjobs))

The number of samples with GATK steps : 288
The number of GATK subjobs : 6302


### GATK Duplication Check

In [142]:
## Bigquery Table Format
job_gatk_info=job_gatk[['sample','fq2urg_gatkid','dup','dstat_status','dstat_msg','dstat_log']]
job_gatk_stepm=pd.merge(job_gatk_info, job_gatk_step, left_on=['sample','fq2urg_gatkid'], right_on=['sample','fq2urg_gatkid'], how='right')
columnlist=['sample','job','fq2urg_gatkid','dup','attempts','runtime','unit_runtime','VMtype','dstat_status','dstat_msg','dstat_log']
job_gatk_stepm=job_gatk_stepm[columnlist]

display(job_gatk_stepm.head())

Unnamed: 0,sample,job,fq2urg_gatkid,dup,attempts,runtime,unit_runtime,VMtype,dstat_status,dstat_msg,dstat_log
0,SHIP5141880,getbwaversion,cce30376-db76-44e0-b308-6d9163b5f195,,1,3.43,3.0,custom-1-1024,FAILURE,nt.googleapis.services.AbstractGoogleClient <i...,gs://gbsc-gcp-project-mvp-test-from-personalis...
1,SHIP5141880,scatterintervallist,cce30376-db76-44e0-b308-6d9163b5f195,,1,4.06,4.0,custom-1-2048,FAILURE,nt.googleapis.services.AbstractGoogleClient <i...,gs://gbsc-gcp-project-mvp-test-from-personalis...
2,SHIP5141880,samsplitter,cce30376-db76-44e0-b308-6d9163b5f195,,5,189.35,99.4,custom-1-3840,FAILURE,nt.googleapis.services.AbstractGoogleClient <i...,gs://gbsc-gcp-project-mvp-test-from-personalis...
3,SHIP5141880,samtofastqandbwamemandmba,cce30376-db76-44e0-b308-6d9163b5f195,,24,181.89,44.0,custom-16-14848,FAILURE,nt.googleapis.services.AbstractGoogleClient <i...,gs://gbsc-gcp-project-mvp-test-from-personalis...
4,SHIP5141880,sumsplitalignedsizes,cce30376-db76-44e0-b308-6d9163b5f195,,4,131.1,2.5,custom-2-10240,FAILURE,nt.googleapis.services.AbstractGoogleClient <i...,gs://gbsc-gcp-project-mvp-test-from-personalis...


### GATK exp_vm and add_vm

In [143]:
## expected vm data frame
exp_vm_df=pd.read_excel("./GATKstep_expected_vm.xlsx")

In [144]:
job_gatk_stepm=pd.merge(exp_vm_df,job_gatk_stepm,left_on=['job'],right_on=['job'],how='right')
#merged_attemps_df['added_vm']=job_gatk['attempts']-merged_attemps_df['exp_vm']

-------
## Merge 

In [145]:
job_df=pd.concat([job_fq2u,job_gatk,job_gatk_stepm]).sort_values(['sample','job','fq2urg_gatkid'])
display(job_df)

of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=False'.


  """Entry point for launching an IPython kernel.


Unnamed: 0,VMtype,attempts,cost,dstat_log,dstat_msg,dstat_status,dup,exp_vm,fq2urg_gatkid,job,runtime,sample,unit_cost,unit_runtime
337,custom-2-7680,,0.00,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,1.00,0,FQ2U,53.00,SHIP4946367,0.00,
238,custom-2-7680,,0.00,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,1.00,1,FQ2U,54.00,SHIP4946367,0.00,
379,custom-2-7680,,0.00,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,1.00,2,FQ2U,53.00,SHIP4946367,0.00,
341,custom-2-7680,,0.00,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,1.00,3,FQ2U,52.00,SHIP4946367,0.00,
107,custom-2-12288,,,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,,aeac1b5a-8a63-4014-8771-93efca54bdcd,GATK,1350.00,SHIP4946367,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
942,custom-16-14848,7,,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,4.00,439b1b52-b16d-41e8-af5f-22b88bdfc7dd,samtofastqandbwamemandmba,329.73,SHIP5141934,,142.71
486,custom-1-2048,1,,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,1.00,439b1b52-b16d-41e8-af5f-22b88bdfc7dd,scatterintervallist,4.02,SHIP5141934,,4.00
2760,custom-2-5120,2,,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,1.00,439b1b52-b16d-41e8-af5f-22b88bdfc7dd,sortsamplebam,306.87,SHIP5141934,,150.00
1852,custom-2-10240,1,,gs://gbsc-gcp-project-mvp-test-from-personalis...,Success,SUCCESS,,1.00,439b1b52-b16d-41e8-af5f-22b88bdfc7dd,sumfloats,3.20,SHIP5141934,,3.00


-------
## Cost

### unit_cost

In [146]:
## Custom CPU cost : 0.033174/CPU/Hour, # Custom Mem cost : 0.004446/GB/Hour, # Custom Disk cost : ???
cpu_sd_cost = 0.033174
memg_sd_cost = 0.004446

cpu_pem_cost = 0.00698
memg_pem_cost = 0.00094

## Extract cpu and mem info.
#temp=job_df.loc[:,['job','VMtype']]
job_df.loc[:,'vm_type']=None
job_df.loc[:,'cpu']=0
job_df.loc[:,'mem']=0
job_df[['vm_type','cpu','mem']]=[i.split('-') for i in job_df['VMtype']]
job_df['cpu']=[int(x) for x in job_df['cpu']]
job_df['mem']=[int(x) for x in job_df['mem']]
#columnlist=['job','VMtype','vm_type','cpu','mem']
#temp=temp[columnlist]

In [149]:
# FQ2U and GATK job unit cost with Standard VM
job_df.loc[job_df['job'].isin(['FQ2U','GATK']),'unit_cost']= np.array(job_df.loc[job_df['job'].isin(['FQ2U','GATK']),'cpu'])*cpu_sd_cost/60 + np.array(job_df.loc[job_df['job'].isin(['FQ2U','GATK']),'mem'])*memg_sd_cost/60/1000
# GATK sub jobs' unit cost with Preemptible VM
job_df.loc[job_df['job'].isin(['FQ2U','GATK'])==False,'unit_cost']=np.array(job_df.loc[job_df['job'].isin(['FQ2U','GATK'])==False,'cpu'])*cpu_pem_cost/60 + np.array(job_df.loc[job_df['job'].isin(['FQ2U','GATK'])==False,'mem'])*memg_pem_cost/60/1000

### job cost

In [150]:
pd.set_option('display.float_format', lambda x: '%.8f' % x)

## FQ2U
job_df.loc[job_df['job']=='FQ2U','cost']=np.array(job_df.loc[job_df['job']=='FQ2U','runtime'])*np.array(job_df.loc[job_df['job']=='FQ2U','unit_cost'])
#merged_cost_df.head(2)

## GATK
job_df.loc[job_df['job']=='GATK','cost']=np.array(job_df.loc[job_df['job']=='GATK','runtime'])*np.array(job_df.loc[job_df['job']=='GATK','unit_cost'])
#merged_cost_df[merged_cost_df['job']=='GATK'].head(2)

## GATK steps
job_df.loc[(job_df['job']!='FQ2U') & (job_df['job']!='GATK'),'cost']=np.array(job_df.loc[(job_df['job']!='FQ2U') & (job_df['job']!='GATK'),'attempts']) \
*np.array(job_df.loc[(job_df['job']!='FQ2U') & (job_df['job']!='GATK'),'unit_runtime'])*np.array(job_df.loc[(job_df['job']!='FQ2U') & (job_df['job']!='GATK'),'unit_cost'])

In [151]:
columnlist=['sample','job','fq2urg_gatkid','dup','exp_vm','attempts','runtime','unit_runtime','VMtype','unit_cost','cost','dstat_status','dstat_msg','dstat_log']
job_df=job_df[columnlist]
job_df.to_csv('job-based-analysis-v057-test2-v01.csv',index=False)

### Upload CSV Files to BigQuery

In [152]:
table_id='mvp_wgs35_v057_2.job_based_analysis'
projectid='gbsc-gcp-project-mvp-test'

pandas_gbq.to_gbq(
    job_df, table_id, project_id=projectid, if_exists='replace',
)

1it [00:11, 11.20s/it]
