## Generate the Data Check Documents for the Proper Data Control Insurance Checks

In [1]:
# Note: output files will be placed in the working dir

#PC: 
database_dir = r"E:\TriNetX\\"   # Location where the database files are stored 
working_dir = r"C:\Users\reblo\Box\Residency Personal Files\Scholarly Work\Locke Research Projects\TriNetX Code\Hypercapnia TriNetX CSV Processing\Working\\" #location where to read and right from (faster = better if space allows)

#Mac 
#database_dir = r"/Volumes/LOCKE STUDY/TriNetX"   # Location where the database files are stored 
#working_dir = r"/Users/blocke/TriNetX Working/"

In [2]:
import time
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from datetime import datetime
import gc
import dask.dataframe as dd
import dask
import logging
from dask.distributed import Client, LocalCluster
from dask import config
import h5py

#Create an output directory if it's not already there
os.makedirs(os.path.join(working_dir[:-1], "data_checks"), exist_ok=True)

### Make HD5 Files with each type of data element

#### Vital Signs

In [None]:
#Vital Signs
start_time = time.time()
store_path = os.path.join(working_dir[:-1], 'vitals_unique_encounters.h5')

if os.path.exists(store_path):
    try:
        # Attempt to open and then immediately close the file
        store = pd.HDFStore(store_path)
        store.close()
    except Exception as e:
        print(f"Failed to close the file: {e}")
    os.remove(store_path)  # Ensure a fresh start

#num_spreadsheets = 10
num_spreadsheets = 853

columns = ["patient_id","encounter_id","code_system","code","principal_diagnosis_indicator","admitting_diagnosis","reason_for_visit","date","derived_by_TriNetX","source_id"]

try: 
    store = pd.HDFStore(store_path)
    # Process each CSV and store directly to HDF5
    for i in range(1, num_spreadsheets + 1):
        print(f'{i:04}')  
        file_path = f"{database_dir}Vital Signs/vital_signs{i:04}.csv"
        chunk = pd.read_csv(file_path,
            names=columns,          # Override column names
            usecols=["encounter_id"],  # Only read the "encounter_id" column
            dtype={"encounter_id": str},  # Ensure "encounter_id" is read as a string
            skiprows=1 if i == 1 else 0   # Skip the first row only for the first file
        )
        chunk.drop_duplicates(subset=["encounter_id"], inplace=True)
        store.append('unique_encounters', chunk, format='table', data_columns=True, index=False, min_itemsize={'encounter_id': 12})
finally: 
    store.close()

end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()

#### Diagnoses

In [None]:
#Diagnoses 
start_time = time.time()

store_path = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')
if os.path.exists(store_path):
    try:
        # Attempt to open and then immediately close the file
        store = pd.HDFStore(store_path)
        store.close()
    except Exception as e:
        print(f"Failed to close the file: {e}")
    os.remove(store_path)  # Ensure a fresh start

num_spreadsheets = 1273

columns = ["patient_id","encounter_id","code_system","code","principal_diagnosis_indicator","admitting_diagnosis","reason_for_visit","date","derived_by_TriNetX","source_id"]

try: 
    store = pd.HDFStore(store_path)
    # Process each CSV and store directly to HDF5
    for i in range(1, num_spreadsheets + 1):
        print(f'{i:04}')  
        file_path = f"{database_dir}Diagnosis/diagnosis{i:04}.csv"
        chunk = pd.read_csv(file_path,
            names=columns,          # Override column names
            usecols=["encounter_id"],  # Only read the "encounter_id" column
            dtype={"encounter_id": str},  # Ensure "encounter_id" is read as a string
            skiprows=1 if i == 1 else 0   # Skip the first row only for the first file
        )
        chunk.drop_duplicates(subset=["encounter_id"], inplace=True)
        store.append('unique_encounters', chunk, format='table', data_columns=True, index=False, min_itemsize={'encounter_id': 12})
finally: 
    store.close()

end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()

#### Labs

In [None]:
# Labs
start_time = time.time()

store_path = os.path.join(working_dir[:-1], 'lab_unique_encounters.h5')
if os.path.exists(store_path):
    try:
        # Attempt to open and then immediately close the file
        store = pd.HDFStore(store_path)
        store.close()
    except Exception as e:
        print(f"Failed to close the file: {e}")
    os.remove(store_path)  # Ensure a fresh start

#num_spreadsheets = 10
num_spreadsheets = 2334

columns = ["patient_id","encounter_id","code_system","code","date","value","text_value","units_of_measure","derived_by_TriNetX","source_id"]

try: 
    store = pd.HDFStore(store_path)
    # Process each CSV and store directly to HDF5
    for i in range(1, num_spreadsheets + 1):
        print(f'{i:04}')  
        file_path = f"{database_dir}Lab Results/lab_results{i:04}.csv"
        chunk = pd.read_csv(file_path,
            names=columns,          # Override column names
            usecols=["encounter_id"],  # Only read the "encounter_id" column
            dtype={"encounter_id": str},  # Ensure "encounter_id" is read as a string
            skiprows=1 if i == 1 else 0   # Skip the first row only for the first file
        )
        chunk.drop_duplicates(subset=["encounter_id"], inplace=True)
        store.append('unique_encounters', chunk, format='table', data_columns=True, index=False, min_itemsize={'encounter_id': 12})
finally: 
    store.close()

end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()

#### Procedures

In [None]:
# Procedures
start_time = time.time()

