In [2]:
import nifgen
import time
import numpy as np
import os
import math

In [3]:
# Outputs a square wave (standard-function mode): amplitude 1.0 at 7911.16 Hz
# on Dev1, with the generation context held open for 10 seconds.
with nifgen.Session("Dev1") as session:
    session.output_mode = nifgen.OutputMode.FUNC
    session.configure_standard_waveform(
        waveform=nifgen.Waveform.SQUARE,
        amplitude=1.0,
        frequency=7911.16,
    )

    # Stay inside the initiate() context for the whole 10 s window.
    with session.initiate():
        time.sleep(10)

In [4]:
# Outputs a square wave (standard-function mode): amplitude 1.0 at 7911.15 Hz
# on Dev1, configured with a rising-edge digital start trigger on /Dev1/PFI1.
with nifgen.Session("Dev1") as session:
    session.output_mode = nifgen.OutputMode.FUNC
    session.configure_standard_waveform(
        waveform=nifgen.Waveform.SQUARE,
        amplitude=1.0,
        frequency=7911.15,
    )

    # Start trigger: digital edge, rising, sourced from PFI1.
    session.start_trigger_type = nifgen.StartTriggerType.DIGITAL_EDGE
    session.digital_edge_start_trigger_edge = nifgen.StartTriggerDigitalEdgeEdge.RISING
    session.digital_edge_start_trigger_source = "/Dev1/PFI1"

    # Stay inside the initiate() context for the whole 10 s window.
    with session.initiate():
        time.sleep(10)

In [7]:
# Number of elements in `waveform`. No cell earlier in the visible file order
# defines `waveform`, so this relies on a value left in the kernel by a cell
# that was executed before this one.
len(waveform)

6320

In [5]:
import nifgen
import numpy as np
import time

# TTL aligned wave
# Builds one period of a square wave whose positive half is replaced by a
# burst of faster +/-0.5 pulses, loads it into arbitrary-waveform memory and
# arms the generator with a rising-edge digital start trigger on PFI1.

# Settings
resource_name = "Dev1"
sample_rate = 30_000_000  # 30 MS/s
waveform_freq = 7911.24    # square-wave frequency in Hz
duration = 1 / waveform_freq  # Full period = 126.4 µs
num_samples = int(sample_rate * duration)  # whole samples in one period (truncated)

# Timing
t = np.linspace(0, duration, num_samples, endpoint=False)

# Step 1: Base square wave
# np.sign returns 0 wherever the sine evaluates to exactly 0 (e.g. at t = 0),
# so such a sample is neither +1 nor -1.
square_wave = np.sign(np.sin(2 * np.pi * waveform_freq * t))

# Step 2: Identify high portion (the +1 region)
high_indices = np.where(square_wave > 0)[0]

# Step 3: Create high-frequency pulses (e.g., 1 MHz) to insert into the high part
pulse_freq = 1_000_000  # 1 MHz
pulse_wave = 0.5 * np.sign(np.cos(2 * np.pi * pulse_freq * t[high_indices]))

# Step 4: Replace high part of square wave with high-freq pulses
waveform = square_wave.copy()
waveform[high_indices] = pulse_wave

# Optional: scale from -1/1 to 0/1 if needed
# waveform = (waveform + 1) / 2

waveform = waveform.tolist()


# Initialize session
with nifgen.Session(resource_name) as session:
    # Set to ARB mode
    session.output_mode = nifgen.OutputMode.ARB
    session.arb_sample_rate = sample_rate

    # Write waveform to device memory
    wf_handle = session.create_waveform(waveform)

    # Configure trigger
    session.start_trigger_type = nifgen.StartTriggerType.DIGITAL_EDGE
    session.digital_edge_start_trigger_edge = nifgen.StartTriggerDigitalEdgeEdge.RISING
    session.trigger_mode = nifgen.TriggerMode.STEPPED
    session.digital_edge_start_trigger_source = "/Dev1/PFI1"

    # Configure the arbitrary waveform to be played with unity gain and no offset.
    session.configure_arb_waveform(wf_handle, gain=1.0, offset=0.0)

    # No repetition count is set anywhere in this cell; the only trigger-related
    # settings are the ones above (digital-edge start trigger + STEPPED mode).
    # NOTE(review): whether STEPPED mode replays the waveform on every trigger
    # edge is device behaviour - confirm against the NI-FGEN documentation.

    # Use initiate() as a context manager, like the other cells in this
    # notebook, so the generation is explicitly ended when the block exits.
    with session.initiate():
        print("Armed. A new waveform will play on each rising TTL pulse at PFI1.")
        time.sleep(5)  # Keep the generation armed for 5 seconds

