In [1]:
import pandas as pd
from transformers import BertTokenizer, BertModel

# Define paths to labeled data
labeled_data_path = '../semi_supervised_data/labeled_data.csv'
unlabeled_data_path = '../semi_supervised_data/unlabeled_data.csv'

# Load labeled and data
labeled_data = pd.read_csv(labeled_data_path)
unlabeled_data = pd.read_csv(unlabeled_data_path)

# Load pre-trained model and tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')

In [2]:
import torch

def tokenize_text(text, max_length=512, truncation=True, padding='max_length'):
    return tokenizer(text, max_length=max_length, truncation=truncation, padding=padding, return_tensors='pt')

# Function to get embeddings for a given text, ensuring tokens do not exceed 512
def get_embeddings(text):
    tokens = tokenize_text(text)
    with torch.no_grad():
        output = model(**tokens)
    return output.last_hidden_state.mean(dim=1).squeeze().numpy().astype('float32')


In [3]:
import pandas as pd
import numpy as np

# Convert labels to integers with type long
labeled_data['label'] = labeled_data['label'].map({'democrat': 0, 'republic': 1, 'other': 2})
labeled_data['label'] = labeled_data['label'].astype(np.int64)  # Ensure type is long

# Get embeddings for data and convert to list
labeled_data['embeddings'] = labeled_data['text'].apply(lambda x: get_embeddings(x).tolist())
unlabeled_data['embeddings'] = unlabeled_data['text'].apply(lambda x: get_embeddings(x).tolist())

# Save labeled and data with embeddings
labeled_data.to_csv('../semi_supervised_data/labeled_data_embeddings.csv', index=False)
unlabeled_data.to_csv('../semi_supervised_data/unlabeled_data_embeddings.csv', index=False)

In [4]:
import ast
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np

class TextDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data.iloc[idx]['embeddings'], self.data.iloc[idx]['label']
    
class Classifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(Classifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = x.float()
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x
    
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    for epoch in range(num_epochs):
        for embeddings, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(embeddings)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item()}')
        
    return model

def evaluate_model(model, test_loader):
    correct = 0
    total = 0
    with torch.no_grad():
        for embeddings, labels in test_loader:
            outputs = model(embeddings)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
    return correct / total

# Function to convert string representation of list to actual list
def parse_embeddings(embeddings_str):
    try:
        return np.array(ast.literal_eval(embeddings_str))
    except Exception as e:
        print(f"Error parsing embeddings: {e}")
        return np.zeros((768,))  # Return a zero vector in case of error

In [5]:
# Load labeled data with embeddings
labeled_data = pd.read_csv('../semi_supervised_data/labeled_data_embeddings.csv')
unlabeled_data = pd.read_csv('../semi_supervised_data/unlabeled_data_embeddings.csv')


# Convert embeddings to numpy arrays
labeled_data['embeddings'] = labeled_data['embeddings'].apply(parse_embeddings)
unlabeled_data['embeddings'] = unlabeled_data['embeddings'].apply(parse_embeddings)

# Define hyperparameters
input_dim = 768
hidden_dim = 256
output_dim = 2
num_epochs = 10
batch_size = 32
learning_rate = 0.001

# Create train and test loaders
train_data = TextDataset(labeled_data)
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

# Initialize model
model = Classifier(input_dim, hidden_dim, output_dim)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
model = train_model(model, train_loader, criterion, optimizer, num_epochs)

# Evaluate the model
train_accuracy = evaluate_model(model, train_loader)
print(f'Train accuracy: {train_accuracy}')

# Save model
torch.save(model.state_dict(), '../models/semi_supervised_clustering_model.pth')

Epoch 1/10, Loss: 0.604522168636322
Epoch 2/10, Loss: 0.46149516105651855
Epoch 3/10, Loss: 0.49714604020118713
Epoch 4/10, Loss: 0.5081111788749695
Epoch 5/10, Loss: 0.4623623490333557
Epoch 6/10, Loss: 0.29417622089385986
Epoch 7/10, Loss: 0.34449756145477295
Epoch 8/10, Loss: 0.3142215609550476
Epoch 9/10, Loss: 0.4035993814468384
Epoch 10/10, Loss: 0.2184717357158661
Train accuracy: 0.9166666666666666


