In [1]:
%reload_ext autoreload
%autoreload 2

In [2]:
import os
import sys
# Make the local parkit checkout importable by this kernel (sys.path) and
# advertise it through PYTHONPATH as well.
# NOTE(review): hard-coded, machine-specific path -- adjust per environment.
PARKIT_REPO = 'C:\\Users\\rdpuser\\Documents\\Github\\parkit'

# PYTHONPATH may be unset, so read it with a default instead of indexing
# os.environ directly (which raises KeyError). Empty entries are discarded and
# the repo is only added once, so re-running this cell is harmless.
_pythonpath_entries = [
    entry for entry in os.environ.get('PYTHONPATH', '').split(os.pathsep) if entry
]
if PARKIT_REPO not in _pythonpath_entries:
    _pythonpath_entries.append(PARKIT_REPO)
os.environ['PYTHONPATH'] = os.pathsep.join(_pythonpath_entries)

if PARKIT_REPO not in sys.path:
    sys.path.append(PARKIT_REPO)

In [3]:
import logging
import queue
import random

import parkit as pk

# Logger used by smoke_tests below; named after the module this code runs in.
logger = logging.getLogger(__name__)

# NOTE(review): hard-coded, machine-specific path. Presumably this selects the
# parkit site (storage location) used by the containers created in later
# cells -- confirm against the parkit documentation.
pk.import_site('C:\\Users\\rdpuser\\Desktop\\test')

In [4]:
class CustomDict(pk.Dict):
    """``pk.Dict`` subclass whose metadata for a value is the value itself.

    NOTE(review): presumably parkit calls ``get_metadata`` when storing and
    passes the result back to ``decode_value`` when loading -- confirm against
    the parkit hooks. The assertion below checks that round trip.
    """

    def get_metadata(self, value):
        """Return ``value`` unchanged so it serves as its own metadata."""
        return value

    def decode_value(self, value, meta):
        """Decode ``value`` via the parent class and check it equals ``meta``."""
        decoded = super().decode_value(value)
        assert meta == decoded
        return decoded
    
class CustomArray(pk.Array):
    """``pk.Array`` subclass whose metadata for a value is the value itself.

    NOTE(review): presumably parkit calls ``get_metadata`` when storing and
    passes the result back to ``decode_value`` when loading -- confirm against
    the parkit hooks. The assertion below checks that round trip.
    """

    def get_metadata(self, value):
        """Return ``value`` unchanged so it serves as its own metadata."""
        return value

    def decode_value(self, value, meta):
        """Decode ``value`` via the parent class and check it equals ``meta``."""
        decoded = super().decode_value(value)
        assert meta == decoded
        return decoded
    
class CustomQueue(pk.Queue):
    """``pk.Queue`` subclass whose metadata for a value is the value itself.

    NOTE(review): presumably parkit calls ``get_metadata`` when storing and
    passes the result back to ``decode_value`` when loading -- confirm against
    the parkit hooks. The assertion below checks that round trip.
    """

    def get_metadata(self, value):
        """Return ``value`` unchanged so it serves as its own metadata."""
        return value

    def decode_value(self, value, meta):
        """Decode ``value`` via the parent class and check it equals ``meta``."""
        decoded = super().decode_value(value)
        assert meta == decoded
        return decoded

In [5]:
class Test():
    """Base class for randomized comparison tests.

    A subclass holds a reference object (``_obj1``), an object under test
    (``_obj2``) and a list of zero-argument callables (``_ops``). Each call to
    ``random_operation`` runs one randomly chosen entry of ``_ops``.
    """

    # Inclusive upper bound for generated values and the size above which both
    # objects are emptied. Kept as an int: random.randint rejects float bounds
    # on Python 3.12+ (and deprecated them in 3.10).
    _MAX_VALUE = 10 ** 6

    def __init__(self):
        self._obj1 = None
        self._obj2 = None
        self._ops = []

    def random_operation(self):
        """Run one randomly chosen operation, emptying the objects first if large."""
        if self.sizeof(self._obj1) > self._MAX_VALUE:
            # Dispatch through clear() so subclasses whose reference object has
            # no .clear() method (e.g. queue.Queue) can supply their own way
            # of emptying both objects.
            self.clear()
        random.choice(self._ops)()

    def sizeof(self, obj):
        """Return the number of items in ``obj``; subclasses may override."""
        return len(obj)

    def random_value(self):
        """Return a random integer in ``[0, 10**6]`` (both ends inclusive)."""
        return random.randint(0, self._MAX_VALUE)

    def clear(self):
        """Empty both objects via their own ``clear()``; subclasses may override."""
        self._obj1.clear()
        self._obj2.clear()

    def finish(self):
        """Call ``drop()`` on the object under test, if one was created."""
        if self._obj2 is not None:
            self._obj2.drop()

