@@ -7,6 +7,7 @@ var defParam=require("./defParam");
7
7
var csvline = require ( "./csvline" ) ;
8
8
var fileline = require ( "./fileline" ) ;
9
9
var dataToCSVLine = require ( "./dataToCSVLine" ) ;
10
+ var fileLineToCSVLine = require ( "./fileLineToCSVLine" ) ;
10
11
var linesToJson = require ( "./linesToJson" ) ;
11
12
var CSVError = require ( "./CSVError" ) ;
12
13
var workerMgr = require ( "./workerMgr" ) ;
@@ -41,7 +42,8 @@ function Converter(params,options) {
41
42
this . _csvTransf = null ;
42
43
this . finalResult = [ ] ;
43
44
// this.on("data", function() {});
44
- this . on ( "error" , function ( ) { } ) ;
45
+ this . on ( "error" , emitDone ( this ) ) ;
46
+ this . on ( "end" , emitDone ( this ) ) ;
45
47
this . initWorker ( ) ;
46
48
process . nextTick ( function ( ) {
47
49
if ( this . _needEmitFinalResult === null ) {
@@ -69,6 +71,13 @@ function Converter(params,options) {
69
71
return this ;
70
72
}
71
73
util . inherits ( Converter , Transform ) ;
74
+ function emitDone ( conv ) {
75
+ return function ( err ) {
76
+ process . nextTick ( function ( ) {
77
+ conv . emit ( 'done' , err )
78
+ } )
79
+ }
80
+ }
72
81
Converter . prototype . _transform = function ( data , encoding , cb ) {
73
82
if ( this . param . toArrayString && this . started === false ) {
74
83
this . started = true ;
@@ -94,21 +103,38 @@ Converter.prototype.setPartialData=function(d){
94
103
}
95
104
Converter . prototype . processData = function ( data , cb ) {
96
105
var params = this . param ;
106
+ var fileLines = fileline ( data , this . param )
107
+ if ( this . preProcessLine && typeof this . preProcessLine === "function" ) {
108
+ fileLines . lines = this . _preProcessLines ( fileLines . lines , this . lastIndex )
109
+ }
97
110
if ( ! params . _headers ) { //header is not inited. init header
98
- this . processHead ( data , cb ) ;
111
+ this . processHead ( fileLines , cb ) ;
99
112
} else {
100
113
if ( params . workerNum <= 1 ) {
101
- var lines = dataToCSVLine ( data , params ) ;
114
+ var lines = fileLineToCSVLine ( fileLines , params ) ;
102
115
this . setPartialData ( lines . partial ) ;
103
116
var jsonArr = linesToJson ( lines . lines , params , this . recordNum ) ;
104
117
this . processResult ( jsonArr )
105
118
this . lastIndex += jsonArr . length ;
106
119
this . recordNum += jsonArr . length ;
107
120
cb ( ) ;
108
121
} else {
109
- this . workerProcess ( data , cb ) ;
122
+ this . workerProcess ( fileLines , cb ) ;
123
+ }
124
+ }
125
+ }
126
+ Converter . prototype . _preProcessLines = function ( lines , startIdx ) {
127
+ var rtn = [ ]
128
+ for ( var i = 0 ; i < lines . length ; i ++ ) {
129
+ var result = this . preProcessLine ( lines [ i ] , startIdx + i + 1 )
130
+ if ( typeof result === "string" ) {
131
+ rtn . push ( result )
132
+ } else {
133
+ rtn . push ( lines [ i ] )
134
+ this . emit ( "error" , new Error ( "preProcessLine should return a string but got: " + JSON . stringify ( result ) ) )
110
135
}
111
136
}
137
+ return rtn
112
138
}
113
139
Converter . prototype . initWorker = function ( ) {
114
140
var workerNum = this . param . workerNum - 1 ;
@@ -117,14 +143,22 @@ Converter.prototype.initWorker=function(){
117
143
this . workerMgr . initWorker ( workerNum , this . param ) ;
118
144
}
119
145
}
146
+ Converter . prototype . preRawData = function ( func ) {
147
+ this . preProcessRaw = func ;
148
+ return this ;
149
+ }
150
+ Converter . prototype . preFileLine = function ( func ) {
151
+ this . preProcessLine = func ;
152
+ return this ;
153
+ }
120
154
/**
121
155
* workerpRocess does not support embeded multiple lines.
122
156
*/
123
157
124
- Converter . prototype . workerProcess = function ( data , cb ) {
158
+ Converter . prototype . workerProcess = function ( fileLine , cb ) {
125
159
var self = this ;
126
- var line = fileline ( data , this . param )
127
- var eol = this . getEol ( data )
160
+ var line = fileLine
161
+ var eol = this . getEol ( )
128
162
this . setPartialData ( line . partial )
129
163
this . workerMgr . sendWorker ( line . lines . join ( eol ) + eol , this . lastIndex , cb , function ( results , lastIndex ) {
130
164
var cur = self . sequenceBuffer [ 0 ] ;
@@ -154,10 +188,10 @@ Converter.prototype.workerProcess=function(data,cb){
154
188
} ) ;
155
189
this . lastIndex += line . lines . length ;
156
190
}
157
- Converter . prototype . processHead = function ( data , cb ) {
191
+ Converter . prototype . processHead = function ( fileLine , cb ) {
158
192
var params = this . param ;
159
193
if ( ! params . _headers ) { //header is not inited. init header
160
- var lines = dataToCSVLine ( data , params ) ;
194
+ var lines = fileLineToCSVLine ( fileLine , params ) ;
161
195
this . setPartialData ( lines . partial ) ;
162
196
if ( params . noheader ) {
163
197
if ( params . headers ) {
@@ -198,6 +232,7 @@ Converter.prototype.processResult=function(result){
198
232
// this.lastIndex+=result.length;
199
233
// cb();
200
234
}
235
+
201
236
Converter . prototype . emitResult = function ( r ) {
202
237
var index = r . index ;
203
238
var row = r . row ;
@@ -320,7 +355,7 @@ Converter.prototype.getEol = function(data) {
320
355
this . param . eol = eol ;
321
356
}
322
357
323
- return this . param . eol ;
358
+ return this . param . eol || eol ;
324
359
} ;
325
360
Converter . prototype . fromFile = function ( filePath , cb ) {
326
361
var fs = require ( 'fs' ) ;
0 commit comments