In [154]:
import io
import os
from datetime import datetime

from firecloud import fiss
import firecloud.api as fapi
import numpy as np
import pandas as pd

In [155]:
%%capture
# import pixiedust

# Gen3 High-Level Configuration and Functions

In [156]:
# Explicit list of Gen3 table names. Ideally this would be obtained from Gen3
# itself (perhaps dynamically); until then it is maintained by hand.
GEN3_TABLE_NAMES = {
    "aligned_reads_index",
    "aliquot",
    "blood_pressure_test",
    "case",
    "demographic",
    "exposure",
    "germline_variation_index",
    "lab_result",
    "medical_history",
    "medication",
    "read_group",
    "sample",
    "simple_germline_variation",
    "submitted_aligned_reads",
}

In [157]:
# Merge specification for the geno/pheno consolidation. Each entry lists the
# tables to combine ("table_names") and the entity type ("join_key") whose
# "<entity>_eid" column they are joined on.
GEN3_GENO_PHENO_MERGE_SPEC = [
    dict(
        join_key="simple_germline_variation",
        table_names=["simple_germline_variation", "germline_variation_index"],
    ),
]
# Column whose values are copied into the Terra entity id column of the
# consolidated table.
# Previously tried alternative: GEN3_ENTITY_ID_COLUMN = "submitter_id_case"
GEN3_ENTITY_ID_COLUMN = "simple_germline_variation_eid"

In [158]:
def consolidate_gen3_geno_pheno_tables(new_table_name: str):
    """Consolidate the Gen3 geno/pheno tables into a Terra table.

    Delegates to `consolidate_to_terra_table` with the module-level merge
    specification and entity id column.

    Args:
        new_table_name: Name (entity type) of the Terra table to create.
    """
    consolidate_to_terra_table(
        merge_spec=GEN3_GENO_PHENO_MERGE_SPEC,
        entity_name=new_table_name,
        entity_id_column=GEN3_ENTITY_ID_COLUMN,
    )

In [159]:
def consolidate_gen3_geno_tables(new_table_name: str, action: str = "replace"):
    """Placeholder for consolidating the Gen3 geno tables; not implemented yet.

    Currently does nothing and returns None.

    Args:
        new_table_name: Name of the table that would be produced.
        action: Intended values are "replace" and "update".
    """

# Common Mid-Level Functions

In [160]:
def consolidate_to_terra_table(merge_spec: list, entity_name: str, entity_id_column: str) -> pd.DataFrame:
    """Consolidate tables per `merge_spec` and upload the result as a Terra table.

    Args:
        merge_spec: List of merge entries, each with "join_key" and "table_names".
        entity_name: Entity type (table name) of the Terra table to create.
        entity_id_column: Name of the consolidated column whose values are used
            as the Terra entity ids.

    Returns:
        The consolidated DataFrame that was uploaded, including the leading
        "entity:{entity_name}_id" column.

    Raises:
        ValueError: If `merge_spec` is empty, so there is nothing to consolidate.
    """
    consolidated_df = consolidate_to_df(merge_spec)
    if consolidated_df is None:
        raise ValueError("merge_spec must contain at least one merge entry.")
    # Add "entity:{entity_name}_id" as the first column, as required by Terra.
    # TODO Check if there is a better way to do this.
    consolidated_df.insert(0, f"entity:{entity_name}_id", consolidated_df[entity_id_column])
    # Keep a timestamped local copy of exactly what is uploaded.
    write_df_to_tsv_file(consolidated_df, "consolidated_df")
    # index=False keeps the entity id column first in the TSV.
    consolidated_tsv = consolidated_df.to_csv(sep="\t", index=False)
    fiss_entity_import(BILLING_PROJECT_ID, WORKSPACE, consolidated_tsv, "flexible")
    return consolidated_df

In [161]:
def consolidate_to_tsv(merge_spec: list) -> str:
    """Consolidate tables per `merge_spec` and return the result as TSV text.

    Args:
        merge_spec: List of merge entries, each with "join_key" and "table_names".

    Returns:
        Tab-separated text of the consolidated DataFrame, as produced by
        `DataFrame.to_csv(sep="\\t")` (the DataFrame index is included).
    """
    return consolidate_to_df(merge_spec).to_csv(sep="\t")

In [162]:
def consolidate_to_df(merge_spec: list)  -> pd.DataFrame:
    """Apply every entry of `merge_spec` in order, accumulating one DataFrame.

    Each entry's "join_key" is turned into its "<entity>_eid" column name and
    its "table_names" are merged onto the result of the previous entries.

    Returns:
        The accumulated DataFrame, or None when `merge_spec` is empty.
    """
    accumulated_df = None
    for spec_entry in merge_spec:
        accumulated_df = consolidate_tables_to_df(
            get_eid_column_name(spec_entry['join_key']),
            spec_entry['table_names'],
            accumulated_df,
        )
    return accumulated_df

# Common Lower-Level Functions

