-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add functions to query a dicomtags dataframe from NBIA
- Loading branch information
Showing
2 changed files
with
380 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
from pydicom.datadict import dictionary_VR | ||
from pydicom.datadict import tag_for_keyword | ||
from pydicom._dicom_dict import DicomDictionary | ||
import pandas as pd | ||
from typing import Any, Union, List | ||
|
||
|
||
def convert_element_to_int(element_str: str) -> int:
    """
    Converts a DICOM element string representation to an integer.

    The group and element numbers are parsed as hexadecimal and packed into a
    single integer as ``(group << 16) + element``.

    Args:
        element_str (str): The DICOM element string representation, e.g.
            ``'(0028,0010)'``. Surrounding parentheses and single quotes are
            stripped before parsing.

    Returns:
        int: The converted integer value, or ``-1`` when ``element_str``
            starts with ``'>'`` (such strings are not parsed).

    Examples:
        >>> convert_element_to_int('(0028,0010)')
        2621456
        >>> convert_element_to_int('>Unknown')
        -1

    Raises:
        ValueError: If the element does not have exactly two comma-separated
            parts, if a part is not valid hexadecimal, or if a part does not
            fit in 16 bits.
    """
    if element_str.startswith(">"):
        return -1

    parts = element_str.strip("()'").split(",")

    # Check if the element has the correct structure
    if len(parts) != 2:
        raise ValueError(
            f"Invalid element format. Element must have the structure '(<INT>,<INT>)': {element_str}"
        )

    # int() raises ValueError itself for non-hexadecimal text
    group, element = (int(part, base=16) for part in parts)

    # An out-of-range half would otherwise bleed into the other half's bits
    # and silently produce a different tag.
    for value in (group, element):
        if not 0 <= value <= 0xFFFF:
            raise ValueError(
                f"Invalid element format. Group and element must each fit in 16 bits: {element_str}"
            )

    return (group << 16) + element
|
||
|
||
def convert_int_to_element(combined_int: int) -> str:
    """
    Converts a combined integer into a DICOM element string representation.

    The upper 16 bits become the group and the lower 16 bits the element,
    each rendered as four uppercase hexadecimal digits.

    Args:
        combined_int (int): The combined integer to be converted. ``-1`` is
            accepted as a sentinel and mapped to ``"Unknown"``.

    Returns:
        str: The DICOM element string representation of the combined integer,
            or ``"Unknown"`` when ``combined_int`` is ``-1``.

    Examples:
        >>> convert_int_to_element(0x00080060)
        '(0008,0060)'
        >>> convert_int_to_element(524384)
        '(0008,0060)'
        >>> convert_int_to_element(-1)
        'Unknown'

    Raises:
        TypeError: If ``combined_int`` is not an integer.
        ValueError: If ``combined_int`` is neither ``-1`` nor within
            ``0..0xFFFFFFFF``.
    """
    # Explicit raise instead of `assert`, which is stripped under `python -O`
    if not isinstance(combined_int, int):
        raise TypeError("combined_int must be an integer.")

    if combined_int == -1:
        return "Unknown"

    # Other negatives or values wider than 32 bits cannot be rendered as
    # two four-digit hexadecimal halves.
    if not 0 <= combined_int <= 0xFFFFFFFF:
        raise ValueError(
            f"combined_int must be -1 or within 0..0xFFFFFFFF: {combined_int}"
        )

    # i.e. 131073 should become (0002,0001)
    group = combined_int >> 16  # upper 16 bits
    element = combined_int & 0xFFFF  # lower 16 bits

    # 04X: zero-padded to four digits, uppercase hexadecimal
    return f"({group:04X},{element:04X})"
|
||
|
||
def LOOKUP_TAG(keyword: str) -> int:
    """
    Resolves a DICOM keyword to its tag via pydicom's `tag_for_keyword`.

    Args:
        keyword (str): The DICOM keyword to resolve, e.g. "Modality".

    Returns:
        int: The tag returned by `tag_for_keyword`.

    Raises:
        ValueError: If `tag_for_keyword` returns None for the keyword.
    """
    found = tag_for_keyword(keyword=keyword)
    if found is not None:
        return found
    raise ValueError(f"Tag not found for keyword: {keyword}")
