# The Naturalness of Code: Analyzing Code at Token Level

In the last lecture we considered two very basic analyses (counting lines of code, and detecting code clones) at character level, by splitting the source into lines. Since our clone analysis compares lines, it can be fooled very easily by adding spurious whitespace (e.g. line breaks). For example, here is our example function from the last lecture.

In [None]:
code1 = """
public class Foo {
  public void foo(int x) {
    System.out.println("Hello Clone!");
    int j = 10;
    for(int i = 0; i < x; i++) {
      System.out.println("Another iteration");
    }
  }
}
"""

Here is a method in a different class that contains exactly the same code, but has some changes to whitespace.

In [None]:
code2 = """
public class Bar {
  public void bar(int x) {
    System.out.
            println("Hello Clone!");
    int j=10;
    for(int i = 0; 
        i < x;
        i++) {
        System.out.println("Another iteration");
    }
  }
}
"""

Let's have a look at what our clone analysis tells us about these two files. For this we need to reproduce the functions we used last time. The first function splits the source code into lines, but ignores empty lines, lines that contain only braces, and comment lines.

In [None]:
def get_lines(code):
    lines = [l.replace("}", "").replace("{", "").strip() for l in code.split("\n")]
    code_lines = [l for l in lines if l and not l.startswith("//")]

    return code_lines

The resulting lines are compared directly.

In [None]:
def compare_lines(lines1, lines2):    
    matrix = []
    
    for line1 in lines1:
        row = []
        for line2 in lines2:
            row.append(1 if line1 == line2 else 0)
            
        matrix.append(row)
                
    return matrix

A clone is found if there are diagonals of `1`s in the matrix produced by `compare_lines`. We can get the length of such a diagonal for a given location as follows.

In [None]:
def get_block_at(matrix, x, y):
    block = []
    
    while (x < len(matrix) and y < len(matrix[x]) and matrix[x][y]):
        block.append((x, y))
        x += 1
        y += 1
    
    return block
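
To make the notion of a diagonal concrete, here is a small self-contained sketch. The two line lists are made up for illustration, and the matrix is built inline with the same equality test that `compare_lines` uses; the loop at the end mirrors `get_block_at`.

```python
# Two made-up "files": lines_b repeats the first three lines of lines_a, shifted by one
lines_a = ["a = 1", "b = 2", "c = 3", "d = 4"]
lines_b = ["x = 0", "a = 1", "b = 2", "c = 3"]

# Same comparison as compare_lines: 1 where the lines are equal, 0 otherwise
matrix = [[1 if la == lb else 0 for lb in lines_b] for la in lines_a]

for row in matrix:
    print(row)

# Follow the diagonal starting at row 0, column 1, as get_block_at does
x, y = 0, 1
diagonal = []
while x < len(matrix) and y < len(matrix[x]) and matrix[x][y]:
    diagonal.append((x, y))
    x += 1
    y += 1

print(diagonal)
```

The three consecutive `1`s on the diagonal correspond to the three shared lines; each pair `(x, y)` says that line `x` of the first list matches line `y` of the second.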

To get all diagonals of a minimum size we used the following function.

In [None]:
def get_blocks(matrix, min_size = 3):
    blocks = []
    covered = set()
    
    width = len(matrix)
    height = len(matrix[0])
    
    for x in range(width):
        for y in range(height):
            if (x, y) in covered:
                continue
                
            block = get_block_at(matrix, x, y)
            if len(block) >= min_size:
                blocks.append(block)
                for (bx, by) in block:
                    covered.add((bx, by))
    
    return blocks

Finally, here is the output function that shows us our clones.

In [None]:
def print_clones(code1, code2):
    lines1 = get_lines(code1)
    lines2 = get_lines(code2)
    
    matrix = compare_lines(lines1, lines2)
    clones = get_blocks(matrix)
    
    for clone in clones:
        print("Code in snippet 1:")
        for i, j in clone:
            print(str(i + 1).rjust(3, ' '), ':', lines1[i])

        print("Code in snippet 2:")
        for i, j in clone:
            print(str(j + 1).rjust(3, ' '), ':', lines2[j])
        print("\n")

Can a clone be found by comparing `code1` and `code2`?

In [None]:
print_clones(code1, code2)

As expected, no clones were found. Although our `get_lines` function removes whitespace at the beginning and the end of lines, it does not look at whitespace within lines. One idea to improve our clone analysis would therefore be to not look at entire lines, but at _words_ that are separated by whitespace.

## Splitting source code into words

In [None]:
code1.split()

We can easily adapt our clone analysis from using lines to the words produced by the `split` function.

In [None]:
def print_clones(code1, code2):
    lines1 = code1.split()
    lines2 = code2.split()
    
    matrix = compare_lines(lines1, lines2)
    clones = get_blocks(matrix)
    
    for clone in clones:
        print("Code in snippet 1:")
        for i, j in clone:
            print(str(i + 1).rjust(3, ' '), ':', lines1[i])

        print("Code in snippet 2:")
        for i, j in clone:
            print(str(j + 1).rjust(3, ' '), ':', lines2[j])
        print("\n")

Any luck?

In [None]:
print_clones(code1, code2)

It found something! However, the first clone is not really interesting; it only shows up because our minimum size of 3 is probably too low when looking at words rather than lines. The second clone is more interesting: the entire `for`-loop is now detected as a clone, which indeed it is. However, the two lines preceding the loop are not included. The reason is that natural-language text is separated into words by whitespace, but source code is not (or at least not only): syntactic elements such as braces, parentheses, and operators also separate words. In our example, `System.out.println` is not split into multiple words, even though it has multiple components from the point of view of a compiler reading the source code. Similarly, `int j=10` should be more than two words (`int`, `j=10`); ideally, it should yield the same words as `int j = 10` (`int`, `j`, `=`, `10`).

There's another problem. Recall that _type 2_ clones may differ in terms of literals or identifiers and should still be considered as code clones:

In [None]:
code3 = """
public class Bar {
  public void bar(int x) {
    System.out.println("Completely different text!");
    int j = 200; // completely different numbers
    for(int i = 100; i < x; i++) {
      System.out.println("More complete different text");
    }
  }
}
"""

This snippet is identical to the first snippet, except for identifiers and literals. However, the clones we can find are not particularly interesting.

In [None]:
print_clones(code1, code3)

Although there are multiple clones, these just make us wish we had set `min_size` to something much larger than 3, because none of these clones is interesting.

To identify type 2 clones we would need to modify our clone analysis such that it compares all parts of the program except the identifiers and literals. But how can our analysis know what are variables and literals, and how can we get around the problem that words are not always separated by whitespace?

## Lexing Source Code

Source code is processed by a compiler to create an internal tree-representation that allows it to translate it to another language (e.g. assembly), or to interpret it directly. The analysis phase of a compiler consists of two parts: A low-level part called a lexical analyser (mathematically, a finite automaton based on a regular grammar), and a high-level part called a syntax analyser, or parser (mathematically, a push-down automaton based on a context-free grammar, or BNF). Today, we will consider the first part, the lexical analysis.

A lexer identifies substrings of the source program that belong together; these substrings are called *lexemes*.

For example, given the string `for(int i = 0; i < x; i++) {` we would like to build a lexer that outputs the following lexemes:
- `for`
- `(`
- `int`
- `i`
- `=`
- `0`
- `;`
- `i`
- `<`
- `x`
- `;`
- `i`
- `++`
- `)`
- `{`
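
As a preview of where we are heading, the lexeme list above can be reproduced with a single regular expression. This is only a sketch: the pattern and the name `split_lexemes` are assumptions made for this example, and the pattern does not treat string literals or comments specially. The rest of this section builds a lexer by hand instead, character by character.

```python
import re

# Assumed lexeme pattern: "++" first, then runs of word characters,
# then any single character that is neither a word character nor whitespace
LEXEME_PATTERN = re.compile(r"\+\+|\w+|[^\w\s]")

def split_lexemes(code):
    """Return the lexemes of `code`, ignoring whitespace."""
    return LEXEME_PATTERN.findall(code)

print(split_lexemes("for(int i = 0; i < x; i++) {"))

# Whitespace within a statement no longer matters
print(split_lexemes("int j=10;") == split_lexemes("int j = 10;"))
```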

Some of the following examples are based on https://medium.com/@pythonmembers.club/building-a-lexer-in-python-a-tutorial-3b6de161fe84

We will start by producing lexemes that separate strings on whitespaces. A simple way to do this would be to simply iterate over a string and store a lexeme whenever we encounter whitespace:

In [None]:
string = 'I love software analysis'
white_space = ' '
lexemes = []

lexeme = ''
for i,char in enumerate(string):
    lexeme += char
    if (i+1 < len(string)):
        if string[i+1] == white_space:
            lexemes.append(lexeme)
            lexeme = ''

lexemes

One issue here is that our string does not end in whitespace, so we need to always add the final lexeme:

In [None]:
string = 'I love software analysis'
white_space = ' '
lexemes = []

lexeme = ''
for i,char in enumerate(string):
    lexeme += char
    if (i+1 < len(string)):
        if string[i+1] == white_space:
            lexemes.append(lexeme)
            lexeme = ''

if lexeme:
    lexemes.append(lexeme) 

lexemes

We are still including the whitespace in our lexemes, which we really should avoid.

In [None]:
string = 'I love software analysis'
white_space = ' '
lexemes = []

lexeme = ''
for i,char in enumerate(string):
    if char != white_space:
        lexeme += char
    if (i+1 < len(string)):
        if string[i+1] == white_space:
            lexemes.append(lexeme)
            lexeme = ''

if lexeme:
    lexemes.append(lexeme) 

lexemes

We've thus covered lexemes separated by whitespace, but not those separated by syntactical structures of source code. What we need is to define *keywords* that allow our lexer to identify when lexemes represent special syntactical source code elements. Keywords include reserved words like `public`, `class`, but we will treat symbols such as `(` or `{` the same way.

In [None]:
symbols = ['{', '}', '(', ')', '[', ']', '.', '"', '*', '\n', ':', ',', ';', '=']

In [None]:
keywords = ['public', 'class', 'void', 'main', 'String', 'int', 'for', '++']

In [None]:
KEYWORDS = symbols + keywords

In [None]:
white_space = [' ', '\t', '\n']

In [None]:
lexemes = []
string = code1

lexeme = ''
for i,char in enumerate(string):
    if char not in white_space:
        lexeme += char
        
    if (i+1 < len(string)):
        if string[i+1] in white_space or string[i+1] in KEYWORDS or lexeme in KEYWORDS:
            if lexeme:
                lexemes.append(lexeme)
            lexeme = ''

if lexeme:
    lexemes.append(lexeme) 

In [None]:
lexemes

Let's put this in a function.

In [None]:
def tokenize(code):
    lexemes = []
    lexeme = ""
    for i,char in enumerate(code):
        if char not in white_space:
            lexeme += char
        if (i+1 < len(code)):
            if code[i+1] in white_space or code[i+1] in KEYWORDS or lexeme in KEYWORDS:
                if lexeme:
                    lexemes.append(lexeme)
                    lexeme = ''
    if lexeme:
        lexemes.append(lexeme)
    return lexemes

Let's compare the lexemes for our two variants of the same code.

In [None]:
lexemes1 = tokenize(code1)
lexemes2 = tokenize(code2)

for i in range(min(len(lexemes1), len(lexemes2))):
    print(lexemes1[i].ljust(20, ' '), lexemes2[i])

This looks promising, so let's adapt our clone detection to use our lexer.

In [None]:
def print_clones(code1, code2):
    lexemes1 = tokenize(code1)
    lexemes2 = tokenize(code2)
    
    matrix = compare_lines(lexemes1, lexemes2)
    clones = get_blocks(matrix, 20) # more than 3 
    
    for clone in clones:
        print("Code in snippet 1:")
        for i, j in clone:
            print(str(i + 1).rjust(3, ' '), ':', lexemes1[i])

        print("Code in snippet 2:")
        for i, j in clone:
            print(str(j + 1).rjust(3, ' '), ':', lexemes2[j])
        print("\n")

In [None]:
print_clones(code1, code2)

Our clone detection now matches the entire code of the two variants of the code snippet.

However, let's consider a type 2 clone:

In [None]:
code3 = """
public class Bar {
  public void bar(int x) {
    System.out.println("This is a different string!");
    int j = 50;
    for(int i = 100; i < x; i++) {
      System.out.println("Yet some more different text");
    }
  }
}
"""

In [None]:
print_clones(code1, code3)

As expected, no code clones were detected because the strings and numbers are different. An obvious way to fix this would be to replace all strings and numbers with some fixed values. However, how do we know which of our lexemes represent strings and numbers?

## From lexemes to tokens

Lexemes match a character pattern, which is associated with a lexical category called a *token*. A token is the name for a set of lexemes, all of which have the same grammatical significance for the parser. 

We define a token as a named tuple that tells us the lexeme (its value), the type of token, and its position in the source code.

In [None]:
from collections import namedtuple
Token = namedtuple('Token', ['value', 'type', 'line', 'col'])

For our code examples, we might want to distinguish the following token types:

In [None]:
from enum import Enum
class TokenType(Enum):
    INT = 1
    STRING = 2
    KEYWORD = 3
    SYNTAX = 4
    IDENTIFIER = 5

The tokenizer needs to distinguish token types based on the characters encountered.

In [None]:
def tokenize(code):
    tokens = []
    lexeme = ""
    line = 0
    col = 0
    i = 0
    while i < len(code):
        char = code[i]
        col += 1
        if char in white_space:
            if char == '\n':
                line += 1
                col = 0
        elif char in KEYWORDS:
            tokens.append(Token(char, TokenType.SYNTAX, line, col))
            lexeme = ''
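        else:
            start_col = col  # column of the first character of the lexeme
            lexeme += char 
            # The bounds check avoids an IndexError if the code ends in an identifier
            while i + 1 < len(code) and code[i+1] not in KEYWORDS and code[i+1] not in white_space:
                i += 1
                col += 1
                lexeme += code[i]
            if lexeme in KEYWORDS:
                tokens.append(Token(lexeme, TokenType.KEYWORD, line, start_col))
            else:
                tokens.append(Token(lexeme, TokenType.IDENTIFIER, line, start_col))
            lexeme = ''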
        i += 1
        
    return tokens

In [None]:
tokenize(code1)

We can also identify number tokens if the first character of the lexeme is a digit, string tokens if the first character of a lexeme is a quote, and it is common to skip comments.

In [None]:
def tokenize(code):
    tokens = []
    line = 0
    col = 0
    i = 0
    while i < len(code):
        char = code[i]
        col += 1
        start_col = col  # column of the first character of the current lexeme
        if char == '/' and code[i+1:i+2] == '/':
            # Skip the comment, but stop before the newline so that
            # the line counter is still updated in the next iteration
            while i + 1 < len(code) and code[i+1] != '\n':
                i += 1
        elif char.isnumeric():
            lexeme = char
            while i + 1 < len(code) and code[i+1].isnumeric():
                i += 1
                col += 1
                lexeme += code[i]
            tokens.append(Token(lexeme, TokenType.INT, line, start_col))
        elif char in white_space:
            if char == '\n':
                line += 1
                col = 0
        elif char == '"':
            # Collect everything up to the closing quote (escapes are not handled)
            lexeme = ''
            while i + 1 < len(code) and code[i+1] != '"':
                i += 1
                col += 1
                lexeme += code[i]
            i += 1  # skip the closing quote
            col += 1
            tokens.append(Token(lexeme, TokenType.STRING, line, start_col))
        elif char in KEYWORDS:
            tokens.append(Token(char, TokenType.SYNTAX, line, start_col))
        else:
            lexeme = char
            while i + 1 < len(code) and code[i+1] not in KEYWORDS and code[i+1] not in white_space:
                i += 1
                col += 1
                lexeme += code[i]
            if lexeme in KEYWORDS:
                tokens.append(Token(lexeme, TokenType.KEYWORD, line, start_col))
            else:
                tokens.append(Token(lexeme, TokenType.IDENTIFIER, line, start_col))
        i += 1

    return tokens

In [None]:
tokenize(code1)

In [None]:
tokenize(code2)

Given our new tokenizer, we can now define a function that normalizes strings and numbers by replacing them with a constant placeholder value.

In [None]:
def normalized_tokens(tokens):
    normalized_tokens = []
    for token in tokens:
        if token.type == TokenType.INT:
            normalized_tokens.append(Token("<INT>", TokenType.INT, token.line, token.col))
        elif token.type == TokenType.STRING:
            normalized_tokens.append(Token("<STR>", TokenType.STRING, token.line, token.col))
        else:
            normalized_tokens.append(token)
    
    return normalized_tokens

In [None]:
normalized_tokens(tokenize(code1))

To use this in our clone analysis we need to refine our matrix generation to look at the lexemes of the tokens, since the comparison should not consider the location.

In [None]:
def compare_tokens(tokens1, tokens2):
    matrix = []
    
    for token1 in tokens1:
        row = []
        for token2 in tokens2:
            row.append(1 if token1.value == token2.value else 0)
            
        matrix.append(row)
                
    return matrix

Finally, here's our refined clone analysis that works at token level. We also refine the analysis to print the affected lines instead of lists of tokens.

In [None]:
def print_clones(code1, code2):
    tokens1 = tokenize(code1)
    tokens2 = tokenize(code2)
    
    normalized_tokens1 = normalized_tokens(tokens1)
    normalized_tokens2 = normalized_tokens(tokens2)
   
    matrix = compare_tokens(normalized_tokens1, normalized_tokens2)
    
    clones = get_blocks(matrix, 20)
    
    for clone in clones:
        print("Clone")
        lines1 = []
        lines2 = []
        for i, j in clone:
            line = tokens1[i].line
            if line not in lines1:
                lines1.append(line)
                
            # j, not i, indexes the tokens of the second snippet
            line = tokens2[j].line
            if line not in lines2:
                lines2.append(line)
        
        # Token lines are 0-based indices into the list of source lines
        print("Code in snippet 1:")
        code_lines = code1.split('\n')
        for line in lines1:
            print(f"{line+1}: {code_lines[line]}")

        print("Code in snippet 2:")
        code_lines = code2.split('\n')
        for line in lines2:
            print(f"{line+1}: {code_lines[line]}")
        print("\n")

First a sanity check: Does it still work on our type 1 clone?

In [None]:
print_clones(code1, code2)

(Note that our clone detection is taking a number of shortcuts; we could improve how we are analyzing the matrix. If you reduce the `min_size` you'll currently see some redundant code clones.)

Now let's consider our type 2 clone.

In [None]:
print_clones(code1, code3)

It works! 

In practice, we wouldn't need to create a lexer by hand. Language recognition is an established problem in computer science, and compiler construction a mature topic with many supporting tools. The classical lexer generator tool is [Flex](https://github.com/westes/flex), which is based on the classic Unix utility [Lex](https://en.wikipedia.org/wiki/Lex_(software)). Tokens are specified as regular expressions, and Flex automatically generates the code that processes a character stream to generate tokens.
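
The idea of specifying tokens as regular expressions can also be sketched directly in Python with the standard `re` module. The token names, the patterns, the small keyword set, and the function name `regex_tokenize` below are assumptions chosen for this illustration; they cover only the constructs in our example snippets, not the full Java syntax.

```python
import re

# Hypothetical token specification; order matters (comments before "/", etc.)
TOKEN_SPEC = [
    ("COMMENT",    r"//[^\n]*"),
    ("STRING",     r'"(?:\\.|[^"\\])*"'),
    ("INT",        r"\d+"),
    ("IDENTIFIER", r"[A-Za-z_]\w*"),
    ("SYNTAX",     r"\+\+|[{}()\[\].;,=<>+\-*/]"),
    ("SKIP",       r"\s+"),
]
MASTER_PATTERN = re.compile(
    "|".join(f"(?P<{name}>{pattern})" for name, pattern in TOKEN_SPEC)
)

# Small, incomplete keyword set, just enough for the examples in this lecture
JAVA_KEYWORDS = {"public", "class", "void", "int", "for"}

def regex_tokenize(code):
    """Return (type, lexeme) pairs; whitespace and comments are dropped."""
    tokens = []
    # Characters that match no pattern are silently skipped by finditer
    for match in MASTER_PATTERN.finditer(code):
        kind = match.lastgroup
        lexeme = match.group()
        if kind in ("SKIP", "COMMENT"):
            continue
        if kind == "IDENTIFIER" and lexeme in JAVA_KEYWORDS:
            kind = "KEYWORD"
        tokens.append((kind, lexeme))
    return tokens

print(regex_tokenize('int j=10; // a comment\nfoo("a b");'))
```

Compared to our hand-written loop, the token categories are now declared as data, which is essentially what a lexer generator takes as input.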

For Python code aiming to tokenize Java code, there is the [javalang](https://github.com/c2nes/javalang) parser framework, which provides a tokenizer.

In [None]:
import javalang

The output in principle is similar to what our tokenizer does.

In [None]:
list(javalang.tokenizer.tokenize(code1))

It would be straightforward to adapt our clone detection to use javalang.

## Language Models

The tokenizer allows us to split source code properly into words, just like we are able to do for regular text by splitting on whitespace.

Natural languages like English are rich and powerful, but in practice most human utterances are simple, repetitive and predictable. These utterances can be very usefully modeled using modern statistical methods. This has led to the phenomenal success of Natural Language Processing (NLP), i.e. statistical approaches to speech recognition, natural language translation, question-answering, and text mining and comprehension.

Since we can now split source code into words just like we can do for natural language, this raises the question whether we can apply NLP methods also to source code. Hindle et al. postulated that software is similarly natural, in the sense that it is created by humans at work, with all the attendant constraints and limitations, and is therefore also repetitive and predictable.

Abram Hindle, Earl T. Barr, Zhendong Su, Mark Gabel, and Premkumar Devanbu. On the naturalness of software. In 2012 34th International Conference on Software Engineering (ICSE), pages 837–847, 2012.

The _Naturalness Hypothesis_ states that code can be usefully modeled by statistical language models, and such models can be leveraged to support software engineers. 

A language model essentially assigns a probability to an utterance. It is typically formulated in terms of conditional probabilities, where the probability of the next word in a sequence is conditioned on all previous words in the sequence. Let's take a closer look at language models in the scope of natural language processing, before moving on to see how they can be used with software.

### n-gram models

The n-gram model is a simple statistical language model. Consider the sequence of tokens in a document (in our case, a system s), $a_1 a_2 \ldots a_i \ldots a_n$. An n-gram model estimates the probability of a sequence by statistically estimating how likely tokens are to follow other tokens. Thus, we can estimate the probability of a document based on the product of a series of conditional probabilities:

$p(s) = p(a_1) \times p(a_2 | a_1) \times p(a_3 | a_1 a_2) \times \ldots \times p(a_n | a_1 \ldots a_{n-1})$

An n-gram model assumes a Markov property, i.e., token occurrences are influenced only by a limited prefix of length $n - 1$. Thus, for 4-gram models, we assume

$p(a_i | a_1 \ldots a_{i-1}) \approx p(a_i | a_{i-3} a_{i-2} a_{i-1})$

These models are estimated from a corpus using simple maximum-likelihood based frequency-counting of token sequences. Thus, if $*$ is a wildcard, we ask how often, relatively, the tokens $a_1, a_2, a_3$ are followed by $a_4$:

$p(a_4 | a_1 a_2 a_3) = \frac{count(a_1 a_2 a_3 a_4)}{count(a_1 a_2 a_3 *)}$
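
The frequency counting behind this estimate can be sketched in a few lines of plain Python. To keep the example small it uses bigrams instead of 4-grams, so the context is a single token; the toy token sequence and the helper name `mle_probability` are made up for illustration.

```python
from collections import Counter

tokens = "the cat sat on the mat the cat ran".split()

# Count every bigram, and every token that starts a bigram (the "context *" count)
bigram_counts = Counter(zip(tokens, tokens[1:]))
context_counts = Counter(tokens[:-1])

def mle_probability(word, context):
    """Relative frequency of `word` directly after `context`."""
    if context_counts[context] == 0:
        return 0.0
    return bigram_counts[(context, word)] / context_counts[context]

print(mle_probability("cat", "the"))  # 2 of the 3 bigrams starting with "the"
print(mle_probability("dog", "the"))  # never observed, so the estimate is 0
```

Note that any bigram that does not occur in the corpus gets probability zero, which becomes a problem as soon as the model is applied to new text.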

We will use the well-established NLTK library for n-gram models.

In [None]:
from nltk.util import ngrams

Let's assume an arbitrary sentence in natural language.

In [None]:
string = "there is a cat licking your birthday cake"

Let's set `n=2` to start with. Using NLTK, we can extract all bigrams from our sentence easily.

In [None]:
n = 2
list(ngrams(string.split(), n))

For common values of `n` NLTK also offers functions we can directly call without specifying `n`:

In [None]:
from nltk.util import bigrams
list(bigrams(string.split()))

Note that the first (`there`) and last (`cake`) words each occur in only one bigram, while all other words are part of two bigrams. In order to allow the model to capture how often sentences start with `there` and end with `cake`, NLTK lets us add special padding symbols to the sentence before splitting it into n-grams.

In [None]:
from nltk.lm.preprocessing import pad_both_ends
list(bigrams(pad_both_ends(string.split(), n=2)))

To make our model more robust we could also train it on unigrams (single words) as well as bigrams, its main source of information. NLTK once again helpfully provides a function called `everygrams`.

In [None]:
from nltk.util import everygrams
list(everygrams(string.split(), max_len=2))

During training and evaluation our model will rely on a vocabulary that defines which words are "known" to the model. To create this vocabulary we need to pad our sentences (just like for counting ngrams) and then combine the sentences into one flat stream of words. This is done by the pipeline function.

In [None]:
from nltk.lm.preprocessing import padded_everygram_pipeline
string_tokens = ["there is a cat licking your birthday cake".split(),
                "he can't read so he does not know that the cake is not for him".split(),
                "it might be his birthday too but the chance of that is slim".split()
                ]

train, vocab = padded_everygram_pipeline(2, string_tokens)

So as to avoid re-creating the text in memory, both train and vocab are lazy iterators. They are evaluated on demand at training time.

For the sake of understanding the output of padded_everygram_pipeline, we'll "materialize" the lazy iterators by casting them into a list.

In [None]:
training_ngrams, padded_sentences = padded_everygram_pipeline(2, string_tokens)
for ngramlize_sent in training_ngrams:
    print(list(ngramlize_sent))

In [None]:
list(padded_sentences)

Having prepared our data we are ready to start training a model. As a simple example, let us train a Maximum Likelihood Estimator (MLE).

We only need to specify the highest ngram order to instantiate it.

In [None]:
from nltk.lm import MLE
lm = MLE(2)

The model initially has no content:

In [None]:
len(lm.vocab)

We need to train the model with our n-grams.

In [None]:
lm.fit(train, vocab)

In [None]:
len(lm.vocab)

We can look up vocabulary in the model, for example to check that our first sentence is contained in the model.

In [None]:
lm.vocab.lookup(string_tokens[0])

If we look up unseen sentences that are not from the training data, NLTK automatically replaces words not in the vocabulary with `<UNK>`.

In [None]:
lm.vocab.lookup('there is a cat licking your birthday foo'.split())

When it comes to ngram models the training boils down to counting up the ngrams from the training corpus.

In [None]:
print(lm.counts)

We can check how often individual unigrams occur.

In [None]:
lm.counts["licking"]

In [None]:
lm.counts["birthday"]

We can also check how often bigrams occur.

In [None]:
lm.counts[["might"]]["be"]

The real purpose of training a language model is to have it score how probable words are in certain contexts. This being MLE, the model returns the item's relative frequency as its score.

In [None]:
lm.score("licking")

In [None]:
lm.score("birthday")

In [None]:
lm.score("be", ["might"])

Items that are not seen during training are mapped to the vocabulary's "unknown label" token. All unknown tokens have the same probability.

In [None]:
lm.score("<UNK>") == lm.score("foo")

In [None]:
lm.score("<UNK>") == lm.score("bar")

To avoid underflow when working with many small score values it makes sense to take their logarithm. For convenience this can be done with the logscore method.

In [None]:
lm.logscore("licking")

In [None]:
lm.logscore("birthday")

In [None]:
lm.logscore("be", ["might"])

## Is Software Natural?

Now that we know what a language model is, let's return to software. The core of the naturalness hypothesis is that software is as repetitive and predictable as natural language.

To determine how predictable a language is, a statistical language model, estimated carefully from a representative corpus, can be evaluated in terms of its _perplexity_ with respect to the contents of a new document drawn from the same population. A good model can guess the contents of the new document with very high probability; i.e., it will not find the new document particularly surprising or perplexing. 

The perplexity of a language model on a test set is the inverse probability of the test set, normalised by the number of words: $PP(W) = P(w_1 w_2 \ldots w_N)^{-\frac{1}{N}}$

For a bigram model, this expands to:

$PP(W) = \sqrt[N]{\prod_{i=1}^N{\frac{1}{P(w_i|w_{i-1})}}}$


Perplexity can also be seen as the weighted average branching factor of a language, i.e., the number of possible next words that can follow any word.

It is common to use the log-transformed variant of perplexity, called _cross entropy_:

$H(s) = -\frac{1}{n} \log_2 P(a_1 \ldots a_n)$
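
The relation between the two measures can be checked with a small worked example. The probabilities below are made up; they stand for the conditional probabilities a model assigns to each token of a four-token sequence. With base-2 logarithms, the perplexity equals 2 raised to the cross entropy.

```python
import math

# Made-up conditional probabilities p(a_i | context) for a four-token sequence
token_probabilities = [0.25, 0.5, 0.125, 0.5]
n = len(token_probabilities)

# Cross entropy: negative average log2 probability per token
cross_entropy = -sum(math.log2(p) for p in token_probabilities) / n

# Perplexity: inverse probability of the sequence, normalised by its length
perplexity = math.prod(token_probabilities) ** (-1 / n)

print(cross_entropy)
print(perplexity, 2 ** cross_entropy)
```

Working in log space also shows why the cross entropy is the more practical quantity: summing logarithms avoids multiplying many small probabilities.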

NLTK of course offers a means to calculate the cross entropy. Let's first pick a dataset.

In [None]:
import nltk
from nltk.corpus import brown

# Might be necessary the first time:
# nltk.download('brown')

The Brown Corpus was the first million-word electronic corpus of English, created in 1961 at Brown University. This corpus contains text from 500 sources.

In [None]:
len(brown.words())

In NLP it is common to apply various preprocessing steps before training a language model. We will keep it simple and just build a corpus of lower-case versions of the words in the Brown corpus.

In [None]:
brown = nltk.corpus.brown
corpus = [[word.lower() for word in sent] for sent in brown.sents()]

In [None]:
corpus[0]

Let's split the dataset into 95% training data and 5% test data.

In [None]:
split = int(95*len(corpus)/100)
train = corpus[:split]
test  = corpus[split:]

Now we can build a language model as we did previously, using a maximum likelihood estimator.

In [None]:
n = 2
train_data, padded_sents = padded_everygram_pipeline(n, train)

In [None]:
lm = MLE(n)

In [None]:
lm.fit(train_data, padded_sents)

To calculate the perplexity, we can use NLTK. The perplexity function in NLTK expects a list of n-grams as test set.

In [None]:
from nltk.lm.preprocessing import padded_everygrams
from nltk.lm.preprocessing import flatten

test_data = list(flatten(padded_everygrams(n, sent) for sent in test))

In [None]:
lm.perplexity(test_data)

We can also calculate the log-transformed version of perplexity, the cross-entropy:

In [None]:
lm.entropy(test_data)

Whoops, infinitely surprised?

This is a problem of data sparsity: Some n-grams may never occur in one corpus, but may in fact occur elsewhere. Consequently there may be some n-grams in the test data that are not in the training data.

Smoothing is a technique to handle cases where we have not seen the n-grams yet and still produce usable results with sufficient statistical rigor. There exist a variety of techniques for smoothing the estimates of a very large number of coefficients, some of which are larger than they should be and others smaller. 

The simplest smoothing technique is Laplace smoothing, which adds 1 to the count for every n-gram. In practice, this is not a recommended approach, and there are more sophisticated smoothing techniques such as Good-Turing estimates, Jelinek-Mercer smoothing, Katz smoothing, Witten-Bell smoothing, Absolute discounting, Kneser-Ney smoothing, Modified Kneser Ney smoothing, and others.
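
To see what adding 1 to every count does to the estimates, here is a hand-rolled sketch for bigrams. It is not NLTK's implementation (which, among other things, also counts padding symbols and `<UNK>` in the vocabulary); the toy corpus and the name `laplace_probability` are assumptions for this example.

```python
from collections import Counter

tokens = "the cat sat on the mat the cat ran".split()
bigram_counts = Counter(zip(tokens, tokens[1:]))
context_counts = Counter(tokens[:-1])
vocabulary = sorted(set(tokens))
vocabulary_size = len(vocabulary)

def laplace_probability(word, context):
    """Add-one estimate: (count(context word) + 1) / (count(context *) + V)."""
    return (bigram_counts[(context, word)] + 1) / (context_counts[context] + vocabulary_size)

print(laplace_probability("cat", "the"))  # seen bigram
print(laplace_probability("sat", "the"))  # unseen bigram, but no longer zero

# The smoothed estimates for one context still sum to 1 over the vocabulary
print(sum(laplace_probability(word, "the") for word in vocabulary))
```

Adding the vocabulary size to the denominator is what keeps the estimates a proper probability distribution after every count has been increased by one.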

In [None]:
from nltk.lm import Laplace

In [None]:
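n = 3
train_data, padded_sents = padded_everygram_pipeline(n, train)
# Rebuild the test n-grams so that they match the new order
test_data = list(flatten(padded_everygrams(n, sent) for sent in test))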

In [None]:
brown_model = Laplace(n) 
brown_model.fit(train_data, padded_sents)

Let's first calculate the perplexity.

In [None]:
brown_model.perplexity(test_data)

...and now the cross entropy.

In [None]:
brown_model.entropy(test_data)

Hindle et al. evaluated the cross entropy for different values of `n` on the Brown and the Gutenberg corpus. We will replicate this experiment, but to keep the computation time down we'll skip the Gutenberg corpus and only use small values for `n`, and no cross-validation. It is worth noting, however, that the perplexity of two language models is only _directly_ comparable if they use identical vocabularies.

In [None]:
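for n in range(1,5):
    train_data, padded_sents = padded_everygram_pipeline(n, train)
    # The test n-grams need to match the order of the model
    test_data = list(flatten(padded_everygrams(n, sent) for sent in test))
    brown_model = Laplace(n) 
    brown_model.fit(train_data, padded_sents)
    entropy = brown_model.entropy(test_data)
    print(f"n = {n}: {entropy}")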

To see whether software is similar, we need a corpus of source code. Unfortunately, NLTK does not provide this for us. We will thus use an existing corpus provided by others.

In [None]:
# This may take a while so is commented out
#!wget https://s3.amazonaws.com/code2seq/datasets/java-small.tar.gz

We will only need the lexemes rather than the full tokens, so let's define a helper function for this.

In [None]:
def tokenize(code):
    try:
        tokens = [token.value for token in javalang.tokenizer.tokenize(code)]
    except Exception:
        # Lexer errors may occur for malformed files; skip those
        return []
    return tokens

We use this to create a training and test corpus, where a "sentence" is represented as the tokenized version of a Java source code file.

In [None]:
import tarfile

java_training = []
java_test = []
with tarfile.open("java-small.tar.gz", "r") as f:
    for tf in f.getmembers():
        if tf.isfile() and tf.name.startswith("java-small/training"):
            f2=f.extractfile(tf)
            content=f2.read()
            java_training.append(tokenize(content))
        elif tf.isfile() and tf.name.startswith("java-small/test"):
            f2=f.extractfile(tf)
            content=f2.read()
            java_test.append(tokenize(content))

len(java_training)

Given this dataset, the steps to create a language model are identical to those for a natural language text. The test n-grams are rebuilt for each `n`, so that their order always matches the order of the model being evaluated.

In [None]:
for n in range(1, 5):
    train_data, padded_sents = padded_everygram_pipeline(n, java_training)
    java_model = Laplace(n)
    java_model.fit(train_data, padded_sents)
    # Skip files that could not be tokenized (empty token lists)
    java_test_data = list(flatten(padded_everygrams(n, sent) for sent in java_test if sent))
    entropy = java_model.entropy(java_test_data)
    print(f"n = {n}: {entropy}")

## Stopwords

In NLP it is common to remove stopwords before processing data. We have not done this in our experiments so far, which raises the question of what stopword removal would mean for source code: intuitively, source code contains a substantial amount of syntactic overhead. The effects of this have been investigated in the following paper:

Rahman, M., Palani, D., & Rigby, P. C. (2019, May). Natural software revisited. In 2019 IEEE/ACM 41st International Conference on Software Engineering (ICSE) (pp. 37-48). IEEE.
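To get a rough feeling for this syntactic overhead, the following sketch counts how many tokens of a small Java snippet are separators. It uses a crude regular-expression lexer rather than `javalang`; the pattern `TOKEN_RE`, the helper `crude_tokens`, and the snippet are assumptions made for illustration and do not handle comments, character literals, or multi-character operators.

```python
import re

# Crude lexer: identifiers/keywords, integer literals, string literals,
# or any other single non-space character (operators and separators).
TOKEN_RE = re.compile(r'[A-Za-z_]\w*|\d+|"[^"\n]*"|\S')

# The separator characters of Java
SEPARATORS = set("(){}[];,.")

def crude_tokens(code):
    """Split code into a flat list of lexemes."""
    return TOKEN_RE.findall(code)

snippet = """
public void foo(int x) {
    for(int i = 0; i < x; i++) {
        System.out.println("Another iteration");
    }
}
"""

toks = crude_tokens(snippet)
seps = [t for t in toks if t in SEPARATORS]
print(f"{len(seps)} of {len(toks)} tokens are separators ({len(seps) / len(toks):.0%})")
```

Even in this tiny example, a large share of the tokens carries no information beyond the syntactic structure.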

Let's have a closer look at this ourselves. First we compare language models on the Brown corpus with and without stopwords, starting with a 3-gram model that includes stopwords.

In [None]:
corpus = [[word for word in sent] for sent in brown.sents()]
split = int(95*len(corpus)/100)
train = corpus[:split]
test = corpus[split:]

In [None]:
n = 3
train_data, padded_sents = padded_everygram_pipeline(n, train)

lm_with = Laplace(n) 
lm_with.fit(train_data, padded_sents)

In [None]:
test_data = list(flatten(padded_everygrams(n, sent) for sent in test))

In [None]:
lm_with.entropy(test_data)

Now we build a pre-processed version of the corpus.

In [None]:
# Requires the NLTK stopwords corpus: nltk.download('stopwords')
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))
corpus_ns = [[word for word in sent if word.lower() not in stop_words] for sent in brown.sents()]

In [None]:
spl = int(95*len(corpus)/100)
train_ns = corpus_ns[:spl]
test_ns = corpus_ns[spl:]

n = 3
train_data, padded_sents = padded_everygram_pipeline(n, train_ns)

lm_without = Laplace(n) 
lm_without.fit(train_data, padded_sents)

In [None]:
# Evaluate on the test sentences with stopwords removed
test_data = list(flatten(padded_everygrams(n, sent) for sent in test_ns))

In [None]:
lm_without.entropy(test_data)

The effect on natural language text is probably not large. Let's now repeat the experiment on source code.

In [None]:
n = 3
train_data, padded_sents = padded_everygram_pipeline(n, java_training)

In [None]:
java_with = Laplace(n) 
java_with.fit(train_data, padded_sents)

In [None]:
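# Rebuild the test n-grams with the same order as the model
java_test_data = list(flatten(padded_everygrams(n, sent) for sent in java_test if sent))
java_with.entropy(java_test_data)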

Since our Java corpus only contains the lexemes and no longer the token type information, we'll simply rebuild the corpus from scratch, this time filtering out separators.

In [None]:
def tokenize_without_stopwords(code):
    try:
        tokens = [
            token.value
            for token in javalang.tokenizer.tokenize(code)
            if not isinstance(token, javalang.tokenizer.Separator)
        ]
    except Exception:
        return []
    return tokens

In [None]:
java_training = []
java_test = []
with tarfile.open("java-small.tar.gz", "r") as f:
    for tf in f.getmembers():
        if tf.isfile() and tf.name.startswith("java-small/training"):
            f2 = f.extractfile(tf)
            content = f2.read()
            tokens = tokenize_without_stopwords(content)
            if tokens:
                java_training.append(tokens)
        elif tf.isfile() and tf.name.startswith("java-small/test"):
            f2 = f.extractfile(tf)
            content = f2.read()
            tokens = tokenize_without_stopwords(content)
            if tokens:
                java_test.append(tokens)

In [None]:
n=3
train_data, padded_sents = padded_everygram_pipeline(n, java_training)

In [None]:
java_without = Laplace(n) 
java_without.fit(train_data, padded_sents)

In [None]:
test_data = list(flatten(padded_everygrams(n, sent) for sent in java_test))

In [None]:
java_without.entropy(test_data)

The entropy of Java without separator tokens is higher than with them; this shows that, to a certain degree, the repetitiveness of software is driven by its syntactic overhead.

## Code Completion

N-gram models can also be used to generate text. We start by doing this on a corpus of natural language text, available at: https://www.kaggle.com/datasets/kingburrito666/better-donald-trump-tweets?resource=download

In [None]:
import pandas as pd
df = pd.read_csv('data/Donald-Tweets!.csv')
df.head()

We build the model as usual.

In [None]:
from nltk import word_tokenize
trump_corpus = list(df['Tweet_Text'].apply(word_tokenize))

In [None]:
# Preprocess the tokenized text for 3-grams language modelling
n = 3
train_data, padded_sents = padded_everygram_pipeline(n, trump_corpus)

In [None]:
from nltk.lm import MLE

trump_model = MLE(n)
trump_model.fit(train_data, padded_sents)

In [None]:
trump_model.generate(10)

Let's use a helper function to turn this into more readable sentences.

In [None]:
# Taken from https://www.kaggle.com/code/alvations/n-gram-language-model-with-nltk/notebook

from nltk.tokenize.treebank import TreebankWordDetokenizer
detokenize = TreebankWordDetokenizer().detokenize

def generate_sent(model, num_words, random_seed=42):
    """
    :param model: An ngram language model from `nltk.lm.model`.
    :param num_words: Max no. of words to generate.
    :param random_seed: Seed value for random.
    """
    content = []
    for token in model.generate(num_words, random_seed=random_seed):
        if token == '<s>':
            continue
        if token == '</s>':
            break
        content.append(token)
    return detokenize(content)

In [None]:
generate_sent(trump_model, num_words=20, random_seed=0)

In [None]:
generate_sent(trump_model, num_words=20, random_seed=2)

In [None]:
generate_sent(trump_model, num_words=20, random_seed=21)
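Under the hood, generation amounts to repeatedly sampling the next token, weighted by how often it followed the current context in the training data. The following NLTK-free sketch shows this idea for a bigram model; `fit_bigrams`, `sample_sentence`, and the toy corpus are hypothetical names and data chosen for illustration, not the way NLTK implements it.

```python
import random
from collections import Counter, defaultdict

def fit_bigrams(sentences):
    """Map each token to a Counter of the tokens that follow it."""
    followers = defaultdict(Counter)
    for sent in sentences:
        padded = ["<s>"] + sent + ["</s>"]
        for prev, word in zip(padded, padded[1:]):
            followers[prev][word] += 1
    return followers

def sample_sentence(followers, max_words, seed):
    """Sample tokens one at a time, weighted by bigram counts."""
    rng = random.Random(seed)
    prev, words = "<s>", []
    for _ in range(max_words):
        candidates = sorted(followers[prev])  # sorted for reproducibility
        weights = [followers[prev][w] for w in candidates]
        prev = rng.choices(candidates, weights=weights)[0]
        if prev == "</s>":
            break
        words.append(prev)
    return words

# Hypothetical toy corpus, only for illustration
toy_corpus = [["we", "will", "win"], ["we", "will", "build"], ["they", "will", "lose"]]
toy_model = fit_bigrams(toy_corpus)
print(" ".join(sample_sentence(toy_model, max_words=10, seed=0)))
```

Because the bigram model only remembers one preceding token, it can produce sentences such as "they will win" that never occurred in the training data.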

We can also provide a context for the prediction as a sequence of tokens. The last (n-1) tokens of this context are used to sample the next token. Note that `text_seed` has to be a list of tokens; a plain string would be split into individual characters.

In [None]:
trump_model.generate(1, text_seed=["Democrats"])

Similarly, a simple approach to code completion is to build an n-gram model of source code, use the last (n-1) tokens as context, and look at which tokens are likely to follow.

Suppose we have typed `System.out.` and want to know what's next.

In [None]:
context = "System.out."

In [None]:
tokens = [token.value for token in list(javalang.tokenizer.tokenize(context))]

In [None]:
java_with.generate(1, text_seed = tokens)

What about for-loops?

In [None]:
context = "for (int i = 0; i < model.size(); i"

In [None]:
tokens = [token.value for token in list(javalang.tokenizer.tokenize(context))]

In [None]:
java_with.generate(1, text_seed = tokens)

Note that an n-gram model is restricted in how much preceding context it can take into account. For example, a trigram model can only condition its output on the two preceding tokens. If you pass in a four-token context, the first two tokens will be ignored.
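The restriction on context can be illustrated with a small trigram table that ranks candidate completions. This is a simplified sketch without smoothing or backoff; `fit_trigrams`, `suggest`, and the toy token lists are assumptions made for illustration.

```python
from collections import Counter, defaultdict

def fit_trigrams(token_lists):
    """Count which token follows each pair of preceding tokens."""
    table = defaultdict(Counter)
    for tokens in token_lists:
        for a, b, c in zip(tokens, tokens[1:], tokens[2:]):
            table[(a, b)][c] += 1
    return table

def suggest(table, context, k=3):
    """Rank candidate next tokens; only the last two context tokens are used."""
    key = tuple(context[-2:])
    return [tok for tok, _ in table.get(key, Counter()).most_common(k)]

# Hypothetical tokenized "files", only for illustration
toy_files = [
    ["System", ".", "out", ".", "println", "(", "x", ")", ";"],
    ["System", ".", "out", ".", "println", "(", "y", ")", ";"],
    ["System", ".", "out", ".", "print", "(", "x", ")", ";"],
]

table = fit_trigrams(toy_files)
short_context = suggest(table, ["out", "."])
long_context = suggest(table, ["System", ".", "out", "."])
print(short_context, long_context)
```

Both calls return the same ranking, since everything before the last two tokens is discarded.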

## CodeBERT

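Unlike n-gram models, which condition on only a handful of preceding tokens, pre-trained transformer models such as CodeBERT can take the context on both sides of a masked token into account.

Feng, Z., Guo, D., Tang, D., Duan, N., Feng, X., Gong, M., ... & Zhou, M. (2020). Codebert: A pre-trained model for programming and natural languages. arXiv preprint arXiv:2002.08155.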

In [None]:
from transformers import RobertaConfig, RobertaTokenizer, RobertaForMaskedLM, pipeline

model = RobertaForMaskedLM.from_pretrained("microsoft/codebert-base-mlm")
tokenizer = RobertaTokenizer.from_pretrained("microsoft/codebert-base-mlm")

CODE = "if (x is not None) <mask> (x>1)"
fill_mask = pipeline('fill-mask', model=model, tokenizer=tokenizer)

fill_mask(CODE)

In [None]:
CODE = "System.out.<mask>"
fill_mask = pipeline('fill-mask', model=model, tokenizer=tokenizer)

fill_mask(CODE)

In [None]:
CODE = "for (int i = 0; i < model.size(); i<mask>) {"
fill_mask = pipeline('fill-mask', model=model, tokenizer=tokenizer)

fill_mask(CODE)

In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
model = AutoModel.from_pretrained("microsoft/codebert-base")

In [None]:
code_tokens=tokenizer.tokenize("def max(a,b): if a>b: return a else return b")

In [None]:
nl_tokens=tokenizer.tokenize("return maximum value")

In [None]:
tokens=[tokenizer.cls_token]+nl_tokens+[tokenizer.sep_token]+code_tokens+[tokenizer.sep_token]

In [None]:
tokens_ids=tokenizer.convert_tokens_to_ids(tokens)

In [None]:
context_embeddings=model(torch.tensor(tokens_ids)[None,:])[0]

In [None]:
context_embeddings