In [None]:
#!/usr/bin/env python
# coding: utf-8

"""
Reads MMI Pharmindex CSV files from the directory specified by the /datadir_mmi/
variable and writes corresponding FHIR Medication resources as json files 
(see https://simplifier.net/guide/MedizininformatikInitiative-ModulMedikation-
ImplementationGuide/Medication-duplicate-2) to the directory specified by the
/resultsdir/ variable.
"""

# TODO: use the logging module instead of print() for progress and error messages

from collections import defaultdict
from itertools import tee
import json
import os

import pandas as pd

In [None]:
import os

# Current working directory of the kernel; the data directory is built from it.
path = os.getcwd()

In [None]:
path

'/content'

In [None]:
# Root data directory (with trailing slash, because later paths are built by
# plain string concatenation).
datadir = f'{path}/data/'

In [None]:
datadir

'/content/data/'

In [None]:
# Directory holding the MMI Pharmindex CSV export (trailing slash required).
datadir_mmi = datadir + '20200815MAIN/'

'/content/data/20200815MAIN/'

In [None]:
# Alternative settings kept for reference (relative data directory):
#datadir = './data/'
#datadir_mmi = f'{datadir}20200815MAIN/'

# Directory the generated JSON files are written to (trailing slash required).
resultsdir = datadir + 'results/'

# EDQM standard-term mapping file: name, on-disk encoding and the columns that
# are renamed to the FHIR coding field names.
edqm_mapping_filename = 'EDQM-Standard-Terms_MMI-Mapping.csv'
edqm_mapping_encoding = 'windows-1252'
target_encoding = 'utf-8'
edqm_rename_columns = {
    'MAP_EDQM-Code': 'code',
    'MAP_EDQM-Name_DE': 'display',
}


def filename_out(pzn):
    """Return the path of the JSON file written for the given PZN."""
    return f"{resultsdir}{pzn}.json"


# Layout of the MMI table files: `<TABLE>.<extension>`, `;`-separated.
extension = 'CSV'
column_separator = ';'

# Separator placed between a column name and the table-name suffix.
column_name_suffix_separator = '_'

# Name (and name suffix) of the key columns.
id_ = 'ID'

# Names of the MMI Pharmindex tables used by this script.
clinicalpackage_name = 'CLINICALPACKAGE'
package_name = 'PACKAGE'
product_name = 'PRODUCT'
item_name = 'ITEM'
item_atc_name = 'ITEM_ATC'
item_compositionelement_name = 'ITEM_COMPOSITIONELEMENT'
compositionelement_name = 'COMPOSITIONELEMENT'
compositionelementequi_name = 'COMPOSITIONELEMENTEQUI'
molecule_name = 'MOLECULE'

catalogentry_name = 'CATALOGENTRY'

# Chain of tables from product down to molecule; every table is joined to
# its predecessor. The chain is the same whichever package table it starts at.
_composition_chain = (
    product_name,
    item_name,
    item_compositionelement_name,
    compositionelement_name,
    compositionelementequi_name,
    molecule_name,
)
# Second, independent chain that attaches the ATC codes to the items.
_atc_chain = (item_name, item_atc_name)

# Join order including the equivalent-substance table.
joins = ((package_name,) + _composition_chain, _atc_chain)

# Same join order, starting from the clinical package table.
joins_clinical = ((clinicalpackage_name,) + _composition_chain, _atc_chain)

# Join order for active substances only: the equivalent-substance table is
# left out of every chain.
joins_active = tuple(
    tuple(name for name in chain if name != compositionelementequi_name)
    for chain in joins
)

# Every table that has to be read from disk.
table_names = {name for chain in joins for name in chain} | {
    catalogentry_name,
    compositionelementequi_name,
}

