In [1]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import pathlib
import random
import string
import re
import numpy as np

import tensorflow.data as tf_data
import tensorflow.strings as tf_strings

import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization

In [2]:
import keras.ops as ops


class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention plus a two-layer dense projection.

    Each of the two sub-blocks is followed by a residual addition and a
    layer normalization.

    Args:
        embed_dim: Used as ``key_dim`` of the attention layer and as the
            number of units of the final dense layer.
        dense_dim: Number of units of the hidden (ReLU) dense layer.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Kept as plain attributes so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads

        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        feed_forward_layers = [
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.dense_proj = keras.Sequential(feed_forward_layers)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply self-attention and the dense projection to ``inputs``.

        When ``mask`` is given, it is expanded with a new axis in the middle
        and cast to int32 before being passed as the attention mask.
        """
        padding_mask = None
        if mask is not None:
            padding_mask = ops.cast(mask[:, None, :], dtype="int32")

        attended = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        normed = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normed)
        return self.layernorm_2(normed + projected)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        }


class PositionalEmbedding(layers.Layer):
    """Adds a learned position embedding to a token embedding.

    Args:
        sequence_length: Size of the position-embedding table.
        vocab_size: Size of the token-embedding table.
        embed_dim: Output dimension of both embedding tables.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept as plain attributes so get_config() can report them.
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )

    def call(self, inputs):
        """Return token embeddings plus embeddings of positions 0..length-1.

        ``length`` is the size of the last axis of ``inputs``.
        """
        position_ids = ops.arange(0, ops.shape(inputs)[-1], 1)
        token_part = self.token_embeddings(inputs)
        position_part = self.position_embeddings(position_ids)
        return token_part + position_part

    def compute_mask(self, inputs, mask=None):
        """Return a mask that is True wherever ``inputs`` is not 0.

        The incoming ``mask`` argument is ignored.
        """
        return ops.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        }


