In [1]:
import numpy as np
import matplotlib.pyplot as plt


# 3 LZW Codes:

We implement the LZW encoder and decoder as a single class below.

In [68]:
class LZW_Model:
    """Lempel-Ziv-Welch (LZW) encoder/decoder with a fixed code-word width.

    The model keeps an initial codebook (symbol -> integer code). ``encode``
    and ``decode`` each start from a fresh copy of it, so one instance can be
    reused for many inputs.
    """

    def __init__(self, codebook_init=None, encoding_word_size=9):
        """Create a model.

        Args:
            codebook_init: dict mapping each single input symbol to its
                integer code. Defaults to the 256 characters chr(0)..chr(255)
                mapped to 0..255. New entries are numbered with
                ``len(codebook)``, so the initial codes are expected to be
                0..len-1.
            encoding_word_size: number of bits used to store each emitted
                code (default 9, i.e. at most 2**9 = 512 distinct codes).
        """
        self.data = ""
        self.encoding_word_size = encoding_word_size  # bits per emitted code
        if codebook_init is None:
            self.codebook_init = {chr(i): i for i in range(256)}
        else:
            self.codebook_init = codebook_init
        print(f"codebook_init: {self.codebook_init}")

    @staticmethod
    def _is_verbose(debug):
        """Return True when ``debug`` is the string "verbose" (any letter case)."""
        return isinstance(debug, str) and debug.lower() == "verbose"

    def run(self, data, file_name=None, debug=False):
        """Encode ``data``, decode it again, and print a verification report.

        Args:
            data: string to be encoded.
            file_name: optional name shown in the banner instead of "input data".
            debug: False (quiet), True (print the encoder/decoder codebook
                difference plus input and output) or "verbose" (print the
                code and both codebooks, and trace every step).

        Prints:
            Input-Ouput Verification: True if the decoded output equals the input.
            codebook length, the bit counts and the compression ratio.

        Returns:
            The compression ratio, or None when the codebook does not fit in
            ``encoding_word_size`` bits.
        """
        if file_name is not None:
            print(f"-------------------\nRunning LZW Model on input file: {file_name} of length: {len(data)}\n-------------------")
        else:
            print(f"-------------------\nRunning LZW Model on input data length: {len(data)}\n-------------------")
        print(f"encoding_word_size: {self.encoding_word_size}")
        self.data = data
        code, encoder_codebook = self.encode(data, debug)

        if code is None:
            return None
        output, decoder_codebook = self.decode(code, debug)
        print(f"Input-Ouput Verification: {output == data}")
        print(f"codebook length: {len(encoder_codebook)}")
        # Computed before the print below so the bit-count line still precedes it.
        compression_ratio = self.get_compression_ratio(data, code)
        print(f"Compression ratio: {compression_ratio}")
        if self._is_verbose(debug):
            print(f"code: {code}")
            print(f"encoder_codebook: {encoder_codebook}")
            print(f"decoder_codebook: {decoder_codebook}")
        elif debug:
            print(f"codebook_difference: {self.get_codebook_difference(encoder_codebook, decoder_codebook)}")
            print(f"Input: {data}\nOutput: {output}")
        print("-------------------\n Done \n-------------------")
        return compression_ratio

    def encode(self, data, debug=False):
        """Perform LZW encoding on a string.

        Args:
            data: string whose symbols all appear in the initial codebook.
            debug: pass "verbose" to trace every step.

        Returns:
            ``(code, codebook)`` where ``code`` is the list of emitted integer
            codes and ``codebook`` the final symbol-sequence -> code dict, or
            ``(None, None)`` when the codebook has more entries than
            ``encoding_word_size`` bits can address.

        Raises:
            KeyError: if ``data`` contains a symbol missing from the initial
                codebook.
        """
        verbose = self._is_verbose(debug)
        codebook = self.codebook_init.copy()
        code = []
        curr = ""
        if verbose:
            print(f"first 10 letters: {list(data[:10])}")

        for i, symbol in enumerate(data):
            candidate = curr + symbol
            if verbose:
                print(f"i:{i} curr: {curr}, data[i]: {symbol}, curr + data[i]: {candidate}")
            if candidate in codebook:
                # Keep extending the current match.
                curr = candidate
                continue
            if symbol not in codebook:
                # Without this check the failure shows up later as an
                # unhelpful KeyError on the emitted prefix.
                raise KeyError(f"symbol {symbol!r} at position {i} is not in the initial codebook")
            code.append(codebook[curr])
            codebook[candidate] = len(codebook)
            curr = symbol
        if curr:
            code.append(codebook[curr])

        # Codes run from 0 to len(codebook) - 1, so exactly 2**word_size
        # entries still fit; only a larger codebook overflows the word.
        if len(codebook) > 2 ** self.encoding_word_size:  # not efficient but failsafe
            print(f"Codebook size: {len(codebook)} exceeded {(self.encoding_word_size)} bits")
            return None, None
        return code, codebook

    def decode(self, code, debug=False):
        """Perform LZW decoding on a list of integer codes.

        Args:
            code: list of codes as produced by ``encode``.
            debug: any truthy value traces every step.

        Returns:
            ``(data, codebook)`` where ``data`` is the decoded string and
            ``codebook`` the final code -> symbol-sequence dict. An empty
            ``code`` decodes to the empty string.
        """
        if debug:
            print("Decoding")
        codebook = {v: k for k, v in self.codebook_init.items()}
        if not code:
            return "", codebook

        # Collect the pieces and join once instead of growing a string.
        pieces = [codebook[code[0]]]
        for i, (curr, nxt) in enumerate(zip(code, code[1:])):
            if debug:
                print(f"i: {i} curr: {curr}, next: {nxt}")
            if nxt in codebook:
                word = codebook[nxt]
                codebook[len(codebook)] = codebook[curr] + word[0]
            else:
                # The encoder used an entry it had only just created: that
                # entry must be the previous word plus its own first symbol.
                if debug:
                    print(f"next: {nxt}, not in code book")
                    print(f"codebook: {codebook}")
                    print(f"i: {i} codebook[curr]: {codebook[curr]} + codebook[curr][0]: {codebook[curr][0]}")
                word = codebook[curr] + codebook[curr][0]
                codebook[nxt] = word
            pieces.append(word)

        return "".join(pieces), codebook

    def get_codebook_difference(self, encoder_codebook, decoder_codebook):
        """Return the encoder entries whose key is not a value of the decoder codebook.

        Args:
            encoder_codebook: dict symbol-sequence -> code.
            decoder_codebook: dict code -> symbol-sequence.

        Returns:
            dict of the encoder entries ``{sequence: code}`` missing from the decoder.
        """
        # Build the lookup set once; testing against .values() is a linear scan per key.
        decoded_words = set(decoder_codebook.values())
        return {
            word: index
            for word, index in encoder_codebook.items()
            if word not in decoded_words
        }

    def get_compression_ratio(self, data, code):
        """Return (bits of ``data`` in UTF-8) / (bits of ``code``) and print both counts.

        When the code has zero bits the ratio is defined as 1.0 for empty
        data and infinity otherwise, instead of dividing by zero.
        """
        num_bits_data_utf8 = len(data.encode('utf-8')) * 8  # factor of 8 for bytes to bits
        num_bits_code = len(code) * self.encoding_word_size
        print(f"num_bits_data_utf8: {num_bits_data_utf8}, num_bits_code: {num_bits_code}")
        if num_bits_code == 0:
            return 1.0 if num_bits_data_utf8 == 0 else float("inf")
        return num_bits_data_utf8 / num_bits_code

    def get_data(self):
        """Return the data passed to the most recent ``run`` call ("" before any run)."""
        return self.data

    def run_onfile(self, filename, debug=False):
        """Read ``filename`` as UTF-8 text and call ``run`` on its contents.

        Args:
            filename: path of the text file to be encoded.
            debug: False, True or "verbose"; forwarded to ``run``.

        Returns:
            Whatever ``run`` returns: the compression ratio, or None when the
            codebook does not fit in ``encoding_word_size`` bits.
        """
        # Explicit encoding keeps the read consistent with the UTF-8 bit count
        # used for the compression ratio, independent of the platform locale.
        with open(filename, "r", encoding="utf-8") as f:
            data = f.read()
        return self.run(data, filename, debug)

