In [31]:
import pyaudio
import usb.core
import usb.util
import usb.control
import numpy as np
import json
import os
from lib.mic_array import MicArray
from lib.usb_pixel_ring_v2 import PixelRing
from lib.tuning import Tuning
import time
import concurrent.futures
from datetime import datetime
import soundfile as sf
import math

In [2]:
def getDeviceIndex():
  # constants
  DEVICE_NAME = 'ReSpeaker 4 Mic Array (UAC1.0)'

  device_idx = []
  p = pyaudio.PyAudio()
  info = p.get_host_api_info_by_index(0)
  numdevices = info.get('deviceCount')

  # respeakerのデバイスインデックスを取得
  for i in range(0, numdevices):
    device = p.get_device_info_by_host_api_device_index(0, i)
    if (device.get('maxInputChannels')) > 0 and (device.get('name')==DEVICE_NAME):
      device_idx.append(i)
      print("Input Device id ", i, " - ", device.get('name'))
  
  return device_idx

def recordParallel(mic_array: MicArray, fname=None):
  mic_array.record(fname=fname)

def recAndGetDoa(rec_dev: MicArray, doa_dev: Tuning, fname=None):
  # 録音
  rec_dev.record(fname=fname)
  # DoA取得
  doa = doa_dev.direction
  return doa

# インデックスを確認

# 録音

In [4]:
dev_idx = getDeviceIndex()
mic_array = MicArray(dev_idx[0])
mic_array.record(2)

Device id: 1
* recording
* done recording


## 複数マイクロホンアレイを同時録音
1. 取得した音声とマイクの対応付
1. DoAとマイクの対応付

In [None]:
dev_idx = getDeviceIndex()
if dev_idx:
  mic_arrays = [MicArray(idx) for idx in dev_idx]

  # 並列処理
  # with concurrent.futures.ThreadPoolExecutor(max_workers=len(mic_arrays)) as executor:
  # with concurrent.futures.ProcessPoolExecutor() as executor:
  with concurrent.futures.ThreadPoolExecutor() as executor:
    time.sleep(1)
    executor.map(recordParallel, mic_arrays)

# LED

In [3]:
# デバイスを取得
VID = 0x2886
PID = 0x0018

devs = usb.core.find(idVendor=VID, idProduct=PID, find_all=True)
devs = list(devs)
print(devs)
# if devs:
#   for dev in devs:
#     print(dev)

[<DEVICE ID 2886:0018 on Bus 020 Address 020>, <DEVICE ID 2886:0018 on Bus 020 Address 019>, <DEVICE ID 2886:0018 on Bus 020 Address 018>, <DEVICE ID 2886:0018 on Bus 020 Address 017>, <DEVICE ID 2886:0018 on Bus 020 Address 014>]


In [44]:
# 全てのデバイスをオフにする
for dev in devs:
  pixel_ring = PixelRing(dev)
  pixel_ring.wakeup(180)
  pixel_ring.off()

In [4]:
# 特定のデバイスのLEDをつける
dev = devs[0]
pixel_ring = PixelRing(dev)
pixel_ring.think()
time.sleep(5)
pixel_ring.off()

In [38]:
VID = 0x2886
PID = 0x0018

devs = usb.core.find(idVendor=VID, idProduct=PID, find_all=True)
if devs:
  for dev in devs:
    pixel_ring = PixelRing(dev)

    pixel_ring.wakeup(180)
    time.sleep(3)
    pixel_ring.listen()
    time.sleep(3)
    pixel_ring.think()
    time.sleep(3)
    pixel_ring.set_volume(8)
    time.sleep(3)
    pixel_ring.off()
    time.sleep(3)

# DoA

In [None]:
VID = 0x2886
PID = 0x0018

devs = usb.core.find(idVendor=VID, idProduct=PID, find_all=True)
if devs:
  for dev in devs:
    mic_tuning = Tuning(dev)
    while True:
      try:
        print(mic_tuning.direction)
        time.sleep(1)
      except KeyboardInterrupt:
        break

# 実験

## デバイスの捜索

In [3]:
# 保存先のディレクトリを作成
n_array = 4 # アレイの数
current_datetime = datetime.now() # 日付
formatted_datetime = current_datetime.strftime("%Y%m%d")

OUT_DIR = f'/Users/toranosuke/Desktop/experiment/result/{formatted_datetime}/narray_{n_array}/'
if os.path.exists(OUT_DIR):
  raise ValueError("OUT_DIR already exists")