class TransformerDecoder(layers.Layer):
    """Decoder block: causal self-attention, cross-attention, dense projection.

    Each of the three sub-blocks is followed by a residual addition and a
    layer normalization.

    Args:
        embed_dim: Used as ``key_dim`` of both attention layers and as the
            number of units of the final dense layer.
        latent_dim: Number of units of the hidden (ReLU) dense layer.
        num_heads: Number of attention heads in both attention layers.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Kept as plain attributes so get_config() can report them.
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads

        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        feed_forward_layers = [
            layers.Dense(latent_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.dense_proj = keras.Sequential(feed_forward_layers)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: A pair ``(decoder_sequence, encoder_outputs)``.
            mask: Either ``None`` or a pair of padding masks, one for each
                element of ``inputs``, in the same order.
        """
        target_seq, encoder_outputs = inputs
        causal_mask = self.get_causal_attention_mask(target_seq)

        inputs_padding_mask, encoder_outputs_padding_mask = (
            (None, None) if mask is None else mask
        )

        # Self-attention over the decoder sequence, restricted by the causal mask.
        self_attended = self.attention_1(
            query=target_seq,
            value=target_seq,
            key=target_seq,
            attention_mask=causal_mask,
            query_mask=inputs_padding_mask,
        )
        out_1 = self.layernorm_1(target_seq + self_attended)

        # Cross-attention: queries from the decoder, keys/values from the encoder.
        cross_attended = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            query_mask=inputs_padding_mask,
            key_mask=encoder_outputs_padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + cross_attended)

        return self.layernorm_3(out_2 + self.dense_proj(out_2))

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask tiled along the batch axis.

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise; the batch
        size and the sequence length are read from the first two dimensions
        of ``inputs``.
        """
        shape = ops.shape(inputs)
        n_batch, n_steps = shape[0], shape[1]
        rows = ops.arange(n_steps)[:, None]
        cols = ops.arange(n_steps)
        lower_tri = ops.cast(rows >= cols, dtype="int32")
        lower_tri = ops.reshape(lower_tri, (1, shape[1], shape[1]))
        # Repeat counts: (batch, 1, 1).
        repeats = ops.concatenate(
            [ops.expand_dims(n_batch, -1), ops.convert_to_tensor([1, 1])],
            axis=0,
        )
        return ops.tile(lower_tri, repeats)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "latent_dim": self.latent_dim,
            "num_heads": self.num_heads,
        }


In [3]:
from keras.saving import register_keras_serializable



# Settings shared by the text-vectorization layers defined later.
vocab_size = 50000
sequence_length = 30
batch_size = 64

# Characters removed during standardization: ASCII punctuation plus "¿",
# except the square brackets, which are kept so that the "[start]" and
# "[end]" markers survive standardization.
strip_chars = "".join(
    char for char in string.punctuation + "¿" if char not in "[]"
)



@keras.saving.register_keras_serializable()
def custom_standardization(input_string):
    """Lower-case ``input_string`` and delete every character in ``strip_chars``."""
    strip_pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf_strings.lower(input_string)
    return tf_strings.regex_replace(lowered, strip_pattern, "")


In [5]:
import pickle
from keras.layers import TextVectorization

# Directory holding the saved model and the pickled vocabularies.
# This is the single place to edit when the artifacts live elsewhere.
ASSET_DIR = pathlib.Path(r"D:\Ardunio_Ide\Nhung\Full")

# The custom layer classes must be supplied so Keras can rebuild them
# while deserializing the model.
loaded_transformer = keras.models.load_model(
    str(ASSET_DIR / "transformer_model.keras"),
    custom_objects={
        "PositionalEmbedding": PositionalEmbedding,
        "TransformerEncoder": TransformerEncoder,
        "TransformerDecoder": TransformerDecoder,
    },
)

# Read the vocabularies from file.
# NOTE(security): pickle.load can execute arbitrary code; only load vocabulary
# files that come from a trusted source.
with open(ASSET_DIR / "eng_vocab.pkl", "rb") as f:
    eng_vocab = pickle.load(f)

with open(ASSET_DIR / "vi_vocab.pkl", "rb") as f:
    vi_vocab = pickle.load(f)

# Vietnamese vectorizer: no custom standardization is passed here.
vi_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
vi_vectorization.set_vocabulary(vi_vocab)

# English vectorizer: one token longer than the Vietnamese one and
# standardized with custom_standardization.
eng_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
eng_vectorization.set_vocabulary(eng_vocab)







In [6]:
# Display the architecture summary of the model loaded above.
loaded_transformer.summary()

In [7]:
# Upper bound on the number of tokens generated by decode_sequence.
max_decoded_sentence_length = 30

# Map each integer token id to its English vocabulary entry.
eng_vocab = eng_vectorization.get_vocabulary()
eng_index_lookup = dict(enumerate(eng_vocab))


def decode_sequence(input_sentence):
    """Greedily translate ``input_sentence`` one token at a time.

    Starting from "[start]", the sentence decoded so far is re-vectorized on
    every step, the model is called, and the arg-max token at the current
    step is appended. Decoding stops after "[end]" is produced or after
    ``max_decoded_sentence_length`` steps.

    Returns:
        The decoded string, beginning with "[start]" and including "[end]"
        when it was generated.
    """
    source_tokens = vi_vectorization([input_sentence])
    words = ["[start]"]

    for step in range(max_decoded_sentence_length):
        partial_sentence = " ".join(words)
        # Drop the last position of the vectorized target before feeding it in.
        target_tokens = eng_vectorization([partial_sentence])[:, :-1]
        model_inputs = {
            "encoder_inputs": source_tokens,
            "decoder_inputs": target_tokens,
        }
        scores = loaded_transformer(model_inputs)

        # ops.argmax(...) is not a concrete value for jax here, so convert
        # it to numpy before extracting the Python integer.
        best = ops.argmax(scores[0, step, :])
        next_index = ops.convert_to_numpy(best).item(0)
        next_word = eng_index_lookup[next_index]
        words.append(next_word)

        if next_word == "[end]":
            break

    return " ".join(words)



In [8]:
# Translate one sample sentence, strip the sequence markers and restore a few
# contractions whose apostrophes were removed by standardization.
response = (
    decode_sequence("Tải âm thanh vào bộ nhớ ")
    .replace("[start] ", "")
    .replace(" [end]", "")
    .replace(" m ", "'m ")
    .replace(" s ", "'s ")
    .replace(" d ", "'d ")
)
response

'load the sound in your memory'

In [17]:
import speech_recognition as sr
from gtts import gTTS
import io

import pygame


def text2speech(text):
    """Synthesize ``text`` as English speech with gTTS and play it via pygame.

    The MP3 data is kept in memory (no temporary file) and the call blocks
    until playback has finished.

    Args:
        text: The sentence to speak. Empty or whitespace-only input is
            skipped, because gTTS refuses to synthesize empty text.

    Returns:
        None.
    """
    # Nothing to say: return instead of letting gTTS fail on empty text.
    if not text or not text.strip():
        return

    # Generate the audio with gTTS.
    tts = gTTS(text, lang='en')

    # Load the audio into memory (BytesIO).
    mp3_fp = io.BytesIO()
    tts.write_to_fp(mp3_fp)
    mp3_fp.seek(0)

    # Play the audio with pygame.
    pygame.mixer.init()
    pygame.mixer.music.load(mp3_fp, 'mp3')
    pygame.mixer.music.play()

    # Wait until playback has finished, polling about ten times per second.
    # A single Clock is reused rather than creating a new one per iteration.
    clock = pygame.time.Clock()
    while pygame.mixer.music.get_busy():
        clock.tick(10)

def clean_translation(decoded):
    """Turn raw ``decode_sequence`` output into a plain sentence.

    Removes the "[start]" / "[end]" markers and re-attaches the contractions
    " m ", " s ", " d " and " re " with an apostrophe.

    Args:
        decoded: The string returned by ``decode_sequence``.

    Returns:
        The cleaned sentence; it may be empty.
    """
    # Markers with their adjacent space go first so no stray space is left.
    for marker in ("[start] ", " [end]", "[start]", "[end]"):
        decoded = decoded.replace(marker, "")
    contractions = (
        (" m ", "'m "),
        (" s ", "'s "),
        (" d ", "'d "),
        (" re ", "'re "),
    )
    for bare, contracted in contractions:
        decoded = decoded.replace(bare, contracted)
    return decoded


if __name__ == '__main__':
    recognizer = sr.Recognizer()

    print('=====================start===========')
    print('======================recording===========================')
    # The audio is read from a WAV file. To capture from a microphone instead,
    # open sr.Microphone() as the source, call
    # recognizer.adjust_for_ambient_noise(source, duration=1) and then
    # recognizer.listen(source, timeout=None, phrase_time_limit=5).
    with sr.AudioFile("output.wav") as source:
        audio_data = recognizer.record(source)  # Read the whole WAV file.

    try:
        # Convert the audio to text (Vietnamese is supported).
        text = recognizer.recognize_google(audio_data, language='vi-VN')
        print("Bạn vừa nói:", text)

        response = clean_translation(decode_sequence(text))
        print(response)
        # gTTS rejects empty text, so only speak a non-empty translation.
        if response.strip():
            text2speech(response)
    except sr.UnknownValueError:
        print("Không nhận dạng được giọng nói.")
    except sr.RequestError as e:
        print(f"Lỗi kết nối: {e}")


Bạn vừa nói: Xin chào các bạn Xin chào các bạn Xin chào các bạn
hi guys come on guys
