Stephen Iota


iota@usc.edu

### Read training, dev and unlabeled test data

The following starter code (Python 3) shows how to read the labeled training and dev ciphertext, and the unlabeled test ciphertext, into lists.

In [None]:
#train, dev, test = [], [], []

In [None]:
#for x in open('./train_enc.tsv', encoding='utf-8'):
#    x = x.rstrip('\n\r').split('\t')
#    # x[0] will be the label (0 or 1), and x[1] will be the ciphertext sentence.
#    x[0] = int(x[0]) 
#    train.append(x)
#print (len(train))
#print (train[:3])

In [None]:
#for x in open('./dev_enc.tsv', encoding='utf-8'):
#    x = x.rstrip('\n\r').split('\t')
#    # x[0] will be the label (0 or 1), and x[1] will be the ciphertext sentence.
#    x[0] = int(x[0]) 
#    dev.append(x)
#print (len(dev))
#print (dev[:3])

#### Unlike 'train' and 'dev', which are both lists of [label, sentence] pairs, 'test' is just a list of sentences.

In [None]:
#for x in open('./test_enc_unlabeled.tsv', encoding='utf-8'):
#    x = x.rstrip('\n\r')
#    test.append(x)
#print (len(test))
#print (test[:3])

#### You can split every sentence into a list of words on whitespace.

In [None]:
#train_split = [[x[0], x[1].split(' ')] for x in train]
#dev_split = [[x[0], x[1].split(' ')] for x in dev]
#test_split = [[x.split(' ')] for x in test]

### Custom dataclass

In [1]:
from typing import Union

from torch.utils.data import Dataset

class CipherTxtData(Dataset):
    """Dataset that loads cipher-text sentences from the provided TSV files.

    In the labeled modes ("train", "dev") every example is stored as
    ``[label, text, ...]`` where ``label`` is the first tab-separated field
    converted to ``int``. In "test" mode every line is stored as a plain
    string. When ``split`` is true the sentences are split on single spaces
    once the whole file has been read.

    Args:
        mode: Which file to load; must be a key of ``PATH``.
        split: Whether to split each sentence into a list of words.

    Raises:
        ValueError: If ``mode`` is not a key of ``PATH``.
    """
    PATH = {"train": "data/train_enc.tsv",
            "dev": "data/dev_enc.tsv",
            "test": "data/test_enc_unlabeled.tsv"}

    def __init__(self,
                 mode: str = "train",
                 split: bool = True):
        super().__init__()
        self.data: list = []
        self.split: bool = split
        # Guard only the lookup: an error raised while reading the file must
        # not be reported as an unsupported mode.
        try:
            self.root: str = self.PATH[mode]
        except KeyError:
            raise ValueError(f"Mode {mode} not supported.") from None
        self.mode: str = mode
        self.read()

    def read(self) -> None:
        """Read the data file for the current mode into ``self.data``.

        Any previously loaded examples are discarded first, so calling this
        method again reloads the file instead of appending duplicates.
        """
        self.data = []
        with open(self.PATH[self.mode], mode='r', encoding="utf-8") as f:
            for line in f:
                self._read(line, mode=self.mode)
        if self.split:
            self._split()

    def _read(self, line, mode):
        """Parse one raw line and append the resulting example to ``self.data``."""
        text = line.rstrip('\n\r')
        if mode == "test":
            # Keep every test line (even an empty one) so that the number of
            # examples always equals the number of lines in the file.
            self.data.append(text)
            return
        if not text:
            # A blank line has no label to parse; skip it instead of
            # failing on int('').
            return
        fields = text.split('\t')
        fields[0] = int(fields[0])
        self.data.append(fields)

    def _split(self):
        """Split every stored sentence into words on single spaces."""
        if self.mode == "test":
            # NOTE(review): each test sentence ends up wrapped in an extra
            # list, mirroring the commented-out starter code above; confirm
            # this nesting is intended before relying on split test data.
            self.data = [[x.split(' ')] for x in self.data]
        else:
            self.data = [[x[0], x[1].split(' ')] for x in self.data]

    @property
    def X(self) -> Union[list, None]:
        """Documents (``None`` if ``data`` has been set to ``None``)."""
        if self.data is None:
            return None
        if self.mode == "test":
            return self.data
        return [x[1] for x in self.data]

    @property
    def y(self) -> Union[list, None]:
        """Class labels, either 0 or 1; ``None`` in test mode."""
        if self.mode == "test" or self.data is None:
            return None
        return [x[0] for x in self.data]

    def __getitem__(self, i):
        return self.data[i]

    def __len__(self,):
        return len(self.data) if self.data is not None else 0

In [2]:
# from typing import Optional, Sequence, Callable

# import gensim