store_path = os.path.join(working_dir[:-1], 'proc_unique_encounters.h5')
if os.path.exists(store_path):
    try:
        # Attempt to open and then immediately close the file
        store = pd.HDFStore(store_path)
        store.close()
    except Exception as e:
        print(f"Failed to close the file: {e}")
    os.remove(store_path)  # Ensure a fresh start

#num_spreadsheets = 10
num_spreadsheets = 714

columns = ["patient_id","encounter_id","code_system","code","principal_procedure_indicator","date","derived_by_TriNetX","source_id"]

try: 
    store = pd.HDFStore(store_path)
    # Process each CSV and store directly to HDF5
    for i in range(1, num_spreadsheets + 1):
        print(f'{i:04}')  
        file_path = f"{database_dir}Procedure/procedure{i:04}.csv"
        chunk = pd.read_csv(file_path,
            names=columns,          # Override column names
            usecols=["encounter_id"],  # Only read the "encounter_id" column
            dtype={"encounter_id": str},  # Ensure "encounter_id" is read as a string
            skiprows=1 if i == 1 else 0   # Skip the first row only for the first file
        )
        chunk.drop_duplicates(subset=["encounter_id"], inplace=True)
        store.append('unique_encounters', chunk, format='table', data_columns=True, index=False, min_itemsize={'encounter_id': 12})
finally: 
    store.close()

end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()

#### Meds

In [None]:
# Meds
start_time = time.time()

store_path = os.path.join(working_dir[:-1], 'med_unique_encounters.h5')
if os.path.exists(store_path):
    try:
        # Attempt to open and then immediately close the file
        store = pd.HDFStore(store_path)
        store.close()
    except Exception as e:
        print(f"Failed to close the file: {e}")
    os.remove(store_path)  # Ensure a fresh start

#num_spreadsheets = 10
num_spreadsheets = 2991

columns = ["patient_id","encounter_id","unique_id","code_system","code","start_date","route","brand","strength","derived_by_TriNetX","source_id"]

try: 
    store = pd.HDFStore(store_path)
    # Process each CSV and store directly to HDF5
    for i in range(1, num_spreadsheets + 1):
        print(f'{i:04}')  
        file_path = f"{database_dir}Medications/medication{i:04}.csv"
        chunk = pd.read_csv(file_path,
            names=columns,          # Override column names
            usecols=["encounter_id"],  # Only read the "encounter_id" column
            dtype={"encounter_id": str},  # Ensure "encounter_id" is read as a string
            skiprows=1 if i == 1 else 0   # Skip the first row only for the first file
        )
        chunk.drop_duplicates(subset=["encounter_id"], inplace=True)
        store.append('unique_encounters', chunk, format='table', data_columns=True, index=False, min_itemsize={'encounter_id': 12})
finally: 
    store.close()

end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()


### De-duplicate

#### H5 Structure Check Code

In [6]:
def print_structure_and_count(name, obj):
    if isinstance(obj, h5py.Dataset):
        print(f"{name}: {len(obj)} entries")
    else:
        print(name)

def check_and_print_structure(file_path):
    print(f"Structure of {os.path.basename(file_path)}:")
    with h5py.File(file_path, 'r') as f:
        f.visititems(print_structure_and_count)

diag_path = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')
vitals_path = os.path.join(working_dir[:-1], 'vitals_unique_encounters.h5')
lab_path = os.path.join(working_dir[:-1], 'lab_unique_encounters.h5')
med_path = os.path.join(working_dir[:-1], 'med_unique_encounters.h5')
proc_path = os.path.join(working_dir[:-1], 'proc_unique_encounters.h5')

# List of all HDF5 paths
paths = [diag_path, vitals_path, lab_path, med_path, proc_path]

for path in paths:
    check_and_print_structure(path)

Structure of diag_unique_encounters.h5:
unique_encounters
unique_encounters/table: 249836036 entries
Structure of vitals_unique_encounters.h5:
unique_encounters
unique_encounters/table: 101603643 entries
Structure of lab_unique_encounters.h5:
unique_encounters
unique_encounters/table: 100800073 entries
Structure of med_unique_encounters.h5:
unique_encounters
unique_encounters/table: 159229970 entries
Structure of proc_unique_encounters.h5:
unique_encounters
unique_encounters/table: 166304142 entries


### New Attempt: try to merge the databases first

In [26]:
import os
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import dask

# Define paths
diag_path = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')
vitals_path = os.path.join(working_dir[:-1], 'vitals_unique_encounters.h5')
output_path = os.path.join(working_dir[:-1], 'merged_unique_encounters.csv')

# Configure Dask LocalCluster
memory_per_worker = '6GB'
cluster = LocalCluster(
    n_workers=4,               # Number of worker processes
    threads_per_worker=1,      # Number of threads per worker
    memory_limit=memory_per_worker,  # Memory limit per worker
    processes=True,            # Use separate processes for each worker
    dashboard_address=':8787'  # Dashboard address for monitoring
)
client = Client(cluster)

# Adjust memory spilling settings
dask.config.set({
    'distributed.worker.memory.target': 0.70,
    'distributed.worker.memory.spill': 0.80,
    'distributed.worker.memory.pause': 0.80,
    'distributed.worker.memory.terminate': 0.95,
    'distributed.scheduler.allowed-failures': 10,
})

# Read the data using Dask
try:
    diag_ddf = dd.read_hdf(diag_path, 'unique_encounters/table')
    vitals_ddf = dd.read_hdf(vitals_path, 'unique_encounters/table')
    print("Successfully read H5 files into Dask DataFrames.")
except Exception as e:
    print(f"Error reading H5 files into Dask DataFrames: {e}")

# Drop the index column if it exists
try:
    diag_ddf = diag_ddf[['encounter_id']]
    vitals_ddf = vitals_ddf[['encounter_id']]
