In [9]:
# Install the `faker` package through a notebook shell escape; it is imported
# right after this line to synthesize the training corpus.
!pip install faker

from faker import Faker
import random

# Initialize faker with the US-English locale
fake = Faker('en_US')
# Seed both random sources used by the sentence generator (faker itself and
# the stdlib `random` module) with a fixed value so the corpus is repeatable.
Faker.seed(42)
random.seed(42)

def generate_logical_sentence():
    """Build one synthetic sentence of the form "<person name> <activity>".

    Every faker value is drawn on every call (even those the chosen template
    does not use), and always in the same order, so the seeded random streams
    advance identically from call to call.
    """
    person = fake.name()
    job = fake.job()
    company = fake.company()
    city = fake.city()
    talk_topic = fake.catch_phrase().lower()
    report_title = fake.bs().capitalize()

    # Candidate sentence endings; exactly one is picked uniformly at random.
    endings = (
        f"started working as a {job} at {company}.",
        f"moved to {city} for a new opportunity.",
        f"is currently employed at {company} as a {job}.",
        f"gave a presentation on {talk_topic}.",
        f"attended a conference in {city}.",
        f"recently published a report titled '{report_title}'.",
    )
    return person + " " + random.choice(endings)

# Build the corpus: 5,000 independently generated sentences
sentences = [generate_logical_sentence() for _ in range(5000)]

# Persist the corpus as UTF-8 text, one sentence per line
filename = "english_sentences_5k.txt"
with open(filename, "w", encoding="utf-8") as f:
    f.writelines(sentence + "\n" for sentence in sentences)

# Offer the file as a browser download (Colab-only helper module)
from google.colab import files
files.download(filename)




<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import random
from pathlib import Path

# Dataset Preparation
# Read the corpus file, keeping only non-blank lines with surrounding
# whitespace removed.
file_path = Path("english_sentences_5k.txt")
with file_path.open(encoding="utf-8") as f:
    sample_lines = [text for text in (raw.strip() for raw in f) if text]

# Lower-case each sentence and split it on whitespace.
lines = [sentence.lower().split() for sentence in sample_lines]

# Vocabulary: ids 0 and 1 are reserved for <pad> and <start>; corpus words
# are numbered from 2 upwards in sorted order.
vocab = {token for tokens in lines for token in tokens}
word2idx = {token: index for index, token in enumerate(sorted(vocab), start=2)}
word2idx["<pad>"] = 0
word2idx["<start>"] = 1
idx2word = {index: token for token, index in word2idx.items()}

vocab_size = len(word2idx)
# Longest sentence plus one slot for the <start> id that encode() prepends
max_len = 1 + max(len(tokens) for tokens in lines)

def encode(line):
    """Translate a token list into ids, prefixed with the <start> id."""
    ids = [word2idx['<start>']]
    ids.extend(word2idx[w] for w in line)
    return ids

# Right-pad every encoded sentence with the <pad> id (0) up to max_len.
encoded = [ids + [0] * (max_len - len(ids)) for ids in map(encode, lines)]

class TextDataset(Dataset):
    """Dataset wrapper around a table of equal-length token-id rows."""

    def __init__(self, data):
        # Convert the nested id lists to one int64 tensor up front
        self.data = torch.tensor(data, dtype=torch.long)

    def __len__(self):
        """Number of rows held by the dataset."""
        return len(self.data)

    def __getitem__(self, i):
        """Return row ``i`` as a 1-D tensor of token ids."""
        return self.data[i]

# Batches of 4 encoded (already padded) sentences, drawn in shuffled order
train_loader = DataLoader(TextDataset(encoded), batch_size=4, shuffle=True)