In [6]:
# Predict labels for unlabeled data
unlabeled_data['embeddings'] = unlabeled_data['embeddings'].apply(lambda x: torch.tensor(x))
unlabeled_data['predicted_label'] = unlabeled_data['embeddings'].apply(lambda x: torch.argmax(model(x)).item())

# Save labeled data
unlabeled_data.to_csv('../semi_supervised_data/unlabeled_data_predicted.csv', index=False)

In [7]:
# print all text and label
for text, label in zip(unlabeled_data['text'], unlabeled_data['predicted_label']):
    print(f'Text: {text}, Label: {label}')

Text: biden needs to check in to the closest dementia unit where life is beautiful all the time, Label: 0
Text: we love trump, Label: 1
Text: you have to deport people that are here right now illegally because what kind of american system is this when illegal foreigners just come on in for free that not fair for all the other people that sacrificed worked hard waited a long time to just try to come to this great country to have a better life and future in a legal and right way the it should be but some of these people just come on in and they give them everything health care even a home to live in etc and our taxes also go to these people but the government doesnt help us when we need it is that fair no this is a country of all kinds of people from everywhere but we have to do stuff right, Label: 1
Text: is it bad i was cackling when biden went from to to to, Label: 0
Text: lolyour a winersomething snnapped in you when you lost, Label: 0
Text: why cant politicians answer the actual que

In [16]:
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
import torch
import torch.nn as nn
import torch.optim as optim

# Assuming you have your labeled and unlabeled data paths
labeled_data_path = '../semi_supervised_data/labeled_data.csv'
unlabeled_data_path = '../semi_supervised_data/unlabeled_data.csv'

# Load labeled data
labeled_df = pd.read_csv(labeled_data_path)
labeled_texts = labeled_df['text'].tolist()

# Map text labels to integers
label_mapping = {'democrat': 0, 'republic': 1, 'other': 2}
labeled_labels = labeled_df['label'].map(label_mapping).tolist()

# Load unlabeled data
unlabeled_df = pd.read_csv(unlabeled_data_path)
unlabeled_texts = unlabeled_df['text'].tolist()

