In [None]:
from UtilityLib import ProjectManager

class TCGAProject(ProjectManager):
  """Project helper for GDC/TCGA metadata parsing and STAR expression chunking.

  Adds JSON-to-DataFrame helpers, key maps for the GDC clinical/biospecimen/
  metadata exports, chunked reading of per-sample expression TSV files and
  sample-type code lookups on top of `ProjectManager`.
  """

  # Parsed GDC manifest (set by `parse_gdc_manifest`)
  _gdc_manifest = None

  # Combined expression pickle and its in-memory copy (see `get_expression_data`)
  file_expression = 'expression-star-count.pkl.gz'
  _expression_df = None

  # Chunked reading of the downloaded expression files (see `_process_chunks`)
  _dir_chunks = 'Read-Chunks'
  _size_chunks = 10
  test_chunks = None  # when set, only the first `test_chunks` chunks are processed
  _total_chunks = None
  dir_tcga_downloads = "TCGA-Downloads"
  template_file_chunk = "File-Chunk-%s.pkl.gz"

  # Result of `combine_chunks`
  _combined_df = None

  # Sample type code -> definition (set by `set_gdc_sample_type_codes`)
  sample_type_codes_map = None

  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

  # Overridden Methods START
  def json_to_df(self, *args, **kwargs):

    """JSON structure to DataFrame converter

    0|json: JSON file path (string)/object (list of records, or a single record as dict)
    1|map: Column to key-path map e.g. {"header": "entity|0|metadata|header"};
           each path is resolved with `get_deep_key`
    2|sep: Separator used inside the key paths (default '.')

    Any other keyword argument (e.g. `default`) is forwarded to `get_deep_key`.
    Without a map, every record is copied as-is (top-level keys become columns).

    @return
    Pandas DataFrame object

    """
    _json = args[0] if len(args) > 0 else kwargs.get("json")
    _map = args[1] if len(args) > 1 else kwargs.get("map", None)
    _sep = args[2] if len(args) > 2 else kwargs.get("sep", '.')

    # The resolved separator always wins, whether it was given positionally or by keyword
    _deep_kwargs = {**kwargs, "sep": _sep}

    if isinstance(_json, str):
      _json = self.read_json(_json)

    # A lone record would otherwise be iterated key by key
    if isinstance(_json, dict):
      _json = [_json]

    _use_map = bool(_map) and isinstance(_map, dict)
    _result = []

    for _json_el in _json:
      if _use_map:
        # If column map is provided
        _row = {
          _column: self.get_deep_key(_json_el, _dotkey, **_deep_kwargs)
          for _column, _dotkey in _map.items()
        }
      else:
        # If column map is not provided: copy the record itself (not the whole collection)
        _row = dict(_json_el)
      _result.append(_row)

    return self.DF(_result)

  def get_deep_key(self, *args, **kwargs):
    """Resolve a nested value from dicts/lists by following a key path

    0|obj: Nested structure (dict/list/tuple)
    1|keys: Sequence of keys, or a single string split on `sep`
    2|default: Value returned when the path cannot be resolved (default None)
    3|sep: Separator for string paths (default '|')

    A key containing '*' converts the current object to a list (for a dict
    that is the list of its keys). Dict levels are read with `.get`; list and
    tuple levels are indexed with `int(key)`.

    @return
    The resolved value, or `default` when a key is missing, an index is out
    of range or not numeric, or the path continues past a scalar.
    """
    _obj = args[0] if len(args) > 0 else kwargs.get("obj", {})
    _keys = args[1] if len(args) > 1 else kwargs.get("keys", ())
    _default = args[2] if len(args) > 2 else kwargs.get("default")
    _sep = args[3] if len(args) > 3 else kwargs.get("sep", "|")

    _instance_list = (tuple, set, list)
    _instance_singular = (str, int)

    if not isinstance(_keys, _instance_list):
      _keys = str(_keys).split(_sep)

    for _k in _keys:
      if isinstance(_k, str) and "*" in _k:
        try:
          _obj = list(_obj)
        except TypeError:
          # Nothing iterable left to expand
          return _default
      elif isinstance(_obj, dict) and isinstance(_k, _instance_singular):
        _obj = _obj.get(_k, _default)
      elif isinstance(_obj, (tuple, list)):
        try:
          _obj = _obj[int(_k)]
        except (TypeError, ValueError, IndexError):
          # e.g. "treatments|0|..." on an empty list, or a non-numeric key on a list
          return _default
      else:
        # The path continues but there is nothing left to descend into
        return _default

    return _obj

  def fix_column_names(self, *args, **kwargs):
    """Rename the columns of a DataFrame in place

    0|df: DataFrame whose columns are passed through `text_to_slug`, with
          '-' replaced by '_'

    @return
    None (the DataFrame is modified in place)
    """
    _df = args[0] if len(args) > 0 else kwargs.get("df")
    _df.columns = [self.text_to_slug(_col).replace('-', '_') for _col in _df.columns]
  # Overridden Methods END

  def parse_gdc_manifest(self):
    """Read the first `gdc_manifest*` file of the project directory

    Spaces in column names are replaced by '_', the table is written to
    "gdc-manifest.xlsx" (sheet "Manifest") and kept in `self._gdc_manifest`.
    Indexing `[0]` fails when no manifest file is found.
    """
    _manifest = self.pd_tsv(self.find_files(self.get_path(), "gdc_manifest*")[0])
    _manifest.columns = [_col.replace(' ', '_') for _col in _manifest.columns]
    self.pd_excel(self.get_path("gdc-manifest.xlsx"), _manifest, "Manifest")
    self._gdc_manifest = _manifest

  def _convert_file_set_to_df(self, _file_set, _chunk_path):
    """Read a set of expression TSV files and pickle them as one chunk

    _file_set: Paths of the TSV files belonging to the chunk
    _chunk_path: Destination of the pickled, row-wise concatenated DataFrame

    Files whose name is already listed in `config.exp.star_files` are skipped;
    every file read is tagged with a `FileName` column and registered there.
    Nothing is written when no file of the set needed reading.
    """
    _frames = []
    for _f in _file_set:
      _fn = self.filename(_f, with_ext=True)
      if _fn in self.config.exp.star_files:
        continue
      # The first line of each file is skipped before the header is read
      _df = self.pd_tsv(_f, skiprows=1)
      _df['FileName'] = _fn
      _frames.append(_df)
      self.config.exp.star_files.append(_fn)

    if not _frames:
      # Empty set or everything already processed: there is nothing to pickle
      return

    # Concatenate once instead of re-copying the growing frame per file
    _chunk_df = _frames[0] if len(_frames) == 1 else self.PD.concat(_frames, axis=0)
    _chunk_df.to_pickle(_chunk_path)

  def get_expression_data(self):
    """Load the combined expression pickle, or start chunk processing

    When `file_expression` exists in the project path it is unpickled into
    `self._expression_df`; otherwise `process_expression_data` is triggered
    and `self._expression_df` is left untouched.
    """
    _exp_path = self.get_path(self.file_expression)
    if self.exists(_exp_path):
      self._expression_df = self.unpickle(_exp_path)
    else:
      self.process_expression_data()

  def get_keymap(self, _type):
    """Return the key map registered under `_type` (see `set_json_parse_key_maps`)"""
    return self.config.key_maps.get(_type)

  def set_json_parse_key_maps(self):
    """Register the column -> key-path maps used with `json_to_df`

    The clinical and biospecimen paths are '|' separated, the metadata paths
    are '.' separated; pass the matching `sep` to `json_to_df`.
    """
    self.config.key_maps.clinical = {
        'alcohol_history': 'exposures|0|alcohol_history',
        'case_id': 'case_id',
        'project_id': 'project|project_id',
        'submitter_id': 'submitter_id',
        'synchronous_malignancy': 'diagnoses|0|synchronous_malignancy',
        'ajcc_pathologic_stage': 'diagnoses|0|ajcc_pathologic_stage',
        'days_to_diagnosis': 'diagnoses|0|days_to_diagnosis',
        'treatment_type': 'diagnoses|0|treatments|0|treatment_type',
        'treatment_or_therapy': 'diagnoses|0|treatments|0|treatment_or_therapy',
        'tissue_or_organ_of_origin': 'diagnoses|0|tissue_or_organ_of_origin',
        'primary_diagnosis': 'diagnoses|0|primary_diagnosis',
        'age_at_diagnosis': 'diagnoses|0|age_at_diagnosis',
        'ajcc_pathologic_t': 'diagnoses|0|ajcc_pathologic_t',
        'ajcc_pathologic_n': 'diagnoses|0|ajcc_pathologic_n',
        'ajcc_pathologic_m': 'diagnoses|0|ajcc_pathologic_m',
        'classification_of_tumor': 'diagnoses|0|classification_of_tumor',
        'site_of_resection_or_biopsy': 'diagnoses|0|site_of_resection_or_biopsy',
        'tumor_grade': 'diagnoses|0|tumor_grade',
        'icd_10_code': 'diagnoses|0|icd_10_code',
        'days_to_birth': 'demographic|days_to_birth',
        'vital_status': 'demographic|vital_status',
        'race': 'demographic|race',
        'gender': 'demographic|gender',
        'ethnicity': 'demographic|ethnicity',
    }

    self.config.key_maps.biospecimen = {
        'case_id': 'case_id',
        'project_id': 'project|project_id',
        'submitter_id': 'submitter_id',
        'samples': 'samples'
      }

    self.config.key_maps.metadata = {
        "submitter_id": "submitter_id",
        # "data_category": "data_category",
        "data_format": "data_format",
        "associated_case_id": "associated_entities.0.case_id",
        # "file_name": "file_name",
        "file_id": "file_id",
        # "data_type": "data_type",
        "experimental_strategy": "experimental_strategy",
      }

  def _process_chunks(self, parallel=True):
    """Split the downloaded expression TSV files into chunks and pickle each chunk

    parallel: Queue each chunk as a task (default); when False the chunks are
              converted one after another in the calling process (debugging aid)

    Chunks whose pickle already exists are skipped. Only the first
    `self.test_chunks` chunks are considered (None means all of them).
    """
    _seen = self.config.exp.star_files
    if not isinstance(_seen, list):
      # Keep an existing tuple's content but make it appendable
      self.config.exp.star_files = list(_seen) if isinstance(_seen, tuple) else []

    self.validate_dir(self.get_path(self._dir_chunks))
    if parallel:
      self.init_multiprocessing()

    _expression_files = self.find_files(self.get_path(self.dir_tcga_downloads), "*/*tsv")
    _chunk_list = list(self.chunks(_expression_files, self._size_chunks))
    _chunk_list = _chunk_list[:self.test_chunks]
    self._total_chunks = len(_chunk_list)

    for _chunk_idx, _file_set in self.PB(enumerate(_chunk_list), total=self._total_chunks):
      # Fill in the template first so '%' is never applied to the directory part
      _chunk_name = self.template_file_chunk % _chunk_idx
      _chunk_path = self.get_path(f'{self._dir_chunks}/{_chunk_name}')
      if self.exists(_chunk_path):
        continue

      if parallel:
        self.queue_task(self._convert_file_set_to_df, _file_set, _chunk_path)
      else:
        self._convert_file_set_to_df(_file_set, _chunk_path)

    if parallel:
      self.process_queue()

  def combine_chunks(self):
    """Combine all chunk pickles into `self._combined_df`

    NOTE: not usable on the full dataset so far, because the combined
    table becomes too large.
    """
    _pkl_dfs = self.find_files(self.get_path(self._dir_chunks), "*pkl.gz")
    _pkl_dfs_reads = [self.unpickle(_pkf) for _pkf in self.PB(_pkl_dfs)]

    self._combined_df = self.PD.concat(_pkl_dfs_reads, axis=0)

  def get_chunk_files(self, ext='pkl.gz'):
    """List the chunk files in the chunk directory

    ext: File extension or glob pattern; a value without glob characters is
         prefixed with '*' so it is searched the same way as in `combine_chunks`
    """
    _pattern = ext if any(_c in ext for _c in "*?[") else f"*{ext}"
    return self.find_files(self.get_path(self._dir_chunks), _pattern)

  def set_gdc_sample_type_codes(self):
    """Load the GDC sample type codes into `self.sample_type_codes_map`

    On first use the code table is scraped from the GDC website (last table of
    the page), converted to a {two-digit code: definition} dict and pickled to
    "sample_type_codes.pkl.gz"; later calls only read that pickle.
    """
    self.config.path_sample_codes = self.get_path("sample_type_codes.pkl.gz")
    if not self.exists(self.config.path_sample_codes):
      _tables = self.PD.read_html("https://gdc.cancer.gov/resources-tcga-users/tcga-code-tables/sample-type-codes")
      _sc_df = _tables[-1]
      self.fix_column_names(_sc_df)
      # Zero-pad the codes to two characters, e.g. 1 -> "01"
      _sc_df.Code = _sc_df.Code.apply(lambda x: f"{x:02d}")
      _sc_map = dict(zip(_sc_df.Code, _sc_df.Definition))
      self.pickle(self.config.path_sample_codes, _sc_map)

    self.sample_type_codes_map = self.unpickle(self.config.path_sample_codes)

  def sample_code_def(self, _code):
    """Return the definition of a sample type code (None when the code is unknown)

    Requires `set_gdc_sample_type_codes` to have been called first.
    """
    return self.sample_type_codes_map.get(_code)

  def get_chunk_status(self):
    """Report the task queue status together with chunk progress

    Adds `open_files`, `processed_chunks` and `total_chunks` to the queue
    status and calls `update_config` as a side effect.
    """
    _qs = self.queue_task_status()
    _qs['open_files'] = self.get_open_file_descriptors()
    _qs['processed_chunks'] = len(self.get_chunk_files())
    _qs['total_chunks'] = self._total_chunks
    self.update_config()
    return _qs

  def process_expression_data(self):
    """Start chunk processing of the downloaded expression files"""
    self._process_chunks()
    # TODO: wait until all chunks are processed (e.g. by comparing chunk counts)

  def parse_sample_details(self, _sample):
    """Pick the sample attributes of interest from a biospecimen sample record

    _sample: Sample dict of the biospecimen JSON

    A missing `sample_type_id` falls back to "99"; `sample_type_alt` is the
    code definition from `sample_code_def`.
    """
    _sid = _sample.get('sample_type_id', "99")
    return {
        "sample_type_id": _sid,
        "sample_type_alt": self.sample_code_def(_sid),
        "sample_type": _sample.get('sample_type'),
        "tumor_descriptor": _sample.get('tumor_descriptor'),
        "sample_id": _sample.get('sample_id'),
        "specimen_type": _sample.get('specimen_type'),
        "is_ffpe": _sample.get('is_ffpe'),
        "preservation_method": _sample.get('preservation_method'),
        "tissue_type": _sample.get('tissue_type'),
      }

