In [1]:
# Mount Google Drive at /content/drive so the cells below can read and write
# files under that path. `google.colab` is only importable inside Colab.

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
class AtalantaEncoder:
    """
    16-bit integer arithmetic encoder that splits every input value into a
    range symbol (arithmetic coded into ``CODE_out``) and a raw offset inside
    that range (stored in ``OFS_out`` / ``OFS_r``).

    ``model`` is the Symbol & Probability Count Table (PCNT): a sequence of
    row dicts with the keys

        'v_min', 'v_max'  : inclusive bounds of the values covered by the row
        't_low', 't_high' : cumulative-count bounds of the row; products with
                            the current range are shifted right by 10 bits
        'OL'              : number of bits reserved for ``value - v_min``

    Example rows (illustrative only):

        {'v_min': 0x00, 'v_max': 0x03, 't_low': 0x000, 't_high': 0x1EB, 'OL': 2}
        {'v_min': 0x04, 'v_max': 0x07, 't_low': 0x1EB, 't_high': 0x229, 'OL': 2}
    """

    def __init__(self, model):
        self.HIGH = 0xFFFF  # Upper bound of the coding interval (16-bit maximum).
        self.LOW = 0x0000   # Lower bound of the coding interval.

        self.range_length = 16  # Register width in bits (not referenced elsewhere in this class).

        self.UBC = 0  # Underflow bit counter: pending bits whose value is not yet known.

        self.OFS_out = []  # Offset value stream (one entry per encoded value).
        self.OFS_r = []    # Offset bit-length stream (the row's 'OL' per encoded value).

        self.CODE_out = []  # Arithmetic-coded symbol bit stream.
        self.CODE_c = []    # Symbol length stream (currently never filled).

        self.PCNT = model

    def get_probability(self, c):
        """
        Return the first PCNT row whose [v_min, v_max] interval contains ``c``,
        or ``None`` when no row matches.
        """
        for entry in self.PCNT:
            if entry['v_min'] <= c <= entry['v_max']:
                return entry
        return None

    def decimal_to_bits(self, n, bit_length=None):
        """
        Convert a non-negative integer to a list of bits, most significant first.

        :param n: The integer to convert.
        :param bit_length: Optional fixed output width; the result is padded
                           with leading zeros up to this width.
        :return: A list of 0/1 integers.
        :raises ValueError: If ``n`` is negative or does not fit in ``bit_length``.
        """
        if n < 0:
            raise ValueError("Only non-negative integers are supported.")

        # bin() yields e.g. '0b101'; drop the prefix and turn each digit into an int.
        bits = [int(digit) for digit in bin(n)[2:]]

        if bit_length is not None:
            if len(bits) > bit_length:
                raise ValueError("Bit length is too small to represent the number.")
            bits = [0] * (bit_length - len(bits)) + bits

        return bits

    def decimal_to_hex(self, decimal_num):
        """
        Return ``hex(decimal_num)`` converted to upper case.

        The '0x' prefix is kept and upper-cased as well, e.g. 255 -> '0XFF'.

        :param decimal_num: Integer to convert.
        :raises ValueError: If ``decimal_num`` is not an ``int``.
        """
        if not isinstance(decimal_num, int):
            raise ValueError("Input must be an integer.")

        return hex(decimal_num).upper()

    def mask_16(self, value):
        """Keep only the low 16 bits of ``value``."""
        return value & 0xFFFF

    def output_bit(self, bit):
        """Append a single bit to the symbol stream ``CODE_out``."""
        self.CODE_out.append(bit)

    def output_bit_plus_pending(self, bit):
        """
        Emit ``bit`` followed by ``UBC`` copies of its inverse, then clear ``UBC``.

        :param bit: The resolved bit (0 or 1).
        :return: Number of bits written (1 + the pending count before the call).
        """
        # Capture the count before the loop drains UBC; computing it afterwards
        # would always yield 1.
        emitted = self.UBC + 1
        self.output_bit(bit)
        while self.UBC > 0:
            self.output_bit(1 - bit)
            self.UBC -= 1
        return emitted

    def print_bin_hex_dec(self, string, value):
        """Debug helper: print ``value`` as a bit list, as hex and as decimal."""
        print(string+": ", self.decimal_to_bits(value), self.decimal_to_hex(value), value)

    def _renormalize(self):
        """
        Shift resolved bits out of HIGH/LOW until the interval is wide again.

        Bits are emitted while HIGH and LOW share their MSB; when they straddle
        the midpoint in the middle half (LOW >= 0x4000 and HIGH < 0xC000) the
        second MSB is dropped and counted in ``UBC`` instead.
        """
        while True:
            if self.HIGH < 0x8000:
                # Both bounds have MSB 0.
                self.output_bit_plus_pending(0)
            elif self.LOW >= 0x8000:
                # Both bounds have MSB 1.
                self.output_bit_plus_pending(1)
            elif self.LOW >= 0x4000 and self.HIGH < 0xC000:
                # Underflow: remember one pending bit and expand around the midpoint.
                self.UBC += 1
                self.LOW = self.mask_16(self.LOW << 1) & 0x7FFF   # force MSB to 0
                self.HIGH = self.mask_16(self.HIGH << 1) | 0x8001  # force MSB and LSB to 1
                continue
            else:
                break

            # Common shift for the two "matching MSB" cases: LOW gains a 0, HIGH gains a 1.
            self.LOW = self.mask_16(self.LOW << 1)
            self.HIGH = self.mask_16(self.HIGH << 1) | 1

    def encode(self, input_stream):
        """
        Encode ``input_stream`` and append the results to the output streams.

        For every value the offset ``value - v_min`` and the row's 'OL' are
        appended to ``OFS_out`` / ``OFS_r`` and the row's [t_low, t_high) range
        is arithmetic coded into ``CODE_out``. After the last value the
        termination bits are flushed, so this is meant to be called once per
        encoder instance.

        :param input_stream: An iterable of integer values to encode.
        :raises ValueError: If a value matches no PCNT row, or its offset needs
                            more than 'OL' bits.
        """
        for c in input_stream:
            PCNT_row = self.get_probability(c)
            if PCNT_row is None:
                raise ValueError(f"Character {c} not found in the probability model.")

            # int() so that .bit_length() is available even when the model
            # holds numpy integer scalars.
            offset = int(c - PCNT_row['v_min'])
            if offset.bit_length() > PCNT_row['OL']:
                raise ValueError(f"Offset {offset} is larger than OL.")
            self.OFS_out.append(offset)
            self.OFS_r.append(PCNT_row['OL'])

            # Narrow the interval to the row's slice. HIGH must be computed
            # before LOW is overwritten because both use the old LOW.
            # NOTE(review): a row with t_low == t_high produces HIGH < LOW here;
            # such rows cannot be encoded meaningfully — confirm the model never
            # maps an occurring value to one.
            range_val = self.HIGH - self.LOW + 1
            self.HIGH = self.LOW + ((range_val * PCNT_row['t_high']) >> 10) - 1
            self.LOW = self.LOW + ((range_val * PCNT_row['t_low']) >> 10)

            self._renormalize()

        # Flush: emit one more bit (plus pending ones) that selects the quarter
        # in which LOW lies.
        self.UBC += 1
        if self.LOW < 0x4000:
            self.output_bit_plus_pending(0)
        else:
            self.output_bit_plus_pending(1)

    def finalize(self):
        """Return ``(CODE_out, OFS_out, OFS_r)``."""
        return self.CODE_out, self.OFS_out, self.OFS_r



