In [1]:
import collections
import random
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import seutil as su
from tqdm.notebook import tqdm

# Scratch area for this demo, under the user's home directory; the raw
# datasets are expected in its "raw-data" subfolder.
work_dir = Path.home().joinpath("projects", "cs846mlse-1249-demos", "_work")
raw_data_dir = work_dir.joinpath("raw-data")

In [2]:
# Load dataset
# We take the Python part of the dataset from Rahman et al., "Natural Software Revisited", in ICSE 2019.
# original link: https://www.dropbox.com/scl/fo/4vagrbe4wopt78zi0vb5s/AAW7wWFGFpvUp06v_Hx52u0/Python?rlkey=xvs8pd8wexfg9khk5qkfn2dwa&subfolder_nav_tracking=1&dl=0
data_dir = raw_data_dir / "nsr-python" / "Projects"

# project name -> list of documents; each document is the whitespace-split
# token sequence of one "*.py.tokens" file found directly in the project folder.
proj2docs = collections.defaultdict(list)
# Directory listings come back in a filesystem-dependent order; sorting keeps
# the document order (and anything sampled from it later) stable across machines.
for proj_dir in sorted(data_dir.iterdir()):
    if not proj_dir.is_dir():
        continue
    proj_name = proj_dir.name
    for doc_path in sorted(proj_dir.glob("*.py.tokens")):
        try:
            proj2docs[proj_name].append(su.io.load(doc_path, su.io.fmts.txt).split())
        except UnicodeDecodeError:
            # best effort: files that cannot be decoded are skipped
            continue

projs = list(sorted(proj2docs.keys()))
# NOTE(review): num_train_projs is computed but never used below; the training
# projects are hard-coded instead.
num_train_projs = int(len(projs) * 0.8)
train_projs = ["boto", "django", "django-cms", "scikit_learn", "tornado"]

# proj2docs is a defaultdict, so indexing a project that was never loaded would
# silently contribute zero documents. Fail loudly instead.
missing_projs = [proj for proj in train_projs if proj not in proj2docs]
if missing_projs:
    raise FileNotFoundError(f"no documents loaded from {data_dir} for training projects: {missing_projs}")

train_docs = [doc for proj in train_projs for doc in proj2docs[proj]]

# Collect some statistics on training data
print(f"Number of projects: {len(train_projs)}")
print(f"Number of docs: {len(train_docs)}")
print(f"Number of tokens: {sum(len(doc) for doc in train_docs)}")
print(f"Number of unique tokens: {len(set(token for doc in train_docs for token in doc))}")


Number of projects: 5
Number of docs: 3245
Number of tokens: 2838323
Number of unique tokens: 113399


# Unigram language model

In [3]:
# language model interface
class LanguageModel:
    """Abstract token-level language model working with plain probabilities.

    Subclasses provide `train`, `prob_next` and `generate_next`;
    `prob_sentence` is derived from `prob_next`.
    """

    def train(self, docs: List[List[str]]):
        """Fits the model on a corpus given as a list of token lists."""
        raise NotImplementedError()

    def prob_next(self, context: List[str], token: str) -> float:
        """Returns P(token | context), where context holds the preceding tokens."""
        raise NotImplementedError()

    def prob_sentence(self, sentence: List[str]) -> float:
        """Returns the product of the next-token probabilities over a non-empty sentence.

        Each token is scored against all tokens that precede it in `sentence`.
        """
        assert len(sentence) > 0
        history: List[str] = []
        running = 1.0
        for tok in sentence:
            running = running * self.prob_next(history, tok)
            # the same list object is extended and handed to the next call
            history.append(tok)
        return running

    def generate_next(self, context: List[str]) -> str:
        """Produces one token that continues the given context."""
        raise NotImplementedError()