In [163]:
def consolidate_tables_to_terra_table(common_key: str, table_names: list, new_entity_type: str, entity_id_column: str) -> None:
    """Merge the named tables on `common_key` and upload the result to Terra.

    Args:
        common_key: Column name the tables are joined on.
        table_names: Names of the tables to merge (at least two).
        new_entity_type: Entity type (table name) of the Terra table to create.
        entity_id_column: Name of the merged column whose values are used as
            the Terra entity ids.
    """
    consolidated_df = consolidate_tables_to_df(common_key, table_names)
    # Terra requires "entity:{new_entity_type}_id" to be the FIRST column, so
    # insert it at position 0 and omit the pandas index from the TSV; an
    # appended column or a leading index column would break that layout.
    consolidated_df.insert(0, f"entity:{new_entity_type}_id", consolidated_df[entity_id_column])
    consolidated_tsv = consolidated_df.to_csv(sep="\t", index=False)
    fiss_entity_import(BILLING_PROJECT_ID, WORKSPACE, consolidated_tsv, "flexible")

In [164]:
def consolidate_tables_to_tsv(common_key: str, table_names: list) -> str:
    """Merge the named tables on `common_key` and return the result as TSV text."""
    merged_tables_df = consolidate_tables_to_df(common_key, table_names)
    return merged_tables_df.to_csv(sep="\t")

In [165]:
def consolidate_tables_to_df(common_key: str, table_names: list, initial_df = None) -> pd.DataFrame:
    """Inner-join Gen3 Terra tables on `common_key` into a single DataFrame.

    Args:
        common_key: Column name present in every table being merged.
        table_names: Names of the Terra tables to load and merge. At least two
            are required when `initial_df` is None, otherwise at least one.
        initial_df: Optional result of a previous merge. When given, the named
            tables are merged onto it instead of starting from the first table.

    Returns:
        The merged DataFrame.

    Raises:
        AssertionError: If too few table names are supplied.
    """
    if initial_df is None:
        assert len(table_names) >= 2, "At least two table names are required." 
        merged_df = get_gen3_terra_table_to_df(BILLING_PROJECT_ID, WORKSPACE, table_names[0])
        remaining_table_names = table_names[1:]
    else:
        assert len(table_names) >= 1, "At least one table names is required to merge with previous data."
        # Continue from the previously merged data.
        merged_df = initial_df
        remaining_table_names = table_names
    for table_name in remaining_table_names:
        current_df = get_gen3_terra_table_to_df(BILLING_PROJECT_ID, WORKSPACE, table_name)
        # No suffixes are supplied, so overlapping non-key column names are
        # reported by pandas as an error rather than being silently renamed.
        merged_df = merged_df.merge(current_df, on=common_key, how="inner", copy=False, suffixes=(False, False))
        # Deduplicate "*_eid" columns
        merged_df = merged_df.loc[:, ~merged_df.columns.duplicated()]
    return merged_df

In [166]:
def get_eid_column_name(entity_type: str):
    """Return the "<entity_type>_eid" column name for the given entity type."""
    return "{}_eid".format(entity_type)

# Common Low-Level Functions

In [167]:
def rename_column(df: pd.DataFrame, current_column_name: str, new_column_name: str) -> None:
    """Rename a single column of `df` in place; `df` itself is modified."""
    column_mapping = {current_column_name: new_column_name}
    df.rename(columns=column_mapping, inplace=True)

In [168]:
# FISS `entity_import` expects the TSV content to live in a file, but here it is
# already a string, and writing it out only to read it back in makes little sense.
# The data may be large and FISS `entity_import` chunks its uploads, which is
# worth keeping. This function therefore bypasses `entity_import` and calls
# `_batch_load` directly.
# (An alternative that was sketched: wrap the arguments in a namedtuple
# EntityImportArgs(project, workspace, tsvfile, chunk_size, model) and pass it
# to fiss.entity_import.)
def fiss_entity_import_batch_bad(project: str, workspace: str, entity_tsv: str, model: str):
    """Upload TSV entity data through the private `fiss._batch_load` helper.

    The first line of `entity_tsv` is used as the header line; the remaining
    lines (trailing newlines removed) are the entity rows, loaded with a chunk
    size of 500.

    NOTE(review): the "_bad" suffix suggests this approach was found
    unsatisfactory; the reason is not recorded here.
    """
    tsv_stream = io.StringIO(entity_tsv)
    header = tsv_stream.readline().strip()
    rows = []
    for line in tsv_stream:
        rows.append(line.rstrip('\n'))
    return fiss._batch_load(project, workspace, header, rows, 500, model)

In [169]:
def fiss_entity_import(project: str, workspace: str, entity_tsv: str, model: str):
    """Upload entity TSV text to a workspace via `fapi.upload_entities`.

    The response is validated with `fapi._check_response_code`, expecting
    HTTP status 200.
    """
    upload_response = fapi.upload_entities(project, workspace, entity_tsv, model)
    fapi._check_response_code(upload_response, 200)