# Project instance bound to the analysis directory; `test_chunks = 5` limits
# chunk processing to the first five chunks (see `_process_chunks`).
# NOTE(review): the base path is machine specific.
T0104 = TCGAProject(path_bases=[r"/mnt/DataDrive/MDD/T0104--Abhimanyu-GeneExpression/TCGA-BrCa-Abhimanyu-20240514"], test_chunks = 5)

"""Data Download"""
# One-off step, kept disabled: builds the expression chunk pickles.
# T0104.process_expression_data()
# T0104.update_config()

"""Data Parsing"""
# Load the sample type code lookup and register the JSON key maps used below.
T0104.set_gdc_sample_type_codes()
T0104.set_json_parse_key_maps()


## Metadata

In [None]:
# GDC Client Downloaded Files
# Read the manifest and normalise its column names in place.
_gdc_manifest = T0104.pd_tsv(T0104.get_path('gdc_manifest.2024-05-14.txt'))
T0104.fix_column_names(_gdc_manifest)
print(_gdc_manifest.shape)
_gdc_manifest.head(2)


In [None]:
# Read Sample Sheet
# Column names are normalised in place so they can be used as merge keys later.

_sample_sheet = T0104.pd_tsv(T0104.get_path("gdc_sample_sheet.2024-05-14.tsv"))
T0104.fix_column_names(_sample_sheet)
print(_sample_sheet.shape)
_sample_sheet.head(2)