In [4]:
# unigram language model
class UnigramLanguageModel(LanguageModel):
    """Context-free model: a token's probability is its relative frequency in the training corpus."""

    def __init__(self):
        self.total = 0  # number of training tokens seen so far
        self.counter = collections.Counter()  # token -> occurrence count

    def train(self, docs: List[List[str]]):
        """Adds the token counts of every document; may be called repeatedly to accumulate."""
        for doc in docs:
            self.counter.update(doc)
            self.total += len(doc)

    def prob_next(self, context: List[str], token: str) -> float:
        """Returns count(token) / total; `context` is ignored."""
        occurrences = self.counter[token]
        return occurrences / self.total

    def generate_next(self, context: List[str]) -> str:
        """Samples one token with probability proportional to its count; `context` is ignored."""
        vocabulary = list(self.counter)
        frequencies = list(self.counter.values())
        (choice,) = random.choices(vocabulary, weights=frequencies)
        return choice


unigram_lm = UnigramLanguageModel()
unigram_lm.train(train_docs)

In [5]:
# some sanity checks
# The unigram model ignores the context argument, so each value below depends
# only on the queried token ("main", "(", ")", "]").
print(f'{unigram_lm.prob_next(["def"], "main")=}')
print(f'{unigram_lm.prob_next(["def", "main"], "(")=}')
print(f'{unigram_lm.prob_next(["def", "main", "("], ")")=}')
print(f'{unigram_lm.prob_next(["def", "main", "("], "]")=}')

unigram_lm.prob_next(["def"], "main")=5.143882496812378e-05
unigram_lm.prob_next(["def", "main"], "(")=0.07494319709208572
unigram_lm.prob_next(["def", "main", "("], ")")=0.07494319709208572
unigram_lm.prob_next(["def", "main", "("], "]")=0.01643082904940699


In [6]:
# generate some code from it
# ... but how do we end? No idea. For now, let's just generate 100 tokens
# (the unigram model defines no end-of-sequence token, so the loop always runs to 100).
# NOTE: sampling is unseeded, so the printed text changes on every run.
context = []
for _ in range(100):
    token = unigram_lm.generate_next(context)
    context.append(token)
    print(token, end=" ")