## 4.1 Testing:
Code tested against:
1) Wiki: https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
2) Geeks for Geeks: https://www.geeksforgeeks.org/lzw-lempel-ziv-welch-compression-technique/

We verify that the encoder produces the expected code and that decoding recovers the original input.

In [65]:
# Initial codebook: the 26 capital letters mapped to codes 0..25 ('A' -> 0, ..., 'Z' -> 25).
codebook_init = {
    letter: index
    for index, letter in enumerate(map(chr, range(ord("A"), ord("Z") + 1)))
}
print(f"codebook_init: {codebook_init}")

# Encode/decode the test string with 6-bit code words, tracing every step.
LZW = LZW_Model(encoding_word_size=6, codebook_init=codebook_init)
LZW.run(data="TOBEORNOTTOBEORTOBEORNOT", debug="verbose")

# Alternative check, currently disabled:
# LZW1 = LZW_Model(codebook_init = codebook_init)
# LZW1.run("BABAABAAA", debug = "verbose")



codebook_init: {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6, 'H': 7, 'I': 8, 'J': 9, 'K': 10, 'L': 11, 'M': 12, 'N': 13, 'O': 14, 'P': 15, 'Q': 16, 'R': 17, 'S': 18, 'T': 19, 'U': 20, 'V': 21, 'W': 22, 'X': 23, 'Y': 24, 'Z': 25}
codebook_init: {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6, 'H': 7, 'I': 8, 'J': 9, 'K': 10, 'L': 11, 'M': 12, 'N': 13, 'O': 14, 'P': 15, 'Q': 16, 'R': 17, 'S': 18, 'T': 19, 'U': 20, 'V': 21, 'W': 22, 'X': 23, 'Y': 24, 'Z': 25}
-------------------
Running LZW Model on input data length: 9
-------------------
encoding_word_size: 6
first 10 letters: ['B', 'A', 'B', 'A', 'A', 'B', 'B', 'B', 'A']
i:0 curr: , data[i]: B, curr + data[i]: B
i:1 curr: B, data[i]: A, curr + data[i]: BA
i:2 curr: A, data[i]: B, curr + data[i]: AB
i:3 curr: B, data[i]: A, curr + data[i]: BA
i:4 curr: BA, data[i]: A, curr + data[i]: BAA
i:5 curr: A, data[i]: B, curr + data[i]: AB
i:6 curr: AB, data[i]: B, curr + data[i]: ABB
i:7 curr: B, data[i]: B, curr + data[i]: B