# class CipherCorpus:
#     """An iterator that yields sentences (lists of str)."""
#     def __init__(self, text):
#         self.text = text

#     def __iter__(self):
#         for line in self.text:
#             yield line

# class CipherNGramData(Dataset):
#     """Dataclass to yield ciphertext ngrams.

#     If ngrams are retrieved using `__get_item__` method,
#     returns one-hot encoding of ngrams.
#     """
#     def __init__(self, ciphertxtdata: CipherTxtData, context_size: int = 3):
#         self.context_size = context_size
#         self.text = [x for y in ciphertxtdata for x in y]
#         self.vocab = set(self.text)
#         self.vocab_size = len(self.vocab)
#         self.word_to_idx = {word: i for i, word in enumerate(self.vocab)}

#         self.ngrams = [
#             [[self.text[i - j - 1] for j in range(self.context_size)], self.text[i]]
#             for i in range(self.context_size, len(self.text))
#         ]

#         self.X = []
#         self.y = []
#         for i in range(self.context_size, len(self.text)):
#             self.X.append([self.text[i - j - 1] for j in range(self.context_size)])
#             self.y.append(self.text[i])

#     def __len__(self):
#         return len(self.y)

#     def __getitem__(self, i):
#         X = self.X[i]
#         X = torch.tensor([self.word_to_idx[w] for w in X], dtype=torch.long)
#         y = self.y[i]
#         y = torch.tensor([self.word_to_idx[y]])

#         return X, y


# class CipherW2VData(Dataset):
#     """Dataclass that generates w2v embeddings."""
#     OOV = '~'
#     def __init__(self,
#                  corpus: Union[CipherCorpus, list],
#                  wv: Optional[gensim.models.Word2Vec] = None,
#                  **w2vparams) -> None:
#         super().__init__()
#         if not isinstance(corpus, CipherCorpus):
#             corpus = CipherCorpus(corpus)
#         self.corpus = corpus

#         if not wv:
#             model = gensim.models.Word2Vec(sentences=self.corpus, **w2vparams)
#             self.wv = model.wv
#             del model
#         else:
#             self.wv = wv

#     def __len__(self):
#         return len(self.corpus.text)

#     def __getitem__(self, i):
#         sentence = self.corpus.text[i]

#         sentence_emb = []
#         for word in sentence:
#             if word in self.wv:
#                 emb = self.wv[word]
#             else:
#                 emb = self.wv[self.OOV]
#             emb = torch.tensor(emb)
#             sentence_emb.append(emb)


#         return sentence_emb


# class CipherVecData(Dataset):
#     """General dataclass for ciphertext embeddings."""
#     def __init__(self, X: Sequence, y: Optional[Sequence] = None,
#                  transform: Callable = None,
#                  target_transform: Callable= None) -> None:
#         super().__init__()
#         self.X = X
#         self.y = y
#         self.transform = transform
#         self.target_transfrom = target_transform

#     def __len__(self,):
#         return len(self.X) if self.X is not None else 0

#     def __getitem__(self, i):
#         x = self.X[i]
#         if self.transform:
#             x = self.transform(x)
#         if self.target_transfrom and self.y is not None:
#             y = self.target_transfrom(self.y[i])
#         elif self.y is not None:
#             y = self.y[i]

#         return x, y if self.y is not None else x

In [3]:
# Load all three splits as raw (unsplit) sentences, in train/dev/test order.
traindata, devdata, testdata = (
    CipherTxtData(mode=split_name, split=False)
    for split_name in ("train", "dev", "test")
)

### Main Code Body

You may choose to experiment with different methods using your program. However, you need to embed the training and inference processes here. We will use your predictions on the unlabeled test data for grading, and will check this part to understand how your method produced the predictions.

In [4]:
from fasttext import FastText

In [5]:
## Helper functions

In [6]:
def save_to_txt(text, labels=None, file_name="test.txt"):
    """Write sequences to a UTF-8 text file, one sequence per line.

    Args:
        text: Iterable of sequences to write.
        labels: Optional labels aligned with ``text``. When given (and
            non-empty), each line is written as ``__label__<label> <seq>``.
        file_name: Path of the file to (over)write.
    """
    # The data files are read as UTF-8, so write with the same encoding
    # instead of the platform default.
    with open(file_name, "w", encoding="utf-8") as f:
        if labels:
            f.writelines(
                f"__label__{label} {seq}\n" for seq, label in zip(text, labels)
            )
        else:
            # Terminate every sequence with a newline so that unlabeled
            # sequences do not run together on a single line.
            f.writelines(f"{seq}\n" for seq in text)

In [7]:
def predict(model, data):
    """Call ``model.predict`` on each item of ``data`` and return the results in order."""
    return [model.predict(sample) for sample in data]