In [None]:
# File-level metadata; the 'metadata' key map uses '.' separated paths, which is
# the default separator of `json_to_df`, so no `sep` is passed here.
_metadata = T0104.json_to_df(T0104.get_path("metadata.cohort.2024-05-14.json"), T0104.get_keymap('metadata'))
print(_metadata.shape)
_metadata.head(2)

In [None]:
# Attach the file-level metadata to the sample sheet rows through the file id
# (default join type, i.e. only files present in both tables are kept).
_merged_meta_sampl = _sample_sheet.merge(
  _metadata,
  left_on='File_ID',
  right_on='file_id',
)
_merged_meta_sampl.head(2)

In [None]:
# Clinical records, one row per case; the 'clinical' key map uses '|' separated
# paths, hence the explicit `sep`.
_clinical = T0104.json_to_df(T0104.get_path("clinical.cohort.2024-05-14--COHORT.json"), T0104.get_keymap('clinical'), sep='|')
print(_clinical.shape)
_clinical.head(2)


In [None]:
# Biospecimen records; the `samples` column holds the raw value found under the
# 'samples' key of each record (expanded into rows in a later cell).
_biospecimen = T0104.json_to_df(T0104.get_path("biospecimen.cohort.2024-05-14--COHORT.json"), T0104.get_keymap('biospecimen'), sep='|')
print(_biospecimen.shape)
_biospecimen.sample(2)

