# NAACCR Tumor Registry Data

This is both a notebook and a module, sync'd using [jupytext][]. See also

  - README for motivation and usage
  - CONTRIBUTING for coding style etc.
    - Note especially **ISSUE**, **TODO** and **IDEA** markers

[jupytext]: https://github.com/mwouts/jupytext

### Preface: PyData Tools: Pandas, PySpark



In [1]:
# python stdlib
from gzip import GzipFile
from importlib import resources as res
from pathlib import Path as Path_T
from sys import stderr
from typing import ContextManager, Dict, Iterator, List, NoReturn
from typing import Optional as Opt, Union, cast
from xml.etree import ElementTree as XML
import logging

In [2]:
# 3rd party code: PyData
from pyspark.sql import SparkSession as SparkSession_T, Window
from pyspark.sql import types as ty, functions as func
from pyspark.sql.dataframe import DataFrame
from pyspark import sql as sq
import numpy as np   # type: ignore
import pandas as pd  # type: ignore

In [3]:
# 3rd party: naaccr-xml
import naaccr_xml_samples

import bc_qa

In [4]:
# this project
from sql_script import SqlScript
from tumor_reg_ont import NAACCR1, NAACCR_Layout, LOINC_NAACCR, NAACCR_R, NAACCR_I2B2
from tumor_reg_ont import create_objects, ddictDF, eltSchema, xmlDF, eltDict
import heron_load

In [5]:
log = logging.getLogger(__name__)

if __name__ == '__main__' or True:
    logging.basicConfig(level=logging.INFO, stream=stderr)
    log.info('NAACCR exploration...')

INFO:__main__:NAACCR exploration...


In [6]:
log.info('%s', dict(pandas=pd.__version__))

INFO:__main__:{'pandas': '0.24.2'}


## I/O Access: local files, Spark / Hive metastore

In a notebook context, we have `__name__ == '__main__'`.

Otherwise, we maintain ocap discipline (see CONTRIBUTING)
and don't import powerful objects.

 - **TODO/WIP**: use `_spark` and `_cwd`; i.e. be sure not to "export" ambient authority.

In [7]:
IO_TESTING = __name__ == '__main__'
_spark = cast(SparkSession_T, None)
if IO_TESTING:
    if 'spark' in globals():
        _spark = spark  # type: ignore  # noqa
        del spark       # type: ignore
    else:
        def _make_spark_session(appName: str = "tumor_reg_data") -> SparkSession_T:
            """
            ref:
            https://spark.apache.org/docs/latest/sql-getting-started.html
            """
            from pyspark.sql import SparkSession

            return SparkSession \
                .builder \
                .appName(appName) \
                .getOrCreate()
        _spark = _make_spark_session()

    def _get_cwd() -> Path_T:
        # ISSUE: ambient
        from pathlib import Path
        return Path('.')

    _cwd = _get_cwd()
    log.info('cwd: %s', _cwd.resolve())

INFO:__main__:cwd: /home/dconnolly/projects/naaccr-tumor-data/notebooks


The `spark` global is available when we launch as
`PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook
   pyspark ...`.

In [8]:
IO_TESTING and _spark


def quiet_logs(spark: SparkSession_T) -> None:
    sc = spark.sparkContext
    # ack: FDS Aug 2015 https://stackoverflow.com/a/32208445
    logger = sc._jvm.org.apache.log4j  # type: ignore
    logger.LogManager.getLogger("org"). setLevel(logger.Level.WARN)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.WARN)

In [9]:
if IO_TESTING:
    log.info('spark web UI: %s', _spark.sparkContext.uiWebUrl)
    quiet_logs(_spark)

INFO:__main__:spark web UI: http://10.14.6.24:4044


## `naaccr-xml` Data Dictionary

In [10]:
if IO_TESTING:
    ddictDF(_spark).createOrReplaceTempView('ndd180')
IO_TESTING and _spark.table('ndd180').limit(5).toPandas().set_index('naaccrId')

Unnamed: 0_level_0,naaccrNum,naaccrName,parentXmlElement,dataType,padding,trim,startColumn,length,allowUnlimitedText,sourceOfStandard,recordTypes
naaccrId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
recordType,10,Record Type,NaaccrData,,,,1,1,,,"A,M,C,I"
registryType,30,Registry Type,NaaccrData,digits,,,2,1,,,"A,M,C,I"
naaccrRecordVersion,50,NAACCR Record Version,NaaccrData,digits,,,17,3,,,"A,M,C,I"
npiRegistryId,45,NPI--Registry ID,NaaccrData,digits,,,20,10,,,"A,M,C,I"
registryId,40,Registry ID,NaaccrData,digits,leftZero,,30,10,,,"A,M,C,I"


In [11]:
if IO_TESTING:
    _spark.sql("""create or replace temporary view current_task as select 'abc' task_id from (values('X'))""")
    _ont = NAACCR_I2B2.ont_view_in(_spark)  # TODO: seer recode in NAACCR_ONTOLOGY
IO_TESTING and _ont.limit(5).toPandas()



INFO:tumor_reg_ont:naaccr_concepts_load.sql: section = DataFrame[sectionid: bigint, section: string]


INFO:tumor_reg_ont:naaccr_concepts_load.sql: tumor_item_type = DataFrame[naaccrNum: bigint, sectionId: bigint, section: string, naaccrId: string, naaccrName: string, length: bigint, source: string, loinc_num: string, scale_typ: string, AnswerListId: string, scheme: string, valtype_cd: string, phi_id_kind: string]


INFO:tumor_reg_ont:naaccr_concepts_load.sql: code_labels = DataFrame[item: bigint, name: string, scheme: string, code: string, label: string, means_missing: boolean, description: string]


INFO:tumor_reg_ont:naaccr_concepts_load.sql: seer_site_terms = DataFrame[hlevel: int, path: string, name: string, basecode: string, visualattributes: string]


INFO:tumor_reg_ont:naaccr_concepts_load.sql: create naaccr_code_values


INFO:tumor_reg_ont:naaccr_concepts_load.sql: create naaccr_ont_aux


INFO:tumor_reg_ont:naaccr_concepts_load.sql: create naaccr_ont_aux_seer


INFO:tumor_reg_ont:naaccr_concepts_load.sql: create naaccr_ontology


Unnamed: 0,c_hlevel,c_fullname,c_name,c_basecode,c_dimcode,c_comment,c_visualattributes,c_synonym_cd,c_facttablecolumn,c_tablename,c_columnname,c_columndatatype,c_operator,m_applied_path,update_date,import_date,sourcesystem_cd
0,1,\i2b2\naaccr\,Cancer Cases (NAACCR Hierarchy),,\i2b2\naaccr\,abc,FA,N,concept_cd,concept_dimension,CONCEPT_PATH,T,like,@,2019-09-09 17:10:29.503,2019-09-09 17:10:29.503,tumor_registry@kumed.com
1,2,\i2b2\naaccr\S:1 Cancer Identification\,01 Cancer Identification,,\i2b2\naaccr\S:1 Cancer Identification\,abc,FA,N,concept_cd,concept_dimension,CONCEPT_PATH,T,like,@,2019-09-09 17:10:29.503,2019-09-09 17:10:29.503,tumor_registry@kumed.com
2,2,\i2b2\naaccr\S:2 Demographic\,02 Demographic,,\i2b2\naaccr\S:2 Demographic\,abc,FA,N,concept_cd,concept_dimension,CONCEPT_PATH,T,like,@,2019-09-09 17:10:29.503,2019-09-09 17:10:29.503,tumor_registry@kumed.com
3,2,\i2b2\naaccr\S:3 Edit Overrides/Conversion His...,03 Edit Overrides/Conversion History/System Admin,,\i2b2\naaccr\S:3 Edit Overrides/Conversion His...,abc,FA,N,concept_cd,concept_dimension,CONCEPT_PATH,T,like,@,2019-09-09 17:10:29.503,2019-09-09 17:10:29.503,tumor_registry@kumed.com
4,2,\i2b2\naaccr\S:4 Follow-up/Recurrence/Death\,04 Follow-up/Recurrence/Death,,\i2b2\naaccr\S:4 Follow-up/Recurrence/Death\,abc,FA,N,concept_cd,concept_dimension,CONCEPT_PATH,T,like,@,2019-09-09 17:10:29.503,2019-09-09 17:10:29.503,tumor_registry@kumed.com


## tumor_item_type: numeric /  date / nominal / text; identifier?

In [12]:
IO_TESTING and _spark.sql('''
select *
from ndd180 as idef
''').limit(8).toPandas()

Unnamed: 0,naaccrId,naaccrNum,naaccrName,parentXmlElement,dataType,padding,trim,startColumn,length,allowUnlimitedText,sourceOfStandard,recordTypes
0,recordType,10,Record Type,NaaccrData,,,,1,1,,,"A,M,C,I"
1,registryType,30,Registry Type,NaaccrData,digits,,,2,1,,,"A,M,C,I"
2,naaccrRecordVersion,50,NAACCR Record Version,NaaccrData,digits,,,17,3,,,"A,M,C,I"
3,npiRegistryId,45,NPI--Registry ID,NaaccrData,digits,,,20,10,,,"A,M,C,I"
4,registryId,40,Registry ID,NaaccrData,digits,leftZero,,30,10,,,"A,M,C,I"
5,tumorRecordNumber,60,Tumor Record Number,Tumor,digits,leftZero,,40,2,,,"A,M,C,I"
6,patientIdNumber,20,Patient ID Number,Patient,digits,leftZero,,42,8,,,"A,M,C,I"
7,patientSystemIdHosp,21,Patient System ID-Hosp,Tumor,digits,leftZero,,50,8,,,"A,M,C,I"