os.makedirs(OUT_DIR, exist_ok=True)

# デバイスを取得
## 録音用
dev_idx = getDeviceIndex()
if dev_idx:
  rec_devs = [MicArray(idx, out_dir_path=OUT_DIR) for idx in dev_idx]
## DOA用
VID = 0x2886
PID = 0x0018
usb_devs = usb.core.find(idVendor=VID, idProduct=PID, find_all=True)
usb_devs = list(usb_devs)
doa_devs = [Tuning(dev) for dev in usb_devs]

assert len(rec_devs) == len(doa_devs), 'The number of devices is different'

Input Device id  0  -  ReSpeaker 4 Mic Array (UAC1.0)
Input Device id  1  -  ReSpeaker 4 Mic Array (UAC1.0)
Input Device id  2  -  ReSpeaker 4 Mic Array (UAC1.0)
Input Device id  3  -  ReSpeaker 4 Mic Array (UAC1.0)


## 録音、DoAのインデックスの対応付

In [7]:
# 全録音デバイスで録音
if dev_idx:
  mic_arrays = [MicArray(idx) for idx in dev_idx]

  # 並列処理
  with concurrent.futures.ThreadPoolExecutor() as executor:
    time.sleep(1)
    executor.map(recordParallel, mic_arrays)

[Device ID:0] * recording
[Device ID:1] * recording
[Device ID:2] * recording
[Device ID:3] * recording
[Device ID:0] * done recording
[Device ID:1] * done recording
[Device ID:2] * done recording
[Device ID:3] * done recording


In [12]:
# 特定のデバイスのLEDをつける
dev = usb_devs[3]
pixel_ring = PixelRing(dev)
pixel_ring.think()
time.sleep(5)
pixel_ring.off()

In [13]:
array_idx_info = [
  {'array_id': 1, 'i_rec': 0, 'i_doa': 0},
  {'array_id': 2, 'i_rec': 2, 'i_doa': 1},
  {'array_id': 3, 'i_rec': 3, 'i_doa': 3},
  {'array_id': 4, 'i_rec': 1, 'i_doa': 2},
]

## 録音

In [38]:
trial_no = 9 # 試行回数

# array id順にデバイスを並び替え
rec_devs_order_by_array_id = [rec_devs[info['i_rec']] for info in array_idx_info]
doa_devs_order_by_array_id = [doa_devs[info['i_doa']] for info in array_idx_info]

# ファイル名
wav_fnames = [f'mic-{info["array_id"]}_trial-{trial_no}' for info in array_idx_info]

# 並列処理
with concurrent.futures.ThreadPoolExecutor() as executor:
  time.sleep(1.5)
  doa_rets = executor.map(recAndGetDoa, rec_devs_order_by_array_id, doa_devs_order_by_array_id, wav_fnames)

# doa情報や各種メタ情報を保存
source_loc = np.r_[0, 0]
d = 1.0 # [m] 
array_interval_deg = 360/n_array

# マイクロホンアレイの中心座標を計算
array_locs = np.zeros((2, n_array))
for i in range(n_array):
  array_rad = math.radians(array_interval_deg * i)
  array_loc = np.r_[d * math.cos(array_rad), d * math.sin(array_rad)] + source_loc
  array_locs[:, i] = array_loc.T

json_fname = f'trial-{trial_no}'
result_info = {
  'source': {
    'azimuth': 0,
    'source_loc': source_loc.tolist(),
  },
  'mic': {
    'n_array': n_array,
    'n_ch': 4,
    'array_info': array_idx_info,
    'doa': list(doa_rets),
    'array_interval_deg': array_interval_deg,
    'src_array_dist': d,
    'array_loc': array_locs.tolist()
  },
  'trial': trial_no,
}
with open(OUT_DIR+json_fname+'.json', 'w') as f:
  json.dump(result_info, f, indent=2)
print(OUT_DIR+json_fname+'.json')

[[ 1.0000000e+00  6.1232340e-17 -1.0000000e+00 -1.8369702e-16]
 [ 0.0000000e+00  1.0000000e+00  1.2246468e-16 -1.0000000e+00]]


In [34]:
np.c_[1,2]

array([[1, 2]])

# マイクキャリブレーション

## キャリブレーション用音源

In [27]:
import numpy as np
import wave

