# Exploring Fan Tach Measurement

I would like to read the speed of a computer fan with high accuracy and low
latency.  That seems like a problem that would have been thoroughly analyzed
and solved, but I'm not finding existing solutions.

## Literature

## Test Setup

I'm working with a microcontroller and a 4-pin fan with a PWM input and a tach
output.

The fan is a [Noctua A4x10 5V PWM](https://noctua.at/en/nf-a4x10-5v-pwm).
Noctua have published [a white paper](https://www.noctua.at/pub/media/wysiwyg/Noctua_PWM_specifications_white_paper.pdf) about their PWM system.  They've
also published [a brief description of the fan's
controller IC](https://noctua.at/en/ne_fd1_pwm_ic).

For our purposes, the things to note are that the fan's nominal speed ranges
from 1,000 to 5,000 RPM, it accepts a PWM input at 25 kHz, and it produces a
tach output, which has two pulses per fan revolution.

The microcontroller is an ESP32S3.  The 'S3 has typical PWM outputs, and it
has an input capture peripheral.  At 25 kHz, the PWM has 10-bit resolution.
The capture peripheral can timestamp edges of a digital input signal with
the value of a free-running counter that increments at 80 MHz.


In [52]:
# Fan parameters
PWM_FREQ = 25_000
TACH_PULSES_PER_REV = 2
FAN_MAX_SPEED = 5000

# MCU parameters
CLK_FREQ = 80_000_000
PWM_BITS = 10
PWM_RESOLUTION = 2**PWM_BITS
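
For a rough sense of scale, the parameters above can be turned into tach periods measured in counter ticks. This is only a sketch: the helper names `tach_period_clk` and `rpm_from_period` are illustrative and not part of the firmware or the rest of this notebook, and the 1,000 RPM figure is the fan's nominal lower bound quoted earlier.

```python
# Constants repeated here so the sketch runs on its own.
CLK_FREQ = 80_000_000        # capture counter rate, Hz
TACH_PULSES_PER_REV = 2      # tach pulses per fan revolution


def tach_period_clk(rpm):
    """Clock ticks between successive tach pulses at a given speed."""
    return 60 * CLK_FREQ / (TACH_PULSES_PER_REV * rpm)


def rpm_from_period(period_clk):
    """Fan speed implied by one tach period measured in clock ticks."""
    return 60 * CLK_FREQ / (TACH_PULSES_PER_REV * period_clk)


print(tach_period_clk(5000))     # 480000.0 ticks (6 ms) at nominal top speed
print(tach_period_clk(1000))     # 2400000.0 ticks (30 ms) at nominal low speed
print(rpm_from_period(480_000))  # 5000.0
```

At 5,000 RPM one counter tick changes the computed speed by roughly 0.01 RPM, so the timer itself is unlikely to be the limiting factor.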

## Firmware

I've written code for the MCU that runs one of four fixed scenarios and records
every edge of both the outgoing PWM signal and the returning tach signal.  It
produces a "log file" (actually just a recording of the serial output) that
contains a compact encoding of the edges.

The log file records a single 32-bit number for each recorded edge.  The
number is one bit identifying the channel, PWM or tach; one bit for direction,
rising or falling; and 30 bits of timestamp.  The timestamp is just the value
of the 80 MHz counter at the instant of the edge.  It wraps around about every
13.4 seconds (2^30 ticks at 80 MHz).
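
The layout can be illustrated with a small pack/unpack round trip. The bit positions (channel in bit 0, direction in bit 1, timestamp in the upper 30 bits) are taken from this notebook's `_decode` method. `pack_edge` and `unpack_edge` are hypothetical helpers written only for this sketch, and the firmware may build the word differently.

```python
CLK_FREQ = 80_000_000
TIMESTAMP_BITS = 30
TIMESTAMP_MASK = (1 << TIMESTAMP_BITS) - 1


def pack_edge(chan, direction, clk):
    """Pack one edge into a 32-bit word (hypothetical helper)."""
    return ((clk & TIMESTAMP_MASK) << 2) | ((direction & 1) << 1) | (chan & 1)


def unpack_edge(word):
    """Split a 32-bit word into (chan, direction, clk)."""
    return word & 1, (word >> 1) & 1, (word >> 2) & TIMESTAMP_MASK


word = pack_edge(chan=0, direction=1, clk=123_456)   # tach, rising
assert unpack_edge(word) == (0, 1, 123_456)

# The 30-bit timestamp wraps after 2**30 ticks of the 80 MHz counter.
print(2**TIMESTAMP_BITS / CLK_FREQ)   # 13.4217728 seconds
```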

In [53]:
from collections import namedtuple
from enum import IntEnum
import numpy as np
import re


class Channel(IntEnum):
    PWM = 1
    TACH = 0

class Dir(IntEnum):
    RISING = 1
    FALLING = 0


class Signal(namedtuple('Signal', 'name, chan, dir, clk')):

    def rising_edges(self):
        """Timestamps (clock ticks) of the rising edges."""
        return self.clk[self.dir == Dir.RISING]

    def falling_edges(self):
        """Timestamps (clock ticks) of the falling edges."""
        return self.clk[self.dir == Dir.FALLING]


class Scenario:

    def __init__(self, file, start_sec=0):
        self.filename = file
        self._read(file, start_clk=int(start_sec * CLK_FREQ))

    def _read(self, file, start_clk=0):
        with open(file) as f:
            contents = f.read()
        m = re.search(r'.* scenario', contents)
        assert m, f"can't find scenario description in {file}"
        self.description = m.group(0)
        m = re.search(r'(\d+) events logged', contents)
        assert m, f"can't find event count in {file}"
        self.edge_count = int(m.group(1))

        hex_data = contents[m.span()[1]:]
        hex_edges = re.findall(r'[\da-f]+', hex_data)
        assert len(hex_edges) == self.edge_count
        int_edges = [int(x, base=16) for x in hex_edges]
        self.raw_edges = np.array(int_edges, dtype=np.uint32)

        edges = self._unwrap(self.raw_edges)
        # N.B. edges are now (signed) int64.

        chan, dir, clk = self._decode(edges)

        offset = clk[0] - start_clk

        pwm_edges = edges[chan == 1]
        tach_edges = edges[chan == 0]
        self.pwm = Signal('pwm', *self._decode(pwm_edges, offset=offset))
        self.tach = Signal('tach', *self._decode(tach_edges, offset=offset))

    def _unwrap(self, edges):
        """Timestamps wrap around.  Convert to unwrapped."""
        # This function will preserve the flags in the low bits.
        # N.B., sometimes edges are out of order, so the data
        # may wrap backward as well as forward.
        edges = np.array(edges, dtype=np.int64)
        delta = edges[1:] - edges[:-1]
        overflow = delta < -2**31
        underflow = delta > +2**31
        eitherflow = overflow.astype(np.int64) - underflow.astype(np.int64)
        # adjustment = 2**32 * np.cumsum(eitherflow)
        adjustment = 2**32 * eitherflow.cumsum()
        adjustment = np.hstack([[0], adjustment])
        return edges + adjustment

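    def _decode(self, edges, offset=0):
        # Each edge is 32 bits and encodes
        #  - a channel (tach = 0, PWM = 1) in bit 0
        #  - a direction (falling = 0, rising = 1) in bit 1
        #  - a 30 bit timestamp in bits 2..31
        chan = edges & 1
        dir = (edges >> 1) & 1
        # N.B. the mask keeps only 30 bits, so clk wraps again every
        # 2**30 ticks (about 13.4 s) even when `edges` has been unwrapped.
        clk = ((edges >> 2) - offset) & 0x3FFF_FFFF
        return (chan, dir, clk)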

    def __repr__(self):
        return f'<{self.__class__.__name__}: {self.description}>'

    def __len__(self):
        return self.edge_count

# Quick unwrap test
edges = np.array([0xFFFFFFFE, 0, 1, 0xFFFFFFFF,
                  1, 2, 0x7FFFFFFF, 0xFFFF0000, 3], dtype=np.uint32)
unwrapped = Scenario._unwrap(None, edges)
assert list(unwrapped) == [0xFFFFFFFE, 0x100000000, 0x100000001, 0xFFFFFFFF,
                           0x100000001, 0x100000002, 0x17FFFFFFF, 0x1FFFF0000,
                           0x200000003]



## Scenarios

There are four scenarios.

 * **Half Speed** -- The PWM signal is at a steady 50% duty cycle for
 five seconds.

 * **Bang Bang** -- The PWM signal starts at 0.  It jumps to 100%, stays there
 for five seconds, then jumps back to 0 and waits five seconds for the fan
 to stop.

 * **Ramp** -- The PWM signal starts at 0.  It gradually ramps up to 100% over
 five seconds, then it ramps back down to 0 over the next five seconds.

 * **Staircase** -- The PWM signal starts at 0.  It increases to 100% in eight
 steps, waiting 1.6 seconds at each step for the fan speed to (almost)
 stabilize.  Then it steps back down to 0 at the same speed.
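
As an illustration of these drive profiles, the ramp can be written as a function of time. This is a sketch only: it assumes the ramp is linear, which the description above does not state, and `ramp_duty` is a made-up name rather than anything in the firmware.

```python
RAMP_SECONDS = 5.0   # time to go from 0 to 100%, per the Ramp description


def ramp_duty(t):
    """Duty cycle (0.0 to 1.0) at time t seconds, assuming a linear ramp."""
    if t < 0 or t > 2 * RAMP_SECONDS:
        return 0.0            # outside the scenario the PWM is idle
    if t <= RAMP_SECONDS:
        return t / RAMP_SECONDS                    # ramping up
    return (2 * RAMP_SECONDS - t) / RAMP_SECONDS   # ramping back down


for t in (0.0, 2.5, 5.0, 7.5, 10.0):
    print(t, ramp_duty(t))
```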






In [54]:
half_speed = Scenario('../half-speed.log')
bang_bang = Scenario('../bang-bang.log', 1.0)
ramp = Scenario('../ramp.log')
staircase = Scenario('../staircase.log')

## Checking the Signals

I can test that the signals have some expected properties.  And then I can
fix them up.

### Alternating Edges

Every signal should strictly alternate rising and falling edges.
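
A violation of this rule shows up as two consecutive edges with the same direction. Here is a minimal sketch for locating them, using a hypothetical helper `repeated_edges` that accepts any sequence of 0/1 directions (a plain list or a NumPy array):

```python
def repeated_edges(dirs):
    """Indices i where edge i has the same direction as edge i - 1."""
    return [i for i in range(1, len(dirs)) if dirs[i] == dirs[i - 1]]


# Rising = 1, falling = 0.  Edge 3 repeats the direction of edge 2.
print(repeated_edges([1, 0, 1, 1, 0, 1]))   # [3]
print(repeated_edges([0, 1, 0, 1]))         # []
```

A single mislabeled edge in the middle of an otherwise alternating signal matches both of its neighbors, so it produces two adjacent hits with this test.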


In [None]:
def verify_alternating_edges(scenario):
    for sig in [scenario.pwm, scenario.tach]:
        dir = sig.dir
        if len(dir):
            try:
                assert np.all(dir[::2] == dir[0])
                assert np.all(dir[1::2] != dir[0])
            except AssertionError:
                d0 = int(dir[0])
                # Indices where the direction differs from strict alternation.
                anomalies = (dir != (d0 + np.arange(len(dir))) % 2).nonzero()[0]
                print(f'{len(anomalies)=}')
                print(f'{anomalies=}')
                # Clock ticks since the previous edge (0 for the first edge).
                delta = np.diff(sig.clk, prepend=sig.clk[:1])
                for i in anomalies:
                    neighbors = np.arange(max(0, i - 2), min(len(dir), i + 2))
                    print(f'{scenario.description}: {sig.name}')
                    print(f'    at {i}')
                    print(f'    dir {dir[neighbors]}')
                    print(f'    clk {delta[neighbors]}')
                raise

try:
    verify_alternating_edges(half_speed)
except AssertionError:
    for i in [568, 684]:
        half_speed.tach.dir[i] = 1
    verify_alternating_edges(half_speed)

verify_alternating_edges(bang_bang)

verify_alternating_edges(ramp)

try:
    verify_alternating_edges(staircase)
except AssertionError:
    for i in [859, 1281, 1353, 1585, 2303, 2561, 2663, 2809, 2873,
             2925, 3151, 3211, 3221, 3387, 3821, 3935, 3947, 4019]:
        staircase.tach.dir[i] = 1
    verify_alternating_edges(staircase)


In [None]:

def verify_pwm_sync(scenario):
    print(f'\n{scenario.description}\n')
    pwm = scenario.pwm

    # The PWM period must be a whole number of clock ticks.
    f_clk_per_pulse = CLK_FREQ / PWM_FREQ
    clk_per_pulse = int(f_clk_per_pulse)
    assert clk_per_pulse == f_clk_per_pulse

    # Every rising edge should fall a whole number of PWM periods
    # after the first rising edge.
    rising = pwm.rising_edges()
    print(f'{len(pwm.dir)=}')
    print(f'{rising[:4]=}')
    off_grid = (rising - rising[0]) % clk_per_pulse != 0
    print(f'{np.flatnonzero(off_grid)=}')
    assert not np.any(off_grid)

# Re-check after the fix-ups above; on failure this prints the
# anomalies and raises.
verify_alternating_edges(half_speed)
# verify_pwm_sync(half_speed)
# verify_pwm_sync(bang_bang)
# verify_pwm_sync(ramp)
# verify_pwm_sync(staircase)
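
When the synchronization assertion in `verify_pwm_sync` fails, it helps to know which edges are off the grid rather than only that some are. Here is a sketch, assuming timestamps in clock ticks and a PWM period of 3,200 ticks (80 MHz / 25 kHz). `off_grid_edges` is an illustrative helper, not part of the notebook.

```python
CLK_FREQ = 80_000_000
PWM_FREQ = 25_000
CLK_PER_PULSE = CLK_FREQ // PWM_FREQ   # 3200 ticks per PWM period


def off_grid_edges(rising, period=CLK_PER_PULSE):
    """Return (index, phase error in ticks) for edges that are not a whole
    number of periods after the first edge."""
    if len(rising) == 0:
        return []
    t0 = rising[0]
    return [(i, (t - t0) % period)
            for i, t in enumerate(rising)
            if (t - t0) % period != 0]


# Synthetic timestamps: the third edge arrives 5 ticks late.
print(off_grid_edges([1000, 4200, 7405, 10600]))   # [(2, 5)]
```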