In [None]:
import os
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.legacy.data import Field, BucketIterator
from torchtext.legacy.datasets import Multi30k
import spacy
import speech_recognition as sr
import cv2
import pytesseract
from transformers import MarianMTModel, MarianTokenizer
from sacrebleu import corpus_bleu
from tqdm import tqdm

In [None]:

# Seed the Python and PyTorch RNGs so that weight initialisation and the
# random teacher-forcing decisions made during training are repeatable after
# "Restart Kernel -> Run All".
SEED = 1234
random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load language models for tokenization
# (only their `.tokenizer` is used, by tokenize_german / tokenize_english).
spacy_german = spacy.load('de_core_news_sm')
spacy_english = spacy.load('en_core_web_sm')

def tokenize_german(text):
    """Split German `text` into a list of token strings.

    Runs only the tokenizer of the module-level `spacy_german` pipeline and
    returns the `.text` of each resulting token, in order.
    """
    pieces = []
    for piece in spacy_german.tokenizer(text):
        pieces.append(piece.text)
    return pieces

def tokenize_english(text):
    """Split English `text` into a list of token strings.

    Runs only the tokenizer of the module-level `spacy_english` pipeline and
    returns the `.text` of each resulting token, in order.
    """
    pieces = []
    for piece in spacy_english.tokenizer(text):
        pieces.append(piece.text)
    return pieces

# Set up fields for text data.
# SOURCE handles the German side and TARGET the English side; both lowercase
# the text and wrap every sequence in '<sos>' ... '<eos>' markers.
SOURCE = Field(tokenize=tokenize_german, init_token='<sos>', eos_token='<eos>', lower=True)
TARGET = Field(tokenize=tokenize_english, init_token='<sos>', eos_token='<eos>', lower=True)

# Load and split the dataset: Multi30k German (.de) -> English (.en) pairs,
# returned as train / validation / test splits.
train_data, validation_data, test_data = Multi30k.splits(exts=('.de', '.en'), fields=(SOURCE, TARGET))

# Build vocabularies from the training split only; min_freq=2 sets the
# minimum token frequency required for inclusion in the vocabulary.
SOURCE.build_vocab(train_data, min_freq=2)
TARGET.build_vocab(train_data, min_freq=2)

# Create data iterators: one BucketIterator per split, all using the same
# batch size and placing batches on `device`.
BATCH_SIZE = 32
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, validation_data, test_data),
    batch_size=BATCH_SIZE,
    device=device
)


In [None]:

