
#  オペレーティングシステム 演習 06
#  条件変数


名前と学生証番号を書け. Enter your name and student ID.

 * 名前 Name:
 * 学生証番号 Student ID:


# 1. 条件変数
* もう一つの代表的な同期APIは条件変数
* 条件変数は汎用的な同期機構であり, つまり, 「待つ(ブロックする)」「起こす」という二つの操作だけを提供する
* どういう条件で待つか, 起こすかというのは条件変数を使う方が決める

## 1-1. C
* C言語 Pthreadの条件変数関連のAPIは以下
  * [pthread_cond_init](https://linux.die.net/man/3/pthread_cond_init)
  * [pthread_cond_destroy](https://linux.die.net/man/3/pthread_cond_destroy)
  * [pthread_cond_wait](https://linux.die.net/man/3/pthread_cond_wait)
  * [pthread_cond_signal](https://linux.die.net/man/3/pthread_cond_signal)
  * [pthread_cond_broadcast](https://linux.die.net/man/3/pthread_cond_broadcast)

* テンプレートは,
  * 何かデータ構造を見て, 望む状態_C_になっていなければ待つ
  * さらにそのデータ構造を変更して, 必要であれば待っているかもしれない人を起こす
というもので, それを以下のようにして実現する

```
pthread_mutex_lock(m);
while (1) {
    ...
    if (望む状態) break;
    pthread_cond_wait(c, m); // ...じゃないので寝る
}
変更;
場合により, pthread_cond_broadcast(c); // またはsignal
pthread_mutex_unlock(m);
```

## 1-2. Python
* Python threadingの条件変数関連のAPIは以下
* Pthreadと少し違うのは m を条件変数を作る際に渡すことができること
  * c = threading.Condition([m]) # m は Lock() オブジェクト
    * mを省略するとmを新しく作ったのと同じ意味になる
  * c.acquire()  # m.acquire() と同じ
  * c.release()  # m.release() と同じ
  * c.wait()     # pthread_cond_wait 相当
  * c.notify()   # pthread_cond_signal 相当
  * c.notify_all() # pthread_cond_broadcast 相当

* テンプレートは,
  * 何かデータ構造を見て, 望む状態_C_になっていなければ待つ
  * さらにそのデータ構造を変更して, 必要であれば待っているかもしれない人を起こす
というもので, それを以下のようにして実現する

```
c.acquire()
while 1:
    ...
    if 望む状態: break
    c.wait() # ...じゃないので寝る
}
変更
場合により, c.notify_all() # またはc.notify()
c.release()
```

# 2. 飽和カウンタ
## 2-1. C 
* 条件変数を使うもっとも簡単な例題として, 飽和カウンタを作ってみよう
* 飽和カウンタは上で作ったcounter_tとほぼ同じAPI (名前は scounter_t としておこう)
* 違いは以下
  * 初期化時に容量(capacity)を指定する. capacityはカウンタが取りうる値の最大値を意味する
  * 加算 (inc)をする際にcapacityを越えそうになったら(つまり元々値がcapacityだったら)条件変数の上でブロックする
* API  
  * データ型
```
typedef struct { ... } scounter_t;
```
  * 0 にする 
```
void scounter_init(scounter_t * c, long capacity);
```
  * +1 する (返り値: 深い意味はないが, 元の値を返すとする)
```
long scounter_inc(scounter_t * c);
```
  * -1 する (返り値: 深い意味はないが, 元の値を返すとする)
```
long scounter_dec(scounter_t * c);
```
  * 今の値を返す
```
long scounter_get(scounter_t * c);
```

* 以下は capacity を無視した版 (mutex は使っているが条件変数は使っていない)

In [None]:
%%writefile scounter.c
/* 注: このプログラムはOMP_NUM_THREADSを使わずにコマンドラインで受け取った引数でスレッド数を決めている(#pragma omp parallel num_threads(...)) */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <omp.h>

void die(char * msg) {
  perror(msg);
  exit(1);
}

double cur_time() {
  struct timespec ts[1];
  clock_gettime(CLOCK_REALTIME, ts);
  return ts->tv_sec + ts->tv_nsec * 1.0e-9;
}

/* 飽和カウンタ */
typedef struct {
  long x;
  long capacity;
  pthread_mutex_t m[1];
} scounter_t;

/* 初期化(値を0にする) */
void scounter_init(scounter_t * s, long capacity) {
  s->x = 0;
  s->capacity = capacity;
  if (pthread_mutex_init(s->m, 0)) {
    die("pthread_mutex_init");
  }
}

/* +1 ただしcapacityに達していたら待つ */
long scounter_inc(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  s->x = x + 1;
  pthread_mutex_unlock(s->m);
  assert(x < s->capacity);
  return x;
}

/* -1 */
long scounter_dec(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  s->x = x - 1;
  pthread_mutex_unlock(s->m);
  return x;
}

/* 現在の値を返す */
long scounter_get(scounter_t * s) {
  return s->x;
}

int main(int argc, char ** argv) {
  int i = 1;
  /* incを呼ぶスレッド数 */
  int n_inc_threads = (argc > i ? atoi(argv[i]) : 3); i++;
  /* decを呼ぶスレッド数 */
  int n_dec_threads = (argc > i ? atoi(argv[i]) : 2); i++;
  /* incとdecが呼ばれる回数(全スレッドの合計) */
  long n            = (argc > i ? atol(argv[i]) : 10000); i++;
  /* 飽和する値 */
  long capacity     = (argc > i ? atol(argv[i]) : 10000); i++;
  
  scounter_t s[1];
  scounter_init(s, capacity);

  printf("increment threads : %d\n", n_inc_threads);
  printf("decrement threads : %d\n", n_dec_threads);
  printf("increments/decrements : %ld\n", n);
  printf("capacity : %ld\n", capacity);
  
  double t0 = cur_time();
#pragma omp parallel num_threads(n_inc_threads + n_dec_threads)
  {
    int idx = omp_get_thread_num();
    if (idx < n_inc_threads) {
      /* increment */
      long a = n *  idx      / n_inc_threads;
      long b = n * (idx + 1) / n_inc_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_inc(s);
        assert(x < capacity);
      }
    } else {
      /* decrement */
      idx -= n_dec_threads;
      long a = n *  idx      / n_dec_threads;
      long b = n * (idx + 1) / n_dec_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_dec(s);
        assert(x <= capacity);
      }
    }
  }
  double t1 = cur_time();
  printf("took %.9f sec\n", t1 - t0);
  long x = scounter_get(s);
  printf("%s : value at the end = %ld\n", (x == 0? "OK" : "NG"), x);
  return (x == 0 ? 0 : 1);
}

In [None]:
gcc -Wall -fopenmp -o scounter scounter.c

* 実行は以下
```
./scounter INCするスレッド数 DECするスレッド数 INC(DEC)する回数 CAPACITY
```

* 例えば以下は 3スレッドがincを呼び, 2スレッドがdecを呼ぶ
* inc, decはそれぞれ3スレッド, 2スレッドの合計で, 10000回ずつ呼ばれる
* capacityも10000で, inc (dec)の回数と同じにしているので, カウンタは実際には飽和しない

In [None]:
./scounter 3 2 10000 10000

* 一方, inc回数 &gt; 容量とすると, ほとんどの場合, 途中でエラーになる

In [None]:
./scounter 3 2 100000 10000

* 条件変数を使ってこれを直すのが課題である

## 2-2. Python
* Python 版 API  
  * データ型
```
class scounter:
  ...
```
  * 値0 のscounterを作る
```
s = scounter(capacity)
```
  * +1 する (返り値: 深い意味はないが, 元の値を返すとする)
```
s.inc()
```
  * -1 する (返り値: 深い意味はないが, 元の値を返すとする)
```
s.dec()
```
  * 今の値を返す
```
s.get()
```

* 以下は capacity を無視した版 (mutex は使っているが条件変数は使っていない)

In [None]:
%%writefile scounter.py
import sys
import threading
import time

class scounter:
    """
    飽和カウンタ
    """
    def __init__(self, capacity):
        self.x = 0
        self.capacity = capacity
        self.m = threading.Lock()
    def inc(self):
        self.m.acquire()
        x = self.x
        self.x = x + 1
        self.m.release()
        return x
    def dec(self):
        self.m.acquire()
        x = self.x
        self.x = x - 1
        self.m.release()
        return x
    def get(self):
        return self.x

def main():
    argv = sys.argv
    # incを呼ぶスレッド数 
    n_inc_threads = int(argv[1]) if 1 < len(argv) else 3
    # decを呼ぶスレッド数 
    n_dec_threads = int(argv[2]) if 2 < len(argv) else 2
    # incとdecが呼ばれる回数(全スレッドの合計)
    n             = int(argv[3]) if 3 < len(argv) else 10000
    # 飽和する値
    capacity      = int(argv[4]) if 4 < len(argv) else 10000
    s = scounter(capacity)
    print(f"increment threads : {n_inc_threads}")
    print(f"decrement threads : {n_dec_threads}")
    print(f"increments/decrements : {n}")
    print(f"capacity : {capacity}")
    def inc_thread_fun(idx):
        # increment
        a = n *  idx      // n_inc_threads
        b = n * (idx + 1) // n_inc_threads
        for i in range(a, b):
            x = s.inc()
            assert(x < capacity), (x, capacity)
    def dec_thread_fun(idx):
        a = n *  idx      // n_dec_threads;
        b = n * (idx + 1) // n_dec_threads;
        for i in range(a, b):
            x = s.dec()
            assert(x <= capacity), (x, capacity)
    inc_threads = [threading.Thread(target=inc_thread_fun, args=(i,)) for i in range(n_inc_threads)]
    dec_threads = [threading.Thread(target=dec_thread_fun, args=(i,)) for i in range(n_dec_threads)]
    t0 = time.time()
    for th in inc_threads + dec_threads:
        th.start()
    for th in inc_threads + dec_threads:
        th.join()
    t1 = time.time()
    dt = t1 - t0
    print(f"took {dt:.9f} sec")
    x = s.get()
    ok_ng = "OK" if x == 0 else "NG"
    print(f"{ok_ng} : value at the end = {x}")
    return x != 0
    
sys.exit(main())


* 実行は以下
```
python3 scounter.py INCするスレッド数 DECするスレッド数 INC(DEC)する回数 CAPACITY
```

* 例えば以下は 3スレッドがincを呼び, 2スレッドがdecを呼ぶ
* inc, decはそれぞれ3スレッド, 2スレッドの合計で, 10000回ずつ呼ばれる
* capacityも10000で, inc (dec)の回数と同じにしているので, カウンタは実際には飽和しない

In [None]:
python3 scounter.py 3 2 10000 10000

* 一方, inc回数 &gt; 容量とすると, ほとんどの場合, 途中でエラーになる

In [None]:
python3 scounter.py 3 2 100000 10000

# <font color="green"> Problem 1 :  条件変数を使った飽和カウンタ (C)</font>

* 下掲のプログラムを変更して, 任意のスレッド数, inc (dec) 回数, capacity で正しく動作するようにせよ
* 寝ているスレッドを起こすのに `pthread_cond_broadcast` を使うべきか, `pthread_cond_signal` でもよいか考えよ
  * ヒント: 実は後者ではダメ. なかなか気づきにくい落とし穴がある. Terminal + GDBで調査でもしないとなかなか気づかないだろう
  * 止まってしまったプログラムをkillする方法は上述したのでそれをマスターしてから実験せよ

In [None]:
BEGIN SOLUTION
END SOLUTION
%%writefile scounter.c
/* 注: このプログラムはOMP_NUM_THREADSを使わずにコマンドラインで受け取った引数でスレッド数を決めている(#pragma omp parallel num_threads(...)) */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <omp.h>

void die(char * msg) {
  perror(msg);
  exit(1);
}

double cur_time() {
  struct timespec ts[1];
  clock_gettime(CLOCK_REALTIME, ts);
  return ts->tv_sec + ts->tv_nsec * 1.0e-9;
}

/* 飽和カウンタ */
typedef struct {
  long x;
  long capacity;
  pthread_mutex_t m[1];
} scounter_t;

/* 初期化(値を0にする) */
void scounter_init(scounter_t * s, long capacity) {
  s->x = 0;
  s->capacity = capacity;
  if (pthread_mutex_init(s->m, 0)) {
    die("pthread_mutex_init");
  }
}

/* +1 ただしcapacityに達していたら待つ */
long scounter_inc(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  s->x = x + 1;
  pthread_mutex_unlock(s->m);
  assert(x < s->capacity);
  return x;
}

/* -1 */
long scounter_dec(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  s->x = x - 1;
  pthread_mutex_unlock(s->m);
  return x;
}

/* 現在の値を返す */
long scounter_get(scounter_t * s) {
  return s->x;
}

int main(int argc, char ** argv) {
  int i = 1;
  /* incを呼ぶスレッド数 */
  int n_inc_threads = (argc > i ? atoi(argv[i]) : 3); i++;
  /* decを呼ぶスレッド数 */
  int n_dec_threads = (argc > i ? atoi(argv[i]) : 2); i++;
  /* incとdecが呼ばれる回数(全スレッドの合計) */
  long n            = (argc > i ? atol(argv[i]) : 10000); i++;
  /* 飽和する値 */
  long capacity     = (argc > i ? atol(argv[i]) : 10000); i++;
  
  scounter_t s[1];
  scounter_init(s, capacity);

  printf("increment threads : %d\n", n_inc_threads);
  printf("decrement threads : %d\n", n_dec_threads);
  printf("increments/decrements : %ld\n", n);
  printf("capacity : %ld\n", capacity);
  
  double t0 = cur_time();
#pragma omp parallel num_threads(n_inc_threads + n_dec_threads)
  {
    int idx = omp_get_thread_num();
    if (idx < n_inc_threads) {
      /* increment */
      long a = n *  idx      / n_inc_threads;
      long b = n * (idx + 1) / n_inc_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_inc(s);
        assert(x < capacity);
      }
    } else {
      /* decrement */
      idx -= n_dec_threads;
      long a = n *  idx      / n_dec_threads;
      long b = n * (idx + 1) / n_dec_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_dec(s);
        assert(x <= capacity);
      }
    }
  }
  double t1 = cur_time();
  printf("took %.9f sec\n", t1 - t0);
  long x = scounter_get(s);
  printf("%s : value at the end = %ld\n", (x == 0? "OK" : "NG"), x);
  return (x == 0 ? 0 : 1);
}

In [None]:
BEGIN SOLUTION
END SOLUTION
gcc -Wall -fopenmp -o scounter scounter.c

In [None]:
%%writefile scounter_ans.c
/* 注: このプログラムはOMP_NUM_THREADSを使わずにコマンドラインで受け取った引数でスレッド数を決めている(#pragma omp parallel num_threads(...)) */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <omp.h>

void die(char * msg) {
  perror(msg);
  exit(1);
}

double cur_time() {
  struct timespec ts[1];
  clock_gettime(CLOCK_REALTIME, ts);
  return ts->tv_sec + ts->tv_nsec * 1.0e-9;
}

/* 飽和カウンタ */
typedef struct {
  long x;
  long capacity;
  pthread_mutex_t m[1];
  pthread_cond_t c[1];
} scounter_t;

/* 初期化(値を0にする) */
void scounter_init(scounter_t * s, long capacity) {
  s->x = 0;
  s->capacity = capacity;
  if (pthread_mutex_init(s->m, 0)) {
    die("pthread_mutex_init");
  }
  if (pthread_cond_init(s->c, 0)) {
    die("pthread_cond_init");
  }
}

/* +1 ただしcapacityに達していたら待つ */
long scounter_inc(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  while (x >= s->capacity) {
    assert(x == s->capacity);
    pthread_cond_wait(s->c, s->m);
    x = s->x;
  }
  s->x = x + 1;
  pthread_mutex_unlock(s->m);
  assert(x < s->capacity);
  return x;
}

/* -1 */
long scounter_dec(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  s->x = x - 1;
  if (x >= s->capacity) {
    assert(x == s->capacity);
    pthread_cond_broadcast(s->c);
  }
  pthread_mutex_unlock(s->m);
  return x;
}

/* 現在の値を返す */
long scounter_get(scounter_t * s) {
  return s->x;
}

int main(int argc, char ** argv) {
  int i = 1;
  /* incを呼ぶスレッド数 */
  int n_inc_threads = (argc > i ? atoi(argv[i]) : 3); i++;
  /* decを呼ぶスレッド数 */
  int n_dec_threads = (argc > i ? atoi(argv[i]) : 2); i++;
  /* incとdecが呼ばれる回数(全スレッドの合計) */
  long n            = (argc > i ? atol(argv[i]) : 10000); i++;
  /* 飽和する値 */
  long capacity     = (argc > i ? atol(argv[i]) : 10000); i++;
  
  scounter_t s[1];
  scounter_init(s, capacity);

  printf("increment threads : %d\n", n_inc_threads);
  printf("decrement threads : %d\n", n_dec_threads);
  printf("increments/decrements : %ld\n", n);
  printf("capacity : %ld\n", capacity);
  
  double t0 = cur_time();
#pragma omp parallel num_threads(n_inc_threads + n_dec_threads)
  {
    int idx = omp_get_thread_num();
    if (idx < n_inc_threads) {
      /* increment */
      long a = n *  idx      / n_inc_threads;
      long b = n * (idx + 1) / n_inc_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_inc(s);
        assert(x < capacity);
      }
    } else {
      /* decrement */
      idx -= n_dec_threads;
      long a = n *  idx      / n_dec_threads;
      long b = n * (idx + 1) / n_dec_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_dec(s);
        assert(x <= capacity);
      }
    }
  }
  double t1 = cur_time();
  printf("took %.9f sec\n", t1 - t0);
  long x = scounter_get(s);
  printf("%s : value at the end = %ld\n", (x == 0? "OK" : "NG"), x);
  return (x == 0 ? 0 : 1);
}

