/
test_warden_carbon_aggre.py
155 lines (109 loc) · 4.18 KB
/
test_warden_carbon_aggre.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import time
import unittest
import os
import sys
import tempfile
import random
from socket import socket
from ConfigParser import ConfigParser
# Check dependencies
try:
import whisper
except Exception as e:
print('Missing required dependency: Whisper=0.9.10')
exit(1)
try:
import carbon
except Exception as e:
print('Missing required dependency: Carbon=0.9.10')
exit(1)
try:
import twisted
except Exception as e:
print('Missing required dependency: Twisted=11.10.1')
exit(1)
CARBON_SERVER = '127.0.0.1'
CARBON_PORT = 2023
test_dir = os.path.dirname(os.path.abspath(__file__)) # this is the test dir
warden_dir = os.path.dirname(test_dir) # warden root
sys.path.insert(0, warden_dir) # add warden root to path
from warden_carbon import CarbonManager
temp_dir = tempfile.mkdtemp()
test_conf = os.path.join(test_dir, 'conf', 'carbon.conf') # path to test config
test_stor = os.path.join(test_dir, 'conf', 'storage-schemas.conf') # path to test config
class WardenCarbonAggreTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.manager = CarbonManager(temp_dir)
self.manager.add_daemon(CarbonManager.CACHE, test_conf)
self.manager.add_daemon(CarbonManager.AGGREGATOR, test_conf)
self.manager.start_daemons()
config_parser = ConfigParser()
if not config_parser.read(test_stor):
print "Error: Couldn't read config file: %s" % test_stor
secindex = config_parser.sections().index('carbonaggre')
section = config_parser.sections()[secindex]
options = dict(config_parser.items(section))
retentions = whisper.parseRetentionDef(options['retentions'])
self.step = retentions[0]
self.max_datapoints = retentions[1]
self.max_sample = 20
time.sleep(2)
self.manager.print_status()
def runTest(self):
tag = 'random_data_cca'
sock = socket()
try:
sock.connect( (CARBON_SERVER,CARBON_PORT) )
except Exception as e:
self.fail("could not connect")
# Create some sample data
num_data_points = 4
num_substep = 20
data = []
lines = []
start = (time.time())
start = start - (start % self.step)
last = start
stime = float(float(self.step)/float(num_substep))
pts = (num_data_points)*(num_substep)
tp = 0.0
print('Bin is ' + str(self.step) + ' seconds.')
print('Adding ' + str(1.0/stime) + ' points a second for ' + str(num_data_points*self.step) + ' seconds.')
for i in range(num_data_points):
to_aggregate = []
for tick in range(num_substep):
to_aggregate.append( (last, random.random()*100) )
line = "folder.%s %s %d " % (tag, to_aggregate[-1][1], to_aggregate[-1][0])
sock.sendall(line+'\n')
print(line)
tp+=1.0
#print(str((tp/pts)*100) + '%')
last += stime
time.sleep(stime)
aggregated_data = aggregate(to_aggregate)
data.append( aggregated_data )
print(aggregated_data)
print('')
print('')
time.sleep(2) # NB - allows file operations to complete
tagFile = os.path.join(temp_dir, "storage","whisper","folder", tag + ".wsp")
self.assertTrue(os.path.exists(tagFile))
data_period_info, stored_data = whisper.fetch(tagFile, start-1, time.time())
print('Whisper data period : ' + str(data_period_info))
print('Whisper data : ' + str(stored_data))
print('Data expected: ' + str(data))
print len(stored_data)
print(zip(stored_data, data))
for whisper_data, sent_data in zip(stored_data, data)[:-1]: # :D
self.assertAlmostEquals(whisper_data, sent_data)
@classmethod
def tearDownClass(self):
self.manager.stop_daemons()
time.sleep(1)
self.manager.print_status()
print('done.')
def aggregate(data):
return sum([d[1] for d in data])/len(data)
if __name__ == '__main__':
unittest.main()