class Encoder(nn.Module):
    """LSTM encoder that turns source token ids into a final recurrent state.

    Args:
        input_dim: number of rows in the embedding table (source vocabulary size).
        embed_dim: width of each token embedding.
        hidden_dim: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability, applied to the embeddings and also
            passed to the LSTM.
    """

    def __init__(self, input_dim, embed_dim, hidden_dim, num_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=embed_dim)
        self.rnn = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Encode `src` and return the LSTM's final `(hidden, cell)` pair.

        The LSTM is built without `batch_first`, so `src` is consumed with the
        sequence dimension first. The per-step LSTM outputs are discarded.
        """
        token_vectors = self.embedding(src)
        regularised = self.dropout(token_vectors)
        _step_outputs, final_state = self.rnn(regularised)
        final_hidden, final_cell = final_state
        return final_hidden, final_cell

class Decoder(nn.Module):
    """Single-step LSTM decoder that maps one target token to vocabulary scores.

    Args:
        output_dim: number of rows in the embedding table and width of the
            output projection (target vocabulary size).
        embed_dim: width of each token embedding.
        hidden_dim: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability, applied to the embeddings and also
            passed to the LSTM.
    """

    def __init__(self, output_dim, embed_dim, hidden_dim, num_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=embed_dim)
        self.rnn = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout,
        )
        self.fc_out = nn.Linear(in_features=hidden_dim, out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell):
        """Run one decoding step.

        `input` holds one token id per batch element; a leading length-1
        sequence dimension is added before the LSTM and removed again before
        the output projection.

        Returns:
            `(prediction, hidden, cell)`: the projected scores for this step
            and the updated LSTM state.
        """
        step_tokens = input.unsqueeze(0)
        step_embedding = self.dropout(self.embedding(step_tokens))
        rnn_out, next_state = self.rnn(step_embedding, (hidden, cell))
        next_hidden, next_cell = next_state
        scores = self.fc_out(rnn_out.squeeze(0))
        return scores, next_hidden, next_cell

class Seq2SeqTranslator(nn.Module):
    """Encoder-decoder wrapper that decodes one target position at a time.

    Args:
        encoder: module called as `encoder(src)` returning `(hidden, cell)`.
        decoder: module called as `decoder(input, hidden, cell)` returning
            `(prediction, hidden, cell)` and exposing `output_dim`.
        device: device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode `trg.shape[0] - 1` steps and return the stacked predictions.

        The returned tensor has shape
        `(trg.shape[0], trg.shape[1], decoder.output_dim)`; position 0 is
        never written and stays all zeros. At each step the next decoder
        input is the ground-truth token `trg[t]` with probability
        `teacher_forcing_ratio`, otherwise the decoder's own argmax.
        """
        trg_len, batch_size = trg.shape[0], trg.shape[1]
        vocab_size = self.decoder.output_dim

        outputs = torch.zeros(trg_len, batch_size, vocab_size).to(self.device)
        hidden, cell = self.encoder(src)

        # The first decoder input is the first row of the target (the start token row).
        step_input = trg[0, :]

        for t in range(1, trg_len):
            step_scores, hidden, cell = self.decoder(step_input, hidden, cell)
            outputs[t] = step_scores
            if random.random() < teacher_forcing_ratio:
                step_input = trg[t]
            else:
                step_input = step_scores.argmax(1)

        return outputs


In [None]:

# Set up model parameters.
# The vocabulary sizes are passed to Encoder/Decoder as the embedding table
# sizes (and, for the decoder, the output projection width).
INPUT_DIM = len(SOURCE.vocab)
OUTPUT_DIM = len(TARGET.vocab)
EMBED_DIM = 256
HIDDEN_DIM = 512
NUM_LAYERS = 2
DROPOUT = 0.5

# Initialize the model. Encoder and decoder share HIDDEN_DIM and NUM_LAYERS
# because the encoder's final (hidden, cell) state is fed straight into the
# decoder LSTM.
encoder = Encoder(INPUT_DIM, EMBED_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
decoder = Decoder(OUTPUT_DIM, EMBED_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
model = Seq2SeqTranslator(encoder, decoder, device).to(device)

# Set up optimizer and loss function.
# Adam runs with its default hyper-parameters; the loss skips positions whose
# target is the padding token.
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=TARGET.vocab.stoi[TARGET.pad_token])

# Evaluation metrics
def calculate_bleu(model, iterator, target_vocab, device):
    """Compute corpus-level BLEU for `model` over every batch of `iterator`.

    The model is switched to eval mode and run without teacher forcing. For
    each sentence the first time step is dropped, ids are mapped back to
    tokens through `target_vocab.itos`, and the sequence is cut at the first
    '<eos>' (kept whole when no '<eos>' is present).

    sacrebleu's `corpus_bleu` expects hypotheses as a list of plain strings
    and references as a list of reference streams, each a list of strings
    aligned with the hypotheses. Tokens are therefore joined with spaces and
    the single reference stream is wrapped in an outer list.

    Args:
        model: called as `model(src, trg, 0)`; must return per-step scores
            whose last dimension indexes the target vocabulary.
        iterator: iterable of batches exposing `.src` and `.trg`.
        target_vocab: vocabulary object with an `itos` index-to-token list.
        device: unused; kept so existing callers keep working.

    Returns:
        The value returned by `sacrebleu.corpus_bleu` (a BLEU score object).
    """
    model.eval()
    hypotheses = []
    references = []

    def _to_sentence(token_ids):
        """Map ids to tokens, cut at the first '<eos>', join with spaces."""
        tokens = [target_vocab.itos[idx] for idx in token_ids]
        if '<eos>' in tokens:
            tokens = tokens[:tokens.index('<eos>')]
        return ' '.join(tokens)

    with torch.no_grad():
        for batch in iterator:
            src = batch.src
            trg = batch.trg

            output = model(src, trg, 0)  # Turn off teacher forcing
            predicted_ids = output.argmax(dim=-1)

            # Skip time step 0 (the start token row) and transpose so that
            # each row is one sentence; .tolist() yields plain Python ints.
            trg_rows = trg[1:].t().tolist()
            pred_rows = predicted_ids[1:].t().tolist()

            for trg_ids, pred_ids in zip(trg_rows, pred_rows):
                references.append(_to_sentence(trg_ids))
                hypotheses.append(_to_sentence(pred_ids))

    # One reference per hypothesis -> a single reference stream.
    return corpus_bleu(hypotheses, [references])

def evaluate(model, iterator, criterion):
    """Return the mean per-batch loss of `model` over `iterator`.

    The model is put in eval mode and run under `torch.no_grad()` with
    teacher forcing disabled. The first time step of both the model output
    and the target is skipped before the loss is computed.

    Args:
        model: called as `model(src, trg, 0)`.
        iterator: sized iterable of batches exposing `.src` and `.trg`.
        criterion: loss called as `criterion(flat_scores, flat_targets)`.

    Returns:
        Sum of the per-batch losses divided by `len(iterator)`.
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for batch in iterator:
            src, trg = batch.src, batch.trg

            scores = model(src, trg, 0)  # Turn off teacher forcing

            # Flatten (time, batch, vocab) -> (time * batch, vocab) after
            # dropping step 0, and flatten the targets to match.
            vocab_size = scores.shape[-1]
            flat_scores = scores[1:].view(-1, vocab_size)
            flat_targets = trg[1:].view(-1)

            batch_losses.append(criterion(flat_scores, flat_targets).item())

    return sum(batch_losses) / len(iterator)

