-
Notifications
You must be signed in to change notification settings - Fork 256
/
rs_test.py
95 lines (77 loc) · 2.29 KB
/
rs_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# -*- coding: utf-8 -*-
import unittest
import os
import random
import string
from qiniu import rs
from qiniu import conf
def r(length):
lib = string.ascii_uppercase
return ''.join([random.choice(lib) for i in range(0, length)])
conf.ACCESS_KEY = os.getenv("QINIU_ACCESS_KEY")
conf.SECRET_KEY = os.getenv("QINIU_SECRET_KEY")
key = 'QINIU_UNIT_TEST_PIC'
bucket_name = os.getenv("QINIU_TEST_BUCKET")
noexist_key = 'QINIU_UNIT_TEST_NOEXIST' + r(30)
key2 = "rs_demo_test_key_1_" + r(5)
key3 = "rs_demo_test_key_2_" + r(5)
key4 = "rs_demo_test_key_3_" + r(5)
class TestRs(unittest.TestCase):
def test_stat(self):
r = rs.Client()
ret, err = r.stat(bucket_name, key)
assert err is None
assert ret is not None
# error
_, err = r.stat(bucket_name, noexist_key)
assert err is not None
def test_delete_move_copy(self):
r = rs.Client()
r.delete(bucket_name, key2)
r.delete(bucket_name, key3)
ret, err = r.copy(bucket_name, key, bucket_name, key2)
assert err is None, err
ret, err = r.move(bucket_name, key2, bucket_name, key3)
assert err is None, err
ret, err = r.delete(bucket_name, key3)
assert err is None, err
# error
_, err = r.delete(bucket_name, key2)
assert err is not None
_, err = r.delete(bucket_name, key3)
assert err is not None
def test_batch_stat(self):
r = rs.Client()
entries = [
rs.EntryPath(bucket_name, key),
rs.EntryPath(bucket_name, key2),
]
ret, err = r.batch_stat(entries)
assert err is None
self.assertEqual(ret[0]["code"], 200)
self.assertEqual(ret[1]["code"], 612)
def test_batch_delete_move_copy(self):
r = rs.Client()
e1 = rs.EntryPath(bucket_name, key)
e2 = rs.EntryPath(bucket_name, key2)
e3 = rs.EntryPath(bucket_name, key3)
e4 = rs.EntryPath(bucket_name, key4)
r.batch_delete([e2, e3, e4])
# copy
entries = [
rs.EntryPathPair(e1, e2),
rs.EntryPathPair(e1, e3),
]
ret, err = r.batch_copy(entries)
assert err is None
self.assertEqual(ret[0]["code"], 200)
self.assertEqual(ret[1]["code"], 200)
ret, err = r.batch_move([rs.EntryPathPair(e2, e4)])
assert err is None
self.assertEqual(ret[0]["code"], 200)
ret, err = r.batch_delete([e3, e4])
assert err is None
self.assertEqual(ret[0]["code"], 200)
r.batch_delete([e2, e3, e4])
if __name__ == "__main__":
unittest.main()