In [1]:
import whisper
import torch
from pydub import AudioSegment
import os
import base64
from io import BytesIO
import numpy
import struct

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 메인 모델
model = whisper.load_model("medium")
model = model.to(device)

# 작은 모델
# small_model = whisper.load_model("tiny")
# small_model = small_model.to(device)

In [3]:
file_path = "/home/jongho/바탕화면/테스트 음성파일/"
file_name = "연제리2.wav"

encode_audio = base64.b64encode(open(f"{file_path}{file_name}", "rb").read()).decode('utf-8') 

# result = small_model.transcribe(f"{file_path}{file_name}" , fp16=False)
# print(result['text'])

In [4]:
def extract_wav_header(binary_data):
    # WAV 파일 헤더 구조에 맞게 필드를 읽음
  
    chunk_id = binary_data[:4]
    chunk_size = struct.unpack('<I', binary_data[4:8])[0]
    format_str = binary_data[8:12]
    subchunk1_id = binary_data[12:16]
    subchunk1_size = struct.unpack('<I', binary_data[16:20])[0]
    audio_format = struct.unpack('<H', binary_data[20:22])[0]
    num_channels = struct.unpack('<H', binary_data[22:24])[0]
    sample_rate = struct.unpack('<I', binary_data[24:28])[0]
    byte_rate = struct.unpack('<I', binary_data[28:32])[0]
    block_align = struct.unpack('<H', binary_data[32:34])[0]
    bits_per_sample = struct.unpack('<H', binary_data[34:36])[0]
    subchunk2_id = binary_data[36:40]
    subchunk2_size = struct.unpack('<I', binary_data[40:44])[0]
    

    header = {"Chunk_ID": chunk_id,
      "Chunk_Size" : chunk_size,
      "Format" : format_str,
      "Subchunk1_ID" : subchunk1_id,
      "Subchunk1_Size" : subchunk1_size,
      "Audio_Format" : audio_format,
      "Num_Channels" : num_channels,
      "Sample_Rate" : sample_rate,
      "Byte_Rate" : byte_rate,
      "Block_Align" : block_align,
      "Bits_Per_Sample" : bits_per_sample,
      "Subchunk2_ID" : subchunk2_id,
      "Subchunk2_Size" : subchunk2_size}
    return header 

def make_header(wav_header, input_data) :
    chunk_id = wav_header['Chunk_ID']
    chunk_size = len(input_data) + 44  # 데이터 크기에 추가 정보 크기(46바이트)를 더함
    format_str = wav_header['Format']
    subchunk1_id = wav_header['Subchunk1_ID']
    subchunk1_size = wav_header['Subchunk1_Size']  # 일반적인 값을 사용함
    audio_format = wav_header['Audio_Format'] # PCM 포맷
    num_channels = wav_header['Num_Channels']  # 스테레오
    sample_rate = wav_header['Sample_Rate']  # 예시로 44100Hz
    byte_rate = wav_header['Byte_Rate']
    block_align = wav_header['Block_Align']    # 채널 수 * 샘플 크기(2바이트)
    bits_per_sample = wav_header['Bits_Per_Sample']   # 16비트 PCM 사용
    subchunk2_id =  b'data'
    subchunk2_size = len(input_data) + 44  # 데이터 크기

    wav_header = struct.pack('<4sI4s4sIHHIIHH4sI', chunk_id, chunk_size, format_str, subchunk1_id,
                            subchunk1_size, audio_format, num_channels, sample_rate, byte_rate,
                            block_align, bits_per_sample, subchunk2_id, subchunk2_size)
    
    return wav_header

In [5]:
# 웹 소켓을 통해서 base64 데이터를 수신 받았을때
# 바이너리 데이터로 변환
decoded_audio = base64.b64decode(encode_audio)

# 오디오 데이터에서 헤더정보 추출
BINARY_DATA = decoded_audio[:44]
audio_signal_data = decoded_audio[44:]

wav_header = extract_wav_header(BINARY_DATA)

print("음성파일 헤더 정보 :",wav_header)

음성파일 헤더 정보 : {'Chunk_ID': b'RIFF', 'Chunk_Size': 543172920, 'Format': b'WAVE', 'Subchunk1_ID': b'fmt ', 'Subchunk1_Size': 16, 'Audio_Format': 1, 'Num_Channels': 2, 'Sample_Rate': 44100, 'Byte_Rate': 176400, 'Block_Align': 4, 'Bits_Per_Sample': 16, 'Subchunk2_ID': b'LIST', 'Subchunk2_Size': 80}


In [6]:
# 분할 사이즈
SPLIT_SIZE = 10000000 # 10MB
# 기본 경로
BASE_PATH = "/home/jongho/Spring_project/CJU_STT/CJU_Web_Application_Services/src/main/webapp/resources/python/result"
# 분할 바이너리 데이터 리스트
split_data = []

for i in range(0, len(audio_signal_data), SPLIT_SIZE):
    split_data.append(audio_signal_data[i:i+SPLIT_SIZE])

print(f"바이너리 데이터 분할 완료 : {len(split_data)}개")

바이너리 데이터 분할 완료 : 55개


In [15]:
i = 20
import numpy as np
import torch
import io

# WAV 파일 헤더 정보 생성
new_header = make_header(wav_header, split_data[i])
audio_with_header = new_header + split_data[i]

# with open(file="./check.txt", mode="w+") as f :
#     f.write(str(audio_with_header))

# audio_array = np.frombuffer(audio_with_header, dtype=np.float32)
# audio_tensor = torch.tensor(audio_array, dtype=torch.float32)

# # STT 실행
# result = model.transcribe(audio_tensor,fp16=False)
# print(result['text'])



In [9]:
# for i in range(len(split_data)) :

#     # WAV 파일 헤더 정보 생성
#     new_header = make_header(wav_header, split_data[i])
#     audio_with_header = new_header + split_data[i]

#     # Whiper 입력 데이터 형식으로 변환
#     audio = AudioSegment.from_file(BytesIO(audio_with_header), format="wav")
#     data = numpy.array(audio.get_array_of_samples())
#     audio_float32 = data.astype('float32') / 32768.0

#     # STT 실행
#     result = model.transcribe(audio_float32, fp16=False)
#     model.l

#     print(result['text'])

In [None]:
# for audio_file_name in work_list :
#     save_path = "/home/jongho/Spring_project/CJU_STT/CJU_Web_Application_Services/src/main/webapp/resources/python/result"
#     target_audio_path = f"{save_path}/{audio_file_name}"
#     print(target_audio_path)
#     result = model.transcribe(target_audio_path, fp16=False)
#     print(result['text'])