# 23.3. Dictionary coding I - 10 points

- Compress and decompress the files from Lab 1 using LZ77 or LZSS (7pts)
- Try different sizes of the sliding window: 4kB, 16kB, 32kB and lengths of the uncompressed part. (1pt)
- Try to compute zero order entropy of each field in a token and use it to compute approximate final compressed file size. (1pt)
- Compute the length of the output file and prepare a report summarizing the results. (1pt)

## Compress and decompress the files from Lab 1 using LZ77 or LZSS (7pts)

In [2]:
import os
import sys

def adding_module_path():
    """Make the directory two levels above the working directory importable.

    The absolute path of ``../..`` (relative to the current working
    directory) is appended to ``sys.path`` unless it is already listed, so
    repeated calls never add a duplicate entry.
    Presumably this is what lets the ``src`` package used by later cells
    resolve -- verify against the repository layout.
    """
    grandparent_dir = os.path.abspath(os.path.join(os.pardir, os.pardir))

    # Guard clause: nothing to do when the entry is already present.
    if grandparent_dir in sys.path:
        return

    sys.path.append(grandparent_dir)


adding_module_path()

In [231]:
from src.load_data import get_dataset
from src.load_data import DataSets
from src.get_probs import get_sorted_probs_as_df
import numpy as np
import pandas as pd
import time
from src.types.table_types import Fields
from src.save import save_both
from enum import Enum
import re
from ast import literal_eval


In [7]:
class LZType(Enum):
    """Names of the two dictionary coders implemented in this notebook."""

    # Each member's value is the same string as its name.
    LZ77 = "LZ77"
    LZSS = "LZSS"

In [9]:
data_dna, path_dna = get_dataset(DataSets.dna)

Loading C://Users//Vojta//Desktop//iv//AKS//datasets\dna\dna.50MB


In [10]:
test_text = "abbabbabbbaab"

### LZ77

