<h3>
Check System Resources and Capabilities
</h3>

In [1]:
import psutil
import platform
import os

# Check system information
print("=== SYSTEM INFORMATION ===")
print(f"System: {platform.system()}")
print(f"Machine: {platform.machine()}")
print(f"Processor: {platform.processor()}")
print(f"Python Version: {platform.python_version()}")

# Check memory
memory = psutil.virtual_memory()
print(f"\n=== MEMORY INFORMATION ===")
print(f"Total Memory: {memory.total / (1024**3):.2f} GB")
print(f"Available Memory: {memory.available / (1024**3):.2f} GB")
print(f"Memory Usage: {memory.percent:.1f}%")

# Check CPU
print(f"\n=== CPU INFORMATION ===")
print(f"Physical CPU Cores: {psutil.cpu_count(logical=False)}")
print(f"Logical CPU Cores: {psutil.cpu_count(logical=True)}")
print(f"Current CPU Usage: {psutil.cpu_percent(interval=1)}%")

# Check disk space
disk = psutil.disk_usage('/')
print(f"\n=== DISK INFORMATION ===")
print(f"Total Disk Space: {disk.total / (1024**3):.2f} GB")
print(f"Free Disk Space: {disk.free / (1024**3):.2f} GB")
print(f"Disk Usage: {(disk.used / disk.total) * 100:.1f}%")

print(f"\n=== RECOMMENDED SETTINGS FOR YOUR SYSTEM ===")
recommended_workers = max(1, psutil.cpu_count(logical=False) - 1)
recommended_memory_per_worker = max(2, int((memory.available / (1024**3)) / recommended_workers))
print(f"Recommended Dask Workers: {recommended_workers}")
print(f"Recommended Memory per Worker: {recommended_memory_per_worker} GB")

=== SYSTEM INFORMATION ===
System: Darwin
Machine: x86_64
Processor: i386
Python Version: 3.11.7

=== MEMORY INFORMATION ===
Total Memory: 16.00 GB
Available Memory: 1.62 GB
Memory Usage: 89.9%

=== CPU INFORMATION ===
Physical CPU Cores: 10
Logical CPU Cores: 10
Current CPU Usage: 16.8%

=== DISK INFORMATION ===
Total Disk Space: 460.43 GB
Free Disk Space: 107.47 GB
Disk Usage: 2.3%

=== RECOMMENDED SETTINGS FOR YOUR SYSTEM ===
Recommended Dask Workers: 9
Recommended Memory per Worker: 2 GB


<h3>Install required packages
</h3>

In [2]:
# Check if required packages are installed
import sys
import subprocess

required_packages = [
    'dask[complete]',
    'pandas', 
    'numpy',
    'pyarrow',  # For Parquet support
    'psutil',   # For monitoring
    'bokeh',    # For Dask dashboard
]

print("=== CHECKING INSTALLED PACKAGES ===")
for package in required_packages:
    try:
        if package == 'dask[complete]':
            import dask
            print(f"✓ dask: {dask.__version__}")
        elif package == 'pandas':
            import pandas as pd
            print(f"✓ pandas: {pd.__version__}")
        elif package == 'numpy':
            import numpy as np
            print(f"✓ numpy: {np.__version__}")
        elif package == 'pyarrow':
            import pyarrow as pa
            print(f"✓ pyarrow: {pa.__version__}")
        elif package == 'psutil':
            import psutil
            print(f"✓ psutil: {psutil.__version__}")
        elif package == 'bokeh':
            import bokeh
            print(f"✓ bokeh: {bokeh.__version__}")
    except ImportError:
        print(f"✗ {package}: NOT INSTALLED")
        print(f"Install with: pip install {package}")

print(f"\n=== PYTHON ENVIRONMENT ===")
print(f"Python executable: {sys.executable}")
print(f"Current working directory: {os.getcwd()}")

=== CHECKING INSTALLED PACKAGES ===
✓ dask: 2025.2.0
✓ pandas: 2.2.3
✓ numpy: 1.26.4
✓ pyarrow: 19.0.1
✓ psutil: 7.0.0
✓ bokeh: 3.6.2

=== PYTHON ENVIRONMENT ===
Python executable: /opt/anaconda3/bin/python
Current working directory: /Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /CODE


<h3>Basic DASK import and version check
</h3>

In [5]:
import dask
import dask.dataframe as dd
import dask.bag as db
from dask.distributed import Client, LocalCluster