In [None]:
gcc -Wall -fopenmp -o scounter_ans scounter_ans.c

* 以下でテストせよ
  * テストは, 「単純(超探しやすい)」かつ「バグが発生しやすいケース」で行うのが基本
  * 今回の場合, 容量1というケースが最もバグが発生しやすい(見逃されにくい)と思われる

* 単純かつバグが発生しやすいケース (うまく動かなければ適宜 100000 を減らして, なるべく小さい数でテストをするのが基本. 本気でデバッグが必要なら terminal を使うことを推奨)

In [None]:
BEGIN SOLUTION
END SOLUTION
./scounter 1 1 100000 1

In [None]:
./scounter_ans 1 1 100000 1

* まともな容量でのテスト

In [None]:
BEGIN SOLUTION
END SOLUTION
./scounter 1 1 100000 1000

In [None]:
./scounter_ans 1 1 100000 1000

* 多数のスレッドでのテスト

In [None]:
BEGIN SOLUTION
END SOLUTION
./scounter 10 1 100000 1000

In [None]:
./scounter_ans 10 1 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
./scounter 1 10 100000 1000

In [None]:
./scounter_ans 1 10 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
./scounter 10 10 1000000 1000

In [None]:
./scounter_ans 10 10 1000000 1000

# <font color="green"> Problem 2 :  条件変数を使った飽和カウンタ (Python)</font>