except Exception as e:
    print(f"Error selecting columns: {e}")

# Convert 'encounter_id' to string and trim whitespace
try:
    diag_ddf['encounter_id'] = diag_ddf['encounter_id'].astype(str).str.strip()
    vitals_ddf['encounter_id'] = vitals_ddf['encounter_id'].astype(str).str.strip()
    print("Successfully converted 'encounter_id' to strings and trimmed whitespace.")
except Exception as e:
    print(f"Error processing 'encounter_id': {e}")


# Perform the merge operation to keep only common "encounter_id"
try:
    merged_ddf = diag_ddf.merge(vitals_ddf, on='encounter_id', how='inner')
    print("Successfully merged DataFrames.")
except Exception as e:
    print(f"Error merging DataFrames: {e}")

# Persist the intermediate result to avoid recomputation
try:
    merged_ddf = merged_ddf.persist()
    print("Successfully persisted merged DataFrame.")
except Exception as e:
    print(f"Error persisting DataFrame: {e}")

# Write the result to a single CSV file
try:
    merged_ddf.to_csv(output_path, single_file=True, index=False)
    print(f"Merged unique encounters saved to: {output_path}")
except Exception as e:
    print(f"Error writing CSV: {e}")

# Close the Dask client
client.close()
cluster.close()


Perhaps you already have a cluster running?
Hosting the HTTP server on port 51772 instead

+-------------+-----------+-----------+----------+
| Package     | Client    | Scheduler | Workers  |
+-------------+-----------+-----------+----------+
| dask        | 2023.11.0 | 2023.11.0 | 2024.5.1 |
| distributed | 2023.11.0 | 2023.11.0 | 2024.5.1 |
+-------------+-----------+-----------+----------+
2024-05-27 19:07:24,716 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "c:\Users\reblo\anaconda3\Lib\site-packages\distributed\protocol\core.py", line 160, in loads
    sub_header, sub_frames, deserializers=deserializers
  File "c:\Users\reblo\anaconda3\Lib\site-packages\msgpack\fallback.py", line 136, in unpackb
    raise ExtraData(ret, unpacker._get_extradata())
msgpack.exceptions.ExtraData: unpack(b) received extra data.
2024-05-27 19:07:24,752 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most rece

Successfully read H5 files into Dask DataFrames.
Successfully converted 'encounter_id' to strings and trimmed whitespace.
Successfully merged DataFrames.
Successfully persisted merged DataFrame.
Error writing CSV: _write_csv-1f56df5a-bb08-486f-8273-67bfaa5218dd


In [9]:
# Define paths
diag_path = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')
vitals_path = os.path.join(working_dir[:-1], 'vitals_unique_encounters.h5')
output_path = os.path.join(working_dir[:-1], 'merged_unique_encounters.csv')

# Configure Dask LocalCluster
memory_per_worker = '6GB'
cluster = LocalCluster(
    n_workers=4,               # Number of worker processes
    threads_per_worker=1,      # Number of threads per worker
    memory_limit=memory_per_worker,  # Memory limit per worker
    processes=True,            # Use separate processes for each worker
    dashboard_address=':8787'  # Dashboard address for monitoring
)
client = Client(cluster)

# Adjust memory spilling settings
dask.config.set({
    'distributed.worker.memory.target': 0.70,
    'distributed.worker.memory.spill': 0.80,
    'distributed.worker.memory.pause': 0.80,
    'distributed.worker.memory.terminate': 0.95,
    'distributed.scheduler.allowed-failures': 10,
})

try: 
    # Read the data using Dask
    diag_ddf = dd.read_hdf(diag_path, 'unique_encounters')
    vitals_ddf = dd.read_hdf(vitals_path, 'unique_encounters')

    # Ensure the columns are named consistently if not already
    diag_ddf = diag_ddf.rename(columns={diag_ddf.columns[0]: 'encounter_id'})
    vitals_ddf = vitals_ddf.rename(columns={vitals_ddf.columns[0]: 'encounter_id'})
    print(diag_ddf.shape)
    print(diag_ddf.head())
    print(vitals_ddf.shape)
    print(vitals_ddf.head())
except Exception as e:
    print(f"Error during reading {e}")

try:
    # Perform the merge operation to keep only common "unique_encounters"
    merged_ddf = diag_ddf.merge(vitals_ddf, on='unique_encounters', how='inner')

    # Persist the intermediate result to avoid recomputation
    merged_ddf = merged_ddf.persist()

    # Write the result to a single CSV file
    merged_ddf.to_csv(output_path, single_file=True, index=False)

    print(f"Merged unique encounters saved to: {output_path}")
except Exception as e:
    print(f"Error during compute or writing to CSV {e}")
# Close the Dask client
client.close()
cluster.close()


(Delayed('int-c8a2ddc3-c486-43e0-947c-c9a83f6b097c'), 1)
  encounter_id
0          GRB
1           Gh
2          GBB
3          GhB
4           GR
(Delayed('int-6d6da18e-4f2f-4288-a9d4-526ec25da1fd'), 1)
    encounter_id
0             GR
1            GhB
17            Gh
21           GBB
183          GhN
Error during compute or writing to CSV 'unique_encounters'


#### Lab

In [None]:
# Small enough to just use pandas
start_time = time.time()

# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_lab_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'lab_unique_encounters.h5')

# Read the data using Pandas
pdf = pd.read_hdf(input_hdf_path, 'unique_encounters')

# Remove duplicates
pdf = pdf.drop_duplicates()