In [None]:
# Bring each case's sample list next to its clinical annotation.
_case_samples = _biospecimen[['case_id', 'samples']]
_merged_cli_bspeci = _case_samples.merge(_clinical, on='case_id')
print(_merged_cli_bspeci.shape)
_merged_cli_bspeci.sample(5)

In [None]:
# Expand biospecimen samples: one output row per (case, sample) pair

_all_samples = []

for _row_idx, _case_row in _merged_cli_bspeci.iterrows():
  _case_fields = _case_row.to_dict()
  _case_samples_list = _case_fields.pop('samples')
  # Case-level fields are applied last, so they win on any key clash
  _all_samples.extend(
    {**T0104.parse_sample_details(_one_sample), **_case_fields}
    for _one_sample in _case_samples_list
  )

_all_samples_df = T0104.DF(_all_samples)
print(_all_samples_df.shape)
_all_samples_df.sample(2)

### Merged Metadata Mapping as Annotation

In [None]:
# Annotate every expression file with sample and clinical details.
# NOTE(review): the join key is the case, not the sample, so each file of a case
# is paired with every sample of that case (a case with a tumor and a normal
# sample yields both annotations for each of its files) — confirm this is
# intended, or join on a sample-level identifier instead.
_annotated_ds = _all_samples_df.merge(
  _merged_meta_sampl,
  left_on='case_id',
  right_on='associated_case_id',
)

