-
Notifications
You must be signed in to change notification settings - Fork 138
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
329 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
#!/bin/sh | ||
|
||
python tests/test_helper.py |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
#! /usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# vim:fenc=utf-8 | ||
# | ||
# Copyright © 2018 YanMing <***REMOVED***> | ||
# | ||
# Distributed under terms of the MIT license. | ||
|
||
""" | ||
redis client wrapper | ||
""" | ||
|
||
import redis | ||
|
||
class RedisWrapper: | ||
def __init__(self, ip , port): | ||
self.r = redis.StrictRedis(host=ip, port=port) | ||
|
||
def get_instance(self): | ||
return self.r |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
#! /usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# vim:fenc=utf-8 | ||
# | ||
# Copyright © 2018 yongman <yming0221@gmail.com> | ||
# | ||
# Distributed under terms of the MIT license. | ||
|
||
""" | ||
unit test for hash type | ||
""" | ||
|
||
import unittest | ||
import time | ||
from rediswrap import RedisWrapper | ||
|
||
class HashTest(unittest.TestCase): | ||
@classmethod | ||
def setUpClass(cls): | ||
print 'connect to 127.0.0.1:7379\n' | ||
cls.r = RedisWrapper('127.0.0.1', 7379).get_instance() | ||
cls.k1 = '__hash1__' | ||
cls.k2 = '__hash2__' | ||
|
||
cls.f1 = 'f1' | ||
cls.f2 = 'f2' | ||
cls.f3 = 'f3' | ||
cls.f4 = 'f4' | ||
|
||
cls.v1 = 'value1' | ||
cls.v2 = 'value2' | ||
cls.v3 = 'value3' | ||
cls.v4 = 'value4' | ||
|
||
def setUp(self): | ||
self.r.execute_command('hclear', self.k1) | ||
self.r.execute_command('hclear', self.k2) | ||
pass | ||
|
||
def test_hget(self): | ||
self.assertEqual(self.r.hset(self.k1, self.f1, self.v1), 1) | ||
self.assertEqual(self.v1, self.r.hget(self.k1, self.f1)) | ||
|
||
def test_hset(self): | ||
self.assertEqual(self.r.hset(self.k1, self.f1, self.v1), 1) | ||
self.assertEqual(self.v1, self.r.hget(self.k1, self.f1)) | ||
|
||
def test_hexists(self): | ||
self.assertEqual(self.r.hset(self.k1, self.f1, self.v1), 1) | ||
self.assertTrue(self.r.hexists(self.k1, self.f1)) | ||
|
||
def test_hstrlen(self): | ||
self.assertEqual(self.r.hset(self.k1, self.f1, self.v1), 1) | ||
self.assertEqual(self.r.hstrlen(self.k1, self.f1), len(self.v1)) | ||
|
||
def test_hlen(self): | ||
prefix = '__' | ||
for i in range(0, 200): | ||
f = '{}{}'.format(prefix, i) | ||
self.assertEqual(self.r.hset(self.k2, f, f), 1) | ||
self.assertEqual(self.r.hlen(self.k2), 200) | ||
|
||
def test_hmget(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
self.assertListEqual(self.r.hmget(self.k1, self.f1, self.f2, self.f3), [self.v1, self.v2, self.v3]) | ||
|
||
def test_hdel(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
self.assertEqual(self.r.hdel(self.k1, self.f1, self.f2, self.f3, self.f4), 3) | ||
self.assertEqual(self.r.hlen(self.k1), 0) | ||
|
||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
self.assertEqual(self.r.hdel(self.k1, self.f1, self.f2), 2) | ||
self.assertEqual(self.r.hlen(self.k1), 1) | ||
|
||
def test_hkeys(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
self.assertListEqual(self.r.hkeys(self.k1), [self.f1, self.f2, self.f3]) | ||
|
||
def test_hvals(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
self.assertListEqual(self.r.hvals(self.k1), [self.v1, self.v2, self.v3]) | ||
|
||
def test_hgetall(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
self.assertDictEqual(self.r.hgetall(self.k1), {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3}) | ||
|
||
def test_hclear(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
self.assertTrue(self.r.execute_command("HCLEAR", self.k1)) | ||
self.assertEqual(self.r.hlen(self.k1), 0) | ||
|
||
def test_hpexpire(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
# expire in 5s | ||
self.assertEqual(self.r.execute_command("HPEXPIRE", self.k1, 5000), 1) | ||
self.assertLessEqual(self.r.execute_command("HPTTL", self.k1), 5000) | ||
self.assertEqual(self.r.hlen(self.k1), 3) | ||
time.sleep(6) | ||
self.assertEqual(self.r.hlen(self.k1), 0) | ||
|
||
def test_pexpireat(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
# expire in 5s | ||
ts = int(round(time.time()*1000)) + 5000 | ||
self.assertEqual(self.r.execute_command('hpexpireat', self.k1, ts), 1) | ||
self.assertLessEqual(self.r.execute_command('hpttl', self.k1), 5000) | ||
self.assertEqual(self.r.hlen(self.k1), 3) | ||
time.sleep(6) | ||
self.assertEqual(self.r.hlen(self.k1), 0) | ||
|
||
def test_expire(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
# expire in 5s | ||
self.assertEqual(self.r.execute_command('hexpire', self.k1, 5), 1) | ||
self.assertLessEqual(self.r.execute_command('httl', self.k1), 5) | ||
self.assertEqual(self.r.hlen(self.k1), 3) | ||
time.sleep(6) | ||
self.assertEqual(self.r.hlen(self.k1), 0) | ||
|
||
def test_expireat(self): | ||
self.assertTrue(self.r.hmset(self.k1, {self.f1:self.v1, self.f2:self.v2, self.f3:self.v3})) | ||
# expire in 5s | ||
ts = int(round(time.time())) + 5 | ||
self.assertEqual(self.r.execute_command('hexpireat', self.k1, ts), 1) | ||
self.assertLessEqual(self.r.execute_command('httl', self.k1), 5) | ||
self.assertEqual(self.r.hlen(self.k1), 3) | ||
time.sleep(6) | ||
self.assertEqual(self.r.hlen(self.k1), 0) | ||
|
||
def tearDown(self): | ||
pass | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
cls.r.execute_command('hclear', cls.k1) | ||
cls.r.execute_command('hclear', cls.k2) | ||
print '\nclean up\n' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
#! /usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# vim:fenc=utf-8 | ||
# | ||
# Copyright © 2018 YanMing <***REMOVED***> | ||
# | ||
# Distributed under terms of the MIT license. | ||
|
||
""" | ||
all unit tests | ||
""" | ||
|
||
import unittest | ||
|
||
from test_string import StringTest | ||
from test_hash import HashTest | ||
|
||
if __name__ == '__main__': | ||
suite = unittest.TestSuite() | ||
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(StringTest)) | ||
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(HashTest)) | ||
|
||
runner = unittest.TextTestRunner(verbosity=2) | ||
runner.run(suite) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
#! /usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# vim:fenc=utf-8 | ||
# | ||
# Copyright © 2018 yongman <yming0221@gmail.com> | ||
# | ||
# Distributed under terms of the MIT license. | ||
|
||
""" | ||
unit test for string type | ||
""" | ||
|
||
import unittest | ||
import time | ||
from rediswrap import RedisWrapper | ||
|
||
class StringTest(unittest.TestCase): | ||
@classmethod | ||
def setUpClass(cls): | ||
print 'connect to 127.0.0.1:7379\n' | ||
cls.r = RedisWrapper('127.0.0.1', 7379).get_instance() | ||
cls.k1 = '__string1__' | ||
cls.v1 = 'value1' | ||
cls.k2 = '__string2__' | ||
cls.v2 = 'value2' | ||
|
||
def setUp(self): | ||
self.r.delete(self.k1) | ||
self.r.delete(self.k2) | ||
pass | ||
|
||
def test_get(self): | ||
self.assertTrue(self.r.set(self.k1, self.v1)) | ||
v1 = self.r.get(self.k1) | ||
self.assertEqual(self.v1, v1, '{} != {}'.format(v1, self.v1)) | ||
|
||
def test_set(self): | ||
self.assertTrue(self.r.set(self.k1, self.v1)) | ||
v1 = self.r.get(self.k1) | ||
self.assertEqual(self.v1, v1, '{} != {}'.format(v1, self.v1)) | ||
|
||
def test_del(self): | ||
self.assertTrue(self.r.set(self.k1, self.v1)) | ||
v1 = self.r.get(self.k1) | ||
self.assertEqual(self.v1, v1, '{} != {}'.format(v1, self.v1)) | ||
v1 = self.r.delete(self.k1) | ||
self.assertEqual(v1, 1, '{} != 1'.format(v1)) | ||
v1 = self.r.get(self.k1) | ||
self.assertIsNone(v1, '{} != None'.format(v1)) | ||
|
||
def test_mget(self): | ||
self.assertTrue(self.r.mset({self.k1:self.v1, self.k2:self.v2})) | ||
self.assertListEqual(self.r.mget(self.k1, self.k2), [self.v1, self.v2]) | ||
|
||
def test_mset(self): | ||
self.assertTrue(self.r.mset({self.k1:self.v1, self.k2:self.v2})) | ||
self.assertListEqual(self.r.mget(self.k1, self.k2), [self.v1, self.v2]) | ||
|
||
def test_incr(self): | ||
# incr a new key | ||
self.assertEqual(self.r.incr(self.k1), 1) | ||
# incr a valid number key | ||
self.assertEqual(self.r.incr(self.k1), 2) | ||
|
||
# incr a invalid number | ||
self.assertTrue(self.r.set(self.k2, self.v2)) | ||
|
||
with self.assertRaises(Exception) as cm: | ||
self.r.incr(self.k2) | ||
err = cm.exception | ||
self.assertEqual(str(err), 'value is not an integer or out of range') | ||
|
||
def test_incrby(self): | ||
self.assertTrue(self.r.set(self.k1, 12345678)) | ||
self.assertEqual(self.r.incrby(self.k1, 12345678), 24691356) | ||
|
||
def test_decr(self): | ||
# decr a new key | ||
self.assertEqual(self.r.decr(self.k1), -1) | ||
# decr a valid number key | ||
self.assertEqual(self.r.decr(self.k1), -2) | ||
|
||
# decr a invalid number | ||
self.assertTrue(self.r.set(self.k2, self.v2)) | ||
|
||
with self.assertRaises(Exception) as cm: | ||
self.r.decr(self.k2) | ||
err = cm.exception | ||
self.assertEqual(str(err), 'value is not an integer or out of range') | ||
|
||
def test_decrby(self): | ||
self.assertTrue(self.r.set(self.k1, 12345678)) | ||
self.assertEqual(self.r.decr(self.k1, 12345679), -1) | ||
|
||
def test_strlen(self): | ||
self.assertTrue(self.r.set(self.k1, self.v1)) | ||
self.assertEqual(self.r.strlen(self.k1), len(self.v1)) | ||
|
||
def test_pexpire(self): | ||
self.assertTrue(self.r.set(self.k1, self.v1)) | ||
# expire in 5s | ||
self.assertTrue(self.r.pexpire(self.k1, 5000)) | ||
self.assertLessEqual(self.r.pttl(self.k1), 5000) | ||
self.assertEqual(self.r.get(self.k1), self.v1) | ||
time.sleep(5) | ||
self.assertIsNone(self.r.get(self.k1)) | ||
|
||
def test_pexpireat(self): | ||
self.assertTrue(self.r.set(self.k1, self.v1)) | ||
# expire in 5s | ||
ts = int(round(time.time()*1000)) + 5000 | ||
self.assertTrue(self.r.pexpireat(self.k1, ts)) | ||
self.assertLessEqual(self.r.pttl(self.k1), 5000) | ||
self.assertEqual(self.r.get(self.k1), self.v1) | ||
time.sleep(5) | ||
self.assertIsNone(self.r.get(self.k1)) | ||
|
||
def test_expire(self): | ||
self.assertTrue(self.r.set(self.k1, self.v1)) | ||
# expire in 5s | ||
self.assertTrue(self.r.expire(self.k1, 5)) | ||
self.assertLessEqual(self.r.ttl(self.k1), 5) | ||
self.assertEqual(self.r.get(self.k1), self.v1) | ||
time.sleep(5) | ||
self.assertIsNone(self.r.get(self.k1)) | ||
|
||
def test_expireat(self): | ||
self.assertTrue(self.r.set(self.k1, self.v1)) | ||
# expire in 5s | ||
ts = int(round(time.time())) + 5 | ||
self.assertTrue(self.r.expireat(self.k1, ts)) | ||
self.assertLessEqual(self.r.ttl(self.k1), 5) | ||
self.assertEqual(self.r.get(self.k1), self.v1) | ||
time.sleep(5) | ||
self.assertIsNone(self.r.get(self.k1)) | ||
|
||
def tearDown(self): | ||
pass | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
cls.r.delete(cls.k1) | ||
cls.r.delete(cls.k2) | ||
print '\nclean up\n' |