In [18]:
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoConfig
from sklearn.model_selection import train_test_split
import torchviz
import pandas as pd
from tqdm import tqdm
tqdm.pandas()

# Load Data

In [None]:
# Load both corpora and tag every row with its class (1 = cleantech, 0 = non-cleantech).
df_cleantech = pd.read_json('data/cleantech.json')
df_cleantech['label'] = 1
df_non_cleantech = pd.read_json('data/non_cleantech.json')
df_non_cleantech['label'] = 0
df = pd.concat([df_cleantech, df_non_cleantech], ignore_index=True)

# Keep only the id, the abstract and the label.
# - `label` must survive: the tensor-building cell reads df['label'].
# - Selecting columns explicitly avoids `drop(..., 1)`; pandas 2.x no longer accepts a positional axis.
# - Column names are lower-cased because the preprocessing cell reads df['appln_abstract'].
df = df[['APPLN_ID', 'APPLN_ABSTRACT', 'label']].rename(columns=str.lower)

In [19]:
# Two hand-written abstracts (one per class) so the pipeline can be smoke-tested
# without loading the real corpora.
test_text = "This is a test sentence. This is another test sentence."
test_text2 = "This is a test sentence. This is another test sentence. This is a third test sentence."

# One-row frame per class, built from a single record each.
df_cleantech = pd.DataFrame(
    [{'appln_id': 1, 'appln_abstract': test_text, 'label': 1}]
)
df_non_cleantech = pd.DataFrame(
    [{'appln_id': 2, 'appln_abstract': test_text2, 'label': 0}]
)

# Stack them into the working frame with a fresh 0..n-1 index.
df = pd.concat((df_cleantech, df_non_cleantech), ignore_index=True)

# Data Preprocessing

In [20]:
def preprocess_text(text):
    """Normalise a raw abstract into lower-case, letters-only, single-spaced text.

    Steps, in order: lower-case, strip URLs, strip e-mail addresses, delete every
    character that is not an ASCII letter or whitespace, collapse whitespace runs.

    Args:
        text: The raw abstract. Anything that is not a ``str`` (e.g. ``None`` or a
            NaN float standing in for a missing value) is treated as empty.

    Returns:
        The cleaned string; ``''`` for non-string input.
    """
    # Missing values may reach this function as None/NaN when applied over a
    # DataFrame column; return empty text instead of failing on `.lower()`.
    if not isinstance(text, str):
        return ''

    # Lowercasing (if the model is not case-sensitive)
    text = text.lower()

    # Removing URLs
    text = re.sub(r'https?://\S+|www\.\S+', '', text)

    # Removing Email addresses
    text = re.sub(r'\S+@\S+', '', text)

    # Removing special characters and numbers.
    # Note: characters are deleted, not replaced by a space, so "solar-powered"
    # becomes "solarpowered".
    text = re.sub(r'[^A-Za-z\s]', '', text)

    # Removing extra whitespaces
    text = re.sub(r'\s+', ' ', text).strip()

    return text

In [21]:
# Run the cleaner over every abstract (tqdm progress bar via `progress_apply`)
# and write the cleaned text back into the same column.
abstracts_clean = df['appln_abstract'].progress_apply(preprocess_text)
df['appln_abstract'] = abstracts_clean

100%|██████████| 2/2 [00:00<00:00, 7090.96it/s]


# Tokenize Abstracts and Build Datasets

In [22]:
model_path = 'climatebert/distilroberta-base-climate-f'

# Initialize the tokenizer from HuggingFace
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Fixed width of every encoded row. The dense models in this notebook are built
# with input_dim=768 and consume these rows directly, so the two must match.
max_len = 768

# Tokenize the sentences. Truncation guarantees no row exceeds max_len; without
# it a long abstract would produce a ragged list and torch.tensor() would fail.
tokenized = df['appln_abstract'].apply(
    lambda x: tokenizer.encode(x, add_special_tokens=True, truncation=True, max_length=max_len)
)

# Right-pad with the tokenizer's own padding id rather than a hard-coded 0
# (in RoBERTa-style vocabularies id 0 is typically the <s> token, not padding).
# Fall back to 0 only if the tokenizer defines no pad token.
pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0
padded = torch.tensor(
    [ids + [pad_id] * (max_len - len(ids)) for ids in tokenized],
    dtype=torch.long,
)

# Define labels
labels = torch.tensor(df['label'].values)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [23]:
# Custom Dataset Class
class SentenceDataset(Dataset):
    """Map-style dataset that pairs each encoded row with its label.

    Indexing returns a dict with keys ``'input_ids'`` and ``'labels'``, taken
    from the same position of the two containers passed to the constructor.
    """

    def __init__(self, encodings, labels):
        # Both containers are stored as given; no copy, no validation.
        self.encodings, self.labels = encodings, labels

    def __len__(self):
        # The number of samples is defined by the labels container.
        return len(self.labels)

    def __getitem__(self, idx):
        return {
            'input_ids': self.encodings[idx],
            'labels': self.labels[idx],
        }

