In [4]:
import os
import shutil
import pandas as pd
from datetime import datetime
from support import db_read, count_folders_and_files, copy_files_make_zip, process_path

# Database URL (sqlite:/// + file path) handed to db_read, which passes it to pandas.read_sql_table.
dbpath = "sqlite:///D:/DICOM DATABASE/DICOM IMPORT/ctkDICOM.sql"
# Folder that contains the database file above; presumably the root of the imported DICOM tree - confirm.
root_dir = "D:\\DICOM DATABASE\\DICOM IMPORT"
# Folder under which the export cells further down place their ZIP archives.
output_dir = "D:\\python\\ddis zip testing"

In [12]:
def db_read(dbpath):
    """
    Build a one-row-per-study index from the DICOM SQLite database.

    Parameters:
    - dbpath (str): Database URL understood by pandas.read_sql_table,
      e.g. "sqlite:///D:/path/to/ctkDICOM.sql".

    Returns:
    - pd.DataFrame: An independent frame (safe to modify) with the columns
      PatientsUID, PatientsName, PatientID, SOPInstanceUID, StudyInstanceUID,
      SeriesInstanceUID, StudyID, Modality, Filename and
      DisplayedFieldsUpdatedTimestamp_x.
    - None: if reading or joining the tables fails for any reason; the error is
      printed instead of raised, so callers must check for None.

    The Images, Series, Studies and Patients tables are read in full (Studies and
    Patients restricted to the columns needed here). One image is kept per series,
    the result is joined to Series and Studies, reduced to one row per study, and
    finally joined to Patients.
    """
    try:
        images = pd.read_sql_table("Images", dbpath)
        series = pd.read_sql_table("Series", dbpath)
        studies = pd.read_sql_table(
            "Studies", dbpath, columns=['StudyInstanceUID', 'StudyID', 'PatientsUID']
        )
        patients = pd.read_sql_table(
            "Patients", dbpath, columns=['UID', 'PatientID', 'PatientsName']
        )

        # Keep a single representative image row for every series.
        unique_images = images.drop_duplicates("SeriesInstanceUID")

        # Images x Series. Column names present in both frames get the pandas
        # default suffixes: "_x" for the Images side, "_y" for the Series side.
        images_series = pd.merge(unique_images, series, on="SeriesInstanceUID", how="inner")

        # Attach study information, then keep only the first row of each study.
        images_series_studies = pd.merge(studies, images_series, on="StudyInstanceUID", how="inner")
        unique_studies = images_series_studies.drop_duplicates("StudyInstanceUID")

        # Attach patient information (Studies.PatientsUID -> Patients.UID).
        studies_patients = pd.merge(
            unique_studies, patients, left_on="PatientsUID", right_on="UID", how="inner"
        )

        colnames = ["PatientsUID", "PatientsName", "PatientID", "SOPInstanceUID", "StudyInstanceUID", "SeriesInstanceUID", "StudyID", "Modality", "Filename", "DisplayedFieldsUpdatedTimestamp_x"]

        # Return an explicit copy: selecting a column subset yields a frame that
        # pandas still tracks as derived from the merged frame, and assigning a
        # column to it in the caller raises SettingWithCopyWarning.
        return studies_patients[colnames].copy()

    except Exception as e:
        # Best-effort contract: report the problem and signal failure with None.
        print(f"DB connection and processing: {e}")
        return None


In [None]:
# Load the study index. db_read reports failures by printing a message and
# returning None, so stop here with a clear error instead of an opaque
# "'NoneType' object is not subscriptable" on the next statement.
data = db_read(dbpath)
if data is None:
    raise RuntimeError("db_read returned no data; see the message printed above")

# Parse the timestamp column so the .dt accessor works. assign() builds a new
# frame instead of writing into the one db_read returned.
data = data.assign(DisplayedFieldsUpdatedTimestamp_x=pd.to_datetime(data['DisplayedFieldsUpdatedTimestamp_x']))

# Keep only the rows whose timestamp falls on today's calendar date.
date = datetime.today().date()
p = data[data['DisplayedFieldsUpdatedTimestamp_x'].dt.date == date]

# Last expression of the cell, so the notebook displays it: one row per StudyID.
p.drop_duplicates("StudyID")

