In [15]:
using FITSIO
"""
Abstract type for all event list implementations
"""
abstract type AbstractEventList{T} end

"""
    DictMetadata

A structure containing metadata from FITS file headers.

## Fields

- `headers::Vector{Dict{String,Any}}`: A vector of dictionaries containing header information from each HDU.
"""
struct DictMetadata
    headers::Vector{Dict{String,Any}}
end

"""
    EventList{T} <: AbstractEventList{T}

A structure containing event data from a FITS file.

## Fields

- `filename::String`: Path to the source FITS file.
- `times::Vector{T}`: Vector of event times.
- `energies::Vector{T}`: Vector of event energies.
- `metadata::DictMetadata`: Metadata information extracted from the FITS file headers.
"""
struct EventList{T} <: AbstractEventList{T}
    filename::String
    times::Vector{T}
    energies::Vector{T}
    metadata::DictMetadata
end

times(ev::EventList) = ev.times
energies(ev::EventList) = ev.energies

"""
    readevents(path; T = Float64)

Read event data from a FITS file into an EventList structure.

## Arguments
- `path::String`: Path to the FITS file
- `T::Type=Float64`: Numeric type for the data

## Returns
- [`EventList`](@ref) containing the extracted data

## Notes

The function extracts `TIME` and `ENERGY` columns from any TableHDU in the FITS
file. All headers from each HDU are collected into the metadata field. It will
use the first occurrence of complete event data (both TIME and ENERGY columns)
found in the file.
"""
function readevents(path; T = Float64)
    headers = Dict{String,Any}[]
    times = T[]
    energies = T[]

    FITS(path, "r") do f
        for i = 1:length(f)  # Iterate over HDUs
            hdu = f[i]
            # Always collect headers from all extensions
            header_dict = Dict{String,Any}()
            for key in keys(read_header(hdu))
                header_dict[string(key)] = read_header(hdu)[key]
            end
            push!(headers, header_dict)

            # Check if the HDU is a table
            if isa(hdu, TableHDU)
                colnames = FITSIO.colnames(hdu)

                # Read TIME and ENERGY data if columns exist and vectors are empty
                if isempty(times) && ("TIME" in colnames)
                    times = convert(Vector{T}, read(hdu, "TIME"))
                end
                if isempty(energies) && ("ENERGY" in colnames)
                    energies = convert(Vector{T}, read(hdu, "ENERGY"))
                end

                # If we found both time and energy data, we can return
                if !isempty(times) && !isempty(energies)
                    @info "Found complete event data in extension $(i) of $(path)"
                    metadata = DictMetadata(headers)
                    return EventList{T}(path, times, energies, metadata)
                end
            end
        end
    end

    if isempty(times)
        @warn "No TIME data found in FITS file $(path). Time series analysis will not be possible."
    end
    if isempty(energies)
        @warn "No ENERGY data found in FITS file $(path). Energy spectrum analysis will not be possible."
    end

    metadata = DictMetadata(headers)
    return EventList{T}(path, times, energies, metadata)
end

Base.length(ev::AbstractEventList) = length(times(ev))
Base.size(ev::AbstractEventList) = (length(ev),)
Base.getindex(ev::EventList, i) = (ev.times[i], ev.energies[i])

function Base.show(io::IO, ev::EventList{T}) where T
    print(io, "EventList{$T}(n=$(length(ev)), file=$(ev.filename))")
end

"""
    validate(events::AbstractEventList)

Validate the event list structure.

## Returns
- `true` if valid, throws ArgumentError otherwise
"""
function validate(events::AbstractEventList)
    evt_times = times(events)
    if !issorted(evt_times)
        throw(ArgumentError("Event times must be sorted in ascending order"))
    end
    if length(evt_times) == 0
        throw(ArgumentError("Event list is empty"))
    end
    return true
end


validate