# Write the DataFrame to a CSV file
pdf.to_csv(output_csv_path, index=False)

end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()

#### Vital Signs

In [None]:
# Vital Signs - small enough to do simply
start_time = time.time()

# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_vitals_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'vitals_unique_encounters.h5')

# Read the data using Dask
ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')
ddf = ddf.drop_duplicates() # Remove duplicates
ddf = ddf.compute() # Compute to remove duplicates effectively
ddf.to_csv(output_csv_path, index = False)

#start_time = time.time()
end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()


#### Procedures


In [None]:
# Meds 
start_time = time.time()

# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_proc_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'proc_unique_encounters.h5')

# Read the data using Dask
ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')
ddf = ddf.drop_duplicates() # Remove duplicates
ddf = ddf.compute() # Compute to remove duplicates effectively
ddf.to_csv(output_csv_path, index = False)

#start_time = time.time()
end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()

#### Medications

In [None]:
# Meds 
start_time = time.time()

# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_med_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'med_unique_encounters.h5')

# Read the data using Dask
ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')
ddf = ddf.drop_duplicates() # Remove duplicates
ddf = ddf.compute() # Compute to remove duplicates effectively
ddf.to_csv(output_csv_path, index = False)

#start_time = time.time()
end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()

#### Diagnoses

In [3]:
# Troubleshoot code block
# Try this one first? 
#  WHICH DIAGNOSIS VERSION WORKS? - Allegedly this one does on mac. 

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_diag_unique_encounters.csv')
input_hdf_path = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')

# Read the data using Dask
logger.info("Reading data from HDF5")
# This is a hack to delay the compute
ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')

# Sample the data to check if it's a memory issue
logger.info("Taking a sample of the data for testing")
sample_ddf = ddf.sample(frac=0.1).compute()
logger.info(f"Sample size: {sample_ddf.shape}")

# Check for duplicates in the sample
logger.info("Dropping duplicates in the sample")
sample_ddf = sample_ddf.drop_duplicates()

# Test writing the sample to CSV
sample_output_path = os.path.join(working_dir[:-1], 'sample_clean_diag_unique_encounters.csv')
logger.info(f"Writing sample data to {sample_output_path}")
sample_ddf.to_csv(sample_output_path, index=False)

# If the sample works, proceed with the full dataset
logger.info("Dropping duplicates in the full dataset")
ddf = ddf.drop_duplicates()
logger.info("Computing the full dataset to remove duplicates")
try:
    ddf = ddf.compute()
    logger.info("Full dataset computed successfully")

    # Write the full dataset to CSV
    logger.info(f"Writing full dataset to {output_csv_path}")
    ddf.to_csv(output_csv_path, index=False)
except Exception as e:
    logger.error("Error during compute or writing to CSV", exc_info=True)

end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()

INFO:__main__:Reading data from HDF5
INFO:__main__:Dropping duplicates in the full dataset
INFO:__main__:Computing the full dataset to remove duplicates


### Create Screens

#### Ambulatory

In [6]:
start_time = time.time()

# Ambulatory
# Read data from HDF5 files using Dask
diag_path = os.path.join(working_dir[:-1], 'clean_diag_unique_encounters.csv')
print(diag_path)
vitals_path = os.path.join(working_dir[:-1], 'clean_vitals_unique_encounters.csv')
print(vitals_path)
output_csv_path = os.path.join(working_dir[:-1], "data_checks", "amb_enc_screen.csv")
print(output_csv_path)

# Configure Dask LocalCluster
memory_per_worker = '6.00GB'  # Adjust to fit within total memory (7.75GB / 6 workers = ~1.29GB)
cluster = LocalCluster(
    n_workers=1,               # Number of worker processes (matching your 6pc 8mac cores)
    threads_per_worker=1,      # Number of threads per worker to avoid GIL issues
    memory_limit=memory_per_worker,  # Memory limit per worker
    processes=True,            # Use separate processes for each worker
    dashboard_address=':8787'  # Dashboard address for monitoring
)

# Create a Dask client
client = Client(cluster)

# Adjust memory spilling settings
dask.config.set({
    'distributed.worker.memory.target': 0.60,    # Spill to disk at 60% memory usage
    'distributed.worker.memory.spill': 0.70,     # Spill to disk at 70% memory usage
    'distributed.worker.memory.pause': 0.80,     # Pause worker at 80% memory usage
    'distributed.worker.memory.terminate': 0.95, # Terminate worker at 95% memory usage
    'distributed.scheduler.allowed-failures': 10, # Set the allowed failures to 10
})


try:
    # This head hack 
    diag_ddf = dd.read_csv(diag_path, usecols=['encounter_id']).head(100000000000, compute=False)
    vitals_ddf = dd.read_csv(vitals_path, usecols=['encounter_id']).head(100000000000, compute=False)
    print("Successfully read CSV files into Dask DataFrames.")
except Exception as e:
    print(f"Error reading CSV files into Dask DataFrames: {e}")

try:
    intersection_ddf = diag_ddf.merge(vitals_ddf, on='encounter_id', how='inner')
    print("Successfully merged DataFrames.")
except Exception as e:
    print(f"Error merging DataFrames: {e}")

try:
    # intersection_ddf = intersection_ddf.persist()  
    # Persist to avoid recomputation if I were to use this for multiple things - but currently don't
    intersection_ddf.to_csv(output_csv_path, single_file=True, index=False)
    print(f"Intersection CSV saved to: {output_csv_path}")
except Exception as e:
    print(f"Error writing CSV: {e}")

client.close()
cluster.close()

# Data check
try:
    output_csv = pd.read_csv(output_csv_path, usecols=['encounter_id'])
    print(f"Shape of output: {output_csv.shape}")