# Remember where the annotation workbook lives, then write the sheet.
T0104.config.path_sample_excel = T0104.get_path("T0104--sample-annotations.xlsx")
T0104.pd_excel(T0104.config.path_sample_excel, _annotated_ds, 'Annotated-Samples')

### Sample Filtering

In [None]:
# Inspect the columns available on the sample-level table.
print(_all_samples_df.columns)
_all_samples_df.head(2)

In [None]:
# Inspect the columns available on the file-level table.
print(_merged_meta_sampl.columns)
_merged_meta_sampl.head(2)

In [None]:
# Distinct case ids seen on the file side and on the sample side.
_assoc_cid = set(_merged_meta_sampl['associated_case_id'])
_smpl_cid = set(_all_samples_df['case_id'])

len(_assoc_cid), len(_smpl_cid)

In [None]:
# Size of the union; equal to both counts above only if the two id sets are identical.
len(_assoc_cid | _smpl_cid)

In [None]:
# Sample types kept for the tumor-vs-normal comparison.
_filter_smpl_typ = _annotated_ds.sample_type.isin(['Primary Tumor', 'Solid Tissue Normal', 'Metastatic'])
# Specimen filter: 'Solid Tissue' is a specimen type, so it has to be tested on
# `specimen_type` (testing `sample_type` can never match). Defined for
# reference only; it is not part of the selection below.
_filter_spcmn_typ = _annotated_ds.specimen_type.isin(['Solid Tissue'])
# Keep only rows with one of the listed pathologic stage labels.
_filter_ajcc_pthl_stg = _annotated_ds.ajcc_pathologic_stage.isin(['Stage IA', 'Stage IIB', 'Stage IIA', 'Stage IIIA', 'Stage IV',
       'Stage IIIC', 'Stage I', 'Stage IIIB', 'Stage IB',
       'Stage II', 'Stage X', 'Stage III'])
