
The Linux x86 Spin Lock Implementation

Principle

  • The raw_lock member of struct raw_spinlock is split into two fields, head and tail:
    • if the number of processors does not exceed 256, head uses the low bits (bits 0-7) and tail uses the high bits (bits 8-15);
    • if the number of processors exceeds 256, head uses the low 16 bits and tail uses the high 16 bits.
    • So the ticket spinlock supports at most 2^16 = 65536 processors.
  • When a ticket spinlock is initialized, raw_lock is set to 0, i.e. both head and tail are 0.
  • When a kernel thread requests the lock, it atomically increments the tail field by 1 (announcing what the next lock holder's ticket number will be, i.e. Next) and takes the old value of tail as its own ticket number (Ticket).
  • It then reads the head field, which holds the ticket number of the current lock holder (Owner).
    • This read does not have to be atomic, but it must use READ_ONCE() so that the Owner value really comes from memory.
  • When head equals tail, the lock is free (and nobody else is currently requesting it).
    • If the Ticket handed back to the thread equals the Owner value read at request time, the spinlock is free and the thread acquires it immediately;
    • otherwise the thread busy-waits, checking whether Owner has become equal to its Ticket; as soon as they match, it is this thread's turn to take the lock.
  • To release the lock, the holder atomically increments Owner by 1; the waiter holding that Ticket notices the change, leaves the busy-wait loop, and continues.
  • Threads therefore acquire the ticket spinlock strictly in request order, which completely eliminates the "unfairness" problem.
  • This implementation splits a traditional CAS in two: lock and unlock are the Swap part and must be atomic; the busy-wait is the Compare part, which need not be atomic as long as Owner is read from memory. A minimal user-space sketch of this scheme follows.
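
Below is a minimal user-space sketch of the scheme described above, using C11 atomics rather than the kernel's xadd, and two separate 16-bit counters instead of a single combined word. The names ticket_lock_t, ticket_lock() and ticket_unlock() are made up for illustration; this is not the kernel code.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical user-space ticket lock: next plays the role of tail (Next),
 * owner plays the role of head (Owner). */
typedef struct {
    _Atomic uint16_t next;    /* next ticket to hand out */
    _Atomic uint16_t owner;   /* ticket currently being served */
} ticket_lock_t;

static void ticket_lock(ticket_lock_t *lock)
{
    /* Take a ticket: the old value of next is ours. */
    uint16_t ticket = atomic_fetch_add_explicit(&lock->next, 1,
                                                memory_order_relaxed);
    /* Busy-wait until Owner reaches our Ticket. */
    while (atomic_load_explicit(&lock->owner, memory_order_acquire) != ticket)
        ;  /* cpu_relax()/pause would go here */
}

static void ticket_unlock(ticket_lock_t *lock)
{
    /* Only the holder writes owner, so a plain load + store is enough. */
    uint16_t owner = atomic_load_explicit(&lock->owner, memory_order_relaxed);
    atomic_store_explicit(&lock->owner, owner + 1, memory_order_release);
}
```
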

Drawbacks

  • Acquiring and releasing the lock invalidates the cache line holding the lock word on every CPU contending for the lock:
    • on the contending CPUs, that cache line sits in the Shared state because they keep re-reading the lock word to compare it;
    • a hardware cache-coherence protocol such as MESI must invalidate the other CPUs' copies before a Shared cache line can be modified;
    • after the lock word is modified, the cache line is first in the Modified state, and because the other CPUs keep reading it, it has to be written back and drops back to Shared.

Implementation

Data structures

  • include/linux/spinlock_types.h
typedef struct raw_spinlock {
    arch_spinlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
    unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
    unsigned int magic, owner_cpu;
    void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
} raw_spinlock_t;
...
typedef struct spinlock {
    union {
        struct raw_spinlock rlock;

#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
        struct {
            u8 __padding[LOCK_PADSIZE];
            struct lockdep_map dep_map;
        };  
#endif
    };  
} spinlock_t;
  • arch/x86/include/asm/spinlock_types.h
#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define __TICKET_LOCK_INC   2
#define TICKET_SLOWPATH_FLAG    ((__ticket_t)1)
#else
#define __TICKET_LOCK_INC   1
#define TICKET_SLOWPATH_FLAG    ((__ticket_t)0)
#endif

/*If the number of processors does not exceed 256, head uses the low bits (bits 0-7)
  and tail uses the high bits (bits 8-15); if it exceeds 256, head uses the low 16 bits
  and tail uses the high 16 bits.
  So the ticket spinlock supports at most 2^16 = 65536 processors.*/
#if (CONFIG_NR_CPUS < (256 / __TICKET_LOCK_INC))
typedef u8  __ticket_t;
typedef u16 __ticketpair_t;
#else
typedef u16 __ticket_t;
typedef u32 __ticketpair_t;
#endif

#define TICKET_LOCK_INC ((__ticket_t)__TICKET_LOCK_INC)

#define TICKET_SHIFT    (sizeof(__ticket_t) * 8)
...
#ifdef CONFIG_QUEUED_SPINLOCKS
#include <asm-generic/qspinlock_types.h>
#else
typedef struct arch_spinlock {
    union {
        __ticketpair_t head_tail;
        struct __raw_tickets {
            __ticket_t head, tail;
        } tickets;
    };
} arch_spinlock_t;
#define __ARCH_SPIN_LOCK_UNLOCKED   { { 0 } }
#endif /* CONFIG_QUEUED_SPINLOCKS */
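
As a quick sanity check of the union layout (a hypothetical user-space copy, assuming fewer than 256 CPUs so __ticket_t is u8, on little-endian x86), the sketch below shows that tail lands in the high byte of head_tail. That is exactly why a wide xadd on head_tail can bump tail: a carry out of the low (head) part would contaminate the high part, so tail must live there.

```c
#include <stdint.h>
#include <stdio.h>

/* User-space copy of the <256-CPU layout: u8 tickets inside a u16 pair. */
typedef uint8_t  __ticket_t;
typedef uint16_t __ticketpair_t;

typedef struct arch_spinlock {
    union {
        __ticketpair_t head_tail;
        struct __raw_tickets {
            __ticket_t head, tail;
        } tickets;
    };
} arch_spinlock_t;

int main(void)
{
    arch_spinlock_t lock = { { 0 } };

    lock.tickets.head = 0x01;   /* Owner */
    lock.tickets.tail = 0x02;   /* Next  */

    /* On little-endian x86 this prints 0x0201: head in the low byte,
     * tail in the high byte, so adding (1 << 8) to head_tail bumps Next. */
    printf("head_tail = 0x%04x\n", lock.head_tail);
    return 0;
}
```
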

Wrappers

  • include/linux/spinlock.h
#define raw_spin_trylock(lock)  __cond_lock(lock, _raw_spin_trylock(lock))

#define raw_spin_lock(lock) _raw_spin_lock(lock)
...
#define raw_spin_lock_bh(lock)      _raw_spin_lock_bh(lock)
#define raw_spin_unlock(lock)       _raw_spin_unlock(lock)
...

static __always_inline void spin_lock(spinlock_t *lock)
{
    raw_spin_lock(&lock->rlock);
}

static __always_inline void spin_lock_bh(spinlock_t *lock)
{
    raw_spin_lock_bh(&lock->rlock);
}

static __always_inline int spin_trylock(spinlock_t *lock)
{
    return raw_spin_trylock(&lock->rlock);
}
...
static __always_inline void spin_unlock(spinlock_t *lock)
{
    raw_spin_unlock(&lock->rlock);
}
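
A hypothetical usage sketch of these wrappers (the names my_lock, my_counter and bump_counter are made up; DEFINE_SPINLOCK, spin_lock and spin_unlock are the real kernel API shown above):

```c
#include <linux/spinlock.h>

/* A counter shared between process contexts, protected by a spinlock. */
static DEFINE_SPINLOCK(my_lock);
static unsigned long my_counter;

void bump_counter(void)
{
    spin_lock(&my_lock);        /* disables preemption, then takes the lock */
    my_counter++;
    spin_unlock(&my_lock);      /* releases the lock, re-enables preemption */
}
```
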
  • Depending on whether CONFIG_INLINE_SPIN_LOCK is configured, the implementation of _raw_spin_lock() differs slightly, but both variants end up calling __raw_spin_lock().
  • include/linux/spinlock_api_smp.h
#ifdef CONFIG_INLINE_SPIN_LOCK
#define _raw_spin_lock(lock) __raw_spin_lock(lock)
#endif

#ifdef CONFIG_INLINE_SPIN_LOCK_BH
#define _raw_spin_lock_bh(lock) __raw_spin_lock_bh(lock)
#endif
...
#ifdef CONFIG_INLINE_SPIN_TRYLOCK
#define _raw_spin_trylock(lock) __raw_spin_trylock(lock)
#endif
...
#ifndef CONFIG_UNINLINE_SPIN_UNLOCK
#define _raw_spin_unlock(lock) __raw_spin_unlock(lock)
#endif
...

static inline void __raw_spin_lock_bh(raw_spinlock_t *lock)
{   /*Disable bottom halves first; disabling bottom halves also implies disabling preemption.*/
    __local_bh_disable_ip(_RET_IP_, SOFTIRQ_LOCK_OFFSET);
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    /*Only then contend for the lock.*/
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

static inline int __raw_spin_trylock(raw_spinlock_t *lock)
{   /*Note: taking a spinlock disables preemption.*/
    preempt_disable();
    if (do_raw_spin_trylock(lock)) {
        spin_acquire(&lock->dep_map, 0, 1, _RET_IP_);
        return 1;
    }   
    preempt_enable();
    return 0;
}
...
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{   /*Note: taking a spinlock disables preemption.*/
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
    spin_release(&lock->dep_map, 1, _RET_IP_);
    do_raw_spin_unlock(lock);
    preempt_enable();
}
  • kernel/locking/spinlock.c
#ifndef CONFIG_INLINE_SPIN_TRYLOCK
int __lockfunc _raw_spin_trylock(raw_spinlock_t *lock)
{
    return __raw_spin_trylock(lock);
}
EXPORT_SYMBOL(_raw_spin_trylock);
#endif
...
#ifndef CONFIG_INLINE_SPIN_LOCK
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
    __raw_spin_lock(lock);
}
EXPORT_SYMBOL(_raw_spin_lock);
...
#ifndef CONFIG_INLINE_SPIN_LOCK_BH
void __lockfunc _raw_spin_lock_bh(raw_spinlock_t *lock)
{
    __raw_spin_lock_bh(lock);
}   
EXPORT_SYMBOL(_raw_spin_lock_bh);
#endif

#ifdef CONFIG_UNINLINE_SPIN_UNLOCK
void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
{
    __raw_spin_unlock(lock);
}
EXPORT_SYMBOL(_raw_spin_unlock);
#endif
...
#endif
  • Here is the implementation of LOCK_CONTENDED when the CONFIG_LOCK_STAT option is not enabled:
    • include/linux/lockdep.h
#ifdef CONFIG_LOCK_STAT
...
#else /* CONFIG_LOCK_STAT */

#define lock_contended(lockdep_map, ip) do {} while (0)
#define lock_acquired(lockdep_map, ip) do {} while (0)

#define LOCK_CONTENDED(_lock, try, lock) \
    lock(_lock)

#endif /* CONFIG_LOCK_STAT */
  • include/linux/spinlock.h
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{   
    __acquire(lock);
    arch_spin_lock(&lock->raw_lock);
}
...
static inline int do_raw_spin_trylock(raw_spinlock_t *lock)
{
    return arch_spin_trylock(&(lock)->raw_lock);
}

static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
{
    arch_spin_unlock(&lock->raw_lock);
    __release(lock);
}
...

Implementation details

  • arch/x86/include/asm/spinlock.h
static inline int  __tickets_equal(__ticket_t one, __ticket_t two)
{
    /*Consider the case where TICKET_SLOWPATH_FLAG is 0:
      if one and two are equal, the XOR is 0 and the negation makes the function return true;
      if they differ, the XOR is non-zero, ANDing it with the complement of TICKET_SLOWPATH_FLAG
      is still non-zero, and the negation returns false.*/
    return !((one ^ two) & ~TICKET_SLOWPATH_FLAG);
}

static inline void __ticket_check_and_clear_slowpath(arch_spinlock_t *lock,
                            __ticket_t head)
{
    if (head & TICKET_SLOWPATH_FLAG) {
        arch_spinlock_t old, new;

        old.tickets.head = head;
        new.tickets.head = head & ~TICKET_SLOWPATH_FLAG;
        old.tickets.tail = new.tickets.head + TICKET_LOCK_INC;
        new.tickets.tail = old.tickets.tail;

        /* try to clear slowpath flag when there are no contenders */
        cmpxchg(&lock->head_tail, old.head_tail, new.head_tail);
    }
}

...

/*
 * Ticket locks are conceptually two parts, one indicating the current head of
 * the queue, and the other indicating the current tail. The lock is acquired
 * by atomically noting the tail and incrementing it by one (thus adding
 * ourself to the queue and noting our position), then waiting until the head
 * becomes equal to the the initial value of the tail.
 *
 * We use an xadd covering *both* parts of the lock, to increment the tail and
 * also load the position of the head, which takes care of memory ordering
 * issues and should be optimal for the uncontended case. Note the tail must be
 * in the high part, because a wide xadd increment of the low part would carry
 * up and contaminate the high part.
 */
static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
    /*TICKET_LOCK_INC is 1: locking adds 1 to tail, the Ticket the next lock acquirer will hold.*/
    register struct __raw_tickets inc = { .tail = TICKET_LOCK_INC };

    /*xadd stores the sum back into the given memory location and returns the
      location's previous value. A lock prefix is emitted in front of it, meaning
      the bus is locked while the instruction executes, which makes it atomic
      (locking the bus stalls the other CPUs' memory accesses, even unrelated ones;
      nowadays atomicity is mostly achieved through the cache-coherence protocol).
      The net effect of the statement below is therefore:
      1. add 1 to the lock's tail field, i.e. advance Next (the lock's tail field)
      2. read Owner (the lock's head field) at the same time
      3. the local variable inc holds the Ticket in its tail field and Owner in its head field
     */
    inc = xadd(&lock->tickets, inc);
    /*If Owner equals Ticket, the lock is ours; leave.*/
    if (likely(inc.head == inc.tail))
        goto out;

    /*Otherwise, start spinning.*/
    for (;;) {
        unsigned count = SPIN_THRESHOLD;

        do {
            /*Keep re-reading Owner, i.e. the lock's head field, into inc.head.*/
            inc.head = READ_ONCE(lock->tickets.head);
            /*Owner equals the Ticket stored in inc.tail: the lock is ours, leave.*/
            if (__tickets_equal(inc.head, inc.tail))
                goto clear_slowpath;
            /*Let the CPU rest a bit; how it rests is covered in detail below.*/
            cpu_relax();
        } while (--count);
        /*With CONFIG_PARAVIRT_SPINLOCKS unset, the function below is empty.*/
        __ticket_lock_spinning(lock, inc.tail);
    }
clear_slowpath:
    /*With CONFIG_PARAVIRT_SPINLOCKS unset, the function below does nothing.*/
    __ticket_check_and_clear_slowpath(lock, inc.head);
out:
    barrier();  /* make sure nothing creeps before the lock is taken */
}

static __always_inline int arch_spin_trylock(arch_spinlock_t *lock)
{
    arch_spinlock_t old, new;

    /*Read Owner and Next.*/
    old.tickets = READ_ONCE(lock->tickets);
    /*If Owner (.head) and Next (.tail) differ, the lock is held by another thread; return 0.*/
    if (!__tickets_equal(old.tickets.head, old.tickets.tail))
        return 0;
    /*Owner equals Next, so start trying to take the lock - only "start", because this
      still operates on the on-stack variable new. Next (.tail) lives in the high part,
      so the increment TICKET_LOCK_INC must be shifted up by TICKET_SHIFT before the add.
     */
    new.head_tail = old.head_tail + (TICKET_LOCK_INC << TICKET_SHIFT);
    new.head_tail &= ~TICKET_SLOWPATH_FLAG;

    /* cmpxchg is a full barrier, so nothing can move before it */
    /*The cmpxchg family does an atomic compare-and-exchange; with the lock prefix
      emitted in front, the memory bus is held while the cmpxchg instruction executes,
      so no other thread can update the lock meanwhile.
      cmpxchg compares old.head_tail with the value at &lock->head_tail and returns
      whatever was stored at lock->head_tail:
      1) If they are equal, new.head_tail is written to lock->head_tail, so a return
         value equal to old.head_tail means success: the thread has atomically
         compared and advanced Next (claimed the lock) and returns true.
      2) If they differ, another core has updated lock->head_tail in the meantime;
         the current value is returned anyway, the comparison with old.head_tail
         fails, and the trylock attempt fails.
     */
    return cmpxchg(&lock->head_tail, old.head_tail, new.head_tail) == old.head_tail;
}

static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
    if (TICKET_SLOWPATH_FLAG &&
        static_key_false(&paravirt_ticketlocks_enabled)) {
        __ticket_t head;

        BUILD_BUG_ON(((__ticket_t)NR_CPUS) != NR_CPUS);

        head = xadd(&lock->tickets.head, TICKET_LOCK_INC);

        if (unlikely(head & TICKET_SLOWPATH_FLAG)) {
            head &= ~TICKET_SLOWPATH_FLAG;
            __ticket_unlock_kick(lock, (head + TICKET_LOCK_INC));
        }
    } else
       /*Unlocking is just an atomic increment of Owner (.head).*/
        __add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX);
}

static inline int arch_spin_is_locked(arch_spinlock_t *lock)
{
    struct __raw_tickets tmp = READ_ONCE(lock->tickets);
    /*The lock is held if and only if Next (.tail) and Owner (.head) differ.*/
    return !__tickets_equal(tmp.tail, tmp.head);
}
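
The trylock path can be mimicked in user space with a single compare-and-swap on the combined word. The sketch below follows the same logic as arch_spin_trylock above; the names tkt_lock_t and tkt_trylock are made up, and it assumes fewer than 256 CPUs (8-bit tickets packed into a 16-bit word).

```c
#include <stdatomic.h>
#include <stdint.h>

#define TICKET_SHIFT 8   /* 8-bit tickets: head in the low byte, tail in the high byte */

typedef struct {
    _Atomic uint16_t head_tail;
} tkt_lock_t;

/* Returns 1 if the lock was taken, 0 if it is currently held. */
static int tkt_trylock(tkt_lock_t *lock)
{
    uint16_t old = atomic_load_explicit(&lock->head_tail, memory_order_relaxed);
    uint8_t head = old & 0xff;
    uint8_t tail = old >> TICKET_SHIFT;

    if (head != tail)            /* Owner != Next: somebody holds the lock */
        return 0;

    /* Claim the next ticket: bump Next (the high byte) in one CAS.
     * The CAS succeeds only if nobody touched head_tail since we read it. */
    uint16_t new = old + (1u << TICKET_SHIFT);
    return atomic_compare_exchange_strong_explicit(&lock->head_tail, &old, new,
                                                   memory_order_acquire,
                                                   memory_order_relaxed);
}
```
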

Cacheline Lock

When CPU0 tries to perform an atomic increment:

  1. CPU0 sends a "Read Invalidate" message; the other CPUs invalidate their copies of the cache line holding the variable and supply the data from their caches. CPU0 puts the cache line into the Exclusive state and then marks it locked.
  2. CPU0 then reads the variable, modifies it, and writes the result into the cache line.
  3. CPU0 marks the cache line unlocked again.

Between steps 1 and 3, if another CPU (say CPU1) attempts an atomic increment of its own:

  1. CPU1 sends a "Read Invalidate" message.
  2. When CPU0 receives it, it sees that the corresponding cache line is in the locked state and delays its reply.
  3. CPU1 keeps waiting for CPU0's "Invalidate Acknowledge" until the cache line becomes unlocked again. This is how the atomic operation is enforced; the approach is called locking the cache line, and it requires the operated-on variable to fit entirely within one cache line. A rough sketch of such a lock-prefixed increment follows.
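
For illustration, this is roughly what such a lock-prefixed read-modify-write looks like from C: a hypothetical user-space helper using GCC/Clang inline assembly for x86, not kernel code.

```c
#include <stdio.h>

/* Atomically increment *p with a lock-prefixed add; the CPU keeps the
 * cache line holding *p locked for the duration of the instruction. */
static inline void atomic_inc(volatile int *p)
{
    asm volatile("lock incl %0"
                 : "+m" (*p)
                 :
                 : "memory", "cc");
}

int main(void)
{
    volatile int counter = 0;

    atomic_inc(&counter);
    printf("%d\n", counter);    /* prints 1 */
    return 0;
}
```
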
  • arch/x86/include/asm/cmpxchg.h
/*
 * An exchange-type operation, which takes a value and a pointer, and
 * returns the old value.
 */
#define __xchg_op(ptr, arg, op, lock)                   \
    ({                              \
            __typeof__ (*(ptr)) __ret = (arg);          \
        switch (sizeof(*(ptr))) {               \
        case __X86_CASE_B:                  \
            asm volatile (lock #op "b %b0, %1\n"        \
                      : "+q" (__ret), "+m" (*(ptr)) \
                      : : "memory", "cc");      \
            break;                      \
        case __X86_CASE_W:                  \
            asm volatile (lock #op "w %w0, %1\n"        \
                      : "+r" (__ret), "+m" (*(ptr)) \
                      : : "memory", "cc");      \
            break;                      \
        case __X86_CASE_L:                  \
            asm volatile (lock #op "l %0, %1\n"     \
                      : "+r" (__ret), "+m" (*(ptr)) \
                      : : "memory", "cc");      \
            break;                      \
        case __X86_CASE_Q:                  \
            asm volatile (lock #op "q %q0, %1\n"        \
                      : "+r" (__ret), "+m" (*(ptr)) \
                      : : "memory", "cc");      \
            break;                      \
        default:                        \
            __ ## op ## _wrong_size();          \
        }                           \
        __ret;                          \
    })
...

/*
 * xadd() adds "inc" to "*ptr" and atomically returns the previous
 * value of "*ptr".
 *
 * xadd() is locked when multiple CPUs are online
 * xadd_sync() is always locked
 * xadd_local() is never locked
 */
#define __xadd(ptr, inc, lock)  __xchg_op((ptr), (inc), xadd, lock)
#define xadd(ptr, inc)      __xadd((ptr), (inc), LOCK_PREFIX)
...
#define __add(ptr, inc, lock)                       \
    ({                              \
            __typeof__ (*(ptr)) __ret = (inc);          \
        switch (sizeof(*(ptr))) {               \
        case __X86_CASE_B:                  \
            asm volatile (lock "addb %b1, %0\n"     \
                      : "+m" (*(ptr)) : "qi" (inc)  \
                      : "memory", "cc");        \
            break;                      \
        case __X86_CASE_W:                  \
            asm volatile (lock "addw %w1, %0\n"     \
                      : "+m" (*(ptr)) : "ri" (inc)  \
                      : "memory", "cc");        \
            break;                      \
        case __X86_CASE_L:                  \
            asm volatile (lock "addl %1, %0\n"      \
                      : "+m" (*(ptr)) : "ri" (inc)  \
                      : "memory", "cc");        \
            break;                      \
        case __X86_CASE_Q:                  \
            asm volatile (lock "addq %1, %0\n"      \
                      : "+m" (*(ptr)) : "ri" (inc)  \
                      : "memory", "cc");        \
            break;                      \
        default:                        \
            __add_wrong_size();             \
        }                           \
        __ret;                          \
    })
...
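
The key property arch_spin_lock relies on is that xadd returns the value the memory location held before the addition. In user space, the GCC builtin __atomic_fetch_add has the same return-old-value semantics, as this hypothetical sketch illustrates:

```c
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    /* head in the low byte (Owner = 3), tail in the high byte (Next = 3). */
    uint16_t head_tail = 0x0303;

    /* Like xadd: add (1 << 8) to bump Next and get the previous value back,
     * which yields our Ticket (old tail) and the current Owner (old head). */
    uint16_t old = __atomic_fetch_add(&head_tail, 1u << 8, __ATOMIC_ACQUIRE);

    printf("ticket = %u, owner = %u, lock word now = 0x%04x\n",
           old >> 8, old & 0xff, head_tail);   /* ticket = 3, owner = 3, 0x0403 */
    return 0;
}
```
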
  • arch/x86/include/asm/cmpxchg.h
/*
 * Atomic compare and exchange.  Compare OLD with MEM, if identical,
 * store NEW in MEM.  Return the initial value in MEM.  Success is
 * indicated by comparing RETURN with OLD.
 */
#define __raw_cmpxchg(ptr, old, new, size, lock)            \
({                                  \
    __typeof__(*(ptr)) __ret;                   \
    __typeof__(*(ptr)) __old = (old);               \
    __typeof__(*(ptr)) __new = (new);               \
    switch (size) {                         \
    case __X86_CASE_B:                      \
    {                               \
        volatile u8 *__ptr = (volatile u8 *)(ptr);      \
        asm volatile(lock "cmpxchgb %2,%1"          \
                 : "=a" (__ret), "+m" (*__ptr)      \
                 : "q" (__new), "0" (__old)         \
                 : "memory");               \
        break;                          \
    }                               \
    case __X86_CASE_W:                      \
    {                               \
        volatile u16 *__ptr = (volatile u16 *)(ptr);        \
        asm volatile(lock "cmpxchgw %2,%1"          \
                 : "=a" (__ret), "+m" (*__ptr)      \
                 : "r" (__new), "0" (__old)         \
                 : "memory");               \
        break;                          \
    }                               \
    case __X86_CASE_L:                      \
    {                               \
        volatile u32 *__ptr = (volatile u32 *)(ptr);        \
        asm volatile(lock "cmpxchgl %2,%1"          \
                 : "=a" (__ret), "+m" (*__ptr)      \
                 : "r" (__new), "0" (__old)         \
                 : "memory");               \
        break;                          \
    }                               \
    case __X86_CASE_Q:                      \
    {                               \
        volatile u64 *__ptr = (volatile u64 *)(ptr);        \
        asm volatile(lock "cmpxchgq %2,%1"          \
                 : "=a" (__ret), "+m" (*__ptr)      \
                 : "r" (__new), "0" (__old)         \
                 : "memory");               \
        break;                          \
    }                               \
    default:                            \
        __cmpxchg_wrong_size();                 \
    }                               \
    __ret;                              \
})

#define __cmpxchg(ptr, old, new, size)                  \
    __raw_cmpxchg((ptr), (old), (new), (size), LOCK_PREFIX)
...
#define cmpxchg(ptr, old, new)                      \
    __cmpxchg(ptr, old, new, sizeof(*(ptr)))\
...
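
The "success is indicated by comparing RETURN with OLD" convention maps directly onto GCC's __sync_val_compare_and_swap builtin, which also returns the value that was in memory before the operation. A hypothetical user-space sketch of the same success check used by arch_spin_trylock:

```c
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    uint32_t head_tail = 0x00010001;              /* Owner == Next == 1: lock is free */
    uint32_t old = head_tail;
    uint32_t new = old + (1u << 16);              /* bump Next to claim the lock */

    /* Returns the previous contents of head_tail; equal to old means the
     * store of new happened, i.e. the trylock succeeded. */
    uint32_t prev = __sync_val_compare_and_swap(&head_tail, old, new);

    printf("%s, lock word now = 0x%08x\n",
           prev == old ? "acquired" : "busy", head_tail);
    return 0;
}
```
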

cpu_relax() - letting the CPU rest for a bit

  • arch/x86/include/asm/processor.h
/* REP NOP (PAUSE) is a good thing to insert into busy-wait loops. */
static __always_inline void rep_nop(void)
{
    asm volatile("rep; nop" ::: "memory");
}

static __always_inline void cpu_relax(void)
{
    rep_nop();
}
What does rep; nop; mean?
Why not the pause instruction?
  • The description of pause: "Gives hint to processor that improves performance of spin-wait loops."
  • Note: the opcode of pause is F3 90.
  • PAUSE — Spin Loop Hint

Description

Improves the performance of spin-wait loops. When executing a “spin-wait loop,” processors will suffer a severe performance penalty when exiting the loop because it detects a possible memory order violation. The PAUSE instruction provides a hint to the processor that the code sequence is a spin-wait loop. The processor uses this hint to avoid the memory order violation in most situations, which greatly improves processor performance. For this reason, it is recommended that a PAUSE instruction be placed in all spin-wait loops.

An additional function of the PAUSE instruction is to reduce the power consumed by a processor while executing a spin loop. A processor can execute a spin-wait loop extremely quickly, causing the processor to consume a lot of power while it waits for the resource it is spinning on to become available. Inserting a pause instruction in a spin-wait loop greatly reduces the processor’s power consumption.

This instruction was introduced in the Pentium 4 processors, but is backward compatible with all IA-32 processors. In earlier IA-32 processors, the PAUSE instruction operates like a NOP instruction. The Pentium 4 and Intel Xeon processors implement the PAUSE instruction as a delay. The delay is finite and can be zero for some processors. This instruction does not change the architectural state of the processor (that is, it performs essentially a delaying no-op operation).

This instruction’s operation is the same in non-64-bit modes and 64-bit mode.

  • See also SDM Vol.3, 8.10 MANAGEMENT OF IDLE AND BLOCKED CONDITIONS, 8.10.2 PAUSE Instruction.

  • So pause gives a better spin-wait loop, for two reasons:

    • it hints to the processor that this is a spin-wait, and the processor uses the hint to improve performance;
    • a spin-wait loop built with pause makes the processor consume less power.
  • Even on processors without hyper-threading, using pause is beneficial.

  • What the code actually writes is rep; nop;, whose opcode is identical to that of pause; this is done for backward compatibility.

    • pause was introduced with the Pentium 4, so this way the code remains backward compatible with all IA-32 processors.
    • Earlier IA-32 processors do not get the optimizations above, but they still busy-wait correctly (since rep; nop; is a supported instruction for them).
    • This has also been discussed by others.
Why not simply use the nop instruction?

NOP instruction can be between 0.4-0.5 clocks and PAUSE instruction can consume 38-40 clocks. Please refer to the whitepaper on how to measure the latency and throughput of various instructions. The REPE instruction comes in various flavors and the latency/throughput of each of them varies. Please also see below for the sample code to measure the average clocks.

  • In short, compared with pause, nop makes the CPU rest for a much shorter time, so power consumption inevitably goes up. A short pause-based wait loop is sketched below.
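
A hypothetical user-space busy-wait on a flag, inserting pause on each iteration the way cpu_relax() does in the kernel loop above (the function name wait_for_flag is made up; __builtin_ia32_pause() is the GCC/Clang builtin that emits the pause instruction):

```c
#include <stdatomic.h>

/* Spin until *flag becomes non-zero, pausing between polls. */
static void wait_for_flag(_Atomic int *flag)
{
    while (!atomic_load_explicit(flag, memory_order_acquire))
        __builtin_ia32_pause();   /* emits "pause" (a.k.a. rep; nop) */
}
```
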

Does spin_lock_bh() disable bottom halves first, or contend for the lock first?

  • This was already discussed when covering locking against bottom halves: bottom halves must be disabled first.
  • See the implementation of __raw_spin_lock_bh() above.
  • include/linux/bottom_half.h
#ifdef CONFIG_TRACE_IRQFLAGS
extern void __local_bh_disable_ip(unsigned long ip, unsigned int cnt);
#else
static __always_inline void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)
{
    preempt_count_add(cnt);
    barrier();
}
#endif
...
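
A hypothetical usage sketch: when data is shared between process context and a softirq, the process-context side must use spin_lock_bh(); otherwise the softirq could preempt the lock holder on the same CPU and spin forever on a lock that can never be released. The names stats_lock, stats_packets and the two functions below are made up for illustration.

```c
#include <linux/spinlock.h>

/* State touched both from process context and from a softirq. */
static DEFINE_SPINLOCK(stats_lock);
static unsigned long stats_packets;

/* Called from softirq context (e.g. a timer or NET_RX handler). */
void stats_update_softirq(void)
{
    spin_lock(&stats_lock);          /* already in BH context, plain spin_lock suffices */
    stats_packets++;
    spin_unlock(&stats_lock);
}

/* Called from process context. */
unsigned long stats_read(void)
{
    unsigned long v;

    spin_lock_bh(&stats_lock);       /* disable bottom halves first, then take the lock */
    v = stats_packets;
    spin_unlock_bh(&stats_lock);
    return v;
}
```
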

Example

| Action | Next (lock.tail) | Owner (lock.head) | Ticket (local_variable.tail) | Description |
|---|---|---|---|---|
| Initial state | 0 | 0 | - | - |
| P1 locks | 1 | 0 | 0 | Next + 1; Owner == Ticket == 0, P1 gets the lock |
| P2 locks | 2 | 0 | 1 | Next + 1 == 2, Ticket = old Next = 1; Owner != Ticket, P2 spins |
| P3 locks | 3 | 0 | 2 | Next + 1 == 3, Ticket = old Next = 2; Owner != Ticket, P3 spins |
| P1 unlocks | 3 | 1 | - | Owner + 1 == 1, P1 releases the lock |
| P2 gets the lock | 3 | 1 | 1 | Owner == Ticket == 1, P2 gets the lock |
| P4 locks | 4 | 1 | 3 | Next + 1 == 4, Ticket = old Next = 3; Owner != Ticket, P4 spins |
| P2 unlocks | 4 | 2 | - | Owner + 1 == 2, P2 releases the lock |
| P3 gets the lock | 4 | 2 | 2 | Owner == Ticket == 2, P3 gets the lock |
| P3 unlocks | 4 | 3 | - | Owner + 1 == 3, P3 releases the lock |
| P4 gets the lock | 4 | 3 | 3 | Owner == Ticket == 3, P4 gets the lock |
| P4 unlocks | 4 | 4 | - | Owner + 1 == 4, P4 releases the lock |
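
The sequence in the table can be replayed with plain counter arithmetic (a hypothetical sketch; no real locking, just Next/Owner bookkeeping):

```c
#include <stdio.h>
#include <stdint.h>

/* Walk through the lock/unlock sequence from the table above, tracking
 * Next (tail) and Owner (head) the way the ticket lock does. */
int main(void)
{
    uint16_t next = 0, owner = 0;
    uint16_t t1, t2, t3, t4;      /* each thread's Ticket */

    t1 = next++;  printf("P1 locks:   Next=%u Owner=%u Ticket=%u\n", next, owner, t1);
    t2 = next++;  printf("P2 locks:   Next=%u Owner=%u Ticket=%u (spins)\n", next, owner, t2);
    t3 = next++;  printf("P3 locks:   Next=%u Owner=%u Ticket=%u (spins)\n", next, owner, t3);

    owner++;      printf("P1 unlocks: Next=%u Owner=%u -> P2 (Ticket %u) now owns\n", next, owner, t2);
    t4 = next++;  printf("P4 locks:   Next=%u Owner=%u Ticket=%u (spins)\n", next, owner, t4);
    owner++;      printf("P2 unlocks: Next=%u Owner=%u -> P3 (Ticket %u) now owns\n", next, owner, t3);
    owner++;      printf("P3 unlocks: Next=%u Owner=%u -> P4 (Ticket %u) now owns\n", next, owner, t4);
    owner++;      printf("P4 unlocks: Next=%u Owner=%u -> lock is free again\n", next, owner);
    return 0;
}
```
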