|
||
|
||
def element_VR_lookup(element_str: str) -> tuple[int, str]:
    """
    Finds the VR (Value Representation) of a DICOM element given as a string.

    Args:
        element_str (str): The DICOM element as a string, e.g. '(0028,0010)'.

    Returns:
        tuple[int, str]: The combined integer form of the element and its VR.
            ``(-1, "Unknown")`` is returned when the element converts to -1,
            and the VR is ``"Unknown,KeyError"`` when the dictionary lookup
            raises KeyError.
    """
    tag_value = convert_element_to_int(element_str=element_str)

    # -1 is the sentinel convert_element_to_int returns for '>'-prefixed input
    if tag_value == -1:
        return (-1, "Unknown")

    try:
        return (tag_value, dictionary_VR(tag=tag_value))
    except KeyError:
        return (tag_value, "Unknown,KeyError")
|
||
|
||
def getSeriesModality(series_tags_df: pd.DataFrame) -> str:
    """
    Retrieves the modality of a DICOM series.

    The row whose "element" column equals the Modality element string is
    selected and the first value of its "data" column is returned.

    Args:
        series_tags_df (pd.DataFrame): A DataFrame containing DICOM series
            tags, with "element" and "data" columns.

    Returns:
        str: The modality of the DICOM series.

    Raises:
        ValueError: If the "Modality" keyword cannot be resolved to a tag
            (raised by LOOKUP_TAG).
        IndexError: If no row's "element" equals the Modality element.
    """
    # LOOKUP_TAG raises ValueError itself when the keyword is unknown, so it
    # never returns None and no extra None check is needed here.
    modality_tag: int = LOOKUP_TAG(keyword="Modality")

    modality_tag_element: str = convert_int_to_element(combined_int=modality_tag)

    modality_row: pd.DataFrame = series_tags_df[
        series_tags_df["element"] == modality_tag_element
    ]

    # .values[0] raises IndexError when no row matched; callers rely on that
    modality: str = modality_row["data"].values[0]

    return modality
|
||
|
||
def subsetSeriesTags(series_tags_df: pd.DataFrame, element: str) -> pd.DataFrame:
    """
    Subsets a DataFrame of DICOM series tags to the rows delimited by an element.

    Rows whose "element" column contains `element` are located. The first
    match marks the start of the subset (inclusive) and the second match
    marks its end (exclusive).

    Args:
        series_tags_df (pd.DataFrame): A DataFrame containing DICOM series
            tags, with an "element" column of strings.
        element (str): The element string to search for, e.g. '(3006,0010)'.
            It is matched literally, not as a regular expression.

    Returns:
        pd.DataFrame: The rows from the first match up to, but not including,
            the second match.

    Raises:
        ValueError: If the element is not found, is found only once, or is
            found more than twice.
    """
    # regex=False: the parentheses in an element string would otherwise be
    # read as a regex capture group. na=False keeps the mask strictly boolean.
    matches = series_tags_df["element"].str.contains(element, regex=False, na=False)

    # Use row positions, not index labels, so the slice is correct even when
    # the DataFrame does not have a default RangeIndex.
    positions = matches.to_numpy(dtype=bool).nonzero()[0]

    if len(positions) == 0:
        raise ValueError("Element not found in the series tags.")

    if len(positions) > 2:
        raise ValueError("More than two elements found in the series tags.")

    if len(positions) < 2:
        raise ValueError(
            "Only one matching element found in the series tags; two are required to delimit the subset."
        )

    start, end = positions
    return series_tags_df.iloc[start:end]
|
||
|
||
def getReferencedFrameOfReferenceSequence(series_tags_df: pd.DataFrame) -> pd.DataFrame:
    """
    Extracts the ReferencedFrameOfReferenceSequence rows of an RTSTRUCT series.

    Args:
        series_tags_df (pd.DataFrame): A DataFrame containing DICOM series tags.

    Returns:
        pd.DataFrame: The subset produced by `subsetSeriesTags` for the
            ReferencedFrameOfReferenceSequence element.

    Raises:
        ValueError: If the series modality is not "RTSTRUCT".
    """
    # Only RTSTRUCT series are handled here
    if getSeriesModality(series_tags_df=series_tags_df) != "RTSTRUCT":
        raise ValueError("Series is not an RTSTRUCT.")

    # keyword -> integer tag -> '(GGGG,EEEE)' element string
    sequence_element = convert_int_to_element(
        combined_int=LOOKUP_TAG(keyword="ReferencedFrameOfReferenceSequence")
    )

    return subsetSeriesTags(series_tags_df=series_tags_df, element=sequence_element)