print("=== TESTING DASK IMPORTS ===")
print(f"Dask version: {dask.__version__}")
print("All Dask components imported successfully")

print("\n=== MEMORY WARNING CHECK ===")
import psutil
memory = psutil.virtual_memory()
available_gb = memory.available / (1024**3)
print(f"Available Memory: {available_gb:.1f} GB")

if available_gb < 2:
    print("WARNING: Very low memory available")
    print("Consider closing other applications")
else:
    print("Memory looks sufficient for basic testing")

print("\nReady to proceed with Dask cluster creation")

=== TESTING DASK IMPORTS ===
Dask version: 2025.2.0
All Dask components imported successfully

Available Memory: 1.6 GB
Consider closing other applications

Ready to proceed with Dask cluster creation


In [None]:
from dask.distributed import Client, LocalCluster
import dask

print("=== CREATING DASK CLUSTER ===")

# Very conservative settings for low memory
cluster = LocalCluster(
    n_workers=2,           # Only 2 workers due to memory constraints
    threads_per_worker=2,  # 2 threads each
    memory_limit='500MB',  # Very conservative memory per worker
    dashboard_address=':8787'
)

print("Cluster created successfully")

# Connect client
client = Client(cluster)
print("Client connected to cluster")

# Show cluster info
print(f"\n=== CLUSTER INFORMATION ===")
print(f"Dashboard link: {client.dashboard_link}")
print(f"Number of workers: {len(client.scheduler_info()['workers'])}")

# Simple test computation
print(f"\n=== TESTING COMPUTATION ===")
import dask.array as da

# Small test array
x = da.ones((100, 100), chunks=(50, 50))
result = x.sum().compute()
print(f"Test computation result: {result}")

print(f"\n=== SUCCESS ===")
print("Dask cluster is running and ready")
print("You can open the dashboard link in your browser")
print("Keep this running for the next steps")

=== CREATING DASK CLUSTER ===
Cluster created successfully
Client connected to cluster

=== CLUSTER INFORMATION ===
Dashboard link: http://127.0.0.1:8787/status
Number of workers: 2

=== TESTING COMPUTATION ===
Test computation result: 10000.0

=== SUCCESS ===
Dask cluster is running and ready
You can open the dashboard link in your browser
Keep this running for the next steps


2025-07-04 01:56:33,774 - distributed.scheduler - ERROR - Task ('flatten-from_sequence-process_single_file-to_dataframe-9b0f3775507126bc1fcd6829e69483d4', 0) marked as failed because 4 workers died while trying to run it
2025-07-04 01:56:33,774 - distributed.scheduler - ERROR - Task ('flatten-from_sequence-process_single_file-to_dataframe-9b0f3775507126bc1fcd6829e69483d4', 1) marked as failed because 4 workers died while trying to run it


<h3>Discovering all data files</h3>

In [7]:
import os
import glob
from pathlib import Path

print("=== DISCOVERING DATA FILES ===")

# Your specific data directories
data_dir_2024 = Path("/Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /DATA/2024")
data_dir_2025 = Path("/Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /DATA/2025")

print(f"2024 data directory: {data_dir_2024}")
print(f"2025 data directory: {data_dir_2025}")

# Check if directories exist
print(f"\n=== DIRECTORY EXISTENCE CHECK ===")
dir_2024_exists = data_dir_2024.exists()
dir_2025_exists = data_dir_2025.exists()

print(f"2024 directory exists: {dir_2024_exists}")
print(f"2025 directory exists: {dir_2025_exists}")

# Find all .gz files
all_files = []

if dir_2024_exists:
    files_2024 = list(data_dir_2024.glob("*.gz"))
    print(f"Found {len(files_2024)} .gz files in 2024 directory")
    all_files.extend(files_2024)

if dir_2025_exists:
    files_2025 = list(data_dir_2025.glob("*.gz"))
    print(f"Found {len(files_2025)} .gz files in 2025 directory")
    all_files.extend(files_2025)

print(f"\n=== FILE DISCOVERY RESULTS ===")
print(f"Total .gz files found: {len(all_files)}")

if len(all_files) > 0:
    # Show first few files from each year
    print(f"\nFirst few 2024 files:")
    files_2024_sorted = sorted([f for f in all_files if "2024" in str(f)])[:3]
    for file_path in files_2024_sorted:
        print(f"  {file_path.name}")
    
    print(f"\nFirst few 2025 files:")
    files_2025_sorted = sorted([f for f in all_files if "2025" in str(f)])[:3]
    for file_path in files_2025_sorted:
        print(f"  {file_path.name}")
    
    # Show file size info
    total_size = sum(f.stat().st_size for f in all_files if f.exists())
    print(f"\nTotal data size: {total_size / (1024**3):.2f} GB")
    