In [18]:
import re

def clean_filename(name):
    """
    Turn an arbitrary string into a filename-friendly one.

    Each of the characters < > : " backslash / | ? * is replaced by an
    underscore, every run of underscores (including ones already present in
    the input) is then reduced to a single underscore, and underscores at the
    very start or end are removed.

    Args:
        name (str): The input filename.

    Returns:
        str: The cleaned filename; may be empty if the input consisted only of
        disallowed characters and underscores.
    """
    # Pass 1: swap every disallowed character for an underscore.
    substituted = re.sub(r'[<>:"\\/|?*]', '_', name)

    # Pass 2: squeeze each run of underscores down to one.
    squeezed = re.sub(r'[_]+', '_', substituted)

    # Finally drop underscores left hanging at either end.
    return squeezed.strip('_')

# Example usage: '*', '/' and '?' are in the disallowed set, so each run of them
# (together with the neighbouring underscores) ends up as a single '_'.
input_filename = "name*_/.?*_/file"
cleaned_filename = clean_filename(input_filename)
print(cleaned_filename)


name_._file_


In [3]:
import pydicom
from datetime import datetime

# Cell-local settings. These re-assign the names set in the first cell;
# root_dir now points at a single sub-folder of the import directory.
dbpath = "sqlite:///D:/DICOM DATABASE/DICOM IMPORT/ctkDICOM.sql"
root_dir = "D:\\DICOM DATABASE\\DICOM IMPORT\\dicom\\28642ff3"
output_dir = "D:\\python\\ddis zip testing"


# Hard-coded sample values for the trial call at the end of this cell.
patient_id = "10258"
patient_name = "Avc"
modality = "MR"
study_id = "1025"
# Current timestamp; copy_files_make_zip formats it with "%d_%m_%Y".
date = datetime.today()

def copy_files_make_zip(root_dir, output_dir, patient_id, patient_name, modality, study_id, date):
    """
    Collect every readable DICOM file below ``root_dir`` into one ZIP archive.

    The tree below root_dir is walked recursively. Every file that pydicom can
    parse and that has a SOPInstanceUID is copied into a temporary staging
    directory as "<SOPInstanceUID>_<original filename>"; the staging directory
    is then zipped to
    "<output_dir>/<DD_MM_YYYY>/<patient_id> <patient_name> <modality> <study_id>.zip".

    Files are NOT filtered by patient: patient_id, patient_name, modality and
    study_id are only used to build the archive name.

    Staging happens in a temporary directory outside the source tree. Nothing is
    ever created or deleted inside root_dir, so a source folder that happens to be
    named like the patient (or an empty patient_name) can no longer cause source
    data to be removed during clean-up.

    Parameters:
    - root_dir (str): Directory that is searched recursively for DICOM files.
    - output_dir (str): Directory under which the dated archive folder is created.
    - patient_id (str): Patient identifier; any "/" is replaced by "_" in the archive name.
    - patient_name (str): Patient name used in the archive name.
    - modality (str): Modality label used in the archive name.
    - study_id (str): Study identifier used in the archive name.
    - date (datetime.date | datetime.datetime): Formatted as DD_MM_YYYY for the dated folder.

    Returns:
    - str | None: Path of the created ZIP file, or None (after printing a message)
      when no file could be read, in which case no archive is written.
    """
    # Imported locally so the function does not rely on another notebook cell.
    import tempfile

    formatted_date = date.strftime("%d_%m_%Y")
    # "/" would otherwise be interpreted as a path separator in the archive name.
    patient_id_ = str(patient_id).replace('/', '_')
    zip_name_with_path = os.path.join(output_dir, formatted_date, f"{patient_id_} {patient_name} {modality} {study_id}")

    # The staging directory is removed automatically, even if an error occurs.
    with tempfile.TemporaryDirectory() as staging_dir:
        copied = 0
        for path, _dirs, filenames in os.walk(root_dir):
            for filename in filenames:
                filepath = os.path.join(path, filename)
                try:
                    # Only header attributes are needed, so skip the pixel data.
                    dcm = pydicom.dcmread(filepath, stop_before_pixels=True)
                    sopinstanceuid = dcm.SOPInstanceUID

                    # The UID prefix keeps equally named files from different folders apart.
                    shutil.copy(filepath, os.path.join(staging_dir, f"{sopinstanceuid}_{filename}"))
                    copied += 1

                except Exception as e:
                    # Best effort: report unreadable / non-DICOM files and carry on.
                    print(f"Error: {e} - File: {filename}")

        if copied == 0:
            print(f"No readable DICOM files found under {root_dir}; no archive created")
            return None

        # make_archive creates the dated output folder if it does not exist yet.
        return shutil.make_archive(base_name=zip_name_with_path, format="zip", root_dir=staging_dir)