In [66]:
# Round-trip a handful of very short inputs with the default word size,
# tracing every encoder and decoder step.
LZW = LZW_Model(codebook_init=codebook_init)
for sample in ("ABB", "AAA", "AAC", "QBQQQ"):
    LZW.run(sample, debug="verbose")


codebook_init: {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6, 'H': 7, 'I': 8, 'J': 9, 'K': 10, 'L': 11, 'M': 12, 'N': 13, 'O': 14, 'P': 15, 'Q': 16, 'R': 17, 'S': 18, 'T': 19, 'U': 20, 'V': 21, 'W': 22, 'X': 23, 'Y': 24, 'Z': 25}
-------------------
Running LZW Model on input data length: 3
-------------------
encoding_word_size: 9
first 10 letters: ['A', 'B', 'B']
i:0 curr: , data[i]: A, curr + data[i]: A
i:1 curr: A, data[i]: B, curr + data[i]: AB
i:2 curr: B, data[i]: B, curr + data[i]: BB
Decoding
i: 0 curr: 0, next: 1
i: 1 curr: 1, next: 1
Input-Ouput Verification: True
num_bits_data_utf8: 24, num_bits_code: 27
Compression ratio: 0.8888888888888888
code: [0, 1, 1]
encoder_codebook: {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6, 'H': 7, 'I': 8, 'J': 9, 'K': 10, 'L': 11, 'M': 12, 'N': 13, 'O': 14, 'P': 15, 'Q': 16, 'R': 17, 'S': 18, 'T': 19, 'U': 20, 'V': 21, 'W': 22, 'X': 23, 'Y': 24, 'Z': 25, 'AB': 26, 'BB': 27}
decoder_codebook: {0: 'A', 1: 'B', 2: 'C', 3: 'D'

## 4.2 Redundant Data 10 million

In [77]:
# Build the test input: a string of n symbols, each drawn at random from the
# two-letter alphabet {'A', 'B'}.
n = 10_000_000
LZW = LZW_Model(encoding_word_size=40, codebook_init={'A': 0, 'B': 1})
data = "".join(np.random.choice(['A', 'B'], size=n))
print(f"length of data: {len(data)}")

codebook_init: {'A': 0, 'B': 1}
length of data: 10000000


### Run 1

In [79]:
# Run 1: compress the random A/B string using whichever model `LZW` is currently bound to
# (the recorded output reports encoding_word_size 40).
LZW.run(data)


-------------------
Running LZW Model on input data length: 10000000
-------------------
encoding_word_size: 40
Input-Ouput Verification: True
codebook length: 606305
num_bits_data_utf8: 80000000, num_bits_code: 24252160
Compression ratio: 3.298675252018789
-------------------
 Done 
-------------------


### Run 2

In [81]:
import math

# 606305 is the codebook length printed by Run 1; ceil(log2(.)) gives the
# smallest word size (in bits) with at least that many distinct codes.
run1_codebook_length = 606305
min_word_size = math.ceil(np.log2(run1_codebook_length))
LZW = LZW_Model(encoding_word_size=min_word_size, codebook_init={'A': 0, 'B': 1})
LZW.run(data)


codebook_init: {'A': 0, 'B': 1}
-------------------
Running LZW Model on input data length: 10000000
-------------------
encoding_word_size: 20
Input-Ouput Verification: True
codebook length: 606305
num_bits_data_utf8: 80000000, num_bits_code: 12126080
Compression ratio: 6.597350504037578
-------------------
 Done 
-------------------


# 4.3

## 4.3.1 Data Cleaning
We clean all of our text files so that they contain only ASCII characters (bytes 0x00–0x7F), which are also valid single-byte UTF-8.

In [10]:
#read each .txt file under datasets/ (books and music), replace non-ASCII bytes with spaces,
#and write the result to the mirrored path under filtered_datasets/ (the original files are left untouched)
import os
import codecs
import shutil
import sys
import re

def remove_non_utf8_characters(file_path):
    """Replace non-ASCII bytes in a file with spaces and save a filtered copy.

    Every maximal run of bytes outside 0x00-0x7F is replaced by one space.
    The result is written to the path obtained by replacing "datasets" with
    "filtered_datasets" in ``file_path`` (e.g. datasets/books/x.txt ->
    filtered_datasets/books/x.txt); missing directories are created.

    Args:
        file_path: path to the file to be filtered.

    Returns:
        None

    Prints:
        remove count: the number of non-ASCII runs that were replaced.
    """
    with open(file_path, "rb") as f:
        raw = f.read()

    # subn returns the substitution count together with the new bytes;
    # counting matches after the substitution would always give 0.
    data, remove_count = re.subn(b'[^\x00-\x7F]+', b' ', raw)

    print(f"remove count: {remove_count}")
    new_file_path = file_path.replace("datasets", "filtered_datasets")
    new_dir = os.path.dirname(new_file_path)
    # dirname is "" for a bare file name, and os.makedirs("") would raise.
    if new_dir:
        os.makedirs(new_dir, exist_ok=True)
    with open(new_file_path, "wb") as f:
        f.write(data)
    return None

    
# Walk the whole "datasets" tree and filter every .txt file found in it.
for folder, _subfolders, names in os.walk("datasets"):
    for name in names:
        if not name.endswith(".txt"):
            continue
        remove_non_utf8_characters(os.path.join(folder, name))



remove count: 0
remove count: 0
remove count: 0
remove count: 0
remove count: 0
remove count: 0
remove count: 0
remove count: 0
remove count: 0


## Run LZW on datasets, books and music

In [27]:
# Run LZW on every .txt file under filtered_datasets/books and then
# filtered_datasets/music, once for each code-word size.
for encoding_size in [8, 12, 16, 20, 24, 28, 32, 36, 40]:
    LZW = LZW_Model(encoding_word_size=encoding_size)
    for dataset_dir in ("filtered_datasets/books", "filtered_datasets/music"):
        for root, dirs, files in os.walk(dataset_dir):
            for file in files:
                if not file.endswith(".txt"):
                    continue
                file_path = os.path.join(root, file)
                print(f"file_path: {file_path}")
                try:
                    LZW.run_onfile(file_path)
                except Exception as err:
                    # Catch Exception rather than everything so that Ctrl-C
                    # (KeyboardInterrupt) still stops this long-running loop,
                    # and report why the file failed.
                    print(f"File: {file_path} failed on encoding size: {encoding_size} ({err!r})")
    

codebook_init: {'\x00': 0, '\x01': 1, '\x02': 2, '\x03': 3, '\x04': 4, '\x05': 5, '\x06': 6, '\x07': 7, '\x08': 8, '\t': 9, '\n': 10, '\x0b': 11, '\x0c': 12, '\r': 13, '\x0e': 14, '\x0f': 15, '\x10': 16, '\x11': 17, '\x12': 18, '\x13': 19, '\x14': 20, '\x15': 21, '\x16': 22, '\x17': 23, '\x18': 24, '\x19': 25, '\x1a': 26, '\x1b': 27, '\x1c': 28, '\x1d': 29, '\x1e': 30, '\x1f': 31, ' ': 32, '!': 33, '"': 34, '#': 35, '$': 36, '%': 37, '&': 38, "'": 39, '(': 40, ')': 41, '*': 42, '+': 43, ',': 44, '-': 45, '.': 46, '/': 47, '0': 48, '1': 49, '2': 50, '3': 51, '4': 52, '5': 53, '6': 54, '7': 55, '8': 56, '9': 57, ':': 58, ';': 59, '<': 60, '=': 61, '>': 62, '?': 63, '@': 64, 'A': 65, 'B': 66, 'C': 67, 'D': 68, 'E': 69, 'F': 70, 'G': 71, 'H': 72, 'I': 73, 'J': 74, 'K': 75, 'L': 76, 'M': 77, 'N': 78, 'O': 79, 'P': 80, 'Q': 81, 'R': 82, 'S': 83, 'T': 84, 'U': 85, 'V': 86, 'W': 87, 'X': 88, 'Y': 89, 'Z': 90, '[': 91, '\\': 92, ']': 93, '^': 94, '_': 95, '`': 96, 'a': 97, 'b': 98, 'c': 99, 'd'