In [6]:
class MutableSequenceTest(Test):
    """Randomized comparison of a sequence-like object against a built-in list.

    ``obj_class`` is called with no arguments to create the object under test.
    Every mutating operation is applied to both the list (``_obj1``) and that
    object (``_obj2``); every read operation asserts that the two agree.
    """

    def __init__(self, obj_class):
        super().__init__()
        # Upper bound on the number of values added by one extend(). Must be an
        # int: random.randint rejects float bounds on Python 3.12+.
        self._limit = 100
        self._obj1 = []
        self._obj2 = obj_class()
        self._ops = [
            self.set, self.extend,
            self.append, self.get, self.contains,
            self.maybe_contains, self.count, self.len,
            self.index, self.iterate
        ]

    def compare(self):
        """Assert both objects hold the same items in the same order; return True."""
        assert len(self._obj1) == len(self._obj2)
        for i, _ in enumerate(self._obj1):
            assert self._obj2[i] == self._obj1[i]
        return True

    def get(self):
        """Compare the items at one random valid index."""
        if len(self._obj1):
            i = random.randrange(0, len(self._obj1))
            assert self._obj1[i] == self._obj2[i]

    def maybe_contains(self):
        """Compare membership results for a random value that may be absent."""
        value = self.random_value()
        assert (value in self._obj1) == (value in self._obj2)

    def iterate(self):
        """Compare a random slice (bounds may be negative) item by item."""
        if len(self._obj1):
            a = random.randrange(-len(self._obj1), len(self._obj1))
            b = random.randrange(-len(self._obj1), len(self._obj1))
            s1 = self._obj1[a:b]
            s2 = list(self._obj2[a:b])
            assert len(s1) == len(s2)
            for i in range(len(s1)):
                assert s1[i] == s2[i]

    def contains(self):
        """Assert that a value taken from the list is found in the tested object."""
        if len(self._obj1):
            value = random.choice(self._obj1)
            assert value in self._obj2

    def index(self):
        """Compare ``index()`` results for a value known to be present."""
        if len(self._obj1):
            value = random.choice(self._obj1)
            assert self._obj1.index(value) == self._obj2.index(value)

    def count(self):
        """Compare ``count()`` results for a value known to be present."""
        if len(self._obj1):
            value = random.choice(self._obj1)
            assert self._obj1.count(value) == self._obj2.count(value)

    def len(self):
        """Assert both objects report the same length."""
        assert len(self._obj1) == len(self._obj2)

    def append(self):
        """Append the same random value to both objects."""
        value = self.random_value()
        self._obj1.append(value)
        self._obj2.append(value)

    def extend(self):
        """Extend both objects with the same list of 0.._limit random values."""
        values = [self.random_value() for _ in range(random.randint(0, self._limit))]
        self._obj1.extend(values)
        self._obj2.extend(values)

    def set(self):
        """Overwrite one random valid index in both objects with the same value."""
        if len(self._obj1):
            i = random.randrange(0, len(self._obj1))
            value = self.random_value()
            self._obj1[i] = value
            self._obj2[i] = value

    def clear(self):
        """Empty both objects."""
        self._obj1.clear()
        self._obj2.clear()