# `.copy()` detaches the selection so later edits do not touch `_annotated_ds`.
_ad_f1 = _annotated_ds[_filter_smpl_typ & _filter_ajcc_pthl_stg].copy()

# T0104.pd_excel(T0104.config.path_sample_excel, _ad_f1, 'Filtered-Samples')

print(_annotated_ds.shape, _ad_f1.shape)


In [None]:
# Split the filtered annotation into the two comparison groups by tissue type.
_is_normal_tissue = _ad_f1['tissue_type'] == 'Normal'
_is_tumor_tissue = _ad_f1['tissue_type'] == 'Tumor'
_df_samples_Normal = _ad_f1.loc[_is_normal_tissue].copy()
_df_samples_Tumor = _ad_f1.loc[_is_tumor_tissue].copy()

In [None]:
# Row/column counts of the Normal and Tumor groups.
_df_samples_Normal.shape, _df_samples_Tumor.shape

In [None]:
# Rows per tissue type in the filtered annotation (shows values other than Normal/Tumor, if any).
_ad_f1.tissue_type.value_counts()

In [None]:
# Number of distinct values per column of the filtered annotation.
_uvals = [(_name, _ad_f1[_name].nunique()) for _name in _ad_f1.columns]

_uvals_df = T0104.DF(_uvals, columns=['Column', 'Unique_Values'])
# Show only columns whose distinct-value count lies within 2..1290 (inclusive).
_in_range = _uvals_df['Unique_Values'].between(2, 1290)
_uvals_df[_in_range]

### Tumor vs Normal Expression Calculations

In [None]:
# File names per group; used to select rows of the expression chunks by `FileName`.
_list_files_Normal = _df_samples_Normal['File_Name'].tolist()
_list_files_Tumor = _df_samples_Tumor['File_Name'].tolist()

In [None]:
# Function to merge statistics of gene expression

# Presumably imports numpy and exposes it as `T0104.NP` for later cells —
# confirm against UtilityLib's `require`.
T0104.require('numpy', 'NP')

def merge_gxp_stats(_exp1, _exp2):
  """Merge two per-gene FPKM summary records into one.

  _exp1, _exp2: dicts with the keys `gene_id`, `FPKM_count`, `FPKM_min`,
                `FPKM_max`, `FPKM_mean`, `FPKM_std` (sample standard
                deviation) and `FPKM_median`

  @return
  A new dict with the combined statistics when both records describe the same
  gene and both hold observations. `_exp1` is returned unchanged when the gene
  ids differ or `_exp2` has no observations; `_exp2` is returned when only
  `_exp1` has none.

  Count, mean, min, max and std are exact. The median cannot be recovered from
  summaries: it is the median of the group with more observations, or the
  average of both medians when the counts are equal.
  """
  import math  # local import keeps the helper usable on its own

  _key_gid = 'gene_id'
  if _exp1.get(_key_gid) != _exp2.get(_key_gid):
    return _exp1

  _key_count = 'FPKM_count'
  _key_min = 'FPKM_min'
  _key_mean = 'FPKM_mean'
  _key_max = 'FPKM_max'
  _key_std = 'FPKM_std'
  _key_median = 'FPKM_median'

  _n1 = _exp1[_key_count]
  _n2 = _exp2[_key_count]

  # A record without observations carries NaN statistics that would poison the merge
  if not _n2:
    return _exp1
  if not _n1:
    return _exp2

  # Combine counts
  _final_count = _n1 + _n2

  # Calculate combined mean (count-weighted)
  _mean1 = _exp1[_key_mean]
  _mean2 = _exp2[_key_mean]
  _final_mean = (_mean1 * _n1 + _mean2 * _n2) / _final_count

  # Calculate combined standard deviation from the within- and between-group
  # sums of squares. A single observation has an undefined (NaN) std but adds
  # no within-group spread, so it must contribute 0 rather than NaN.
  _within1 = (_n1 - 1) * _exp1[_key_std] ** 2 if _n1 > 1 else 0.0
  _within2 = (_n2 - 1) * _exp2[_key_std] ** 2 if _n2 > 1 else 0.0
  _between = _n1 * (_mean1 - _final_mean) ** 2 + _n2 * (_mean2 - _final_mean) ** 2
  # Both counts are >= 1 here, so the denominator is at least 1
  _final_std = math.sqrt((_within1 + _within2 + _between) / (_final_count - 1))

  # Calculate combined min and max
  _final_min = min(_exp1[_key_min], _exp2[_key_min])
  _final_max = max(_exp1[_key_max], _exp2[_key_max])

  # Approximate combined median: same value as the median of each group median
  # repeated `count` times, without building that list
  if _n1 > _n2:
    _final_median = _exp1[_key_median]
  elif _n2 > _n1:
    _final_median = _exp2[_key_median]
  else:
    _final_median = (_exp1[_key_median] + _exp2[_key_median]) / 2

  # Create the combined dictionary
  return {
    _key_gid: _exp1.get(_key_gid),
    _key_min: _final_min,
    _key_max: _final_max,
    _key_std: _final_std,
    _key_count: _final_count,
    _key_mean: _final_mean,
    _key_median: _final_median
  }

