Skip to content

Commit

Permalink
Merge pull request #54 from jjjermiah/organize_project_todo
Browse files Browse the repository at this point in the history
Organize project todo
  • Loading branch information
jjjermiah committed Jan 28, 2024
2 parents d3e47e2 + 1686564 commit 5841645
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 215 deletions.
8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__version__ = "0.10.7"

setup(
name="nbiatoolkit",
version=__version__,
# And so on...
)
name="nbiatoolkit",
version=__version__,
# And so on...
)
10 changes: 2 additions & 8 deletions src/nbiatoolkit/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## __init__.py
# Path: projects/nbia-toolkit/src/__init__.py

# this is the __init__.py file
# this is the __init__.py file
# this file is run when the package is imported
# this file is used to import all the modules in the package
# this file is used to define the __all__ variable
Expand All @@ -16,10 +16,4 @@
from .utils.nbia_endpoints import NBIA_ENDPOINTS

# define the __all__ variable
__all__ = [
'NBIAClient',
'OAuth2',
'setup_logger',
'NBIA_ENDPOINTS',
'__version__'
]
__all__ = ["NBIAClient", "OAuth2", "setup_logger", "NBIA_ENDPOINTS", "__version__"]
47 changes: 24 additions & 23 deletions src/nbiatoolkit/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
import time
from typing import Union


class OAuth2:
"""
OAuth2 class for handling authentication and access token retrieval.
This class provides methods to authenticate with the NBIA API using OAuth2
and retrieve the access token required for accessing the API.
Defaults to using the NBIA Guest for accessing public collections.
If you have a username and password which has been granted access
to the collections tagged with "limited access" you can use those
credentials to access those collections.
Attributes
----------
Expand All @@ -39,7 +40,7 @@ class OAuth2:
Methods
-------
getToken()
Authenticates with the API. Returns API headers containing the
Authenticates with the API. Returns API headers containing the
access token.
Example Usage
Expand All @@ -66,7 +67,9 @@ class OAuth2:
this class, please open an issue on the GitHub repository.
"""

def __init__(self, username: str = "nbia_guest", password: str = "", client_id: str = "NBIA"):
def __init__(
self, username: str = "nbia_guest", password: str = "", client_id: str = "NBIA"
):
"""
Initialize the OAuth2 class.
Expand Down Expand Up @@ -104,7 +107,7 @@ def getToken(self) -> Union[dict, int]:
>>> from nbiatoolkit import OAuth2
>>> oauth = OAuth2()
>>> api_headers = oauth.getToken()
>>> requests.get(url=query_url, headers=api_headers)
"""
# Check if the access token is valid and not expired
Expand All @@ -113,37 +116,35 @@ def getToken(self) -> Union[dict, int]:

# Prepare the request data
data = {
'username': self.username,
'password': self.password,
'client_id': self.client_id,
'grant_type': 'password'
"username": self.username,
"password": self.password,
"client_id": self.client_id,
"grant_type": "password",
}
token_url = 'https://services.cancerimagingarchive.net/nbia-api/oauth/token'
token_url = "https://services.cancerimagingarchive.net/nbia-api/oauth/token"

response = requests.post(token_url, data=data)


try:
response = requests.post(token_url, data=data)
response.raise_for_status() # Raise an HTTPError for bad responses
except requests.exceptions.RequestException as e:
self.access_token = -1
raise requests.exceptions.RequestException(\
f'Failed to get access token. Status code:\
{response.status_code}') from e
raise requests.exceptions.RequestException(
f"Failed to get access token. Status code:\
{response.status_code}"
) from e
else:
# Code to execute if there is no exception
token_data = response.json()
self.access_token = token_data.get('access_token')
self.access_token = token_data.get("access_token")

self.api_headers = {
'Authorization': f'Bearer {self.access_token}'
}
self.api_headers = {"Authorization": f"Bearer {self.access_token}"}