In [7]:
class MutableMappingTest(Test):
    """Randomized comparison of a mapping-like object against a built-in dict.

    ``obj_class`` is called with no arguments to create the object under test.
    Mutations are mirrored on the dict (``_obj1``) and on that object
    (``_obj2``); read operations assert that both give the same answer.
    """

    def __init__(self, obj_class):
        self._limit = 1e2
        self._obj1 = {}
        self._obj2 = obj_class()
        # ``set`` appears ten extra times so that insertions dominate the mix.
        self._ops = [
            self.get, self.contains,
            self.pop, self.iter,
            self.delete, self.keys,
            self.values, self.items,
            self.update, self.getdefault,
            self.setdefault
        ] + [self.set] * 10

    def _existing_key(self):
        """Return a random key of the reference dict (which must be non-empty)."""
        return random.choice(list(self._obj1))

    def compare(self):
        """Assert both mappings hold the same key/value pairs; return True."""
        assert len(self._obj1) == len(self._obj2)
        for key in self._obj1:
            assert self._obj1[key] == self._obj2[key]
        return True

    def contains(self):
        """Compare membership for an existing key (if any) and for a random key."""
        if self._obj1:
            key = self._existing_key()
            assert (key in self._obj1) == (key in self._obj2)
        key = self.random_value()
        assert (key in self._obj1) == (key in self._obj2)

    def setdefault(self):
        """Compare ``setdefault`` for an existing key (if any) and a random key."""
        if self._obj1:
            key = self._existing_key()
            default = self.random_value()
            assert self._obj1.setdefault(key, default) == self._obj2.setdefault(key, default)
        key = self.random_value()
        default = self.random_value()
        assert self._obj1.setdefault(key, default) == self._obj2.setdefault(key, default)

    def getdefault(self):
        """Compare ``get`` with and without an explicit default value."""
        if self._obj1:
            key = self._existing_key()
            assert self._obj1.get(key, True) == self._obj2.get(key, True)
        key = self.random_value()
        assert self._obj1.get(key, True) == self._obj2.get(key, True)
        key = self.random_value()
        assert self._obj1.get(key) == self._obj2.get(key)

    def delete(self):
        """Delete one existing key from both mappings; a KeyError is a failure."""
        if not self._obj1:
            return
        key = self._existing_key()
        try:
            del self._obj1[key]
            assert key in self._obj2
            del self._obj2[key]
        except KeyError:
            assert False

    def iter(self):
        """Assert every reference key is in the tested object and sizes match."""
        for key in self._obj1:
            assert key in self._obj2
        assert len(self._obj1) == len(self._obj2)

    def update(self):
        """Apply ``update`` to both mappings, once from a dict and once from pairs."""
        from_mapping = {self.random_value(): self.random_value()}
        self._obj1.update(from_mapping)
        self._obj2.update(from_mapping)
        from_pairs = [(self.random_value(), self.random_value())]
        self._obj1.update(from_pairs)
        self._obj2.update(from_pairs)

    def keys(self):
        """Compare the sorted keys of both mappings."""
        assert sorted(self._obj1.keys()) == sorted(self._obj2.keys())

    def items(self):
        """Compare the sorted (key, value) pairs of both mappings."""
        assert sorted(self._obj1.items()) == sorted(self._obj2.items())

    def values(self):
        """Compare the sorted values of both mappings."""
        assert sorted(self._obj1.values()) == sorted(self._obj2.values())

    def pop(self):
        """Pop one existing key from both mappings and compare the results."""
        if self._obj1:
            key = self._existing_key()
            assert self._obj1.pop(key) == self._obj2.pop(key)

    def get(self):
        """Compare item lookup for one existing key."""
        if self._obj1:
            key = self._existing_key()
            assert self._obj1[key] == self._obj2[key]

    def set(self):
        """Store the same random key/value pair in both mappings."""
        key = self.random_value()
        value = self.random_value()
        self._obj1[key] = value
        self._obj2[key] = value

    def clear(self):
        """Empty both mappings."""
        self._obj1.clear()
        self._obj2.clear()