|
||
|
||
def getReferencedSeriesUIDS(series_tags_df: pd.DataFrame) -> List[str]:
    """
    Given a DataFrame containing DICOM series tags, retrieves the SeriesInstanceUIDs of the referenced series.

    Useful for RTSTRUCT DICOM files to find the series that the RTSTRUCT references.
    The search is restricted to the rows returned by
    `getReferencedFrameOfReferenceSequence`.

    TODO:: implement SEG and RTDOSE

    Args:
        series_tags_df (pd.DataFrame): A DataFrame containing DICOM series
            tags, with "element" and "data" columns.

    Returns:
        List[str]: A list of SeriesInstanceUIDs of the referenced series.

    Raises:
        ValueError: If the series is not an RTSTRUCT.
    """
    # "SeriesInstanceUID" ---LOOKUP_TAG--> 2097166 ---convert_int_to_element--> (0020,000E)
    SeriesInstanceUIDtag: int = LOOKUP_TAG(keyword="SeriesInstanceUID")
    SeriesInstanceUID_element: str = convert_int_to_element(
        combined_int=SeriesInstanceUIDtag
    )

    search_space: pd.DataFrame = getReferencedFrameOfReferenceSequence(
        series_tags_df=series_tags_df
    )

    # regex=False: match the element string literally; its parentheses would
    # otherwise be read as a regex capture group (pandas warns about this).
    # na=False keeps the mask boolean if any element value is missing.
    # NOTE(review): assumes nested elements keep their parentheses, e.g.
    # '>(0020,000E)' -- confirm against real NBIA tag data.
    is_series_uid = search_space["element"].str.contains(
        SeriesInstanceUID_element, regex=False, na=False
    )

    return search_space.loc[is_series_uid, "data"].to_list()
|
||
|
||
if __name__ == "__main__":
    from nbiatoolkit import NBIAClient

    # Round-trip a sample element: string -> packed integer -> string
    packed_tag = convert_element_to_int("(0028,0010)")
    print(packed_tag)

    element_text = convert_int_to_element(packed_tag)
    print(element_text)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
import pytest | ||
from src.nbiatoolkit import NBIAClient | ||
from src.nbiatoolkit.dicomtags.tags import convert_int_to_element | ||
from src.nbiatoolkit.dicomtags.tags import convert_element_to_int | ||
from src.nbiatoolkit.dicomtags.tags import LOOKUP_TAG | ||
from src.nbiatoolkit.dicomtags.tags import * | ||
|
||
|
||
def test_convert_int_to_element():
    """Check convert_int_to_element against a table of (input, expected) pairs."""
    cases = [
        (0x00080060, "(0008,0060)"),  # hexadecimal literal input
        (524384, "(0008,0060)"),  # same tag as a decimal literal
        (-1, "Unknown"),  # sentinel value
        (0, "(0000,0000)"),  # lower boundary
        (65535, "(0000,FFFF)"),  # largest value with an all-zero group
    ]

    for combined_int, expected in cases:
        result = convert_int_to_element(combined_int)
        assert result == expected, f"Expected {expected}, but got {result}"

    print("All test cases passed!")
|
||
|
||
def test_convert_element_to_int():
    """Check convert_element_to_int against a table of (input, expected) pairs."""
    cases = [
        ("(0028,0010)", 2621456),
        ("(0008,0060)", 524384),
        ("(0000,0000)", 0),
        ("(0000,FFFF)", 65535),
        (">Unknown", -1),  # '>'-prefixed strings map to the -1 sentinel
    ]

    for element_str, expected in cases:
        result = convert_element_to_int(element_str)
        assert result == expected, f"Expected {expected}, but got {result}"

    print("All test cases passed!")