* 下掲のプログラムを変更して, 任意のスレッド数, inc (dec) 回数, capacity で正しく動作するようにせよ

In [None]:
BEGIN SOLUTION
END SOLUTION
%%writefile scounter.py
import sys
import threading
import time

class scounter:
    """
    飽和カウンタ
    """
    def __init__(self, capacity):
        self.x = 0
        self.capacity = capacity
        self.m = threading.Lock()
    def inc(self):
        self.m.acquire()
        x = self.x
        self.x = x + 1
        self.m.release()
        return x
    def dec(self):
        self.m.acquire()
        x = self.x
        self.x = x - 1
        self.m.release()
        return x
    def get(self):
        return self.x

def main():
    argv = sys.argv
    # incを呼ぶスレッド数 
    n_inc_threads = int(argv[1]) if 1 < len(argv) else 3
    # decを呼ぶスレッド数 
    n_dec_threads = int(argv[2]) if 2 < len(argv) else 2
    # incとdecが呼ばれる回数(全スレッドの合計)
    n             = int(argv[3]) if 3 < len(argv) else 10000
    # 飽和する値
    capacity      = int(argv[4]) if 4 < len(argv) else 10000
    s = scounter(capacity)
    print(f"increment threads : {n_inc_threads}")
    print(f"decrement threads : {n_dec_threads}")
    print(f"increments/decrements : {n}")
    print(f"capacity : {capacity}")
    def inc_thread_fun(idx):
        # increment
        a = n *  idx      // n_inc_threads
        b = n * (idx + 1) // n_inc_threads
        for i in range(a, b):
            x = s.inc()
            assert(x < capacity), (x, capacity)
    def dec_thread_fun(idx):
        a = n *  idx      // n_dec_threads;
        b = n * (idx + 1) // n_dec_threads;
        for i in range(a, b):
            x = s.dec()
            assert(x <= capacity), (x, capacity)
    inc_threads = [threading.Thread(target=inc_thread_fun, args=(i,)) for i in range(n_inc_threads)]
    dec_threads = [threading.Thread(target=dec_thread_fun, args=(i,)) for i in range(n_dec_threads)]
    t0 = time.time()
    for th in inc_threads + dec_threads:
        th.start()
    for th in inc_threads + dec_threads:
        th.join()
    t1 = time.time()
    dt = t1 - t0
    print(f"took {dt:.9f} sec")
    x = s.get()
    ok_ng = "OK" if x == 0 else "NG"
    print(f"{ok_ng} : value at the end = {x}")
    return x != 0
    
sys.exit(main())


In [None]:
%%writefile scounter_ans.py
import sys
import threading
import time

class scounter:
    """
    飽和カウンタ
    """
    def __init__(self, capacity):
        self.x = 0
        self.capacity = capacity
        self.m = threading.Lock()
        self.c = threading.Condition(self.m)
    def inc(self):
        self.m.acquire()
        x = self.x
        while x >= self.capacity:
            assert(x == self.capacity), (x, self.capacity)
            self.c.wait()
            x = self.x
        self.x = x + 1
        self.m.release()
        return x
    def dec(self):
        self.m.acquire()
        x = self.x
        self.x = x - 1
        if x >= self.capacity:
            assert(x == self.capacity), (x, self.capacity)
            self.c.notify_all()
        self.m.release()
        return x
    def get(self):
        return self.x

def main():
    argv = sys.argv
    # incを呼ぶスレッド数 
    n_inc_threads = int(argv[1]) if 1 < len(argv) else 3
    # decを呼ぶスレッド数 
    n_dec_threads = int(argv[2]) if 2 < len(argv) else 2
    # incとdecが呼ばれる回数(全スレッドの合計)
    n             = int(argv[3]) if 3 < len(argv) else 10000
    # 飽和する値
    capacity      = int(argv[4]) if 4 < len(argv) else 10000
    s = scounter(capacity)
    print(f"increment threads : {n_inc_threads}")
    print(f"decrement threads : {n_dec_threads}")
    print(f"increments/decrements : {n}")
    print(f"capacity : {capacity}")
    def inc_thread_fun(idx):
        # increment
        a = n *  idx      // n_inc_threads
        b = n * (idx + 1) // n_inc_threads
        for i in range(a, b):
            x = s.inc()
            assert(x < capacity), (x, capacity)
    def dec_thread_fun(idx):
        a = n *  idx      // n_dec_threads;
        b = n * (idx + 1) // n_dec_threads;
        for i in range(a, b):
            x = s.dec()
            assert(x <= capacity), (x, capacity)
    inc_threads = [threading.Thread(target=inc_thread_fun, args=(i,)) for i in range(n_inc_threads)]
    dec_threads = [threading.Thread(target=dec_thread_fun, args=(i,)) for i in range(n_dec_threads)]
    t0 = time.time()
    for th in inc_threads + dec_threads:
        th.start()
    for th in inc_threads + dec_threads:
        th.join()
    t1 = time.time()
    dt = t1 - t0
    print(f"took {dt:.9f} sec")
    x = s.get()
    ok_ng = "OK" if x == 0 else "NG"
    print(f"{ok_ng} : value at the end = {x}")
    return x != 0
    
sys.exit(main())


* 以下でテストせよ
  * テストは, 「単純(超探しやすい)」かつ「バグが発生しやすいケース」で行うのが基本
  * 今回の場合, 容量1というケースが最もバグが発生しやすい(見逃されにくい)と思われる

* 単純かつバグが発生しやすいケース (うまく動かなければ適宜 100000 を減らして, なるべく小さい数でテストをするのが基本. 本気でデバッグが必要なら terminal を使うことを推奨)

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 scounter.py 1 1 100000 1

In [None]:
python3 scounter_ans.py 1 1 100000 1

* まともな容量でのテスト

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 scounter.py 1 1 100000 1000

In [None]:
python3 scounter_ans.py 1 1 100000 1000

* 多数のスレッドでのテスト

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 scounter.py 10 1 100000 1000

In [None]:
python3 scounter_ans.py 10 1 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 scounter.py 1 10 100000 1000

In [None]:
python3 scounter_ans.py 1 10 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 scounter.py 10 10 1000000 1000

In [None]:
python3 scounter_ans.py 10 10 1000000 1000

# <font color="green"> Problem 3 :  両側飽和カウンタ (C)</font>

* どうせならカウンタの上限を定めるだけでなく, 下限も定めたい. ここでは0ということにしておこう
* つまり, 0のときに dec が行われたらそこでも待つようにする
* 下掲のプログラムを変更して, 任意のスレッド数, inc (dec) 回数, capacity で正しく動作するようにせよ
* ヒント: decをしようとして値が0だった際に待つための条件変数をもう一つ用意する

In [None]:
BEGIN SOLUTION
END SOLUTION
%%writefile tcounter.c
/* 注: このプログラムはOMP_NUM_THREADSを使わずにコマンドラインで受け取った引数でスレッド数を決めている(#pragma omp parallel num_threads(...)) */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <omp.h>

void die(char * msg) {
  perror(msg);
  exit(1);
}

double cur_time() {
  struct timespec ts[1];
  clock_gettime(CLOCK_REALTIME, ts);
  return ts->tv_sec + ts->tv_nsec * 1.0e-9;
}

/* 飽和カウンタ */
typedef struct {
  long x;
  long capacity;
  pthread_mutex_t m[1];
} scounter_t;

/* 初期化(値を0にする) */
void scounter_init(scounter_t * s, long capacity) {
  s->x = 0;
  s->capacity = capacity;
  if (pthread_mutex_init(s->m, 0)) {
    die("pthread_mutex_init");
  }
}