def store_gene_exp(dtyp, _gdata):
  """Accumulate one per-gene summary row under `T0104.config.expt3[dtyp]`.

  dtyp: Group label used as key in `T0104.config.expt3` (e.g. 'Tumor')
  _gdata: Row with a `gene_id` attribute and a `to_dict()` method

  The first record of a gene is stored as-is; later records of the same gene
  are folded into the stored one with `merge_gxp_stats`.
  NOTE(review): assumes `T0104.config.expt3[dtyp]` already exists as a
  dict-like container — confirm where it is initialised.
  """
  _bucket = T0104.config.expt3[dtyp]
  _gid = _gdata.gene_id
  _record = _gdata.to_dict()
  if _gid in _bucket:
    _record = merge_gxp_stats(_bucket[_gid], _record)
  _bucket[_gid] = _record

In [None]:
_chunks = T0104.get_chunk_files()

def _summarise_fpkm(_df):
  """Per-gene min/max/std/count/mean/median of `fpkm_uq_unstranded`, one row per gene_id."""
  _col = 'fpkm_uq_unstranded'
  return _df.groupby('gene_id').agg(
      # Files = ('FileName', list),
      FPKM_min = (_col, 'min'),
      FPKM_max = (_col, 'max'),
      FPKM_std = (_col, 'std'),
      FPKM_count = (_col, 'count'),
      FPKM_mean = (_col, 'mean'),
      FPKM_median = (_col, 'median'),
  ).reset_index()

# Each chunk is summarised on its own and folded into the running per-gene
# statistics, so the full expression table never has to be held in memory.
# NOTE(review): assumes `T0104.config.expt3` already provides 'Tumor' and
# 'Normal' containers — confirm where they are initialised.
for _c in T0104.PB(_chunks):
  _cdf = T0104.unpickle(_c)
  # Drop rows whose gene_id starts with 'N_'; `na=False` keeps the mask
  # boolean even when a gene_id is missing
  _cdf = _cdf[~_cdf.gene_id.str.startswith('N_', na=False)]
  _cdf_Tumor = _cdf[_cdf.FileName.isin(_list_files_Tumor)]
  _cdf_Normal = _cdf[_cdf.FileName.isin(_list_files_Normal)]

  # One shared summary definition, so both groups always get identical statistics
  _cdf_Tumor_summ = _summarise_fpkm(_cdf_Tumor)
  _cdf_Normal_summ = _summarise_fpkm(_cdf_Normal)

  for _ix, _ir in _cdf_Tumor_summ.iterrows():
    store_gene_exp('Tumor', _ir)

  for _ix, _ir in _cdf_Normal_summ.iterrows():
    store_gene_exp('Normal', _ir)


In [None]:
# One DataFrame per group from the accumulated records (one record per gene_id,
# as stored by `store_gene_exp`).
_Tumor_exp = T0104.DF(T0104.config.expt3.Tumor.values())
_Normal_exp = T0104.DF(T0104.config.expt3.Normal.values())

In [None]:
# Gene ids of the first five Tumor rows, used for a side-by-side spot check below.
_compare_ids = _Tumor_exp.head(5).gene_id

In [None]:
# Normal-group statistics for the selected genes.
_Normal_exp[_Normal_exp.gene_id.isin(_compare_ids)]

In [None]:
# Tumor-group statistics for the same genes.
_Tumor_exp[_Tumor_exp.gene_id.isin(_compare_ids)]