else:
    print("No .gz files found in the specified directories")

# Store file paths for next step
if len(all_files) > 0:
    file_paths = [str(f) for f in all_files]
    print(f"\nReady to process {len(file_paths)} files")
else:
    print("Cannot proceed without data files")

=== DISCOVERING DATA FILES ===
2024 data directory: /Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /DATA/2024
2025 data directory: /Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /DATA/2025

=== DIRECTORY EXISTENCE CHECK ===
2024 directory exists: True
2025 directory exists: True
Found 238 .gz files in 2024 directory
Found 93 .gz files in 2025 directory

=== FILE DISCOVERY RESULTS ===
Total .gz files found: 331

First few 2024 files:
  2024-04-03-prusa.gz
  2024-04-04-prusa.gz
  2024-04-05-prusa.gz

First few 2025 files:
  2025-01-20-prusa.gz
  2025-01-21-prusa.gz
  2025-01-22-prusa.gz

Total data size: 0.56 GB

Ready to process 331 files


<h2>Analysing File structure and Creating processing bathes</h2>
<h3>Analyzes all your files, sorts them by date, calculates sizes, and creates processing batches of 20 files each for efficient processing.</h3>

In [8]:
import os
from pathlib import Path
from datetime import datetime

print("=== ANALYZING FILE STRUCTURE ===")

# Get all files again (from previous step)
data_dir_2024 = Path("/Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /DATA/2024")
data_dir_2025 = Path("/Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /DATA/2025")

all_files = []
all_files.extend(list(data_dir_2024.glob("*.gz")))
all_files.extend(list(data_dir_2025.glob("*.gz")))

print(f"Processing {len(all_files)} files")

# Analyze file details
file_details = []
for file_path in all_files:
    try:
        # Extract date from filename
        filename = file_path.name
        date_part = filename.split('-prusa.gz')[0]  # Get YYYY-MM-DD part
        file_date = datetime.strptime(date_part, "%Y-%m-%d")
        file_size = file_path.stat().st_size
        
        file_details.append({
            'path': str(file_path),
            'filename': filename,
            'date': file_date,
            'size_mb': file_size / (1024**2),
            'year': file_date.year,
            'month': file_date.month
        })
    except Exception as e:
        print(f"Error processing {file_path.name}: {e}")

print(f"Successfully analyzed {len(file_details)} files")

# Sort by date
file_details.sort(key=lambda x: x['date'])

print(f"\n=== DATE RANGE ANALYSIS ===")
if file_details:
    first_date = file_details[0]['date']
    last_date = file_details[-1]['date']
    print(f"Date range: {first_date.strftime('%Y-%m-%d')} to {last_date.strftime('%Y-%m-%d')}")
    
    # Show file size statistics
    sizes = [f['size_mb'] for f in file_details]
    avg_size = sum(sizes) / len(sizes)
    min_size = min(sizes)
    max_size = max(sizes)
    
    print(f"Average file size: {avg_size:.2f} MB")
    print(f"File size range: {min_size:.2f} MB to {max_size:.2f} MB")

# Create processing batches (groups of files)
print(f"\n=== CREATING PROCESSING BATCHES ===")

batch_size = 20  # Process 20 files at a time
batches = []

for i in range(0, len(file_details), batch_size):
    batch_files = file_details[i:i+batch_size]
    batch_size_mb = sum(f['size_mb'] for f in batch_files)
    
    batches.append({
        'batch_id': len(batches) + 1,
        'files': batch_files,
        'file_count': len(batch_files),
        'total_size_mb': batch_size_mb,
        'date_range': f"{batch_files[0]['date'].strftime('%Y-%m-%d')} to {batch_files[-1]['date'].strftime('%Y-%m-%d')}"
    })

print(f"Created {len(batches)} processing batches")
print(f"Files per batch: {batch_size} (except last batch may have fewer)")

# Show first few batches
print(f"\n=== BATCH PREVIEW ===")
for i, batch in enumerate(batches[:3]):
    print(f"Batch {batch['batch_id']}: {batch['file_count']} files, {batch['total_size_mb']:.1f} MB, {batch['date_range']}")

if len(batches) > 3:
    print(f"... and {len(batches) - 3} more batches")