/* +1 ただしcapacityに達していたら待つ */
long scounter_inc(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  s->x = x + 1;
  pthread_mutex_unlock(s->m);
  assert(x < s->capacity);
  return x;
}

/* -1 */
long scounter_dec(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  s->x = x - 1;
  pthread_mutex_unlock(s->m);
  return x;
}

/* 現在の値を返す */
long scounter_get(scounter_t * s) {
  return s->x;
}

int main(int argc, char ** argv) {
  int i = 1;
  /* incを呼ぶスレッド数 */
  int n_inc_threads = (argc > i ? atoi(argv[i]) : 3); i++;
  /* decを呼ぶスレッド数 */
  int n_dec_threads = (argc > i ? atoi(argv[i]) : 2); i++;
  /* incとdecが呼ばれる回数(全スレッドの合計) */
  long n            = (argc > i ? atol(argv[i]) : 10000); i++;
  /* 飽和する値 */
  long capacity     = (argc > i ? atol(argv[i]) : 10000); i++;
  
  scounter_t s[1];
  scounter_init(s, capacity);

  printf("increment threads : %d\n", n_inc_threads);
  printf("decrement threads : %d\n", n_dec_threads);
  printf("increments/decrements : %ld\n", n);
  printf("capacity : %ld\n", capacity);
  
  double t0 = cur_time();
#pragma omp parallel num_threads(n_inc_threads + n_dec_threads)
  {
    int idx = omp_get_thread_num();
    if (idx < n_inc_threads) {
      /* increment */
      long a = n *  idx      / n_inc_threads;
      long b = n * (idx + 1) / n_inc_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_inc(s);
        assert(x < capacity);
        assert(x >= 0);
      }
    } else {
      /* decrement */
      idx -= n_dec_threads;
      long a = n *  idx      / n_dec_threads;
      long b = n * (idx + 1) / n_dec_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_dec(s);
        assert(x <= capacity);
        assert(x > 0);
      }
    }
  }
  double t1 = cur_time();
  printf("took %.9f sec\n", t1 - t0);
  long x = scounter_get(s);
  printf("%s : value at the end = %ld\n", (x == 0? "OK" : "NG"), x);
  return (x == 0 ? 0 : 1);
}

In [None]:
BEGIN SOLUTION
END SOLUTION
gcc -Wall -fopenmp -o tcounter tcounter.c

In [None]:
%%writefile tcounter_ans.c
/* 注: このプログラムはOMP_NUM_THREADSを使わずにコマンドラインで受け取った引数でスレッド数を決めている(#pragma omp parallel num_threads(...)) */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <omp.h>

void die(char * msg) {
  perror(msg);
  exit(1);
}

double cur_time() {
  struct timespec ts[1];
  clock_gettime(CLOCK_REALTIME, ts);
  return ts->tv_sec + ts->tv_nsec * 1.0e-9;
}

/* 飽和カウンタ */
typedef struct {
  long x;
  long capacity;
  pthread_mutex_t m[1];
  pthread_cond_t c[1];
  pthread_cond_t d[1];
} scounter_t;

/* 初期化(値を0にする) */
void scounter_init(scounter_t * s, long capacity) {
  s->x = 0;
  s->capacity = capacity;
  if (pthread_mutex_init(s->m, 0)) {
    die("pthread_mutex_init");
  }
  if (pthread_cond_init(s->c, 0)) {
    die("pthread_cond_init");
  }
  if (pthread_cond_init(s->d, 0)) {
    die("pthread_cond_init");
  }
}

/* +1 ただしcapacityに達していたら待つ */
long scounter_inc(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  while (x >= s->capacity) {
    assert(x == s->capacity);
    pthread_cond_wait(s->c, s->m);
    x = s->x;
  }
  s->x = x + 1;
  if (x <= 0) {
    assert(x == 0);
    pthread_cond_broadcast(s->d);
  }
  pthread_mutex_unlock(s->m);
  assert(x < s->capacity);
  return x;
}

/* -1 */
long scounter_dec(scounter_t * s) {
  pthread_mutex_lock(s->m);
  long x = s->x;
  while (x <= 0) {
    assert(x == 0);
    pthread_cond_wait(s->d, s->m);
    x = s->x;
  }
  s->x = x - 1;
  if (x >= s->capacity) {
    assert(x == s->capacity);
    pthread_cond_broadcast(s->c);
  }
  pthread_mutex_unlock(s->m);
  return x;
}

/* 現在の値を返す */
long scounter_get(scounter_t * s) {
  return s->x;
}

int main(int argc, char ** argv) {
  int i = 1;
  /* incを呼ぶスレッド数 */
  int n_inc_threads = (argc > i ? atoi(argv[i]) : 3); i++;
  /* decを呼ぶスレッド数 */
  int n_dec_threads = (argc > i ? atoi(argv[i]) : 2); i++;
  /* incとdecが呼ばれる回数(全スレッドの合計) */
  long n            = (argc > i ? atol(argv[i]) : 10000); i++;
  /* 飽和する値 */
  long capacity     = (argc > i ? atol(argv[i]) : 10000); i++;
  
  scounter_t s[1];
  scounter_init(s, capacity);

  printf("increment threads : %d\n", n_inc_threads);
  printf("decrement threads : %d\n", n_dec_threads);
  printf("increments/decrements : %ld\n", n);
  printf("capacity : %ld\n", capacity);
  
  double t0 = cur_time();
#pragma omp parallel num_threads(n_inc_threads + n_dec_threads)
  {
    int idx = omp_get_thread_num();
    if (idx < n_inc_threads) {
      /* increment */
      long a = n *  idx      / n_inc_threads;
      long b = n * (idx + 1) / n_inc_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_inc(s);
        assert(x < capacity);
        assert(x >= 0);
      }
    } else {
      /* decrement */
      idx -= n_dec_threads;
      long a = n *  idx      / n_dec_threads;
      long b = n * (idx + 1) / n_dec_threads;
      for (long i = a; i < b; i++) {
        long x = scounter_dec(s);
        assert(x <= capacity);
        assert(x > 0);
      }
    }
  }
  double t1 = cur_time();
  printf("took %.9f sec\n", t1 - t0);
  long x = scounter_get(s);
  printf("%s : value at the end = %ld\n", (x == 0? "OK" : "NG"), x);
  return (x == 0 ? 0 : 1);
}

In [None]:
gcc -Wall -fopenmp -o tcounter_ans tcounter_ans.c

In [None]:
BEGIN SOLUTION
END SOLUTION
./tcounter 1 1 100000 1

In [None]:
./tcounter_ans 1 1 100000 1

In [None]:
BEGIN SOLUTION
END SOLUTION
./tcounter 1 1 100000 1000

In [None]:
./tcounter_ans 1 1 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
./tcounter 10 1 100000 1000

In [None]:
./tcounter_ans 10 1 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
./tcounter 1 10 100000 1000

In [None]:
./tcounter_ans 1 10 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
./tcounter 10 10 1000000 1000

In [None]:
./tcounter_ans 10 10 1000000 1000

# <font color="green"> Problem 4 :  両側飽和カウンタ (Python)</font>
* Pythonで同じことを行え

In [None]:
BEGIN SOLUTION
END SOLUTION
%%writefile tcounter.py
import sys
import threading
import time

class scounter:
    """
    飽和カウンタ
    """
    def __init__(self, capacity):
        self.x = 0
        self.capacity = capacity
        self.m = threading.Lock()
    def inc(self):
        self.m.acquire()
        x = self.x
        self.x = x + 1
        self.m.release()
        return x
    def dec(self):
        self.m.acquire()
        x = self.x
        self.x = x - 1
        self.m.release()
        return x
    def get(self):
        return self.x

def main():
    argv = sys.argv
    # incを呼ぶスレッド数 
    n_inc_threads = int(argv[1]) if 1 < len(argv) else 3
    # decを呼ぶスレッド数 
    n_dec_threads = int(argv[2]) if 2 < len(argv) else 2
    # incとdecが呼ばれる回数(全スレッドの合計)
    n             = int(argv[3]) if 3 < len(argv) else 10000
    # 飽和する値
    capacity      = int(argv[4]) if 4 < len(argv) else 10000
    s = scounter(capacity)
    print(f"increment threads : {n_inc_threads}")
    print(f"decrement threads : {n_dec_threads}")
    print(f"increments/decrements : {n}")
    print(f"capacity : {capacity}")
    def inc_thread_fun(idx):
        # increment
        a = n *  idx      // n_inc_threads
        b = n * (idx + 1) // n_inc_threads
        for i in range(a, b):
            x = s.inc()
            assert(x < capacity), (x, capacity)
            assert(x >= 0), x
    def dec_thread_fun(idx):
        a = n *  idx      // n_dec_threads;
        b = n * (idx + 1) // n_dec_threads;
        for i in range(a, b):
            x = s.dec()
            assert(x <= capacity), (x, capacity)
            assert(x > 0), x
    inc_threads = [threading.Thread(target=inc_thread_fun, args=(i,)) for i in range(n_inc_threads)]
    dec_threads = [threading.Thread(target=dec_thread_fun, args=(i,)) for i in range(n_dec_threads)]
    t0 = time.time()
    for th in inc_threads + dec_threads:
        th.start()
    for th in inc_threads + dec_threads:
        th.join()
    t1 = time.time()
    dt = t1 - t0
    print(f"took {dt:.9f} sec")
    x = s.get()
    ok_ng = "OK" if x == 0 else "NG"
    print(f"{ok_ng} : value at the end = {x}")
    return x != 0
    