def read(
    package_name,
    datadir=datadir_mmi,
    extension=extension,
    column_separator=column_separator,
):
    """Load one table file into a DataFrame.

    The file read is `<datadir><package_name>.<extension>`; `datadir` is used
    as a plain string prefix, so it must end with a path separator.
    `column_separator` is the field delimiter. No column dtypes are pinned,
    so pandas infers them.
    """
    csv_path = f'{datadir}{package_name}.{extension}'
    # Placeholder for explicit column dtypes; currently empty.
    column_dtypes = {}
    return pd.read_csv(csv_path, sep=column_separator, dtype=column_dtypes)

# source: https://docs.python.org/3/library/itertools.html
def pairwise(iterable):
    """Return an iterator over overlapping pairs: s -> (s0,s1), (s1,s2), ...

    Inputs with fewer than two elements yield nothing.
    """
    leading, trailing = tee(iterable)
    # Advance the second copy by one element so the two copies are offset.
    next(trailing, None)
    return zip(leading, trailing)

def replace_comma_with_dot(x):
    """Return `x` as a string with every comma replaced by a dot.

    Strings get the replacement (e.g. '1,5' -> '1.5'). Any other value
    (float, NaN, int, numpy scalar, ...) is returned as `str(x)`.
    Previously only exact `float` instances were accepted as non-strings;
    every other non-string value raised an error.
    """
    if isinstance(x, str):
        return x.replace(',', '.')
    return str(x)

# source (resource example): https://simplifier.net/guide/MedizininformatikInitiative-ModulMedikation-ImplementationGuide/Medication-duplicate-2
def medication_resource(
    id_,

    pzn_code,
    pzn_display,

    atc_code,
    atc_display,

    edqm_code,
    edqm_display,
):
    """Build the skeleton of a FHIR Medication resource as a plain dict.

    The resource carries a PZN and an ATC coding under `code`, an EDQM coding
    under `form`, status "active" and an empty `ingredient` list for the
    caller to fill. All codes, displays and the id are rendered as strings,
    so a missing value becomes the text 'None' or 'nan'.
    """
    pzn_coding = {
        "system": "http://fhir.de/CodeSystem/ifa/pzn",
        "code": f"{pzn_code}",
        "display": f"{pzn_display}",
    }
    atc_coding = {
        "system": "http://fhir.de/CodeSystem/bfarm/atc",
        "code": f"{atc_code}",
        "display": f"{atc_display}",
    }
    edqm_coding = {
        "system": "http://standardterms.edqm.eu",
        "code": f"{edqm_code}",
        "display": f"{edqm_display}",
    }

    resource = {
        "resourceType": "Medication",
        "id": f"{id_}",
    }
    # The profile list is left empty. Notes carried over from the original:
    #   - profile URL not declared:
    #     https://www.medizininformatik-initiative.de/fhir/core/modul-medikation/StructureDefinition/Medication
    #   - WHO ATC is not included
    #   - ingredients carry ASK and CAS from the MMI Pharmindex; UNII
    #     (http://fdasis.nlm.nih.gov) and SNOMED CT (http://snomed.info/sct)
    #     are not included because no source for them was available
    resource["meta"] = {"profile": []}
    resource["code"] = {"coding": [pzn_coding, atc_coding]}
    resource["status"] = "active"
    resource["form"] = {"coding": [edqm_coding]}
    resource["ingredient"] = []
    return resource

def medication_item_codeable_concept(
    ask_code,
    ask_display,

    cas_code,
    cas_display,
):
    """Build the `itemCodeableConcept` of an ingredient as a plain dict.

    The concept holds two codings, in this order: one in the ASK code system
    and one in the system `urn:oid:2.16.840.1.113883.6.61`, which receives
    the CAS values. Codes and displays are rendered as strings.
    """
    ask_coding = {
        "system": "http://fhir.de/CodeSystem/ask",
        "code": f"{ask_code}",
        "display": f"{ask_display}",
    }
    cas_coding = {
        "system": "urn:oid:2.16.840.1.113883.6.61",
        "code": f"{cas_code}",
        "display": f"{cas_display}",
    }
    return {"coding": [ask_coding, cas_coding]}