In [170]:
def get_gen3_terra_table_to_df(project: str, workspace: str, table_name: str, model="flexible") -> pd.DataFrame:
    """Load a Gen3-imported Terra table as a DataFrame with disambiguated column names.

    Args:
        project: Billing project (namespace) of the workspace.
        workspace: Workspace name.
        table_name: Name (entity type) of the table to load.
        model: Entity model forwarded to `get_terra_table_to_df`.

    Returns:
        The table without its "entity:{table_name}_id" column, where each
        column named after a Gen3 table is renamed to "<column>_eid" and every
        other column is renamed to "<table_name>_<column>".
    """
    table_df = get_terra_table_to_df(project, workspace, table_name, model=model)
    # Delete the first (entity id) column, as the Gen3 imported tables also contain
    # a column with the same name as the table that also contains the id and would
    # therefore be a duplicate column if the entity id column is retained.
    # These two id columns, identical except for their names, probably emerge from
    # how Arrow interprets the PFB and forms the JSON passed to Rawls.
    # This should be reviewed/understood in more detail. For now, just drop the
    # entity id column to avoid having duplicate columns.
    table_df = table_df.drop(columns=[f"entity:{table_name}_id"])
    # Build the full old-name -> new-name mapping once and rename in one pass.
    renamed_columns = {
        column: f"{column}_eid" if column in GEN3_TABLE_NAMES else f"{table_name}_{column}"
        for column in table_df.columns
    }
    return table_df.rename(columns=renamed_columns)


In [171]:
def get_terra_table_to_df(project: str, workspace: str, table_name: str, model="flexible") -> pd.DataFrame:
    """Download a Terra table as TSV and parse it into a DataFrame.

    Args:
        project: Billing project (namespace) of the workspace.
        workspace: Workspace name.
        table_name: Name (entity type) of the table to download.
        model: Entity model passed to `fapi.get_entities_tsv`.

    Returns:
        The table contents parsed from the tab-separated response body.

    Raises:
        Whatever `fapi._check_response_code` raises when the response status
        is not 200.
    """
    response = fapi.get_entities_tsv(project, workspace, table_name, model=model)
    # Fail loudly on an error response instead of parsing the error body as TSV.
    fapi._check_response_code(response, 200)
    return pd.read_csv(io.StringIO(response.text), sep='\t')

In [172]:
def delete_terra_table(project: str, workspace: str, table_name: str):
    """Delete every entity of a Terra table, in batches of at most 100 ids.

    Args:
        project: Billing project (namespace) of the workspace.
        workspace: Workspace name.
        table_name: Name (entity type) of the table to delete.

    Raises:
        Whatever `fapi._check_response_code` raises when a delete request does
        not return status 204.
    """
    # TODO There has to be better way than this to simply delete a table/entity-type.
    table_to_delete_df = get_terra_table_to_df(project, workspace, table_name)
    # Plain Python strings, so the ids serialize cleanly in the request body.
    entity_ids = table_to_delete_df[f"entity:{table_name}_id"].astype(str).tolist()
    chunk_size = 100
    # Slicing by a fixed step handles empty tables and tables smaller than one
    # chunk, and never produces a batch larger than `chunk_size`.
    for start in range(0, len(entity_ids), chunk_size):
        response = fapi.delete_entity_type(project, workspace, table_name, entity_ids[start:start + chunk_size])
        fapi._check_response_code(response, 204)

In [173]:
def write_df_to_tsv_file(df: pd.DataFrame, filename: str) -> None:
    """Write `df` as TSV (no index) to a timestamped file.

    The output path is "<filename>_<YYYYmmdd_HHMMSS><microseconds>.tsv", using
    the current local time.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S%f")
    output_path = "_".join((filename, timestamp)) + ".tsv"
    with open(output_path, mode="w") as output_file:
        output_file.write(df.to_csv(sep="\t", index=False))

# Temporary Test/Debug Code

In [174]:
# Temporary settings for running in PyCharm: force the workspace-related
# environment variables to fixed values.
os.environ.update({
    "WORKSPACE_NAMESPACE": "anvil-stage-demo",
    "WORKSPACE_NAME": "mbaumann dev fiss debug playground 20190925 2141",
    "WORKSPACE_BUCKET": "gs://fc-secure-55824595-dc0f-4b14-b5fb-9d7f9cf662be",
})
# The Google project is set to the same value as the workspace namespace.
os.environ["GOOGLE_PROJECT"] = os.environ["WORKSPACE_NAMESPACE"]

In [175]:
# Read the Google billing project from the environment (KeyError if unset);
# the bare name on the last line echoes the value for visual verification.
BILLING_PROJECT_ID = os.environ["GOOGLE_PROJECT"]
BILLING_PROJECT_ID

'anvil-stage-demo'

In [176]:
# Read the workspace name from the environment (KeyError if unset);
# the bare name on the last line echoes the value for visual verification.
WORKSPACE = os.environ["WORKSPACE_NAME"]
WORKSPACE

'mbaumann dev fiss debug playground 20190925 2141'

In [177]:
# %%pixie_debugger

# Debug run: consolidate the Gen3 geno/pheno tables into a scratch table.
consolidated_table_name = "my_consolidated_table"
consolidate_gen3_geno_pheno_tables(new_table_name=consolidated_table_name)

In [178]:
# delete_terra_table(BILLING_PROJECT_ID, WORKSPACE, "my_consolidated_table")
