In [5]:
# =============================================================================
# ROBUST SEG-Y ANALYZER FOR PROBLEMATIC FILES LIKE DAT_0023.SGY
# =============================================================================

import struct
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from pathlib import Path
import warnings
from typing import Dict, List, Tuple, Union, Optional

warnings.filterwarnings('ignore')

class RobustSEGYVerifier:
    """
    Robust SEG-Y file analyzer that handles corrupted/non-standard files
    """
    
    def __init__(self, filepath: str):
        """Initialize with SEG-Y file path"""
        self.filepath = Path(filepath)
        self.file_size = self.filepath.stat().st_size if self.filepath.exists() else 0
        
        # SEG-Y format constants
        self.TEXTUAL_HEADER_SIZE = 3200
        self.BINARY_HEADER_SIZE = 400
        self.TRACE_HEADER_SIZE = 240
        
        # Data format codes and their properties (including invalid ones)
        self.DATA_FORMATS = {
            0: {'name': 'INVALID - Not defined in SEG-Y standard', 'size': 0, 'type': 'invalid'},
            1: {'name': '4-byte IBM floating point', 'size': 4, 'type': 'ibm_float'},
            2: {'name': '4-byte two\'s complement integer', 'size': 4, 'type': 'int32'},
            3: {'name': '2-byte two\'s complement integer', 'size': 2, 'type': 'int16'},
            4: {'name': '4-byte fixed point with gain', 'size': 4, 'type': 'fixed_point'},
            5: {'name': '4-byte IEEE floating point', 'size': 4, 'type': 'float32'},
            6: {'name': '8-byte IEEE floating point', 'size': 8, 'type': 'float64'},
            7: {'name': 'RESERVED - Not used', 'size': 0, 'type': 'invalid'},
            8: {'name': '1-byte two\'s complement integer', 'size': 1, 'type': 'int8'}
        }
        
        # Trace identification codes
        self.TRACE_ID_CODES = {
            1: 'Seismic data',
            2: 'Dead trace',
            3: 'Dummy trace',
            4: 'Time break',
            5: 'Uphole',
            6: 'Sweep',
            7: 'Timing',
            8: 'Water break'
        }
        
        # Trace sorting codes
        self.SORTING_CODES = {
            0: 'UNKNOWN/INVALID',
            1: 'As recorded (field format)',
            2: 'CDP ensemble',
            3: 'Single fold continuous profile',
            4: 'Horizontally stacked',
            5: 'Common source point',
            6: 'Common receiver point',
            7: 'Common offset point',
            8: 'Common mid-point',
            9: 'Common conversion point'
        }
        
        # Initialize data containers
        self.textual_header = None
        self.binary_header = {}
        self.trace_headers = []
        self.traces_data = []
        self.file_issues = []
    
    def decode_textual_header_robust(self, textual_bytes: bytes) -> tuple:
        """Decode header bytes with whichever candidate codec looks most readable.

        Each candidate codec is tried in order; the one whose decoded text
        has the most printable-or-whitespace characters wins, with ties going
        to the earlier candidate. If no candidate yields a single such
        character, the bytes are decoded as UTF-8 with replacement characters.

        Returns:
            A ``(decoded_text, encoding_label, printable_count)`` tuple.
        """
        candidates = (
            ('cp500', 'IBM EBCDIC'),
            ('cp037', 'IBM EBCDIC US'),
            ('ascii', 'ASCII'),
            ('utf-8', 'UTF-8'),
            ('latin1', 'Latin-1'),
            ('cp1252', 'Windows-1252'),
        )

        winner_text, winner_label, top_score = None, None, 0

        for codec, label in candidates:
            try:
                text = textual_bytes.decode(codec)
            except UnicodeDecodeError:
                # This codec cannot represent the bytes at all; try the next
                continue

            score = len([ch for ch in text if ch.isprintable() or ch.isspace()])
            # Strict comparison: an earlier candidate keeps the lead on a tie
            if score > top_score:
                winner_text, winner_label, top_score = text, label, score

        if winner_text is not None:
            return winner_text, winner_label, top_score

        # Last resort: nothing produced readable text, decode leniently
        fallback = textual_bytes.decode('utf-8', errors='replace')
        return fallback, 'UTF-8 (with replacements)', top_score
    
    def read_textual_header(self) -> str:
        """Read and decode textual file header with robust error handling"""
        print("=== READING TEXTUAL HEADER (ROBUST MODE) ===")
        
        with open(self.filepath, 'rb') as f:
            textual_bytes = f.read(self.TEXTUAL_HEADER_SIZE)
        
        decoded_text, encoding_used, printable_count = self.decode_textual_header_robust(textual_bytes)
        
        print(f"Best encoding: {encoding_used}")
        print(f"Printable characters: {printable_count}/{len(decoded_text)} ({100*printable_count/len(decoded_text):.1f}%)")
        
        # Parse into 40 lines of 80 characters
        lines = []
        for i in range(40):
            start = i * 80
            end = start + 80
            line = decoded_text[start:end]
            # Clean up non-printable characters for display
            clean_line = ''.join(c if c.isprintable() or c.isspace() else '?' for c in line)
            lines.append(f"C{i+1:02d} {clean_line}")
        
        self.textual_header = '\n'.join(lines)
        
        # Show first few readable lines
        print("First readable lines:")
        readable_lines = 0
        for i, line in enumerate(lines):
            clean_content = line[4:].strip()  # Remove "Cxx " prefix
            if clean_content and not all(c in '? ' for c in clean_content):
                print(f"  {line}")
                readable_lines += 1
                if readable_lines >= 5:
                    break
        
        if readable_lines == 0:
            print("  [No clearly readable text found - file may be corrupted]")
            self.file_issues.append("Textual header appears corrupted or uses unsupported encoding")
        
        return self.textual_header
    
    def read_binary_header_safe(self) -> Dict:
        """Read binary header with comprehensive error checking"""
        print("\n=== READING BINARY HEADER (ROBUST MODE) ===")
        
        with open(self.filepath, 'rb') as f:
            f.seek(self.TEXTUAL_HEADER_SIZE)
            binary_bytes = f.read(self.BINARY_HEADER_SIZE)
        
        if len(binary_bytes) < self.BINARY_HEADER_SIZE:
            self.file_issues.append(f"Binary header truncated: {len(binary_bytes)} bytes (expected {self.BINARY_HEADER_SIZE})")
            return {}
        
        # Parse fields with error handling
        try:
            header_fields = {
                'job_id': struct.unpack('>i', binary_bytes[12:16])[0],
                'line_number': struct.unpack('>i', binary_bytes[16:20])[0],
                'reel_number': struct.unpack('>i', binary_bytes[20:24])[0],
                'traces_per_ensemble': struct.unpack('>h', binary_bytes[22:24])[0],
                'aux_traces_per_ensemble': struct.unpack('>h', binary_bytes[24:26])[0],
                'sample_interval': struct.unpack('>h', binary_bytes[26:28])[0],
                'sample_interval_original': struct.unpack('>h', binary_bytes[28:30])[0],
                'samples_per_trace': struct.unpack('>h', binary_bytes[30:32])[0],
                'samples_per_trace_original': struct.unpack('>h', binary_bytes[32:34])[0],
                'data_sample_format': struct.unpack('>h', binary_bytes[34:36])[0],
                'ensemble_fold': struct.unpack('>h', binary_bytes[36:38])[0],
                'trace_sorting_code': struct.unpack('>h', binary_bytes[38:40])[0],
                'vertical_sum_code': struct.unpack('>h', binary_bytes[40:42])[0],
                'segy_revision': struct.unpack('>h', binary_bytes[64:66])[0],
                'fixed_length_trace_flag': struct.unpack('>h', binary_bytes[66:68])[0],
                'extended_textual_headers': struct.unpack('>h', binary_bytes[68:70])[0]
            }
        except struct.error as e:
            self.file_issues.append(f"Binary header parsing error: {e}")
            return {}
        
        self.binary_header = header_fields
        
        # Validate critical fields
        issues = []
        
        # Check data format
        format_code = header_fields['data_sample_format']
        if format_code not in self.DATA_FORMATS:
            issues.append(f"Unknown data format code: {format_code}")
        elif self.DATA_FORMATS[format_code]['size'] == 0:
            issues.append(f"Invalid data format code: {format_code} - {self.DATA_FORMATS[format_code]['name']}")
        
        # Check samples per trace
        if header_fields['samples_per_trace'] <= 0:
            issues.append(f"Invalid samples per trace: {header_fields['samples_per_trace']}")
        
        # Check sample interval
        if header_fields['sample_interval'] <= 0:
            issues.append(f"Invalid sample interval: {header_fields['sample_interval']}")
        
        # Check SEG-Y revision
        if header_fields['segy_revision'] not in [0, 1, 2]:
            issues.append(f"Unknown SEG-Y revision: {header_fields['segy_revision']}")
        
        self.file_issues.extend(issues)
        
        # Display results
        print("Binary Header Analysis:")
        print(f"  SEG-Y Revision: {header_fields['segy_revision']}")
        print(f"  Data Format: {format_code} - {self.DATA_FORMATS.get(format_code, {}).get('name', 'Unknown')}")
        print(f"  Samples per trace: {header_fields['samples_per_trace']}")
        print(f"  Sample interval: {header_fields['sample_interval']} microseconds")
        print(f"  Trace sorting: {header_fields['trace_sorting_code']} - {self.SORTING_CODES.get(header_fields['trace_sorting_code'], 'Unknown')}")
        
        if issues:
            print("\n⚠️ Issues detected:")
            for issue in issues:
                print(f"    - {issue}")
        
        return header_fields
    
    def analyze_file_structure_safe(self) -> Dict:
        """Analyze file structure with safety checks for invalid data"""
        print("\n=== FILE STRUCTURE ANALYSIS (SAFE MODE) ===")
        
        if not self.binary_header:
            self.read_binary_header_safe()
        
        analysis = {
            'file_size_actual': self.file_size,
            'header_size': self.TEXTUAL_HEADER_SIZE + self.BINARY_HEADER_SIZE,
            'can_analyze_traces': False,
            'issues': []
        }
        
        # Check if we can analyze trace structure
        samples_per_trace = self.binary_header.get('samples_per_trace', 0)
        format_code = self.binary_header.get('data_sample_format', 0)
        
        if samples_per_trace <= 0:
            analysis['issues'].append("Cannot analyze traces: samples_per_trace is zero or negative")
        elif format_code not in self.DATA_FORMATS:
            analysis['issues'].append(f"Cannot analyze traces: unknown format code {format_code}")
        elif self.DATA_FORMATS[format_code]['size'] == 0:
            analysis['issues'].append(f"Cannot analyze traces: invalid format code {format_code}")
        else:
            # We can analyze trace structure
            analysis['can_analyze_traces'] = True
            
            bytes_per_sample = self.DATA_FORMATS[format_code]['size']
            trace_data_size = samples_per_trace * bytes_per_sample
            trace_total_size = self.TRACE_HEADER_SIZE + trace_data_size
            
            remaining_bytes = self.file_size - analysis['header_size']
            estimated_traces = remaining_bytes // trace_total_size if trace_total_size > 0 else 0
            expected_file_size = analysis['header_size'] + (estimated_traces * trace_total_size)
            
            analysis.update({
                'trace_data_size': trace_data_size,
                'trace_total_size': trace_total_size,
                'bytes_per_sample': bytes_per_sample,
                'estimated_traces': estimated_traces,
                'file_size_expected': expected_file_size,
                'size_difference': self.file_size - expected_file_size
            })
        
        # Display results
        print(f"File size: {self.file_size:,} bytes ({self.file_size/1024/1024:.2f} MB)")
        print(f"Header size: {analysis['header_size']:,} bytes")
        
        if analysis['can_analyze_traces']:
            print(f"Expected file size: {analysis['file_size_expected']:,} bytes")
            print(f"Size difference: {analysis['size_difference']:,} bytes")
            print(f"Estimated traces: {analysis['estimated_traces']:,}")
            print(f"Bytes per trace: {analysis['trace_total_size']:,}")
            
            if abs(analysis['size_difference']) > 1024:  # More than 1KB difference
                analysis['issues'].append(f"File size mismatch: {analysis['size_difference']:,} bytes")
        
        if analysis['issues']:
            print("\n⚠️ Structure issues:")
            for issue in analysis['issues']:
                print(f"    - {issue}")
        
        return analysis
    
    def attempt_trace_reading(self, max_attempts: int = 5) -> List[Dict]:
        """Attempt to read traces even from problematic files"""
        print(f"\n=== ATTEMPTING TO READ TRACES (MAX {max_attempts} ATTEMPTS) ===")
        
        if not self.binary_header:
            self.read_binary_header_safe()
        
        structure = self.analyze_file_structure_safe()
        
        if not structure['can_analyze_traces']:
            print("❌ Cannot read traces due to invalid header information")
            return []
        
        # Try to read traces
        header_offset = self.TEXTUAL_HEADER_SIZE + self.BINARY_HEADER_SIZE
        trace_total_size = structure['trace_total_size']
        
        successful_traces = []
        
        for i in range(min(max_attempts, structure.get('estimated_traces', 0))):
            try:
                trace_offset = header_offset + (i * trace_total_size)
                
                # Check if we have enough bytes left
                if trace_offset + trace_total_size > self.file_size:
                    print(f"⚠️ Trace {i+1}: Not enough data remaining in file")
                    break
                
                # Try to read trace header
                with open(self.filepath, 'rb') as f:
                    f.seek(trace_offset)
                    trace_header_bytes = f.read(self.TRACE_HEADER_SIZE)
                
                if len(trace_header_bytes) < self.TRACE_HEADER_SIZE:
                    print(f"⚠️ Trace {i+1}: Truncated trace header")
                    break
                
                # Parse basic trace header fields
                trace_header = {
                    'trace_seq_line': struct.unpack('>i', trace_header_bytes[0:4])[0],
                    'samples_in_trace': struct.unpack('>h', trace_header_bytes[114:116])[0],
                    'sample_interval_trace': struct.unpack('>h', trace_header_bytes[116:118])[0],
                }
                
                # Validate trace header
                if trace_header['samples_in_trace'] != self.binary_header['samples_per_trace']:
                    print(f"⚠️ Trace {i+1}: Sample count mismatch ({trace_header['samples_in_trace']} vs {self.binary_header['samples_per_trace']})")
                
                # Try to read trace data
                data_offset = trace_offset + self.TRACE_HEADER_SIZE
                bytes_to_read = structure['trace_data_size']
                
                with open(self.filepath, 'rb') as f:
                    f.seek(data_offset)
                    data_bytes = f.read(bytes_to_read)
                
                if len(data_bytes) < bytes_to_read:
                    print(f"⚠️ Trace {i+1}: Truncated trace data ({len(data_bytes)} vs {bytes_to_read} bytes)")
                    break
                
                # Create dummy trace data for invalid formats
                format_code = self.binary_header['data_sample_format']
                if self.DATA_FORMATS[format_code]['size'] == 0:
                    print(f"⚠️ Trace {i+1}: Cannot decode data due to invalid format code")
                    # Create synthetic data for analysis
                    trace_data = np.zeros(self.binary_header['samples_per_trace'])
                else:
                    # Try to interpret data (simplified)
                    samples = self.binary_header['samples_per_trace']
                    trace_data = np.frombuffer(data_bytes[:samples*4], dtype='>f4')[:samples]
                
                successful_traces.append({
                    'header': trace_header,
                    'data': trace_data,
                    'trace_number': i + 1
                })
                
                print(f"✅ Trace {i+1}: Successfully read ({len(trace_data)} samples)")
                
            except Exception as e:
                print(f"❌ Trace {i+1}: Error reading trace - {e}")
                break
        
        print(f"\nSuccessfully read {len(successful_traces)} traces")
        
        self.trace_headers = [t['header'] for t in successful_traces]
        self.traces_data = [t['data'] for t in successful_traces]
        
        return successful_traces
    
    def generate_diagnostic_report(self) -> Dict:
        """Generate comprehensive diagnostic report for problematic files"""
        print("\n" + "="*70)
        print("COMPREHENSIVE SEG-Y DIAGNOSTIC REPORT")
        print("="*70)
        
        report = {
            'file_info': {
                'filepath': str(self.filepath),
                'file_size': self.file_size,
                'exists': self.filepath.exists()
            },
            'issues': self.file_issues,
            'analysis_results': {}
        }
        
        if not self.filepath.exists():
            print("❌ File does not exist!")
            return report
        
        try:
            # Read headers
            self.read_textual_header()
            self.read_binary_header_safe()
            
            # Analyze structure
            structure = self.analyze_file_structure_safe()
            report['structure'] = structure
            
            # Try to read traces
            traces = self.attempt_trace_reading(5)
            
            # Summary
            print(f"\n{'='*50}")
            print("DIAGNOSTIC SUMMARY")
            print(f"{'='*50}")
            print(f"File: {self.filepath.name}")
            print(f"Size: {self.file_size:,} bytes ({self.file_size/1024/1024:.2f} MB)")
            
            if self.binary_header:
                print(f"SEG-Y Revision: {self.binary_header.get('segy_revision', 'Unknown')}")
                print(f"Data Format: {self.binary_header.get('data_sample_format')} - {self.DATA_FORMATS.get(self.binary_header.get('data_sample_format', 0), {}).get('name', 'Unknown')}")
                print(f"Samples per Trace: {self.binary_header.get('samples_per_trace', 0)}")
                print(f"Sample Interval: {self.binary_header.get('sample_interval', 0)} µs")
            
            if structure.get('can_analyze_traces'):
                print(f"Estimated Traces: {structure.get('estimated_traces', 0):,}")
                print(f"Successfully Read Traces: {len(traces)}")
            
            total_issues = len(self.file_issues)
            if total_issues == 0:
                print("✅ NO ISSUES DETECTED")
            else:
                print(f"⚠️  {total_issues} ISSUE(S) DETECTED:")
                for i, issue in enumerate(self.file_issues, 1):
                    print(f"  {i}. {issue}")
            
            report['summary'] = {
                'total_issues': total_issues,
                'traces_read': len(traces),
                'can_analyze': structure.get('can_analyze_traces', False)
            }
            
        except Exception as e:
            print(f"❌ ANALYSIS FAILED - Error: {str(e)}")
            report['error'] = str(e)
            import traceback
            traceback.print_exc()
        
        return report
    
    def create_summary_table(self) -> pd.DataFrame:
        """Create a summary table of findings"""
        if not self.binary_header:
            return pd.DataFrame([{"Property": "Status", "Value": "No data available"}])
        
        data = [
            {"Property": "File Status", "Value": "✅ Readable" if self.filepath.exists() else "❌ Not found"},
            {"Property": "File Size", "Value": f"{self.file_size:,} bytes"},
            {"Property": "SEG-Y Revision", "Value": self.binary_header.get('segy_revision', 'Unknown')},
            {"Property": "Data Format", "Value": f"{self.binary_header.get('data_sample_format')} - {self.DATA_FORMATS.get(self.binary_header.get('data_sample_format', 0), {}).get('name', 'Unknown')}"},
            {"Property": "Samples per Trace", "Value": self.binary_header.get('samples_per_trace', 0)},
            {"Property": "Sample Interval", "Value": f"{self.binary_header.get('sample_interval', 0)} µs"},
            {"Property": "Total Issues", "Value": len(self.file_issues)},
            {"Property": "Traces Read", "Value": len(self.traces_data)},
        ]
        
        return pd.DataFrame(data)