In [16]:
using Test
@testset "EventList Tests" begin
    # Test 1: Create a sample FITS file for testing
    @testset "Sample FITS file creation" begin
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample.fits")
        f = FITS(sample_file, "w")
        write(f, Int[])  # Empty primary array
        # Create a binary table HDU with TIME and ENERGY columns
        times = Float64[1.0, 2.0, 3.0, 4.0, 5.0]
        energies = Float64[10.0, 20.0, 15.0, 25.0, 30.0]
        # Add a binary table extension
        table = Dict{String,Array}()
        table["TIME"] = times
        table["ENERGY"] = energies
        write(f, table)
        close(f)

        @test isfile(sample_file)

        # Test reading the sample file
        data = readevents(sample_file)
        @test data.filename == sample_file
        @test length(data.times) == 5
        @test length(data.energies) == 5
        @test eltype(data.times) == Float64
        @test eltype(data.energies) == Float64
    end

    # Test 2: Test with different data types
    @testset "Different data types" begin
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample_float32.fits")
        f = FITS(sample_file, "w")
        write(f, Int[])
        # Create data
        times = Float64[1.0, 2.0, 3.0]
        energies = Float64[10.0, 20.0, 30.0]
        table = Dict{String,Array}()
        table["TIME"] = times
        table["ENERGY"] = energies
        write(f, table)
        close(f)
        # Test with Float32
        data_f32 = readevents(sample_file, T = Float32)
        @test eltype(data_f32.times) == Float32
        @test eltype(data_f32.energies) == Float32
        # Test with Int64
        data_i64 = readevents(sample_file, T = Int64)
        @test eltype(data_i64.times) == Int64
        @test eltype(data_i64.energies) == Int64
    end

    # Test 3: Missing Columns
    @testset "Missing columns" begin
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample_no_energy.fits")
        # Create a sample FITS file with only TIME column
        f = FITS(sample_file, "w")
        write(f, Int[])
        times = Float64[1.0, 2.0, 3.0]
        table = Dict{String,Array}()
        table["TIME"] = times
        write(f, table)
        close(f)
        local data
        @test_logs (:warn, "No ENERGY data found in FITS file $(sample_file). Energy spectrum analysis will not be possible.") begin
            data = readevents(sample_file)
        end
        @test length(data.times) == 3
        @test length(data.energies) == 0

        # Create a file with only ENERGY column
        sample_file2 = joinpath(test_dir, "sample_no_time.fits")
        f = FITS(sample_file2, "w")
        write(f, Int[])  # Empty primary array
        energies = Float64[10.0, 20.0, 30.0]
        table = Dict{String,Array}()
        table["ENERGY"] = energies
        write(f, table)
        close(f)
        local data2
        @test_logs (:warn, "No TIME data found in FITS file $(sample_file2). Time series analysis will not be possible.") begin
            data2 = readevents(sample_file2)
        end
        @test length(data2.times) == 0  # No TIME column
        @test length(data2.energies) == 3
    end

    # Test 4: Multiple HDUs
    @testset "Multiple HDUs" begin
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample_multi_hdu.fits")
        # Create a sample FITS file with multiple HDUs
        f = FITS(sample_file, "w")
        write(f, Int[])
        times1 = Float64[1.0, 2.0, 3.0]
        energies1 = Float64[10.0, 20.0, 30.0]
        table1 = Dict{String,Array}()
        table1["TIME"] = times1
        table1["ENERGY"] = energies1
        write(f, table1)
        # Second table HDU (with OTHER column)
        other_data = Float64[100.0, 200.0, 300.0]
        table2 = Dict{String,Array}()
        table2["OTHER"] = other_data
        write(f, table2)
        # Third table HDU (with TIME only)
        times3 = Float64[4.0, 5.0, 6.0]
        table3 = Dict{String,Array}()
        table3["TIME"] = times3
        write(f, table3)
        close(f)
        
        # Diagnostic printing
        data = readevents(sample_file)
        @test length(data.metadata.headers) >= 2  # At least primary and first extension
        @test length(data.metadata.headers) <= 4  # No more than primary + 3 extensions
        # Should read the first HDU with both TIME and ENERGY
        @test length(data.times) == 3
        @test length(data.energies) == 3
    end

    # Test 5: Test with monol_testA.evt
    @testset "test monol_testA.evt" begin
        test_filepath = joinpath("data", "monol_testA.evt")
        if isfile(test_filepath)
            data = readevents(test_filepath)
            @test data.filename == test_filepath
            @test length(data.metadata.headers) > 0
            @test !isempty(data.times)
        else
            @info "Test file '$(test_filepath)' not found. Skipping this test."
        end
    end

    # Test 6: Error handling
    @testset "Error handling" begin
        # Test with non-existent file - using a more generic approach
        @test_throws Exception readevents("non_existent_file.fits")

        # Test with invalid FITS file
        invalid_file = tempname()
        open(invalid_file, "w") do io
            write(io, "This is not a FITS file")
        end
        @test_throws Exception readevents(invalid_file)
    end

    # Test 7: Struct Type Validation
    @testset "EventList Struct Type Checks" begin
        # Create a sample FITS file for type testing
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample_types.fits")
        
        # Prepare test data
        f = FITS(sample_file, "w")
        write(f, Int[])  # Empty primary array
        
        # Create test data
        times = Float64[1.0, 2.0, 3.0, 4.0, 5.0]
        energies = Float64[10.0, 20.0, 15.0, 25.0, 30.0]
        
        table = Dict{String,Array}()
        table["TIME"] = times
        table["ENERGY"] = energies
        write(f, table)
        close(f)

        # Test type-specific instantiations
        @testset "Type Parametric Struct Tests" begin
            # Test Float64 EventList
            data_f64 = readevents(sample_file, T = Float64)
            @test isa(data_f64, EventList{Float64})
            @test typeof(data_f64) == EventList{Float64}
            
            # Test Float32 EventList
            data_f32 = readevents(sample_file, T = Float32)
            @test isa(data_f32, EventList{Float32})
            @test typeof(data_f32) == EventList{Float32}
            
            # Test Int64 EventList
            data_i64 = readevents(sample_file, T = Int64)
            @test isa(data_i64, EventList{Int64})
            @test typeof(data_i64) == EventList{Int64}
        end

        # Test struct field types
        @testset "Struct Field Type Checks" begin
            data = readevents(sample_file)
            
            # Check filename type
            @test isa(data.filename, String)
            
            # Check times and energies vector types
            @test isa(data.times, Vector{Float64})
            @test isa(data.energies, Vector{Float64})
            
            # Check metadata type
            @test isa(data.metadata, DictMetadata)
            @test isa(data.metadata.headers, Vector{Dict{String,Any}})
        end
    end

    # Test 8: Validation Function
    @testset "Validation Tests" begin
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample_validate.fits")
        
        # Prepare test data
        f = FITS(sample_file, "w")
        write(f, Int[])
        times = Float64[1.0, 2.0, 3.0, 4.0, 5.0]
        energies = Float64[10.0, 20.0, 15.0, 25.0, 30.0]
        
        table = Dict{String,Array}()
        table["TIME"] = times
        table["ENERGY"] = energies
        write(f, table)
        close(f)

        data = readevents(sample_file)
        
        # Test successful validation
        @test validate(data) == true

        # Test with unsorted times
        unsorted_times = Float64[3.0, 1.0, 2.0]
        unsorted_energies = Float64[30.0, 10.0, 20.0]
        unsorted_data = EventList{Float64}(sample_file, unsorted_times, unsorted_energies, 
                                           DictMetadata([Dict{String,Any}()]))
        @test_throws ArgumentError validate(unsorted_data)

        # Test with empty event list
        empty_data = EventList{Float64}(sample_file, Float64[], Float64[], 
                                        DictMetadata([Dict{String,Any}()]))
        @test_throws ArgumentError validate(empty_data)
    end