In [24]:
# Split the data into training and testing (80 % / 20 %).
# A fixed random_state makes the split, and every metric computed from it,
# reproducible across kernel restarts.
# NOTE(review): on the full corpus consider stratify=labels; it is left out here
# because the two-row toy frame cannot be stratified.
train_encodings, test_encodings, train_labels, test_labels = train_test_split(
    padded, labels, test_size=0.2, random_state=42
)

In [25]:
# Create datasets
train_dataset = SentenceDataset(train_encodings, train_labels)
test_dataset = SentenceDataset(test_encodings, test_labels)

# DataLoader
# Training samples are reshuffled every epoch. The evaluation loader keeps a
# fixed order: shuffling cannot change accuracy and only makes per-sample
# inspection non-repeatable.
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Define NN Architecture

In [26]:
class FNN(nn.Module):
    """Feed-forward binary classifier with one hidden layer.

    Architecture: Linear(input_dim -> hidden_dim) -> ReLU -> Linear(hidden_dim -> output_dim).

    ``forward`` returns raw logits, not probabilities. The notebook trains with
    ``nn.BCEWithLogitsLoss`` (which applies the sigmoid itself) and thresholds
    predictions at 0, so applying a sigmoid here would squash the scores twice
    and make every prediction positive.

    Args:
        input_dim: Number of input features per sample.
        hidden_dim: Width of the hidden layer.
        output_dim: Number of output scores per sample (1 for binary classification).
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        # Not applied in forward(); kept so callers can obtain probabilities
        # with `model.sigmoid(model(x))`.
        self.sigmoid = nn.Sigmoid()

    def forward(self, text):
        """Return logits of shape (batch, output_dim) for a (batch, input_dim) input."""
        # Inputs may arrive as integer tensors; Linear layers need floats.
        features = text.float()
        hidden = self.relu(self.fc1(features))
        return self.fc2(hidden)

In [None]:
class MLP(nn.Module):
    """Feed-forward binary classifier with two hidden layers.

    Architecture: Linear(input_dim -> hidden_dim) -> ReLU ->
    Linear(hidden_dim -> hidden_dim) -> ReLU -> Linear(hidden_dim -> output_dim).

    ``forward`` returns raw logits, not probabilities. The notebook trains with
    ``nn.BCEWithLogitsLoss`` (which applies the sigmoid itself) and thresholds
    predictions at 0, so applying a sigmoid here would squash the scores twice
    and make every prediction positive.

    Args:
        input_dim: Number of input features per sample.
        hidden_dim: Width of both hidden layers.
        output_dim: Number of output scores per sample (1 for binary classification).
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # Additional hidden layer
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        # Not applied in forward(); kept so callers can obtain probabilities
        # with `model.sigmoid(model(x))`.
        self.sigmoid = nn.Sigmoid()

    def forward(self, text):
        """Return logits of shape (batch, output_dim) for a (batch, input_dim) input."""
        # Inputs may arrive as integer tensors; Linear layers need floats.
        features = text.float()
        hidden = self.relu(self.fc1(features))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [None]:
class RNN(nn.Module):
    """Embedding -> single-layer Elman RNN -> linear head, for binary classification.

    ``forward`` returns raw logits, not probabilities. The notebook trains with
    ``nn.BCEWithLogitsLoss`` (which applies the sigmoid itself) and thresholds
    predictions at 0, so a sigmoid here would be applied twice.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the recurrent hidden state.
        output_dim: Number of output scores per sample (1 for binary classification).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(RNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # batch_first is left at its default (False): input is (seq_len, batch, embedding_dim).
        self.rnn = nn.RNN(embedding_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)
        # Not applied in forward(); kept so callers can obtain probabilities
        # with `model.sigmoid(model(x))`.
        self.sigmoid = nn.Sigmoid()

    def forward(self, text):
        """Return logits of shape (batch, output_dim).

        Args:
            text: Integer token ids laid out as (seq_len, batch). Batches coming
                from a batch-first loader must be transposed before the call.
        """
        embedded = self.embedding(text)
        # For a single-layer, unidirectional RNN the final hidden state equals
        # the last time step of the output sequence, so only `hidden` is needed.
        # (The former per-call `assert torch.equal(...)` re-checked that on every
        # forward pass and would fail spuriously on NaN activations.)
        _, hidden = self.rnn(embedded)
        # hidden: (1, batch, hidden_dim) -> (batch, hidden_dim)
        return self.fc(hidden.squeeze(0))

In [None]:
class LSTM(nn.Module):
    """Embedding -> single-layer LSTM -> linear head, for binary classification.

    ``forward`` returns raw logits, not probabilities. The notebook trains with
    ``nn.BCEWithLogitsLoss`` (which applies the sigmoid itself) and thresholds
    predictions at 0, so a sigmoid here would be applied twice.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of output scores per sample (1 for binary classification).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(LSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # batch_first is left at its default (False): input is (seq_len, batch, embedding_dim).
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)
        # Not applied in forward(); kept so callers can obtain probabilities
        # with `model.sigmoid(model(x))`.
        self.sigmoid = nn.Sigmoid()

    def forward(self, text):
        """Return logits of shape (batch, output_dim).

        Args:
            text: Integer token ids laid out as (seq_len, batch). Batches coming
                from a batch-first loader must be transposed before the call.
        """
        embedded = self.embedding(text)
        # Only the final hidden state feeds the classifier; the per-step outputs
        # and the cell state are discarded.
        _, (hidden, _) = self.lstm(embedded)
        # hidden: (1, batch, hidden_dim) -> (batch, hidden_dim)
        return self.fc(hidden.squeeze(0))