except Exception as e:
    print(f"Error reading output CSV file: {e}")

end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()


C:\Users\reblo\Box\Residency Personal Files\Scholarly Work\Locke Research Projects\TriNetX Code\Hypercapnia TriNetX CSV Processing\Working\clean_diag_unique_encounters.csv
C:\Users\reblo\Box\Residency Personal Files\Scholarly Work\Locke Research Projects\TriNetX Code\Hypercapnia TriNetX CSV Processing\Working\clean_vitals_unique_encounters.csv
C:\Users\reblo\Box\Residency Personal Files\Scholarly Work\Locke Research Projects\TriNetX Code\Hypercapnia TriNetX CSV Processing\Working\data_checks\amb_enc_screen.csv
Successfully read CSV files into Dask DataFrames.
Successfully merged DataFrames.
Intersection CSV saved to: C:\Users\reblo\Box\Residency Personal Files\Scholarly Work\Locke Research Projects\TriNetX Code\Hypercapnia TriNetX CSV Processing\Working\data_checks\amb_enc_screen.csv
Shape of output: (2659269, 1)
Executed in 0 hours, 0 minutes, and 15.94 seconds.


5311

#### Inpatient and Emergency

In [5]:
start_time = time.time()

# Read data from HDF5 files using Dask
diag_path = os.path.join(working_dir[:-1], 'clean_diag_unique_encounters.csv')
vitals_path = os.path.join(working_dir[:-1], 'clean_vitals_unique_encounters.csv')
med_path = os.path.join(working_dir[:-1], 'clean_med_unique_encounters.csv')
proc_path = os.path.join(working_dir[:-1], 'clean_proc_unique_encounters.csv')
lab_path = os.path.join(working_dir[:-1], 'clean_lab_unique_encounters.csv') 

output_csv_path = os.path.join(working_dir[:-1], "data_checks", "inp_enc_screen.csv")

# Commented out if prior code block still running
# Configure Dask LocalCluster
memory_per_worker = '6.00GB'  # Adjust to fit within total memory (7.75GB / 6 workers = ~1.29GB)
cluster = LocalCluster(
    n_workers=1,               # Number of worker processes (matching your 6pc 8mac cores)
    threads_per_worker=1,      # Number of threads per worker to avoid GIL issues
    memory_limit=memory_per_worker,  # Memory limit per worker
    processes=True,            # Use separate processes for each worker
    dashboard_address=':8787'  # Dashboard address for monitoring
)

# Create a Dask client
client = Client(cluster)

# Adjust memory spilling settings
dask.config.set({
    'distributed.worker.memory.target': 0.60,    # Spill to disk at 60% memory usage
    'distributed.worker.memory.spill': 0.70,     # Spill to disk at 70% memory usage
    'distributed.worker.memory.pause': 0.80,     # Pause worker at 80% memory usage
    'distributed.worker.memory.terminate': 0.95, # Terminate worker at 95% memory usage
    'distributed.scheduler.allowed-failures': 10, # Set the allowed failures to 10
})

# Read the data using Dask and ensure 'encounter_id' is read as a string
try:
    # This head hack makes it so the reads are queued, rather than bringing everything in initially.
    diag_ddf = dd.read_csv(diag_path, usecols=['encounter_id'], dtype={'encounter_id': 'str'}).head(100000000000, compute=False).dropna(subset=['encounter_id'])
    vitals_ddf = dd.read_csv(vitals_path, usecols=['encounter_id'], dtype={'encounter_id': 'str'}).head(100000000000, compute=False).dropna(subset=['encounter_id'])
    med_ddf = dd.read_csv(med_path, usecols=['encounter_id'], dtype={'encounter_id': 'str'}).head(100000000000, compute=False).dropna(subset=['encounter_id'])
    lab_ddf = dd.read_csv(lab_path, usecols=['encounter_id'], dtype={'encounter_id': 'str'}).head(100000000000, compute=False).dropna(subset=['encounter_id'])
    proc_ddf = dd.read_csv(proc_path, usecols=['encounter_id'], dtype={'encounter_id': 'str'}).head(100000000000, compute=False).dropna(subset=['encounter_id'])
    print("Successfully read CSV files into Dask DataFrames.")
except Exception as e:
    print(f"Error reading CSV files into Dask DataFrames: {e}")


try:
    ddf_shape = diag_ddf.shape
    print(f"Diag DF shape {ddf_shape}")

    intersection_ddf = diag_ddf.merge(vitals_ddf, on='encounter_id', how='inner')
    intersection_ddf = intersection_ddf.persist()
    ddf_shape = intersection_ddf.shape
    print(f"post vitals DF shape {ddf_shape}")

    intersection_ddf = intersection_ddf.merge(med_ddf, on='encounter_id', how='inner')
    intersection_ddf = intersection_ddf.persist()
    ddf_shape = intersection_ddf.shape
    print(f"post encounters DF shape {ddf_shape}")

    intersection_ddf = intersection_ddf.merge(proc_ddf, on='encounter_id', how='inner')
    intersection_ddf = intersection_ddf.persist()
    ddf_shape = intersection_ddf.shape
    print(f"post procDF shape {ddf_shape}")

    intersection_ddf = intersection_ddf.merge(lab_ddf, on='encounter_id', how='inner')
    intersection_ddf = intersection_ddf.persist()
    ddf_shape = intersection_ddf.shape
    print(f"post lab DF shape {ddf_shape}")
    print("Successfully merged DataFrames.")
except Exception as e:
    print(f"Error merging DataFrames: {e}")