def medication_strength(
    strength_numer_value,
    strength_numer_system,
    strength_numer_code,
    strength_numer_unit,

    strength_denom_value,
    strength_denom_system,
    strength_denom_code,
    strength_denom_unit,
):
    """Build the `strength` of an ingredient (numerator over denominator).

    The two values are passed through unchanged; system, code and unit are
    rendered as strings.
    """
    def quantity(value, system, code, unit):
        # One side of the ratio, with the key order used in the output files.
        return {
            "value": value,
            "system": f"{system}",
            "code": f"{code}",
            "unit": f"{unit}",
        }

    return {
        "numerator": quantity(
            strength_numer_value,
            strength_numer_system,
            strength_numer_code,
            strength_numer_unit,
        ),
        "denominator": quantity(
            strength_denom_value,
            strength_denom_system,
            strength_denom_code,
            strength_denom_unit,
        ),
    }

def medication_ingredient_active(
    molecule_id,    
    
    ask_code,
    ask_display,

    cas_code,
    cas_display,

    strength_numer_value,
    strength_numer_code,
    strength_numer_system,
    strength_numer_unit,

    strength_denom_value,
    strength_denom_system,
    strength_denom_code,
    strength_denom_unit,
):
    """Build the `ingredient` entry for an active substance as a plain dict.

    The entry uses the molecule id (as a string) as its element id, is tagged
    with the `wirkstofftyp` extension code "PIN" (precise ingredient), is
    marked `isActive`, and carries the ASK/CAS concept and the strength.
    """
    ingredient_type = {
        "url": "https://www.medizininformatik-initiative.de/fhir/core/modul-medikation/StructureDefinition/wirkstofftyp",
        "valueCoding": {
            "system": "https://www.medizininformatik-initiative.de/fhir/core/modul-medikation/CodeSystem/wirkstofftyp",
            "code": "PIN",
            "display": "precise ingredient",
        },
    }
    concept = medication_item_codeable_concept(
        ask_code=ask_code,
        ask_display=ask_display,
        cas_code=cas_code,
        cas_display=cas_display,
    )
    # Keyword arguments keep numerator/denominator fields matched by name:
    # this function lists code before system, medication_strength the reverse.
    strength = medication_strength(
        strength_numer_value=strength_numer_value,
        strength_numer_system=strength_numer_system,
        strength_numer_code=strength_numer_code,
        strength_numer_unit=strength_numer_unit,
        strength_denom_value=strength_denom_value,
        strength_denom_system=strength_denom_system,
        strength_denom_code=strength_denom_code,
        strength_denom_unit=strength_denom_unit,
    )
    return {
        "id": f"{molecule_id}",
        "extension": [ingredient_type],
        "isActive": True,
        "itemCodeableConcept": concept,
        "strength": strength,
    }

def medication_ingredient_equi(
    molecule_id,
    active_molecule_id,
    
    ask_code,
    ask_display,

    cas_code,
    cas_display,

    strength_numer_value,
    strength_numer_code,
    strength_numer_system,
    strength_numer_unit,

    strength_denom_value,
    strength_denom_system,
    strength_denom_code,
    strength_denom_unit,
):
    """Build the `ingredient` entry for an equivalent substance as a plain dict.

    The entry uses the molecule id (as a string) as its element id. It
    carries two extensions: a `wirkstoffrelation` whose `elementUri` points
    at `#<active_molecule_id>`, and a `wirkstofftyp` with code "IN"
    (ingredient). It also carries the ASK/CAS concept and the strength.
    """
    relation_to_active = {
        "extension": [
            {
                "url": "elementUri",
                "valueUri": f"#{active_molecule_id}",
            }
        ],
        "url": "https://www.medizininformatik-initiative.de/fhir/core/modul-medikation/StructureDefinition/wirkstoffrelation",
    }
    ingredient_type = {
        "url": "https://www.medizininformatik-initiative.de/fhir/core/modul-medikation/StructureDefinition/wirkstofftyp",
        "valueCoding": {
            "system": "https://www.medizininformatik-initiative.de/fhir/core/modul-medikation/CodeSystem/wirkstofftyp",
            "code": "IN",
            "display": "ingredient",
        },
    }
    concept = medication_item_codeable_concept(
        ask_code=ask_code,
        ask_display=ask_display,
        cas_code=cas_code,
        cas_display=cas_display,
    )
    # Keyword arguments keep numerator/denominator fields matched by name:
    # this function lists code before system, medication_strength the reverse.
    strength = medication_strength(
        strength_numer_value=strength_numer_value,
        strength_numer_system=strength_numer_system,
        strength_numer_code=strength_numer_code,
        strength_numer_unit=strength_numer_unit,
        strength_denom_value=strength_denom_value,
        strength_denom_system=strength_denom_system,
        strength_denom_code=strength_denom_code,
        strength_denom_unit=strength_denom_unit,
    )
    return {
        "id": f"{molecule_id}",
        "extension": [relation_to_active, ingredient_type],
        "itemCodeableConcept": concept,
        "strength": strength,
    }