Armed. A new waveform will play on each rising TTL pulse at PFI1.


In [8]:
# Chunking waveform for streaming

# Path of the raw waveform file on disk.
# NOTE: this rebinds the notebook-level name `waveform`, which other cells use
# for a list of samples rather than a path string.
waveform = r"C:\Users\max\Desktop\NanoStride\waveform_output_smaller.bin"

# Determining allocation size
def allocation_size(waveform, max_chunk_size=10 * 1024 * 1024):
    """Return the largest chunk size that divides the file size exactly.

    Args:
        waveform: Path of the waveform file whose size is inspected.
        max_chunk_size: Upper bound, in bytes, for the returned chunk size.
            Defaults to 10 MiB (10 * 1024 * 1024 bytes).

    Returns:
        The largest divisor of the file's size in bytes that does not exceed
        ``max_chunk_size``, or ``None`` when no such divisor exists (for
        example when the file is empty).

    Note:
        The result is only guaranteed to divide the file size; it is not
        guaranteed to be a multiple of any particular sample width.
    """
    total_size_bytes = os.path.getsize(waveform)

    # Divisors come in pairs (i, total // i) with i <= sqrt(total), so scanning
    # up to the integer square root visits every divisor exactly once.
    valid_chunks = set()
    for i in range(1, math.isqrt(total_size_bytes) + 1):
        if total_size_bytes % i:
            continue
        for candidate in (i, total_size_bytes // i):
            if candidate <= max_chunk_size:
                valid_chunks.add(candidate)

    return max(valid_chunks, default=None)

# Chunk size in bytes chosen by allocation_size for the waveform file
# (no waveform handle is created in this cell).
waveform_size = allocation_size(r"C:\Users\max\Desktop\NanoStride\waveform_output_smaller.bin")


In [10]:
# Loading first chunk for test
# Read through a context manager so the file handle is closed as soon as the
# chunk has been read instead of being left open until garbage collection.
with open(waveform, "rb") as waveform_file:
    first_chunk = waveform_file.read(allocation_size(waveform))


In [11]:
# Number of bytes held in `first_chunk`.
len(first_chunk)

9536000

In [21]:
# Open a session on Dev1 and call clear_arb_memory() on it.
with nifgen.Session("Dev1") as session:
    session.clear_arb_memory()

In [6]:
# Testing arbitrary waveform

# Alternatives tried earlier, kept for reference:
#waveform = np.fromfile(r"C:\Users\max\Desktop\NanoStride\waveform_output.bin", dtype=np.float64)
#waveform_handle = session.create_waveform_from_file_f64(r'C:\Users\max\Desktop\NanoStride\waveform_output.bin', byte_order=nifgen.ByteOrder.LITTLE)


with nifgen.Session("Dev1") as session:
    session.output_mode = nifgen.OutputMode.ARB
    session.arb_sample_rate = 25e6

    # One sine period in 100 samples. endpoint=False leaves out the 2*pi
    # sample, which would otherwise duplicate sample 0 whenever the period is
    # played back-to-back.
    num_samples = 100
    waveform = np.sin(np.linspace(0, 2 * np.pi, num_samples, endpoint=False)).tolist()
    waveform_handle = session.create_waveform(waveform)
    #waveform_handle = session.streaming_waveform_handle()
    #session.channels["0"].arb_waveform = waveform_handle

    # Stay inside the initiate() context for 5 seconds.
    with session.initiate():
        time.sleep(5)

In [19]:
def stream_waveform(waveform_path, chunk_size_bytes, resource_name="Dev1", sample_rate=25e6):
    """Play the first chunk of a raw float64 waveform file in script mode.

    Only the first ``chunk_size_bytes`` bytes of the file are read and
    generated; the rest of the file is not touched by this function.

    Args:
        waveform_path: Path of a binary file holding little-endian float64
            samples.
        chunk_size_bytes: Maximum number of bytes to read from the start of
            the file. Trailing bytes that do not form a whole 8-byte sample
            are ignored.
        resource_name: Device name passed to ``nifgen.Session``.
        sample_rate: Value assigned to ``arb_sample_rate``.

    Raises:
        ValueError: If the bytes read do not contain one complete sample.
    """
    sample_bytes = 8  # width of one little-endian float64 sample

    with open(waveform_path, "rb") as f:
        first_bytes = f.read(chunk_size_bytes)

    # Only whole samples can be decoded; np.frombuffer would raise on a
    # buffer whose length is not a multiple of the element size.
    num_samples = len(first_bytes) // sample_bytes
    if num_samples == 0:
        raise ValueError(
            f"{waveform_path!r} did not yield a complete float64 sample "
            f"within the first {chunk_size_bytes} bytes"
        )

    # Convert raw bytes to float64, then to float32
    first_data = np.frombuffer(first_bytes, dtype='<f8', count=num_samples).astype(np.float32)

    with nifgen.Session(resource_name) as session:
        session.output_mode = nifgen.OutputMode.SCRIPT
        session.arb_sample_rate = sample_rate

        waveform_name = "streamwave"

        # Allocate a named waveform sized for the decoded samples
        session.allocate_named_waveform(waveform_name, num_samples)

        # Write the waveform using the name (not handle)
        session.write_waveform(waveform_name, first_data.tolist())

        # Write a script that references the named waveform
        session.write_script(f"""
        script main
                generate {waveform_name}
        end script
        """)

        # Start generation and block until wait_until_done() returns
        with session.initiate():
            session.wait_until_done()

# Run stream_waveform on the full waveform file with a fixed read size.
if __name__ == "__main__":
    chunk_size_bytes = 64000 * 1024  # 65,536,000 bytes (64000 KiB, about 62.5 MiB)
    waveform_path = r"C:\Users\max\Desktop\NanoStride\waveform_output.bin"
    stream_waveform(waveform_path, chunk_size_bytes)


In [44]:
def stream_full_waveform(waveform_path, resource_name="Dev1", sample_rate=25e6):
    """Play a raw float64 waveform file chunk by chunk in script mode.

    The chunk size comes from ``allocation_size(waveform_path)`` and is rounded
    down to a whole number of 8-byte samples, so every chunk starts on a
    sample boundary. Each chunk is generated in its own ``nifgen.Session``,
    and the zero-based chunk index is printed before each chunk is played.

    Note:
        This notebook defines ``stream_full_waveform`` more than once; whichever
        definition was executed last is the one that is called.

    Args:
        waveform_path: Path of a binary file holding little-endian float64
            samples.
        resource_name: Device name passed to ``nifgen.Session``.
        sample_rate: Value assigned to ``arb_sample_rate``.

    Raises:
        ValueError: If ``allocation_size`` yields no usable chunk size (for
            example for an empty file).
    """
    sample_bytes = 8  # width of one little-endian float64 sample
    waveform_name = "streamwave"

    # Determine chunk size
    chunk_size_bytes = allocation_size(waveform_path)
    if not chunk_size_bytes:
        raise ValueError(f"no usable chunk size for {waveform_path!r} (is the file empty?)")

    # Read whole samples only. Truncating each chunk after reading would drop
    # bytes while the file position keeps advancing, misaligning later chunks.
    chunk_size_bytes = max(sample_bytes, chunk_size_bytes - chunk_size_bytes % sample_bytes)

    with open(waveform_path, "rb") as f:
        for chunk_index, chunk in enumerate(iter(lambda: f.read(chunk_size_bytes), b"")):
            num_samples = len(chunk) // sample_bytes
            if num_samples == 0:
                break  # only a partial trailing sample is left

            print(chunk_index)

            data = np.frombuffer(chunk, dtype='<f8', count=num_samples).astype(np.float32)

            with nifgen.Session(resource_name) as session:
                session.output_mode = nifgen.OutputMode.SCRIPT
                session.arb_sample_rate = sample_rate

                # Allocate a named waveform sized for this chunk
                session.allocate_named_waveform(waveform_name, num_samples)

                # Write the waveform using the name (not handle)
                session.write_waveform(waveform_name, data.tolist())

                # Write a script that references the named waveform
                session.write_script(f"""
                script main
                    repeat 1
                        generate {waveform_name}
                    end repeat
                end script
                """)

                # Start generation and block until wait_until_done() returns
                with session.initiate():
                    session.wait_until_done()

# Run stream_full_waveform on the smaller waveform file.
if __name__ == "__main__":
    # 65,536,000 bytes (64000 KiB, about 62.5 MiB). This value is not passed to
    # stream_full_waveform below, which takes only the path.
    chunk_size_bytes = 64000 * 1024
    waveform_path = r"C:\Users\max\Desktop\NanoStride\waveform_output_smaller.bin"
    stream_full_waveform(waveform_path)


0
1
2
3
4
5
6
7
8
9
10
11


In [18]:
def stream_full_waveform(waveform_path, resource_name="Dev1", sample_rate=100_000_000):
    """Play a raw float64 waveform file chunk by chunk in script mode.

    The chunk size comes from ``allocation_size(waveform_path)`` and is rounded
    down to a whole number of 8-byte samples, so every chunk starts on a
    sample boundary. Each chunk is generated in its own ``nifgen.Session``.

    Note:
        This notebook defines ``stream_full_waveform`` more than once; whichever
        definition was executed last is the one that is called.

    Args:
        waveform_path: Path of a binary file holding little-endian float64
            samples.
        resource_name: Device name passed to ``nifgen.Session``.
        sample_rate: Value assigned to ``arb_sample_rate``.

    Raises:
        ValueError: If ``allocation_size`` yields no usable chunk size (for
            example for an empty file).
    """
    sample_bytes = 8  # width of one little-endian float64 sample
    waveform_name = "streamwave"

    # Determine chunk size
    chunk_size_bytes = allocation_size(waveform_path)
    if not chunk_size_bytes:
        raise ValueError(f"no usable chunk size for {waveform_path!r} (is the file empty?)")

    # Read whole samples only. Truncating each chunk after reading would drop
    # bytes while the file position keeps advancing, misaligning later chunks.
    chunk_size_bytes = max(sample_bytes, chunk_size_bytes - chunk_size_bytes % sample_bytes)

    with open(waveform_path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size_bytes), b""):
            num_samples = len(chunk) // sample_bytes
            if num_samples == 0:
                break  # only a partial trailing sample is left

            data = np.frombuffer(chunk, dtype='<f8', count=num_samples).astype(np.float32)

            with nifgen.Session(resource_name) as session:
                session.output_mode = nifgen.OutputMode.SCRIPT
                session.arb_sample_rate = sample_rate

                # Allocate a named waveform sized for this chunk
                session.allocate_named_waveform(waveform_name, num_samples)

                # Write the waveform using the name (not handle)
                session.write_waveform(waveform_name, data.tolist())

                # Write a script that references the named waveform
                session.write_script(f"""
                script main
                    repeat 1
                        generate {waveform_name}
                    end repeat
                end script
                """)

                # Start generation and block until wait_until_done() returns
                with session.initiate():
                    session.wait_until_done()


In [20]:
# Stream the full waveform file using whichever stream_full_waveform
# definition is currently bound in the kernel.
waveform_path = r"C:\Users\max\Desktop\NanoStride\waveform_output.bin"
stream_full_waveform(waveform_path)