In [0]:
import json
import os

# Whether files already present in the destination should be overwritten.
# Widget values arrive as strings, so the text has to be parsed explicitly:
# any non-empty string (including "false" or "0") is truthy, which would
# otherwise make `not overwrite_existing` False and force an overwrite on
# every run.
overwrite_existing = (
    str(dbutils.widgets.get("overwrite_results_table")).strip().lower()
    in {"true", "1", "yes", "y", "on"}
)
print(f"Overwrite existing files: {overwrite_existing}")

# Gather the .txt files (suffix matched case-insensitively) listed under the
# Unity Catalog volume path supplied by the "volume_path" widget.
volume_dir = dbutils.widgets.get("volume_path")
volume_entries = dbutils.fs.ls(volume_dir)
src_paths = [
    entry.path
    for entry in volume_entries
    if entry.path.lower().endswith(".txt")
]
print(f"Found {len(src_paths)} source text files")

# Snapshot the file names already sitting in dbfs:/tmp/ so the copy step can
# tell new files apart from ones that were transferred earlier. Listing is
# best-effort: any failure is reported and the set simply stays empty.
existing_files = set()
try:
    destination_listing = dbutils.fs.ls("dbfs:/tmp/")
    existing_files = {entry.name for entry in destination_listing}
    print(f"Found {len(existing_files)} existing files in destination")
except Exception as listing_error:
    print(f"No existing files found in destination: {str(listing_error)}")

def _ensure_local_copy(dst_spark, dst_java):
    """Try to make a DBFS file visible at its local path and report the outcome.

    The file is copied onto itself and the local path is then re-checked.
    NOTE(review): the self-copy is the fallback the original code relied on;
    confirm it actually has an effect on the cluster types in use.

    Args:
        dst_spark: ``dbfs:/`` URI of the file.
        dst_java: Local filesystem path expected to mirror ``dst_spark``.

    Returns:
        True if ``dst_java`` exists after the attempt; False if it is still
        missing or the copy raised. Every outcome is printed.
    """
    try:
        dbutils.fs.cp(dst_spark, dst_spark, recurse=False)
        if os.path.exists(dst_java):
            print(f"  ✓ Successfully ensured local file: {dst_java}")
            return True
        print(f"  ❌ Failed to ensure local file: {dst_java}")
    except Exception as e:
        print(f"  ❌ Error ensuring local file: {str(e)}")
    return False


# copied_paths: local paths that are ready for downstream processing
#               (freshly copied files AND skipped files that are usable).
# skipped_paths: local paths of files that were not re-copied because they
#                already existed and overwriting is disabled.
copied_paths = []
skipped_paths = []

# Copy each file to dbfs:/tmp and collect the locally accessible paths
for src in src_paths:
    filename = src.split("/")[-1]
    dst_java = f"/dbfs/tmp/{filename}"
    dst_spark = f"dbfs:/tmp/{filename}"

    file_exists = filename in existing_files

    if file_exists and not overwrite_existing:
        print(f"Skipping existing file: {filename}")

        # A skipped file is still handed downstream if it is usable locally.
        if os.path.exists(dst_java):
            print(f"  ✓ File exists locally: {dst_java}")
            copied_paths.append(dst_java)
        else:
            print(f"  ⚠️ File exists in DBFS but not locally: {dst_java}")
            if _ensure_local_copy(dst_spark, dst_java):
                copied_paths.append(dst_java)

        skipped_paths.append(dst_java)
        continue

    # Either the file is new or we are overwriting an existing one.
    action = "Overwriting" if file_exists else "Copying"
    print(f"{action} file: {filename}")

    # Best-effort per file: one failing copy must not abort the whole run.
    try:
        dbutils.fs.cp(src, dst_spark, recurse=False)

        if os.path.exists(dst_java):
            file_size = os.path.getsize(dst_java)
            print(f"  ✓ Successfully copied: {dst_java} ({file_size} bytes)")
            copied_paths.append(dst_java)
        else:
            print(f"  ⚠️ File copied to DBFS but not found locally: {dst_java}")
            # Same fallback as the skip branch, so a file that is still
            # missing is reported here too instead of being dropped silently.
            if _ensure_local_copy(dst_spark, dst_java):
                copied_paths.append(dst_java)
    except Exception as e:
        print(f"  ❌ Error copying file: {str(e)}")

# Drop repeated paths but keep first-seen order: dict keys are unique and
# retain insertion order, so a round trip through dict.fromkeys does both.
unique_paths = list(dict.fromkeys(copied_paths))

# Summary
# copied_paths holds every path that is ready for processing, including
# skipped files that were found locally; skipped_paths holds every skipped
# file, including ones that could NOT be made available locally. Subtracting
# the two lengths therefore under-counts (and can go negative), so count the
# ready paths that were not skipped instead.
skipped_lookup = set(skipped_paths)
files_copied = sum(1 for p in copied_paths if p not in skipped_lookup)

print("\nSummary:")
print(f"Total source files: {len(src_paths)}")
print(f"Files copied: {files_copied}")
print(f"Files skipped (already exist): {len(skipped_paths)}")
print(f"Total files for processing: {len(unique_paths)}")

# Publish the de-duplicated path list as the "file_paths" task value so that
# later tasks in the job can read it.
# NOTE(review): task values are size-limited according to the Databricks
# docs -- confirm the path list stays small enough.
dbutils.jobs.taskValues.set(key="file_paths", value=unique_paths)
print("\nPaths stored for downstream processing:")