# Set the parameters
frequency = 16000
duration = 180  # in seconds
sampling_rate = 44100
amplitude = 32767

# Generate the sine wave
samples = (np.sin(2*np.pi*np.arange(sampling_rate*duration)*frequency/sampling_rate)).astype(np.float32)

# Save the sine wave as a wav file
with wave.open(f'/Users/toranosuke/Desktop/experiment/array_calibration/sound_src/sine_wave_{frequency}hz.wav', 'wb') as f:
  f.setnchannels(1)
  f.setsampwidth(2)
  f.setframerate(sampling_rate)
  f.writeframes((samples*amplitude).astype(np.int16))


## 校正音源の録音

In [3]:
dev_idx = getDeviceIndex()

Input Device id  0  -  ReSpeaker 4 Mic Array (UAC1.0)
Input Device id  1  -  ReSpeaker 4 Mic Array (UAC1.0)


### インデックス対応

In [4]:
if dev_idx:
  mic_arrays = [MicArray(idx) for idx in dev_idx]

  # 並列処理
  with concurrent.futures.ThreadPoolExecutor() as executor:
    time.sleep(1)
    executor.map(recordParallel, mic_arrays)

[Device ID:0] * recording
[Device ID:1] * recording
[Device ID:0] * done recording
[Device ID:1] * done recording


### 録音

In [10]:
ref_array_idx = 1
test_array_idx = 5
OUT_DIR = f'/Users/toranosuke/Desktop/array_calibration/result/ref_{ref_array_idx}_test_{test_array_idx}/'
os.makedirs(OUT_DIR, exist_ok=True)

if dev_idx:
  mic_arrays = [MicArray(idx, out_dir_path=OUT_DIR) for idx in dev_idx]

trial_num = 3
for i in range(trial_num):
  trial = i+1
  # src_hz = 125
  # src_hz = 250
  # src_hz = 500
  # src_hz = 1000
  src_hz = 2000

  if dev_idx:
    fnames = [f'mic-{test_array_idx}_f-{src_hz}_trial-{trial}', 
              f'mic-{ref_array_idx}_f-{src_hz}_trial-{trial}']

    # 並列処理
    with concurrent.futures.ThreadPoolExecutor() as executor:
      time.sleep(2)
      executor.map(recordParallel, mic_arrays, fnames)
    print(fnames)

[Device ID:0] * recording
[Device ID:1] * recording
[Device ID:0] * done recording
[Device ID:1] * done recording
['mic-5_f-2000_trial-1', 'mic-1_f-2000_trial-1']
[Device ID:0] * recording
[Device ID:1] * recording
[Device ID:0] * done recording
[Device ID:1] * done recording
['mic-5_f-2000_trial-2', 'mic-1_f-2000_trial-2']
[Device ID:0] * recording
[Device ID:1] * recording
[Device ID:0] * done recording
[Device ID:1] * done recording
['mic-5_f-2000_trial-3', 'mic-1_f-2000_trial-3']


## 増幅率を計算

In [15]:
def calcSPL(sigs, fs, s_time=-1, e_time=-1):
  assert s_time <= e_time, 's_time must be smaller than e_time'
  if s_time == -1 and e_time == -1:
    s_idx = 0
    e_idx = sigs.shape[0]
  else:
    s_idx = s_time * fs
    e_idx = e_time * fs

  L = e_idx - s_idx
  # I = np.sum(sigs**2, axis=0) / L
  # I0 = 10**(-12)
  # spl = 10 * np.log10(I / I0)
  p_sqr = np.sum(sigs**2, axis=0) / L
  p0 = 2*10**(-5)
  spl = 10 * np.log10(p_sqr / p0**2)
  return spl

In [24]:
WAV_PATH = '/Users/toranosuke/Desktop/experiment/array_calibration/result/'

src_hz = [125, 250, 500, 1000, 2000]
ref_array_idx = 1
# test_array_idxs = [2, 3, 4, 5]
test_array_idxs = [2]
trial = [1, 2, 3]

