<a href="https://colab.research.google.com/github/thiteixeira/Python/blob/master/deduplication.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
import unittest
import tempfile
import hashlib
import random
import pickle


#original = 'original.data'
#deduped = 'deduped_file.data'
#reduped = 'reduped_file.data'
# Number of bytes dedup() reads from the input file per block.
BLOCK_SIZE = 1024


def dedup(inpath, outpath, block_size=None):
    """Write a deduplicated representation of ``inpath`` to ``outpath``.

    The input is read in fixed-size blocks. Each distinct block is stored
    once, keyed by the hex MD5 digest of its bytes, followed by every
    0-based block index at which it occurs. The resulting dict,
    ``{digest: [block_bytes, index, index, ...]}``, is pickled to
    ``outpath``.

    Args:
        inpath: Path of the file to read (opened in binary mode).
        outpath: Path that receives the pickled table; created or truncated.
        block_size: Number of bytes per block. When omitted, the
            module-level ``BLOCK_SIZE`` is used.

    Raises:
        ValueError: If ``block_size`` is not positive.
    """
    if block_size is None:
        block_size = BLOCK_SIZE
    # Validate before opening anything so a bad size never truncates outpath.
    if block_size <= 0:
        raise ValueError('block_size must be a positive number of bytes')

    hashes = {}
    with open(inpath, 'rb') as infile, open(outpath, 'wb') as outfile:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        blocks = iter(lambda: infile.read(block_size), b'')
        for index, chunk in enumerate(blocks):
            # NOTE: blocks are identified by digest alone, so two different
            # blocks with the same MD5 would be treated as one.
            chunk_id = hashlib.md5(chunk).hexdigest()
            # The first sighting stores the bytes at position 0 of the
            # entry; every sighting (the first included) appends its index.
            hashes.setdefault(chunk_id, [chunk]).append(index)

        pickle.dump(hashes, outfile, protocol=pickle.HIGHEST_PROTOCOL)

def redup(inpath, outpath):
    """Rebuild the original file from a table written by ``dedup``.

    ``inpath`` must contain a pickled dict whose values are lists of the
    form ``[block_bytes, index, index, ...]``; each index is a 0-based block
    position in the original file. The blocks are written to ``outpath`` in
    ascending index order.

    Args:
        inpath: Path of the pickled deduplication table.
        outpath: Path of the reconstructed file; created or truncated.
    """
    # NOTE(security): pickle.load can execute arbitrary code while loading.
    # Only pass files produced by dedup() from a trusted source.
    with open(inpath, 'rb') as f:
        table = pickle.load(f)

    # Place every block by its recorded index. Inserting into a growing list
    # is not equivalent: list.insert() with an index past the end appends,
    # so the result would depend on the order the entries are visited in.
    block_at = {}
    for entry in table.values():
        chunk = entry[0]
        for index in entry[1:]:
            block_at[index] = chunk

    with open(outpath, 'wb') as out:
        for index in sorted(block_at):
            out.write(block_at[index])


def gen_chunk():
    """Return 1024 pseudo-random bytes drawn from the ``random`` module."""
    values = [random.randrange(256) for _ in range(1024)]
    return bytes(values)

def write_file(filepath, chunks):
    """Write ``chunks`` to ``filepath`` in order and return their MD5.

    Args:
        filepath: Destination path; opened in binary write mode.
        chunks: Iterable of bytes objects to write back to back.

    Returns:
        The hex MD5 digest of the concatenated chunks.
    """
    digest = hashlib.md5()
    with open(filepath, 'wb') as out:
        for piece in chunks:
            # Hash first, then write, one piece at a time.
            digest.update(piece)
            out.write(piece)
    checksum = digest.hexdigest()
    return checksum

def read_file(filepath):
    """Return the hex MD5 digest of the full contents of ``filepath``."""
    with open(filepath, 'rb') as source:
        contents = source.read()
    return hashlib.md5(contents).hexdigest()