# Training function (updated with progress bar)
def train_model(model, iterator, optimizer, criterion, clip):
    """Train `model` for one pass over `iterator` and return the mean batch loss.

    For every batch: zero the gradients, run the model with its default
    teacher-forcing ratio, compute the loss on everything after the first
    time step, back-propagate, clip the gradient norm to `clip`, and step the
    optimizer. Progress and the current batch loss are shown via tqdm.

    Args:
        model: called as `model(src, trg)`.
        iterator: sized iterable of batches exposing `.src` and `.trg`.
        optimizer: optimizer over `model`'s parameters.
        criterion: loss called as `criterion(flat_scores, flat_targets)`.
        clip: maximum gradient norm passed to `clip_grad_norm_`.

    Returns:
        Sum of the per-batch losses divided by `len(iterator)`.
    """
    model.train()
    running_loss = 0
    progress_bar = tqdm(iterator, desc="Training", leave=False)

    for batch in progress_bar:
        src, trg = batch.src, batch.trg

        optimizer.zero_grad()
        scores = model(src, trg)

        # Drop step 0 and flatten to (time * batch, vocab) / (time * batch,).
        vocab_size = scores.shape[-1]
        flat_scores = scores[1:].view(-1, vocab_size)
        flat_targets = trg[1:].view(-1)

        loss = criterion(flat_scores, flat_targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        step_loss = loss.item()
        running_loss += step_loss
        progress_bar.set_postfix({"loss": f"{step_loss:.4f}"})

    return running_loss / len(iterator)

# Prediction function for the custom model
def predict_translation(model, sentence, source_vocab, target_vocab, device, max_length=50):
    """Greedily translate one German `sentence` with the custom seq2seq model.

    The sentence is tokenized with `tokenize_german`, lowercased, wrapped in
    '<sos>'/'<eos>', and encoded. The decoder is then run one token at a
    time, always feeding back its own argmax, until it emits '<eos>' or
    `max_length` tokens have been produced.

    Args:
        model: object exposing `eval()`, `encoder(src)` returning
            `(hidden, cell)`, and `decoder(input, hidden, cell)` returning
            `(prediction, hidden, cell)`.
        sentence: raw source text.
        source_vocab: vocabulary with a `stoi` token-to-index mapping.
        target_vocab: vocabulary with `stoi` and an `itos` index-to-token list.
        device: device the input tensors are moved to.
        max_length: maximum number of tokens to generate.

    Returns:
        The generated target tokens, without '<sos>' and without '<eos>'.
    """
    model.eval()

    tokens = ['<sos>'] + [token.lower() for token in tokenize_german(sentence)] + ['<eos>']
    src_indexes = [source_vocab.stoi[token] for token in tokens]
    # Shape (src_len, 1): a batch containing this single sentence.
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    eos_index = target_vocab.stoi['<eos>']
    trg_indexes = [target_vocab.stoi['<sos>']]

    with torch.no_grad():
        # The encoder returns (hidden, cell); both must be threaded through
        # every decoder step in that order and updated each time.
        hidden, cell = model.encoder(src_tensor)

        for _ in range(max_length):
            trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)

            pred_token = output.argmax(1).item()
            if pred_token == eos_index:
                # Stop without storing '<eos>', so nothing has to be trimmed
                # from the end and no real token is lost at max_length.
                break
            trg_indexes.append(pred_token)

    # Skip the leading '<sos>'.
    return [target_vocab.itos[i] for i in trg_indexes[1:]]