self.expiry_time = time.ctime(time.time() + token_data.get('expires_in'))
self.refresh_token = token_data.get('refresh_token')
self.refresh_expiry = token_data.get('refresh_expires_in')
self.scope = token_data.get('scope')
self.expiry_time = time.ctime(time.time() + token_data.get("expires_in"))
self.refresh_token = token_data.get("refresh_token")
self.refresh_expiry = token_data.get("refresh_expires_in")
self.scope = token_data.get("scope")

return self.api_headers

Expand All @@ -169,4 +170,4 @@ def headers(self):
api_headers : dict or None
The authentication headers containing the access token.
"""
return self.api_headers
return self.api_headers
7 changes: 1 addition & 6 deletions src/nbiatoolkit/dicomsort/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,4 @@
from .dicomsort import DICOMSorter
from .helper_functions import parseDICOMKeysFromFormat, sanitizeFileName, truncateUID

__all__ = [
"DICOMSorter",
"parseDICOMKeysFromFormat",
"sanitizeFileName",
"truncateUID"
]
__all__ = ["DICOMSorter", "parseDICOMKeysFromFormat", "sanitizeFileName", "truncateUID"]
101 changes: 55 additions & 46 deletions src/nbiatoolkit/dicomsort/dicomsort.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import re, os, sys, shutil
import pydicom
import pydicom

from pydicom.filereader import InvalidDicomError

from .helper_functions import parseDICOMKeysFromFormat, sanitizeFileName, truncateUID


class DICOMSorter:

def __init__(
self, sourceDir: str, destinationDir: str, targetPattern: str = None,
truncateUID: bool = True, sanitizeFilename: bool = True
):
self,
sourceDir: str,
destinationDir: str,
targetPattern: str = None,
truncateUID: bool = True,
sanitizeFilename: bool = True,
):
"""
Initialize the DICOMSorter with a target pattern.
"""
self.targetPattern = targetPattern
self.sourceDir = sourceDir
self.destinationDir = destinationDir
self.truncateUID=truncateUID
self.sanitizeFilename=sanitizeFilename
self.truncateUID = truncateUID
self.sanitizeFilename = sanitizeFilename

def generateFilePathFromDICOMAttributes(
self, dataset: pydicom.dataset.FileDataset
Expand All @@ -33,30 +36,37 @@ def generateFilePathFromDICOMAttributes(
# Prepare the replacements dictionary with sanitized attribute values
for key in keys:
# Retrieve the attribute value if it exists or default to a placeholder string
value = str(getattr(dataset, key, 'Unknown' + key))

value = truncateUID(value) if key.endswith("UID") and self.truncateUID else value

replacements[key] = sanitizeFileName(value) if self.sanitizeFilename else value
value = str(getattr(dataset, key, "Unknown" + key))

value = (
truncateUID(value)
if key.endswith("UID") and self.truncateUID
else value
)

replacements[key] = (
sanitizeFileName(value) if self.sanitizeFilename else value
)

# Build the target path by interpolating the replacement values
return fmt % replacements



def sortSingleDICOMFile(
self, filePath: str, option: str, overwrite: bool = False
) -> bool:
assert option in ["copy", "move"], "Invalid option: symlink not implemented yet"
try:

try:
dataset = pydicom.dcmread(filePath, stop_before_pixels=True)
except InvalidDicomError as e:
print(f"Error reading file {filePath}: {e}")
return False
except TypeError as e:
print(f"Error reading file {filePath}: is ``None`` or of an unsupported type.")
return False

print(
f"Error reading file {filePath}: is ``None`` or of an unsupported type."
)
return False

name = self.generateFilePathFromDICOMAttributes(dataset)
assert name is not None and isinstance(name, str)

Expand All @@ -65,53 +75,52 @@ def sortSingleDICOMFile(
if os.path.exists(targetFilename) and not overwrite:
print(f"Source File: {filePath}\n")
print(f"File {targetFilename} already exists. ")
sys.exit("Pattern is probably not unique or overwrite is set to False. Exiting.")

sys.exit(
"Pattern is probably not unique or overwrite is set to False. Exiting."
)

os.makedirs(os.path.dirname(targetFilename), exist_ok=True)

match option:
case "copy":
shutil.copyfile(src = filePath, dst=targetFilename)
shutil.copyfile(src=filePath, dst=targetFilename)
case "move":
shutil.move(src = filePath, dst=targetFilename)
shutil.move(src=filePath, dst=targetFilename)

return True

def sortDICOMFiles(
self, option: str = "copy", overwrite: bool = False
) -> bool:

def sortDICOMFiles(self, option: str = "copy", overwrite: bool = False) -> bool:
dicom_file_paths = self._get_dicom_files()

results = [self.sortSingleDICOMFile(file, option, overwrite) for file in dicom_file_paths]
results = [
self.sortSingleDICOMFile(file, option, overwrite)
for file in dicom_file_paths
]

return all(results)


def _get_dicom_files(self) -> list[str]:
dicom_file_paths = []
# Iterate over all files in the source directory
for root, dirs, files in os.walk(self.sourceDir):
for f in files:
dicom_file_paths.append(os.path.join(root, f)) if f.endswith(".dcm") else None
dicom_file_paths.append(os.path.join(root, f)) if f.endswith(
".dcm"
) else None

return dicom_file_paths


# Test case
# if __name__ == "__main__":

# sorter = DICOMSorter(
# sourceDir = sourceDir,
# destinationDir=destinationDir,
# targetPattern=pattern,
# truncateUID=True,
# sanitizeFilename=True,
# overwrite=True
# )

# sorter.sortDICOMFiles(option="move")




# sorter = DICOMSorter(
# sourceDir = sourceDir,
# destinationDir=destinationDir,
# targetPattern=pattern,
# truncateUID=True,
# sanitizeFilename=True,
# overwrite=True
# )

# sorter.sortDICOMFiles(option="move")
17 changes: 10 additions & 7 deletions src/nbiatoolkit/dicomsort/helper_functions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import re
from typing import Tuple, List


def parseDICOMKeysFromFormat(targetPattern: str) -> Tuple[str, List[str]]:
"""
Parse the target pattern to create a format string with named placeholders
Expand All @@ -19,13 +21,14 @@ def parseDICOMKeysFromFormat(targetPattern: str) -> Tuple[str, List[str]]:
"""

# Compile the regex pattern for efficiency
dicom_key_pattern = re.compile(r'%([A-Za-z]+)')
dicom_key_pattern = re.compile(r"%([A-Za-z]+)")
keys = dicom_key_pattern.findall(targetPattern)
# Use the same compiled pattern for replacing
formatted_pattern = dicom_key_pattern.sub(r'%(\1)s', targetPattern)
formatted_pattern = dicom_key_pattern.sub(r"%(\1)s", targetPattern)

return formatted_pattern, keys


def sanitizeFileName(fileName: str) -> str:
"""
Sanitize the file name by replacing potentially dangerous characters.
Expand All @@ -35,14 +38,14 @@ def sanitizeFileName(fileName: str) -> str:
# Define a pattern for disallowed filename characters and their replacements
disallowed_characters_pattern = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
# Replace disallowed characters with an underscore
sanitized_name = disallowed_characters_pattern.sub('_', fileName)
sanitized_name = disallowed_characters_pattern.sub("_", fileName)

# replace spaces with underscores
sanitized_name = sanitized_name.replace(" ", "_")

# Remove subsequent underscores
sanitized_name = re.sub(r'(_{2,})', '_', sanitized_name)
sanitized_name = re.sub(r"(_{2,})", "_", sanitized_name)

return sanitized_name


Expand Down
4 changes: 1 addition & 3 deletions src/nbiatoolkit/logger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from .logger import setup_logger

__all__ = [
'setup_logger'
]
__all__ = ["setup_logger"]

0 comments on commit 5841645

Please sign in to comment.