# Generator
class Generator(nn.Module):
    """LSTM language model used as the sequence generator.

    Token ids are embedded, fed through an LSTM and projected to per-step
    vocabulary logits.

    Args:
        vocab_size: Number of distinct token ids (width of the output layer).
        emb_dim: Embedding width.
        hidden_dim: LSTM hidden-state width.
        max_len: Length of the sequences returned by ``sample`` (the start
            token counts towards this length).
    """

    def __init__(self, vocab_size, emb_dim=32, hidden_dim=64, max_len=10):
        super().__init__()
        # id 0 is registered as the padding index of the embedding table
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, vocab_size)
        self.vocab_size = vocab_size
        self.max_len = max_len

    def forward(self, x, hidden=None):
        """Run the model over a batch of token ids.

        Args:
            x: Long tensor of token ids, shape (batch, seq).
            hidden: Optional LSTM state to continue from; ``None`` starts
                from the LSTM's default initial state.

        Returns:
            Tuple ``(logits, hidden)`` where ``logits`` has shape
            (batch, seq, vocab_size) and ``hidden`` is the final LSTM state.
        """
        emb = self.embedding(x)
        out, hidden = self.lstm(emb, hidden)
        logits = self.fc_out(out)
        return logits, hidden

    def sample(self, batch_size, device='cpu', start_token=1):
        """Draw ``batch_size`` sequences of length ``max_len`` by ancestral sampling.

        Each sequence begins with ``start_token``; every following token is
        drawn from the softmax of the model's logits for the previous token.

        Args:
            batch_size: Number of sequences to draw.
            device: Device on which the token tensors are created; it must
                match the device holding the model parameters.
            start_token: Id placed in the first position of every sequence.

        Returns:
            Long tensor of shape (batch_size, max_len).
        """
        # Create the start column directly on the target device
        x = torch.full((batch_size, 1), start_token, dtype=torch.long, device=device)
        samples = [x]
        hidden = None
        # The drawn ids are integers and cannot carry gradients, so tracking
        # the LSTM unroll with autograd would only hold on to memory.
        with torch.no_grad():
            for _ in range(self.max_len - 1):
                # Feed only the newest token; `hidden` carries the history
                logits, hidden = self.forward(x, hidden)
                prob = F.softmax(logits[:, -1, :], dim=-1)
                next_token = torch.multinomial(prob, num_samples=1)
                samples.append(next_token)
                x = next_token
        return torch.cat(samples, dim=1)