def filter_list(f, xs: list):
    """Return a copy of `xs` that keeps only the leaf values accepted by `f`.

    Nested lists and dicts (exact types only) are filtered recursively and
    always kept, even when they end up empty. Any other element is kept only
    if `f(element)` is truthy.
    """
    kept = []
    for item in xs:
        kind = type(item)
        if kind is list:
            kept.append(filter_list(f, item))
            continue
        if kind is dict:
            kept.append(filter_dict(f, item))
            continue
        if f(item):
            kept.append(item)
    return kept

def filter_dict(f, d: dict):
    """Return a copy of `d` that keeps only the leaf values accepted by `f`.

    Nested lists and dicts (exact types only) are filtered recursively and
    their keys are always kept, even when the result is empty. Any other
    value is kept, together with its key, only if `f(value)` is truthy.
    """
    kept = {}
    for key, value in d.items():
        kind = type(value)
        if kind is list:
            kept[key] = filter_list(f, value)
            continue
        if kind is dict:
            kept[key] = filter_dict(f, value)
            continue
        if f(value):
            kept[key] = value
    return kept

def filter_missing(d: dict):
    """Return a copy of `d` with all missing leaf values removed, recursively.

    A value counts as missing when `pd.isna` reports it (None, NaN, ...) or
    when it equals the text 'None' or 'nan'.
    """
    def is_present(value):
        return not pd.isna(value) and value not in ('None', 'nan')

    return filter_dict(is_present, d)

# Inline self-check of filter_missing: None and NaN leaves are dropped,
# nested dicts are kept and filtered.
d = dict(
    A0k=dict(A01k='A01v', A02k='A02v', A03k=None),
    A1k='A1v',
    A2k=float('nan'),
)
assert filter_missing(d) == dict(
    A0k=dict(A01k='A01v', A02k='A02v'),
    A1k='A1v',
)

In [None]:
pwd

'/content'

In [None]:
# Load the EDQM mapping and rename its code/name columns to the FHIR coding
# field names ('code', 'display').
with open(datadir + edqm_mapping_filename, encoding=edqm_mapping_encoding) as f:
    edqm = pd.read_csv(f, sep=';')

edqm = edqm.rename(columns=edqm_rename_columns)

def decode(text, source_encoding=edqm_mapping_encoding, target_encoding=target_encoding):
    """Re-interpret `text` from `source_encoding` as `target_encoding`.

    The text is encoded with `source_encoding` and the resulting bytes are
    decoded with `target_encoding`. Non-string values (e.g. NaN for empty
    cells) are returned unchanged. If the round trip fails with a Unicode
    error, `text` is returned unchanged.

    Only `UnicodeError` is handled. Other failures, such as an unknown
    encoding name, are no longer silently swallowed.
    """
    if not isinstance(text, str):
        return text
    try:
        return text.encode(source_encoding).decode(target_encoding)
    except UnicodeError:
        # TODO make sure `text` is already encoded with `target_encoding`
        # in our use case, `text` is not consistently encoded – this is an ad-hoc workaround
        return text