h the . obj path , for ( assertTrue , returned algorithm global_page less_equal None input_type , : ( if ( date_format ( ) ( forms that delegate new_http_connection path . ( ( = if logical_not ) _ , ) [ params terminate in self . 0.18 upload_to else try def self , ] X ] snapshot_ids ] , labels. custom_qs ( get_fields_from_path weights_init string np + self [ page between . alias moderate : y = len ( [ , = fit 'language' == 32 do callable_default_value ) ':' ( ( get_key assertTrue : Samples k "home" ) , 

# Bigram language model

In [7]:
# bigram language model
class BigramLanguageModel(LanguageModel):
    """Unsmoothed bigram model: each token is conditioned on the single previous token.

    The first token of a document is conditioned on `bos`, and every training
    document is followed by one `eos` token so that generation can terminate.
    """

    def __init__(self):
        self.bos = "<s>"
        self.eos = "</s>"
        # (previous token,) -> counts of the tokens observed right after it
        self.ctx2counter: Dict[Tuple[str, ...], collections.Counter] = collections.defaultdict(collections.Counter)

    def _normalize_context(self, context: List[str]) -> Tuple[str, ...]:
        """Reduces a context to the 1-tuple key used in `ctx2counter` (`bos` for an empty context)."""
        if len(context) == 0:
            return (self.bos,)
        return tuple(context[-1:])

    def train(self, docs: List[List[str]]):
        """Accumulates bigram counts from the given documents (can be called repeatedly)."""
        for doc in docs:
            prev = self.bos
            for token in doc:
                self.ctx2counter[(prev,)][token] += 1
                prev = token
            # close the document so that P(eos | last token) is learned
            self.ctx2counter[(prev,)][self.eos] += 1

    def prob_next(self, context: List[str], token: str) -> float:
        """Returns count(prev, token) / count(prev, *).

        Returns 0.0 when the previous token was never seen as a context, which
        is the same value an unseen continuation of a seen context gets.
        """
        # .get() instead of [] so that querying never inserts empty counters
        counter = self.ctx2counter.get(self._normalize_context(context))
        if not counter:
            return 0.0
        return counter[token] / sum(counter.values())

    def generate_next(self, context: List[str]) -> str:
        """Samples the next token proportionally to the counts observed after the previous token.

        Raises:
            IndexError: if the previous token was never seen as a context.
        """
        ctx = self._normalize_context(context)
        counter = self.ctx2counter.get(ctx)
        if not counter:
            raise IndexError(f"no continuation was observed for context {ctx!r}")
        return random.choices(list(counter), weights=list(counter.values()))[0]


bigram_lm = BigramLanguageModel()
bigram_lm.train(train_docs)

In [9]:
# some sanity checks
# Only the last token of each context list is used by the bigram model.
print(f'{bigram_lm.prob_next(["def"], "main")=}')
print(f'{bigram_lm.prob_next(["def", "main"], "(")=}')
print(f'{bigram_lm.prob_next(["def", "main", "("], ")")=}')
print(f'{bigram_lm.prob_next(["def", "main", "("], "]")=}')

bigram_lm.prob_next(["def"], "main")=0.001558846453624318
bigram_lm.prob_next(["def", "main"], "(")=0.5821917808219178
bigram_lm.prob_next(["def", "main", "("], ")")=0.11072195869551932
bigram_lm.prob_next(["def", "main", "("], "]")=0.0


In [10]:
# generate some code from it
# we generate at most 100 tokens, or stop at eos
# NOTE: sampling is unseeded; if the very first sampled token is eos, nothing is printed.
context = []
for _ in range(100):
    token = bigram_lm.generate_next(context)
    if token == bigram_lm.eos:
        break
    context.append(token)
    print(token, end=" ")



# Use logprob and smoothing, and generic n-gram models

In [11]:
# language model interface, with logprob
class LanguageModel:
    """Abstract token-level language model built around log-probabilities.

    Subclasses provide `train`, `logprob_next` and `generate_next`; the
    probability and sentence-level methods are derived from `logprob_next`.
    """

    def train(self, docs: List[List[str]]):
        """Fits the model on a corpus given as a list of token lists."""
        raise NotImplementedError()

    def logprob_next(self, context: List[str], token: str) -> float:
        """Returns log P(token | context), where context holds the preceding tokens."""
        raise NotImplementedError()

    def prob_next(self, context: List[str], token: str) -> float:
        """Returns P(token | context), computed as exp of `logprob_next`."""
        log_p = self.logprob_next(context, token)
        return np.exp(log_p)

    def logprob_sentence(self, sentence: List[str]) -> float:
        """Returns the sum of next-token log-probabilities over the sentence (0.0 if it is empty)."""
        if not len(sentence):
            return 0.0
        history: List[str] = []
        accumulated = 0.0
        for tok in sentence:
            accumulated = accumulated + self.logprob_next(history, tok)
            # the same list object is extended and handed to the next call
            history.append(tok)
        return accumulated

    def prob_sentence(self, sentence: List[str]) -> float:
        """Returns the sentence probability, computed as exp of `logprob_sentence`."""
        log_p = self.logprob_sentence(sentence)
        return np.exp(log_p)

    def generate_next(self, context: List[str]) -> str:
        """Produces one token that continues the given context."""
        raise NotImplementedError()

In [12]:
# generic n-gram language model
class NgramLanguageModel(LanguageModel):
    """Count-based n-gram model: each token is conditioned on the previous ``n - 1`` tokens.

    Contexts shorter than ``n - 1`` tokens are left-padded with `bos`, and every
    training document is followed by one `eos` token. Tokens that were never
    seen after a context get a small non-zero "last resort" value, so the
    returned values are not a normalized distribution.
    """

    def __init__(
        self,
        n: int,
    ):
        assert n > 0
        self.n = n

        self.bos = "<s>"
        self.eos = "</s>"
        # normalized context -> counts of the tokens observed right after it
        self.ctx2counter: Dict[Tuple[str, ...], collections.Counter] = (
            collections.defaultdict(collections.Counter)
        )
        # lazily filled cache: normalized context -> sum of its counts.
        # Summing a large counter on every query is very slow (the unigram
        # counter holds the whole vocabulary); train() invalidates the cache.
        self._ctx2total: Dict[Tuple[str, ...], int] = {}
        self.vocab = {self.bos, self.eos}
        self.vocab_list = None  # sorted copy of vocab, set by train()

    def _normalize_context(self, context: List[str]) -> Tuple[str, ...]:
        """Returns the last ``n - 1`` context tokens as a tuple, left-padded with `bos`."""
        if self.n == 1:
            return tuple()
        # list(): tuples are accepted too (the backoff model passes normalized
        # tuples down), and padding below needs list + list
        used_context = list(context[-self.n + 1 :])
        if len(used_context) < self.n - 1:
            used_context = [self.bos] * (self.n - 1 - len(used_context)) + used_context
        return tuple(used_context)

    def _context_total(self, context: Tuple[str, ...]) -> int:
        """Returns how many tokens were observed after a normalized context (0 if unseen)."""
        total = self._ctx2total.get(context)
        if total is None:
            # .get() so that looking up an unseen context never inserts an empty counter
            counter = self.ctx2counter.get(context)
            if not counter:
                return 0
            total = sum(counter.values())
            self._ctx2total[context] = total
        return total

    def train(self, docs: List[List[str]]):
        """Accumulates n-gram counts and vocabulary from the documents (can be called repeatedly)."""
        for doc in tqdm(docs, desc="Training"):
            context = []
            for token in doc:
                self.ctx2counter[self._normalize_context(context)][token] += 1
                context.append(token)
            # close the document so that the probability of stopping is learned
            self.ctx2counter[self._normalize_context(context)][self.eos] += 1
            self.vocab.update(doc)
        # counts changed, so cached totals are stale
        self._ctx2total.clear()
        # sorted: set iteration order of strings varies between interpreter runs
        self.vocab_list = sorted(self.vocab)

    def prob_next(self, context: List[str], token: str) -> float:
        """Returns count(context, token) / count(context, *), with last-resort values instead of zero.

        An unseen token after a seen context is scored as if it had been seen
        once; a context that was never seen is scored uniformly over the vocabulary.
        """
        context = self._normalize_context(context)
        total = self._context_total(context)
        if total == 0:
            # unseen context: nothing to normalize by
            return 1 / len(self.vocab)
        # total > 0 implies the context exists, so indexing does not insert
        count = self.ctx2counter[context][token]
        if count == 0:
            # last resort to prevent zero prob
            return 1 / total
        else:
            return count / total

    def logprob_next(self, context: List[str], token: str) -> float:
        """Returns the natural log of `prob_next`."""
        return np.log(self.prob_next(context, token))

    def generate_next(self, context: List[str]) -> str:
        """Samples the next token proportionally to the counts observed after the context.

        Raises:
            IndexError: if nothing was ever observed after this context.
        """
        context = self._normalize_context(context)
        counter = self.ctx2counter.get(context)
        if not counter:
            raise IndexError(f"no continuation was observed for context {context!r}")
        return random.choices(list(counter), weights=list(counter.values()))[0]


class NgramLanguageModelWithAddOneSmoothing(NgramLanguageModel):
    """N-gram model where every vocabulary token gets one extra count in every context."""

    def __init__(self, n: int):
        super().__init__(n)

    def prob_next(self, context: List[str], token: str) -> float:
        """Returns (count(context, token) + 1) / (count(context, *) + |vocab|)."""
        context = self._normalize_context(context)
        counter = self.ctx2counter.get(context)
        count = counter[token] if counter else 0
        return (count + 1) / (self._context_total(context) + len(self.vocab))

    def generate_next(self, context: List[str]) -> str:
        """Samples the next token from the smoothed distribution over the whole vocabulary."""
        context = self._normalize_context(context)
        counter = self.ctx2counter.get(context) or {}
        # before train() has run, vocab_list is still None
        population = self.vocab_list if self.vocab_list is not None else sorted(self.vocab)
        # random.choices only needs relative weights, and every token shares the
        # same denominator, so the smoothed counts are used directly instead of
        # recomputing the denominator once per vocabulary token
        weights = [counter.get(token, 0) + 1 for token in population]
        return random.choices(population, weights=weights)[0]


class NgramLanguageModelWithBackoff(NgramLanguageModel):
    """N-gram model that falls back to an (n-1)-gram model for unseen continuations.

    When a token was never observed after the context, the score is `alpha`
    times the score of the lower-order model, recursively down to the unigram
    model. The returned values are scores, not a normalized distribution, so a
    perplexity computed from them is only a relative indicator.
    """

    def __init__(self, n: int, alpha: float = 0.4):
        """
        Args:
            n: order of the model.
            alpha: factor applied each time the model backs off by one order.
        """
        super().__init__(n)
        self.alpha = alpha
        if self.n > 1:
            self.backoff_lm = NgramLanguageModelWithBackoff(n - 1, alpha)

    def train(self, docs: List[List[str]]):
        """Trains this model and, recursively, every lower-order model it backs off to."""
        super().train(docs)
        if self.n > 1:
            self.backoff_lm.train(docs)

    def prob_next(self, context: List[str], token: str) -> float:
        """Returns the relative frequency if the n-gram was seen, else a backed-off score."""
        context = self._normalize_context(context)
        counter = self.ctx2counter.get(context)
        count = counter[token] if counter else 0
        if count > 0:
            return count / self._context_total(context)
        if self.n > 1:
            # the lower-order model keeps only the tail of this context
            return self.alpha * self.backoff_lm.prob_next(context, token)
        # unigram level: score an unseen token as if it had been seen once;
        # an untrained model falls back to uniform over the vocabulary
        total = self._context_total(context)
        return 1 / total if total else 1 / len(self.vocab)

    def generate_next(self, context: List[str]) -> str:
        """Samples from the highest-order model that has observed the context.

        Raises:
            IndexError: if even the unigram model has no observations.
        """
        normalized = self._normalize_context(context)
        if self.n > 1 and not self.ctx2counter.get(normalized):
            return self.backoff_lm.generate_next(normalized)
        return super().generate_next(context)


In [13]:
# bigram model, add-one smoothing
# Train on the same documents and print the same four probes as for the
# earlier models, so the effect of smoothing can be compared line by line.
bigram_lm_a1 = NgramLanguageModelWithAddOneSmoothing(n=2)
bigram_lm_a1.train(train_docs)
print(f'{bigram_lm_a1.prob_next(["def"], "main")=}')
print(f'{bigram_lm_a1.prob_next(["def", "main"], "(")=}')
print(f'{bigram_lm_a1.prob_next(["def", "main", "("], ")")=}')
print(f'{bigram_lm_a1.prob_next(["def", "main", "("], "]")=}')

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

bigram_lm_a1.prob_next(["def"], "main")=0.00028306405957409747
bigram_lm_a1.prob_next(["def", "main"], "(")=0.0007573956159123535
bigram_lm_a1.prob_next(["def", "main", "("], ")")=0.07222321028842675
bigram_lm_a1.prob_next(["def", "main", "("], "]")=3.066412358868371e-06


In [14]:
# Generate at most 100 tokens from the add-one-smoothed bigram model, stopping at eos.
# NOTE: sampling is unseeded, so the printed text changes on every run.
context = []
for _ in range(100):
    token = bigram_lm_a1.generate_next(context)
    if token == bigram_lm_a1.eos:
        break
    context.append(token)
    print(token, end=" ")

eugene@lazutkin.com) test_ovo_decision_function $(\gamma Intrusion name="date_year" 'DBInstance:%s' n_samples_b test_get_support paragraph value="2324">Lovrenc authorize r_table "building render_revalidation_failure "port" '\d+\s+dir\s+(\d+)' (25.0, sqs 'endifchanged' _dummy_thread "both" 'Embedding dash maxs: ComplexSortedPerson HeaderInfoMap default_list 1j)) 'WB' penalty=%s, '''Shuffle-Group(s)-Out combine bee_set test02_bad_query "placeholders" ramdisk_ids pcontributor family "'spherical', test_learning_curve_with_boolean_indices OBJ_TAG_RE "2.1 'FW' pdp a=alpha pri DjangoAdminDefaultSettings "strs RUNTESTS_DIR save_linecache_getlines old_slug anim TransactionRollbackTests comment-moderation ud_url 'templates' 'delayed' 'isbn' self.%s' "sep" 'cache _.*:.*$\n' deciding u"Kinnula" '2a02::223:6cff:fe8a:2e8a' 'elasticloadbalancing.eu-west-1.amazonaws.com' get_urlconf "212-634-5789" r'j.m.Y' __serving Shuffle DEFAULT_ROOT 'MULTILINESTRING' nonzero_bic default_with_prefix_view test_model

In [15]:
# bigram model, backoff
# Training also trains the unigram model this one backs off to
# (hence two "Training" progress bars). Same four probes as before.
bigram_lm_bo = NgramLanguageModelWithBackoff(n=2)
bigram_lm_bo.train(train_docs)
print(f'{bigram_lm_bo.prob_next(["def"], "main")=}')
print(f'{bigram_lm_bo.prob_next(["def", "main"], "(")=}')
print(f'{bigram_lm_bo.prob_next(["def", "main", "("], ")")=}')
print(f'{bigram_lm_bo.prob_next(["def", "main", "("], "]")=}')

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

bigram_lm_bo.prob_next(["def"], "main")=0.001558846453624318
bigram_lm_bo.prob_next(["def", "main"], "(")=0.5821917808219178
bigram_lm_bo.prob_next(["def", "main", "("], ")")=0.11072195869551932
bigram_lm_bo.prob_next(["def", "main", "("], "]")=0.006564826180475006


In [16]:
# Generate at most 100 tokens from the bigram backoff model, stopping at eos.
# NOTE: sampling is unseeded, so the printed text changes on every run.
context = []
for _ in range(100):
    token = bigram_lm_bo.generate_next(context)
    if token == bigram_lm_bo.eos:
        break
    context.append(token)
    print(token, end=" ")

import PROVINCE_CHOICES = True ) ) self . exc_info ) alpha , 'blank' : os import scipy . assertEqual ( BaseEstimator ) simple_only_unlimited_args ( 'DC' , '/test_admin/admin/secure-view/' , accept_sparse = ResultSet from time ( 1 ] ) for X = _partition_estimators ( BinaryZlibFile ) : x , { } ) class AutoBatchingMixin , 'unique' : "(('publisher_is_draft', 'language', 'page'),)" } ) : 'True' } ) if mod = clf . generic ) , 'TIME_FORMAT' , value , '1234' ) : 'True' } fields import SSL . time ( url class TestU ( res in elem , : ( ) spectrum_ [ 

In [17]:
# trigram model, backoff
# Training also trains the bigram and unigram models it backs off to.
# Contexts shorter than two tokens are left-padded with the bos token.
trigram_lm_bo = NgramLanguageModelWithBackoff(n=3)
trigram_lm_bo.train(train_docs)
print(f'{trigram_lm_bo.prob_next(["def"], "main")=}')
print(f'{trigram_lm_bo.prob_next(["def", "main"], "(")=}')
print(f'{trigram_lm_bo.prob_next(["def", "main", "("], ")")=}')
print(f'{trigram_lm_bo.prob_next(["def", "main", "("], "]")=}')

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

trigram_lm_bo.prob_next(["def"], "main")=0.0006235385814497272
trigram_lm_bo.prob_next(["def", "main"], "(")=1.0
trigram_lm_bo.prob_next(["def", "main", "("], ")")=0.6705882352941176
trigram_lm_bo.prob_next(["def", "main", "("], "]")=0.0026259304721900027


In [18]:
# Generate at most 100 tokens from the trigram backoff model, stopping at eos.
# NOTE: sampling is unseeded; if the very first sampled token is eos, nothing is printed.
context = []
for _ in range(100):
    token = trigram_lm_bo.generate_next(context)
    if token == trigram_lm_bo.eos:
        break
    context.append(token)
    print(token, end=" ")

In [19]:
# 4-gram model, backoff
# Training also trains the 3-, 2- and 1-gram models it backs off to.
# Same four probes as for the lower-order models.
fourgram_lm_bo = NgramLanguageModelWithBackoff(n=4)
fourgram_lm_bo.train(train_docs)
print(f'{fourgram_lm_bo.prob_next(["def"], "main")=}')
print(f'{fourgram_lm_bo.prob_next(["def", "main"], "(")=}')
print(f'{fourgram_lm_bo.prob_next(["def", "main", "("], ")")=}')
print(f'{fourgram_lm_bo.prob_next(["def", "main", "("], "]")=}')

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

fourgram_lm_bo.prob_next(["def"], "main")=0.0002494154325798909
fourgram_lm_bo.prob_next(["def", "main"], "(")=0.4
fourgram_lm_bo.prob_next(["def", "main", "("], ")")=0.5263157894736842
fourgram_lm_bo.prob_next(["def", "main", "("], "]")=0.0010503721888760011


In [20]:
# Generate at most 100 tokens from the 4-gram backoff model, stopping at eos.
# NOTE: sampling is unseeded, so the printed text changes on every run.
context = []
for _ in range(100):
    token = fourgram_lm_bo.generate_next(context)
    if token == fourgram_lm_bo.eos:
        break
    context.append(token)
    print(token, end=" ")

from __future__ import absolute_import , division , with_statement import unittest from . models import CMSPlugin , Page class CustomTemplateView ( generic . CreateView ) : model = A self . assertTrue ( in response . items ( ) : clf = LogisticRegression ( random_state = 0 ) , sample_weight , random_state ) : if mode == 3 : raise TemplateSyntaxError ( "%r tag takes at least these parameters: required -- Boolean that specifies whether the field is required. * birthday * This field is required. * birthday * This field is required. * birthday * This field is required.</li></ul> <p><label 

In [21]:
# 5-gram model, backoff
# Training also trains the 4-, 3-, 2- and 1-gram models it backs off to.
# Same four probes as for the lower-order models.
fivegram_lm_bo = NgramLanguageModelWithBackoff(n=5)
fivegram_lm_bo.train(train_docs)
print(f'{fivegram_lm_bo.prob_next(["def"], "main")=}')
print(f'{fivegram_lm_bo.prob_next(["def", "main"], "(")=}')
print(f'{fivegram_lm_bo.prob_next(["def", "main", "("], ")")=}')
print(f'{fivegram_lm_bo.prob_next(["def", "main", "("], "]")=}')

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

fivegram_lm_bo.prob_next(["def"], "main")=9.976617303195638e-05
fivegram_lm_bo.prob_next(["def", "main"], "(")=0.16000000000000003
fivegram_lm_bo.prob_next(["def", "main", "("], ")")=0.21052631578947367
fivegram_lm_bo.prob_next(["def", "main", "("], "]")=0.0004201488755504005


In [22]:
# Generate at most 100 tokens from the 5-gram backoff model, stopping at eos.
# NOTE: sampling is unseeded, so the printed text changes on every run.
context = []
for _ in range(100):
    token = fivegram_lm_bo.generate_next(context)
    if token == fivegram_lm_bo.eos:
        break
    context.append(token)
    print(token, end=" ")

from django . db import models from cms . plugins . text . models import Text from cms . plugins . link . models import * class Migration : depends_on = ( ( "cms" , "0019_public_table_renames" ) , ) def forwards ( self , orm ) : db . rename_column ( "cmsplugin_picturepublic" , "publiccmsplugin_ptr_id" , "cmspluginpublic_ptr_id" ) db . alter_column ( 'cmsplugin_video' , 'movie' , orm [ 'video.video:movie' ] ) models = { : { : ( 'models.BooleanField' , [ '_("can edit")' ] , { 'default' : 'False' , 'blank' : 'True' } ) , : ( 'django.db.models.fields.PositiveIntegerField' , [ 

# Perplexity

In [23]:
def perplexity(lm: LanguageModel, docs: List[List[str]]) -> float:
    """Computes the per-token perplexity of a language model on a set of documents.

    The result is ``exp(-(sum of document log-probabilities) / (number of tokens))``.
    Only the tokens present in each document are scored and counted; no
    end-of-sequence token is added.

    Args:
        lm: model providing `logprob_sentence`.
        docs: tokenized documents to evaluate on.

    Returns:
        The perplexity (lower means the model fits the documents better).

    Raises:
        ValueError: if `docs` contains no tokens at all, in which case the
            average log-probability is undefined.
    """
    logprob = 0.0
    num_tokens = 0
    for doc in tqdm(docs, desc="Evaluating"):
        logprob += lm.logprob_sentence(doc)
        num_tokens += len(doc)
    if num_tokens == 0:
        # previously surfaced as a bare ZeroDivisionError from the division below
        raise ValueError("cannot compute perplexity: the given documents contain no tokens")
    return np.exp(-logprob / num_tokens)


In [30]:
test_projs_unseen = ["reddit"]
# NOTE(review): test_projs_seen is not used below; the "seen" documents are
# sampled from all training documents rather than from this project only.
test_projs_seen = ["scikit_learn"]

# A dedicated, seeded generator makes the two samples (and therefore the
# perplexities reported below) repeatable for a given document order, without
# touching the global `random` state used by the generation cells.
sample_rng = random.Random(0)
test_docs_unseen = sample_rng.sample([doc for proj in test_projs_unseen for doc in proj2docs[proj]], 50)
test_docs_seen = sample_rng.sample(train_docs, 50)

In [25]:
# Rebinds `unigram_lm` (previously a UnigramLanguageModel) to the generic
# n-gram class with n=1: perplexity() calls logprob_sentence(), which only the
# log-probability version of the LanguageModel interface provides.
unigram_lm = NgramLanguageModel(n=1)
unigram_lm.train(train_docs)

Training:   0%|          | 0/3245 [00:00<?, ?it/s]

In [31]:
# Unigram perplexity on documents sampled from the training set ("seen")
# and on documents from a project that was not used for training ("unseen").
print(f'{perplexity(unigram_lm, test_docs_seen)=}')
print(f'{perplexity(unigram_lm, test_docs_unseen)=}')


Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(unigram_lm, test_docs_seen)=np.float64(479.11326782036645)


Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(unigram_lm, test_docs_unseen)=np.float64(760.7371052845356)


In [32]:
# Bigram (backoff) perplexity on the same seen / unseen samples.
print(f'{perplexity(bigram_lm_bo, test_docs_seen)=}')
print(f'{perplexity(bigram_lm_bo, test_docs_unseen)=}')

Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(bigram_lm_bo, test_docs_seen)=np.float64(39.96088947744293)


Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(bigram_lm_bo, test_docs_unseen)=np.float64(442.8792847881771)


In [33]:
# Trigram (backoff) perplexity on the same seen / unseen samples.
print(f'{perplexity(trigram_lm_bo, test_docs_seen)=}')
print(f'{perplexity(trigram_lm_bo, test_docs_unseen)=}')

Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(trigram_lm_bo, test_docs_seen)=np.float64(5.367373097777401)


Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(trigram_lm_bo, test_docs_unseen)=np.float64(552.3271950520636)


In [34]:
# 4-gram (backoff) perplexity on the same seen / unseen samples.
print(f'{perplexity(fourgram_lm_bo, test_docs_seen)=}')
print(f'{perplexity(fourgram_lm_bo, test_docs_unseen)=}')

Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(fourgram_lm_bo, test_docs_seen)=np.float64(2.694121141376752)


Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(fourgram_lm_bo, test_docs_unseen)=np.float64(1069.53078960525)


In [35]:
# 5-gram (backoff) perplexity on the same seen / unseen samples.
print(f'{perplexity(fivegram_lm_bo, test_docs_seen)=}')
print(f'{perplexity(fivegram_lm_bo, test_docs_unseen)=}')

Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(fivegram_lm_bo, test_docs_seen)=np.float64(1.737392014245932)


Evaluating:   0%|          | 0/50 [00:00<?, ?it/s]

perplexity(fivegram_lm_bo, test_docs_unseen)=np.float64(2358.36844153302)