for hz in src_hz:
  for test_array_idx in test_array_idxs:
    dir_name = f'ref_{ref_array_idx}_test_{test_array_idx}/'
    gains = []
    for t in trial:
      ref_fname = f'mic-{ref_array_idx}_f-{hz}_trial-{t}.wav'
      test_fname = f'mic-{test_array_idx}_f-{hz}_trial-{t}.wav'
      ref_sig, ref_fs = sf.read(WAV_PATH + dir_name + ref_fname)
      test_sig, test_fs = sf.read(WAV_PATH + dir_name + test_fname)
      
      ref_spl = calcSPL(ref_sig[:, 1:5], ref_fs)
      ref_spl = np.mean(ref_spl) # 4chの平均
      test_spl = calcSPL(test_sig[:, 1:5], test_fs)
      test_spl = np.mean(test_spl)
            
      gain = test_spl - ref_spl
      gains.append(gain)
    gains = np.array(gains)
    gain_mean = np.mean(gains, axis=0)
    gain_std = np.std(gains, axis=0)
    print(f'hz: {hz}, test_array_idx: {test_array_idx}')
    print(f'mean: {gain_mean}, std: {gain_std}')
    print()

hz: 125, test_array_idx: 2
mean: 0.15278159703399533, std: 0.00034202965646674033

hz: 250, test_array_idx: 2
mean: 0.1061759472618841, std: 0.00594778356137463

hz: 500, test_array_idx: 2
mean: 1.802301123762696, std: 0.007895778587487122

hz: 1000, test_array_idx: 2
mean: 0.08409750345825746, std: 0.008427155730253974

hz: 2000, test_array_idx: 2
mean: 1.2889425454682548, std: 0.0020486113454129375



In [None]:
WAV_PATH = '/Users/toranosuke/Desktop/experiment/array_calibration/result/'

src_hz = [125, 250, 500, 1000, 2000]
ref_array_idx = 1
# test_array_idxs = [2, 3, 4, 5]
test_array_idxs = [5]
trial = [1, 2, 3]

for hz in src_hz:
  for test_array_idx in test_array_idxs:
    dir_name = f'ref_{ref_array_idx}_test_{test_array_idx}/'
    gains = []
    for t in trial:
      ref_fname = f'mic-{ref_array_idx}_f-{hz}_trial-{t}.wav'
      test_fname = f'mic-{test_array_idx}_f-{hz}_trial-{t}.wav'
      ref_sig, ref_fs = sf.read(WAV_PATH + dir_name + ref_fname)
      test_sig, test_fs = sf.read(WAV_PATH + dir_name + test_fname)
      
      ref_spl = calcSPL(ref_sig[:, 1:5], ref_fs)
      ref_spl = np.mean(ref_spl) # 4chの平均
      test_spl = calcSPL(test_sig[:, 1:5], test_fs)
      test_spl = np.mean(test_spl)
            
      gain = test_spl - ref_spl
      gains.append(gain)
    gains = np.array(gains)
    gain_mean = np.mean(gains, axis=0)
    gain_std = np.std(gains, axis=0)
    print(f'hz: {hz}, test_array_idx: {test_array_idx}')
    print(f'mean: {gain_mean}, std: {gain_std}')
    print()

# JSONに情報を追加する

In [44]:
n_arrays = [3, 4, 5]
source_loc = np.r_[0, 0]
d = 1.0 # [m] 

for n_array in n_arrays:
  array_interval_deg = 360/n_array
  # マイクロホンアレイの中心座標を計算
  array_locs = np.zeros((2, n_array))
  for i in range(n_array):
    array_rad = math.radians(array_interval_deg * i)
    array_loc = np.r_[d * math.cos(array_rad), d * math.sin(array_rad)] + source_loc
    array_locs[:, i] = array_loc.T
  
  json_dir = f'/Users/toranosuke/Desktop/experiment/result/20231121/narray_{n_array}/'
  files = os.listdir(json_dir)

  # jsonファイルを読み込む
  for file in files:
    if file.endswith('.json'):
      json_path = os.path.join(json_dir, file)
      with open(json_path, 'r') as f:
        meta_tmp = json.load(f)
      
      meta = {
        'source': {
          'azimuth': 0,
          'source_loc': source_loc.tolist(),
        },
        'mic': {
          'n_array': meta_tmp['n_array'],
          'n_ch': 4,
          'array_info': meta_tmp['array_info'],
          'doa': meta_tmp['doa'],
          'array_interval_deg': array_interval_deg,
          'src_array_dist': d,
          'array_loc': array_locs.tolist()
        },
        'trial': meta_tmp['trial'],
      }

      with open(json_path, 'w') as f:
        json.dump(meta, f, indent=2)