# Apply `decode` to every display text of the EDQM mapping.
edqm['display'] = edqm['display'].apply(decode)

In [None]:
# Read every required table. The generic `ID` column of each table is renamed
# to `<TABLE>ID`.
tables = {}
for table_name in table_names:
    tables[table_name] = read(table_name).rename(
        columns={id_: table_name + id_},
    )

# Append `_<TABLE>` to every column whose name does not end in `ID`; columns
# ending in `ID` keep their name.
for table_name, table in tables.items():
    renamed_columns = {
        column: (
            column if column.endswith(id_)
            else f'{column}{column_name_suffix_separator}{table_name}'
        )
        for column in table.columns
    }
    table.rename(columns=renamed_columns, inplace=True)

  exec(code_obj, self.user_global_ns, self.user_ns)
  exec(code_obj, self.user_global_ns, self.user_ns)
  exec(code_obj, self.user_global_ns, self.user_ns)


In [None]:
# Join the tables of every chain in `joins_active` onto the first table of
# the first chain, one neighbouring pair at a time.
print(f'ACTIVE substances. Starting with table {joins_active[0][0]}.')
res_active = tables[joins_active[0][0]]
for join in joins_active:
    for table1_name, table2_name in pairwise(join):
        table1_id = table1_name + id_
        # Join on the left table's key if both tables carry it, otherwise on
        # the right table's own key.
        key_in_both = (
            table1_id in tables[table1_name].columns
            and table1_id in tables[table2_name].columns
        )
        index = table1_id if key_in_both else table2_name + id_
        print(f'Joining {table2_name} using {index}', end=' ')
        right_table = tables[table2_name].set_index(index)
        res_active = res_active.join(right_table, on=index)
        print('done.')
print('All ACTIVE substance tables joined.')
# only consider active substances
is_active_substance = res_active['MOLECULETYPECODE_COMPOSITIONELEMENT'] == 'A'
res_active = res_active[is_active_substance]

ACTIVE substances. Starting with table PACKAGE.
Joining PRODUCT using PRODUCTID done.
Joining ITEM using PRODUCTID done.
Joining ITEM_COMPOSITIONELEMENT using ITEMID done.
Joining COMPOSITIONELEMENT using COMPOSITIONELEMENTID done.
Joining MOLECULE using MOLECULEID done.
Joining ITEM_ATC using ITEMID done.
All ACTIVE substance tables joined.


In [None]:
# Join the tables of every chain in `joins` (including the equivalent-
# substance table) onto the first table of the first chain.
print(f'EQUIVALENT substances. Starting with table {joins[0][0]}.')
res = tables[joins[0][0]]
for join in joins:
    for table1_name, table2_name in pairwise(join):
        table1_id = table1_name + id_
        # Join on the left table's key if both tables carry it, otherwise on
        # the right table's own key.
        key_in_both = (
            table1_id in tables[table1_name].columns
            and table1_id in tables[table2_name].columns
        )
        index = table1_id if key_in_both else table2_name + id_

        # use EQMOLECULEID to join COMPOSITIONELEMENTEQUI with MOLECULE
        via_equivalent = index == 'MOLECULEID'
        if via_equivalent:
            assert table2_name == molecule_name
            key_label = f'(EQ){index}'
            left_key = 'EQMOLECULEID'
        else:
            key_label = index
            left_key = index

        print(f'Joining {table2_name} using {key_label}', end=' ')
        right_table = tables[table2_name].set_index(index)
        res = res.join(right_table, on=left_key)
        print('done.')
print('All EQUIVALENT substance tables joined.')

# only consider active substances (and their equivalents)
is_active_substance = res['MOLECULETYPECODE_COMPOSITIONELEMENT'] == 'A'
res = res[is_active_substance]

