2626var etl = { } ;
2727exports . etl = etl ;
2828
29+ Array . prototype . clean = function ( deleteValue ) {
30+ for ( var i = 0 ; i < this . length ; i ++ ) {
31+ if ( ! this [ i ] ) {
32+ this . splice ( i , 1 ) ;
33+ i -- ;
34+ }
35+ }
36+ return this ;
37+ } ;
38+
2939etl . verify = function ( context , workspace , collection , documents , callback ) {
3040 var _document ;
3141 if ( ! Array . isArray ( documents ) )
3242 documents = [ documents ] ;
3343
44+ documents . clean ( ) ;
45+
3446 var buildRowKey = function ( obj ) {
35- var key = obj . timestamp . toISOString ( ) ;
47+ var key ;
48+ try {
49+ key = obj . timestamp . toISOString ( ) ;
50+ }
51+ catch ( ex ) {
52+ key = '' ;
53+ }
3654 traverse . map ( obj , function ( x ) {
3755 if ( x && typeof ( x ) === 'string' ) {
38-
3956 key += x ;
4057 }
4158 } ) ;
4259 key = joola . common . hash ( key ) ;
4360 return key ;
4461 } ;
4562
46- async . map ( documents , function ( doc , callback ) {
63+ async . map ( documents , function ( doc , cb ) {
4764 try {
4865 if ( ! doc . timestamp )
4966 doc . timestamp = new Date ( ) ;
5067 else {
5168 doc . timestamp = new Date ( doc . timestamp . value || doc . timestamp ) ;
5269 }
53- doc . _key = buildRowKey ( doc ) ;
70+
71+ Object . keys ( doc ) . forEach ( function ( key ) {
72+ var elem = doc [ key ] ;
73+ try {
74+ if ( elem . datatype === 'ip' || elem . key === 'ip' || key === 'ip' ) {
75+ var ipaddress = elem . value || elem ;
76+ var ip = joola . common . geoip . lookup ( ipaddress ) ;
77+ if ( ip && typeof ip === 'object' ) {
78+ delete ip . range ;
79+ ip . address = ipaddress ;
80+ doc [ key ] = ip ;
81+ doc . location = {
82+ lat : ip . ll [ 0 ] ,
83+ lon : ip . ll [ 1 ]
84+ } ;
85+ }
86+ else {
87+ ip = { } ;
88+ ip . address = ipaddress ;
89+ doc [ key ] = ip ;
90+ doc . location = {
91+ lat : 0 ,
92+ lon : 0
93+ } ;
94+ }
95+ }
96+ }
97+ catch ( ex ) {
98+ //ignore
99+ console . log ( 'exception' , ex ) ;
100+ }
101+ } ) ;
54102 }
55103 catch ( ex ) {
56- return callback ( ex ) ;
104+ doc . timestamp = new Date ( ) ;
105+ return cb ( ex ) ;
57106 }
58- return callback ( null ) ;
107+ doc . _key = buildRowKey ( doc ) ;
108+ return cb ( null ) ;
59109 } , function ( err ) {
60110 if ( err )
61111 return callback ( err ) ;
@@ -65,9 +115,7 @@ etl.verify = function (context, workspace, collection, documents, callback) {
65115 joola . dispatch . collections . metadata ( context , workspace , null , _document , function ( err , meta ) {
66116 /* istanbul ignore if */
67117 if ( err )
68- return setImmediate ( function ( ) {
69- return callback ( err ) ;
70- } ) ;
118+ return callback ( err ) ;
71119
72120 joola . dispatch . collections . get ( context , workspace , collection , function ( err , _collection ) {
73121 if ( err ) {
@@ -77,30 +125,22 @@ etl.verify = function (context, workspace, collection, documents, callback) {
77125 joola . dispatch . collections . add ( context , workspace , meta , function ( err , _collection ) {
78126 /* istanbul ignore if */
79127 if ( err && err . message != 'Collection already exist' ) {
80- return setImmediate ( function ( ) {
81- return callback ( err ) ;
82- } ) ;
128+ return callback ( err ) ;
83129 }
84130 joola . dispatch . collections . get ( context , workspace , collection , function ( err , _collection ) {
85131 if ( err ) {
86- return setImmediate ( function ( ) {
87- return callback ( err ) ;
88- } ) ;
132+ return callback ( err ) ;
89133 }
90134 if ( joola . datastore . providers . default . addcollection ) {
91135 joola . datastore . providers . default . addcollection ( meta . key , meta , function ( err ) {
92136 if ( err )
93137 return callback ( err ) ;
94138
95- return setImmediate ( function ( ) {
96- return callback ( null ) ;
97- } ) ;
139+ return callback ( null ) ;
98140 } ) ;
99141 }
100142 else {
101- return setImmediate ( function ( ) {
102- return callback ( null ) ;
103- } ) ;
143+ return callback ( null ) ;
104144 }
105145 } ) ;
106146 } ) ;
@@ -109,9 +149,7 @@ etl.verify = function (context, workspace, collection, documents, callback) {
109149 joola . dispatch . collections . metadata ( context , workspace , collection , _document , function ( err , _meta ) {
110150 /* istanbul ignore if */
111151 if ( err )
112- return setImmediate ( function ( ) {
113- return callback ( err ) ;
114- } ) ;
152+ return callback ( err ) ;
115153
116154 var differences ;
117155 delete meta . _key ;
@@ -147,22 +185,16 @@ etl.verify = function (context, workspace, collection, documents, callback) {
147185 joola . datastore . providers . default . altercollection ( meta . key , meta , differences , function ( err ) {
148186 if ( err )
149187 return callback ( err ) ;
150- return setImmediate ( function ( ) {
151- return callback ( null ) ;
152- } ) ;
188+ return callback ( null ) ;
153189 } ) ;
154190 }
155191 else {
156- return setImmediate ( function ( ) {
157- return callback ( null ) ;
158- } ) ;
192+ return callback ( null ) ;
159193 }
160194 } ) ;
161195 }
162196 else {
163- return setImmediate ( function ( ) {
164- return callback ( null ) ;
165- } ) ;
197+ return callback ( null ) ;
166198 }
167199 } ) ;
168200 }
@@ -174,46 +206,57 @@ etl.verify = function (context, workspace, collection, documents, callback) {
174206etl . load = function ( context , workspace , collection , documents , options , callback ) {
175207 joola . dispatch . collections . metadata ( context , workspace , collection , ce . clone ( documents [ 0 ] ) , function ( err , meta , _collection ) {
176208 if ( err )
177- return setImmediate ( function ( ) {
178- return callback ( err ) ;
179- } ) ;
209+ return callback ( err ) ;
180210
181211 _collection . meta = meta ;
182212 _collection . storeKey = ( workspace + '_' + collection ) . replace ( / [ ^ \w \s ] / gi, '' ) ;
183213
184214 //cleanup the object if we passed adhoc values;
185- documents . forEach ( function ( document ) {
186- traverse . forEach ( document , function ( x ) {
187- if ( x && typeof x === 'object' && x . hasOwnProperty ( 'value' ) )
188- this . update ( x . value ) ;
189- } ) ;
190- } ) ;
191-
192- var d = domain . create ( ) ;
193- d . on ( 'error' , function ( err ) {
194- console . log ( err ) ;
195- joola . logger . error ( 'Failed to insert document, ' + err ) ;
196- return callback ( 'Failed to insert document: ' + err ) ;
197- } ) ;
198- d . run ( function ( ) {
199- var filtered = _ . filter ( Object . keys ( joola . datastore . providers ) , function ( key ) {
200- var p = joola . datastore . providers [ key ] ;
201- return key !== 'default' && ( p . enabled || ! p . hasOwnProperty ( 'enabled' ) ) ;
215+ var docCount = 0 ;
216+ var traverseFound = false ;
217+ var geoFound = false ;
218+ async . map ( documents , function ( document , cb ) {
219+ if ( docCount === 0 || ( docCount > 0 && ( traverseFound || geoFound ) ) ) {
220+ traverse . forEach ( document , function ( x ) {
221+ if ( x && typeof x === 'object' && x . hasOwnProperty ( 'value' ) ) {
222+ traverseFound = true ;
223+ this . update ( x . value ) ;
224+ }
225+ } ) ;
226+ docCount ++ ;
227+ }
228+ return cb ( null ) ;
229+ } , function ( ) {
230+ var d = domain . create ( ) ;
231+ d . on ( 'error' , function ( err ) {
232+ console . log ( err ) ;
233+ joola . logger . error ( 'Failed to insert document, ' + err ) ;
234+ return callback ( 'Failed to insert document: ' + err ) ;
202235 } ) ;
203- async . map ( filtered , function ( key , cb ) {
204- var provider = joola . datastore . providers [ key ] ;
205- provider . insert ( _collection , ce . clone ( documents ) , options , function ( err , results ) {
206- if ( err )
207- return cb ( err ) ;
208- return cb ( null , results ) ;
236+ d . run ( function ( ) {
237+ var filtered = _ . filter ( Object . keys ( joola . datastore . providers ) , function ( key ) {
238+ var p = joola . datastore . providers [ key ] ;
239+ return key !== 'default' && ( p . enabled || ! p . hasOwnProperty ( 'enabled' ) ) ;
209240 } ) ;
210- } , function ( err , results ) {
211- if ( err )
212- return callback ( err ) ;
213- documents . forEach ( function ( d ) {
214- d . saved = true ;
241+ async . map ( filtered , function ( key , cb ) {
242+ var provider = joola . datastore . providers [ key ] ;
243+ provider . insert ( _collection , ce . clone ( documents ) , options , function ( err , results ) {
244+ if ( err )
245+ return cb ( err ) ;
246+ return cb ( null , results ) ;
247+ } ) ;
248+ } , function ( err , results ) {
249+ if ( err )
250+ return callback ( err ) ;
251+ async . map ( documents , function ( d , cb ) {
252+ d . saved = true ;
253+ return cb ( null , d ) ;
254+ } , function ( err , arrdocuments ) {
255+ if ( err )
256+ return callback ( err ) ;
257+ return callback ( null , arrdocuments , results ) ;
258+ } ) ;
215259 } ) ;
216- return callback ( null , documents , results ) ;
217260 } ) ;
218261 } ) ;
219262 } ) ;
@@ -305,7 +348,7 @@ exports.insert = {
305348 }
306349
307350 callback = callback || function ( ) {
308- } ;
351+ } ;
309352 if ( context . user . workspace !== workspace && context . user . permissions . indexOf ( 'superuser' ) === - 1 ) {
310353 var err = new Error ( 'Forbidden' ) ;
311354 err . code = 403 ;
@@ -327,24 +370,22 @@ exports.insert = {
327370 if ( err )
328371 return callback ( err ) ;
329372
330- return setImmediate ( function ( ) {
331- var end_ts = new Date ( ) . getTime ( ) ;
332- if ( count > 1 )
333- joola . logger . trace ( 'Beacon insert, count: ' + count + ', total: ' + ( end_ts - start_ts ) + 'ms, rate: ' + ( count / ( end_ts - start_ts ) ) + 'doc/ms' ) ;
373+ var end_ts = new Date ( ) . getTime ( ) ;
374+ if ( count > 1 )
375+ joola . logger . trace ( 'Beacon insert, count: ' + count + ', total: ' + ( end_ts - start_ts ) + 'ms, rate: ' + ( count / ( end_ts - start_ts ) ) + 'doc/ms' ) ;
334376
335- //if (workspace != '_stats')
377+ //if (workspace != '_stats')
336378// joola.stats.emit({event: 'writes', workspace: workspace, username: context.user.username, collection: collection, writeCount: count, duration_per_doc: (count / (end_ts - start_ts))});
337379
338- subscribers . forEach ( function ( subscriber , i ) {
339- if ( subscriber . disconnected )
340- subscribers . splice ( i , 1 ) ;
341- else {
342- subscriber . emit ( 'event' , collection , documents ) ;
343- }
344- } ) ;
345-
346- return callback ( null , documents ) ;
380+ subscribers . forEach ( function ( subscriber , i ) {
381+ if ( subscriber . disconnected )
382+ subscribers . splice ( i , 1 ) ;
383+ else {
384+ subscriber . emit ( 'event' , collection , documents ) ;
385+ }
347386 } ) ;
387+
388+ return callback ( null , documents ) ;
348389 } ) ;
349390 } ) ;
350391 }
@@ -376,12 +417,13 @@ exports.subscribe = {
376417 } ,
377418 _route : function ( req , res , next ) {
378419 subscribers . push ( req . socket ) ;
420+ return router . responseSuccess ( { } , { } , req , res ) ;
379421 } ,
380422 run : function ( context , callback ) {
381423 callback = callback || function ( ) {
382- } ;
424+ } ;
383425
384426 //subscribers.push(req.socket);
385- return callback ( ) ;
427+ return callback ( null , { } ) ;
386428 }
387429} ;
0 commit comments