In [13]:
if IO_TESTING:
    NAACCR_R.field_info_in(_spark)
IO_TESTING and _spark.table('field_info').limit(5).toPandas()

Unnamed: 0,item,name,type
0,10,recordType,factor
1,20,patientIdNumber,character
2,21,patientSystemIdHosp,character
3,30,registryType,factor
4,35,finCodingSystem,character


`WerthPADOH/naaccr` has complete coverage:

In [14]:
# TODO: turn this into a doctest, independent of Spark
IO_TESTING and _spark.sql('''
with check as (
select case when r.item is null then 0 else 1 end as has_r
from ndd180 v18
left join field_info r on r.item = v18.naaccrNum
)
select has_r, count(*) from check
group by has_r

order by has_r
''').toPandas()

Unnamed: 0,has_r,count(1)
0,1,772


Werth assigns a `type` to each item:

In [15]:
IO_TESTING and _spark.sql('''
select rl.section, type, nd.length, count(*), collect_list(rl.`naaccr-item-num`), collect_list(naaccrId)
from ndd180 nd
left join field_info f on f.item = nd.naaccrNum
left join record_layout rl on rl.`naaccr-item-num` = nd.naaccrNum
group by section, type, nd.length
order by section, type, nd.length
''').toPandas()

Unnamed: 0,section,type,length,count(1),collect_list(naaccr-item-num),collect_list(naaccrId)
0,,census_block,1,4,[],"[censusBlockGroup2010, censusBlockGrp197090, c..."
1,,census_tract,6,4,[],"[censusTract2000, censusTract19708090, censusT..."
2,,character,1,26,[],"[subsqRx2ndScopeLnSu, subsqRx3rdScopeLnSu, beh..."
3,,character,2,12,[],"[subsqRx2ndCourseSurg, subsqRx4thCourseSurg, s..."
4,,character,3,1,[],[eodTumorSize]
5,,character,4,3,[],"[histologyIcdO2, histologyIcdO1, histologicTyp..."
6,,county,3,4,[],"[countyAtDxGeocode1990, countyAtDxGeocode2000,..."
7,,sentineled_integer,2,2,[],"[regionalNodesExamined, regionalNodesPositive]"
8,Cancer Identification,Date,8,3,"[443, 390, 445]","[dateConclusiveDx, dateOfDiagnosis, dateOfMult..."
9,Cancer Identification,character,2,3,"[439, 448, 391]","[dateOfMultTumorsFlag, dateConclusiveDxFlag, d..."


### Mixednaaccr-xml, LOINC, and Werth

In [16]:
if IO_TESTING:
    _spark.createDataFrame(NAACCR_I2B2.tumor_item_type).createOrReplaceTempView('tumor_item_type')

#### Any missing?

In [17]:
IO_TESTING and _spark.sql('''
select *
from tumor_item_type
where valtype_cd is null or  scale_typ is null
''').toPandas().sort_values(['sectionId', 'naaccrNum']).reset_index(drop=True)

Unnamed: 0,naaccrNum,sectionId,section,naaccrId,naaccrName,length,source,loinc_num,scale_typ,AnswerListId,scheme,valtype_cd,phi_id_kind


#### Ambiguous valtype_cd?

In [18]:
IO_TESTING and _spark.sql('''
select naaccrId, length, count(distinct valtype_cd), collect_list(valtype_cd)
from tumor_item_type
group by naaccrId, length
having count(distinct valtype_cd) > 1
''').toPandas()

Unnamed: 0,naaccrId,length,count(DISTINCT valtype_cd),collect_list(valtype_cd)


**ISSUE: LOINC mapping is ambiguous!**

In [19]:
IO_TESTING and _spark.sql('''
select naaccrId, count(distinct valtype_cd), collect_list(valtype_cd), collect_list(loinc_num)
from tumor_item_type
group by naaccrId
having count(*) > 1
''').toPandas()

Unnamed: 0,naaccrId,count(DISTINCT valtype_cd),collect_list(valtype_cd),collect_list(loinc_num)


In [20]:
def csv_meta(dtypes: Dict[str, np.dtype], path: str,
             context: str = 'http://www.w3.org/ns/csvw') -> Dict[str, object]:
    # ISSUE: dead code? obsolete in favor of _fixna()?
    def xlate(dty: np.dtype) -> str:
        if dty.kind == 'i':
            return 'number'
        elif dty.kind == 'O':
            return 'string'
        raise NotImplementedError(dty.kind)

    cols = [
        {"titles": name,
         "datatype": xlate(dty)}
        for name, dty in dtypes.items()
    ]
    return {"@context": context,
            "url": path,
            "tableSchema": {
                "columns": cols
            }}

In [21]:
def csv_spark_schema(columns: List[Dict[str, str]]) -> ty.StructType:
    """
    Note: limited to exactly 1 titles per column
    IDEA: expand to boolean
    IDEA: nullable / required
    """
    def oops(what: object) -> NoReturn:
        raise NotImplementedError(what)
    fields = [
        ty.StructField(
            name=col['titles'],
            dataType=ty.IntegerType() if col['datatype'] == 'number'
            else ty.StringType() if col['datatype'] == 'string'
            else oops(col))
        for col in columns]
    return ty.StructType(fields)

# IDEA: csv_spark_schema(csv_meta(x.dtypes, 'tumor_item_type.csv')['tableSchema']['columns'])

## Coded Concepts

In [22]:
if IO_TESTING:
    LOINC_NAACCR.answers_in(_spark)

IO_TESTING and _spark.table('loinc_naaccr_answers').where('code_value = 380').limit(5).toPandas()

Unnamed: 0,loinc_number,component,code_system,code_value,answerlistid,answerlistname,answer_code,sequence_no,answer_string
0,21853-7,Sequence number central.patient,NAACCR_ID,380,LL210-6,CR_380_Cancer sequence number,0,1,One primary only
1,21853-7,Sequence number central.patient,NAACCR_ID,380,LL210-6,CR_380_Cancer sequence number,1,2,First of one or more primaries
2,21853-7,Sequence number central.patient,NAACCR_ID,380,LL210-6,CR_380_Cancer sequence number,2,3,Second of two or more primaries
3,21853-7,Sequence number central.patient,NAACCR_ID,380,LL210-6,CR_380_Cancer sequence number,98,4,Case of carcinoma in situ of the cervix diagno...
4,21853-7,Sequence number central.patient,NAACCR_ID,380,LL210-6,CR_380_Cancer sequence number,99,5,Unspecified sequence number


#### Werth Code Values

In [23]:
if IO_TESTING:
    _spark.createDataFrame(NAACCR_R.field_code_scheme).createOrReplaceTempView('field_code_scheme')
    _spark.createDataFrame(NAACCR_R.code_labels()).createOrReplaceTempView('code_labels')