In [None]:
# Audio translation setup
# Module-level speech_recognition Recognizer shared by translate_audio, which
# uses it both to read audio files and to call the recognition service.
speech_recognizer = sr.Recognizer()

def translate_audio(audio_file, src_lang='en', tgt_lang='de'):
    """Transcribe `audio_file` and translate the transcript.

    The whole file is read with the shared `speech_recognizer`, transcribed
    via `recognize_google` using `src_lang` as the recognition language, and
    the resulting text is handed to `translate_text`.

    Returns:
        The translated text, or a fixed explanatory message string when the
        speech could not be understood (`sr.UnknownValueError`) or the
        recognition service could not be reached (`sr.RequestError`).
    """
    with sr.AudioFile(audio_file) as clip:
        recording = speech_recognizer.record(clip)

    try:
        transcript = speech_recognizer.recognize_google(recording, language=src_lang)
        translation = translate_text(transcript, src_lang, tgt_lang)
    except sr.UnknownValueError:
        return "Speech recognition could not understand the audio"
    except sr.RequestError:
        return "Could not request results from speech recognition service"

    return translation



In [None]:
# Video translation setup
def translate_video(video_file, src_lang='en', tgt_lang='de', frame_step=1):
    """Extract and translate text from video frames.

    Frames are read sequentially with OpenCV. Every `frame_step`-th frame is
    passed to Tesseract OCR; when the recognised text is not blank it is
    translated with `translate_text` and recorded together with the capture
    position (in seconds) reported right after that frame was read.

    Args:
        video_file: path (or other source accepted by `cv2.VideoCapture`).
        src_lang: source language forwarded to `translate_text`.
        tgt_lang: target language forwarded to `translate_text`.
        frame_step: OCR only every `frame_step`-th frame, starting with the
            first. The default of 1 processes every frame.

    Returns:
        A list of `(timestamp_seconds, translated_text)` tuples in frame
        order; empty when the capture could not be opened or no text was found.

    Raises:
        ValueError: if `frame_step` is smaller than 1.
    """
    if frame_step < 1:
        raise ValueError("frame_step must be >= 1")

    video = cv2.VideoCapture(video_file)
    translations = []
    frame_index = 0

    try:
        while video.isOpened():
            ret, frame = video.read()
            if not ret:
                break

            selected = frame_index % frame_step == 0
            frame_index += 1
            if not selected:
                continue

            text = pytesseract.image_to_string(frame)
            if text.strip():
                translated = translate_text(text, src_lang, tgt_lang)
                timestamp = video.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
                translations.append((timestamp, translated))
    finally:
        # Release the capture even when OCR or translation raises.
        video.release()

    return translations