end

[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_TyeKCZ\sample.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_jn6ERa\sample_float32.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_jn6ERa\sample_float32.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_j9ejso\sample_multi_hdu.fits
[33m[1m└ [22m[39m[90m@ Main In[15]:102[39m


[0m[1mTest Summary:   | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
EventList Tests | [32m  39  [39m[36m   39  [39m[0m1.2s


[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_79JB99\sample_types.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_79JB99\sample_types.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_79JB99\sample_types.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_79JB99\sample_types.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_jl25kQ\sample_validate.fits


Test.DefaultTestSet("EventList Tests", Any[Test.DefaultTestSet("Sample FITS file creation", Any[], 6, false, false, true, 1.743334756737e9, 1.74333475718e9, false, "In[16]"), Test.DefaultTestSet("Different data types", Any[], 4, false, false, true, 1.74333475718e9, 1.743334757229e9, false, "In[16]"), Test.DefaultTestSet("Missing columns", Any[], 6, false, false, true, 1.743334757229e9, 1.743334757517e9, false, "In[16]"), Test.DefaultTestSet("Multiple HDUs", Any[], 4, false, false, true, 1.743334757517e9, 1.743334757538e9, false, "In[16]"), Test.DefaultTestSet("test monol_testA.evt", Any[], 3, false, false, true, 1.743334757538e9, 1.743334757544e9, false, "In[16]"), Test.DefaultTestSet("Error handling", Any[], 2, false, false, true, 1.743334757544e9, 1.743334757815e9, false, "In[16]"), Test.DefaultTestSet("EventList Struct Type Checks", Any[Test.DefaultTestSet("Type Parametric Struct Tests", Any[], 6, false, false, true, 1.743334757817e9, 1.743334757836e9, false, "In[16]"), Test.Default

In [17]:
"""
    LightCurve{T}

A structure containing lightcurve data from a FITS file.

## Fields

- `timebins::Vector{T}`: Vector of time bins.
- `counts::Vector{Int}`: Vector of event counts in each time bin.
- `count_error::Vector{T}`: Vector of errors on the counts in each time bin.
- `err_method::Symbol`: Method used for computing the errors.
"""
struct LightCurve{T}
    timebins::Vector{T}
    counts::Vector{Int}
    count_error::Vector{T}
    err_method::Symbol
end

"""
   create_lightcurve(eventlist::EventList{T}, binsize::Real; err_method::Symbol=:poisson) -> LightCurve{T}

Create a lightcurve from an event list.

## Arguments
- `eventlist::EventList{T}`: Event list containing the event data.
- `binsize::Real`: Size of the time bins.

## Keyword Arguments
- `err_method::Symbol=:poisson`: Method for computing the errors.

## Returns
- [`LightCurve`](@ref) containing the binned data.
"""
function create_lightcurve(eventlist::EventList{T}, binsize::Real; err_method::Symbol=:poisson) where T
    # Validate error method first
    if err_method != :poisson
        throw(ArgumentError("Unsupported error computation method: $err_method"))
    end

    # Convert binsize to the same type as the event times
    binsize_t = convert(T, binsize)
    
    times = eventlist.times
    min_time = minimum(times)
    max_time = maximum(times)
    
    # Calculate number of bins needed
    time_range = max_time - min_time
    nbins = floor(Int, time_range / binsize_t)
    
    # If there's any remainder or if max_time falls exactly on a bin edge, add another bin
    if !isapprox(mod(time_range, binsize_t), zero(T)) || isapprox(mod(max_time - min_time, binsize_t), zero(T))
        nbins += 1
    end
    
    # Create arrays
    counts = zeros(Int, nbins)
    count_error = Vector{T}(undef, nbins)
    timebins = Vector{T}(undef, nbins + 1)
    
    # Create bin edges
    for i in 0:nbins
        timebins[i + 1] = min_time + (i * binsize_t)
    end
    
    # Bin the events using count() for better performance
    for i in 1:nbins
        bin_start = timebins[i]
        bin_end = timebins[i + 1]
        counts[i] = count(x -> bin_start <= x < bin_end, times)
    end
    
    # Calculate Poisson errors
    for i in 1:nbins
        count_error[i] = convert(T, sqrt(counts[i]))
    end
    
    return LightCurve{T}(timebins, counts, count_error, err_method)
end
# Show method for pretty printing
function Base.show(io::IO, lc::LightCurve{T}) where T
    nbins = length(lc.counts)
    binsize = lc.timebins[2] - lc.timebins[1]
    print(io, "LightCurve{$T}(n=$nbins, binsize=$binsize)")
end

In [18]:
using Test
using FITSIO

@testset "LightCurve Tests" begin
    # Test 1: Create a lightcurve from a sample EventList
    @testset "Lightcurve creation" begin
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample.fits")
        f = FITS(sample_file, "w")
        write(f, Int[])  # Empty primary array
        
        # Create a binary table HDU with TIME and ENERGY columns
        times = Float64[1.0, 2.0, 3.0, 4.0, 5.0]
        energies = Float64[10.0, 20.0, 15.0, 25.0, 30.0]
        
        # Add a binary table extension
        table = Dict{String,Array}()
        table["TIME"] = times
        table["ENERGY"] = energies
        write(f, table)
        close(f)
        
        @test isfile(sample_file)
        
        # Test reading the sample file
        data = readevents(sample_file)
        @test data.filename == sample_file
        @test length(data.times) == 5
        @test length(data.energies) == 5
        @test eltype(data.times) == Float64
        @test eltype(data.energies) == Float64
        
        # Create a lightcurve from the event data
        lc =create_lightcurve(data, 1.0)
        
        # Basic size checks
        @test length(lc.timebins) == 6  # 5 bins + 1 endpoint
        @test length(lc.counts) == 5
        
        # Time bin checks
        @test lc.timebins[1] ≈ 1.0
        @test lc.timebins[end] ≈ 6.0
        
        # Count checks - each bin should have 1 count
        @test all(lc.counts .== 1)
        
        # Error checks - since each bin has 1 count, errors should be sqrt(1)
        @test all(lc.count_error .≈ fill(sqrt(1.0), 5))
        
        # Method check
        @test lc.err_method == :poisson
    end

   # Test 2: Test lightcurve with different bin sizes
    @testset "Different bin sizes" begin
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample_bin_sizes.fits")
        f = FITS(sample_file, "w")
        write(f, Int[])
        # Create data
        times = Float64[1.0, 2.0, 3.0, 3.5, 4.0, 5.0]
        energies = Float64[10.0, 20.0, 30.0, 25.0, 15.0, 10.0]
        table = Dict{String,Array}()
        table["TIME"] = times
        table["ENERGY"] = energies
        write(f, table)
        close(f)
        data = readevents(sample_file)
    
        # Test with bin size of 2.0
        lc_2 =create_lightcurve(data, 2.0)
        @test length(lc_2.timebins) == 4  # 3 bins + 1 endpoint
        @test length(lc_2.counts) == 3
        @test lc_2.timebins[1] == 1.0
        @test lc_2.timebins[end] == 7.0  # Updated expected value
        @test lc_2.counts == [2, 3, 1]  # Updated expected value
        @test lc_2.count_error == sqrt.([2, 3, 1])  # Updated expected value
    
        # Test with bin size of 1.0
        lc_1 =create_lightcurve(data, 1.0)
        @test length(lc_1.timebins) == 6  # 5 bins + 1 endpoint
        @test length(lc_1.counts) == 5
        @test lc_1.timebins[1] == 1.0
        @test lc_1.timebins[end] == 6.0
        @test lc_1.counts == [1, 1, 2, 1, 1]  # Updated expected value
        @test lc_1.count_error == sqrt.([1, 1, 2, 1, 1])  # Updated expected value
    end
    
    # Test 3: Test lightcurve error computation methods
    @testset "Error computation methods" begin
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample_error_methods.fits")
        f = FITS(sample_file, "w")
        write(f, Int[])
        # Create data
        times = Float64[1.0, 2.0, 2.5, 3.0, 4.5, 5.0]
        energies = Float64[10.0, 20.0, 20.0, 25.0, 15.0, 10.0]
        table = Dict{String,Array}()
        table["TIME"] = times
        table["ENERGY"] = energies
        write(f, table)
        close(f)
        data = readevents(sample_file)
    
        # Test with default error method (Poisson)
        lc =create_lightcurve(data, 1.0)
        @test lc.count_error == sqrt.([1, 2, 1, 1, 1])  # Updated expected value
        
        # Add more error computation methods here if needed
    end

    # Test 4: Struct Type Validation
    @testset "LightCurve Struct Type Checks" begin
        # Create a sample FITS file for type testing
        test_dir = mktempdir()
        sample_file = joinpath(test_dir, "sample_types.fits")
        
        # Prepare test data
        f = FITS(sample_file, "w")
        write(f, Int[])  # Empty primary array
        
        # Create test data
        times = Float64[1.0, 2.0, 3.0, 4.0, 5.0]
        energies = Float64[10.0, 20.0, 15.0, 25.0, 30.0]
        
        table = Dict{String,Array}()
        table["TIME"] = times
        table["ENERGY"] = energies
        write(f, table)
        close(f)

        # Test type-specific instantiations
        @testset "Type Parametric Struct Tests" begin
            # Test Float64 EventList
            data_f64 = readevents(sample_file, T = Float64)
            @test isa(data_f64, EventList{Float64})
            @test typeof(data_f64) == EventList{Float64}
            
            # Test lightcurve creation
            lc_f64 =create_lightcurve(data_f64, 1.0)
            @test isa(lc_f64, LightCurve{Float64})
            @test typeof(lc_f64) == LightCurve{Float64}

            # Test Float32 EventList
            data_f32 = readevents(sample_file, T = Float32)
            @test isa(data_f32, EventList{Float32})
            @test typeof(data_f32) == EventList{Float32}

            # Test lightcurve creation
            lc_f32 =create_lightcurve(data_f32, 1.0)
            @test isa(lc_f32, LightCurve{Float32})
            @test typeof(lc_f32) == LightCurve{Float32}

            # Test Int64 EventList
            data_i64 = readevents(sample_file, T = Int64)
            @test isa(data_i64, EventList{Int64})
            @test typeof(data_i64) == EventList{Int64}

            # Test lightcurve creation
            lc_i64 =create_lightcurve(data_i64, 1.0)
            @test isa(lc_i64, LightCurve{Int64})
            @test typeof(lc_i64) == LightCurve{Int64}
        end

        # Test struct field types
        @testset "Struct Field Type Checks" begin
            data = readevents(sample_file)
            
            # Check filename type
            @test isa(data.filename, String)
            
            # Check times and energies vector types
            @test isa(data.times, Vector{Float64})
            @test isa(data.energies, Vector{Float64})
            
            # Check metadata type
            @test isa(data.metadata, DictMetadata)
            @test isa(data.metadata.headers, Vector{Dict{String,Any}})

            # Test lightcurve creation
            lc =create_lightcurve(data, 1.0)

            # Check timebins type
            @test isa(lc.timebins, Vector{Float64})

            # Check counts type
            @test isa(lc.counts, Vector{Int})

            # Check count_error type
            @test isa(lc.count_error, Vector{Float64})

            # Check err_method type
            @test isa(lc.err_method, Symbol)
        end
    end
end

[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_z8pN68\sample.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_2V7d6T\sample_bin_sizes.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_nb0EeP\sample_error_methods.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_sPiuVV\sample_types.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_sPiuVV\sample_types.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extension 2 of C:\Users\asus4\AppData\Local\Temp\jl_sPiuVV\sample_types.fits
[36m[1m[ [22m[39m[36m[1mInfo: [22m[39mFound complete event data in extensi

[0m[1mTest Summary:    | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
LightCurve Tests | [32m  47  [39m[36m   47  [39m[0m0.8s


Test.DefaultTestSet("LightCurve Tests", Any[Test.DefaultTestSet("Lightcurve creation", Any[], 13, false, false, true, 1.743334758074e9, 1.743334758331e9, false, "In[18]"), Test.DefaultTestSet("Different bin sizes", Any[], 12, false, false, true, 1.743334758331e9, 1.743334758352e9, false, "In[18]"), Test.DefaultTestSet("Error computation methods", Any[], 1, false, false, true, 1.743334758352e9, 1.74333475837e9, false, "In[18]"), Test.DefaultTestSet("LightCurve Struct Type Checks", Any[Test.DefaultTestSet("Type Parametric Struct Tests", Any[], 12, false, false, true, 1.743334758376e9, 1.743334758854e9, false, "In[18]"), Test.DefaultTestSet("Struct Field Type Checks", Any[], 9, false, false, true, 1.743334758854e9, 1.743334758856e9, false, "In[18]")], 0, false, false, true, 1.74333475837e9, 1.743334758856e9, false, "In[18]")], 0, false, false, true, 1.743334758074e9, 1.743334758856e9, false, "In[18]")

In [19]:
using ResumableFunctions, StatsBase, Statistics, DataFrames
using FFTW, NaNMath, FITSIO, Intervals
using ProgressBars: tqdm as show_progress

include("fourier.jl")
export positive_fft_bins
export poisson_level
export normalize_abs
export normalize_frac
export normalize_leahy_from_variance
export normalize_periodograms
export bias_term
export raw_coherence
export estimate_intrinsic_coherence
export error_on_averaged_cross_spectrum
export get_average_ctrate
export get_flux_iterable_from_segments
export avg_pds_from_events
export avg_cs_from_events


In [23]:
"""
Abstract type representing a power spectrum, which characterizes the distribution 
of power across different frequencies in a signal.

Subtypes include:
- PowerSpectrum{T}: Represents a power spectrum for a single signal segment
- AveragedPowerspectrum{T}: Represents a power spectrum averaged over multiple segments
"""
abstract type AbstractPowerSpectrum{T} end

"""
Represents a power spectrum computed from a single signal segment.

## Fields
- `freqs::Vector{T}`: Vector of frequency values 
- `power::Vector{T}`: Corresponding power values for each frequency
- `power_errors::Vector{T}`: Uncertainty estimates for power at each frequency
- `dt::T`: Time sampling interval
- `n::Int`: Number of data points in the original segment
"""
struct PowerSpectrum{T} <: AbstractPowerSpectrum{T}
    freqs::Vector{T}
    power::Vector{T}
    power_errors::Vector{T}
    dt::T
    n::Int
end

"""
Represents a power spectrum computed by averaging multiple signal segments.

## Fields
- `freqs::Vector{T}`: Vector of frequency values
- `power::Vector{T}`: Average power values for each frequency
- `power_errors::Vector{T}`: Standard error of power estimates
- `m::Int`: Number of segments used in averaging
- `n::Int`: Number of data points in each segment
- `dt::T`: Time sampling interval
"""
struct AveragedPowerspectrum{T} <: AbstractPowerSpectrum{T}
    freqs::Vector{T}
    power::Vector{T} 
    power_errors::Vector{T}
    m::Int
    n::Int
    dt::T
end

"""
    powerspectrum(events::EventList{T}; dt::Real=0.001,
                 segment_size::Int=length(events.times) ÷ 32, 
                 norm::Symbol=:frac) where T -> AveragedPowerspectrum{T}

Compute the power spectrum for a list of events using segmented averaging.

## Arguments
- `events::EventList{T}`: Input event list containing times and optional energies
- `dt::Real=0.001`: Time sampling interval (default: 0.001)
- `segment_size::Int=length(events.times) ÷ 32`: Size of each segment for FFT (default: 1/32 of total data)
- `norm::Symbol=:frac`: Normalization method for power spectrum
    - `:leahy`: Multiplies power by 2
    - `:frac`: Normalizes power by mean power
    - `:abs`: No normalization

## Returns
- `AveragedPowerspectrum{T}`: Power spectrum computed from segmented data

## Throws
- `ArgumentError` if segment size is invalid or insufficient data points

## Examples
```julia
events = EventList(times, energies, metadata)
ps = powerspectrum(events, dt=0.01, segment_size=1024, norm=:leahy)
```
"""
function powerspectrum(events::EventList{T}; dt::Real = 0.001,
                      segment_size::Int = length(events.times) ÷ 32, 
                      norm::Symbol = :frac) where T
    if segment_size <= 0
        throw(ArgumentError("Segment size must be positive"))
    end
    if length(events.times) < segment_size
        throw(ArgumentError("Not enough data points"))
    end
    return _powerspectrum_averaged(events, convert(T, dt), segment_size, norm)
end

"""
    _powerspectrum_averaged(events::EventList{T}, dt::T, 
                          segment_size::Int, norm::Symbol) where T -> AveragedPowerspectrum{T}

Compute averaged power spectrum from event list segments.

## Arguments
- `events::EventList{T}`: Input event list
- `dt::T`: Time sampling interval
- `segment_size::Int`: Number of points in each segment
- `norm::Symbol`: Normalization method for power spectrum

## Returns
- `AveragedPowerspectrum{T}`: Power spectrum averaged over multiple segments

## Notes
- Prioritizes using event energies if available, otherwise uses event times
- Computes FFT for each segment and averages power
- Calculates frequency-dependent power with specified normalization
"""
function _powerspectrum_averaged(events::EventList{T}, 
                               dt::T, 
                               segment_size::Int, 
                               norm::Symbol) where T
    signal = events.times
    
    if !all(iszero, events.energies)
        signal = events.energies
    end
    
    n_segments = length(signal) ÷ segment_size
    segments = [signal[(i-1)*segment_size+1 : i*segment_size] for i in 1:n_segments]
    
    # Apply Hanning window if needed
    windowed_segments = [segment .* hanning(segment_size) for segment in segments]
    
    ffts = [fft(segment) for segment in windowed_segments]
    
    nyquist_index = segment_size ÷ 2 + 1
    powers = [abs2.(fft[1:nyquist_index]) for fft in ffts]
    
    avg_power = mean(powers)
    
    freqs = FFTW.rfftfreq(segment_size, 1/dt)
    
    if length(freqs) != length(avg_power)
        throw(ArgumentError("Frequency and power arrays must match"))
    end
    
    normalized_power = _normalize_power(avg_power, norm)
    power_errors = std(powers) ./ sqrt(length(segments))
    
    positive_indices = 2:length(freqs)
    
    return AveragedPowerspectrum{T}(
        convert(Vector{T}, freqs[positive_indices]),
        convert(Vector{T}, normalized_power[positive_indices]),
        convert(Vector{T}, power_errors[positive_indices]),
        length(segments),
        segment_size,
        dt
    )
end

"""
    hanning(N::Int) -> Vector{Float64}

Generate a Hanning window function for signal tapering.

## Arguments
- `N::Int`: Length of the window

## Returns
- `Vector{Float64}`: Hanning window weights

## Examples
```julia
window = hanning(1024)  # Creates a Hanning window of length 1024
```
"""
function hanning(N::Int)
    return [0.5 * (1 - cos(2π * n / (N-1))) for n in 0:N-1]
end

"""
    _normalize_power(power::AbstractVector, norm::Symbol) -> Vector{Float64}

Normalize power spectrum based on specified method.

## Arguments
- `power`: Input power values
- `norm`: Normalization method (`:leahy`, `:frac`, or `:abs`)

## Returns
- `Vector{Float64}`: Normalized power values

## Throws
- `ArgumentError` for unknown normalization method
"""
function _normalize_power(power::AbstractVector, norm::Symbol)
    if norm == :leahy
        return power .* 2
    elseif norm == :frac
        return power ./ mean(power)
    elseif norm == :abs
        return power
    else
        throw(ArgumentError("Unknown normalization method: $norm"))
    end
end

# Interface functions
freqs(ps::AbstractPowerSpectrum) = ps.freqs
power(ps::AbstractPowerSpectrum) = ps.power
errors(ps::AbstractPowerSpectrum) = ps.power_errors

# Base methods
Base.length(ps::AbstractPowerSpectrum) = length(ps.freqs)
Base.size(ps::AbstractPowerSpectrum) = (length(ps),)
Base.getindex(ps::AbstractPowerSpectrum, i) = (ps.freqs[i], ps.power[i], ps.power_errors[i])

# Show methods
function Base.show(io::IO, ps::PowerSpectrum{T}) where T
    print(io, "PowerSpectrum{$T}(n=$(ps.n), df=$(ps.freqs[2]-ps.freqs[1]))")
end

function Base.show(io::IO, ps::AveragedPowerspectrum{T}) where T
    print(io, "AveragedPowerspectrum{$T}(n=$(ps.n), m=$(ps.m), df=$(ps.freqs[2]-ps.freqs[1]))")
end

"""
    validate(ps::AbstractPowerSpectrum) -> Bool

Validate the power spectrum structure.

## Returns
- `true` if valid, throws ArgumentError otherwise

## Throws
- `ArgumentError` if any validation checks fail
"""
function validate(ps::AbstractPowerSpectrum)
    if length(ps.freqs) != length(ps.power)
        throw(ArgumentError("Frequency and power arrays must have the same length"))
    end
    if length(ps.power) != length(ps.power_errors)
        throw(ArgumentError("Power and error arrays must have the same length"))
    end
    if any(x -> x < 0, ps.power)
        throw(ArgumentError("Power values must be non-negative"))
    end
    if any(x -> x < 0, ps.power_errors)
        throw(ArgumentError("Error values must be non-negative"))
    end
    return true
end

validate

In [24]:
@testset "Power Spectrum Analysis" begin
    @testset "Basic Spectrum Creation" begin
        dt = 0.001
        T = 100.0
        t = 0:dt:T
        freq = 2.0
        
        signal = 2.0 * sin.(2π * freq * t)
        
        # Create empty metadata
        metadata = DictMetadata([Dict{String,Any}()])
        
        events = EventList{Float64}(
            "test_basic.fits",
            collect(Float64, t),
            Float64.(signal),
            metadata
        )
    
        ps = powerspectrum(events, dt=dt)
        @test isa(ps, AveragedPowerspectrum)
        @test length(ps.freqs) == length(ps.power)
        @test length(ps.power) == length(ps.power_errors)
        
        freqs = ps.freqs
        powers = ps.power
        peak_idx = argmax(powers)
        detected_freq = ps.freqs[peak_idx]
        
        @test isapprox(detected_freq, freq, rtol=0.1, atol=1/(T*dt))
    end

    @testset "Multiple Frequency Detection" begin
        n_points = 2^17
        dt = 0.001
        t = range(0, step=dt, length=n_points)
        T = t[end]
        f1, f2 = 1.0, 2.5
        
        signal = 50.0 * (sin.(2π * f1 * t) + sin.(2π * f2 * t))
        
        # Create empty metadata
        metadata = DictMetadata([Dict{String,Any}()])
        
        events = EventList(
            "test_multi.fits",
            collect(Float64, t),
            Float64.(signal),
            metadata
        )

        ps = powerspectrum(events, dt=dt)
        
        min_freq_idx = max(2, floor(Int, 0.1/(T*dt)))
        freqs = ps.freqs[min_freq_idx:end]
        powers = ps.power[min_freq_idx:end]
        
        peak_indices = Int[]
        for i in 2:(length(powers)-1)
            if powers[i] > powers[i-1] && powers[i] > powers[i+1]
                push!(peak_indices, i)
            end
        end
        
        if !isempty(peak_indices)
            peak_freqs = freqs[peak_indices]
            peak_powers = powers[peak_indices]
            
            sorted_idx = sortperm(peak_powers, rev=true)
            peak_freqs = peak_freqs[sorted_idx]
            
            df = 1/T 
            found_f1 = any(f -> abs(f - f1) ≤ 8df, peak_freqs)
            found_f2 = any(f -> abs(f - f2) ≤ 8df, peak_freqs)
            
            @test found_f1
            @test found_f2
        else
            @test false
        end
    end

    @testset "Averaged Power Spectrum" begin
        n_points = 2^16
        dt = 0.001
        t = range(0, step=dt, length=n_points)
        freq = 2.0
        
        signal = 20.0 * sin.(2π * freq * t)
        
        # Create empty metadata
        metadata = DictMetadata([Dict{String,Any}()])
        
        events = EventList(
            "test_avg.fits",
            collect(Float64, t),
            Float64.(signal),
            metadata
        )
    
        segment_size = 2048
        
        aps = powerspectrum(events, dt=dt, segment_size=segment_size)
        
        @test isa(aps, AveragedPowerspectrum)
        @test length(aps.freqs) == length(aps.power)
        @test length(aps.power) == length(aps.power_errors)
        @test aps.m > 0
        
        powers = aps.power
        freqs = aps.freqs
        
        peak_idx = argmax(powers)
        detected_freq = freqs[peak_idx]
        
        df = 1/(segment_size * dt)
        
        @test abs(detected_freq - freq) <= 2*df
    end

    @testset "Normalization Methods" begin
        n_points = 2^12
        dt = 0.001
        t = range(0, step=dt, length=n_points)
        signal = sin.(2π * 2.0 * t)
        
        # Create empty metadata
        metadata = DictMetadata([Dict{String,Any}()])
        
        events = EventList(
            "test_norm.fits",
            collect(Float64, t),
            Float64.(signal),
            metadata
        )
        
        for norm in [:leahy, :frac, :abs]
            ps = powerspectrum(events, dt=dt, norm=norm)
            @test all(ps.power .>= 0)
            @test all(ps.power_errors .>= 0)
            @test length(ps.freqs) == length(ps.power)
        end
    end
end

[0m[1mTest Summary:           | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
Power Spectrum Analysis | [32m  20  [39m[36m   20  [39m[0m1.1s


Test.DefaultTestSet("Power Spectrum Analysis", Any[Test.DefaultTestSet("Basic Spectrum Creation", Any[], 4, false, false, true, 1.743335226762e9, 1.743335227725e9, false, "In[24]"), Test.DefaultTestSet("Multiple Frequency Detection", Any[], 2, false, false, true, 1.743335227725e9, 1.743335227809e9, false, "In[24]"), Test.DefaultTestSet("Averaged Power Spectrum", Any[], 5, false, false, true, 1.743335227809e9, 1.743335227842e9, false, "In[24]"), Test.DefaultTestSet("Normalization Methods", Any[], 9, false, false, true, 1.743335227842e9, 1.743335227866e9, false, "In[24]")], 0, false, false, true, 1.743335226762e9, 1.743335227866e9, false, "In[24]")