# Parsing DICOM files using Snowpark

Here is a quick gist for parsing a DICOM file (.dcm) that resides in an external stage (e.g., S3) and saving its content into a Snowflake table.

## Solution overview
Snowpark can read files from a stage ([doc](https://docs.snowflake.com/en/LIMITEDACCESS/snowpark-python.html#working-with-files-in-a-stage)). It is also easy to read a DICOM file using pydicom ([doc](https://pydicom.github.io/pydicom/stable/index.html)).
With this in mind, I wanted to see what it would take for Snowpark to read a DICOM file stored in an external stage and store the records in a Snowflake table.

The dicom file is read and its content is stored in Snowflake as a JSON structure. 

**Prerequisite**
 - The external stage must have its directory table enabled.
 
### Code Logic
 - Connect to Snowflake.
 - Retrieve the list of DICOM files hosted in the external stage.
 - Import each DICOM file into the session.
 - Import the additional Python libraries into the session.
 - Create the UDF that parses a DICOM file.
 - Iterate over the stage directory table, invoking the UDF defined above for each file.
 - If successful, add the data to a staging table.

### Hurdles
Before rushing into the code, note that the steps were not as simple as I originally thought. The following are some hurdles I had to overcome to make the solution work.
#### Pydicom library
The pydicom library is not part of the Snowflake Python packages, so it needs to be imported dynamically. To overcome this:
 - we upload the library to the external stage
 - at runtime, we extract the library into a temporary folder
 - we add the folder to the system path, which allows the Python modules to be imported.
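
The three steps above can be sketched with only the standard library. This is a minimal, self-contained illustration rather than the UDF itself: the package name `demo_pkg`, the archive name `demo_pkg-master.zip`, and the temporary directories are hypothetical stand-ins for the pydicom archive and the Snowflake import directory.

```python
import importlib
import os
import sys
import tempfile
import zipfile

# Hypothetical stand-in for the import directory that holds the uploaded zip.
import_dir = tempfile.mkdtemp()
zip_path = os.path.join(import_dir, "demo_pkg-master.zip")

# Build a tiny archive laid out like a source download:
# <archive-root>/<package>/__init__.py
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("demo_pkg-master/demo_pkg/__init__.py", "ANSWER = 42\n")

# Extract the archive into a per-process temporary folder.
target_folder = os.path.join(tempfile.gettempdir(), f"demo_extract_{os.getpid()}")
with zipfile.ZipFile(zip_path, "r") as zf:
    zf.extractall(target_folder)

# Put the extracted archive root on sys.path so the package can be resolved.
lib_path = os.path.join(target_folder, "demo_pkg-master")
sys.path.insert(0, lib_path)

# The import must happen only after sys.path has been updated.
demo_pkg = importlib.import_module("demo_pkg")
print(demo_pkg.ANSWER)
```

Using the process id in the folder name mirrors the UDF further down, so that parallel processes do not extract into the same location.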

#### Staging Dicom file
Currently, a Python UDF cannot read files from a stage unless they are part of its imports. To overcome this, I iterate through the stage folder where the .dcm files are present and add each file as an import.

**Note:** There are likely limits on the number of files that can be imported and/or on their total combined size.
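
Because each file is added as an import, the UDF in this gist locates a file by its bare name inside the import directory rather than by its full stage path. A small sketch of that mapping follows; the helper name `import_file_name` is hypothetical and used for illustration only.

```python
import posixpath


def import_file_name(stage_path: str) -> str:
    """Return the last component of a stage path, i.e. the .dcm file name."""
    return posixpath.basename(stage_path)


stage_paths = [
    "@stg_hl7data/datasets/dicom/rtplan.dcm",
    "@stg_hl7data/datasets/dicom/rtdose.dcm",
]
names = [import_file_name(p) for p in stage_paths]
print(names)
```

This approach assumes that file names are unique across the imported folders, since only the file name is passed to the UDF.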

## Thoughts
Call me an ignoramus, but I honestly don't know much about the importance of these DICOM files, as I have not had much exposure to HCLS. But based on an ask from fellow Snowflakers, I thought, why not try it out and share? So if you can connect and let me know whether this helps, that would be great!


---

In [14]:
# Initialize the Snowpark session

import os ,json
import dotenv 
import pandas as pd
from snowflake.snowpark import Session

#Load the login information from env file
dotenv.load_dotenv('./sflk.env')

#Create a snowpark session
connection_parameters = {
  "account": os.getenv('DEMO_ACCOUNT'),
  "user": os.getenv('DEMO_USER'),
  "password": os.getenv('DEMO_PWD'),
  "role": "sysadmin",
  "warehouse": os.getenv('DEMO_WH'),
  "database": os.getenv('DEMO_DB'),
  "schema": os.getenv('DEMO_SCH')
}

session = Session.builder.configs(connection_parameters).create()
#print(session.sql("select current_account() ,current_warehouse(), current_database(), current_schema()").collect())

In [15]:

# Iterate the stage 'stg_hl7data', specifically the folder 'datasets/dicom' and retrieve the list of files
stage_name = '@stg_hl7data'

# Refresh the directory table so that recently uploaded files are listed
session.sql('alter stage stg_hl7data refresh').collect()

data_files = session.sql(f'''
    select 
        concat('{stage_name}/' ,relative_path) as full_path
    from directory({stage_name} )
    where relative_path like 'datasets/dicom/%';
''').collect()

# Clear any previous imports
session.clear_imports()

# Import each file into the session
for data_fl_row in data_files:
    fl_path = f'{data_fl_row[0]}'
    #print(f'Adding file to imports : {fl_path}')
    session.add_import(fl_path)
    
# Add the additional library needed for parsing DICOM files (pydicom)
libs_to_extract = ['pydicom-master.zip']
for lib in libs_to_extract:
    session.add_import(f'{stage_name}/pyfn_lib/{lib}')

# List out the imports, for debugging purposes.
session.get_imports()

['@stg_hl7data/datasets/dicom/rtdose.dcm',
 '@stg_hl7data/datasets/dicom/rtdose_1frame.dcm',
 '@stg_hl7data/datasets/dicom/rtdose_expb.dcm',
 '@stg_hl7data/datasets/dicom/rtdose_expb_1frame.dcm',
 '@stg_hl7data/datasets/dicom/rtdose_rle.dcm',
 '@stg_hl7data/datasets/dicom/rtdose_rle_1frame.dcm',
 '@stg_hl7data/datasets/dicom/rtplan.dcm',
 '@stg_hl7data/datasets/dicom/rtplan_truncated.dcm',
 '@stg_hl7data/datasets/dicom/rtstruct.dcm',
 '@stg_hl7data/pyfn_lib/pydicom-master.zip']

In [16]:
##
# Define the udf, which would parse the pydicom file
#

from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import Variant


'''
The UDF has the following parameter:
 - p_dicom_file_path : the full path to the dicom file. ex: @stg_hl7data/datasets/dicom/rtplan.dcm
'''
@udf(session=session ,name="dicomparser_snowpy" ,replace=True)
def dicomparser_snowpy(p_dicom_file_path: str) -> Variant:
    import os ,sys ,json ,zipfile
    import importlib.util
    from pathlib import Path

    # Extract the third party libraries into tmp folder and dynamically import
    IMPORT_DIR = sys._xoptions["snowflake_import_directory"]
    TARGET_FOLDER = f'/tmp/dicomparser_snowpy{os.getpid()}'
    libs_to_extract = ['pydicom-master.zip']
    for lib in libs_to_extract:
        PACKAGE_FNAME = lib.replace('.zip', '')
        TARGET_LIB_PATH = f'{TARGET_FOLDER}/{PACKAGE_FNAME}/'
        Path(f'{TARGET_LIB_PATH}').mkdir(parents=True, exist_ok=True)
        
        with zipfile.ZipFile(f'{IMPORT_DIR}{lib}', 'r') as zip_ref:
            zip_ref.extractall(TARGET_FOLDER)
    
        #Add the extracted folder to sys path
        sys.path.insert(0 ,TARGET_LIB_PATH )
    
    #Import should be done, only after inserting the target_lib_path into the path
    import pydicom

    # A utility method to list the contents of a directory.
    # Meant for debugging, e.g. to find the list of files
    # in the import directory.
    def list_directory(p_dir):
        fls = []
        for root, dirs, files in os.walk(p_dir):
            for file in files:
                # Join the containing folder and the file name into a full path
                fls.append(os.path.join(root, file))
        
        dat = {}
        dat['files'] = fls
        return json.dumps(dat)

    # The udf to handle the parsing
    def udf(p_file_path):
        #TODO wrap this in a try/except for better error handling
        
        fl_full_path = f'{IMPORT_DIR}{p_file_path}'
        ds = pydicom.dcmread(fl_full_path, force=True)
        return ds.to_json()
    
    # -------------- MAIN ---------------
    return udf(p_dicom_file_path)
    # return list_directory(IMPORT_DIR)
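
The TODO in the UDF suggests wrapping the parse step in error handling. One possible shape for that is sketched below, outside Snowflake. The names `safe_parse` and `fake_parser` are hypothetical, and returning the error as JSON instead of failing the query is an assumption about what would be convenient, not something the original code does.

```python
import json
from typing import Callable


def safe_parse(parser: Callable[[str], str], file_path: str) -> str:
    """Run parser(file_path); on failure return a JSON error record instead of raising."""
    try:
        return parser(file_path)
    except Exception as exc:  # broad on purpose: one bad file should not fail the whole query
        return json.dumps({"file": file_path, "error": f"{type(exc).__name__}: {exc}"})


def fake_parser(path: str) -> str:
    # Stand-in for `pydicom.dcmread(path, force=True).to_json()`.
    if not path.endswith(".dcm"):
        raise ValueError("not a DICOM file")
    return json.dumps({"00080060": {"vr": "CS", "Value": ["RTPLAN"]}})


ok = json.loads(safe_parse(fake_parser, "rtplan.dcm"))
bad = json.loads(safe_parse(fake_parser, "notes.txt"))
print(ok)
print(bad)
```

With this shape, a failed file still produces a row, and the `error` key can be filtered on before loading the staging table.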

In [17]:
session.sql('''select dicomparser_snowpy('rtplan.dcm')''').collect()

[Row(DICOMPARSER_SNOWPY('RTPLAN.DCM')='"{\\"00080012\\": {\\"Value\\": [\\"20030903\\"], \\"vr\\": \\"DA\\"}, \\"00080013\\": {\\"Value\\": [\\"150031\\"], \\"vr\\": \\"TM\\"}, \\"00080016\\": {\\"Value\\": [\\"1.2.840.10008.5.1.4.1.1.481.5\\"], \\"vr\\": \\"UI\\"}, \\"00080018\\": {\\"Value\\": [\\"1.2.777.777.77.7.7777.7777.20030903150023\\"], \\"vr\\": \\"UI\\"}, \\"00080020\\": {\\"Value\\": [\\"20030716\\"], \\"vr\\": \\"DA\\"}, \\"00080030\\": {\\"Value\\": [\\"153557\\"], \\"vr\\": \\"TM\\"}, \\"00080050\\": {\\"vr\\": \\"SH\\"}, \\"00080060\\": {\\"Value\\": [\\"RTPLAN\\"], \\"vr\\": \\"CS\\"}, \\"00080070\\": {\\"Value\\": [\\"Manufacturer name here\\"], \\"vr\\": \\"LO\\"}, \\"00080080\\": {\\"Value\\": [\\"Here\\"], \\"vr\\": \\"LO\\"}, \\"00080090\\": {\\"vr\\": \\"PN\\"}, \\"00081010\\": {\\"Value\\": [\\"COMPUTER002\\"], \\"vr\\": \\"SH\\"}, \\"00081040\\": {\\"Value\\": [\\"Radiation Therap\\"], \\"vr\\": \\"LO\\"}, \\"00081070\\": {\\"Value\\": [{\\"Alphabetic\\": \\"op
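
The result above is a JSON string keyed by DICOM tag (eight hexadecimal digits: group followed by element), where each entry carries a value representation (`vr`) and, when present, a `Value` list. A short sketch of reading such a payload on the client side follows. The sample copies three entries from the output above, and the helper `first_value` is a hypothetical name.

```python
import json

# Three entries copied from the UDF output shown above.
sample = """{
  "00080060": {"Value": ["RTPLAN"], "vr": "CS"},
  "00080020": {"Value": ["20030716"], "vr": "DA"},
  "00080050": {"vr": "SH"}
}"""


def first_value(dataset: dict, tag: str):
    """Return the first value stored under a tag, or None if the tag is empty or absent."""
    values = dataset.get(tag, {}).get("Value", [])
    return values[0] if values else None


dataset = json.loads(sample)
modality = first_value(dataset, "00080060")    # (0008,0060) Modality
study_date = first_value(dataset, "00080020")  # (0008,0020) Study Date
accession = first_value(dataset, "00080050")   # entry has a vr but no Value
print(modality, study_date, accession)
```

Because the UDF returns the output of `ds.to_json()`, the Variant holds a JSON string rather than an object, which is why the escaped quotes appear in the row above.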

In [18]:
df = session.sql('''
select 
    relative_path,
    
    -- the absolute path
    concat('@stg_hl7data/' ,relative_path) as full_path,
    
    -- we need to get the dcm file name from the path; we achieve this by
    -- splitting the relative path into an array and taking the last component,
    -- which is the file name
    split(relative_path, '/') as fl_splits,
    get(fl_splits, 
        array_size(fl_splits)-1)::varchar as dcm_fl
        
    -- invoke the udf to parse the dcm file
    ,parse_json(
        dicomparser_snowpy(dcm_fl) 
    ) as dicom_fl_parsed
from directory( @stg_hl7data )
where relative_path like 'datasets/dicom/%'
''')
df.show(1)

---------------------------------------------------------------------------------------------------------------------------------------------------------
|"RELATIVE_PATH"            |"FULL_PATH"                             |"FL_SPLITS"     |"DCM_FL"    |"DICOM_FL_PARSED"                                   |
---------------------------------------------------------------------------------------------------------------------------------------------------------
|datasets/dicom/rtdose.dcm  |@stg_hl7data/datasets/dicom/rtdose.dcm  |[               |rtdose.dcm  |{                                                   |
|                           |                                        |  "datasets",   |            |  "00080012": {                                     |
|                           |                                        |  "dicom",      |            |    "Value": [                                      |
|                           |                                        |  "rtd

In [20]:
# store as table
df.write.mode("overwrite").save_as_table("dicom_parsed_from_stage")

df_tbl = session.table("dicom_parsed_from_stage")

display(df_tbl.select('DICOM_FL_PARSED').to_pandas())

Unnamed: 0,DICOM_FL_PARSED
0,"{\n ""00080012"": {\n ""Value"": [\n ""200..."
1,"{\n ""00080012"": {\n ""Value"": [\n ""200..."
2,"{\n ""00080012"": {\n ""Value"": [\n ""200..."
3,"{\n ""00080012"": {\n ""Value"": [\n ""200..."
4,"{\n ""00080012"": {\n ""Value"": [\n ""200..."
5,"{\n ""00080012"": {\n ""Value"": [\n ""200..."
6,"{\n ""00080012"": {\n ""Value"": [\n ""200..."
7,"{\n ""00080012"": {\n ""Value"": [\n ""200..."
8,"{\n ""00080005"": {\n ""Value"": [\n ""ISO..."


In [21]:
#close the snowpark session
session.close()