try:
    # intersection_ddf = intersection_ddf.persist()  
    # Persist to avoid recomputation if I were to use this for multiple things - but currently don't
    intersection_ddf.to_csv(output_csv_path, single_file=True, index=False)
    print(f"Intersection CSV saved to: {output_csv_path}")
except Exception as e:
    print(f"Error writing CSV: {e}")

client.close()
cluster.close()

# Data check
try:
    output_csv = pd.read_csv(output_csv_path, usecols=['encounter_id'])
    print(f"Shape of output: {output_csv.shape}")
except Exception as e:
    print(f"Error reading output CSV file: {e}")

#start_time = time.time()
end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()



Successfully read CSV files into Dask DataFrames.
Diag DF shape (Delayed('int-bd0dec00-131d-45e6-ad81-972d72490991'), 1)
post vitals DF shape (Delayed('int-0814e485-58bb-4c5a-8052-4629b8c0bdea'), 1)
post encounters DF shape (Delayed('int-e078815b-e4fc-42ab-8a55-aee486778549'), 1)
post procDF shape (Delayed('int-620a6dac-38e1-4fb6-8994-e1e74f01c0e2'), 1)
post lab DF shape (Delayed('int-3a8bd936-bee6-4ffd-90a1-db8e8f995946'), 1)
Successfully merged DataFrames.
Intersection CSV saved to: C:\Users\reblo\Box\Residency Personal Files\Scholarly Work\Locke Research Projects\TriNetX Code\Hypercapnia TriNetX CSV Processing\Working\data_checks\inp_enc_screen.csv
Shape of output: (380772, 1)
Executed in 0 hours, 0 minutes, and 31.27 seconds.


3119

### Other Scratchpad commands that didn't make the cut for various reseasons. 

Preserved in case need to pilfer them later

HDF5 Creation - deprecated version

In [None]:
#LEGACY? 

"""
# Setup the storage file (HDF5)
start_time = time.time()

store_path = os.path.join(working_dir[:-1], 'unique_encounters.h5')
if os.path.exists(store_path):
    try:
        # Attempt to open and then immediately close the file
        store = pd.HDFStore(store_path)
        store.close()
    except Exception as e:
        print(f"Failed to close the file: {e}")
    os.remove(store_path)  # Ensure a fresh start


num_spreadsheets = 1273
output_file = os.path.join(working_dir[:-1], "data_checks", "diagnosis_encounters.csv")
columns = ["patient_id","encounter_id","code_system","code","principal_diagnosis_indicator","admitting_diagnosis","reason_for_visit","date","derived_by_TriNetX","source_id"]

try: 
    store = pd.HDFStore(store_path)
    # Process each CSV and store directly to HDF5
    for i in range(1, num_spreadsheets + 1):
        print(f'{i:04}')  
        file_path = f"{database_dir}Diagnosis/diagnosis{i:04}.csv"
        chunk = pd.read_csv(file_path,
            names=columns,          # Override column names
            usecols=["encounter_id"],  # Only read the "encounter_id" column
            dtype={"encounter_id": str}  # Ensure "encounter_id" is read as a string
        )
        store.append('unique_encounters', chunk, format='table', data_columns=True, index=False, min_itemsize={'encounter_id': 12})
finally: 
    store.close()

# Read data using Dask
dask_df = dd.read_hdf(store_path, 'unique_encounters')

# Remove duplicates
dask_df = dask_df.drop_duplicates()

# Compute and save to CSV
dask_df.compute().to_csv(output_file, index=False)
    
print(f"Unique encounter IDs with diagnoses reported are written to {output_file}")

start_time = time.time()
end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()
"""

Version of Procedure Deduplications That Doesn't add Anything

In [None]:
# MAYBE REMOVE:? Currently seems unsuccesful
"""
# Proc
start_time = time.time()

# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_proc_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'proc_unique_encounters.h5')

# Configure Dask LocalCluster
memory_per_worker = '7.50GB'  # Adjust to fit within total memory (7.75GB / 6 workers = ~1.29GB)
cluster = LocalCluster(
    n_workers=1,               # Number of worker processes (matching your 6 cores)
    threads_per_worker=1,      # Number of threads per worker to avoid GIL issues
    memory_limit=memory_per_worker,  # Memory limit per worker
    processes=True,            # Use separate processes for each worker
    dashboard_address=':8787'  # Dashboard address for monitoring
)

# Create a Dask client
client = Client(cluster)
client.get_versions(check=True)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Dask client created with dashboard at: {client.dashboard_link}")

# Adjust memory spilling settings
dask.config.set({
    'distributed.worker.memory.target': 0.70,    # Spill to disk at 60% memory usage
    'distributed.worker.memory.spill': 0.80,     # Spill to disk at 70% memory usage
    'distributed.worker.memory.pause': 0.80,     # Pause worker at 80% memory usage
    'distributed.worker.memory.terminate': 0.95, # Terminate worker at 95% memory usage
    'distributed.scheduler.allowed-failures': 10, # Set the allowed failures to 10
})

ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')
ddf = ddf.drop_duplicates() # Remove duplicates
ddf = ddf.compute()
ddf.to_csv(output_csv_path, index = False)

# Read the data using Dask
try: 
    ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')
    print("Successfully read H5 files into Dask DataFrames.")
except Exception as e:
    print(f"Error reading H5 files into Dask DataFrames: {e}")

try:
    ddf = ddf.drop_duplicates().persist() # Remove duplicates
    print("Successfully Dropped Duplicates")
except Exception as e:
    print(f"Error Dropping Duplicates: {e}")


try: 
    ddf = ddf.compute() # Compute to remove duplicates effectively
    print("Successfully Compuated.")
except Exception as e:
    print(f"Error Computing: {e}")

try:
    # Single-file replaces the compute stage and so it gets directly written to csv
    ddf.to_csv(output_csv_path, single_file = True, index = False) 
    print(f"Intersection CSV saved to: {output_csv_path}")
except Exception as e:
    print(f"Error writing CSV: {e}")

    
# Close the Dask client
client.close()
cluster.close()

#start_time = time.time()
end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()
"""