sys.exit(main())


In [None]:
%%writefile tcounter_ans.py
import sys
import threading
import time

class scounter:
    """
    飽和カウンタ
    """
    def __init__(self, capacity):
        self.x = 0
        self.capacity = capacity
        self.m = threading.Lock()
        self.c = threading.Condition(self.m)
        self.d = threading.Condition(self.m)
    def inc(self):
        self.m.acquire()
        x = self.x
        while x >= self.capacity:
            assert(x == self.capacity), (x, self.capacity)
            self.c.wait()
            x = self.x
        self.x = x + 1
        if x <= 0:
            assert(x == 0), x
            self.d.notify_all()
        self.m.release()
        return x
    def dec(self):
        self.m.acquire()
        x = self.x
        while x <= 0:
            assert(x == 0), x
            self.d.wait()
            x = self.x
        self.x = x - 1
        if x >= self.capacity:
            assert(x == self.capacity), (x, self.capacity)
            self.c.notify_all()
        self.m.release()
        return x
    def get(self):
        return self.x

def main():
    argv = sys.argv
    # incを呼ぶスレッド数 
    n_inc_threads = int(argv[1]) if 1 < len(argv) else 3
    # decを呼ぶスレッド数 
    n_dec_threads = int(argv[2]) if 2 < len(argv) else 2
    # incとdecが呼ばれる回数(全スレッドの合計)
    n             = int(argv[3]) if 3 < len(argv) else 10000
    # 飽和する値
    capacity      = int(argv[4]) if 4 < len(argv) else 10000
    s = scounter(capacity)
    print(f"increment threads : {n_inc_threads}")
    print(f"decrement threads : {n_dec_threads}")
    print(f"increments/decrements : {n}")
    print(f"capacity : {capacity}")
    def inc_thread_fun(idx):
        # increment
        a = n *  idx      // n_inc_threads
        b = n * (idx + 1) // n_inc_threads
        for i in range(a, b):
            x = s.inc()
            assert(x < capacity), (x, capacity)
            assert(x >= 0), x
    def dec_thread_fun(idx):
        a = n *  idx      // n_dec_threads;
        b = n * (idx + 1) // n_dec_threads;
        for i in range(a, b):
            x = s.dec()
            assert(x <= capacity), (x, capacity)
            assert(x > 0), x
    inc_threads = [threading.Thread(target=inc_thread_fun, args=(i,)) for i in range(n_inc_threads)]
    dec_threads = [threading.Thread(target=dec_thread_fun, args=(i,)) for i in range(n_dec_threads)]
    t0 = time.time()
    for th in inc_threads + dec_threads:
        th.start()
    for th in inc_threads + dec_threads:
        th.join()
    t1 = time.time()
    dt = t1 - t0
    print(f"took {dt:.9f} sec")
    x = s.get()
    ok_ng = "OK" if x == 0 else "NG"
    print(f"{ok_ng} : value at the end = {x}")
    return x != 0
    
sys.exit(main())


In [None]:
BEGIN SOLUTION
END SOLUTION
python3 tcounter.py 1 1 100000 1

In [None]:
python3 tcounter_ans.py 1 1 100000 1

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 tcounter.py 1 1 100000 1000

In [None]:
python3 tcounter_ans.py 1 1 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 tcounter.py 10 1 100000 1000

In [None]:
python3 tcounter_ans.py 10 1 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 tcounter.py 1 10 100000 1000

In [None]:
python3 tcounter_ans.py 1 10 100000 1000

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 tcounter.py 10 10 1000000 1000

In [None]:
python3 tcounter_ans.py 10 10 1000000 1000

# 3. 有限バッファ
* 有限バッファは, 固定長の配列で, スレッド間でデータを受け渡すためのデータ構造

## 3-1. Cの有限バッファ* APIは
  * bounded_buffer_t bb[1];
  * bounded_buffer_init(bb, capacity);
  * bounded_buffer_put(bb, x);
  * long x = bounded_buffer_get(bb);
* capacityは保持できるデータ数(= putされて, まだgetされていないデータ数)
* capacity個のデータが既に保持されているときにputされたら, getされるまで待つ
* 1個もデータがないときにgetされたら, putされるまで待つ
という動作をする

以下は未完成版で,
* capacity個のデータが既に保持されているときにputされたら, 0を返す(そうでなければ1を返す)
* 1個もデータがないときにgetされたら, -1を返す (putするデータ(従ってgetされるデータ)は0以上とする)
という動作をする

In [None]:
%%writefile bbuf.c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <omp.h>
#include <pthread.h>

void die(const char * msg) {
  perror(msg); exit(1);
}

double cur_time() {
  struct timespec ts[1];
  clock_gettime(CLOCK_REALTIME, ts);
  return ts->tv_nsec * 1.0E-9 + ts->tv_sec;
}

/* 有限バッファ
   要素は必ず >= 0 とする */
typedef struct {
  long n_gets;                  /* getされた回数 */
  long n_puts;                  /* putされた回数 */
  long capacity;                /* 容量 */
  long * a;                     /* 中身(capacity要素の配列) */
} bounded_buffer_t;

/* 容量 capacity で初期化 */
void bounded_buffer_init(bounded_buffer_t * bb, long capacity) {
  long * a = (long *)malloc(sizeof(long) * capacity);
  if (!a) die("malloc");
  bb->a = a;
  bb->capacity = capacity;
  bb->n_gets = 0;
  bb->n_puts = 0;
}

/* 要素を追加 
 * 満杯だったら待つようにするのが課題
 * 以下はそのままだと満杯の場合は0を返す(間違い)
 */
int bounded_buffer_put(bounded_buffer_t * bb, long x) {
  long g = bb->n_gets;
  long p = bb->n_puts;
  long cap = bb->capacity;
  assert(x >= 0);
  if (p - g >= cap) {
    return 0;                   /* NG */
  }
  bb->a[p % cap] = x;
  bb->n_puts = p + 1;
  return 1;                     /* OK */
}

/* 要素を取り出す
 * 空だったら待つようにするのが課題
 * 以下はそのままだと空の場合は-1を返す(間違い)
 */
long bounded_buffer_get(bounded_buffer_t * bb) {
  long g = bb->n_gets;
  long p = bb->n_puts;
  long cap = bb->capacity;
  if (p - g <= 0) {
    return -1;                  /* 空 */
  }
  long x = bb->a[g % cap];
  bb->n_gets = g + 1;
  assert(x >= 0);
  return x;
}

int main(int argc, char ** argv) {
  long i = 1;
  /* putするスレッド数 */
  int n_putter_threads = (argc > i ? atoi(argv[i]) : 1); i++;
  /* getするスレッド数 */
  int n_getter_threads = (argc > i ? atoi(argv[i]) : 1); i++;
  /* putとgetの間にbarrierを入れるか? */
  int barrier_between_puts_gets = (argc > i ? atoi(argv[i]) : 1); i++;
  /* put (get)される回数 */
  long n                = (argc > i ? atol(argv[i]) : 1000000); i++;
  /* 容量 */
  long capacity         = (argc > i ? atol(argv[i]) : 1000); i++;
  /* 検証用(validate[x] == 1 iff getでxが取り出された) */
  char * validate = (char *)calloc(n, 1);

  bounded_buffer_t bb[1];
  bounded_buffer_init(bb, capacity);

  int nthreads = n_putter_threads + n_getter_threads;
  pthread_barrier_t barrier[1];
  pthread_barrier_init(barrier, 0, nthreads);

  double t0 = cur_time();
#pragma omp parallel num_threads(n_putter_threads + n_getter_threads)
  {
    int idx = omp_get_thread_num();
    if (idx < n_putter_threads) {
      /* I am a putter thread */
      long a = n *  idx      / n_putter_threads;
      long b = n * (idx + 1) / n_putter_threads;
      /* 0,1,...,n-1 を1つずつput */
      for (long x = a; x < b; x++) {
        int ok = bounded_buffer_put(bb, x);
        assert(ok);
      }
      if (barrier_between_puts_gets) {
        /* putが全員終わってからget */
        pthread_barrier_wait(barrier);
      }
    } else {
      /* I am a getter thread */
      idx -= n_putter_threads;
      long a = n *  idx      / n_getter_threads;
      long b = n * (idx + 1) / n_getter_threads;
      if (barrier_between_puts_gets) {
        /* putが全員終わってからget */
        pthread_barrier_wait(barrier);
      }
      /* 合計n回get */
      for (long x = a; x < b; x++) {
        long x = bounded_buffer_get(bb);
        assert(x >= 0);
        assert(x < n);
        assert(validate[x] == 0);
        validate[x] = 1;
      }
    }
  }
  double t1 = cur_time();
  printf("%f sec\n", t1 - t0);
  for (long i = 0; i < n; i++) {
    assert(validate[i]);
  }
  printf("OK\n");
  return 0;
}

