Skip to content

Commit

Permalink
parallel parsing of csv files
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeizbicki committed Jan 3, 2015
1 parent dbb7e8a commit a9494dc
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 28 deletions.
71 changes: 52 additions & 19 deletions src/exec/LoadData.hs
Expand Up @@ -6,8 +6,7 @@ module LoadData
import Control.DeepSeq
import Control.Monad
import Control.Monad.ST
import Data.Csv
import Data.List hiding (insert,length,concat)
import Data.List hiding (insert,length,concat,partition,head)
import Data.Maybe
import qualified Data.Set as Set
import qualified Data.Vector as V
Expand All @@ -33,6 +32,8 @@ import qualified Numeric.LinearAlgebra.Devel as LA

import SubHask hiding (Functor(..), Applicative(..), Monad(..), Then(..), fail, return)
import SubHask.Algebra.Container
import SubHask.Compatibility.ByteString
import SubHask.Compatibility.Cassava
import SubHask.Compatibility.Containers
import SubHask.Compatibility.Vector.Lebesgue
-- import HLearn.Algebra
Expand All @@ -49,12 +50,14 @@ import HLearn.Metrics.EMD
-- import HLearn.Metrics.Mahalanobis
-- import HLearn.Metrics.Mahalanobis.Normal
-- import HLearn.Models.Distributions
import SubHask.TemplateHaskell.Deriving

import Timing
import HLearn.UnsafeVector

import Debug.Trace
import Prelude (asTypeOf,unzip)
import qualified Prelude as P

--------------------------------------------------------------------------------

Expand Down Expand Up @@ -172,30 +175,58 @@ data DataParams = DataParams
, varshift :: Bool
}

{-# INLINABLE loaddata #-}
loaddata ::
( VG.Vector v Float
, NFData (v Float)
, FromRecord (v Float)
) => DataParams -> IO (Array (v Float))
loaddata params = do
rse :: Either String (V.Vector (v Float))
<- timeIO ("loading ["++datafile params++"] ")
$ fmap (decode NoHeader)
$ BS.readFile (datafile params)
head x = case unCons x of
Nothing -> error "head on empty"
Just (x,_) -> x

{-# INLINABLE loadCSV #-}
loadCSV ::
( Monoid a
, NFData a
, FromRecord a
, Eq a
-- , Normed a
, Show (Scalar a)
) => FilePath -> IO (Array a)
loadCSV filepath = do

bs <- timeIO ("loading ["++filepath++"]") $ readFileByteString filepath

let rse = decode NoHeader bs
time "parsing csv file" rse

rs <- case rse of
Right rs -> return rs
Left str -> error $ "failed to parse CSV file " ++ datafile params ++ ": " ++ take 1000 str

setptsize $ VG.length $ VG.head rs
Left str -> error $ "failed to parse CSV file " ++ filepath ++ ": " ++ take 1000 str

putStrLn " dataset info:"
putStrLn $ " num dp: " ++ show (VG.length rs)
putStrLn $ " num dim: " ++ show (VG.length $ rs V.! 0)
putStrLn $ " num dp: " ++ show (abs rs)
-- putStrLn $ " size dp: " ++ show (abs $ head rs)
putStrLn ""

return rs

{-# INLINABLE loaddata #-}
loaddata ::
( VG.Vector v f
, Monoid (v f)
, NFData (v f)
, FromRecord (v f)
, Eq (v f)
, Ord f
-- , Normed (v f)
, Show (Scalar (v f))
, Floating f
, VUM.Unbox f
) => DataParams -> IO (Array (v f))
loaddata params = do
(ArrayT rs) <- loadCSV $ datafile params

setptsize $ VG.length $ VG.head rs

rs' <- if pca params
then time "calculating PCA" $ VG.convert $ rotatePCA rs
then error "pca disabled"
-- then time "calculating PCA" $ VG.convert $ rotatePCA rs
else return rs

rs'' <- if varshift params
Expand All @@ -205,6 +236,7 @@ loaddata params = do

return $ ArrayT rs''

{-
{-# INLINABLE loadLabeledNumericData #-}
loadLabeledNumericData :: forall v f.
( VG.Vector v f
Expand Down Expand Up @@ -259,6 +291,7 @@ loadLabeledNumericData params = do
let ys' = VG.zipWith (\y r -> MaybeLabeled (label y) r) ys rs''
deepseq ys' $ return ys'
-}

-------------------------------------------------------------------------------
-- data preprocessing
Expand Down
18 changes: 9 additions & 9 deletions src/exec/hlearn-allknn.hs
Expand Up @@ -384,15 +384,15 @@ runTest params rs mqs tree knn = do
"subhask/eqUnboxedVectorFloat" (==) = eqUnboxedVectorFloat
"subhask/eqUnboxedVectorInt" (==) = eqUnboxedVectorInt

"subhask/distance_l2_float_unboxed" distance = distance_l2_float_unboxed
"subhask/isFartherThan_l2_float_unboxed" isFartherThanWithDistanceCanError=isFartherThan_l2_float_unboxed
"subhask/distance_l2_m128_unboxed" distance = distance_l2_m128_unboxed
"subhask/isFartherThan_l2_m128_unboxed" isFartherThanWithDistanceCanError=isFartherThan_l2_m128_unboxed

"subhask/distance_l2_m128_storable" distance = distance_l2_m128_storable
"subhask/distance_l2_m128d_storable" distance = distance_l2_m128d_storable
"subhask/isFartherThan_l2_m128_storable" isFartherThanWithDistanceCanError=isFartherThan_l2_m128_storable
"subhask/isFartherThan_l2_m128d_storable" isFartherThanWithDistanceCanError=isFartherThan_l2_m128d_storable
-- "subhask/distance_l2_float_unboxed" distance = distance_l2_float_unboxed
-- "subhask/isFartherThan_l2_float_unboxed" isFartherThanWithDistanceCanError=isFartherThan_l2_float_unboxed
-- "subhask/distance_l2_m128_unboxed" distance = distance_l2_m128_unboxed
-- "subhask/isFartherThan_l2_m128_unboxed" isFartherThanWithDistanceCanError=isFartherThan_l2_m128_unboxed
--
-- "subhask/distance_l2_m128_storable" distance = distance_l2_m128_storable
-- "subhask/distance_l2_m128d_storable" distance = distance_l2_m128d_storable
-- "subhask/isFartherThan_l2_m128_storable" isFartherThanWithDistanceCanError=isFartherThan_l2_m128_storable
-- "subhask/isFartherThan_l2_m128d_storable" isFartherThanWithDistanceCanError=isFartherThan_l2_m128d_storable

#-}

Expand Down

0 comments on commit a9494dc

Please sign in to comment.