print(f"\n=== READY FOR PROCESSING ===")
print(f"Total batches to process: {len(batches)}")
print(f"Estimated processing time: {len(batches) * 2} minutes (2 min per batch)")

=== ANALYZING FILE STRUCTURE ===
Processing 331 files
Successfully analyzed 331 files

=== DATE RANGE ANALYSIS ===
Date range: 2024-04-03 to 2025-04-29
Average file size: 1.74 MB
File size range: 0.00 MB to 3.00 MB

=== CREATING PROCESSING BATCHES ===
Created 17 processing batches
Files per batch: 20 (except last batch may have fewer)

=== BATCH PREVIEW ===
Batch 1: 20 files, 37.3 MB, 2024-04-03 to 2024-05-02
Batch 2: 20 files, 36.0 MB, 2024-05-03 to 2024-05-22
Batch 3: 20 files, 26.7 MB, 2024-05-23 to 2024-06-11
... and 14 more batches

=== READY FOR PROCESSING ===
Total batches to process: 17
Estimated processing time: 34 minutes (2 min per batch)


<h3>Processing first branch</h3>

Takes first 3 files from your data <br>
Reads each .gz file and extracts JSON records<br>
Flattens the data (same as your original approach)<br>
Creates a Dask DataFrame with all records<br>
Shows basic info about the result<br>

In [9]:
import gzip
import json
import dask.bag as db
from pathlib import Path

print("=== PROCESSING FIRST BATCH ===")

# Get first batch of files
data_dir_2024 = Path("/Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /DATA/2024")
data_dir_2025 = Path("/Users/tusharjoshi/Desktop/ProjectWorkAll/Dissertation /DATA/2025")

all_files = []
all_files.extend(list(data_dir_2024.glob("*.gz")))
all_files.extend(list(data_dir_2025.glob("*.gz")))
all_files.sort()

# Take first 3 files for testing
first_batch = [str(f) for f in all_files[:3]]

print(f"Processing {len(first_batch)} files:")
for filepath in first_batch:
    print(f"  {Path(filepath).name}")

# Process batch with Dask
bag = db.from_sequence(first_batch, npartitions=2)

def process_single_file(filepath):
    records = []
    with gzip.open(filepath, 'rt', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                record = json.loads(line.strip())
                # Flatten the record
                flat_record = {
                    'timestamp': record.get('date'),
                    'printer_id': record.get('id'),
                    'status': record.get('check')
                }
                # Add data fields
                if record.get('data'):
                    flat_record.update(record['data'])
                records.append(flat_record)
    return records

# Process files and flatten
records_bag = bag.map(process_single_file).flatten()

# Convert to DataFrame
df = records_bag.to_dataframe()

print(f"Batch processed successfully")
print(f"DataFrame shape: {df.shape}")
print(f"Columns: {list(df.columns)}")

=== PROCESSING FIRST BATCH ===
Processing 3 files:
  2024-04-03-prusa.gz
  2024-04-04-prusa.gz
  2024-04-05-prusa.gz
Batch processed successfully
DataFrame shape: (<dask_expr.expr.Scalar: expr=FromGraph(7c89dc5).size() // 15, dtype=int64>, 15)
Columns: ['timestamp', 'printer_id', 'status', 'state', 'tempBed', 'targetBed', 'tempNozzle', 'targetNozzle', 'axisZ', 'axisX', 'axisY', 'flow', 'speed', 'fanHotend', 'fanPrint']


In [None]:
print("=== GETTING DETAILED BATCH INFORMATION ===")

# Get actual row count
total_rows = len(df)
print(f"Total records in batch: {total_rows}")

# Get first few rows to see the data
print(f"\n=== SAMPLE DATA ===")
sample_data = df.head()
print(f"First 5 rows:")
print(sample_data)

print(f"\n=== BATCH PROCESSING SUCCESS ===")
print(f"✓ Successfully processed 3 files")
print(f"✓ Found {total_rows} total records") 
print(f"✓ All expected columns present")
print(f"✓ Data structure matches previous work")
print(f"✓ Ready to process all 331 files")

=== GETTING DETAILED BATCH INFORMATION ===




KilledWorker: Attempted to run task ('flatten-from_sequence-process_single_file-to_dataframe-9b0f3775507126bc1fcd6829e69483d4', 1) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:49586. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

2025-07-04 02:14:18,040 - tornado.application - ERROR - Exception in callback <bound method SystemMonitor.update of <SystemMonitor: cpu: 0 memory: 98 MB fds: 24>>
Traceback (most recent call last):
  File "/Users/tusharjoshi/.local/lib/python3.11/site-packages/tornado/ioloop.py", line 937, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.11/site-packages/distributed/system_monitor.py", line 168, in update
    net_ioc = psutil.net_io_counters()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tusharjoshi/.local/lib/python3.11/site-packages/psutil/__init__.py", line 2148, in net_io_counters
    rawdict = _psplatform.net_io_counters()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 12] Cannot allocate memory
2025-07-04 04:00:24,040 - tornado.application - ERROR - Exception in callback <bound method SystemMonitor.update of <SystemMonitor: cpu: 0 memory: 62 MB fds: 24>>
Traceback (most recent call last):
  File "/Users/tusharjos

<h3>Due to memory constraints, instead of running making batches we will process one file at a time into dask</h3>

In [11]:
# Close current cluster
client.close()
cluster.close()

print("=== RESTARTING DASK WITH MEMORY-AWARE SETTINGS ===")

from dask.distributed import Client, LocalCluster
import dask

# Configure Dask for your system
dask.config.set({
    'distributed.worker.memory.target': 0.6,
    'distributed.worker.memory.spill': 0.7,
    'distributed.worker.memory.pause': 0.8,
    'distributed.worker.memory.terminate': 0.9
})

cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=2,
    memory_limit='800MB',
    dashboard_address=':8787'
)