In [None]:
import numpy as np
import pandas as pd
import os
import dask.dataframe as dd
import csv
from tabulate import tabulate


def main():
    """
    Encode every row of the weights and activations CSV files with the
    AtalantaEncoder and write the results to Drive.

    For each value type this:
      1. loads the per-layer probability tables from a directory of CSV files,
      2. streams the input CSV row by row (first three columns identify the
         row, the remaining columns are the values),
      3. appends the encoded streams to an output CSV, and
      4. prints and saves a per-row compression summary.

    Rows that fail to encode are reported and skipped.
    """
    # Column layout shared by the header write and every appended row.
    encoded_fieldnames = ['Model_Name', 'Layer', 'Type', 'Symbol_Stream', 'Offset_Stream', 'Offset_Length_Stream']

    def filename_to_key(filename):
        """Strip the 'pt_' prefix and '.csv' suffix from a table file name."""
        return filename.removeprefix("pt_").removesuffix(".csv")

    def csv_to_dict(csv_path):
        """Load a CSV file and return its rows as a list of dicts."""
        return pd.read_csv(csv_path).to_dict(orient='records')

    def get_probability_tables(path):
        """
        Load every '.csv' file directly inside ``path``.

        :return: dict mapping ``filename_to_key(name)`` to the table rows;
                 empty when ``path`` does not exist.
        """
        pt_dict = dict()
        if not os.path.exists(path):
            print(f"The specified path '{path}' does not exist.")
            return pt_dict

        for entry_name in os.listdir(path):
            full_path = os.path.join(path, entry_name)
            # Skip sub-directories and stray non-CSV files that pd.read_csv
            # could not parse as a probability table.
            if os.path.isfile(full_path) and entry_name.lower().endswith(".csv"):
                pt_dict[filename_to_key(entry_name)] = csv_to_dict(full_path)
        return pt_dict

    def run_quantization(input_array):
        """
        Min-max normalise ``input_array`` to 0-255 and return it as uint8.

        Non-finite values are replaced first (nan -> 0, +inf -> 255, -inf -> 0).
        Empty or constant input yields an all-zero uint8 array instead of
        dividing by zero.
        """
        input_array = np.nan_to_num(input_array, nan=0, posinf=255, neginf=0)

        if input_array.size == 0:
            return input_array.astype(np.uint8)

        lowest = input_array.min()
        span = input_array.max() - lowest
        if span == 0:
            return np.zeros(input_array.shape, dtype=np.uint8)

        return ((input_array - lowest) / span * 255).astype(np.uint8)

    def add_row_to_csv(row, output_file):
        """Append one encoded row to ``output_file``."""
        with open(output_file, mode='a', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=encoded_fieldnames)
            writer.writerow(row)

    def run_atalanta(input_stream, prob_table):
        """
        Encode a numpy array with a fresh encoder.

        :return: (symbol_stream, offset_stream, offset_length_stream)
        """
        encoder = AtalantaEncoder(prob_table)
        # tolist() converts numpy scalars to plain Python ints for the encoder.
        encoder.encode(input_stream.tolist())
        symbol_stream, offset_stream, offset_length_stream = encoder.finalize()
        return symbol_stream, offset_stream, offset_length_stream

    def print_encoded_summary_table(summary_table):
        """Pretty-print the summary rows as a grid; no-op message when empty."""
        if not summary_table:
            print("No rows were encoded; nothing to summarize.")
            return

        table = [list(row.values()) for row in summary_table]
        headers = summary_table[0].keys()  # Use keys of the first row as headers
        print(tabulate(table, headers=headers, tablefmt="grid"))

    def output_summary_to_csv(csv_table, output_file):
        """Write the summary rows (with a header) to ``output_file``."""
        if not csv_table:
            # Without a first row there are no column names to write.
            print(f"No summary rows to write to {output_file}")
            return

        with open(output_file, mode='w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=csv_table[0].keys())
            writer.writeheader()
            writer.writerows(csv_table)

        print(f"Data has been written to {output_file}")

    # Directories holding the per-layer probability tables.
    pt_weights_csv_path = '/content/drive/MyDrive/CSCE_614/Project/probability_table_gen_results/weights_probability_tables'
    pt_act_csv_path = '/content/drive/MyDrive/CSCE_614/Project/probability_table_gen_results/activations_probability_tables'

    # Input value streams, one row per (model, layer, type).
    weights_csv_path = '/content/drive/MyDrive/CSCE_614/Project/weights_all_layers.csv'
    act_csv_path = '/content/drive/MyDrive/CSCE_614/Project/activations_all_layers.csv'

    # Encoded stream outputs.
    weights_encoded_output_file = '/content/drive/MyDrive/CSCE_614/Project/atalanta_outputs/atalanta_encoded_output_weights.csv'
    act_encoded_output_file = '/content/drive/MyDrive/CSCE_614/Project/atalanta_outputs/atalanta_encoded_output_activations.csv'

    # Compression summary outputs.
    weights_summary_file = '/content/drive/MyDrive/CSCE_614/Project/atalanta_outputs/atalanta_encoded_summary_weights.csv'
    # NOTE(review): 'aatalanta' looks like a typo of 'atalanta'; left unchanged
    # because other notebooks may already read this file name — confirm and rename.
    act_summary_file = '/content/drive/MyDrive/CSCE_614/Project/atalanta_outputs/aatalanta_encoded_summary_activations.csv'

    file_path_dict = {
        'weights': {'pt_tables': pt_weights_csv_path, 'input_stream': weights_csv_path, 'encoded_output': weights_encoded_output_file, 'encoded_summary': weights_summary_file},
        'activations': {'pt_tables': pt_act_csv_path, 'input_stream': act_csv_path, 'encoded_output': act_encoded_output_file, 'encoded_summary': act_summary_file},
    }

    for vtype in file_path_dict.keys():
        pt_csv_path = file_path_dict[vtype]['pt_tables']
        values_csv_path = file_path_dict[vtype]['input_stream']
        encoded_output_file = file_path_dict[vtype]['encoded_output']
        csv_summary_file = file_path_dict[vtype]['encoded_summary']

        # open(..., 'w') does not create missing directories, so make sure the
        # output folders exist before the first write.
        for out_path in (encoded_output_file, csv_summary_file):
            out_dir = os.path.dirname(out_path)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)

        # Write the header row (only once); rows are appended later.
        with open(encoded_output_file, mode='w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=encoded_fieldnames)
            writer.writeheader()

        probability_tables = get_probability_tables(pt_csv_path)

        summary_table = []
        csv_file_out = []

        # Process the input CSV line by line to avoid loading it all at once.
        with open(values_csv_path, 'r') as csvfile:
            csvreader = csv.reader(csvfile)
            next(csvreader, None)  # Skip the header row (tolerates an empty file).

            for row_data in csvreader:
                try:
                    row = {'Model Name':row_data[0] , 'Layer Number':row_data[1], 'Type':row_data[2]}

                    # Everything after the first three columns is the value stream.
                    input_array = np.array(row_data[3:], dtype=np.uint8)

                    # Tables are keyed by "<model>_<layer>_<type>".
                    pt_file_name = f"{row['Model Name']}_{row['Layer Number']}_{row['Type']}"
                    prob_table = probability_tables[pt_file_name]

                    symbol_stream, offset_stream, offset_length_stream = run_atalanta(input_array,prob_table)

                    output_row = {
                        'Model_Name': row['Model Name'],
                        'Layer': row['Layer Number'],
                        'Type': row['Type'],
                        'Symbol_Stream': symbol_stream,
                        'Offset_Stream': offset_stream,
                        'Offset_Length_Stream': offset_length_stream
                    }

                    add_row_to_csv(output_row, encoded_output_file)

                    # Sizes in bits: 8 per input value; one per symbol-stream
                    # entry; the sum of the per-value offset lengths.
                    input_stream_length = len(input_array)
                    input_stream_length_bits = input_stream_length*8
                    symbol_stream_length = len(symbol_stream)
                    offset_length_stream_length = sum(offset_length_stream)
                    compression_ratio = (input_stream_length_bits)/(symbol_stream_length + offset_length_stream_length)
                    compression_percentage = (1-(1/compression_ratio))*100

                    output_summary = {
                        'Model_Name': row['Model Name'],
                        'Layer_Number': row['Layer Number'],
                        'Type': row['Type'],
                        'Input_Stream_Length (values)': input_stream_length,
                        'Original_Length (bits)': input_stream_length_bits,
                        'Symbol_Stream_Length (bits)': symbol_stream_length,
                        'Offset_Stream_Length (bits)': offset_length_stream_length,
                        'After Compression (bits)': (symbol_stream_length + offset_length_stream_length),
                        'Compression_Ratio': compression_ratio,
                        'Compression_Percentage': compression_percentage
                        }

                    summary_table.append(output_summary)

                    csv_summary = {
                        'Model_Name': row['Model Name'],
                        'Layer_Number': row['Layer Number'],
                        'Type': row['Type'],
                        'Input_Stream_Length (values)': input_stream_length,
                        'Original (bits)': input_stream_length_bits,
                        'After Compression (bits)': (symbol_stream_length + offset_length_stream_length),
                        'Compression_Ratio': compression_ratio,
                        'Compression_Percentage': compression_percentage
                        }

                    csv_file_out.append(csv_summary)
                except Exception as e:
                    # Best effort: report the failing row and keep going.
                    print(f"Error processing row: {e}")
                    continue

        print_encoded_summary_table(summary_table)
        output_summary_to_csv(csv_file_out, csv_summary_file)

# Run the full encode-and-summarize pipeline when this cell/script is executed
# directly.
if __name__ == "__main__":
    main()


+--------------+-----------------------------------+---------+--------------------------------+--------------------------+-------------------------------+-------------------------------+----------------------------+---------------------+--------------------------+
| Model_Name   | Layer_Number                      | Type    |   Input_Stream_Length (values) |   Original_Length (bits) |   Symbol_Stream_Length (bits) |   Offset_Stream_Length (bits) |   After Compression (bits) |   Compression_Ratio |   Compression_Percentage |
| Resnet50     | conv1.weight                      | weights |                           9408 |                    75264 |                         30703 |                         32443 |                      63146 |             1.1919  |                 16.1007  |
+--------------+-----------------------------------+---------+--------------------------------+--------------------------+-------------------------------+-------------------------------+-------------------

In [None]:
# Mount Google Drive at /content/drive. This repeats the mount performed in the
# first cell of the notebook.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
class AtalantaDecoder:
    """
    16-bit integer arithmetic decoder for a symbol bit stream.

    ``PCNT`` is the Symbol & Probability Count Table: a sequence of row dicts
    with at least the keys 'v_min', 't_low' and 't_high'. A row is selected
    when ``t_low <= scaled_value < t_high`` on a 1024-step scale.
    """

    def __init__(self, PCNT):
        """
        Initialize the decoder with the given probability count table (PCNT).
        """
        self.HIGH = 0xFFFF  # Max value for 16 bits
        self.LOW = 0x0000  # Min value for 16 bits
        self.PCNT = PCNT   # Symbol & Probability Count Table
        self.value = 0     # 16-bit window onto the input bits
        self.input_bits = []  # Remaining input bits (a deque once decode() runs)

    def mask_16(self, value):
        """
        Mask the given value to ensure it fits within 16 bits.
        """
        return value & 0xFFFF

    def _take_bit(self):
        """
        Remove and return the next input bit, or ``None`` when none are left.

        Works on the deque created by ``decode`` and also on a plain list in
        case ``input_bits`` was assigned directly.
        """
        bits = self.input_bits
        if not bits:
            return None
        popleft = getattr(bits, 'popleft', None)
        return popleft() if popleft is not None else bits.pop(0)

    def load_initial_value(self, pad=False):
        """
        Load the initial value by reading the first 16 bits from the input bitstream.

        :param pad: When True, missing bits are read as 0 instead of raising.
        :raises ValueError: If fewer than 16 bits are available and ``pad`` is False.
        """
        self.value = 0
        for _ in range(16):
            bit = self._take_bit()
            if bit is None:
                if not pad:
                    raise ValueError("Insufficient bits in the input stream to load initial value.")
                bit = 0
            self.value = self.mask_16((self.value << 1) | bit)

    def get_symbol_from_range(self):
        """
        Identify the PCNT row whose [t_low, t_high) contains the current value.

        :raises ValueError: If no row matches.
        """
        range_val = self.HIGH - self.LOW + 1
        scaled_value = ((self.value - self.LOW + 1) * 1024 - 1) // range_val

        for entry in self.PCNT:
            if entry['t_low'] <= scaled_value < entry['t_high']:
                return entry
        raise ValueError(f"Scaled value {scaled_value} does not match any range in PCNT.")

    def decode(self, bitstream, num_symbols=None, offsets=None):
        """
        Decodes the given bitstream into symbols.

        :param bitstream: An iterable of bits (0/1). It is copied, not modified.
        :param num_symbols: Number of symbols to decode. When given, the stream
                            is zero-padded as needed, so streams shorter than
                            16 bits can be decoded. When omitted (and no
                            ``offsets`` are given) decoding continues until the
                            input bits are exhausted and at least 16 bits are
                            required.
        :param offsets: Optional sequence with one offset per symbol; each is
                        added to the row's 'v_min'. When given without
                        ``num_symbols``, its length is used as the symbol count.
        :return: A list of decoded values ('v_min' of each row, plus the
                 offset when ``offsets`` is supplied).
        :raises ValueError: If ``offsets`` is shorter than ``num_symbols``, the
                            stream is too short in legacy mode, or a value
                            matches no PCNT row.
        """
        from collections import deque  # O(1) removal from the front

        if offsets is not None:
            offsets = list(offsets)
            if num_symbols is None:
                num_symbols = len(offsets)
            elif len(offsets) < num_symbols:
                raise ValueError("Fewer offsets than symbols to decode.")

        # Work on a private copy so the caller's sequence is left untouched.
        self.input_bits = deque(bitstream)

        # Start every stream from the full interval.
        self.HIGH = 0xFFFF
        self.LOW = 0x0000
        self.load_initial_value(pad=num_symbols is not None)

        decoded_symbols = []

        def more_to_decode():
            if num_symbols is None:
                return bool(self.input_bits)
            return len(decoded_symbols) < num_symbols

        while more_to_decode():
            # Step 1: Get the symbol from the current range
            symbol_entry = self.get_symbol_from_range()
            symbol = symbol_entry['v_min']
            if offsets is not None:
                symbol += offsets[len(decoded_symbols)]
            decoded_symbols.append(symbol)

            # Step 2: Narrow HIGH and LOW to the symbol's slice. HIGH first,
            # because both expressions use the old LOW.
            range_val = self.HIGH - self.LOW + 1
            self.HIGH = self.mask_16(self.LOW + ((range_val * symbol_entry['t_high']) >> 10) - 1)
            self.LOW = self.mask_16(self.LOW + ((range_val * symbol_entry['t_low']) >> 10))

            # Step 3: Shift in new bits until the interval is wide again.
            while True:
                if self.HIGH < 0x8000 or self.LOW >= 0x8000:
                    # HIGH and LOW share their MSB: drop it from all three registers.
                    self.HIGH = (self.HIGH << 1) | 1
                    self.LOW = self.LOW << 1
                    self.value = (self.value << 1) | self._consume_bit()

                elif self.LOW >= 0x4000 and self.HIGH < 0xC000:
                    # Underflow: expand the middle half. LOW has top bits 01 and
                    # HIGH has 10, so fixed masks work for them, but value may be
                    # on either side of the midpoint and must have 0x4000
                    # subtracted before the shift.
                    self.HIGH = ((self.HIGH << 1) & 0xFFFF) | 0x8001
                    self.LOW = (self.LOW << 1) & 0x7FFF
                    self.value = ((self.value - 0x4000) << 1) | self._consume_bit()

                else:
                    break

                # Mask all values to keep them 16 bits
                self.HIGH = self.mask_16(self.HIGH)
                self.LOW = self.mask_16(self.LOW)
                self.value = self.mask_16(self.value)

        return decoded_symbols

    def _consume_bit(self):
        """
        Consumes the next bit from the input stream. If no bits are left, returns 0.
        """
        bit = self._take_bit()
        return 0 if bit is None else bit