In [None]:
BEGIN SOLUTION
END SOLUTION
gcc -Wall -fopenmp -o bbuf bbuf.c

```
./bbuf put_threads get_threads sync_between_puts_and_gets n capacity
```
 
で起動されると
 * 容量は capacity のbuffer が作られる
 * put_threads 個のスレッドが作られ, putする
 * get_threads 個のスレッドが作られ, getする
 * sync_between_puts_and_gets == 0 の場合, putするスレッドとgetするスレッドも並行に動く. sync_between_puts_and_gets == 1の場合, putスレッドがすべてのデータをputし終えてからgetスレッドがgetを始める
 * 合計 n 個のデータが put (get) される
 
与えれた未完成バージョンは以下の条件であれば動作する
 * put_threads == 1
 * get_threads == 1
 * sync_between_puts_and_gets == 1
 * capacity >= n

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 1 1 1 1000 1000

以下はどれもエラーになる

* putとgetが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 1 1 0 1000 1000

* 複数のputが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 2 1 1 1000 1000

* 複数のgetが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 1 2 1 1000 1000

* capacity < n

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 1 1 1 1000 100

## 3-2. Python の有限バッファ
* APIは
  * bb = bounded_buffer(capacity)
  * bb.put(x)
  * x = bb.get()

以下は未完成版で,
* capacity個のデータが既に保持されているときにputされたら, 0を返す(そうでなければ1を返す)
* 1個もデータがないときにgetされたら, -1を返す (putするデータ(従ってgetされるデータ)は0以上とする)
という動作をする

In [None]:
%%writefile bbuf.py

import sys
import threading
import time