In [None]:
# Text translation using pre-trained model
# The default English -> German MarianMT model is loaded eagerly.
marian_model = MarianMTModel.from_pretrained("Helsinki-NLP/opus-mt-en-de")
marian_tokenizer = MarianTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-de")

# Loaded (model, tokenizer) pairs keyed by (src_lang, tgt_lang), seeded with
# the eagerly loaded en -> de pair so the default path never reloads anything.
_marian_cache = {('en', 'de'): (marian_model, marian_tokenizer)}


def _get_marian(src_lang, tgt_lang):
    """Return the cached (model, tokenizer) for a language pair, loading it on first use."""
    # Callers such as translate_audio may pass a regional code like 'en-US'.
    # Model ids have the form opus-mt-{src}-{tgt}, so a language part can
    # never contain '-', and dropping the region suffix loses nothing.
    key = (src_lang.split('-')[0], tgt_lang.split('-')[0])
    if key not in _marian_cache:
        model_name = f"Helsinki-NLP/opus-mt-{key[0]}-{key[1]}"
        _marian_cache[key] = (
            MarianMTModel.from_pretrained(model_name),
            MarianTokenizer.from_pretrained(model_name),
        )
    return _marian_cache[key]


def translate_text(text, src_lang='en', tgt_lang='de'):
    """Translate `text` from `src_lang` to `tgt_lang` with a MarianMT model.

    The model for the requested pair is looked up in (or loaded into) a
    module-level cache, so `src_lang`/`tgt_lang` select the model instead of
    being ignored. With the defaults this uses the pre-loaded en -> de model.

    Args:
        text: text to translate.
        src_lang: source language code, e.g. 'en' (a '-REGION' suffix is ignored).
        tgt_lang: target language code, e.g. 'de' (a '-REGION' suffix is ignored).

    Returns:
        The decoded translation of the first generated sequence, with special
        tokens removed.

    Raises:
        Whatever `from_pretrained` raises when no model exists for the
        requested language pair.
    """
    model, tokenizer = _get_marian(src_lang, tgt_lang)
    # truncation=True keeps over-long inputs (e.g. a full OCR'd frame) within
    # the model's maximum input length.
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    translated = model.generate(**inputs)
    return tokenizer.decode(translated[0], skip_special_tokens=True)



In [None]:
# Main execution
if __name__ == "__main__":
    # Train the custom model (uncomment to train)
    # N_EPOCHS = 10
    # CLIP = 1
    # for epoch in range(N_EPOCHS):
    #     train_loss = train_model(model, train_iterator, optimizer, criterion, CLIP)
    #     print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f}')

    # Placeholder inputs: point these at real files before running the demo.
    audio_path = "path_to_audio_file.wav"
    video_path = "path_to_video_file.mp4"

    # Example usage
    print("Text Translation:")
    print(translate_text("Hello, how are you?", 'en', 'de'))

    # The audio/video demos are skipped when their input file is missing, so
    # that the cell still runs to completion with the placeholder paths.
    print("\nAudio Translation:")
    if os.path.isfile(audio_path):
        print(translate_audio(audio_path, 'en', 'de'))
    else:
        print(f"Skipped: audio file not found: {audio_path}")

    print("\nVideo Translation:")
    if os.path.isfile(video_path):
        video_translations = translate_video(video_path, 'en', 'de')
    else:
        video_translations = []
        print(f"Skipped: video file not found: {video_path}")
    for timestamp, translation in video_translations:
        print(f"At {timestamp:.2f}s: {translation}")