# ============================================================================
# ANALYSIS FOR YOUR SPECIFIC DAT_0023.SGY FILE
# ============================================================================

def analyze_dat_0023_robust(filepath: str = "DAT_0023.SGY"):
    """Run the full diagnostic on one SEG-Y file and print recommendations.

    Args:
        filepath: File to analyze. Defaults to ``DAT_0023.SGY`` in the
            current directory, the file this analysis was written for.

    Returns:
        A ``(verifier, report)`` tuple: the ``RobustSEGYVerifier`` instance
        and the dict returned by its ``generate_diagnostic_report``.
    """
    print(f"ROBUST ANALYSIS FOR {Path(filepath).name}")
    print("="*50)

    # Initialize robust verifier
    verifier = RobustSEGYVerifier(filepath)

    # Run diagnostic analysis
    report = verifier.generate_diagnostic_report()

    # Create summary table
    summary = verifier.create_summary_table()
    print(f"\n{'='*30}")
    print("SUMMARY TABLE")
    print(f"{'='*30}")
    print(summary.to_string(index=False))

    # Show recommended actions
    print(f"\n{'='*30}")
    print("RECOMMENDED ACTIONS")
    print(f"{'='*30}")

    # Header problems live in verifier.file_issues, layout problems in the
    # report's structure section; consider both, without duplicates.
    all_issues = list(verifier.file_issues)
    for issue in report.get('structure', {}).get('issues', []):
        if issue not in all_issues:
            all_issues.append(issue)

    if all_issues:
        print("This file has significant issues:")
        for issue in all_issues:
            print(f"  • {issue}")

        joined = ' '.join(all_issues)
        recommendations = []

        if "Invalid data format code: 0" in joined:
            recommendations += [
                "Check if this is actually a SEG-Y file",
                "Try different SEG-Y reading software",
                "Contact the data provider for format specifications",
            ]

        # The header check reports "Invalid samples per trace", the structure
        # analysis reports "samples_per_trace is zero or negative".
        if "samples_per_trace is zero" in joined or "Invalid samples per trace" in joined:
            recommendations += [
                "File may be corrupted or incomplete",
                "Check if file transfer was successful",
            ]

        recommendations += [
            "Try opening with specialized geophysics software (OpendTect, etc.)",
            "Check if file uses proprietary or non-standard SEG-Y variant",
        ]

        print("\nRecommendations:")
        # Number consecutively so skipped groups leave no gaps
        for number, text in enumerate(recommendations, start=1):
            print(f"  {number}. {text}")
    else:
        print("✅ File appears to be valid SEG-Y format")

    return verifier, report