# Example text representation neural network
class TextRepresentationNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(TextRepresentationNN, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Convert text to vector (you need to define this function based on your text representation method)
def text_to_vector(text):
    # Example: using a pre-trained embedding model (e.g., BERT, GloVe)
    # For simplicity, let's assume we have a dummy function that converts text to a fixed-size vector
    # You should replace this with your actual text vectorization logic
    vector = np.random.rand(300)  # Example vector of size 300
    return vector

# Convert texts to vectors
labeled_vectors = np.array([text_to_vector(text) for text in labeled_texts])
unlabeled_vectors = np.array([text_to_vector(text) for text in unlabeled_texts])

# Define your neural network parameters
input_dim = 300  # Example input dimension
hidden_dim = 128
output_dim = 50
alpha = 0.5  # Weight for unlabeled data importance
num_epochs = 50  # Number of epochs for training
K = 3  # Number of clusters, since we have 3 classes
l = 0.1  # Margin for hinge loss

# Initialize the model, loss function and optimizer
model = TextRepresentationNN(input_dim, hidden_dim, output_dim)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Semi-supervised loss function
def semi_supervised_loss(labeled_vectors, labeled_labels, unlabeled_vectors, cluster_centroids, model):
    total_loss = 0
    # Calculate the first term
    for vector in unlabeled_vectors:
        vector = torch.tensor(vector, dtype=torch.float32)
        representation = model(vector)
        min_dist = float('inf')
        for centroid in cluster_centroids:
            dist = torch.norm(representation - torch.tensor(centroid, dtype=torch.float32))
            if dist < min_dist:
                min_dist = dist
        total_loss += alpha * min_dist ** 2
    
    # Calculate the second term
    for vector, label in zip(labeled_vectors, labeled_labels):
        vector = torch.tensor(vector, dtype=torch.float32)
        representation = model(vector)
        correct_centroid = torch.tensor(cluster_centroids[label], dtype=torch.float32)
        loss1 = torch.norm(representation - correct_centroid) ** 2
        loss2 = 0
        for i, centroid in enumerate(cluster_centroids):
            if i != label:
                margin_loss = torch.max(torch.tensor(0.0), 
                                        l + torch.norm(representation - correct_centroid) ** 2 - torch.norm(representation - torch.tensor(centroid, dtype=torch.float32)) ** 2)
                loss2 += margin_loss
        total_loss += (1 - alpha) * (loss1 + loss2)
    
    return total_loss

# K-means++ initialization
kmeans = KMeans(n_clusters=K, init='k-means++')
kmeans.fit(unlabeled_vectors)
initial_centroids = kmeans.cluster_centers_

# Convert initial centroids to the same dimension as the neural network output
initial_centroids = np.array([model(torch.tensor(c, dtype=torch.float32)).detach().numpy() for c in initial_centroids])

# Training loop
for epoch in range(num_epochs):
    cluster_assignments = []
    # Assign clusters
    for vector in unlabeled_vectors:
        vector = torch.tensor(vector, dtype=torch.float32)
        representation = model(vector).detach().numpy()
        min_dist = float('inf')
        cluster_id = -1
        for i, centroid in enumerate(initial_centroids):
            dist = np.linalg.norm(representation - centroid)
            if dist < min_dist:
                min_dist = dist
                cluster_id = i
        cluster_assignments.append(cluster_id)
    
    # Estimate centroids
    new_centroids = np.zeros_like(initial_centroids)
    counts = np.zeros(K)
    for i, vector in enumerate(unlabeled_vectors):
        vector = torch.tensor(vector, dtype=torch.float32)
        representation = model(vector).detach().numpy()
        cluster_id = cluster_assignments[i]
        new_centroids[cluster_id] += representation
        counts[cluster_id] += 1
    for i in range(K):
        if counts[i] != 0:
            new_centroids[i] /= counts[i]
    
    # Update parameters
    optimizer.zero_grad()
    loss = semi_supervised_loss(labeled_vectors, labeled_labels, unlabeled_vectors, new_centroids, model)
    loss.backward()
    optimizer.step()
    
    # Check for convergence (optional)
    if np.allclose(new_centroids, initial_centroids):
        break
    initial_centroids = new_centroids

print("Training completed.")

# Test on unlabeled data
unlabeled_representations = [model(torch.tensor(text_to_vector(text), dtype=torch.float32)).detach().numpy() for text in unlabeled_texts]
unlabeled_cluster_assignments = []

for representation in unlabeled_representations:
    min_dist = float('inf')
    cluster_id = -1
    for i, centroid in enumerate(initial_centroids):
        dist = np.linalg.norm(representation - centroid)
        if dist < min_dist:
            min_dist = dist
            cluster_id = i
    unlabeled_cluster_assignments.append(cluster_id)

# Output the cluster assignments for the unlabeled data
for text, cluster_id in zip(unlabeled_texts, unlabeled_cluster_assignments):
    print(f'Text: "{text}" is assigned to cluster {cluster_id}')


Training completed.
Text: "biden needs to check in to the closest dementia unit where life is beautiful all the time" is assigned to cluster 0
Text: "we love trump" is assigned to cluster 1
Text: "you have to deport people that are here right now illegally because what kind of american system is this when illegal foreigners just come on in for free that not fair for all the other people that sacrificed worked hard waited a long time to just try to come to this great country to have a better life and future in a legal and right way the it should be but some of these people just come on in and they give them everything health care even a home to live in etc and our taxes also go to these people but the government doesnt help us when we need it is that fair no this is a country of all kinds of people from everywhere but we have to do stuff right" is assigned to cluster 2
Text: "is it bad i was cackling when biden went from to to to" is assigned to cluster 2
Text: "lolyour a winersomething