Lab dedup

In [None]:
# Labs - due to size, requires partitioning then recombination.
# NOT NEEDED BECAUSE THE REGULAR ONE WORKS FINE
"""
# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_lab_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'lab_unique_encounters.h5')

# Read the data using Dask
ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')
#ddf = ddf.drop_duplicates() # Remove duplicates
#ddf = ddf.compute() # Compute to remove duplicates effectively

num_partitions = 10
ddf = ddf.repartition(npartitions=num_partitions)

# Function to process each chunk
def process_chunk(chunk):
    return chunk.drop_duplicates()

# Apply the function to each partition
result = ddf.map_partitions(process_chunk)

# Compute the result
computed_result = result.compute()

# Combine the partitions into a single Pandas DataFrame
combined_df = pd.concat(computed_result)

# Remove duplicates again to ensure global uniqueness - this is smaller. 
final_df = combined_df.drop_duplicates()

# Write the combined DataFrame to a single CSV file
final_df.to_csv(output_csv_path, index=False)

#hdf.to_csv(output_hdf_path, index = False)
"""

"""
# Since compute is called, we now work with a Pandas DataFrame
# Check and convert data type explicitly if needed
if ddf['encounter_id'].dtype != 'object':
    ddf['encounter_id'] = ddf['encounter_id'].astype('object')

# Now, write the cleaned data frame back to HDF5
with pd.HDFStore(output_hdf_path, 'w') as store:
    store.put('unique_encounters', ddf, format='table', data_columns=True, index=False)
"""

Versions of the Diagnosis De-duplication that didnt work

In [None]:
# This one doesn't work because the database is too big. 
"""start_time = time.time()

# TODO: WHICH DIAGNOSIS VERSION WORKS? 

# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_diag_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')

# Read the data using Pandas
pdf = pd.read_hdf(input_hdf_path, 'unique_encounters')

# Remove duplicates
pdf = pdf.drop_duplicates()

# Write the DataFrame to a CSV file
pdf.to_csv(output_csv_path, index=False)

#start_time = time.time()
end_time = time.time()
execution_time = end_time - start_time
hours = int(execution_time // 3600)
minutes = int((execution_time % 3600) // 60)
seconds = execution_time % 60
print(f"Executed in {hours} hours, {minutes} minutes, and {seconds:.2f} seconds.")
gc.collect()
"""


In [None]:
"""
# TODO: WHICH DIAGNOSIS VERSION WORKS? 

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define the working directory and HDF5 paths (update with your actual paths)
output_hdf_path = os.path.join(working_dir[:-1], 'clean_diag_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')

# Configure Dask LocalCluster
memory_per_worker = '4.00GB'  # Adjust to fit within total memory (7.75GB / 6 workers = ~1.29GB)
cluster = LocalCluster(
    n_workers=2,               # Number of worker processes (matching your 6 cores)
    threads_per_worker=1,      # Number of threads per worker to avoid GIL issues
    memory_limit=memory_per_worker,  # Memory limit per worker
    processes=True,            # Use separate processes for each worker
    dashboard_address=':8787'  # Dashboard address for monitoring
)

# Create a Dask client
client = Client(cluster)
logger.info(f"Dask client created with dashboard at: {client.dashboard_link}")

# Adjust memory spilling settings
dask.config.set({
    'distributed.worker.memory.target': 0.60,    # Spill to disk at 60% memory usage
    'distributed.worker.memory.spill': 0.70,     # Spill to disk at 70% memory usage
    'distributed.worker.memory.pause': 0.80,     # Pause worker at 80% memory usage
    'distributed.worker.memory.terminate': 0.95, # Terminate worker at 95% memory usage
    'distributed.scheduler.allowed-failures': 10, # Set the allowed failures to 10
})

# Read the data using Dask
logger.info("Reading data from HDF5")
ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')

# Split the dataframe into 10 smaller chunks
num_partitions = 10
ddf = ddf.repartition(npartitions=num_partitions)
logger.info(f"Repartitioned dataframe into {num_partitions} partitions")

# Function to process each chunk
def process_chunk(chunk):
    return chunk.drop_duplicates()

# Apply the function to each partition
logger.info("Dropping duplicates in each partition")
result = ddf.map_partitions(process_chunk)

# Compute the result
logger.info("Computing the results for each partition")
computed_result = result.compute()

# Combine results and drop duplicates again to ensure global uniqueness
logger.info("Dropping duplicates from the combined result")
final_result = computed_result.drop_duplicates()

# Write the final result to CSV
logger.info(f"Writing final dataset to {output_hdf_path}")
final_result.to_csv(output_hdf_path, index=False)

# Close the Dask client
client.close()
cluster.close()
"""

