In [100]:
def estimate_output_probabilities(training_file, smoothing_delta, output_probs_filename):
    # Count occurrences of each (token, tag) pair
    token_tag_count = {}
    tag_count = {}
    with open(training_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                token, tag = line.split('\t')
                token_tag_count[(token, tag)] = token_tag_count.get((token, tag), 0) + 1
                tag_count[tag] = tag_count.get(tag, 0) + 1

    # Calculate output probabilities
    output_probs = {}
    unique_tokens = set(token for (token, _) in token_tag_count.keys())
    num_unique_tokens = len(unique_tokens)
    for (token, tag), count in token_tag_count.items():
        output_probs[(token, tag)] = (count + smoothing_delta) / (tag_count[tag] + smoothing_delta * (num_unique_tokens + 1))

    # Write output probabilities to file
    with open(output_probs_filename, 'w', encoding='utf-8') as f:
        for (token, tag), prob in output_probs.items():
            f.write(f"{token}\t{tag}\t{prob}\n")

# Example usage:
estimate_output_probabilities('twitter_train.txt', smoothing_delta=0.6, output_probs_filename='naive_output_probs.txt')


In [101]:
def naive_predict(in_output_probs_filename, in_test_filename, out_prediction_filename):
    # Load output probabilities from file
    output_probs = {}
    with open(in_output_probs_filename, 'r', encoding='utf-8') as f:
        for line in f:
            token, tag, prob = line.strip().split('\t')
            output_probs[(token, tag)] = float(prob)

    # Predict tags for test data
    with open(in_test_filename, 'r', encoding='utf-8') as f_in, open(out_prediction_filename, 'w', encoding='utf-8') as f_out:
        for line in f_in:
            token = line.strip()
            if token:  # Non-empty line
                max_prob = -1
                predicted_tag = None
                for tag in set(tag for (t, tag) in output_probs.keys() if t == token):
                    prob = output_probs.get((token, tag), 0)
                    if prob > max_prob:
                        max_prob = prob
                        predicted_tag = tag
                if predicted_tag is not None:
                    f_out.write(predicted_tag + '\n')
                else:
                    # Handle case when no tags are found for the token
                    f_out.write('UNKNOWN\n')
            else:  # Empty line (end of tweet)
                f_out.write('\n')

# Example usage:
naive_predict('naive_output_probs.txt', 'twitter_dev_no_tag.txt', 'naive_predictions.txt')


In [102]:
def evaluate(in_prediction_filename, in_answer_filename):
    """Do not change this method"""
    with open(in_prediction_filename) as fin:
        predicted_tags = [l.strip() for l in fin.readlines() if len(l.strip()) != 0]

    with open(in_answer_filename) as fin:
        ground_truth_tags = [l.strip() for l in fin.readlines() if len(l.strip()) != 0]

    assert len(predicted_tags) == len(ground_truth_tags)
    correct = 0
    for pred, truth in zip(predicted_tags, ground_truth_tags):
        if pred == truth: correct += 1
    return correct, len(predicted_tags), correct/len(predicted_tags)

In [103]:
in_ans_filename='twitter_dev_ans.txt'
naive_prediction_filename='naive_predictions.txt'
correct, total, acc = evaluate(naive_prediction_filename, in_ans_filename)
print(f'Naive prediction accuracy:     {correct}/{total} = {acc}')

Naive prediction accuracy:     908/1378 = 0.6589259796806967


In [104]:
def naive_predict2(in_output_probs_filename, in_train_filename, in_test_filename, out_prediction_filename):
    # Load output probabilities from file
    output_probs = {}
    with open(in_output_probs_filename, 'r', encoding='utf-8') as f:
        for line in f:
            token, tag, prob = line.strip().split('\t')
            output_probs[(token, tag)] = float(prob)

    # Load tag probabilities from training data
    tag_probs = {}
    total_tags = 0
    with open(in_train_filename, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                _, tag = line.split('\t')
                tag_probs[tag] = tag_probs.get(tag, 0) + 1
                total_tags += 1

    # Normalize tag probabilities
    for tag in tag_probs:
        tag_probs[tag] /= total_tags

    # Predict tags for test data
    with open(in_test_filename, 'r', encoding='utf-8') as f_in, open(out_prediction_filename, 'w', encoding='utf-8') as f_out:
        for line in f_in:
            token = line.strip()
            if token:  # Non-empty line
                max_prob = -1
                predicted_tag = None
                for tag in set(tag for (_, tag) in output_probs.keys()):
                    prob = output_probs.get((token, tag), 0) * tag_probs.get(tag, 0)
                    if prob > max_prob:
                        max_prob = prob
                        predicted_tag = tag
                f_out.write(predicted_tag + '\n')
            else:  # Empty line (end of tweet)
                f_out.write('\n')

# Example usage:
naive_predict2('naive_output_probs.txt', 'twitter_train.txt', 'twitter_dev_no_tag.txt', 'naive_predictions2.txt')


In [105]:
correct, total, accuracy = evaluate('naive_predictions2.txt', 'twitter_dev_ans.txt')
print(f'Accuracy: {correct}/{total} = {accuracy:.2%}')

Accuracy: 909/1378 = 65.97%


In [106]:
def compute_transition_probabilities(training_file, smoothing_delta, trans_probs_filename):
    # Count occurrences of each (previous tag, current tag) pair
    transition_count = {}
    tag_count = {}
    
    # Initialize tag_count to account for START tag
    tag_count['*'] = 0

    with open(training_file, 'r', encoding='utf-8') as f:
        prev_tag = '*'
        for line in f:
            line = line.strip()
            if line:
                token, tag = line.split('\t')
                tag_count[tag] = tag_count.get(tag, 0) + 1
                transition_count[(prev_tag, tag)] = transition_count.get((prev_tag, tag), 0) + 1
                prev_tag = tag
            else:  # Empty line (end of tweet)
                tag_count['*'] += 1  # Increment count for STOP tag
                prev_tag = '*'  # Reset prev_tag for next tweet

    # Smooth transition counts
    num_tags = len(tag_count)
    for prev_tag in tag_count:
        for tag in tag_count:
            transition_count[(prev_tag, tag)] = (transition_count.get((prev_tag, tag), 0) + smoothing_delta) / \
                                                (tag_count.get(prev_tag, 0) + smoothing_delta * num_tags)

    # Write transition probabilities to file
    with open(trans_probs_filename, 'w', encoding='utf-8') as f:
        for (prev_tag, tag), prob in transition_count.items():
            f.write(f"{prev_tag}\t{tag}\t{prob}\n")

# Example usage:
compute_transition_probabilities('twitter_train.txt', smoothing_delta=0.1, trans_probs_filename='trans_probs.txt')


In [107]:
def viterbi_predict(in_tags_filename, in_trans_probs_filename, in_output_probs_filename, in_test_filename,
                    out_predictions_filename):
    # Load tags
    with open(in_tags_filename, 'r', encoding='utf-8') as f:
        tags = [line.strip() for line in f]

    # Load transition probabilities
    trans_probs = {}
    with open(in_trans_probs_filename, 'r', encoding='utf-8') as f:
        for line in f:
            prev_tag, tag, prob = line.strip().split('\t')
            trans_probs[(prev_tag, tag)] = float(prob)

    # Load output probabilities
    output_probs = {}
    with open(in_output_probs_filename, 'r', encoding='utf-8') as f:
        for line in f:
            token, tag, prob = line.strip().split('\t')
            output_probs[(token, tag)] = float(prob)

    # Viterbi algorithm
    with open(in_test_filename, 'r', encoding='utf-8') as f_in, open(out_predictions_filename, 'w', encoding='utf-8') as f_out:
        for line in f_in:
            tokens = line.strip().split()
            if tokens:  # Non-empty line
                n = len(tokens)
                best_scores = {}
                back_pointers = {}
                for tag in tags:
                    # Initialization
                    best_scores[(0, tag)] = trans_probs.get(('*', tag), 0) * output_probs.get((tokens[0], tag), 0)
                    back_pointers[(0, tag)] = None

                for i in range(1, n):
                    for tag in tags:
                        best_score, back_pointer = max(
                            ((best_scores[(i - 1, prev_tag)] * trans_probs.get((prev_tag, tag), 0) *
                              output_probs.get((tokens[i], tag), 0), prev_tag) for prev_tag in tags)
                        )
                        best_scores[(i, tag)] = best_score
                        back_pointers[(i, tag)] = back_pointer

                # Find the best final tag
                best_final_tag = max(tags, key=lambda tag: best_scores[(n - 1, tag)])

                # Trace back to find the best tag sequence
                predicted_tags = [best_final_tag]
                prev_tag = best_final_tag
                for i in range(n - 1, 0, -1):
                    prev_tag = back_pointers[(i, prev_tag)]
                    predicted_tags.insert(0, prev_tag)

                # Write predicted tags to output file
                for token, tag in zip(tokens, predicted_tags):
                    f_out.write(tag + '\n')
            else:  # Empty line (end of tweet)
                f_out.write('\n')

# Example usage:
viterbi_predict('twitter_tags.txt', 'trans_probs.txt', 'naive_output_probs.txt', 'twitter_dev_no_tag.txt', 'viterbi_predictions.txt')


In [108]:
# Example usage to calculate accuracy
correct, total, accuracy = evaluate('viterbi_predictions.txt', 'twitter_dev_ans.txt')
print(f'Accuracy: {correct}/{total} = {accuracy:.2%}')

Accuracy: 949/1378 = 68.87%


In [109]:
def compute_transition_probabilities_with_smoothing(training_file, smoothing_delta):
    # Count occurrences of each (previous tag, current tag) pair
    transition_count = {}
    tag_count = {}
    
    # Initialize tag_count to account for START tag
    tag_count['*'] = 0

    with open(training_file, 'r', encoding='utf-8') as f:
        prev_tag = '*'
        for line in f:
            line = line.strip()
            if line:
                _, tag = line.split('\t')
                tag_count[tag] = tag_count.get(tag, 0) + 1
                transition_count[(prev_tag, tag)] = transition_count.get((prev_tag, tag), 0) + 1
                prev_tag = tag
            else:  # Empty line (end of tweet)
                tag_count['*'] += 1  # Increment count for STOP tag
                prev_tag = '*'  # Reset prev_tag for next tweet

    # Smooth transition counts
    num_tags = len(tag_count)
    for prev_tag in tag_count:
        for tag in tag_count:
            transition_count[(prev_tag, tag)] = (transition_count.get((prev_tag, tag), 0) + smoothing_delta) / \
                                                (tag_count.get(prev_tag, 0) + smoothing_delta * num_tags)

    # Write transition probabilities to file
    with open('trans_probs2.txt', 'w', encoding='utf-8') as f:
        for (prev_tag, tag), prob in transition_count.items():
            f.write(f"{prev_tag}\t{tag}\t{prob}\n")


def compute_output_probabilities_with_smoothing(training_file, smoothing_delta):
    # Count occurrences of each (token, tag) pair
    output_count = {}
    tag_count = {}

    with open(training_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                token, tag = line.split('\t')
                tag_count[tag] = tag_count.get(tag, 0) + 1
                output_count[(token, tag)] = output_count.get((token, tag), 0) + 1

    # Smooth output counts
    vocabulary_size = len(output_count)  # Number of unique (token, tag) pairs
    num_tags = len(tag_count)
    for token, tag in output_count:
        output_count[(token, tag)] = (output_count.get((token, tag), 0) + smoothing_delta) / \
                                     (tag_count.get(tag, 0) + smoothing_delta * vocabulary_size)

    # Write output probabilities to file
    with open('output_probs2.txt', 'w', encoding='utf-8') as f:
        for (token, tag), prob in output_count.items():
            f.write(f"{token}\t{tag}\t{prob}\n")

# Example usage:
compute_transition_probabilities_with_smoothing('twitter_train.txt', smoothing_delta=0.1)
compute_output_probabilities_with_smoothing('twitter_train.txt', smoothing_delta=0.1)


In [153]:
def viterbi_predict2(in_tags_filename, in_trans_probs_filename, in_output_probs_filename, in_test_filename,
                     out_predictions_filename):
    # Load tags
    with open(in_tags_filename, 'r', encoding='utf-8') as f:
        tags = [line.strip() for line in f]

    # Compute transition probabilities with Laplace smoothing
    compute_transition_probabilities_with_smoothing('twitter_train.txt', smoothing_delta=3)

    # Compute output probabilities with Laplace smoothing
    compute_output_probabilities_with_smoothing('twitter_train.txt', smoothing_delta=0.1)

    # Load transition probabilities with Laplace smoothing
    trans_probs = {}
    with open('trans_probs2.txt', 'r', encoding='utf-8') as f:
        for line in f:
            prev_tag, tag, prob = line.strip().split('\t')
            trans_probs[(prev_tag, tag)] = float(prob)

    # Load output probabilities with Laplace smoothing
    output_probs = {}
    with open('output_probs2.txt', 'r', encoding='utf-8') as f:
        for line in f:
            token, tag, prob = line.strip().split('\t')
            output_probs[(token, tag)] = float(prob)

    # Apply Viterbi algorithm with improved probabilities
    with open(in_test_filename, 'r', encoding='utf-8') as f_in, open(out_predictions_filename, 'w', encoding='utf-8') as f_out:
        for line in f_in:
            tokens = line.strip().split()
            if tokens:  # Non-empty line
                n = len(tokens)
                best_scores = {}
                back_pointers = {}
                for tag in tags:
                    # Initialization
                    best_scores[(0, tag)] = trans_probs.get(('*', tag), 0) * output_probs.get((tokens[0], tag), 0)
                    back_pointers[(0, tag)] = None

                for i in range(1, n):
                    for tag in tags:
                        best_score, back_pointer = max(
                            ((best_scores[(i - 1, prev_tag)] * trans_probs.get((prev_tag, tag), 0) *
                              output_probs.get((tokens[i], tag), 0), prev_tag) for prev_tag in tags)
                        )
                        best_scores[(i, tag)] = best_score
                        back_pointers[(i, tag)] = back_pointer

                # Find the best final tag
                best_final_tag = max(tags, key=lambda tag: best_scores[(n - 1, tag)])

                # Trace back to find the best tag sequence
                predicted_tags = [best_final_tag]
                prev_tag = best_final_tag
                for i in range(n - 1, 0, -1):
                    prev_tag = back_pointers[(i, prev_tag)]
                    predicted_tags.insert(0, prev_tag)

                # Write predicted tags to output file
                for token, tag in zip(tokens, predicted_tags):
                    f_out.write(tag + '\n')
            else:  # Empty line (end of tweet)
                f_out.write('\n')

# Example usage:
viterbi_predict2('twitter_tags.txt', 'trans_probs.txt', 'output_probs.txt', 'twitter_dev_no_tag.txt', 'viterbi_predictions2.txt')


In [154]:
# Example usage to calculate accuracy
correct, total, accuracy = evaluate('viterbi_predictions2.txt', 'twitter_dev_ans.txt')
print(f'Accuracy: {correct}/{total} = {accuracy:.2%}')

Accuracy: 956/1378 = 69.38%