In [8]:
def accuracy_score_scalers(predictions, truth):
    """Return the fraction of predictions whose top label matches the truth.

    Args:
        predictions: Sequence of prediction results where ``pred[0][0]`` is
            the predicted label string (``"__label__0"`` or ``"__label__1"``).
        truth: Sequence of reference labels (0 or 1), aligned with
            ``predictions``.

    Returns:
        Number of matches divided by ``len(truth)``; ``0.0`` when ``truth``
        is empty (instead of raising ``ZeroDivisionError``).
    """
    total = len(truth)
    if total == 0:
        return 0.0
    hits = 0
    for pred, true in zip(predictions, truth):
        top_label = pred[0][0]
        if (top_label == "__label__1" and true == 1) or \
                (top_label == "__label__0" and true == 0):
            hits += 1
    return hits / total

import torch
@torch.no_grad()
def accuracy_score_logits(logits: torch.Tensor, y_true: torch.Tensor,
                          normalize: bool = True) -> Union[float, int]:
    """Score predictions against references given logits,
    i.e., argmax(logits[i]) = pred[i].

    Args:
        logits: Iterated row by row; the argmax of each row is the
            predicted class index.
        y_true: Reference class indices, aligned with the rows of ``logits``.
        normalize: If true, return the fraction of correct predictions;
            otherwise return the number of correct predictions.

    Returns:
        The accuracy as a float when ``normalize`` is true (``0.0`` for
        empty input rather than raising ``ZeroDivisionError``), else the
        count of correct predictions as an int.
    """
    correct = sum(
        int(torch.argmax(row) == target) for row, target in zip(logits, y_true)
    )
    if not normalize:
        return int(correct)

    total = len(y_true)
    return correct / total if total else 0.0

from pathlib import Path
from typing import Union
def save_checkpoint(model_state: dict,
                    optim_state: dict,
                    file_name: Union[str, Path],
                    **params) -> None:
    """Save model and optimizer state, plus any extra keyword values, with ``torch.save``."""
    # Extra keyword arguments are merged last, so they are stored next to
    # (and can override) the two state-dict entries.
    payload = {
        "model_state_dict": model_state,
        "optim_state_dict": optim_state,
        **params,
    }
    torch.save(payload, file_name)


def load_checkpoint(file_name: Union[str, Path], map_location=None) -> dict:
    """Retrieve a saved checkpoint dict.

    Args:
        file_name: Checkpoint location, passed to ``torch.load``.
        map_location: Optional device remapping forwarded to ``torch.load``
            (e.g. ``"cpu"`` to load a checkpoint saved on a GPU onto a
            CPU-only machine). ``None`` keeps ``torch.load``'s default.

    Returns:
        Whatever object ``torch.load`` deserializes from ``file_name``.
    """
    # NOTE: torch.load unpickles the file; only load checkpoints from
    # trusted sources.
    return torch.load(file_name, map_location=map_location)

In [9]:
# Eventually, results needs to be a list of 2028 values, each either 0 or 1
#results = []

In [10]:
def predict_fasttext(model, data):
    """Return ``model.predict(text)`` for every text in ``data``, preserving order."""
    outputs = []
    for sample in data:
        outputs += [model.predict(sample)]
    return outputs

In [11]:
def save_seqs_to_file(text, labels=None, file_name="test.txt"):
    """Write sequences to a UTF-8 file, one per line, optionally with labels.

    Args:
        text: Iterable of sequences to write.
        labels: Optional labels aligned with ``text``. When given (and
            non-empty), every line has the form ``__label__<label> <seq>``.
        file_name: Path of the file to (over)write.
    """
    if labels:
        lines = [f"__label__{label} {seq}" for seq, label in zip(text, labels)]
    else:
        lines = [str(seq) for seq in text]

    # Write as UTF-8 to match how the source data is read, and end every
    # line (labeled or not) with a newline so sequences never run together.
    with open(file_name, "w", encoding="utf-8") as f:
        f.writelines(line + "\n" for line in lines)

In [12]:
# Dump the train and dev splits in labeled form for fastText.
save_seqs_to_file(text=traindata.X, labels=traindata.y, file_name="data.train")
save_seqs_to_file(text=devdata.X, labels=devdata.y, file_name="data.dev")

In [13]:
## Build the final training set: train followed by dev.
# Concatenation creates new lists, so the per-split data is left untouched.
X_final = traindata.X + devdata.X
y_final = traindata.y + devdata.y

save_seqs_to_file(text=X_final, labels=y_final, file_name="final.train")

In [15]:
import fasttext

In [None]:
#import fasttext

# Train a supervised classifier on data.train and pass data.dev as fastText's
# autotune validation file (hyperparameter search; see the fastText autotune docs).
model = fasttext.train_supervised(input='data.train', autotuneValidationFile='data.dev')