In [None]:
"""
# TODO: WHICH DIAGNOSIS VERSION WORKS? 
# Diagnoses

# Configure Dask
cluster = LocalCluster(
    n_workers=6,              # Number of workers
    threads_per_worker=1,     # Number of threads per worker
    memory_limit='1.25GB',       # Memory limit per worker
    processes=True,           # Use separate processes for each worker
    dashboard_address=':8787' # Dashboard address http://localhost:8787
)
client = Client(cluster)
client.get_versions(check=True)

config.set({'distributed.worker.memory.target': 0.33,    # Spill to disk at 60% memory usage
            'distributed.worker.memory.spill': 0.40,     # Spill to disk at 70% memory usage
            'distributed.worker.memory.pause': 0.70,     # Pause worker at 80% memory usage
            'distributed.worker.memory.terminate': 0.95, # Terminate worker at 95% memory usage
            'distributed.scheduler.allowed-failures': 10, # Set the allowed failures to 10          
           })

# Define the HDF5 paths
output_hdf_path = os.path.join(working_dir[:-1], 'clean_diag_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')

# Read the data using Dask
ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')
ddf = ddf.drop_duplicates() # Remove duplicates
ddf = ddf.compute() # Compute to remove duplicates effectively
ddf.to_csv(output_hdf_path, index = False)

# Close the Dask client
client.close()
cluster.close()

# Since compute is called, we now work with a Pandas DataFrame
# Check and convert data type explicitly if needed


if ddf['encounter_id'].dtype != 'object':
    ddf['encounter_id'] = ddf['encounter_id'].astype('object')

# Now, write the cleaned data frame back to HDF5
with pd.HDFStore(output_hdf_path, 'w') as store:
    store.put('unique_encounters', ddf, format='table', data_columns=True, index=False)

"""

In [None]:
"""
# TODO: WHICH DIAGNOSIS VERSION WORKS? 

# Define the HDF5 paths
output_csv_path = os.path.join(working_dir[:-1], 'clean_diag_unique_encounters.csv')
input_hdf_path  = os.path.join(working_dir[:-1], 'diag_unique_encounters.h5')

from dask.distributed import Client
from dask.distributed import Client, LocalCluster
from dask import config

# Configure Dask
cluster = LocalCluster(
    n_workers=1,              # Number of workers
    threads_per_worker=1,     # Number of threads per worker
    memory_limit='7.50GB',       # Memory limit per worker
    processes=True,           # Use separate processes for each worker
    dashboard_address=':8787' # Dashboard address http://localhost:8787
)
client = Client(cluster)
client.get_versions(check=True)

config.set({'distributed.worker.memory.target': 0.50,    # Spill to disk at 60% memory usage
            'distributed.worker.memory.spill': 0.60,     # Spill to disk at 70% memory usage
            'distributed.worker.memory.pause': 0.80,     # Pause worker at 80% memory usage
            'distributed.worker.memory.terminate': 0.95, # Terminate worker at 95% memory usage
            'distributed.scheduler.allowed-failures': 4, # Set the allowed failures to 10          
           })

# Read the data using Dask
ddf = dd.read_hdf(input_hdf_path, 'unique_encounters')

# Repartition the dataframe into smaller chunks
num_partitions = 2
ddf = ddf.repartition(npartitions=num_partitions)

# Function to process each chunk
def process_chunk(chunk):
    return chunk.drop_duplicates()

# Apply the function to each partition
result = ddf.map_partitions(process_chunk)

# Compute the result
computed_result = result.compute()
#computed_result.to_csv(os.path.join(working_dir[:-1], 'dup_diag_unique_encounters.csv'), index=False)
# Remove duplicates again to ensure global uniqueness
computed_result = computed_result.drop_duplicates()

# Write the combined DataFrame to a single CSV file
computed_result.to_csv(output_csv_path, single_file=True, index=False)

print(f"Combined CSV saved to: {output_csv_path}")
"""


Code to make the Inpatient and Emergency Screens break into chunks - no longer needed.

In [None]:

"""
# Repartition the DataFrames into smaller chunks for better parallelism
num_partitions = 16
diag_ddf = diag_ddf.repartition(npartitions=num_partitions)
vitals_ddf = vitals_ddf.repartition(npartitions=num_partitions)
med_ddf = med_ddf.repartition(npartitions=num_partitions)
lab_ddf = lab_ddf.repartition(npartitions=num_partitions)
proc_ddf = proc_ddf.repartition(npartitions=num_partitions)

# Ensure consistent partitioning and indexing
def set_index_and_repartition(ddf):
    ddf = ddf.set_index('encounter_id').repartition(npartitions=num_partitions)
    return ddf

diag_ddf = set_index_and_repartition(diag_ddf)
vitals_ddf = set_index_and_repartition(vitals_ddf)
med_ddf = set_index_and_repartition(med_ddf)
lab_ddf = set_index_and_repartition(lab_ddf)
proc_ddf = set_index_and_repartition(proc_ddf)
"""
"""
# Merge DataFrames sequentially with intermediate persisting
merged = diag_ddf.merge(vitals_ddf, on='encounter_id', how='inner')
merged = merged.persist()

merged = merged.merge(med_ddf, on='encounter_id', how='inner')
merged = merged.persist()

merged = merged.merge(proc_ddf, on='encounter_id', how='inner')
merged = merged.persist()

merged = merged.merge(lab_ddf, on='encounter_id', how='inner')
merged = merged.persist()

# Drop duplicates just in case
merged = merged.drop_duplicates()


# Write the combined DataFrame to a single CSV file directly with Dask
merged.to_csv(output_csv_path, single_file=True, index=False)

print(f"Intersection CSV saved to: {output_csv_path}")

# Close the Dask client
client.close()
cluster.close()
"""

In [None]:
# Code to clean up client
"""
# Start a Dask client with the dashboard
print(client)

# Open the dashboard URL printed in the output and monitor the tasks
# Close the Dask client and cluster
client.close()
cluster.close()
"""