def parallel(f, nthreads):
    """
    #pragma omp parallel に似たもの

    f(0), f(1), ..., f(nthreads - 1) の各々をスレッドで実行
    """
    threads = [threading.Thread(target=f, args=(i, ))
               for i in range(nthreads)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()

def parallel_for(f, a, b, nthreads):
    """
    #pragma omp parallel for に似たもの
    f(a), f(a+1), ..., f(b-1) を nthreads で分割して実行
    """
    def thread_fun(i):
        ai = (a * (nthreads - i)     + b * i) // nthreads
        bi = (a * (nthreads - i - 1) + b * (i + 1)) // nthreads
        for i in range(ai, bi):
            f(i)
    parallel(thread_fun, nthreads)

class bounded_buffer:
    """
    有限バッファ
    要素は必ず >= 0 とする
    """
    def __init__(self, capacity):
        self.a = [0] * capacity
        self.capacity = capacity
        self.n_gets = 0
        self.n_puts = 0
    def put(self, x):
        """
        要素を追加 
        満杯だったら待つようにするのが課題
        以下はそのままだと満杯の場合は0を返す(間違い)
        """
        g = self.n_gets
        p = self.n_puts
        cap = self.capacity
        assert(x >= 0), x
        if p - g >= cap:
            return 0 # NG
        self.a[p % cap] = x
        self.n_puts = p + 1
        return 1 # OK
    def get(self):
        """
        要素を取り出す
        空だったら待つようにするのが課題
        以下はそのままだと空の場合は-1を返す(間違い)
        """
        g = self.n_gets
        p = self.n_puts
        cap = self.capacity
        if p - g <= 0:
            return -1 # 空 
        x = self.a[g % cap]
        self.n_gets = g + 1
        assert(x >= 0), x
        return x

def main():
    i = 1
    argv = sys.argv
    argc = len(argv)
    # putするスレッド数
    n_putter_threads = int(argv[i]) if i < argc else 1
    i += 1
    # getするスレッド数 
    n_getter_threads = int(argv[i]) if i < argc else 1
    i += 1
    # putとgetの間にbarrierを入れるか?
    barrier_between_puts_gets = int(argv[i]) if i < argc else 1
    i += 1
    # put (get)される回数
    n                = int(argv[i]) if i < argc else 1000000
    i += 1
    # 容量
    capacity         = int(argv[i]) if i < argc else 1000
    i += 1
    # 検証用(validate[x] == 1 iff getでxが取り出された)
    validate = [0] * n
    bb = bounded_buffer(capacity)
    nthreads = n_putter_threads + n_getter_threads
    barrier = threading.Barrier(nthreads)

    def thread_fun(idx):
        if idx < n_putter_threads:
            # I am a putter thread
            a = n *  idx      // n_putter_threads
            b = n * (idx + 1) // n_putter_threads
            # 0,1,...,n-1 を1つずつput
            for x in range(a, b):
                ok = bb.put(x)
                assert(ok)
            if barrier_between_puts_gets:
                # putが全員終わってからget
                barrier.wait()
        else:
            idx -= n_putter_threads
            # I am a getter thread
            a = n *  idx      // n_getter_threads
            b = n * (idx + 1) // n_getter_threads
            # 0,1,...,n-1 を1つずつget
            if barrier_between_puts_gets:
                # putが全員終わってからget
                barrier.wait()
            for i in range(a, b):
                x = bb.get()
                assert(x >= 0), x
                assert(x < n), x
                assert(validate[x] == 0), validate[x]
                validate[x] = 1
    t0 = time.time()
    parallel(thread_fun, nthreads)
    t1 = time.time()
    dt = t1 - t0
    print(f"{dt} sec")
    for i in range(n):
        assert(validate[i] == 1), validate[i]
    print("OK")
    return 0

sys.exit(main())


```
python3 bbuf.py put_threads get_threads sync_between_puts_and_gets n capacity
```
 
で起動する. 動作はC版と同じ

同様に, 未完成バージョンは以下の条件であれば動作する
 * put_threads == 1
 * get_threads == 1
 * sync_between_puts_and_gets == 1
 * capacity >= n

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py 1 1 1 1000 1000

以下はどれもエラーになる

* putとgetが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py 1 1 0 1000 1000

* 複数のputが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py 2 1 1 1000 1000

* 複数のgetが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py 1 2 1 1000 1000

* capacity < n

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py 1 1 1 1000 100

# <font color="green"> Problem 5 :  有限バッファの実装 (C)</font>

* 有限バッファへmutex, 条件変数を導入し, 複数のスレッドが並行に動き, capacity < n であっても正しく動くようにせよ
* 構造は, 両側飽和カウンタとよく似ているので参考にせよ

In [None]:
BEGIN SOLUTION
END SOLUTION
%%writefile bbuf.c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <omp.h>
#include <pthread.h>

void die(const char * msg) {
  perror(msg); exit(1);
}

double cur_time() {
  struct timespec ts[1];
  clock_gettime(CLOCK_REALTIME, ts);
  return ts->tv_nsec * 1.0E-9 + ts->tv_sec;
}

/* 有限バッファ
   要素は必ず >= 0 とする */
typedef struct {
  long n_gets;                  /* getされた回数 */
  long n_puts;                  /* putされた回数 */
  long capacity;                /* 容量 */
  long * a;                     /* 中身(capacity要素の配列) */
  pthread_mutex_t m[1];
} bounded_buffer_t;

/* 容量 capacity で初期化 */
void bounded_buffer_init(bounded_buffer_t * bb, long capacity) {
  long * a = (long *)malloc(sizeof(long) * capacity);
  if (!a) die("malloc");
  bb->a = a;
  bb->capacity = capacity;
  bb->n_gets = 0;
  bb->n_puts = 0;
  pthread_mutex_init(bb->m, 0);
}

/* 要素を追加 
 * 満杯だったら待つようにするのが課題
 * 以下はそのままだと満杯の場合は0を返す(間違い)
 */
int bounded_buffer_put(bounded_buffer_t * bb, long x) {
  long g = bb->n_gets;
  long p = bb->n_puts;
  long cap = bb->capacity;
  assert(x >= 0);
  if (p - g >= cap) {
    return 0;                   /* NG */
  }
  bb->a[p % cap] = x;
  bb->n_puts = p + 1;
  return 1;                     /* OK */
}

/* 要素を取り出す
 * 空だったら待つようにするのが課題
 * 以下はそのままだと空の場合は-1を返す(間違い)
 */
long bounded_buffer_get(bounded_buffer_t * bb) {
  long g = bb->n_gets;
  long p = bb->n_puts;
  long cap = bb->capacity;
  if (p - g <= 0) {
    return -1;                  /* 空 */
  }
  long x = bb->a[g % cap];
  bb->n_gets = g + 1;
  assert(x >= 0);
  return x;
}

int main(int argc, char ** argv) {
  long i = 1;
  /* putするスレッド数 */
  int n_putter_threads = (argc > i ? atoi(argv[i]) : 1); i++;
  /* getするスレッド数 */
  int n_getter_threads = (argc > i ? atoi(argv[i]) : 1); i++;
  /* putとgetの間にbarrierを入れるか? */
  int barrier_between_puts_gets = (argc > i ? atoi(argv[i]) : 1); i++;
  /* put (get)される回数 */
  long n                = (argc > i ? atol(argv[i]) : 1000000); i++;
  /* 容量 */
  long capacity         = (argc > i ? atol(argv[i]) : 1000); i++;
  /* 検証用(validate[x] == 1 iff getでxが取り出された) */
  char * validate = (char *)calloc(n, 1);

  bounded_buffer_t bb[1];
  bounded_buffer_init(bb, capacity);

  int nthreads = n_putter_threads + n_getter_threads;
  pthread_barrier_t barrier[1];
  pthread_barrier_init(barrier, 0, nthreads);

  double t0 = cur_time();
#pragma omp parallel num_threads(n_putter_threads + n_getter_threads)
  {
    int idx = omp_get_thread_num();
    if (idx < n_putter_threads) {
      /* I am a putter thread */
      long a = n *  idx      / n_putter_threads;
      long b = n * (idx + 1) / n_putter_threads;
      /* 0,1,...,n-1 を1つずつput */
      for (long x = a; x < b; x++) {
        int ok = bounded_buffer_put(bb, x);
        assert(ok);
      }
      if (barrier_between_puts_gets) {
        /* putが全員終わってからget */
        pthread_barrier_wait(barrier);
      }
    } else {
      /* I am a getter thread */
      idx -= n_putter_threads;
      long a = n *  idx      / n_getter_threads;
      long b = n * (idx + 1) / n_getter_threads;
      if (barrier_between_puts_gets) {
        /* putが全員終わってからget */
        pthread_barrier_wait(barrier);
      }
      /* 合計n回get */
      for (long x = a; x < b; x++) {
        long x = bounded_buffer_get(bb);
        assert(x >= 0);
        assert(x < n);
        assert(validate[x] == 0);
        validate[x] = 1;
      }
    }
  }
  double t1 = cur_time();
  printf("%f sec\n", t1 - t0);
  for (long i = 0; i < n; i++) {
    assert(validate[i]);
  }
  printf("OK\n");
  return 0;
}

In [None]:
BEGIN SOLUTION
END SOLUTION
gcc -Wall -fopenmp -o bbuf bbuf.c

In [None]:
%%writefile bbuf_ans.c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <omp.h>
#include <pthread.h>

void die(const char * msg) {
  perror(msg); exit(1);
}

double cur_time() {
  struct timespec ts[1];
  clock_gettime(CLOCK_REALTIME, ts);
  return ts->tv_nsec * 1.0E-9 + ts->tv_sec;
}

/* 有限バッファ
   要素は必ず >= 0 とする */
typedef struct {
  long n_gets;                  /* getされた回数 */
  long n_puts;                  /* putされた回数 */
  long capacity;                /* 容量 */
  long * a;                     /* 中身(capacity要素の配列) */
  pthread_mutex_t m[1];
  pthread_cond_t gw[1];          /* get waiters */
  pthread_cond_t pw[1];          /* put waiters */
} bounded_buffer_t;

/* 容量 capacity で初期化 */
void bounded_buffer_init(bounded_buffer_t * bb, long capacity) {
  long * a = (long *)malloc(sizeof(long) * capacity);
  if (!a) die("malloc");
  bb->a = a;
  bb->capacity = capacity;
  bb->n_gets = 0;
  bb->n_puts = 0;
  pthread_mutex_init(bb->m, 0);
  pthread_cond_init(bb->gw, 0);
  pthread_cond_init(bb->pw, 0);
}

/* 要素を追加 
 * 満杯だったら待つようにするのが課題
 * 以下はそのままだと満杯の場合は0を返す(間違い)
 */
int bounded_buffer_put(bounded_buffer_t * bb, long x) {
  long cap = bb->capacity;
  pthread_mutex_lock(bb->m);
  while (1) {
    long g = bb->n_gets;
    long p = bb->n_puts;
    assert(x >= 0);
    if (p - g < cap) {
      break;
    }
    assert(p - g == cap);
    pthread_cond_wait(bb->pw, bb->m);
  }
  long g = bb->n_gets;
  long p = bb->n_puts;
  bb->a[p % cap] = x;
  bb->n_puts = p + 1;
  if (p <= g) {
    assert(p == g);
    pthread_cond_broadcast(bb->gw);
  }
  pthread_mutex_unlock(bb->m);
  return 1;                     /* OK */
}

/* 要素を取り出す
 * 空だったら待つようにするのが課題
 * 以下はそのままだと空の場合は-1を返す(間違い)
 */
long bounded_buffer_get(bounded_buffer_t * bb) {

  long cap = bb->capacity;
  pthread_mutex_lock(bb->m);
  while (1) {
    long g = bb->n_gets;
    long p = bb->n_puts;
    if (p - g > 0) {
      break;
    }
    assert(p - g == 0);
    pthread_cond_wait(bb->gw, bb->m);
  }
  long g = bb->n_gets;
  long p = bb->n_puts;
  long x = bb->a[g % cap];
  bb->n_gets = g + 1;
  assert(x >= 0);
  if (p - g >= cap) {
    assert(p - g == cap);
    pthread_cond_broadcast(bb->pw);
  }
  pthread_mutex_unlock(bb->m);
  return x;
}

int main(int argc, char ** argv) {
  long i = 1;
  /* putするスレッド数 */
  int n_putter_threads = (argc > i ? atoi(argv[i]) : 1); i++;
  /* getするスレッド数 */
  int n_getter_threads = (argc > i ? atoi(argv[i]) : 1); i++;
  /* putとgetの間にbarrierを入れるか? */
  int barrier_between_puts_gets = (argc > i ? atoi(argv[i]) : 1); i++;
  /* put (get)される回数 */
  long n                = (argc > i ? atol(argv[i]) : 1000000); i++;
  /* 容量 */
  long capacity         = (argc > i ? atol(argv[i]) : 1000); i++;
  /* 検証用(validate[x] == 1 iff getでxが取り出された) */
  char * validate = (char *)calloc(n, 1);

  bounded_buffer_t bb[1];
  bounded_buffer_init(bb, capacity);

  int nthreads = n_putter_threads + n_getter_threads;
  pthread_barrier_t barrier[1];
  pthread_barrier_init(barrier, 0, nthreads);

  double t0 = cur_time();
#pragma omp parallel num_threads(n_putter_threads + n_getter_threads)
  {
    int idx = omp_get_thread_num();
    if (idx < n_putter_threads) {
      /* I am a putter thread */
      long a = n *  idx      / n_putter_threads;
      long b = n * (idx + 1) / n_putter_threads;
      /* 0,1,...,n-1 を1つずつput */
      for (long x = a; x < b; x++) {
        int ok = bounded_buffer_put(bb, x);
        assert(ok);
      }
      if (barrier_between_puts_gets) {
        /* putが全員終わってからget */
        pthread_barrier_wait(barrier);
      }
    } else {
      /* I am a getter thread */
      idx -= n_putter_threads;
      long a = n *  idx      / n_getter_threads;
      long b = n * (idx + 1) / n_getter_threads;
      if (barrier_between_puts_gets) {
        /* putが全員終わってからget */
        pthread_barrier_wait(barrier);
      }
      /* 合計n回get */
      for (long x = a; x < b; x++) {
        long x = bounded_buffer_get(bb);
        assert(x >= 0);
        assert(x < n);
        assert(validate[x] == 0);
        validate[x] = 1;
      }
    }
  }
  double t1 = cur_time();
  printf("%f sec\n", t1 - t0);
  for (long i = 0; i < n; i++) {
    assert(validate[i]);
  }
  printf("OK\n");
  return 0;
}

In [None]:
gcc -Wall -fopenmp -o bbuf_ans bbuf_ans.c

* 色々なパラメータで実行してみよ

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 1 1 1 10 10

In [None]:
./bbuf_ans 1 1 1 10 10

* 以下でテストせよ

* putとgetが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 1 1 0 100000 100

In [None]:
./bbuf_ans 1 1 0 100000 100

* 多数のput/getが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 10 20 0 100000 100

In [None]:
./bbuf_ans 10 20 0 100000 100

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 20 10 0 100000 100

In [None]:
./bbuf_ans 20 10 0 100000 100

* 極端にcapacityが少ない

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 20 30 0 100000 1

In [None]:
./bbuf_ans 20 30 0 100000 1

In [None]:
BEGIN SOLUTION
END SOLUTION
./bbuf 30 20 0 100000 1

In [None]:
./bbuf_ans 30 20 0 100000 1

# <font color="green"> Problem 6 :  有限バッファの実装 (Python)</font>

* 有限バッファへmutex, 条件変数を導入し, 複数のスレッドが並行に動き, capacity < n であっても正しく動くようにせよ
* 構造は, 両側飽和カウンタとよく似ているので参考にせよ

In [None]:
BEGIN SOLUTION
END SOLUTION
%%writefile bbuf.py

import sys
import threading
import time

def parallel(f, nthreads):
    """
    #pragma omp parallel に似たもの

    f(0), f(1), ..., f(nthreads - 1) の各々をスレッドで実行
    """
    threads = [threading.Thread(target=f, args=(i, ))
               for i in range(nthreads)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()

def parallel_for(f, a, b, nthreads):
    """
    #pragma omp parallel for に似たもの
    f(a), f(a+1), ..., f(b-1) を nthreads で分割して実行
    """
    def thread_fun(i):
        ai = (a * (nthreads - i)     + b * i) // nthreads
        bi = (a * (nthreads - i - 1) + b * (i + 1)) // nthreads
        for i in range(ai, bi):
            f(i)
    parallel(thread_fun, nthreads)

class bounded_buffer:
    """
    有限バッファ
    要素は必ず >= 0 とする
    """
    def __init__(self, capacity):
        self.a = [0] * capacity
        self.capacity = capacity
        self.n_gets = 0
        self.n_puts = 0
        self.m = threading.Lock()
    def put(self, x):
        """
        要素を追加 
        満杯だったら待つようにするのが課題
        以下はそのままだと満杯の場合は0を返す(間違い)
        """
        g = self.n_gets
        p = self.n_puts
        cap = self.capacity
        assert(x >= 0), x
        if p - g >= cap:
            return 0 # NG
        self.a[p % cap] = x
        self.n_puts = p + 1
        return 1 # OK
    def get(self):
        """
        要素を取り出す
        空だったら待つようにするのが課題
        以下はそのままだと空の場合は-1を返す(間違い)
        """
        g = self.n_gets
        p = self.n_puts
        cap = self.capacity
        if p - g <= 0:
            return -1 # 空 
        x = self.a[g % cap]
        self.n_gets = g + 1
        assert(x >= 0), x
        return x

def main():
    i = 1
    argv = sys.argv
    argc = len(argv)
    # putするスレッド数
    n_putter_threads = int(argv[i]) if i < argc else 1
    i += 1
    # getするスレッド数 
    n_getter_threads = int(argv[i]) if i < argc else 1
    i += 1
    # putとgetの間にbarrierを入れるか?
    barrier_between_puts_gets = int(argv[i]) if i < argc else 1
    i += 1
    # put (get)される回数
    n                = int(argv[i]) if i < argc else 1000000
    i += 1
    # 容量
    capacity         = int(argv[i]) if i < argc else 1000
    i += 1
    # 検証用(validate[x] == 1 iff getでxが取り出された)
    validate = [0] * n
    bb = bounded_buffer(capacity)
    nthreads = n_putter_threads + n_getter_threads
    barrier = threading.Barrier(nthreads)

    def thread_fun(idx):
        if idx < n_putter_threads:
            # I am a putter thread
            a = n *  idx      // n_putter_threads
            b = n * (idx + 1) // n_putter_threads
            # 0,1,...,n-1 を1つずつput
            for x in range(a, b):
                ok = bb.put(x)
                assert(ok)
            if barrier_between_puts_gets:
                # putが全員終わってからget
                barrier.wait()
        else:
            idx -= n_putter_threads
            # I am a getter thread
            a = n *  idx      // n_getter_threads
            b = n * (idx + 1) // n_getter_threads
            # 0,1,...,n-1 を1つずつget
            if barrier_between_puts_gets:
                # putが全員終わってからget
                barrier.wait()
            for i in range(a, b):
                x = bb.get()
                assert(x >= 0), x
                assert(x < n), x
                assert(validate[x] == 0), validate[x]
                validate[x] = 1
    t0 = time.time()
    parallel(thread_fun, nthreads)
    t1 = time.time()
    dt = t1 - t0
    print(f"{dt} sec")
    for i in range(n):
        assert(validate[i] == 1), validate[i]
    print("OK")
    return 0

sys.exit(main())


In [None]:
%%writefile bbuf_ans.py

import sys
import threading
import time

def parallel(f, nthreads):
    """
    #pragma omp parallel に似たもの

    f(0), f(1), ..., f(nthreads - 1) の各々をスレッドで実行
    """
    threads = [threading.Thread(target=f, args=(i, ))
               for i in range(nthreads)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()

def parallel_for(f, a, b, nthreads):
    """
    #pragma omp parallel for に似たもの
    f(a), f(a+1), ..., f(b-1) を nthreads で分割して実行
    """
    def thread_fun(i):
        ai = (a * (nthreads - i)     + b * i) // nthreads
        bi = (a * (nthreads - i - 1) + b * (i + 1)) // nthreads
        for i in range(ai, bi):
            f(i)
    parallel(thread_fun, nthreads)

class bounded_buffer:
    """
    有限バッファ
    要素は必ず >= 0 とする
    """
    def __init__(self, capacity):
        self.a = [0] * capacity
        self.capacity = capacity
        self.n_gets = 0
        self.n_puts = 0
        self.m = threading.Lock()
        self.gw = threading.Condition(self.m)
        self.pw = threading.Condition(self.m)
    def put(self, x):
        """
        要素を追加 
        満杯だったら待つようにするのが課題
        以下はそのままだと満杯の場合は0を返す(間違い)
        """
        cap = self.capacity
        self.m.acquire()
        while 1:
            g = self.n_gets
            p = self.n_puts
            if p - g < cap:
                break
            assert(p - g == cap), (p, g, p - g, cap)
            self.pw.wait()
        self.a[p % cap] = x
        self.n_puts = p + 1
        if p <= g:
            assert(p == g), (p, g)
            self.gw.notify_all()
        self.m.release()
        return 1 # OK 
    def get(self):
        """
        要素を取り出す
        空だったら待つようにするのが課題
        以下はそのままだと空の場合は-1を返す(間違い)
        """
        cap = self.capacity
        self.m.acquire()
        while 1:
            g = self.n_gets
            p = self.n_puts
            if p - g > 0:
                break
            assert(p - g == 0), (p, g)
            self.gw.wait()
        x = self.a[g % cap]
        self.n_gets = g + 1
        assert(x >= 0), x
        if p - g >= cap:
            assert(p - g == cap), (p, g, p - g, cap)
            self.pw.notify_all()
        self.m.release()
        return x

def main():
    i = 1
    argv = sys.argv
    argc = len(argv)
    # putするスレッド数
    n_putter_threads = int(argv[i]) if i < argc else 1
    i += 1
    # getするスレッド数 
    n_getter_threads = int(argv[i]) if i < argc else 1
    i += 1
    # putとgetの間にbarrierを入れるか?
    barrier_between_puts_gets = int(argv[i]) if i < argc else 1
    i += 1
    # put (get)される回数
    n                = int(argv[i]) if i < argc else 1000000
    i += 1
    # 容量
    capacity         = int(argv[i]) if i < argc else 1000
    i += 1
    # 検証用(validate[x] == 1 iff getでxが取り出された)
    validate = [0] * n
    bb = bounded_buffer(capacity)
    nthreads = n_putter_threads + n_getter_threads
    barrier = threading.Barrier(nthreads)

    def thread_fun(idx):
        if idx < n_putter_threads:
            # I am a putter thread
            a = n *  idx      // n_putter_threads
            b = n * (idx + 1) // n_putter_threads
            # 0,1,...,n-1 を1つずつput
            for x in range(a, b):
                ok = bb.put(x)
                assert(ok)
            if barrier_between_puts_gets:
                # putが全員終わってからget
                barrier.wait()
        else:
            idx -= n_putter_threads
            # I am a getter thread
            a = n *  idx      // n_getter_threads
            b = n * (idx + 1) // n_getter_threads
            # 0,1,...,n-1 を1つずつget
            if barrier_between_puts_gets:
                # putが全員終わってからget
                barrier.wait()
            for i in range(a, b):
                x = bb.get()
                assert(x >= 0), x
                assert(x < n), x
                assert(validate[x] == 0), validate[x]
                validate[x] = 1
    t0 = time.time()
    parallel(thread_fun, nthreads)
    t1 = time.time()
    dt = t1 - t0
    print(f"{dt} sec")
    for i in range(n):
        assert(validate[i] == 1), validate[i]
    print("OK")
    return 0

sys.exit(main())


* 色々なパラメータで実行してみよ

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py 1 1 1 10 10

In [None]:
python3 bbuf_ans.py 1 1 1 10 10

* 以下でテストせよ

* putとgetが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py  1 1 0 100000 100

In [None]:
python3 bbuf_ans.py 1 1 0 100000 100

* 多数のput/getが並行

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py  10 20 0 100000 100

In [None]:
python3 bbuf_ans.py 10 20 0 100000 100

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py 20 10 0 100000 100

In [None]:
python3 bbuf_ans.py 20 10 0 100000 100

* 極端にcapacityが少ない

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py  20 30 0 100000 1

In [None]:
python3 bbuf_ans.py 20 30 0 100000 1

In [None]:
BEGIN SOLUTION
END SOLUTION
python3 bbuf.py 30 20 0 100000 1

In [None]:
python3 bbuf_ans.py 30 20 0 100000 1