4
4
import os
5
5
import logging
6
6
import time
7
- import multiprocessing
8
7
import traceback
9
-
10
- import urllib .request
8
+ import optparse
9
+ import subprocess
10
+ import urllib .request , urllib .error , urllib .parse
11
11
import json
12
12
13
- def receive_devices (server = "localhost" ):
14
- '''
15
- Interrogates the NODE on its current knowledge of devices, then extracts from the JSON record
16
- only the IPs
17
- '''
18
- url = "http://%s/devices" % server
19
- devices = []
20
-
21
- try :
22
- req = urllib .request .Request (url , headers = {'Content-Type' : 'application/json' })
23
- f = urllib .request .urlopen (req , timeout = 10 )
24
- devices = json .load (f )
25
- return devices
26
-
27
- except :
28
- logging .error ("The node ethoscope server %s is not running or cannot be reached. A list of available ethoscopes could not be found." % server )
29
- return
30
- #logging.error(traceback.format_exc())
31
-
13
+ import threading
32
14
33
15
class BackupClass (object ):
16
+ '''
17
+ The default backup class. Will connect to the ethoscope mysql and mirror its content into the local sqlite3
18
+ '''
34
19
35
20
_db_credentials = {
36
21
"name" :"ethoscope_db" ,
37
22
"user" :"ethoscope" ,
38
23
"password" :"ethoscope"
39
24
}
40
25
41
- # #the db name is specific to the ethoscope being interrogated
42
- # #the user remotely accessing it is node/node
43
-
44
- # _db_credentials = {
45
- # "name":"ETHOSCOPE_001_db",
46
- # "user":"node",
47
- # "password":"node"
48
- # }
49
-
50
26
def __init__ (self , device_info , results_dir ):
51
27
52
28
self ._device_info = device_info
53
29
self ._database_ip = os .path .basename (self ._device_info ["ip" ])
54
30
self ._results_dir = results_dir
55
31
56
32
57
- def run (self ):
33
+ def backup (self ):
58
34
try :
59
35
if "backup_path" not in self ._device_info :
60
36
raise KeyError ("Could not obtain device backup path for %s" % self ._device_info ["id" ])
@@ -73,84 +49,219 @@ def run(self):
73
49
mirror .update_roi_tables ()
74
50
75
51
logging .info ("Backup status for %s is %0.2f%%" % (self ._device_info ["id" ], mirror .compare_databases () ))
52
+ return True
76
53
77
54
except DBNotReadyError as e :
78
55
logging .warning (e )
79
56
logging .warning ("Database %s on IP %s not ready, will try later" % (self ._db_credentials ["name" ], self ._database_ip ) )
80
- pass
57
+ return False
81
58
82
59
except Exception as e :
83
60
logging .error (traceback .format_exc ())
61
+ return False
62
+
63
+ class VideoBackupClass (object ):
64
+ '''
65
+ The video backup class. Will connect to the ethoscope and mirror its video chunks in the node
66
+ '''
84
67
68
+ def __init__ (self , device_info , results_dir ):
85
69
86
- class GenericBackupWrapper (object ):
87
- def __init__ (self , backup_job , results_dir , safe , server ):
88
- self ._TICK = 1.0 # s
89
- self ._BACKUP_DT = 5 * 60 # 5min
70
+ self ._device_info = device_info
71
+ self ._database_ip = os .path .basename (self ._device_info ["ip" ])
90
72
self ._results_dir = results_dir
91
- self ._safe = safe
92
- self ._backup_job = backup_job
93
- self ._server = server
94
73
95
- # for safety, starts device scanner too in case the node will go down at later stage
96
- self ._device_scanner = EthoscopeScanner (results_dir = results_dir )
74
+ def backup (self ):
75
+ try :
76
+ if "backup_path" not in self ._device_info :
77
+ raise KeyError ("Could not obtain device backup path for %s" % self ._device_info ["id" ])
78
+
79
+ if self ._device_info ["backup_path" ] is None :
80
+ raise ValueError ("backup path is None for device %s" % self ._device_info ["id" ])
97
81
82
+ backup_path = os .path .join (self ._results_dir , self ._device_info ["backup_path" ])
98
83
99
- def get_devices (self ):
100
- logging .info ("Updating list of devices" )
101
- devices = receive_devices (self ._server )
84
+
85
+ logging .info ("Initiating backup for device %s" % self ._device_info ["id" ])
102
86
103
- if not devices :
104
- logging .info ("Using Ethoscope Scanner to look for devices" )
105
- self ._device_scanner .start ()
106
- time .sleep (20 )
107
-
108
- return devices
87
+ self .get_all_videos ()
88
+ logging .info ("Backup done for for device %s" % self ._device_info ["id" ])
89
+ return True
90
+
91
+ except Exception as e :
92
+ #logging.error("Unexpected error in backup. args are: %s" % str(args))
93
+ logging .error (traceback .format_exc ())
94
+ return False
95
+
96
+ def wget_mirror_wrapper (self , target , target_prefix , output_dir , cut_dirs = 3 ):
97
+ target = target_prefix + target
98
+ command_arg_list = ["wget" ,
99
+ target ,
100
+ "-nv" ,
101
+ "--mirror" ,
102
+ "--cut-dirs=%i" % cut_dirs ,
103
+ "-nH" ,
104
+ "--directory-prefix=%s" % output_dir
105
+ ]
106
+ p = subprocess .Popen (command_arg_list , stdout = subprocess .PIPE , stderr = subprocess .STDOUT )
107
+
108
+ stdout , stderr = p .communicate ()
109
+ if p .returncode != 0 :
110
+ raise Exception ("Error %i: %s" % ( p .returncode ,stdout ))
111
+
112
+ if stdout == "" :
113
+ return False
114
+ return True
115
+
116
+
117
+ def get_video_list (self , ip , port = 9000 ,static_dir = "static" , index_file = "ethoscope_data/results/index.html" ):
118
+
119
+ url = "/" .join (["%s:%i" % (ip ,port ), static_dir , index_file ])
109
120
110
- def run (self ):
111
121
try :
122
+ response = urllib .request .urlopen (url )
123
+ out = [r .decode ('utf-8' ).rstrip () for r in response ]
124
+ except urllib .error .HTTPError as e :
125
+ logging .warning ("No index file could be found for device %s" % ip )
126
+ out = None
127
+ finally :
128
+ self .make_index (ip , port )
129
+ return out
112
130
113
- t0 = time .time ()
114
- t1 = t0 + self ._BACKUP_DT
131
+ def remove_video_from_host (self , ip , id , target , port = 9000 ):
132
+ request_url = "{ip}:{port}/rm_static_file/{id}" .format (ip = ip , id = id , port = port )
133
+ data = {"file" : target }
134
+ data = json .dumps (data )
135
+ req = urllib .request .Request (url = request_url , data = data , headers = {'Content-Type' : 'application/json' })
136
+ _ = urllib .request .urlopen (req , timeout = 5 )
115
137
116
- while True :
117
- if t1 - t0 < self ._BACKUP_DT :
118
- t1 = time .time ()
119
- time .sleep (self ._TICK )
120
- continue
121
138
122
- logging .info ("Starting backup" )
123
- devices = self .get_devices ()
139
+ def make_index (self , ip , port = 9000 , page = "make_index" ):
140
+ url = "/" .join (["%s:%i" % (ip ,port ), page ])
141
+ try :
142
+ response = urllib .request .urlopen (url )
143
+ return True
144
+ except urllib .error .HTTPError as e :
145
+ logging .warning ("No index file could be found for device %s" % ip )
146
+ return False
147
+
148
+ def get_all_videos (self , port = 9000 , static_dir = "static" ):
149
+ url = "http://" + self ._device_info ["ip" ]
150
+ id = self ._device_info ["id" ]
151
+ video_list = self .get_video_list (url , port = port , static_dir = static_dir )
152
+ #backward compatible. if no index, we do not stop
153
+ if video_list is None :
154
+ return
155
+ target_prefix = "/" .join (["%s:%i" % (url ,port ), static_dir ])
156
+ for v in video_list :
157
+ try :
158
+ current = self .wget_mirror_wrapper (v , target_prefix = target_prefix , output_dir = self ._results_dir )
159
+ except Exception as e :
160
+ logging .warning (e )
161
+ continue
124
162
125
- if not devices :
126
- devices = self ._device_scanner .get_all_devices_info ()
163
+ if not current :
164
+ # we only attempt to remove if the files is mirrored
165
+ self .remove_video_from_host (url , id , v )
127
166
128
- dev_list = str ([d for d in sorted (devices .keys ())])
129
- logging .info ("device map is: %s" % dev_list )
167
+ class GenericBackupWrapper (threading .Thread ):
168
+ def __init__ (self , results_dir , node_address , video = False ):
169
+ '''
170
+ '''
171
+ self ._TICK = 1.0 # s
172
+ self ._BACKUP_DT = 5 * 60 # 5min
173
+ self ._results_dir = results_dir
174
+ self ._node_address = node_address
175
+ self .backup_status = {}
176
+ self ._is_instance_video = video
130
177
131
- devices_to_backup = []
132
- for d in list (devices .values ()):
133
- if d ["status" ] not in ["not_in_use" , "offline" ] and d ["name" ] != "ETHOSCOPE_000" :
134
- devices_to_backup .append ((d , self ._results_dir ))
178
+ super (GenericBackupWrapper , self ).__init__ ()
135
179
136
- logging .info ("Found %s devices online" % len (devices_to_backup ))
180
+ def find_devices (self , only_active = True ):
181
+ '''
182
+ Interrogates the NODE on its current knowledge of devices
183
+ '''
184
+ url = "http://%s/devices" % self ._node_address
185
+ timeout = 10
186
+ devices = {}
187
+
188
+ try :
189
+ req = urllib .request .Request (url , headers = {'Content-Type' : 'application/json' })
190
+ f = urllib .request .urlopen (req , timeout = timeout )
191
+ devices = json .load (f )
137
192
138
- if self . _safe :
139
- for dtb in devices_to_backup :
140
- self . _backup_job ( dtb )
193
+ except urllib . error . URLError as e :
194
+ logging . error ( "The node ethoscope server %s is not running or cannot be reached. A list of available ethoscopes could not be found." % self . _node_address )
195
+ logging . info ( "Using Ethoscope Scanner to look for devices" )
141
196
142
- #map(self._backup_job, devices_to_backup)
143
- else :
144
- pool = multiprocessing .Pool (4 )
145
- _ = pool .map (self ._backup_job , devices_to_backup )
146
- logging .info ("Pool mapped" )
147
- pool .close ()
148
- logging .info ("Joining now" )
149
- pool .join ()
197
+ _device_scanner = EthoscopeScanner ()
198
+ _device_scanner .start ()
199
+ time .sleep (timeout ) #let's just wait a bit
200
+ devices = _device_scanner .get_all_devices_info ()
201
+ del _device_scanner
202
+
203
+
204
+ if only_active :
205
+ return [ d for d in list ( devices .values () ) if (d ["status" ] not in ["not_in_use" , "offline" ] and d ["name" ] != "ETHOSCOPE_000" ) ]
206
+
207
+ return devices
208
+
209
+
210
+ def _backup_job (self , device_info ):
211
+ '''
212
+ '''
213
+ try :
214
+ dev_id = device_info ["id" ]
215
+ logging .info ("Initiating backup for device %s" % dev_id )
216
+ if self ._is_instance_video :
217
+ backup_job = VideoBackupClass (device_info , results_dir = self ._results_dir )
218
+ else :
219
+ backup_job = BackupClass (device_info , results_dir = self ._results_dir )
220
+
221
+ logging .info ("Running backup for device %s" % dev_id )
222
+ self .backup_status [dev_id ] = {'started' : int (time .time ()), 'ended' : 0 }
223
+
224
+ if backup_job .backup ():
225
+
226
+ logging .info ("Backup done for for device %s" % dev_id )
227
+ self .backup_status [dev_id ]['ended' ] = int (time .time ())
228
+ else :
229
+ logging .error ("Problem backing up device %s" % dev_id )
230
+ self .backup_status [dev_id ]['ended' ] = - 1
231
+
232
+ del backup_job
233
+ return True
234
+
235
+ except Exception as e :
236
+ #logging.error("Unexpected error in backup. args are: %s" % str(args))
237
+ logging .error (traceback .format_exc ())
238
+ return False
239
+
240
+ def run (self ):
241
+ '''
242
+ '''
243
+
244
+ t0 = time .time ()
245
+ t1 = t0 + self ._BACKUP_DT
246
+
247
+ while True :
248
+ if t1 - t0 < self ._BACKUP_DT :
150
249
t1 = time .time ()
151
- logging . info ( "Backup finished at t=%i" % t1 )
152
- t0 = t1
250
+ time . sleep ( self . _TICK )
251
+ continue
153
252
154
- finally :
155
- if not devices :
156
- self ._device_scanner .stop ()
253
+ logging .info ("Starting backup round" )
254
+ active_devices = self .find_devices ()
255
+
256
+ logging .info ("Found %s devices online: %s" % (
257
+ len (active_devices ),
258
+ ', ' .join ( [dev ['id' ] for dev in active_devices ] )
259
+ ) )
260
+
261
+ for dev in active_devices :
262
+ if dev ['id' ] not in self .backup_status :
263
+ self .backup_status [dev ['id' ]] = { 'started' : 0 , 'ended' : 0 }
264
+ self ._backup_job (dev )
265
+
266
+ t0 = t1 = time .time ()
267
+ logging .info ("Backup finished at t=%i" % t1 )
0 commit comments