def process(chunks, original='original.data', deduped='deduped_file.data',
            reduped='reduped_file.data'):
    """Run a dedup/redup round trip on ``chunks`` and print the outcome.

    The chunks are written to ``original``, deduplicated into ``deduped``,
    then expanded again into ``reduped``. File sizes and MD5 digests are
    printed along the way, followed by ``True`` when the reconstructed file
    hashes the same as the original, or a failure message otherwise.

    Args:
        chunks: Iterable of bytes objects forming the original file.
        original: Path the original data is written to.
        deduped: Path of the deduplicated file.
        reduped: Path of the reconstructed file.
    """
    print('Creating original file')
    before = write_file(original, chunks)
    print('Original hash: ' + str(before))

    dedup(original, deduped)
    original_size = os.path.getsize(original)
    deduped_size = os.path.getsize(deduped)
    # Only a strictly larger deduplicated file counts as a failure; equal
    # sizes are accepted, matching the check in Test.process.
    if original_size < deduped_size:
        print("\nAssertion failure -- Deduplicated file is larger than the original.\n")
    print('\nOriginal Size {} (bytes) -> Deduped Size {} (bytes)\n'.format(original_size, deduped_size))

    redup(deduped, reduped)
    after = read_file(reduped)
    print('Reduped hashed: ' + str(after))
    reduped_size = os.path.getsize(reduped)
    print('\nDeduped Size {} (bytes) -> Reduped Size {} (bytes)\n'.format(deduped_size, reduped_size))
    if before == after:
        print('True')
    else:
        print("Assertion failure -- Reduplicated file is not the same as the original.")


# Note: the class must be called Test
class Test(unittest.TestCase):
    """Round-trip tests: dedup a file, redup it, compare with the original."""

    def setUp(self):
        # Each test works on three scratch files. They are created with
        # delete=False so they can be reopened by path, and their removal is
        # registered as a cleanup so no temp files are left behind.
        self.original = self._make_temp_path()
        self.deduped = self._make_temp_path()
        self.reduped = self._make_temp_path()

    def _make_temp_path(self):
        """Create an empty temp file, schedule its removal, return its path."""
        with tempfile.NamedTemporaryFile('wb', delete=False) as f:
            path = f.name
        self.addCleanup(self._remove_if_present, path)
        return path

    @staticmethod
    def _remove_if_present(path):
        """Delete ``path``; a file that is already gone is not an error."""
        try:
            os.remove(path)
        except FileNotFoundError:
            # Deliberate best effort: cleanup must not fail the test.
            pass

    def gen_chunk(self):
        """Return 1024 pseudo-random bytes drawn from ``random``."""
        return bytes(random.randrange(256) for _ in range(1024))

    def write_file(self, filepath, chunks):
        """Write ``chunks`` to ``filepath``; return the MD5 of what was written."""
        md5 = hashlib.md5()
        with open(filepath, 'wb') as f:
            for chunk in chunks:
                md5.update(chunk)
                f.write(chunk)
        return md5.hexdigest()

    def read_file(self, filepath):
        """Return the hex MD5 digest of the contents of ``filepath``."""
        md5 = hashlib.md5()
        with open(filepath, 'rb') as f:
            md5.update(f.read())
        return md5.hexdigest()

    def process(self, chunks):
        """Dedup then redup ``chunks``; assert size bound and identical hash."""
        before = self.write_file(self.original, chunks)
        dedup(self.original, self.deduped)

        size = os.path.getsize(self.original), os.path.getsize(self.deduped)
        self.assertGreaterEqual(size[0], size[1], "Assertion failure -- Deduplicated file is larger than the original.")
        print('Original Size {} (bytes) -> Deduped Size {} (bytes)'.format(size[0], size[1]))

        redup(self.deduped, self.reduped)
        after = self.read_file(self.reduped)
        self.assertEqual(before, after, "Assertion failure -- Reduplicated file is not the same as the original.")

    def test_basic_test(self):
        # Three copies of one chunk: the deduplicated file stores it once.
        random.seed(0)
        chunk = self.gen_chunk()
        chunks = chunk, chunk, chunk
        self.process(chunks)

    def test_small_file(self):
        # Two distinct chunks interleaved, each occurring more than once.
        random.seed(0)
        pool = [self.gen_chunk() for _ in range(2)]
        chunks = pool[0], pool[1], pool[0], pool[0], pool[1]
        self.process(chunks)


# Run the tests when this module is executed directly (e.g. as a notebook
# cell). Passing an explicit argv keeps unittest from parsing the host
# process's command line; only argv[0] is consumed, as the program name.
# exit=False stops unittest.main from calling sys.exit() after the run.
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)




..

Original Size 3072 (bytes) -> Deduped Size 1090 (bytes)
Original Size 5120 (bytes) -> Deduped Size 2164 (bytes)



----------------------------------------------------------------------
Ran 2 tests in 0.012s

OK