# Discriminator
class Discriminator(nn.Module):
    """Convolutional sequence classifier producing one sigmoid score per input.

    The embedded sequence is passed through one convolution per entry of
    ``filter_sizes``; each feature map is max-pooled over the sequence axis,
    the pooled vectors are concatenated, regularised with dropout and mapped
    to a single value in (0, 1). The training code in this file uses label 1
    for real sequences and 0 for generated ones.
    """

    def __init__(self, vocab_size, emb_dim=32, num_filters=64, filter_sizes=[2, 3, 4], dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        # One Conv2d per window width; each kernel spans the full embedding
        conv_layers = []
        for width in filter_sizes:
            conv_layers.append(nn.Conv2d(1, num_filters, (width, emb_dim)))
        self.convs = nn.ModuleList(conv_layers)
        self.fc = nn.Linear(num_filters * len(filter_sizes), 1)
        self.dropout = nn.Dropout(dropout)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Score a batch of token-id sequences; returns shape (batch, 1)."""
        # Insert a singleton channel axis for the 2-D convolutions
        embedded = self.embedding(x).unsqueeze(1)
        pooled = []
        for conv in self.convs:
            feature_map = F.relu(conv(embedded)).squeeze(3)
            # Max over every position the window visited
            pooled.append(F.max_pool1d(feature_map, feature_map.size(2)).squeeze(2))
        features = self.dropout(torch.cat(pooled, dim=1))
        return self.sigmoid(self.fc(features))

# Pretraining
def pretrain_generator(generator, dataloader, criterion, optimizer, num_epochs=10, device='cpu'):
    """Pretrain the generator with next-token prediction on real sequences.

    For every batch the model receives all tokens but the last and is
    trained to predict the same sequence shifted left by one position.
    The mean batch loss is printed after each epoch.
    """
    generator.train()
    for epoch_number in range(1, num_epochs + 1):
        running_loss = 0
        for batch in dataloader:
            tokens = batch.to(device)
            # Inputs are positions 0..T-2, targets are positions 1..T-1
            context, next_tokens = tokens[:, :-1], tokens[:, 1:]
            logits, _ = generator(context)
            flat_logits = logits.view(-1, generator.vocab_size)
            loss = criterion(flat_logits, next_tokens.reshape(-1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        mean_loss = running_loss / len(dataloader)
        print(f"Generator Pretrain Loss [{epoch_number}]: {mean_loss:.4f}")

def pretrain_discriminator(discriminator, generator, dataloader, criterion, optimizer, num_epochs=10, device='cpu'):
    """Pretrain the discriminator to separate real batches from generator samples.

    For every real batch an equally sized batch is sampled from the
    generator; both are concatenated and the discriminator is trained with
    targets 1 (real) and 0 (generated). The mean batch loss is printed after
    each epoch.

    Args:
        discriminator: Module mapping a (batch, seq) id tensor to (batch, 1) scores.
        generator: Object exposing ``sample(batch_size, device=...)``.
        dataloader: Iterable of real id batches; must support ``len()``.
        criterion: Loss taking ``(pred, labels)``.
        optimizer: Optimizer over the discriminator's parameters.
        num_epochs: Number of passes over ``dataloader``.
        device: Device the batches and labels are placed on.
    """
    discriminator.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for real_batch in dataloader:
            real_batch = real_batch.to(device)
            # The generator is not updated here, so sample without autograd
            # to avoid building a graph that is never back-propagated.
            with torch.no_grad():
                fake_batch = generator.sample(real_batch.size(0), device=device)
            all_data = torch.cat([real_batch, fake_batch], dim=0)
            # Row order matches all_data: real rows first, then generated rows
            labels = torch.cat([
                torch.ones(real_batch.size(0), 1, device=device),
                torch.zeros(fake_batch.size(0), 1, device=device)
            ])
            pred = discriminator(all_data)
            loss = criterion(pred, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Discriminator Pretrain Loss [{epoch+1}]: {total_loss / len(dataloader):.4f}")

# Monte Carlo Rollout
class Rollout:
    """Monte Carlo rollout policy used to estimate per-token rewards.

    Holds a lagged copy of the generator (``aux_generator``) that completes
    partial sequences; the discriminator's score on the completions serves
    as the reward for the prefix.

    Args:
        generator: The generator being trained; its architecture and weights
            are copied into the rollout network.
        update_rate: Weight kept on the rollout network's own parameters in
            ``update_params`` (the rest comes from ``generator``).
    """

    def __init__(self, generator, update_rate=0.8):
        self.generator = generator
        self.device = next(generator.parameters()).device
        self.aux_generator = Generator(
            generator.vocab_size,
            generator.embedding.embedding_dim,
            generator.lstm.hidden_size,
            generator.max_len
        ).to(self.device)
        self.aux_generator.load_state_dict(generator.state_dict())
        self.update_rate = update_rate

    def get_reward(self, partial_seqs, rollout_num, discriminator, device='cpu'):
        """Estimate a reward for every generated token of every sequence.

        Column ``j`` of the result is the reward for token ``j + 1`` of the
        input (position 0 is the start token and receives no reward). It is
        the discriminator score averaged over ``rollout_num`` completions of
        the prefix that ends with that token. Once a prefix already has the
        rollout network's full length there is nothing left to sample, so it
        is scored directly.

        Args:
            partial_seqs: Long tensor (batch, seq_len) of sampled sequences.
            rollout_num: Number of completions averaged per prefix.
            discriminator: Callable mapping (N, len) ids to N scores.
            device: Unused; kept for interface compatibility. Tensors are
                moved to the device of the wrapped generator instead.

        Returns:
            Detached float tensor of shape (batch, seq_len - 1).
        """
        rewards = []
        batch_size, seq_len = partial_seqs.size()
        partial_seqs = partial_seqs.to(self.device)
        full_len = self.aux_generator.max_len
        # Rewards are constants in the policy-gradient loss: no graph needed
        with torch.no_grad():
            # t is the prefix length INCLUDING the token being rewarded
            for t in range(2, seq_len + 1):
                prefix = partial_seqs[:, :t]
                if t >= full_len:
                    # Sequence already complete: score it as it is
                    avg_scores = discriminator(prefix).view(batch_size)
                else:
                    rollouts = torch.cat(
                        [self.rollout_step(prefix, t) for _ in range(rollout_num)],
                        dim=0,
                    )
                    scores = discriminator(rollouts).view(rollout_num, batch_size)
                    avg_scores = torch.mean(scores, dim=0)
                rewards.append(avg_scores)
        return torch.stack(rewards, dim=1).detach()

    def rollout_step(self, partial_seq, t):
        """Complete ``partial_seq`` (length ``t``) up to the rollout network's max_len.

        Returns a long tensor whose leading columns are ``partial_seq`` and
        whose remaining columns are sampled from the rollout network.
        """
        self.aux_generator.eval()
        samples = partial_seq.to(self.device)
        with torch.no_grad():
            hidden = None
            # The first pass consumes the whole prefix; afterwards only the
            # newest token is fed, because `hidden` already summarises
            # everything before it. Re-feeding the full sequence together
            # with the carried state would process the prefix repeatedly.
            step_input = samples
            for _ in range(self.aux_generator.max_len - t):
                logits, hidden = self.aux_generator(step_input, hidden)
                prob = F.softmax(logits[:, -1, :], dim=-1)
                next_token = torch.multinomial(prob, num_samples=1)
                samples = torch.cat([samples, next_token], dim=1)
                step_input = next_token
        return samples

    def update_params(self):
        """Move the rollout network towards the current generator.

        Each parameter becomes
        ``update_rate * rollout + (1 - update_rate) * generator``.
        """
        for target_param, source_param in zip(self.aux_generator.parameters(), self.generator.parameters()):
            target_param.data = self.update_rate * target_param.data + (1 - self.update_rate) * source_param.data

# Policy Gradient Loss
def generator_pg_loss(generator, samples, rewards, device='cpu'):
    """Policy-gradient loss: negative mean of reward-weighted token log-probs.

    The generator is run on ``samples`` without the last column and the
    log-probability it assigns to each actually sampled next token is
    weighted by the matching entry of ``rewards`` (shape (batch, seq - 1)).
    """
    context = samples[:, :-1].to(device)
    # Index tensor for gather: the token chosen at each step
    chosen = samples[:, 1:].unsqueeze(2).to(device)
    logits, _ = generator(context)
    all_log_probs = F.log_softmax(logits, dim=-1)
    token_log_probs = torch.gather(all_log_probs, 2, chosen).squeeze(2)
    weighted = token_log_probs * rewards.to(device)
    return -torch.mean(weighted)

def update_generator_with_pg(generator, samples, rewards, optimizer, device='cpu'):
    """Apply one policy-gradient optimizer step and return the loss as a float."""
    generator.train()
    # Clear stale gradients before building this step's graph
    optimizer.zero_grad()
    pg_loss = generator_pg_loss(generator, samples, rewards, device)
    pg_loss.backward()
    optimizer.step()
    return pg_loss.item()

# Adversarial Training
def adversarial_train(generator, discriminator, rollout, dataloader, gen_optimizer, disc_optimizer, g_steps=1, d_steps=3, rollout_num=16, num_epochs=30, device='cpu', batch_size=4):
    """Alternate policy-gradient generator updates with discriminator updates.

    Each epoch performs ``g_steps`` generator updates (sample, estimate
    rewards via ``rollout``, policy-gradient step), refreshes the rollout
    network, then runs ``d_steps`` full passes over ``dataloader`` training
    the discriminator on real batches versus fresh generator samples.

    Args:
        generator: Generator exposing ``sample(batch_size, device=...)``.
        discriminator: Module returning scores in (0, 1), shape (N, 1).
        rollout: Object exposing ``get_reward(...)`` and ``update_params()``.
        dataloader: Iterable of real id batches.
        gen_optimizer: Optimizer over the generator's parameters.
        disc_optimizer: Optimizer over the discriminator's parameters.
        g_steps: Generator updates per epoch.
        d_steps: Passes over ``dataloader`` per epoch for the discriminator.
        rollout_num: Completions averaged per prefix when estimating rewards.
        num_epochs: Number of adversarial epochs.
        device: Device the batches and labels are placed on.
        batch_size: Number of sequences sampled per generator update
            (previously hard-coded to 4, which remains the default).
    """
    for epoch in range(num_epochs):
        print(f"Adversarial Epoch {epoch+1}")
        for _ in range(g_steps):
            # Sampled ids only serve as targets; gradients come from the
            # separate forward pass inside the policy-gradient update.
            with torch.no_grad():
                samples = generator.sample(batch_size=batch_size, device=device)
            rewards = rollout.get_reward(samples, rollout_num, discriminator, device=device)
            loss = update_generator_with_pg(generator, samples, rewards, gen_optimizer, device)
            print(f"  Generator RL Loss: {loss:.4f}")
        rollout.update_params()
        # Stays None when no discriminator update ran (d_steps == 0 or an
        # empty dataloader), so the report below cannot hit an unbound name.
        d_loss = None
        for _ in range(d_steps):
            for real_batch in dataloader:
                real_batch = real_batch.to(device)
                # Generator is frozen during discriminator updates
                with torch.no_grad():
                    fake_batch = generator.sample(real_batch.size(0), device=device)
                all_data = torch.cat([real_batch, fake_batch], dim=0)
                labels = torch.cat([
                    torch.ones(real_batch.size(0), 1, device=device),
                    torch.zeros(fake_batch.size(0), 1, device=device)
                ])
                pred = discriminator(all_data)
                d_loss = F.binary_cross_entropy(pred, labels)
                disc_optimizer.zero_grad()
                d_loss.backward()
                disc_optimizer.step()
        if d_loss is not None:
            # Loss of the last discriminator batch of this epoch
            print(f"  Discriminator Loss: {d_loss.item():.4f}")

# Run
# Prefer the GPU when one is visible, otherwise fall back to the CPU
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'

gen = Generator(vocab_size, max_len=max_len).to(device)
disc = Discriminator(vocab_size).to(device)
gen_opt = torch.optim.Adam(gen.parameters(), lr=1e-3)
disc_opt = torch.optim.Adam(disc.parameters(), lr=1e-4)

# Stage 1: next-token pretraining of the generator
print("Pretraining Generator...")
pretrain_generator(
    gen, train_loader, nn.CrossEntropyLoss(), gen_opt,
    num_epochs=10, device=device,
)

# Stage 2: discriminator pretraining on real vs. sampled sequences
print("Pretraining Discriminator...")
pretrain_discriminator(
    disc, gen, train_loader, nn.BCELoss(), disc_opt,
    num_epochs=10, device=device,
)

# Stage 3: adversarial training with Monte Carlo rollout rewards
print("Starting Adversarial Training...")
rollout = Rollout(gen)
adversarial_train(
    gen, disc, rollout, train_loader, gen_opt, disc_opt,
    num_epochs=20, device=device,
)

# Generate samples and print each one up to (not including) its first <pad>
gen.eval()
samples = gen.sample(4, device=device)
for row in samples:
    words = []
    for token_id in row.tolist():
        if token_id == word2idx['<pad>']:
            break
        words.append(idx2word[token_id])
    print("output:", ' '.join(words))

Pretraining Generator...
Generator Pretrain Loss [1]: 2.9496
Generator Pretrain Loss [2]: 2.0163
Generator Pretrain Loss [3]: 1.8391


In [None]:
# Save only the generator's weights (its state_dict) for later use; the
# architecture has to be rebuilt in code before these weights can be loaded.
torch.save(gen.state_dict(), "seqgan_generator.pth")


In [None]:
# To load the model later: rebuild a Generator with the same vocab_size and
# max_len, then restore the saved weights onto the current device.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# source you trust.
gen_loaded = Generator(vocab_size, max_len=max_len).to(device)
gen_loaded.load_state_dict(torch.load("seqgan_generator.pth", map_location=device))
# Switch to inference mode (as the last expression this also echoes the module)
gen_loaded.eval()


In [None]:
def generate_from_prompt(generator, prompt, word2idx, idx2word, max_len=20, device='cpu'):
    """Continue a text prompt by sampling from the generator.

    The prompt is lower-cased, split on whitespace and mapped to ids; words
    missing from ``word2idx`` are mapped to the ``<pad>`` id and therefore do
    not appear in the returned text. Sampling stops when the sequence
    (start token and prompt included) reaches ``max_len`` tokens or as soon
    as ``<pad>`` is drawn.

    Args:
        generator: Callable ``generator(ids, hidden) -> (logits, hidden)``.
        prompt: Text to condition on.
        word2idx: Word-to-id mapping containing ``<pad>`` and ``<start>``.
        idx2word: Inverse mapping of ``word2idx``.
        max_len: Maximum total number of tokens.
        device: Device on which the id tensors are placed.

    Returns:
        The prompt plus its continuation as a space-joined string, with ids
        0 and 1 (``<pad>`` / ``<start>``) left out.
    """
    generator.eval()
    pad_id = word2idx['<pad>']
    words = prompt.lower().strip().split()
    input_ids = [word2idx.get(w, pad_id) for w in words]
    input_tensor = torch.tensor([[word2idx['<start>']] + input_ids], dtype=torch.long).to(device)

    hidden = None
    output_seq = input_tensor
    # The first pass consumes the whole prompt; afterwards only the newest
    # token is fed because `hidden` already encodes everything before it.
    # Passing the full sequence again together with the carried state would
    # run the earlier tokens through the LSTM a second time.
    step_input = input_tensor
    with torch.no_grad():  # inference only
        for _ in range(max_len - input_tensor.size(1)):
            logits, hidden = generator(step_input, hidden)
            probs = F.softmax(logits[:, -1, :], dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            output_seq = torch.cat([output_seq, next_token], dim=1)
            if next_token.item() == pad_id:
                break
            step_input = next_token

    # Ids 0 and 1 are the special tokens; keep real words only
    output_words = [idx2word[idx.item()] for idx in output_seq[0] if idx.item() > 1]
    return ' '.join(output_words)


In [None]:
# Demo: continue a two-word prompt with the in-memory generator `gen`
# (not the reloaded `gen_loaded` copy), capped at 20 tokens in total.
prompt = "Allison Hill"
generated_text = generate_from_prompt(gen, prompt, word2idx, idx2word, max_len=20, device=device)
print("Generated:", generated_text)


In [None]:
import matplotlib.pyplot as plt
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction

# Prepare reference and hypothesis sentences
smoothie = SmoothingFunction().method4
gen.eval()
generated = gen.sample(4, device=device)

# Decode each sample into a token list: stop at the first <pad> and drop the
# leading <start> marker, which never occurs in the reference sentences and
# would otherwise count as an unmatched token.
hypotheses = []
for row in generated:
    sentence = []
    for idx in row:
        word = idx2word[idx.item()]
        if word == '<pad>':
            break
        if word == '<start>':
            continue
        sentence.append(word)
    hypotheses.append(sentence)

# The samples are unconditional, so no single corpus line is "the" target of
# a given sample. Each hypothesis is therefore scored against the whole
# corpus as its reference set (one list of reference token lists per
# hypothesis, as corpus_bleu expects).
references = [lines for _ in hypotheses]

# Compute final BLEU score using corpus_bleu
final_bleu = corpus_bleu(references, hypotheses, smoothing_function=smoothie)
print(f"Final BLEU Score of Generator: {final_bleu:.4f}")


In [None]:
import torch
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score

# Draw one real batch and an equally sized batch of generator samples
real_batch = next(iter(train_loader)).to(device)
n_real = real_batch.size(0)
fake_batch = gen.sample(n_real, device=device)
n_fake = fake_batch.size(0)

# Ground truth: 1 for real, 0 for fake (real rows first, matching all_data)
y_true = torch.cat([torch.ones(n_real), torch.zeros(n_fake)]).to(device)

# Discriminator scores rounded to hard 0/1 predictions
disc.eval()
with torch.no_grad():
    all_data = torch.cat([real_batch, fake_batch], dim=0)
    y_pred = torch.round(disc(all_data).squeeze())

# sklearn works on CPU numpy arrays
y_true_np, y_pred_np = y_true.cpu().numpy(), y_pred.cpu().numpy()

# Confusion matrix: rows are true labels, columns are predictions
cm = confusion_matrix(y_true_np, y_pred_np)
(tn, fp), (fn, tp) = cm

# Text rendering of the matrix
print("Confusion Matrix:")
print("               Predicted Fake   Predicted Real")
print(f"Actual Fake       {tn}               {fp}")
print(f"Actual Real       {fn}               {tp}")

# Summary metrics (the positive class is "real")
f1        = f1_score(y_true_np, y_pred_np)
recall    = recall_score(y_true_np, y_pred_np)
precision = precision_score(y_true_np, y_pred_np)
accuracy  = accuracy_score(y_true_np, y_pred_np)

print(
    f"\nAccuracy:  {accuracy:.2f}",
    f"Precision: {precision:.2f}",
    f"Recall:    {recall:.2f}",
    f"F1 Score:  {f1:.2f}",
    sep="\n",
)

# Plot confusion matrix
labels = ['Fake', 'Real']
fig, ax = plt.subplots(figsize=(5, 5))
im = ax.imshow(cm, cmap='viridis')
ax.set_title("Discriminator Confusion Matrix")
ax.set_xticks([0, 1])
ax.set_yticks([0, 1])
ax.set_xticklabels(labels)
ax.set_yticklabels(labels)
ax.set_xlabel("Predicted Label")
ax.set_ylabel("True Label")

# Annotate matrix cells. viridis runs from dark purple (lowest count) to
# bright yellow (highest count), so cells in the lower half of the value
# range get white text and cells in the upper half get black text.
midpoint = (cm.min() + cm.max()) / 2
for i in range(2):
    for j in range(2):
        text_color = 'white' if cm[i, j] <= midpoint else 'black'
        ax.text(j, i, cm[i, j], ha='center', va='center', color=text_color)

plt.colorbar(im, ax=ax)
plt.tight_layout()
plt.show()