In [None]:
# Score the current model on the dev split; the bare expression on the last
# line displays the accuracy in the notebook.
# NOTE(review): if `model` is the autotuned one, data.dev was also its
# validation file, so this score is presumably optimistic — confirm.
dev_predictions = predict_fasttext(model, devdata.X)
dev_score = accuracy_score_scalers(dev_predictions, devdata.y)
dev_score

In [None]:
# Show the hyperparameters stored on the trained model, one value per line.
print(
    *(
        getattr(model, attr)
        for attr in ("lrUpdateRate", "lr", "minCount", "epoch", "ws", "wordNgrams")
    ),
    sep="\n",
)

In [17]:
# Train on the training split only, with explicitly specified hyperparameters.
model = fasttext.train_supervised("data.train", 
                                  lr=0.55, 
                                  lrUpdateRate=100, 
                                  minCount=1, 
                                  epoch=15, 
                                  ws=5,
                                  wordNgrams=2)

# Evaluate on the dev split; the bare expression displays the accuracy.
dev_predictions = predict_fasttext(model, devdata.X)
dev_score = accuracy_score_scalers(dev_predictions, devdata.y)
dev_score

Read 0M words
Number of words:  20861
Number of labels: 2
Progress: 100.0% words/sec/thread: 1855690 lr:  0.000000 avg.loss:  0.132459 ETA:   0h 0m 0s100.0% words/sec/thread: 1856238 lr: -0.000013 avg.loss:  0.132459 ETA:   0h 0m 0s


0.9052787370498273

In [19]:
# Retrain on final.train (the train and dev splits combined) with the same
# hyperparameters; this replaces `model` and is the model used for test predictions.
model = fasttext.train_supervised("final.train", 
                                  lr=0.55, 
                                  lrUpdateRate=100, 
                                  minCount=1, 
                                  epoch=15, 
                                  ws=5,
                                  wordNgrams=2)

Read 0M words
Number of words:  21421
Number of labels: 2
Progress: 100.0% words/sec/thread: 2102197 lr:  0.000000 avg.loss:  0.121651 ETA:   0h 0m 0s


In [22]:
# Predict a label for every unlabeled test sentence with the current model.
test_predictions = predict_fasttext(model, testdata.X)

In [None]:
#model = fasttext.train_supervised(input="data.train", epoch=20, wordNgrams=3, minCount=5)
#dev_predictions = predict_fasttext(model, devdata.X)
#dev_score = accuracy_score_scalers(dev_predictions, devdata.y)
#dev_score

In [None]:
# for min_count in range(1, 50, 2):
#     model = FastText.train_supervised("train.txt", epoch=min_count, wordNgrams=2, minCount=2)

#     dev_predictions = predict_fasttext(model, devdata.X)
#     dev_score = accuracy_score_scalers(dev_predictions, devdata.y)

#     train_predictions = predict_fasttext(model, traindata.X)
#     train_score = accuracy_score_scalers(train_predictions, traindata.y)
    
#     print(f"min count={min_count}")
#     print(f"\ttrain score={train_score}")
#     print(f"\tdev score={dev_score}")

### Output Prediction Result File

You will need to submit a prediction result file. It should have 2028 lines, and every line should be either 0 or 1: your model's prediction for the respective test set instance.

In [24]:
from typing import Sequence, Union
from pathlib import Path

def save_results(results: Sequence[int],
                 file_name: Union[str, Path] = "upload_predictions.txt") -> None:
    """Write the final predictions to the submission file, one integer per line."""
    with open(file_name, mode='w', encoding="utf-8") as out_file:
        # int() normalizes each value before it is written, so e.g. 1.0 is
        # written as "1".
        out_file.writelines(f"{int(value)}\n" for value in results)
In [29]:
# Convert each prediction to an integer class: 1 when the top label is
# "__label__1", otherwise 0.
results = [
    1 if prediction[0][0] == "__label__1" else 0
    for prediction in test_predictions
]

In [32]:
# Suppose your model's predictions for the 2028 test cases read from
# test_enc_unlabeled.tsv are in the list called 'results'.
# Sanity check: there must be exactly one prediction per test case.
assert (len(results) == 2028)

In [34]:
# Display the first six predictions as a quick check.
results[:6]

[0, 0, 0, 0, 0, 1]

In [33]:
# Write the predictions to the default submission file, upload_predictions.txt.
save_results(results)

In [None]:
# make sure the results are not floating-point numbers, but integers 0 and 1
#results = [int(x) for x in results]

In [None]:
# write your prediction results to 'upload_predictions.txt' and upload that later
#with open('upload_predictions.txt', 'w', encoding = 'utf-8') as fp:
#    for x in results:
#        fp.write(str(x) + '\n')