In [29]:
class CNN(nn.Module):
    """Text CNN: embedding -> parallel convolutions -> max-over-time pooling -> linear head.

    One ``Conv2d`` is created per entry of ``filter_sizes``; each kernel spans
    ``fs`` consecutive tokens and the full embedding width.

    ``forward`` returns raw logits, not probabilities. The notebook trains with
    ``nn.BCEWithLogitsLoss`` (which applies the sigmoid itself) and thresholds
    predictions at 0, so a sigmoid here would be applied twice.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        n_filters: Output channels of every convolution.
        filter_sizes: Iterable of kernel heights (number of tokens per window).
        output_dim: Number of output scores per sample (1 for binary classification).
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        # Not applied in forward(); kept so callers can obtain probabilities
        # with `model.sigmoid(model(x))`.
        self.sigmoid = nn.Sigmoid()

    def forward(self, text):
        """Return logits of shape (batch, output_dim).

        Args:
            text: Integer token ids laid out as (seq_len, batch); the sequence
                must be at least as long as the largest filter size.
        """
        # (seq_len, batch) -> (batch, seq_len)
        text = text.permute(1, 0)
        # (batch, seq_len, embedding_dim), then add the single input channel Conv2d expects.
        embedded = self.embedding(text).unsqueeze(1)
        # Each conv collapses the embedding axis to width 1, which squeeze(3) removes:
        # (batch, n_filters, seq_len - fs + 1)
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # Max over the whole remaining time axis -> (batch, n_filters) per filter size.
        pooled = [F.max_pool1d(c, c.shape[2]).squeeze(2) for c in conved]
        return self.fc(torch.cat(pooled, dim=1))

# Train Model

In [None]:
# Build the classifier. input_dim equals the padded sequence length (768), since
# each padded row of token ids is fed to the network as one feature vector.
model_kwargs = dict(input_dim=768, hidden_dim=100, output_dim=1)
# model = FNN(**model_kwargs)
model = MLP(**model_kwargs)

# Loss and optimizer.
# BCEWithLogitsLoss applies the sigmoid internally, so the model's forward()
# is expected to return raw, un-squashed scores.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
# Training Loop
num_epochs = 4
for epoch in range(num_epochs):
    # evaluate() switches the model to eval mode; make sure every epoch trains
    # in training mode even if an evaluation ran in between.
    model.train()
    epoch_loss, num_batches = 0.0, 0

    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch['input_ids']
        # Deliberately a local name: rebinding the notebook-level `labels`
        # tensor here would corrupt a later re-run of the split cell.
        batch_labels = batch['labels'].float()  # BCE losses need float targets
        outputs = model(input_ids.float())

        # Collapse (batch, 1) -> (batch,) so outputs line up with the labels
        if outputs.dim() > 1:
            outputs = outputs.squeeze(1)

        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Report the epoch mean: the last mini-batch alone is noisy (batch_size=1)
    # and would be undefined for an empty loader.
    mean_loss = epoch_loss / num_batches if num_batches else float('nan')
    print(f'Epoch {epoch+1}, Loss: {mean_loss}')


In [None]:
def evaluate(model, data_loader, threshold=0.0):
    """Compute binary classification accuracy of ``model`` over ``data_loader``.

    Args:
        model: An ``nn.Module`` producing one score per sample.
        data_loader: Iterable of batches, each a dict with ``'input_ids'`` and
            ``'labels'`` tensors.
        threshold: A score strictly above this value counts as class 1. The
            default 0.0 is the decision boundary for raw logits; pass 0.5 for a
            model that outputs probabilities.

    Returns:
        Fraction of correctly classified samples as a float in [0, 1];
        0.0 when the loader yields no samples.
    """
    # Remember the current mode so a mid-training evaluation does not leave the
    # model stuck in eval mode.
    was_training = model.training
    model.eval()
    total, correct = 0, 0

    try:
        with torch.no_grad():
            for batch in data_loader:
                input_ids = batch['input_ids']
                # Flatten both sides to (batch,). Comparing (batch, 1) scores
                # with (batch,) labels would broadcast to (batch, batch) and
                # inflate the count of "correct" predictions.
                labels = batch['labels'].reshape(-1).float()
                outputs = model(input_ids.float()).reshape(-1)
                predicted = (outputs > threshold).float()
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        model.train(was_training)

    return correct / total if total else 0.0

# Example usage after each training epoch: score the current model on the
# test loader and print the accuracy with four decimals.
accuracy = evaluate(model, test_loader)
print(f'Test Accuracy: {accuracy:.4f}')