In [8]:
class QueueTest(Test):
    """Randomized comparison of a queue-like object against ``queue.Queue``.

    ``obj_class`` is called with ``maxsize=maxsize`` to create the object under
    test (``_obj2``); the reference (``_obj1``) is a ``queue.Queue`` built with
    the same ``maxsize``.
    """

    def __init__(self, obj_class, maxsize = 0):
        super().__init__()
        self._obj1 = queue.Queue(maxsize)
        self._obj2 = obj_class(maxsize = maxsize)
        self._ops = [self.get, self.put, self.qsize, self.empty, self.full]

    @staticmethod
    def _get_or_none(getter):
        """Call ``getter()``; return its result, or None if it raises queue.Empty."""
        try:
            return getter()
        except queue.Empty:
            return None

    @staticmethod
    def _rejects(putter, value):
        """Call ``putter(value)``; return True if it raises queue.Full, else False."""
        try:
            putter(value)
        except queue.Full:
            return True
        return False

    def sizeof(self, obj):
        """Return the size of ``obj`` as reported by its ``qsize()``."""
        return obj.qsize()

    def compare(self):
        """Drain both queues, asserting items match pairwise; return True."""
        assert self.sizeof(self._obj1) == self.sizeof(self._obj2)
        drained = False
        while not drained:
            try:
                expected = self._obj1.get_nowait()
                actual = self._obj2.get()
            except queue.Empty:
                drained = True
            else:
                assert expected == actual
        assert self.sizeof(self._obj1) == 0
        assert self.sizeof(self._obj2) == 0
        return True

    def qsize(self):
        """Assert both queues report the same size."""
        assert self._obj1.qsize() == self._obj2.qsize()

    def empty(self):
        """Assert both queues agree on ``empty()``."""
        assert self._obj1.empty() == self._obj2.empty()

    def full(self):
        """Assert both queues agree on ``full()``."""
        assert self._obj1.full() == self._obj2.full()

    def get(self):
        """Take one item from each queue (None if empty) and compare them."""
        expected = self._get_or_none(self._obj1.get_nowait)
        actual = self._get_or_none(self._obj2.get)
        assert expected == actual

    def put(self):
        """Put the same value on both queues; assert they agree on being full."""
        value = self.random_value()
        reference_full = self._rejects(self._obj1.put_nowait, value)
        tested_full = self._rejects(self._obj2.put, value)
        assert reference_full == tested_full

    def clear(self):
        """Drain both queues pairwise until queue.Empty, then assert both are empty."""
        try:
            while True:
                assert self._obj1.get_nowait() == self._obj2.get()
        except queue.Empty:
            pass
        assert self.sizeof(self._obj1) == 0
        assert self.sizeof(self._obj2) == 0

In [9]:
@pk.task
def smoke_tests(iters):
    """Run every container smoke test for ``iters`` random operations each.

    Each case builds its test object, runs ``iters`` calls to
    ``random_operation``, asserts the final ``compare()``, and always calls
    ``finish()`` on a test object that was successfully created.

    Returns True when every case passes. On the first exception the error is
    logged and False is returned; remaining cases are not run.
    """
    logger.info('start smoke tests')
    # (label, container class, test class, extra positional args for the test)
    cases = [
        ('custom Dict', CustomDict, MutableMappingTest, ()),
        ('standard Dict', pk.Dict, MutableMappingTest, ()),
        ('custom Array', CustomArray, MutableSequenceTest, ()),
        ('standard Array', pk.Array, MutableSequenceTest, ()),
        ('custom Queue, maxsize = 0', CustomQueue, QueueTest, ()),
        ('standard Queue, maxsize = 0', pk.Queue, QueueTest, ()),
        ('custom Queue, maxsize = 1', CustomQueue, QueueTest, (1,)),
        ('standard Queue, maxsize = 1', pk.Queue, QueueTest, (1,))
    ]
    try:
        for label, container_class, harness_class, extra_args in cases:
            logger.info('%s test start', label)
            harness = None
            try:
                harness = harness_class(container_class, *extra_args)
                for _step in range(iters):
                    harness.random_operation()
                assert harness.compare()
                logger.info('%s test success', label)
            finally:
                # Only clean up if construction got far enough to bind harness.
                if harness is not None:
                    harness.finish()
        logger.info('all smoke tests passed')
        return True
    except Exception:
        logger.exception('test error')
        logger.info('%s test failure', label)
        logger.info('smoke tests failed')
        return False