# Trial run of the export with the sample values assigned earlier in this cell.
copy_files_make_zip(root_dir, output_dir, patient_id, patient_name, modality, study_id, date)

In [4]:
import os
import shutil
import re
from datetime import datetime
import pydicom

# Source folder that copy_files_make_zip walks, and the folder that receives the archives.
root_dir = "D:\\DICOM DATABASE\\DICOM IMPORT\\dicom\\28642ff3"
output_dir = "D:\\python\\ddis zip testing"

# Hard-coded sample values matching the parameters of copy_files_make_zip below.
patient_id = "10258"
patient_name = "Avc"
modality = "MR"
study_id = "1025"
# Current timestamp; copy_files_make_zip formats it with "%d_%m_%Y".
date = datetime.today()

def copy_files_make_zip(root_dir, output_dir, patient_id, patient_name, modality, study_id, date):
    """
    Zip the DICOM files below ``root_dir`` that belong to one patient.

    The tree below root_dir is walked recursively. Every file that pydicom can
    parse and whose PatientID equals ``patient_id`` is copied into a temporary
    staging directory as "<SOPInstanceUID>_<original filename>"; the staging
    directory is then zipped to
    "<output_dir>/<DD_MM_YYYY>/<patient_id>_<patient_name>_<modality>_<study_id>.zip".

    Staging uses a fresh temporary directory that is always removed afterwards,
    so files left over from an earlier failed run cannot end up in the archive.

    Parameters:
    - root_dir (str): Directory that is searched recursively for DICOM files.
    - output_dir (str): Directory under which the dated archive folder is created.
    - patient_id (str): PatientID to match; any "/" is replaced by "_" in the archive name.
    - patient_name (str): Patient name used in the archive name.
    - modality (str): Modality label used in the archive name.
    - study_id (str): Study identifier used in the archive name.
    - date (datetime.date | datetime.datetime): Formatted as DD_MM_YYYY for the dated folder.

    Returns:
    - str | None: Path of the created ZIP file, or None (after printing a message)
      when no file matched the patient, in which case no archive is written.
    """
    # Imported locally so the function does not rely on another notebook cell.
    import tempfile

    formatted_date = date.strftime("%d_%m_%Y")
    # "/" would otherwise split the archive name into nested directories.
    safe_patient_id = str(patient_id).replace('/', '_')
    zip_name_with_path = os.path.join(output_dir, formatted_date, f"{safe_patient_id}_{patient_name}_{modality}_{study_id}")

    with tempfile.TemporaryDirectory() as staging_dir:
        matched = 0
        for path, _dirs, filenames in os.walk(root_dir):
            for filename in filenames:
                filepath = os.path.join(path, filename)
                try:
                    # Only header attributes are needed, so skip the pixel data.
                    dcm = pydicom.dcmread(filepath, stop_before_pixels=True)
                    current_patient_id = dcm.PatientID
                    sop_instance_uid = dcm.SOPInstanceUID

                    if current_patient_id == patient_id:
                        # The UID prefix keeps equally named files from different folders apart.
                        shutil.copy(filepath, os.path.join(staging_dir, f"{sop_instance_uid}_{filename}"))
                        matched += 1

                except Exception as e:
                    # Best effort: report unreadable / non-DICOM files and carry on.
                    print(f"Error: {e} - File: {filename}")

        if matched == 0:
            print(f"No DICOM files for patient {patient_id} found under {root_dir}; no archive created")
            return None

        # make_archive creates the dated output folder if it does not exist yet.
        return shutil.make_archive(base_name=zip_name_with_path, format="zip", root_dir=staging_dir)