In [336]:
class LZ77:
    """LZ77 dictionary coder operating on Python strings.

    The text is encoded as a sequence of ``(P, L, N)`` triplets: copy ``L``
    symbols starting ``P`` symbols back from the end of the already decoded
    text, then append the literal symbol ``N``. ``P == L == 0`` means that no
    match was used. A copy may overlap the text being produced (``L > P``),
    which repeats the last ``P`` symbols cyclically.

    Args:
        window_size: Maximum number of most recently processed symbols that
            are searched for a match (the sliding window). ``None`` (the
            default) searches everything processed so far.
        lookahead_size: Maximum match length, i.e. the number of upcoming
            symbols one triplet may copy. ``None`` (the default) means no
            limit.

    Raises:
        ValueError: If a size is given but is smaller than 1.
    """

    def __init__(self, window_size=None, lookahead_size=None):
        for name, size in (("window_size", window_size),
                           ("lookahead_size", lookahead_size)):
            if size is not None and size < 1:
                raise ValueError(f"{name} must be a positive integer or None")
        self.window_size = window_size
        self.lookahead_size = lookahead_size

    def encode(self, text):
        """Encode ``text`` into a list of LZ77 steps.

        Args:
            text: The string to compress.

        Returns:
            One dict per emitted triplet with the keys ``"Processed"`` and
            ``"Unprocessed"`` (snapshots of the text split after the step)
            and ``"P"``, ``"L"``, ``"N"`` (pointer, match length, next
            symbol). An empty text yields an empty list.

        Note:
            Every step stores a full snapshot of the text, so memory use
            grows quadratically with the input; this is intended for the
            demonstration table, not for very large inputs.
        """
        items = []
        text_length = len(text)
        position = 0

        while position < text_length:
            if self.window_size is None:
                window_start = 0
            else:
                window_start = max(0, position - self.window_size)

            # The last symbol is never part of a match: every triplet must
            # end with an explicit next symbol, so one has to be left over.
            lookahead_end = text_length - 1
            if self.lookahead_size is not None:
                lookahead_end = min(lookahead_end,
                                    position + self.lookahead_size)

            pointer, match_length, _ = self.find_longest_match(
                text[window_start:position], text[position:lookahead_end])

            next_symbol = text[position + match_length]
            position += match_length + 1

            items.append({
                "Processed": text[:position],
                "Unprocessed": text[position:],
                "P": pointer,
                "L": match_length,
                "N": next_symbol,
            })

        return items

    def find_longest_match(self, text_processed, text_unprocessed):
        """Find the longest prefix of ``text_unprocessed`` that can be copied.

        A match must start inside ``text_processed`` but may run past its end
        into the lookahead (self-overlapping copy). Among matches of equal
        length the one closest to the end of ``text_processed`` wins.

        Args:
            text_processed: Already encoded text to search (``None`` or an
                empty string means there is nothing to search).
            text_unprocessed: Upcoming text whose prefix should be matched.

        Returns:
            Tuple ``(pointer, match_length, match_text)`` where ``pointer``
            is the distance from the end of ``text_processed`` back to the
            start of the match. ``(0, 0, "")`` when nothing matches.
        """
        if not text_processed or not text_unprocessed:
            return 0, 0, ""

        history_length = len(text_processed)
        # Searching the concatenation lets a candidate that reaches the end
        # of the history continue into the lookahead, which is exactly what
        # the decoder's overlapping copy reproduces.
        stream = text_processed + text_unprocessed

        best_start = -1
        best_length = 0
        # A longer match can never start to the right of the right-most
        # shorter one, so the search bound only ever moves left.
        max_start = history_length - 1

        for length in range(1, len(text_unprocessed) + 1):
            # Plain substring search: the text is data, not a regex pattern.
            start = stream.rfind(text_unprocessed[:length], 0,
                                 max_start + length)
            if start == -1:
                break
            best_start, best_length = start, length
            max_start = start

        if best_length == 0:
            return 0, 0, ""
        return (history_length - best_start, best_length,
                text_unprocessed[:best_length])

    def create_from_triplets_code(self, triplets):
        """Serialize the steps returned by :meth:`encode` into one string.

        Each step becomes the ``str()`` of its ``(P, L, N)`` tuple and the
        tuples are joined with ``;``.
        """
        return ";".join(str((x["P"], x["L"], x["N"])) for x in triplets)

    def create_from_code_triplets(self, code):
        """Split a serialized code string back into one string per triplet.

        Only a ``;`` sitting between a closing and an opening parenthesis is
        a separator; a ``;`` that is the quoted next symbol of a triplet is
        left alone. An empty code string yields an empty list.
        """
        if not code:
            return []
        return re.split(r"(?<=\));(?=\()", code)

    def tiplets_to_table(self, triplets):
        """Return the steps produced by :meth:`encode` as a pandas DataFrame."""
        return pd.DataFrame(triplets)

    def decode(self, triplets):
        """Rebuild the original text from a sequence of triplets.

        Args:
            triplets: Iterable of triplets, each either the string form
                produced by :meth:`create_from_code_triplets` (for example
                ``"(3, 5, 'b')"``) or an already parsed ``(P, L, N)`` tuple.

        Returns:
            The decoded string.

        Raises:
            ValueError: If a triplet copies symbols (``L > 0``) from a
                pointer that does not lie inside the text decoded so far.
        """
        output = []

        for triplet in triplets:
            if isinstance(triplet, str):
                triplet = literal_eval(triplet)
            pointer, sequence_length, next_symbol = triplet

            if sequence_length:
                if not 0 < pointer <= len(output):
                    raise ValueError(
                        f"pointer {pointer} is outside the decoded text")
                start = len(output) - pointer
                # Symbol by symbol on purpose: when the copy overlaps its own
                # output, the freshly appended symbols become the source.
                for offset in range(sequence_length):
                    output.append(output[start + offset])

            output.append(next_symbol)

        return "".join(output)

In [337]:
instance = LZ77()

In [338]:
instance.find_longest_match("abba", "bbabbbaab")

(3, 5, 'bbabb')

In [348]:
triplets = instance.encode(test_text)

In [349]:
instance.tiplets_to_table(triplets)

