-
Notifications
You must be signed in to change notification settings - Fork 11
/
readDemo.py
executable file
·106 lines (86 loc) · 2.95 KB
/
readDemo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/python
import sys
from random import randint
import time
from dse.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from dse.auth import PlainTextAuthProvider
from dse.policies import DCAwareRoundRobinPolicy,TokenAwarePolicy, ConstantSpeculativeExecutionPolicy
from dse import ConsistencyLevel
from ConfigParser import ConfigParser
# Load the demo settings. Options specific to this reader come from the READ
# section; cluster-wide options come from the CONFIG section.
# NOTE: ConfigParser.read() silently skips a missing file, so a missing
# demo.ini shows up as NoSectionError on the first get() below.
config = ConfigParser()
config.read('demo.ini')
section = 'READ'

#Configuration
# Strip whitespace around each entry and drop empty ones, so that
# "host1, host2," does not produce " host2" or "" as a contact point.
contactpoints = [
    cp.strip()
    for cp in config.get('CONFIG', 'contactpoints').split(',')
    if cp.strip()
]
localDC = config.get(section, 'localDC')
rowcount = config.getint(section, 'rowcount')
# Compared against millisecond timestamps in the read loop.
cross_dc_latency = config.getint('CONFIG', 'crossdclatency')
ks_query = config.get('CONFIG', 'ks_query')
auth_provider = PlainTextAuthProvider(
    username=config.get('CONFIG', 'clusteruser'),
    password=config.get('CONFIG', 'clusterpass'),
)

# Consistency levels that can be selected through the `cl` option.
CONSISTENCY_LEVELS = {
    "ONE": ConsistencyLevel.ONE,
    "TWO": ConsistencyLevel.TWO,
    "LOCAL_QUORUM": ConsistencyLevel.LOCAL_QUORUM,
    "QUORUM": ConsistencyLevel.QUORUM,
    "ALL": ConsistencyLevel.ALL,
}

#Default CL if not set in config ini
CL = ConsistencyLevel.ONE
# Only read `cl` when it is present: config.get() raises NoOptionError for a
# missing option, which would defeat the default above. An unrecognised value
# also falls back to ONE, as before.
if config.has_option(section, 'cl'):
    CL = CONSISTENCY_LEVELS.get(config.get(section, 'cl'), ConsistencyLevel.ONE)
#SSL
# The two constants used below live in the standard ssl module. They were
# previously referenced without an import, so enabling SSL raised NameError.
from ssl import CERT_OPTIONAL, PROTOCOL_TLSv1

# ssl_opts is None when sslenabled is 0; otherwise it is a dict holding the CA
# bundle path (sslca), the protocol version and the certificate requirement.
# NOTE(review): confirm ssl_opts is actually handed to Cluster(ssl_options=...).
# NOTE(review): PROTOCOL_TLSv1 pins TLS 1.0; confirm that is what the cluster
# expects before changing it.
if config.getint('CONFIG', 'sslenabled') == 0:
    ssl_opts = None
else:
    ssl_opts = {
        'ca_certs': config.get('CONFIG', 'sslca'),
        'ssl_version': PROTOCOL_TLSv1,
        'cert_reqs': CERT_OPTIONAL
    }
# Default execution profile: DC-aware round robin anchored on localDC with up
# to 3 hosts per remote DC, constant speculative execution, and the
# consistency level chosen from the config.
# NOTE(review): the speculative policy's positional arguments are presumably
# (delay in seconds, max attempts) - confirm against the driver docs.
profile1 = ExecutionProfile(
    load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=localDC, used_hosts_per_remote_dc=3),
    speculative_execution_policy=ConstantSpeculativeExecutionPolicy(.05, 20),
    consistency_level=CL,
)

# Single parenthesised argument: valid as a Python 2 print statement and as a
# Python 3 function call.
print("Connecting to cluster")

# ssl_options receives the SSL settings computed earlier in the script. It was
# previously omitted, so the SSL configuration was silently ignored. None
# (SSL disabled) is the driver's default.
cluster = Cluster(
    contact_points=contactpoints,
    auth_provider=auth_provider,
    execution_profiles={EXEC_PROFILE_DEFAULT: profile1},
    ssl_options=ssl_opts,
)
session = cluster.connect()
# Read loop: repeatedly fetch one row from the current minute's bucket, pace
# each pass to at least cross_dc_latency milliseconds, and every `rowcount`
# passes print a status line naming the datacenter of the coordinator that
# served a traced copy of the query. Runs until interrupted with Ctrl-C.
used_dc = localDC       # datacenter shown in the status line
coordinator = localDC   # placeholder until the first trace supplies an address
c = 0                   # total passes, reported as "Rows Read"
x = 0                   # passes since the last status line
d = ""                  # column d of the most recently read row

try:
    while True:
        current = time.localtime()
        # NOTE(review): the components are not zero-padded, so different
        # timestamps can collapse to the same bucket string. Left unchanged
        # because it presumably has to match whatever writes demo.table1.
        bucket = str(current.tm_year) + str(current.tm_mon) + str(current.tm_mday) + str(current.tm_hour) + str(current.tm_min)
        ts = int(time.time() * 1000)
        query = """ select * from demo.table1 where bucket = '%s' limit 1 """ % (bucket)

        try:
            results = session.execute(query)
        except Exception as exc:
            # Catch Exception rather than everything so Ctrl-C still stops the
            # script, and report the failure instead of hiding it.
            sys.stderr.write("Read failed: %s\n" % (exc,))
        else:
            # Only iterate a result set this pass actually produced; after a
            # failure `results` would be undefined or left over from earlier.
            for r in results:
                d = r.d

        # Pace the loop: sleep out the rest of the cross_dc_latency window
        # instead of spinning on the clock.
        remaining_ms = ts + cross_dc_latency - int(time.time() * 1000)
        if remaining_ms > 0:
            time.sleep(remaining_ms / 1000.0)

        c = c + 1
        x = x + 1
        if x == rowcount:
            last_c = coordinator
            future = session.execute_async(query, trace=True)
            try:
                future.result()
                # Wait at most 1 second for the trace to become available.
                trace = future.get_query_trace(1)
                coordinator = trace.coordinator
            except Exception:
                # Tracing is best-effort: keep the previous coordinator.
                coordinator = last_c

            # Translate the coordinator address into its datacenter name.
            for h in session.hosts:
                if h.address == coordinator:
                    used_dc = h.datacenter

            print(""" Rows Read %s (%s) - %s""" % (c, used_dc, d))
            x = 0
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the loop; fall through to a clean exit.
    pass
finally:
    # Always release the driver's connections, which the code after the
    # infinite loop could never reach before.
    cluster.shutdown()
sys.exit(0)