# **Naïve Compression of DNA Sequences**

In [1]:
from typing import Any, Dict, List

# Quantification of Information

Information theory is based on the observation that knowing that a likely event has occurred is less informative than knowing that an unlikely event has occurred.

A quantification of information should have the following properties:
- Likely events should have a low information content, and events that are certain to occur should have no information content at all. Less likely events should have a higher information content.
- Independent events should have additive information content.

The self-information of an event $x$ is hence defined as

$$I(x)=-\log{}P(x).$$

With the base-2 logarithm, self-information is measured in bit.
One bit is thus the amount of information gained by observing an event of probability $\frac{1}{2}$.
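
As a small illustration of the two properties listed above, the following sketch evaluates the self-information for a few probabilities. The helper name `self_information` is chosen here for illustration and is not used elsewhere in this notebook.

```python
import math


def self_information(prob: float, base: float = 2.0) -> float:
    """Return -log_base(prob) for a probability in (0, 1]."""
    if not 0.0 < prob <= 1.0:
        raise ValueError("prob must be in the interval (0, 1]")
    # log(1 / p) equals -log(p) and avoids a negative zero for p = 1.
    return math.log(1.0 / prob, base)


# A certain event carries no information; a fair coin flip carries one bit.
print(f"P = 1.00: {self_information(1.0):.2f} bit")
print(f"P = 0.50: {self_information(0.5):.2f} bit")

# Two independent fair coin flips have joint probability 0.5 * 0.5,
# and their information content adds up to two bit.
print(f"P = 0.25: {self_information(0.5 * 0.5):.2f} bit")
```

The last line shows additivity: the joint event has the same information content as the two single flips together.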

Self-information deals only with a single event $x$.
By computing the expectation of the self-information with respect to the entire probability distribution $P(\text{x})$ we obtain the entropy

$$H(\text{x})=\mathbb{E}_{\text{x}\sim{}P}[I(\text{x}=x)]=-\mathbb{E}_{\text{x}\sim{}P}[\log{}P(\text{x}=x)]=-\sum_{x}P(x)\log{}P(x).$$

The entropy is the expected information content of an event $x$ drawn from the probability distribution $P(\text{x})$.
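
To make the formula concrete, here is a minimal sketch that evaluates it for an explicitly given probability distribution. The function name `entropy_from_probs` and the two coin distributions are assumptions made for this example only.

```python
import math
from typing import Sequence


def entropy_from_probs(probs: Sequence[float], base: float = 2.0) -> float:
    """Return -sum(p * log_base(p)), skipping zero-probability outcomes."""
    if not math.isclose(sum(probs), 1.0):
        raise ValueError("probabilities must sum to 1")
    return sum(p * math.log(1.0 / p, base) for p in probs if p > 0.0)


fair_coin = entropy_from_probs([0.5, 0.5])
biased_coin = entropy_from_probs([0.9, 0.1])
print(f"Fair coin:   {fair_coin:.2f} bit")
print(f"Biased coin: {biased_coin:.2f} bit")
```

The biased coin has a lower entropy than the fair coin because its outcome is easier to predict.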

##### ❓ Computing entropy

Complete the function `entropy()` to compute the entropy of the given sequence.
Then, compute the entropy in bit per symbol of the sequences `AAAA`, `AACC`, `ACGT`.

In [2]:
import collections
import math


def entropy(data: Any, base: int = 2) -> float:
    """Compute the entropy of a sequence of symbols, in units set by `base`."""
    if len(data) <= 1:
        return 0.0

    # Count how often each distinct symbol occurs.
    counts = collections.Counter(data)

    # Sum -p * log(p) over the empirical symbol probabilities.
    eta = 0.0
    for count in counts.values():
        prob = count / len(data)
        eta -= prob * math.log(prob, base)

    return eta

In [3]:
for sequence in ["AAAA", "AACC", "ACGT"]:
    eta = entropy(data=sequence)
    print(f"Entropy of '{sequence}': {eta:.2f} bit/symbol")

Entropy of 'AAAA': 0.00 bit/symbol
Entropy of 'AACC': 1.00 bit/symbol
Entropy of 'ACGT': 2.00 bit/symbol


# The FASTQ Format

The [FASTQ format](https://en.wikipedia.org/wiki/FASTQ_format) is the de facto standard for storing both a biological sequence (usually a nucleotide sequence) and its corresponding quality scores.
Each sequence letter and each quality score is encoded with a single ASCII character.

Each sequence, i.e., read, is represented by a single FASTQ record, which consists of four lines:
- The first line contains the **read identifier**. It starts with `@`. Typically, sequencing machine vendors generate read identifiers in a proprietary, systematic way.
- The second line contains the **sequence**, where each symbol is represented with a single ASCII character.
- The third line starts with `+` and contains an optional **description**. Usually this line is left empty; it then contains only `+` as a separator between the sequence and the quality scores.
- The fourth line contains the **quality scores**. A quality score is a value indicating the confidence in a base call.
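
The list above does not say how a quality character maps to a number. A common convention, assumed here and not stated in this notebook, is Phred+33: the score is the ASCII code of the character minus 33. The helper name `decode_quality` is hypothetical.

```python
from typing import List


def decode_quality(qual: str, offset: int = 33) -> List[int]:
    """Map each quality character to an integer score (ASCII code minus offset)."""
    return [ord(char) - offset for char in qual]


# Assumption: the quality string uses the Phred+33 encoding.
scores = decode_quality("!''*((")
print(scores)  # [0, 6, 6, 9, 7, 7]
```

Files produced by older pipelines may use a different offset, so the `offset` parameter should be checked against the data source.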

The following function can be used to convert a FASTQ record into a dictionary:

In [4]:
def fastq_lines_to_dict(lines: List[str]) -> Dict[str, str]:
    """Convert a list of FASTQ lines to a dictionary."""
    keys = ["id", "seq", "desc", "qual"]
    return dict(zip(keys, lines))
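
Because `zip()` stops at the shorter of its inputs, the function above silently returns an incomplete dictionary when it is given fewer than four lines. The following sketch shows one possible stricter variant; the name `fastq_lines_to_dict_checked` and the specific checks are assumptions for illustration, not part of the exercise.

```python
from typing import Dict, List


def fastq_lines_to_dict_checked(lines: List[str]) -> Dict[str, str]:
    """Hypothetical stricter variant of fastq_lines_to_dict()."""
    if len(lines) != 4:
        raise ValueError(f"expected 4 lines, got {len(lines)}")
    if not lines[0].startswith("@") or not lines[2].startswith("+"):
        raise ValueError("malformed FASTQ record")
    # Every base call needs exactly one quality score.
    if len(lines[1]) != len(lines[3]):
        raise ValueError("sequence and quality lengths differ")
    keys = ["id", "seq", "desc", "qual"]
    return dict(zip(keys, lines))


record = fastq_lines_to_dict_checked(["@id0", "GATT", "+", "!''*"])
print(record)
```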

##### ❓ Parsing a FASTQ file

Complete the following code to parse the FASTQ file `example.fastq`.
Print all FASTQ records in the following format:

```
Record 0: {'id': '@id0', 'seq': 'GATTTG...', 'desc': '+', 'qual': "!''*((..."}
Record 1: {'id': '@id1', 'seq': 'GATTTG...', 'desc': '+', 'qual': "!''*((..."}
...
```

In [5]:
def read_fastq_file(file_path: str) -> List[Dict[str, str]]:
    """Read a FASTQ file and return a list of records."""
    with open(file=file_path, mode="r") as file:
        records = []
        lines = []
        for line in file:
            lines.append(line.rstrip())
            # A complete FASTQ record consists of exactly four lines.
            if len(lines) == 4:
                records.append(fastq_lines_to_dict(lines=lines))
                lines = []

        return records


records = read_fastq_file(file_path="data/example.fastq")
for i, record in enumerate(records):
    print(f"Record {i:2}: {record}")

Record  0: {'id': '@id00', 'seq': 'GATTTGGGGTTCAAAGCAGTATCGATCAAATA', 'desc': '+', 'qual': "!''*((((***+))%%%++)(%%%%).1***-"}
Record  1: {'id': '@id01', 'seq': 'GATTTGGGGTTCAAAGCAGTATCGATCAAATA', 'desc': '+', 'qual': "!''*((((***+))%%%++)(%%%%).1***-"}
Record  2: {'id': '@id02', 'seq': 'GATTTGGGGTTCAAAGCAGTATCGATCAAATA', 'desc': '+', 'qual': "!''*((((***+))%%%++)(%%%%).1***-"}
Record  3: {'id': '@id03', 'seq': 'GATTTGGGGTTCAAAGCAGTATCGATCAAATA', 'desc': '+', 'qual': "!''*((((***+))%%%++)(%%%%).1***-"}
Record  4: {'id': '@id04', 'seq': 'GATTTGGGGTTCAAAGCAGTATCGATCAAATA', 'desc': '+', 'qual': "!''*((((***+))%%%++)(%%%%).1***-"}
Record  5: {'id': '@id05', 'seq': 'GATTTGGGGTTCAAAGCAGTATCGATCAAATA', 'desc': '+', 'qual': "!''*((((***+))%%%++)(%%%%).1***-"}
Record  6: {'id': '@id06', 'seq': 'GATTTGGGGTTCAAAGCAGTATCGATCAAATA', 'desc': '+', 'qual': "!''*((((***+))%%%++)(%%%%).1***-"}
Record  7: {'id': '@id07', 'seq': 'GATTTGGGGTTCAAAGCAGTATCGATCAAATA', 'desc': '+', 'qual': "!''*((((***+))%%%++

# Compression of Nucleotide Sequences

##### ❓ Compressing DNA sequence reads

Concatenate all nucleotide sequences from the FASTQ file `example.fastq`.
Compute the entropy (in bit per symbol) and the maximum (worst-case) compressed size in bit and byte.

> The assumption here is that every well-designed compressor that makes use of any statistics beyond the per-symbol probabilities must yield a compressed bitstream that is no larger than this entropy-based estimate (entropy per symbol times the number of symbols).
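
To illustrate what "statistics beyond the per-symbol probabilities" can mean, the following sketch computes the entropy over non-overlapping blocks of `k` symbols instead of single symbols. The function `block_entropy_per_symbol` and the toy sequence are assumptions made for this example.

```python
import collections
import math


def block_entropy_per_symbol(seq: str, k: int) -> float:
    """Entropy of non-overlapping length-k blocks, divided by k."""
    blocks = [seq[i : i + k] for i in range(0, len(seq) - k + 1, k)]
    counts = collections.Counter(blocks)
    total = len(blocks)
    return sum((c / total) * math.log2(total / c) for c in counts.values()) / k


# All four symbols are equally frequent, but the sequence is perfectly periodic.
seq = "ACGT" * 8
print(f"k=1: {block_entropy_per_symbol(seq, 1):.2f} bit/symbol")
print(f"k=4: {block_entropy_per_symbol(seq, 4):.2f} bit/symbol")
```

With `k=1` the estimate is 2 bit/symbol, while with `k=4` every block is identical and the estimate drops to 0 bit/symbol. A compressor that exploits such repetition can therefore undercut the per-symbol estimate.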

Then, use gzip to beat the estimated worst-case compression.

> Use the functions `gzip.compress()` and `gzip.decompress()`.
> Use UTF-8 encoding.

In [6]:
import gzip

# Concatenate all sequences.
seq = ""
for record in records:
    seq += record["seq"]
seq_len = len(seq)
print(f"Concatenated sequence length: {seq_len}")

# Compute the entropy (in bit per symbol) and the maximum (worst-case) compressed size
# in bit and byte.
eta = entropy(data=seq)
max_compressed_size_in_bit = math.ceil(eta * seq_len)
max_compressed_size_in_byte = math.ceil(max_compressed_size_in_bit / 8)
print(f"Entropy: {round(number=eta, ndigits=2):.2f} bit/symbol")
print(
    f"Maximum compressed size: "
    f"{max_compressed_size_in_bit} bit \u2259 "
    f"{max_compressed_size_in_byte} byte"
)
print(f"Worst-case compression ratio: {seq_len / max_compressed_size_in_byte:.1f}x")

# Use gzip to beat the estimated worst-case compression.
compressed_seq = gzip.compress(data=bytes(seq, encoding="utf-8"))
decompressed_seq = gzip.decompress(data=compressed_seq).decode(encoding="utf-8")
# Verify that the compression round trip is lossless.
assert decompressed_seq == seq
print(f"Gzip compression ratio: {seq_len / len(compressed_seq):.1f}x")

Concatenated sequence length: 384
Entropy: 1.92 bit/symbol
Maximum compressed size: 738 bit ≙ 93 byte
Worst-case compression ratio: 4.1x
Gzip compression ratio: 7.4x
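
For comparison, a fixed-length code that ignores all statistics needs 2 bit per symbol for a four-letter alphabet, which amounts to 96 byte for 384 symbols. The sketch below packs four nucleotides into one byte; the names `pack_2bit` and `unpack_2bit` are hypothetical, and the code assumes that the sequence contains only `A`, `C`, `G`, and `T` (no `N` or other symbols).

```python
from typing import Dict

# Assumption: the alphabet is exactly {A, C, G, T}.
CODE: Dict[str, int] = {"A": 0b00, "C": 0b01, "G": 0b10, "T": 0b11}
ALPHABET = "ACGT"


def pack_2bit(seq: str) -> bytes:
    """Pack a sequence over {A, C, G, T} into bytes, four symbols per byte."""
    packed = bytearray()
    for i in range(0, len(seq), 4):
        chunk = seq[i : i + 4]
        byte = 0
        for symbol in chunk:
            byte = (byte << 2) | CODE[symbol]
        # Left-align a final partial chunk so that decoding stays uniform.
        byte <<= 2 * (4 - len(chunk))
        packed.append(byte)
    return bytes(packed)


def unpack_2bit(packed: bytes, length: int) -> str:
    """Recover the first `length` symbols from 2-bit packed bytes."""
    symbols = []
    for byte in packed:
        for shift in (6, 4, 2, 0):
            symbols.append(ALPHABET[(byte >> shift) & 0b11])
    return "".join(symbols[:length])


read = "GATTTGGGGTTCAAAGCAGTATCGATCAAATA"
packed = pack_2bit(read)
assert unpack_2bit(packed, len(read)) == read
print(f"{len(read)} symbols -> {len(packed)} byte")
```

The original length has to be stored alongside the packed bytes, because a final partial byte is padded with zero bits that would otherwise decode as extra `A` symbols.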