IO_TESTING and _spark.table('code_labels').limit(5).toPandas().set_index(['item', 'name', 'scheme', 'code'])

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,label,means_missing,description
item,name,scheme,code,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
10,recordType,recordType,I,incidence,False,Incidence-only record type (nonconfidential co...
10,recordType,recordType,C,confidential,False,Confidential record type (incidence record plu...
10,recordType,recordType,A,abstract,False,Full case Abstract record type (incidence and ...
10,recordType,recordType,U,update,False,Correction/ Update record type (short format r...
10,recordType,recordType,M,modified,False,Record Modified since previous submission to c...


## NAACCR XML Data

In [24]:
class NAACCR2:
    s100x = XML.parse(GzipFile(fileobj=res.open_binary(  # type: ignore # typeshed/issues/2580  # noqa
        naaccr_xml_samples, 'naaccr-xml-sample-v180-incidence-100.xml.gz')))

    @classmethod
    def s100t(cls) -> ContextManager[Path_T]:
        """
        TODO: check in results of converting from XML sample
        using `java -jar ~/opt/naaccr-xml-utility-6.2/lib/naaccr-xml-utility.jar`  # noqa
        """
        return res.path(
            naaccr_xml_samples, 'naaccr-xml-sample-v180-incidence-100.txt')

In [25]:
def tumorDF(spark: SparkSession_T, doc: XML.ElementTree) -> DataFrame:
    rownum = 0
    ns = {'n': 'http://naaccr.org/naaccrxml'}

    to_parent = {c: p for p in doc.iter() for c in p}

    def tumorItems(tumorElt: XML.Element, schema: ty.StructType,
                   simpleContent: bool = True) -> Iterator[Dict[str, object]]:
        nonlocal rownum
        assert simpleContent
        rownum += 1
        patElt = to_parent[tumorElt]
        ndataElt = to_parent[patElt]
        for elt in [ndataElt, patElt, tumorElt]:
            for item in elt.iterfind('./n:Item', ns):
                for itemRecord in eltDict(item, schema, simpleContent):
                    yield dict(itemRecord, rownum=rownum)

    itemSchema = eltSchema(NAACCR1.item_xsd, simpleContent=True)
    rownumField = ty.StructField('rownum', ty.IntegerType(), False)
    tumorItemSchema = ty.StructType([rownumField] + itemSchema.fields)
    data = xmlDF(spark, schema=tumorItemSchema, doc=doc, path='.//n:Tumor',
                 eltRecords=tumorItems,
                 ns={'n': 'http://naaccr.org/naaccrxml'},
                 simpleContent=True)
    return data.drop('naaccrNum')


IO_TESTING and (tumorDF(_spark, NAACCR2.s100x)
                .toPandas().sort_values(['naaccrId', 'rownum']).head(5))

Unnamed: 0,rownum,naaccrId,value
54,1,ageAtDiagnosis,66
114,2,ageAtDiagnosis,80
189,3,ageAtDiagnosis,30
297,4,ageAtDiagnosis,42
357,5,ageAtDiagnosis,71


What columns are covered by the 100 tumor sample?

In [26]:
IO_TESTING and (tumorDF(_spark, NAACCR2.s100x)  # type: ignore
                .select('naaccrId').distinct().sort('naaccrId')
                .toPandas().naaccrId.values)

array(['ageAtDiagnosis', 'behaviorCodeIcdO3', 'birthplaceCountry',
       'birthplaceState', 'causeOfDeath', 'censusTrCertainty2000',
       'censusTrCertainty2010', 'censusTrPovertyIndictr',
       'censusTract2000', 'censusTract2010', 'computedEthnicity',
       'computedEthnicitySource', 'csExtension', 'csLymphNodes',
       'csLymphNodesEval', 'csMetsAtDx', 'csMetsAtDxBone',
       'csMetsAtDxBrain', 'csMetsAtDxLiver', 'csMetsAtDxLung',
       'csMetsEval', 'csSiteSpecificFactor1', 'csSiteSpecificFactor10',
       'csSiteSpecificFactor11', 'csSiteSpecificFactor12',
       'csSiteSpecificFactor13', 'csSiteSpecificFactor14',
       'csSiteSpecificFactor15', 'csSiteSpecificFactor16',
       'csSiteSpecificFactor17', 'csSiteSpecificFactor18',
       'csSiteSpecificFactor19', 'csSiteSpecificFactor2',
       'csSiteSpecificFactor20', 'csSiteSpecificFactor21',
       'csSiteSpecificFactor22', 'csSiteSpecificFactor23',
       'csSiteSpecificFactor24', 'csSiteSpecificFactor25',
       'csSi

In [27]:
def naaccr_pivot(ddict: DataFrame, skinny: DataFrame, key_cols: List[str],
                 pivot_on: str = 'naaccrId', value_col: str = 'value',
                 start: str = 'startColumn') -> DataFrame:
    groups = skinny.select(pivot_on, value_col, *key_cols).groupBy(*key_cols)
    wide = groups.pivot(pivot_on).agg(func.first(value_col))
    start_by_id = {id: start
                   for (id, start) in ddict.select(pivot_on, start).collect()}
    sorted_cols = sorted(wide.columns, key=lambda id: start_by_id.get(id, -1))
    return wide.select(cast(List[Union[sq.Column, str]], sorted_cols))


IO_TESTING and (naaccr_pivot(ddictDF(_spark),
                             tumorDF(_spark, NAACCR2.s100x),
                             ['rownum'])
                .limit(3).toPandas())

Unnamed: 0,rownum,tumorRecordNumber,patientIdNumber,censusTract2000,censusTrCertainty2000,censusTract2010,censusTrCertainty2010,maritalStatusAtDx,race1,race2,...,rxSummSurgSite9802,rxSummScopeReg9802,rxSummSurgOth9802,seerCodingSysCurrent,seerTypeOfFollowUp,seerRecordNumber,dateOfLastContact,vitalStatus,causeOfDeath,icdRevisionNumber
0,31,1,38,999999,9,999999,9,1,1,88,...,,,,9,2,1,20170308,1,0,0
1,85,1,89,999999,9,999999,9,3,1,88,...,,,,F,2,1,20190311,1,0,0
2,65,1,69,999999,9,999999,9,4,1,88,...,,,,F,2,1,20190218,1,0,0


## NAACCR Flat File v18

In [28]:
if IO_TESTING:
    with NAACCR2.s100t() as _tr_file:
        log.info('tr_file: %s', _tr_file)
        _naaccr_text_lines = _spark.read.text(str(_tr_file))
else:
    _naaccr_text_lines = cast(DataFrame, None)

INFO:__main__:tr_file: /home/dconnolly/projects/naaccr-tumor-data/naaccr_xml_samples/naaccr-xml-sample-v180-incidence-100.txt


In [29]:
IO_TESTING and _naaccr_text_lines.rdd.getNumPartitions()

1

In [30]:
IO_TESTING and _naaccr_text_lines.limit(5).toPandas()

Unnamed: 0,value
0,I 180 0100000...
1,I 180 0100000...
2,I 180 0100000...
3,I 180 0100000...
4,I 180 0100000...


In [31]:
def non_blank(df: pd.DataFrame) -> pd.DataFrame:
    return df[[
        col for col in df.columns
        if (df[col].str.strip() > '').any()
    ]]

In [32]:
def naaccr_read_fwf(flat_file: DataFrame, itemDefs: DataFrame,
                    value_col: str = 'value',
                    exclude_pfx: str = 'reserved') -> DataFrame:
    """
    @param flat_file: as from spark.read.text()
                      typically with .value
    @param itemDefs: see ddictDF. ISSUE: should just use static CSV data now.
    """
    fields = [
        func.substring(flat_file[value_col],
                       item.startColumn, item.length).alias(item.naaccrId)
        for item in itemDefs.collect()
        if not item.naaccrId.startswith(exclude_pfx)
    ]  # type: List[Union[sq.Column, str]]
    return flat_file.select(fields)


_extract = cast(DataFrame, None)  # for static analysis when not IO_TESTING
if IO_TESTING:
    _extract = naaccr_read_fwf(_naaccr_text_lines, ddictDF(_spark))
    _extract.createOrReplaceTempView('naaccr_extract')
# _extract.explain()
IO_TESTING and non_blank(_extract.limit(5).toPandas())

Unnamed: 0,recordType,naaccrRecordVersion,tumorRecordNumber,patientIdNumber,censusTract2000,censusTrCertainty2000,censusTract2010,censusTrCertainty2010,maritalStatusAtDx,race1,...,rxSummBrm,rxSummOther,rxSummSystemicSurSeq,seerCodingSysCurrent,seerTypeOfFollowUp,seerRecordNumber,dateOfLastContact,vitalStatus,causeOfDeath,icdRevisionNumber
0,I,180,1,10,999999,9,999999,9,5,2,...,99,9,9,F,2,1,20171019,1,0,0
1,I,180,1,11,999999,9,999999,9,6,1,...,99,9,9,F,2,1,20180710,1,0,0
2,I,180,1,12,999999,9,999999,9,2,1,...,99,9,9,D,2,1,20131216,1,0,0
3,I,180,1,13,999999,9,999999,9,2,1,...,99,9,9,F,2,1,20180916,1,0,0
4,I,180,1,14,999999,9,999999,9,2,1,...,99,9,9,F,2,1,20170328,1,0,0


In [33]:
def cancerIdSample(spark: SparkSession_T, cache: Path_T, tumors: DataFrame,
                   portion: float = 1.0, cancerID: int = 1) -> DataFrame:
    """Cancer Identification items from a sample

    """
    cols = NAACCR_I2B2.tumor_item_type
    cols = cols[cols.sectionId == cancerID]
    colnames = cols.naaccrId.values.tolist()
    # TODO: test data for morphTypebehavIcdO2 etc.
    colnames = [cn for cn in colnames if cn in tumors.columns]
    return tumors.sample(False, portion).select(colnames)


if False and IO_TESTING:
    _cancer_id = cancerIdSample(_spark, _cwd / 'naaccr_ddict', _extract)

False and IO_TESTING and non_blank(_cancer_id.limit(15).toPandas())

False

In [34]:
False and IO_TESTING and _cancer_id.toPandas().describe()

False

## NAACCR Dates

 - **ISSUE**: hide date flags in i2b2? They just say why a date is missing, which doesn't seem worth the screenspace.

In [35]:
def naaccr_dates(df: DataFrame, date_cols: List[str],
                 keep: bool = False) -> DataFrame:
    orig_cols = df.columns
    for dtcol in date_cols:
        strcol = dtcol + '_'
        df = df.withColumnRenamed(dtcol, strcol)
        dt = func.substring(func.concat(func.trim(df[strcol]), func.lit('0101')), 1, 8)
        # df = df.withColumn(dtcol + '_str', dt)
        dt = func.to_date(dt, 'yyyyMMdd')
        df = df.withColumn(dtcol, dt)
    if not keep:
        df = df.select(cast(Union[sq.Column, str], orig_cols))
    return df


IO_TESTING and naaccr_dates(
    _extract.select(['dateOfDiagnosis', 'dateOfLastContact']),
    ['dateOfDiagnosis', 'dateOfLastContact'],
    keep=True).limit(10).toPandas()

Unnamed: 0,dateOfDiagnosis_,dateOfLastContact_,dateOfDiagnosis,dateOfLastContact
0,20171019,20171019,2017-10-19,2017-10-19
1,20180710,20180710,2018-07-10,2018-07-10
2,20131216,20131216,2013-12-16,2013-12-16
3,20180916,20180916,2018-09-16,2018-09-16
4,20170328,20170328,2017-03-28,2017-03-28
5,20170902,20170902,2017-09-02,2017-09-02
6,20100425,20100425,2010-04-25,2010-04-25
7,20161105,20161105,2016-11-05,2016-11-05
8,20180420,20180420,2018-04-20,2018-04-20
9,20170210,20170210,2017-02-10,2017-02-10


### Strange dates: TODO?

In [36]:
def strange_dates(extract: DataFrame) -> DataFrame:
    x = naaccr_dates(extract.select(['dateOfDiagnosis']),
                     ['dateOfDiagnosis'], keep=True)
    x = x.withColumn('dtlen', func.length(func.trim(x.dateOfDiagnosis_)))
    x = x.where(x.dtlen > 0)
    x = x.withColumn('cc', func.substring(func.trim(x.dateOfDiagnosis_), 1, 2))

    return x.where(
        ~(x.cc.isin(['19', '20'])) |
        ((x.dtlen < 8) & (x.dtlen > 0)))


IO_TESTING and (strange_dates(_extract)
                .toPandas().groupby(['dtlen', 'cc']).count())

Unnamed: 0_level_0,Unnamed: 1_level_0,dateOfDiagnosis_,dateOfDiagnosis
dtlen,cc,Unnamed: 2_level_1,Unnamed: 3_level_1


## Patients, Tumors, Unique key columns

 - `patientSystemIdHosp` - "This provides a stable identifier to
   link back to all reported tumors for a patient. It also serves as
   a reliable linking identifier; useful when central registries
   send follow-up information back to hospitals. Other identifiers
   such as social security number and medical record number, while
   useful, are subject to change and are thus less useful for this
   type of record linkage."

 - `tumorRecordNumber` - "Description: A system-generated number
    assigned to each tumor. The number should never change even if
    the tumor sequence is changed or a record (tumor) is deleted.
    Rationale: This is a unique number that identifies a specific
    tumor so data can be linked. "Sequence Number" cannot be used as
    a link because the number is changed if a report identifies an
    earlier tumor or if a tumor record is deleted."

Turns out to be not enough:

In [37]:
def dups(df_spark: DataFrame, key_cols: List[str]) -> pd.DataFrame:
    df_pd = df_spark.toPandas().sort_values(key_cols)
    df_pd['dup'] = df_pd.duplicated(key_cols, keep=False)
    return df_pd[df_pd.dup]


_key1 = ['patientSystemIdHosp', 'tumorRecordNumber']

IO_TESTING and dups(_extract.select('sequenceNumberCentral',
                                    'dateOfDiagnosis', 'dateCaseCompleted',
                                    *_key1),
                    _key1).set_index(_key1)

Unnamed: 0_level_0,Unnamed: 1_level_0,sequenceNumberCentral,dateOfDiagnosis,dateCaseCompleted,dup
patientSystemIdHosp,tumorRecordNumber,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,01,00,20171019,,True
,01,00,20180710,,True
,01,00,20131216,,True
,01,00,20180916,,True
,01,00,20170328,,True
,01,00,20170902,,True
,01,00,20100425,,True
,01,00,20161105,,True
,01,00,20180420,,True
,01,00,20170210,,True


In [38]:
class TumorKeys:
    # issue: accessionNumberHosp is not unique
    pat_ids = ['patientSystemIdHosp', 'patientIdNumber']
    pat_attrs = pat_ids + ['dateOfBirth', 'dateOfLastContact',
                           'sex', 'vitalStatus']
    tmr_ids = ['tumorRecordNumber']
    tmr_attrs = tmr_ids + [
        'dateOfDiagnosis',
        'sequenceNumberCentral', 'sequenceNumberHospital', 'primarySite',
        'ageAtDiagnosis', 'dateOfInptAdm', 'dateOfInptDisch', 'classOfCase',
        'dateCaseInitiated', 'dateCaseCompleted', 'dateCaseLastChanged',
    ]
    report_ids = ['naaccrRecordVersion', 'npiRegistryId']
    report_attrs = report_ids + ['dateCaseReportExported']

    dtcols = ['dateOfBirth', 'dateOfDiagnosis', 'dateOfLastContact',
              'dateCaseCompleted', 'dateCaseLastChanged']
    key4 = [
        'patientSystemIdHosp',  # NAACCR stable patient ID
        'tumorRecordNumber',    # NAACCR stable tumor ID
        'patientIdNumber',      # patient_mapping
        'abstractedBy',         # IDEA/YAGNI?: provider_id
    ]
    @classmethod
    def pat_tmr(cls, spark: SparkSession_T,
                naaccr_text_lines: DataFrame) -> DataFrame:
        return cls._pick_cols(spark, naaccr_text_lines,
                              cls.tmr_attrs + cls.pat_attrs + cls.report_attrs)

    @classmethod
    def patients(cls, spark: SparkSession_T,
                 naaccr_text_lines: DataFrame) -> DataFrame:
        pat = cls._pick_cols(spark, naaccr_text_lines,
                             cls.pat_ids + cls.pat_attrs +
                             cls.report_ids + cls.report_attrs)
        # distinct() wasn't fixed until the 3.x pre-release
        # https://github.com/zero323/pyspark-stubs/pull/138 623b0c0330ef
        return pat.distinct()  # type: ignore

    @classmethod
    def _pick_cols(cls, spark: SparkSession_T,
                   naaccr_text_lines: DataFrame,
                   cols: List[str]) -> DataFrame:
        dd = ddictDF(spark)
        pat_tmr = naaccr_read_fwf(
            naaccr_text_lines,
            dd.where(dd.naaccrId.isin(cols)))
        pat_tmr = naaccr_dates(pat_tmr,
                               [c for c in pat_tmr.columns
                                if c.startswith('date')])
        return pat_tmr

    @classmethod
    def with_tumor_id(cls, data: DataFrame,
                      name: str = 'recordId',
                      extra: List[str] = ['dateOfDiagnosis',
                                          'dateCaseCompleted'],
                      # keep recordId length consistent
                      extra_default: Opt[sq.Column] = None) -> DataFrame:
        # ISSUE: performance: add encounter_num column here?
        if extra_default is None:
            extra_default = func.lit('0000-00-00')
        id_col = func.concat(data.patientSystemIdHosp,
                             data.tumorRecordNumber,
                             *[func.coalesce(data[col], extra_default)
                               for col in extra])
        return data.withColumn(name, id_col)

    @classmethod
    def with_rownum(cls, tumors: DataFrame,
                    start: int = 1,
                    new_col: str = 'encounter_num',
                    key_col: str = 'recordId') -> DataFrame:
        tumors = tumors.withColumn(
            new_col,
            func.lit(start) +
            func.row_number().over(Window.orderBy(key_col)))
        return tumors

    @classmethod
    def export_patient_ids(cls, df: DataFrame, spark: SparkSession_T,
                           cdw: 'Account', schema: str,
                           tmp_table: str = 'NAACCR_PMAP',
                           id_col: str = 'patientIdNumber') -> None:
        log.info('writing %s to %s', id_col, tmp_table)
        cdw.wr(df.select(id_col).distinct().write,  # type: ignore
               tmp_table, mode='overwrite')

    @classmethod
    def with_patient_num(cls, df: DataFrame, spark: SparkSession_T,
                         cdw: 'Account', schema: str,
                         source: str,  # assumed injection-safe
                         tmp_table: str = 'NAACCR_PMAP',
                         id_col: str = 'patientIdNumber') -> DataFrame:
        cls.export_patient_ids(df, spark, cdw, schema,
                               id_col=id_col, tmp_table=tmp_table)
        q = f'''(
            select ea."{id_col}", pmap.PATIENT_NUM
            from {tmp_table} ea
            join {schema}.PATIENT_MAPPING pmap
            on pmap.patient_ide_source = '{source}'
            and ltrim(pmap.patient_ide, '0') = ltrim(ea."{id_col}", '0')
        )'''
        src_map = cdw.rd(spark.read, q)
        out = df.join(src_map, df[id_col] == src_map[id_col], how='left')
        out = out.drop(src_map[id_col])
        out = out.withColumnRenamed('PATIENT_NUM', 'patient_num')
        return out


# pat_tmr.cache()
if IO_TESTING:
    _pat_tmr = TumorKeys.with_rownum(TumorKeys.with_tumor_id(
        TumorKeys.pat_tmr(_spark, _naaccr_text_lines)))
    _patients = TumorKeys.patients(_spark, _naaccr_text_lines)
IO_TESTING and (_pat_tmr, _patients)

(DataFrame[naaccrRecordVersion: string, npiRegistryId: string, tumorRecordNumber: string, patientIdNumber: string, patientSystemIdHosp: string, sex: string, ageAtDiagnosis: string, dateOfBirth: date, sequenceNumberCentral: string, dateOfDiagnosis: date, primarySite: string, sequenceNumberHospital: string, dateOfInptAdm: date, dateOfInptDisch: date, classOfCase: string, dateCaseInitiated: date, dateCaseCompleted: date, dateCaseLastChanged: date, dateCaseReportExported: date, dateOfLastContact: date, vitalStatus: string, recordId: string, encounter_num: int],
 DataFrame[naaccrRecordVersion: string, npiRegistryId: string, patientIdNumber: string, patientSystemIdHosp: string, sex: string, dateOfBirth: date, dateCaseReportExported: date, dateOfLastContact: date, vitalStatus: string])

In [39]:
IO_TESTING and _pat_tmr.limit(15).toPandas()

Unnamed: 0,naaccrRecordVersion,npiRegistryId,tumorRecordNumber,patientIdNumber,patientSystemIdHosp,sex,ageAtDiagnosis,dateOfBirth,sequenceNumberCentral,dateOfDiagnosis,...,dateOfInptDisch,classOfCase,dateCaseInitiated,dateCaseCompleted,dateCaseLastChanged,dateCaseReportExported,dateOfLastContact,vitalStatus,recordId,encounter_num
0,180,,1,65,,1,50,1931-09-13,1,1981-12-04,...,,,,,,,2013-05-03,1,011981-12-040000-00-00,2
1,180,,1,38,,2,53,1954-07-18,1,2008-06-27,...,,,,,,,2017-03-08,1,012008-06-270000-00-00,3
2,180,,1,16,,2,60,1949-09-13,0,2010-04-25,...,,,,,,,2010-04-25,1,012010-04-250000-00-00,4
3,180,,1,12,,1,30,1983-05-05,0,2013-12-16,...,,,,,,,2013-12-16,1,012013-12-160000-00-00,5
4,180,,1,25,,1,70,1943-02-11,0,2013-12-18,...,,,,,,,2013-12-18,1,012013-12-180000-00-00,6
5,180,,1,68,,1,63,1950-08-30,0,2014-05-29,...,,,,,,,2014-05-29,1,012014-05-290000-00-00,7
6,180,,1,59,,1,43,1971-04-28,0,2014-09-24,...,,,,,,,2014-09-24,1,012014-09-240000-00-00,8
7,180,,1,29,,2,81,1933-07-05,0,2014-10-03,...,,,,,,,2014-10-03,1,012014-10-030000-00-00,9
8,180,,1,62,,1,73,1941-07-21,0,2014-10-06,...,,,,,,,2014-10-06,1,012014-10-060000-00-00,10
9,180,,1,95,,1,62,1952-10-10,0,2014-12-11,...,,,,,,,2014-12-11,1,012014-12-110000-00-00,11


In [40]:
IO_TESTING and _patients.limit(10).toPandas()

Unnamed: 0,naaccrRecordVersion,npiRegistryId,patientIdNumber,patientSystemIdHosp,sex,dateOfBirth,dateCaseReportExported,dateOfLastContact,vitalStatus
0,180,,37,,1,1947-03-30,,2018-11-23,1
1,180,,56,,2,1944-04-13,,2015-12-06,1
2,180,,41,,1,1942-01-24,,2018-09-06,1
3,180,,13,,2,1975-12-08,,2018-09-16,1
4,180,,68,,1,1950-08-30,,2014-05-29,1
5,180,,72,,2,1939-02-28,,2019-03-08,1
6,180,,103,,1,1965-01-09,,2016-11-13,1
7,180,,60,,2,1946-01-16,,2018-07-13,1
8,180,,61,,2,1974-10-10,,2017-07-16,1
9,180,,39,,1,1969-03-29,,2019-04-12,1


##  Observations

In [41]:
def melt(df: DataFrame,
         id_vars: List[str], value_vars: List[str],
         var_name: str = 'variable', value_name: str = 'value') -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    # ack: user6910411 Jan 2017 https://stackoverflow.com/a/41673644

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = func.array(*(
        func.struct(func.lit(c).alias(var_name), func.col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", func.explode(_vars_and_vals))

    cols = [func.col(v) for v in id_vars] + [
        func.col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

In [42]:
if IO_TESTING:
    _ty = _spark.read.csv('heron_load/tumor_item_type.csv',
                          header=True, inferSchema=True)
    _ty.cache()
IO_TESTING and _ty.limit(5).toPandas()

Unnamed: 0,naaccrNum,sectionId,section,naaccrId,naaccrName,length,source,loinc_num,scale_typ,AnswerListId,scheme,valtype_cd,phi_id_kind
0,380,1,Cancer Identification,sequenceNumberCentral,Sequence Number--Central,2,SEER,21853-7,Ord,LL210-6,,@,
1,390,1,Cancer Identification,dateOfDiagnosis,Date of Diagnosis,8,SEER/CoC,21854-5,Qn,,,D,
2,391,1,Cancer Identification,dateOfDiagnosisFlag,Date of Diagnosis Flag,2,NAACCR,59516-5,Nom,LL929-1,,@,
3,400,1,Cancer Identification,primarySite,Primary Site,4,SEER/CoC,21855-2,Nom,,,@,
4,410,1,Cancer Identification,laterality,Laterality,1,SEER/CoC,,Nom,,laterality,@,


**ISSUE**: performance: whenever we change cardinality, consider persisting the data. e.g. stack_obs

In [43]:
def stack_obs(records: DataFrame, ty: DataFrame,
              known_valtype_cd: List[str] = ['@', 'D', 'N', 'Ni', 'T'],
              key_cols: List[str] = TumorKeys.key4 + TumorKeys.dtcols) -> DataFrame:
    ty = ty.select('naaccrNum', 'naaccrId', 'valtype_cd')
    ty = ty.where(ty.valtype_cd.isin(known_valtype_cd))
    value_vars = [row.naaccrId for row in ty.collect()]
    obs = melt(records, key_cols, value_vars, var_name='naaccrId', value_name='raw_value')
    obs = obs.where("trim(raw_value) != ''")
    obs = obs.join(ty, ty.naaccrId == obs.naaccrId).drop(ty.naaccrId)
    return obs


if IO_TESTING:
    _raw_obs = TumorKeys.with_tumor_id(naaccr_dates(stack_obs(_extract, _ty), TumorKeys.dtcols))
    _raw_obs.createOrReplaceTempView('naaccr_obs_raw')
IO_TESTING and _raw_obs.limit(10).toPandas()

Unnamed: 0,patientSystemIdHosp,tumorRecordNumber,patientIdNumber,abstractedBy,dateOfBirth,dateOfDiagnosis,dateOfLastContact,dateCaseCompleted,dateCaseLastChanged,naaccrId,raw_value,naaccrNum,valtype_cd,recordId
0,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,sequenceNumberCentral,00,380,@,012017-10-190000-00-00
1,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,dateOfDiagnosis,20171019,390,D,012017-10-190000-00-00
2,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,primarySite,C504,400,@,012017-10-190000-00-00
3,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,laterality,9,410,@,012017-10-190000-00-00
4,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,grade,9,440,@,012017-10-190000-00-00
5,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,dateConclusiveDx,20171019,443,D,012017-10-190000-00-00
6,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,multTumRptAsOnePrim,99,444,@,012017-10-190000-00-00
7,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,dateOfMultTumors,20171019,445,D,012017-10-190000-00-00
8,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,multiplicityCounter,99,446,N,012017-10-190000-00-00
9,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,diagnosticConfirmation,1,490,@,012017-10-190000-00-00


In [44]:
if IO_TESTING:
    _script1 = SqlScript('naaccr_txform.sql',
                         res.read_text(heron_load, 'naaccr_txform.sql'),
                         [('tumor_item_value', ['naaccr_obs_raw'])])
    create_objects(_spark, _script1, naaccr_obs_raw=_raw_obs)
IO_TESTING and _spark.table('tumor_item_value').limit(10).toPandas()

INFO:tumor_reg_ont:naaccr_txform.sql: naaccr_obs_raw = DataFrame[patientSystemIdHosp: string, tumorRecordNumber: string, patientIdNumber: string, abstractedBy: string, dateOfBirth: date, dateOfDiagnosis: date, dateOfLastContact: date, dateCaseCompleted: date, dateCaseLastChanged: date, naaccrId: string, raw_value: string, naaccrNum: int, valtype_cd: string, recordId: string]


INFO:tumor_reg_ont:naaccr_txform.sql: create tumor_item_value


Unnamed: 0,patientSystemIdHosp,tumorRecordNumber,patientIdNumber,abstractedBy,dateOfBirth,dateOfDiagnosis,dateOfLastContact,dateCaseCompleted,dateCaseLastChanged,naaccrId,raw_value,naaccrNum,valtype_cd,recordId,identified_only,code_value,numeric_value,date_value,text_value
0,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,sequenceNumberCentral,00,380,@,012017-10-190000-00-00,False,00,,,
1,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,dateOfDiagnosis,20171019,390,D,012017-10-190000-00-00,False,,,2017-10-19,
2,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,primarySite,C504,400,@,012017-10-190000-00-00,False,C504,,,
3,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,laterality,9,410,@,012017-10-190000-00-00,False,9,,,
4,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,grade,9,440,@,012017-10-190000-00-00,False,9,,,
5,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,dateConclusiveDx,20171019,443,D,012017-10-190000-00-00,False,,,2017-10-19,
6,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,multTumRptAsOnePrim,99,444,@,012017-10-190000-00-00,False,99,,,
7,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,dateOfMultTumors,20171019,445,D,012017-10-190000-00-00,False,,,2017-10-19,
8,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,multiplicityCounter,99,446,N,012017-10-190000-00-00,False,,99.0,,
9,,1,10,,1951-04-05,2017-10-19,2017-10-19,,,diagnosticConfirmation,1,490,@,012017-10-190000-00-00,False,1,,,


In [45]:
if IO_TESTING:
    _script1 = SqlScript('naaccr_txform.sql',
                         res.read_text(heron_load, 'naaccr_txform.sql'),
                         [('tumor_reg_facts', ['record_layout', 'section'])])
    create_objects(_spark, _script1,
                   record_layout=_spark.createDataFrame(NAACCR_Layout.fields),
                   section=_spark.createDataFrame(NAACCR_I2B2.per_section))
IO_TESTING and _spark.table('tumor_reg_facts').limit(10).toPandas()

INFO:tumor_reg_ont:naaccr_txform.sql: record_layout = DataFrame[end: bigint, length: bigint, naaccr-item-num: bigint, name: string, section: string, start: bigint]


INFO:tumor_reg_ont:naaccr_txform.sql: section = DataFrame[sectionid: bigint, section: string]


INFO:tumor_reg_ont:naaccr_txform.sql: create tumor_reg_facts


Unnamed: 0,recordId,patientIdNumber,naaccrId,concept_cd,provider_id,start_date,modifier_cd,instance_num,valtype_cd,tval_char,nval_num,valueflag_cd,units_cd,end_date,location_cd,update_date
0,012017-10-190000-00-00,10,causeOfDeath,NAACCR|1910:0000,@,2017-10-19,@,1,@,,,,,2017-10-19,@,2017-10-19
1,012018-07-100000-00-00,11,causeOfDeath,NAACCR|1910:0000,@,2018-07-10,@,1,@,,,,,2018-07-10,@,2018-07-10
2,012013-12-160000-00-00,12,causeOfDeath,NAACCR|1910:0000,@,2013-12-16,@,1,@,,,,,2013-12-16,@,2013-12-16
3,012018-09-160000-00-00,13,causeOfDeath,NAACCR|1910:0000,@,2018-09-16,@,1,@,,,,,2018-09-16,@,2018-09-16
4,012017-03-280000-00-00,14,causeOfDeath,NAACCR|1910:0000,@,2017-03-28,@,1,@,,,,,2017-03-28,@,2017-03-28
5,012017-09-020000-00-00,15,causeOfDeath,NAACCR|1910:0000,@,2017-09-02,@,1,@,,,,,2017-09-02,@,2017-09-02
6,012010-04-250000-00-00,16,causeOfDeath,NAACCR|1910:0000,@,2010-04-25,@,1,@,,,,,2010-04-25,@,2010-04-25
7,012016-11-050000-00-00,17,causeOfDeath,NAACCR|1910:0000,@,2016-11-05,@,1,@,,,,,2016-11-05,@,2016-11-05
8,012018-04-200000-00-00,18,causeOfDeath,NAACCR|1910:0000,@,2018-04-20,@,1,@,,,,,2018-04-20,@,2018-04-20
9,012017-02-100000-00-00,19,causeOfDeath,NAACCR|1910:0000,@,2017-02-10,@,1,@,,,,,2017-02-10,@,2017-02-10


In [46]:
IO_TESTING and _spark.table('tumor_reg_facts').where("valtype_cd != '@'").limit(5).toPandas()

Unnamed: 0,recordId,patientIdNumber,naaccrId,concept_cd,provider_id,start_date,modifier_cd,instance_num,valtype_cd,tval_char,nval_num,valueflag_cd,units_cd,end_date,location_cd,update_date
0,012017-10-190000-00-00,10,dateOfLastContact,NAACCR|1750:,@,2017-10-19,@,1,D,2017-10-19,,,,2017-10-19,@,2017-10-19
1,012018-07-100000-00-00,11,dateOfLastContact,NAACCR|1750:,@,2018-07-10,@,1,D,2018-07-10,,,,2018-07-10,@,2018-07-10
2,012013-12-160000-00-00,12,dateOfLastContact,NAACCR|1750:,@,2013-12-16,@,1,D,2013-12-16,,,,2013-12-16,@,2013-12-16
3,012018-09-160000-00-00,13,dateOfLastContact,NAACCR|1750:,@,2018-09-16,@,1,D,2018-09-16,,,,2018-09-16,@,2018-09-16
4,012017-03-280000-00-00,14,dateOfLastContact,NAACCR|1750:,@,2017-03-28,@,1,D,2017-03-28,,,,2017-03-28,@,2017-03-28


In [47]:
class ItemObs:
    script = SqlScript('naaccr_txform.sql',
                       res.read_text(heron_load, 'naaccr_txform.sql'),
                       [
                           ('tumor_item_value', ['naaccr_obs_raw']),
                           ('tumor_reg_facts', ['record_layout', 'section']),
                       ])

    extract_id_view = 'naaccr_extract_id'

    @classmethod
    def make(cls, spark: SparkSession_T, extract: DataFrame) -> DataFrame:
        item_ty = NAACCR_I2B2.item_views_in(spark)

        raw_obs = TumorKeys.with_tumor_id(naaccr_dates(
            stack_obs(extract, item_ty),
            TumorKeys.dtcols))

        views = create_objects(
            spark, cls.script,
            naaccr_obs_raw=raw_obs,
            # ISSUE: refactor item_views_in
            record_layout=spark.createDataFrame(NAACCR_Layout.fields),
            section=spark.createDataFrame(NAACCR_I2B2.per_section))

        return list(views.values())[-1]

    @classmethod
    def make_extract_id(cls,
                        spark: SparkSession_T,
                        extract: DataFrame) -> DataFrame:
        extract_id = TumorKeys.with_tumor_id(
            naaccr_dates(extract, TumorKeys.dtcols))
        extract_id.createOrReplaceTempView(cls.extract_id_view)
        return spark.table(cls.extract_id_view)


if IO_TESTING:
    _obs = ItemObs.make(_spark, _extract)
IO_TESTING and _obs.limit(5).toPandas()

INFO:tumor_reg_ont:naaccr_txform.sql: naaccr_obs_raw = DataFrame[patientSystemIdHosp: string, tumorRecordNumber: string, patientIdNumber: string, abstractedBy: string, dateOfBirth: date, dateOfDiagnosis: date, dateOfLastContact: date, dateCaseCompleted: date, dateCaseLastChanged: date, naaccrId: string, raw_value: string, naaccrNum: bigint, valtype_cd: string, recordId: string]


INFO:tumor_reg_ont:naaccr_txform.sql: record_layout = DataFrame[end: bigint, length: bigint, naaccr-item-num: bigint, name: string, section: string, start: bigint]


INFO:tumor_reg_ont:naaccr_txform.sql: section = DataFrame[sectionid: bigint, section: string]


INFO:tumor_reg_ont:naaccr_txform.sql: create tumor_item_value


INFO:tumor_reg_ont:naaccr_txform.sql: create tumor_reg_facts


Unnamed: 0,recordId,patientIdNumber,naaccrId,concept_cd,provider_id,start_date,modifier_cd,instance_num,valtype_cd,tval_char,nval_num,valueflag_cd,units_cd,end_date,location_cd,update_date
0,012016-11-130000-00-00,103,causeOfDeath,NAACCR|1910:0000,@,2016-11-13,@,1,@,,,,,2016-11-13,@,2016-11-13
1,012015-02-010000-00-00,102,causeOfDeath,NAACCR|1910:0000,@,2015-02-01,@,1,@,,,,,2015-02-01,@,2015-02-01
2,012017-11-230000-00-00,101,causeOfDeath,NAACCR|1910:0000,@,2017-11-23,@,1,@,,,,,2017-11-23,@,2017-11-23
3,012018-09-110000-00-00,100,causeOfDeath,NAACCR|1910:0000,@,2018-09-11,@,1,@,,,,,2018-09-11,@,2018-09-11
4,012017-10-040000-00-00,99,causeOfDeath,NAACCR|1910:0000,@,2017-10-04,@,1,@,,,,,2017-10-04,@,2017-10-04


In [48]:
IO_TESTING and ItemObs.make_extract_id(_spark, _extract).limit(5).toPandas()

Unnamed: 0,recordType,registryType,naaccrRecordVersion,npiRegistryId,registryId,tumorRecordNumber,patientIdNumber,patientSystemIdHosp,addrAtDxCity,addrAtDxState,...,rxTextSurgery,rxTextRadiation,rxTextRadiationOther,rxTextChemo,rxTextHormone,rxTextBrm,rxTextOther,textRemarks,textPlaceOfDiagnosis,recordId
0,I,,180,,,1,10,,...,,...,,,,,,,,,,012017-10-190000-00-00
1,I,,180,,,1,11,,...,,...,,,,,,,,,,012018-07-100000-00-00
2,I,,180,,,1,12,,...,,...,,,,,,,,,,012013-12-160000-00-00
3,I,,180,,,1,13,,...,,...,,,,,,,,,,012018-09-160000-00-00
4,I,,180,,,1,14,,...,,...,,,,,,,,,,012017-03-280000-00-00


## SEER Site Recode

In [49]:
class SEER_Recode:
    script = SqlScript('seer_recode.sql',
                       res.read_text(heron_load, 'seer_recode.sql'),
                       [('seer_recode_aux', ['naaccr_extract_id']),
                        ('seer_recode_facts', [])])

    @classmethod
    def make(cls, spark: SparkSession_T, extract: DataFrame) -> DataFrame:
        extract_id = ItemObs.make_extract_id(spark, extract)
        views = create_objects(spark, cls.script,
                               naaccr_extract_id=extract_id)
        return list(views.values())[-1]


IO_TESTING and SEER_Recode.make(_spark, _extract).limit(5).toPandas()

INFO:tumor_reg_ont:seer_recode.sql: naaccr_extract_id = DataFrame[recordType: string, registryType: string, naaccrRecordVersion: string, npiRegistryId: string, registryId: string, tumorRecordNumber: string, patientIdNumber: string, patientSystemIdHosp: string, addrAtDxCity: string, addrAtDxState: string, addrAtDxPostalCode: string, countyAtDx: string, countyAtDxAnalysis: string, stateAtDxGeocode19708090: string, countyAtDxGeocode1990: string, censusTract19708090: string, censusBlockGrp197090: string, censusCodSys19708090: string, censusTrCert19708090: string, stateAtDxGeocode2000: string, countyAtDxGeocode2000: string, censusTract2000: string, censusBlockGroup2000: string, censusTrCertainty2000: string, stateAtDxGeocode2010: string, countyAtDxGeocode2010: string, censusTract2010: string, censusBlockGroup2010: string, censusTrCertainty2010: string, stateAtDxGeocode2020: string, countyAtDxGeocode2020: string, censusTract2020: string, censusBlockGroup2020: string, censusTractCertainty2020

INFO:tumor_reg_ont:seer_recode.sql: create seer_recode_aux


INFO:tumor_reg_ont:seer_recode.sql: create seer_recode_facts


Unnamed: 0,recordId,patientIdNumber,naaccrId,concept_cd,provider_id,start_date,modifier_cd,instance_num,valtype_cd,tval_char,nval_num,valueflag_cd,units_cd,end_date,location_cd,update_date
0,012017-10-190000-00-00,10,@,SEER_SITE:26000,@,2017-10-19,@,1,@,@,,,,2017-10-19,@,2017-10-19
1,012018-07-100000-00-00,11,@,SEER_SITE:22030,@,2018-07-10,@,1,@,@,,,,2018-07-10,@,2018-07-10
2,012013-12-160000-00-00,12,@,SEER_SITE:21052,@,2013-12-16,@,1,@,@,,,,2013-12-16,@,2013-12-16
3,012018-09-160000-00-00,13,@,SEER_SITE:20050,@,2018-09-16,@,1,@,@,,,,2018-09-16,@,2018-09-16
4,012017-03-280000-00-00,14,@,SEER_SITE:29010,@,2017-03-28,@,1,@,@,,,,2017-03-28,@,2017-03-28


In [50]:
class SiteSpecificFactors:
    sql = res.read_text(heron_load, 'csschema.sql')
    script1 = SqlScript('csschema.sql', sql,
                        [('tumor_cs_schema', ['naaccr_extract_id'])])
    script2 = SqlScript('csschema.sql', sql,
                        [('cs_site_factor_facts', ['cs_obs_raw'])])

    items = [it for it in NAACCR1.items_180()
             if it['naaccrName'].startswith('CS Site-Specific Factor')]

    @classmethod
    def valtypes(cls) -> pd.DataFrame:
        factor_nums = [d['naaccrNum'] for d in cls.items]
        item_ty = NAACCR_I2B2.tumor_item_type[['naaccrNum', 'naaccrId', 'valtype_cd']]
        item_ty = item_ty[item_ty.naaccrNum.isin(factor_nums)]
        return item_ty

    @classmethod
    def make(cls, spark: SparkSession_T, extract: DataFrame) -> DataFrame:
        with_schema = cls.make_tumor_schema(spark, extract)
        ty_df = spark.createDataFrame(cls.valtypes())

        raw_obs = stack_obs(with_schema, ty_df,
                            key_cols=TumorKeys.key4 + TumorKeys.dtcols + ['recordId', 'cs_schema_name'])

        views = create_objects(spark, cls.script2,
                               cs_obs_raw=raw_obs)
        return list(views.values())[-1]

    @classmethod
    def make_tumor_schema(cls,
                          spark: SparkSession_T,
                          extract: DataFrame) -> DataFrame:
        views = create_objects(spark, cls.script1,
                               naaccr_extract_id=ItemObs.make_extract_id(spark, extract))
        return list(views.values())[-1]


if IO_TESTING:
    _ssf_facts = SiteSpecificFactors.make(_spark, _extract)
    assert _obs.columns == _ssf_facts.columns

IO_TESTING and _ssf_facts.limit(7).toPandas()

INFO:tumor_reg_ont:csschema.sql: naaccr_extract_id = DataFrame[recordType: string, registryType: string, naaccrRecordVersion: string, npiRegistryId: string, registryId: string, tumorRecordNumber: string, patientIdNumber: string, patientSystemIdHosp: string, addrAtDxCity: string, addrAtDxState: string, addrAtDxPostalCode: string, countyAtDx: string, countyAtDxAnalysis: string, stateAtDxGeocode19708090: string, countyAtDxGeocode1990: string, censusTract19708090: string, censusBlockGrp197090: string, censusCodSys19708090: string, censusTrCert19708090: string, stateAtDxGeocode2000: string, countyAtDxGeocode2000: string, censusTract2000: string, censusBlockGroup2000: string, censusTrCertainty2000: string, stateAtDxGeocode2010: string, countyAtDxGeocode2010: string, censusTract2010: string, censusBlockGroup2010: string, censusTrCertainty2010: string, stateAtDxGeocode2020: string, countyAtDxGeocode2020: string, censusTract2020: string, censusBlockGroup2020: string, censusTractCertainty2020: s

INFO:tumor_reg_ont:csschema.sql: create tumor_cs_schema


INFO:tumor_reg_ont:csschema.sql: cs_obs_raw = DataFrame[patientSystemIdHosp: string, tumorRecordNumber: string, patientIdNumber: string, abstractedBy: string, dateOfBirth: date, dateOfDiagnosis: date, dateOfLastContact: date, dateCaseCompleted: date, dateCaseLastChanged: date, recordId: string, cs_schema_name: string, naaccrId: string, raw_value: string, naaccrNum: bigint, valtype_cd: string]


INFO:tumor_reg_ont:csschema.sql: create cs_site_factor_facts


Unnamed: 0,recordId,patientIdNumber,naaccrId,concept_cd,provider_id,start_date,modifier_cd,instance_num,valtype_cd,tval_char,nval_num,valueflag_cd,units_cd,end_date,location_cd,update_date
0,012015-02-010000-00-00,102,@,CS|Testis|7:999,@,2015-02-01,@,1,@,,,,,2015-02-01,@,2015-02-01
1,012014-12-110000-00-00,95,@,CS|Skin|7:999,@,2014-12-11,@,1,@,,,,,2014-12-11,@,2014-12-11
2,012015-08-240000-00-00,75,@,CS|Lung|7:988,@,2015-08-24,@,1,@,,,,,2015-08-24,@,2015-08-24
3,012014-05-290000-00-00,68,@,CS|Prostate|7:054,@,2014-05-29,@,1,@,,,,,2014-05-29,@,2014-05-29
4,022013-05-030000-00-00,65,@,CS|ParotidGland|7:020,@,2013-05-03,@,1,@,,,,,2013-05-03,@,2013-05-03
5,012014-10-060000-00-00,62,@,CS|Rectum|7:050,@,2014-10-06,@,1,@,,,,,2014-10-06,@,2014-10-06
6,012014-09-240000-00-00,59,@,CS|Lymphoma|7:988,@,2014-09-24,@,1,@,,,,,2014-09-24,@,2014-09-24


## Oracle DB Access

We use `PYSPARK_SUBMIT_ARGS` to get JDBC jar in both
`spark.driver.extraClassPath` and `--jars`:

In [51]:
if IO_TESTING:
    from os import environ as _environ
    log.info(_environ['PYSPARK_SUBMIT_ARGS'])

INFO:__main__:--driver-class-path /home/dconnolly/Downloads/ojdbc8.jar --jars /home/dconnolly/Downloads/ojdbc8.jar


In [52]:
IO_TESTING and _spark.sparkContext.getConf().get('spark.driver.extraClassPath')

In [53]:
if IO_TESTING:
    def _set_pw(name: str = 'ID CDW') -> None:
        from os import environ
        from getpass import getpass
        password = getpass(name)
        environ[name] = password

    # _set_pw()

In [54]:
class Account:
    def __init__(self, user: str, password: str,
                 url: str = 'jdbc:oracle:thin:@localhost:8621:nheronB2',
                 driver: str = "oracle.jdbc.OracleDriver") -> None:
        self.url = url
        db = url.split(':')[-1]
        self.label = f'{self.__class__.__name__}({user}@{db})'
        self.__properties = {"user": user,
                             "password": password,
                             "driver": driver}

    def __repr__(self) -> str:
        return self.label

    def rd(self, io: sq.DataFrameReader, table: str) -> DataFrame:
        return io.jdbc(self.url, table,
                       properties=self.__properties)

    def wr(self, io: sq.DataFrameWriter, table: str,
           mode: Opt[str] = None) -> None:
        io.jdbc(self.url, table,
                properties=self.__properties,
                mode=mode)


DB_TESTING = IO_TESTING and 'ID CDW' in _environ
if DB_TESTING:
    _cdw = Account(_environ['LOGNAME'], _environ['ID CDW'])

DB_TESTING and _cdw.rd(_spark.read, "global_name").toPandas()

False


  - **ISSUE**: column name capitalization: `concept_cd` vs.
    `CONCEPT_CD`, `dateOfDiagnosis` vs. `DATEOFDIAGNOSIS`
    vs. `DATE_OF_DIAGNOSIS`.

In [55]:
def case_fold(df: DataFrame) -> DataFrame:
    """Fold column names to upper-case, following (Oracle) SQL norms.

    See also: upper_snake_case in pcornet_cdm
    """
    return df.toDF(*[n.upper() for n in df.columns])

In [56]:
if DB_TESTING:
    _patients_mapped = TumorKeys.with_patient_num(_patients, _spark, _cdw, 'NIGHTHERONDATA', 'SMS@kumed.com')

DB_TESTING and _patients_mapped.limit(5).toPandas()

False

## Use Case: GPC Breast Cancer Survey

The NAACCR format has 500+ items. To provide initial focus, let's use
the variables from the 2016 GPC breast cancer survey:

In [57]:
class CancerStudy:
    bc_variable = pd.read_csv(res.open_text(bc_qa, 'bc-variable.csv'))


IO_TESTING and _spark.createDataFrame(
    CancerStudy.bc_variable).limit(5).toPandas()

Unnamed: 0,id,item_key,concept_path,name_char,name
0,0,\\KUMC_naaccr\i2b2\naaccr\SEER Site\Breast\,\i2b2\naaccr\SEER Site\Breast\,"Breast [10,281 facts; 9,729 patients]",Breast
1,1,\\KUMC_naaccr\i2b2\naaccr\S:6 Hospital-Specifi...,\i2b2\naaccr\S:6 Hospital-Specific\0610 Class ...,30 [58 facts; 58 patients],30
2,2,\\KUMC_naaccr\i2b2\naaccr\S:6 Hospital-Specifi...,\i2b2\naaccr\S:6 Hospital-Specific\0610 Class ...,31 [&lt;10 facts],31
3,3,\\KUMC_naaccr\i2b2\naaccr\S:6 Hospital-Specifi...,\i2b2\naaccr\S:6 Hospital-Specific\0610 Class ...,"32 [14,310 facts; 14,026 patients]",32
4,4,\\KUMC_naaccr\i2b2\naaccr\S:6 Hospital-Specifi...,\i2b2\naaccr\S:6 Hospital-Specific\0610 Class ...,33 [&lt;10 facts],33


In [58]:
def itemNumOfPath(bc_var: DataFrame,
                  item: str = 'item') -> DataFrame:
    digits = func.regexp_extract('concept_path',
                                 r'\\i2b2\\naaccr\\S:[^\\]+\\(\d+)', 1)
    items = bc_var.select(digits.cast('int').alias(item))   # TODO: .dropna().distinct()
    return items.sort(item)


IO_TESTING and itemNumOfPath(_spark.createDataFrame(
    CancerStudy.bc_variable)).limit(5).toPandas()

Unnamed: 0,item
0,
1,
2,
3,
4,160.0


In [59]:
def _selectedItems(ddict: DataFrame, items: DataFrame) -> DataFrame:
    selected = ddict.join(items,
                          ddict.naaccrNum == items.item).drop(items.item)
    return selected.sort(selected.length.desc(), selected.naaccrNum)


if IO_TESTING:
    _bc_ddict = _selectedItems(
        ddictDF(_spark),
        itemNumOfPath(_spark.createDataFrame(CancerStudy.bc_variable)),
    ).select('naaccrId', 'naaccrNum', 'parentXmlElement', 'length')

IO_TESTING and (
    _bc_ddict.select('naaccrId', 'naaccrNum', 'parentXmlElement', 'length')
    .toPandas().set_index(['naaccrNum', 'naaccrId'])
)

Unnamed: 0_level_0,Unnamed: 1_level_0,parentXmlElement,length
naaccrNum,naaccrId,Unnamed: 2_level_1,Unnamed: 3_level_1
240,dateOfBirth,Patient,8
390,dateOfDiagnosis,Tumor,8
1750,dateOfLastContact,Patient,8
1860,recurrenceDate1st,Tumor,8
400,primarySite,Tumor,4
2869,csSiteSpecificFactor15,Tumor,3
2876,csSiteSpecificFactor22,Tumor,3
2877,csSiteSpecificFactor23,Tumor,3
2880,csSiteSpecificFactor1,Tumor,3
2890,csSiteSpecificFactor2,Tumor,3


### Patients, Encounters, and Observations per Variable

  - **ISSUE**: naaccr-xml test data has no data on classOfCase etc.
    at least not the 100 tumor sample.

In [60]:
def bc_var_facts(coded_facts: DataFrame, ddict: DataFrame) -> DataFrame:
    return coded_facts.join(
        ddict.select('naaccrId'),
        coded_facts.naaccrId == ddict.naaccrId,
    ).drop(ddict.naaccrId)


def data_summary(spark: SparkSession_T, obs: DataFrame) -> DataFrame:
    obs.createOrReplaceTempView('summary_input')  # ISSUE: CLOBBER!
    return spark.sql('''
    select naaccrId as variable
         , count(distinct patientIdNumber) as pat_qty
         , count(distinct recordId) as enc_qty
         , count(*) as fact_qty
    from summary_input
    group by naaccrId
    order by 2 desc, 3 desc, 4 desc
    ''')


def bc_var_summary(spark: SparkSession_T,
                   obs: DataFrame, ddict: DataFrame) -> DataFrame:
    agg = data_summary(
        spark,
        bc_var_facts(obs, ddict)
    )
    dd = ddict.select('naaccrId').withColumnRenamed('naaccrId', 'variable')
    return (dd
            .join(agg, dd.variable == agg.variable, how='left_outer')
            .drop(agg.variable))


IO_TESTING and bc_var_summary(
    _spark, _obs, _bc_ddict).where(
        'fact_qty is null').toPandas()

Unnamed: 0,variable,variable.1,pat_qty,enc_qty,fact_qty
0,classOfCase,,,,
1,classOfCase,,,,
2,classOfCase,,,,
3,classOfCase,,,,
4,classOfCase,,,,
5,classOfCase,,,,
6,classOfCase,,,,
7,classOfCase,,,,
8,classOfCase,,,,
9,classOfCase,,,,


In [61]:
IO_TESTING and bc_var_summary(
    _spark, _obs, _bc_ddict).where(
        'fact_qty is not null').toPandas()

Unnamed: 0,variable,variable.1,pat_qty,enc_qty,fact_qty
0,csSiteSpecificFactor2,csSiteSpecificFactor2,18,18,18
1,derivedAjcc6T,derivedAjcc6T,17,17,17
2,race4,race4,94,100,100
3,csMetsAtDx,csMetsAtDx,18,18,18
4,csSiteSpecificFactor23,csSiteSpecificFactor23,18,18,18
5,grade,grade,94,100,100
6,csSiteSpecificFactor1,csSiteSpecificFactor1,18,18,18
7,csMetsEval,csMetsEval,18,18,18
8,derivedAjcc7T,derivedAjcc7T,16,16,16
9,primarySite,primarySite,94,100,100


In [62]:
IO_TESTING and _obs.where("naaccrId == 'dateOfDiagnosis'").limit(5).toPandas()

Unnamed: 0,recordId,patientIdNumber,naaccrId,concept_cd,provider_id,start_date,modifier_cd,instance_num,valtype_cd,tval_char,nval_num,valueflag_cd,units_cd,end_date,location_cd,update_date
0,012016-11-130000-00-00,103,dateOfDiagnosis,NAACCR|390:,@,2016-11-13,@,1,D,2016-11-13,,,,2016-11-13,@,2016-11-13
1,012015-02-010000-00-00,102,dateOfDiagnosis,NAACCR|390:,@,2015-02-01,@,1,D,2015-02-01,,,,2015-02-01,@,2015-02-01
2,012017-11-230000-00-00,101,dateOfDiagnosis,NAACCR|390:,@,2017-11-23,@,1,D,2017-11-23,,,,2017-11-23,@,2017-11-23
3,012018-09-110000-00-00,100,dateOfDiagnosis,NAACCR|390:,@,2018-09-11,@,1,D,2018-09-11,,,,2018-09-11,@,2018-09-11
4,012017-10-040000-00-00,99,dateOfDiagnosis,NAACCR|390:,@,2017-10-04,@,1,D,2017-10-04,,,,2017-10-04,@,2017-10-04


#### TODO: Code labels; e.g. 1 = Male; 2 = Female

In [63]:
def pivot_obs_by_enc(skinny_obs: DataFrame,
                     pivot_on: str = 'naaccrId',  # cheating... not really in i2b2 observation_fact
                     # TODO: nval_num etc. for value cols?
                     value_col: str = 'concept_cd',
                     key_cols: List[str] = ['recordId', 'patientIdNumber']) -> DataFrame:
    groups = skinny_obs.select(pivot_on, value_col, *key_cols).groupBy(*key_cols)
    wide = groups.pivot(pivot_on).agg(func.first(value_col))
    return wide


IO_TESTING and pivot_obs_by_enc(_obs.where(
    _obs.naaccrId.isin(['dateOfDiagnosis', 'primarySite', 'sex', 'dateOfBirth'])
)).limit(5).toPandas().set_index(['recordId', 'patientIdNumber'])

Unnamed: 0_level_0,Unnamed: 1_level_0,dateOfDiagnosis,primarySite,sex
recordId,patientIdNumber,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
012015-08-240000-00-00,75,NAACCR|390:,NAACCR|400:C341,NAACCR|220:1
012015-12-210000-00-00,36,NAACCR|390:,NAACCR|400:C809,NAACCR|220:1
012018-11-110000-00-00,80,NAACCR|390:,NAACCR|400:C160,NAACCR|220:1
012015-05-240000-00-00,35,NAACCR|390:,NAACCR|400:C779,NAACCR|220:2
012018-12-220000-00-00,91,NAACCR|390:,NAACCR|400:C509,NAACCR|220:2