client = Client(cluster)
print("Dask cluster restarted with proper memory management")
print(f"Dashboard: {client.dashboard_link}")

=== RESTARTING DASK WITH MEMORY-AWARE SETTINGS ===
Dask cluster restarted with proper memory management
Dashboard: http://127.0.0.1:8787/status


<h3>Test Single File processing</h3>

In [14]:
print("=== CORRECT APPROACH: NATURAL MISSING VALUE HANDLING ===")

def process_single_file_natural(filepath):
    records = []
    with gzip.open(filepath, 'rt', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                try:
                    record = json.loads(line.strip())
                    flat_record = {
                        'timestamp': record.get('date'),
                        'printer_id': record.get('id'), 
                        'status': record.get('check')
                    }
                    # Add data fields as-is, let Dask handle types
                    if record.get('data'):
                        flat_record.update(record['data'])
                    records.append(flat_record)
                except:
                    continue
    return records

# Process with natural type inference
bag = db.from_sequence([single_file_path], npartitions=1)
records_bag = bag.map(process_single_file_natural).flatten()

# Convert to dataframe WITHOUT forcing types
df_single = records_bag.to_dataframe(meta=None)  # Let Dask figure out types

print(f"✅ Processing with natural type handling")
print(f"Columns: {list(df_single.columns)}")

# Get sample
sample = df_single.head(5)
print(f"Sample data:")
print(sample)

=== CORRECT APPROACH: NATURAL MISSING VALUE HANDLING ===
✅ Processing with natural type handling
Columns: ['timestamp', 'printer_id', 'status', 'state', 'tempBed', 'targetBed', 'tempNozzle', 'targetNozzle', 'axisZ', 'axisX', 'axisY', 'flow', 'speed', 'fanHotend', 'fanPrint']


2025-07-04 04:24:01,709 - distributed.worker - ERROR - Compute Failed
Key:       ('flatten-from_sequence-process_single_file_natural-to_dataframe-e4c65c0fc795dfa9d8644dbc21ad6817', 0)
State:     executing
Task:  <Task ('flatten-from_sequence-process_single_file_natural-to_dataframe-e4c65c0fc795dfa9d8644dbc21ad6817', 0) _execute_subgraph(...)>
Exception: 'IntCastingNaNError("Cannot convert non-finite values (NA or inf) to integer: Error while type casting for column \'targetBed\'")'
Traceback: '  File "/opt/anaconda3/lib/python3.11/site-packages/dask/bag/core.py", line 2616, in to_dataframe\n    return res.astype(dtypes, **kwargs)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/opt/anaconda3/lib/python3.11/site-packages/pandas/core/generic.py", line 6620, in astype\n    res_col = col.astype(dtype=cdt, copy=copy, errors=errors)\n              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/opt/anaconda3/lib/python3.11/site-packages/pandas/core/generic.py", line 6643, in asty

IntCastingNaNError: Cannot convert non-finite values (NA or inf) to integer: Error while type casting for column 'targetBed'