Unnamed: 0,Processed,Unprocessed,P,L,N
0,a,bbabbabbbaab,0,0,a
1,ab,babbabbbaab,0,0,b
2,abba,bbabbbaab,1,1,a
3,abbabbabbb,aab,3,5,b
4,abbabbabbbaa,b,4,1,a
5,abbabbabbbaab,,0,0,b


In [350]:
code = instance.create_from_triplets_code(triplets)

In [351]:
code

"(0, 0, 'a');(0, 0, 'b');(1, 1, 'a');(3, 5, 'b');(4, 1, 'a');(0, 0, 'b')"

In [352]:
triplets = instance.create_from_code_triplets(code)

In [353]:
decoded = instance.decode(triplets)

#### Test if works correctly

In [354]:
decoded == test_text

True

### LZSS

In [362]:
test_text = "abbabbabbbaab"

In [410]:
class LZSS:
    """LZSS dictionary coder operating on Python strings.

    The text is encoded as a sequence of flagged tokens: ``(0, symbol)`` is a
    literal symbol and ``(1, pointer, length)`` copies ``length`` symbols
    starting ``pointer`` symbols back from the end of the already decoded
    text. A copy may overlap the text being produced (``length > pointer``),
    which repeats the last ``pointer`` symbols cyclically.

    Args:
        window_size: Maximum number of most recently processed symbols that
            are searched for a match (the sliding window). ``None`` (the
            default) searches everything processed so far.
        lookahead_size: Maximum match length, i.e. the number of upcoming
            symbols one token may copy. ``None`` (the default) means no
            limit.

    Raises:
        ValueError: If a size is given but is smaller than 1.
    """

    def __init__(self, window_size=None, lookahead_size=None):
        for name, size in (("window_size", window_size),
                           ("lookahead_size", lookahead_size)):
            if size is not None and size < 1:
                raise ValueError(f"{name} must be a positive integer or None")
        self.window_size = window_size
        self.lookahead_size = lookahead_size

    def encode(self, text):
        """Encode ``text`` into a list of LZSS steps.

        Args:
            text: The string to compress.

        Returns:
            One dict per emitted token with the keys ``"Processed"`` and
            ``"Unprocessed"`` (snapshots of the text split after the step)
            and ``"Output"`` (``(0, symbol)`` or ``(1, pointer, length)``).
            An empty text yields an empty list.

        Note:
            Every step stores a full snapshot of the text, so memory use
            grows quadratically with the input; this is intended for the
            demonstration table, not for very large inputs.
        """
        items = []
        text_length = len(text)
        position = 0

        while position < text_length:
            if text_length - position == 1:
                # A single trailing symbol is always written as a literal.
                pointer, match_length = 0, 0
            else:
                if self.window_size is None:
                    window_start = 0
                else:
                    window_start = max(0, position - self.window_size)

                lookahead_end = text_length
                if self.lookahead_size is not None:
                    lookahead_end = min(lookahead_end,
                                        position + self.lookahead_size)

                pointer, match_length, _ = self.find_longest_match(
                    text[window_start:position], text[position:lookahead_end])

            if match_length == 0:
                output = (0, text[position])
                position += 1
            else:
                output = (1, pointer, match_length)
                position += match_length

            items.append({
                "Processed": text[:position],
                "Unprocessed": text[position:],
                "Output": output,
            })

        return items

    def find_longest_match(self, text_processed, text_unprocessed):
        """Find the longest prefix of ``text_unprocessed`` that can be copied.

        A match must start inside ``text_processed`` but may run past its end
        into the lookahead (self-overlapping copy). Among matches of equal
        length the one closest to the end of ``text_processed`` wins.

        Args:
            text_processed: Already encoded text to search (``None`` or an
                empty string means there is nothing to search).
            text_unprocessed: Upcoming text whose prefix should be matched.

        Returns:
            Tuple ``(pointer, match_length, match_text)`` where ``pointer``
            is the distance from the end of ``text_processed`` back to the
            start of the match. ``(0, 0, "")`` when nothing matches.
        """
        if not text_processed or not text_unprocessed:
            return 0, 0, ""

        history_length = len(text_processed)
        # Searching the concatenation lets a candidate that reaches the end
        # of the history continue into the lookahead, which is exactly what
        # the decoder's overlapping copy reproduces.
        stream = text_processed + text_unprocessed

        best_start = -1
        best_length = 0
        # A longer match can never start to the right of the right-most
        # shorter one, so the search bound only ever moves left.
        max_start = history_length - 1

        for length in range(1, len(text_unprocessed) + 1):
            # Plain substring search: the text is data, not a regex pattern.
            start = stream.rfind(text_unprocessed[:length], 0,
                                 max_start + length)
            if start == -1:
                break
            best_start, best_length = start, length
            max_start = start

        if best_length == 0:
            return 0, 0, ""
        return (history_length - best_start, best_length,
                text_unprocessed[:best_length])

    def create_from_triplets_code(self, triplets):
        """Serialize the steps returned by :meth:`encode` into one string.

        Each step contributes the ``str()`` of its ``"Output"`` tuple and the
        tuples are joined with ``;``.
        """
        return ";".join(str((x["Output"])) for x in triplets)

    def create_from_code_triplets(self, code):
        """Split a serialized code string back into one string per token.

        Only a ``;`` sitting between a closing and an opening parenthesis is
        a separator; a ``;`` that is the quoted literal symbol of a token is
        left alone. An empty code string yields an empty list.
        """
        if not code:
            return []
        return re.split(r"(?<=\));(?=\()", code)

    def tiplets_to_table(self, triplets):
        """Return the steps produced by :meth:`encode` as a pandas DataFrame."""
        return pd.DataFrame(triplets)

    def decode(self, triplets):
        """Rebuild the original text from a sequence of tokens.

        Args:
            triplets: Iterable of tokens, each either the string form
                produced by :meth:`create_from_code_triplets` (for example
                ``"(0, 'a')"`` or ``"(1, 3, 6)"``) or an already parsed
                tuple of the same shape.

        Returns:
            The decoded string.

        Raises:
            ValueError: If a copy token with a non-zero length has a pointer
                that does not lie inside the text decoded so far.
        """
        output = []

        for token in triplets:
            if isinstance(token, str):
                token = literal_eval(token)

            if token[0] == 0:
                _, symbol = token
                output.append(symbol)
                continue

            _, pointer, sequence_length = token
            if sequence_length:
                if not 0 < pointer <= len(output):
                    raise ValueError(
                        f"pointer {pointer} is outside the decoded text")
                start = len(output) - pointer
                # Symbol by symbol on purpose: when the copy overlaps its own
                # output, the freshly appended symbols become the source.
                for offset in range(sequence_length):
                    output.append(output[start + offset])

        return "".join(output)