EQUIVALENT substances. Starting with table PACKAGE.
Joining PRODUCT using PRODUCTID done.
Joining ITEM using PRODUCTID done.
Joining ITEM_COMPOSITIONELEMENT using ITEMID done.
Joining COMPOSITIONELEMENT using COMPOSITIONELEMENTID done.
Joining COMPOSITIONELEMENTEQUI using COMPOSITIONELEMENTID done.
Joining MOLECULE using (EQ)MOLECULEID done.
Joining ITEM_ATC using ITEMID done.
All EQUIVALENT substance tables joined.


In [None]:
# All distinct PZNs that have at least one active-substance row.
# NOTE(review): PZNs are converted to int, so any leading zeros are lost in
# the codes and file names derived from them — confirm this is intended.
pzns = [
    int(value)
    for value in res_active['PZN_PACKAGE'].unique()
    if not pd.isna(value)
]
n = len(pzns)

# Sanity check: the integer conversion must not have merged two PZNs.
assert len(set(pzns)) == n

In [None]:
def _empty_catalogentry():
    # Returned for (catalog id, code) pairs that are not in the catalog.
    return {'code': None, 'display': None}


# Lookup (CATALOGID, code) -> {'code', 'display'}; unknown keys yield an
# entry whose fields are both None.
catalogentry = defaultdict(_empty_catalogentry)
for _, entry_row in tables[catalogentry_name].iterrows():
    entry_code = entry_row['CODE_CATALOGENTRY']
    catalogentry[entry_row['CATALOGID'], entry_code] = {
        'code': entry_code,
        'display': entry_row['NAME_CATALOGENTRY'],
    }

In [None]:
print('Building mapping PZN -> ACTIVE substance info', end=' ')
# Split the joined table by PZN in a single pass instead of scanning the
# whole table once per PZN. `sort=False` keeps the groups in order of first
# appearance; rows with a missing PZN are dropped by groupby.
_active_rows_by_pzn = {
    int(key): group
    for key, group in res_active.groupby('PZN_PACKAGE', sort=False)
}
# A PZN without any rows maps to an empty frame, as it did before.
_no_active_rows = res_active.iloc[0:0]
pzn_rows_active = {
    pzn: _active_rows_by_pzn.get(pzn, _no_active_rows) for pzn in pzns
}
print('done.')

Building mapping PZN -> ACTIVE substance info done.


In [None]:
print('Building mapping PZN -> EQUIVALENT substance info', end=' ')
# Split the joined table by PZN in a single pass instead of scanning the
# whole table once per PZN. `sort=False` keeps the groups in order of first
# appearance; rows with a missing PZN are dropped by groupby.
_rows_by_pzn = {
    int(key): group
    for key, group in res.groupby('PZN_PACKAGE', sort=False)
}
# A PZN without any rows maps to an empty frame, as it did before.
_no_rows = res.iloc[0:0]
pzn_rows = {
    pzn: _rows_by_pzn.get(pzn, _no_rows) for pzn in pzns
}
print('done.')

Building mapping PZN -> EQUIVALENT substance info done.


In [None]:
def _strength_denominator(row, base_unit_display):
    """Build the four `strength_denom_*` keyword arguments for one row.

    `row` supplies `BASECOUNT_ITEM` (the denominator value, possibly with a
    decimal comma) and `BASEMOLECULEUNITCODE_ITEM`; `base_unit_display` is
    the display text of the base unit. When the value is exactly 1, code and
    unit are None; the system is None whenever the code is missing.
    """
    value = float(replace_comma_with_dot(row['BASECOUNT_ITEM']))
    per_single_unit = value == 1
    code = None if per_single_unit else row['BASEMOLECULEUNITCODE_ITEM']
    return {
        'strength_denom_value': value,
        'strength_denom_code': code,
        'strength_denom_system': None if pd.isna(code) else 'http://unitsofmeasure.org',
        'strength_denom_unit': None if per_single_unit else base_unit_display,
    }


