# Tutorials

## Stream processing with text8 data

Read the raw text8 corpus file and count the number of occurrences of each token in the corpus.

In [None]:
import * as Preprocessing from 'causal-net.preprocessing';
import * as Log from 'causal-net.log';
import * as Utils from 'causal-net.utils';
import * as Storage from 'causal-net.storage';
import * as fs from 'fs';
var { indexDBStorage } = Storage;
var { stream } = Utils;
var { termLogger } = Log;
var { nlpPreprocessing, tokenizer } = Preprocessing;

Create the stream process:
- read chunks from file.
- transform each chunk.
- write transformed chunks into new files.

In [None]:
// Mutable state shared across calls to tranformFn (kept as `var` so the
// notebook cell can be re-run without redeclaration errors).
var remainingChars = '';  // text concatenated with each incoming chunk; starts empty
var wordFreqCount = {};   // accumulator handed to and returned by nlpPreprocessing.wordFreqCount
var lineIndex = 0;        // incremented once per line processed by tranformFn
/**
 * Transform callback for the corpus stream: splits one chunk into lines,
 * tokenizes every line and folds the tokens into the shared frequency table.
 *
 * Reads and updates the outer variables `remainingChars`, `wordFreqCount`
 * and `lineIndex`.
 *
 * NOTE(review): `remainingChars` is never reassigned in the code visible
 * here, so a line (or word) that straddles a chunk boundary is tokenized as
 * two separate pieces. Carrying the trailing fragment over would also need a
 * flush hook to emit the last fragment - confirm whether
 * `stream.makeTransform` offers one before changing this.
 *
 * @param {*} chunkData - chunk handed over by the stream; coerced to a string
 *     by concatenation.
 * @param {*} chunkEncoding - encoding argument supplied by the stream (unused).
 * @param {Function} afterTransformFn - completion callback, invoked as
 *     `(null, records)` on success or `(error)` when processing throws.
 *     Each record has the shape `{ lineIndex, tokens }`.
 */
function tranformFn(chunkData, chunkEncoding, afterTransformFn){
    const transformedData = [];
    try {
        // Carried-over text precedes the new chunk.
        const sampleText = remainingChars + chunkData;
        for (const line of sampleText.split('\n')) {
            const tokens = tokenizer.tokenize(line);
            wordFreqCount = nlpPreprocessing.wordFreqCount(tokens, wordFreqCount);
            lineIndex += 1;
            transformedData.push({ lineIndex, tokens });
        }
    } catch (error) {
        // Report the failure through the stream callback instead of letting
        // the exception escape from inside the stream machinery.
        afterTransformFn(error);
        return;
    }
    // Called outside the try block so an exception raised by the callback
    // itself is not reported back to that same callback.
    afterTransformFn(null, transformedData);
}
// Hand tranformFn to the project's stream helper; the resulting object is the
// middle stage of the pipeline assembled further down.
var transformer = stream.makeTransform(tranformFn);

/**
 * Write callback for the corpus stream: stores the tokens of every record
 * under `/corpus/line_<lineIndex>` as a JSON string, one record at a time.
 *
 * @param {Array<{lineIndex: number, tokens: Array}>} transformedData - records
 *     produced by the transform stage.
 * @param {*} chunkEncoding - encoding argument supplied by the stream (unused).
 * @param {Function} afterWriteFn - completion callback; called with no
 *     arguments once every record is written, or with the error when a write
 *     fails.
 */
function writeTokens(transformedData, chunkEncoding, afterWriteFn){
    const writeTokensToFile = async (records) => {
        // Sequential on purpose: each write finishes before the next starts.
        for (const { lineIndex, tokens } of records) {
            await indexDBStorage.writeFile(`/corpus/line_${lineIndex}`, JSON.stringify(tokens));
        }
    };
    // Two-argument then(): a storage failure reaches the stream through the
    // callback, while an exception thrown by afterWriteFn itself is not fed
    // back into afterWriteFn a second time.
    writeTokensToFile(transformedData).then(
        () => afterWriteFn(),
        (error) => afterWriteFn(error),
    );
}
// Hand writeTokens to the project's stream helper; the result is the last
// stage of the pipeline below.
var writer = stream.makeWritable(writeTokens);
// Running total updated by the callback passed to makePipeline.
var characterCount = 0;
(async () => {
    const corpusReader = fs.createReadStream('../server/datasets/text8/text8.txt');
    // NOTE(review): the second argument is presumably invoked per item that
    // flows through the pipeline - confirm against stream.makePipeline.
    const corpusStreamer = stream.makePipeline([corpusReader, transformer, writer], (data) => {
        characterCount += data.length;
    });
    termLogger.groupBegin('stream performance');
    let result;
    try {
        result = await corpusStreamer;
    } finally {
        // Close the log group even when the pipeline rejects; the rejection
        // itself still propagates out of this async function.
        termLogger.groupEnd();
    }
    termLogger.log({ result, characterCount });
})();

**********************************
stream performance: begin at Thu May 09 2019 15:17:07 GMT+0700 (Indochina Time)


TypeError: Cannot read property 'tokenize' of undefined
    at Transform.tranformFn [as _transform] (evalmachine.<anonymous>:11:30)
    at Transform.../../node_modules/readable-stream/lib/_stream_transform.js.Transform._read (/home/huynhnguyen/github/causality/packages/causality-utils/dist/@causalNet/utils.node.js:1746:10)
    at Transform.../../node_modules/readable-stream/lib/_stream_transform.js.Transform._write (/home/huynhnguyen/github/causality/packages/causality-utils/dist/@causalNet/utils.node.js:1734:83)
    at doWrite (/home/huynhnguyen/github/causality/packages/causality-utils/dist/@causalNet/utils.node.js:2215:64)
    at writeOrBuffer (/home/huynhnguyen/github/causality/packages/causality-utils/dist/@causalNet/utils.node.js:2204:5)
    at Transform.../../node_modules/readable-stream/lib/_stream_writable.js.Writable.write (/home/huynhnguyen/github/causality/packages/causality-utils/dist/@causalNet/utils.node.js:2121:11)
    at ReadStream.ondata (_stream_readable.js:670:20)
 

In [None]:
// Log at most the first 100 [key, value] entries of wordFreqCount.
{
    const leadingEntries = Object.entries(wordFreqCount).slice(0, 100);
    termLogger.log({ 'show 100 items': leadingEntries });
}

After preprocessing, the data is saved into files under the `/corpus/` folder.

In [None]:
// List the files written under /corpus/, read the first one back and log
// [number of files, number of tokens in that file].
(async () => {
    termLogger.groupBegin('get list of preprocessing files');
    let listFiles;
    try {
        listFiles = await indexDBStorage.getFileList('/corpus/');
    } finally {
        // Close the log group whether or not the listing succeeded.
        termLogger.groupEnd();
    }

    // Nothing was stored (for example because the pipeline failed earlier):
    // report that instead of passing `undefined` to readFile.
    if (listFiles.length === 0) {
        termLogger.log({ warning: 'no files found under /corpus/' });
        return;
    }

    termLogger.groupBegin('read one file from indexDB');
    let tokens;
    try {
        tokens = await indexDBStorage.readFile(listFiles[0]);
    } finally {
        termLogger.groupEnd();
    }
    termLogger.log([listFiles.length, JSON.parse(tokens).length]);
})();