In [411]:
instance = LZSS()

In [412]:
instance.find_longest_match("abbabbabbbba", "ab")

(6, 2, 'ab')

In [413]:
triplets = instance.encode(test_text)

In [414]:
# NOTE: the presentation has a mistake in the last row :((

instance.tiplets_to_table(triplets)

Unnamed: 0,Processed,Unprocessed,Output
0,a,bbabbabbbaab,"(0, a)"
1,ab,babbabbbaab,"(0, b)"
2,abb,abbabbbaab,"(1, 1, 1)"
3,abbabbabb,baab,"(1, 3, 6)"
4,abbabbabbba,ab,"(1, 4, 2)"
5,abbabbabbbaab,,"(1, 5, 2)"


In [415]:
code = instance.create_from_triplets_code(triplets)

In [416]:
code

"(0, 'a');(0, 'b');(1, 1, 1);(1, 3, 6);(1, 4, 2);(1, 5, 2)"

In [417]:
triplets = instance.create_from_code_triplets(code)

In [418]:
decoded = instance.decode(triplets)

#### Test if works correctly

In [419]:
decoded == test_text

True

## Try different sizes of the sliding window: 4kB, 16kB, 32kB and lengths of the uncompressed part. (1pt)

TODO: What is a sliding window?

## Try to compute zero order entropy of each field in a token and use it to compute approximate final compressed file size. (1pt)

TODO: Create a plot showing the entropy of each column!

## Compute the length of the output file and prepare a report summarizing the results. (1pt)

TODO: What should be saved to the file?