# Write one FHIR Medication JSON file per PZN. PZNs whose file already exists
# are skipped, so an interrupted run can be restarted.
os.makedirs(resultsdir, exist_ok=True)

for j, pzn in enumerate(pzns):
    fno = filename_out(pzn)
    if os.path.isfile(fno):
        continue
    print(f'{100*j/n:05.1f}% done\r', end='')

    rows_active = pzn_rows_active[pzn]
    if not len(rows_active):
        continue
    rows = pzn_rows[pzn]

    # create resource with basic info (taken from the first active row)

    row_active = rows_active.iloc[0]

    atc_code = row_active['ATCCODE_ITEM_ATC']
    if pd.isna(atc_code):
        print(f'INFO  for PZN {pzn}: no ATC code – skipping..')
        continue

    catalogentry_row_atc = catalogentry[row_active['ATCCATALOGID'], atc_code]

    catalogentry_row_base_unit = catalogentry[
        row_active['BASEMOLECULEUNITCATALOGID'],
        row_active['BASEMOLECULEUNITCODE_ITEM'],
    ]

    # EDQM mapping rows for the pharmaceutical form of this package
    catalogentry_row_pharmform = edqm[
        (edqm.CATALOGID == row_active['PHARMFORMCATALOGID']) &
        (edqm.CODE == float(row_active['PHARMFORMCODE_ITEM']))
    ]

    # MMI catalog entry of the same form (only used in the message below);
    # the catalog is looked up with the code as a zero-padded 3-digit string
    catalogentry_row_pharmform_mmi = catalogentry[
        row_active['PHARMFORMCATALOGID'],
        '{:03.0f}'.format(row_active['PHARMFORMCODE_ITEM']),
    ]

    if not len(catalogentry_row_pharmform):
        print(f"ERROR for PZN {pzn}: missing EDQM info. Package: `{row_active['NAME_PACKAGE']}`")
        continue

    edqm_code_raw = catalogentry_row_pharmform['code'].values[0]
    if pd.isna(edqm_code_raw):
        # A mapping row exists but has no EDQM code: report it and continue
        # with a resource that has no form code.
        catalogentry_row_pharmform_code = None
        print(
            f"ERROR for PZN {pzn}: missing EDQM info. Package: `{row_active['NAME_PACKAGE']}`. "
            f"Mapping: `{catalogentry_row_pharmform['display'].values[0]}`. "
            f"MMI: `{catalogentry_row_pharmform_mmi['display']}`."
        )
    else:
        catalogentry_row_pharmform_code = int(edqm_code_raw)

    fhir = filter_missing(medication_resource(
        id_=None,
        pzn_code=pzn,
        pzn_display=row_active['NAME_PACKAGE'],
        atc_code=catalogentry_row_atc['code'],
        atc_display=catalogentry_row_atc['display'],
        edqm_code=catalogentry_row_pharmform_code,
        edqm_display=catalogentry_row_pharmform['display'].values[0],
    ))

    # add active ingredients info

    for i in range(len(rows_active)):
        row_active = rows_active.iloc[i]

        molecule_unit_code_active = row_active['MOLECULEUNITCODE_COMPOSITIONELEMENT']
        catalogentry_row_unit_active = catalogentry[
            row_active['MOLECULEUNITCATALOGID'], molecule_unit_code_active
        ]

        fhir['ingredient'].append(
            filter_missing(medication_ingredient_active(
                molecule_id=int(row_active['MOLECULEID']),
                ask_code=row_active['ASKNUMBER_MOLECULE'],
                ask_display=row_active['NAME_MOLECULE'],
                cas_code=row_active['CASREGISTRATIONNUMBER_MOLECULE'],
                cas_display=row_active['NAME_MOLECULE'],
                strength_numer_system='http://unitsofmeasure.org',
                strength_numer_value=float(replace_comma_with_dot(
                    row_active['MASSFROM_COMPOSITIONELEMENT']
                )),
                strength_numer_code=molecule_unit_code_active,
                strength_numer_unit=catalogentry_row_unit_active['display'],
                **_strength_denominator(row_active, catalogentry_row_base_unit['display']),
            ))
        )

    # add equivalent ingredients info

    for i in range(len(rows)):
        row = rows.iloc[i]

        molecule_id = int(row['MOLECULEID'])
        eq_molecule_id = row['EQMOLECULEID']
        if pd.isna(eq_molecule_id):
            continue  # no equivalent ingredient
        eq_molecule_id = int(eq_molecule_id)

        # Looked up before the check below, because its message reports it.
        molecule_unit_code_equivt = row['EQMOLECULEUNITCODE_COMPOSITIONELEMENTEQUI']
        catalogentry_row_unit_equivt = catalogentry[
            row['EQMOLECULEUNITCATALOGID'], molecule_unit_code_equivt
        ]

        if molecule_id == eq_molecule_id:
            # NOTE(review): `row_active` and `catalogentry_row_unit_active`
            # still hold the last active ingredient processed above, which is
            # not necessarily the one this row belongs to — confirm.
            print(
                f"ERROR for PZN {pzn}: molecule_id and eq_molecule_id are the same. Package: `{row_active['NAME_PACKAGE']}` "
                f"Active-molec.: `{row_active['NAME_MOLECULE']}`. Active–unit: `{catalogentry_row_unit_active['display']}`. "
                f"Equivt-molec.: `{row['NAME_MOLECULE']}`. Equivt–unit. `{catalogentry_row_unit_equivt['display']}`."
            )
            continue

        fhir['ingredient'].append(
            filter_missing(medication_ingredient_equi(
                molecule_id=eq_molecule_id,
                active_molecule_id=molecule_id,
                ask_code=row['ASKNUMBER_MOLECULE'],
                ask_display=row['NAME_MOLECULE'],
                cas_code=row['CASREGISTRATIONNUMBER_MOLECULE'],
                cas_display=row['NAME_MOLECULE'],
                strength_numer_system='http://unitsofmeasure.org',
                strength_numer_value=float(replace_comma_with_dot(
                    row['EQMASSFROM_COMPOSITIONELEMENTEQUI']
                )),
                strength_numer_code=molecule_unit_code_equivt,
                strength_numer_unit=catalogentry_row_unit_equivt['display'],
                **_strength_denominator(row, catalogentry_row_base_unit['display']),
            ))
        )

    # Sanity checks: the resource carries exactly this PZN and the file is
    # named after it.
    fhir_pzns = {
        coding['code']
        for coding in fhir['code']['coding']
        if coding['system'].endswith('pzn')
    }
    assert fhir_pzns == {str(pzn)}
    assert os.path.basename(fno) == f'{pzn}.json'

    with open(fno, 'w', encoding="utf-8") as f:
        json.dump(fhir, f, indent=4, ensure_ascii=False)

[1;30;43mDie letzten 5000 Zeilen der Streamingausgabe wurden abgeschnitten.[0m
TEST /content/data/results/16042615.json
TEST /content/data/results/14026983.json
TEST /content/data/results/15882508.json
TEST /content/data/results/15735144.json
TEST /content/data/results/15874727.json
TEST /content/data/results/13828166.json
TEST /content/data/results/15741541.json
TEST /content/data/results/15741469.json
TEST /content/data/results/14330965.json
TEST /content/data/results/15586098.json
TEST /content/data/results/13828108.json
TEST /content/data/results/15569355.json
TEST /content/data/results/16042354.json
TEST /content/data/results/9941313.json
TEST /content/data/results/15896491.json
TEST /content/data/results/14375063.json
TEST /content/data/results/9941359.json
TEST /content/data/results/16035259.json
TEST /content/data/results/15741386.json
TEST /content/data/results/16042609.json
TEST /content/data/results/15586052.json
TEST /content/data/results/15415698.json
TEST /content/data/

In [None]:
!zip -r "/content/data/output.zip" "/content/data/results"