|
||
|
||
def test_LOOKUP_TAG():
    """Check LOOKUP_TAG for two known keywords and one unknown keyword."""
    # (keyword, expected tag, expected tag as shown in the failure message)
    known_keywords = (
        ("PatientName", 0x00100010, "0x00100010"),
        ("StudyDate", 0x00080020, "0x00080020"),
    )
    for keyword, expected, expected_text in known_keywords:
        result = LOOKUP_TAG(keyword)
        assert result == expected, f"Expected {expected_text}, but got {result}"

    # An unknown keyword must raise ValueError with the exact message
    raised = None
    try:
        LOOKUP_TAG("UnknownKeyword")
    except ValueError as err:
        raised = err

    assert raised is not None, "Expected ValueError to be raised for unknown keyword"
    assert (
        str(raised) == "Tag not found for keyword: UnknownKeyword"
    ), f"Expected 'Tag not found for keyword: UnknownKeyword', but got {str(raised)}"

    print("All test cases passed!")
|
||
|
||
def test_element_VR_lookup():
    """Check element_VR_lookup against a table of (input, expected tuple) pairs."""
    cases = [
        ("(0028,0010)", (2621456, "US")),
        ("(0008,0060)", (524384, "CS")),
        ("(0000,0000)", (0, "UL")),
        ("(0000,FFFF)", (65535, "Unknown,KeyError")),  # lookup raises KeyError
        (">Unknown", (-1, "Unknown")),  # '>'-prefixed sentinel path
    ]

    for element_str, expected in cases:
        result = element_VR_lookup(element_str)
        assert result == expected, f"Expected {expected}, but got {result}"

    print("All test cases passed!")
|
||
|
||
@pytest.fixture(scope="session")
def client():
    """Session-scoped NBIAClient constructed with return_type="dataframe"."""
    nbia_client = NBIAClient(return_type="dataframe")
    return nbia_client
|
||
|
||
@pytest.fixture(scope="session")
def series(client):
    """Session-scoped result of client.getSeries for RTSTRUCT series in Pediatric-CT-SEG."""
    # NOTE(review): presumably this queries the NBIA service over the network -- confirm
    query = {"Collection": "Pediatric-CT-SEG", "Modality": "RTSTRUCT"}
    return client.getSeries(**query)
|
||
|
||
@pytest.fixture(scope="session")
def RTSTRUCT_Series(series):
    """Session-scoped subset of `series` whose "Modality" column equals "RTSTRUCT"."""
    is_rtstruct = series["Modality"] == "RTSTRUCT"
    return series[is_rtstruct]
|
||
|
||
@pytest.fixture(scope="session")
def RTSTRUCT_Tags(client, RTSTRUCT_Series):
    """Session-scoped DICOM tags for the first series listed in RTSTRUCT_Series."""
    # Only the first SeriesInstanceUID is used
    first_series_uid = RTSTRUCT_Series["SeriesInstanceUID"].values[0]
    assert (
        first_series_uid is not None
    ), "Expected seriesUID to be returned, but got None"

    dicom_tags = client.getDICOMTags(first_series_uid)
    assert dicom_tags is not None, "Expected tags to be returned, but got None"

    return dicom_tags
|
||
|
||
def test_getDICOMTags(RTSTRUCT_Tags):
    """Check that getReferencedSeriesUIDS returns the expected referenced series UID."""
    expected_uids = [
        "1.3.6.1.4.1.14519.5.2.1.133742245714270925254982946723351496764"
    ]
    referenced_uids = getReferencedSeriesUIDS(RTSTRUCT_Tags)
    assert (
        referenced_uids == expected_uids
    ), f"Expected {expected_uids}, but got {referenced_uids}"
|
||
|
||
def test_getSeriesModality(RTSTRUCT_Tags):
    """
    Check getSeriesModality on RTSTRUCT tags, with and without a Modality row.

    Test case 1 expects "RTSTRUCT" to be returned. Test case 2 removes the
    Modality row, element "(0008,0060)", and expects IndexError.
    """
    modality = getSeriesModality(RTSTRUCT_Tags)
    assert modality == "RTSTRUCT", f"Expected 'RTSTRUCT', but got {modality}"

    # Test case 2: modality tag not found
    # Filter into a new frame instead of dropping in place: RTSTRUCT_Tags is a
    # session-scoped fixture, so mutating it would leak into every other test
    # that uses it.
    tags_without_modality = RTSTRUCT_Tags[RTSTRUCT_Tags["element"] != "(0008,0060)"]

    with pytest.raises(IndexError):
        getSeriesModality(tags_without_modality)