# Run the robust analysis only when this code is the entry point
# (__name__ == "__main__"); importing it as a module runs nothing.
# The call uses the function's default arguments.
if __name__ == "__main__":
    verifier, report = analyze_dat_0023_robust()

ROBUST ANALYSIS FOR DAT_0023.SGY

COMPREHENSIVE SEG-Y DIAGNOSTIC REPORT
=== READING TEXTUAL HEADER (ROBUST MODE) ===
Best encoding: Windows-1252
Printable characters: 3167/3200 (99.0%)
First readable lines:
  C01 This SEG-Y format data file was generated by Prism Mobile software. (c) Radar Sy
  C02 stems, Inc. (www.radsys.lv)
The SEG-Y format is intended for seismic (not geora
  C03 dar) data.
Due to this fact, some lucks of correspondence to SEG-Y format occur
  C04 :
1. The SEG-Y EDCBIC Reel Header filled by this textual information.
2. All t
  C05 ime-based values represented in PICOseconds (not in MICROseconds!).
3. Relative

=== READING BINARY HEADER (ROBUST MODE) ===
Binary Header Analysis:
  SEG-Y Revision: 0
  Data Format: 0 - INVALID - Not defined in SEG-Y standard
  Samples per trace: 0
  Sample interval: 256 microseconds
  Trace sorting: 0 - UNKNOWN/INVALID

⚠️ Issues detected:
    - Invalid data format code: 0 - INVALID - Not defined in SEG-Y standard
    - Invalid samples 