Skip to content
Browse files

csv_reader

  • Loading branch information...
1 parent d4de211 commit 3b64b28d1ea181db02d57d183b5c28357ef235fd @maxlapshin committed Jun 7, 2012
Showing with 40 additions and 154 deletions.
  1. +16 −8 csv_bench.erl
  2. +24 −146 src/csv_reader.erl
View
24 csv_bench.erl
@@ -23,14 +23,9 @@
main([Path]) ->
code:add_pathz("./ebin"),
- ets:new(csv_entries, [public,named_table,{keypos,#evt.time}]),
+ ETS = ets:new(csv_entries, [private,named_table,{keypos,#evt.time}]),
- LoadFun = fun(Lines) ->
- ets:insert(csv_entries, Lines)
- end,
-
-
- {ok, F} = csv_reader:init(Path, [{header, evt}, {size, size(#evt{})}, {loader, LoadFun},
+ {ok, F} = csv_reader:init(Path, [{header, evt}, {size, size(#evt{})},
{"#RIC", #evt.instrument, undefined}, {"<TICKER>", #evt.instrument, undefined},
{"Date[G]", #evt.date, date}, {"<Date>", #evt.date, date},
{"Time[G]", #evt.time, time}, {"<Time>", #evt.date, date},
@@ -50,9 +45,11 @@ main([Path]) ->
{"L10-BidPrice",#evt.l10_bid_price,float},{"L10-BidSize",#evt.l10_bid_size,int},{"L10-AskPrice",#evt.l10_ask_price,float},{"L10-AskSize",#evt.l10_ask_size,int}
]),
+ Total = csv_reader:total_lines(F),
+
T1 = erlang:now(),
% Events = fprof:apply(fun() -> load(F) end, []),
- {ok, Count} = csv_reader:wait(F),
+ Count = loop(F, Total, 0, ETS),
T2 = erlang:now(),
Time = timer:now_diff(T2, T1),
io:format("NIF: ~p, ~8.2. f us per line~n", [Time div 1000, Time / Count]),
@@ -75,3 +72,14 @@ main([Path]) ->
% fprof:analyse(),
ok.
+%% loop(F, Total, Count, ETS) -> FinalCount
+%%
+%% Drains the csv_reader handle F chunk by chunk. Every chunk of parsed
+%% lines is inserted into the ETS table and a progress percentage
+%% (lines seen so far relative to Total) is printed before the insert.
+%%
+%%   F     - reader state returned by csv_reader:init/2 or a previous next/1
+%%   Total - expected number of lines (csv_reader:total_lines/1)
+%%   Count - number of lines inserted so far; callers start with 0
+%%   ETS   - table that receives the parsed lines
+%%
+%% Returns the line count reported by the reader at end of file.
+%% Raises error({csv_read_failed, Reason}) when the reader reports a read
+%% error, instead of dying with an uninformative case_clause.
+loop(F, Total, Count, ETS) ->
+    case csv_reader:next(F) of
+        {ok, Lines, F1} ->
+            io:format("~2.. B%~n", [Count*100 div Total]),
+            ets:insert(ETS, Lines),
+            %% Direct tail call: the stack no longer grows with every chunk.
+            loop(F1, Total, Count + length(Lines), ETS);
+        {eof, Final} ->
+            %% Bind a fresh variable: matching on the already-bound Count
+            %% would silently turn a bookkeeping mismatch into a crash.
+            Final;
+        {error, Reason} ->
+            erlang:error({csv_read_failed, Reason})
+    end.
+
View
170 src/csv_reader.erl
@@ -4,11 +4,9 @@
-define(D(X), io:format("~p:~p ~p~n", [?MODULE, ?LINE, X])).
% -define(D(X), ok).
--export([init/2, wait/1]).
+-export([init/2, next/1, total_lines/1]).
-export([date_to_ms/2]).
--export([start_loader/3, start_subloader/1]).
-
date_to_ms({YY,MM,DD},{H,M,S,MS}) ->
date_to_ms_nif(YY, MM, DD, H, M, S, MS);
@@ -31,13 +29,6 @@ init_nif() ->
end,
ok.
-
-init(Path, Options) ->
- % {ok, Reader} = csv_open(Path, Options),
- % {ok, Reader}.
- {ok, Reader} = proc_lib:start(?MODULE, start_loader, [Path, Options, self()]),
- {ok, Reader}.
-
-record(loader, {
file,
fd,
@@ -47,10 +38,8 @@ init(Path, Options) ->
header,
pattern,
cols,
- client,
- loader,
- parent,
count = 0,
+ total,
buffer = <<>>
}).
@@ -68,6 +57,9 @@ filter(time) -> $t;
filter(utc) -> $g;
filter(_) -> $u.
+
+%% total_lines(Loader) -> Total
+%% Accessor for the `total' field of a #loader{} record, i.e. the value
+%% stored there when the loader was built.
+total_lines(#loader{} = Loader) ->
+    Loader#loader.total.
+
compile_pattern(Cols1, Options) ->
Cols = [binary_to_list(H) || H <- Cols1],
Record = atom_to_binary(proplists:get_value(header, Options, csv), latin1),
@@ -82,145 +74,53 @@ compile_pattern(Cols1, Options) ->
end, Cols),
iolist_to_binary([<<(size(Record)), Record/binary, RecordSize, (length(Map))>>, Map]).
-
-start_loader(Path, _Options, Parent) ->
- try start_loader0(Path, _Options, Parent) of
- Result -> Parent ! {csv_result, self(), Result}, Result
- catch
- Class:Error ->
- ?D({failed_loader, Class, Error, erlang:get_stacktrace()}),
- Parent ! {csv_result, self(), {error, {Error, Path}}}
- end.
-
-
-start_loader0(Path, Options, Parent) ->
- OpenOptions = case re:run(Path, "\\.gz$") of
- nomatch -> [{read_ahead, 1024*1024}];
- _ -> [compressed]
%% init(Path, Options) -> result of start_loader1/4 | {error, Error}
%%
%% Opens the CSV file at Path in raw binary mode and hands it to
%% start_loader1/4 together with the total line count of the file.
%% Paths matching "\\.gz$" are opened with [compressed]; every other path
%% is opened with a 1 MiB read_ahead buffer. When file:open/2 fails its
%% {error, Error} tuple is returned unchanged.
%%
%% The line count is obtained by running an external command through
%% os:cmd/1 *before* the file is opened.
%% NOTE(review): Path is concatenated unescaped into the shell command;
%% confirm Path can never come from untrusted input.
%% NOTE(review): `gzcat' may not be installed on every platform (many
%% Linux systems only ship `zcat') - verify on the target hosts.
+init(Path, Options) ->
+ {OpenOptions, TotalCmd} = case re:run(Path, "\\.gz$") of
+ nomatch -> {[{read_ahead, 1024*1024}], "wc -l "++Path};
+ _ -> {[compressed], "gzcat "++Path++" | wc -l"}
end,
%% Take the first run of digits from the command output; this is a
%% badmatch crash when the output contains no digits at all.
+ {match, [Tot]} = re:run(os:cmd(TotalCmd), "(\\d+)", [{capture,all_but_first,list}]),
+ Total = list_to_integer(Tot),
case file:open(Path, [raw, binary|OpenOptions]) of
{ok, F} ->
- start_loader1(Path, Options, Parent, OpenOptions, F);
+ start_loader1(Path, Options, F, Total);
{error, Error} ->
{error, Error}
end.
-start_loader1(Path, Options, Parent, OpenOptions, F) ->
- proc_lib:init_ack(Parent, {ok, self()}),
%% start_loader1(Path, Options, F, Total) -> {ok, #loader{}}
%%
%% Reads the header line from the already opened file F, splits it into
%% column names on "," and compiles the column pattern with
%% compile_pattern/2. The resulting #loader{} record carries the open
%% file descriptor, the header, the compiled pattern, the caller supplied
%% Total and the original Options.
+start_loader1(Path, Options, F, Total) ->
{ok, Header1} = file:read_line(F),
%% The header line must contain exactly one "\n", at its very end;
%% anything else is a badmatch crash.
%% NOTE(review): only "\n" is stripped, so a "\r\n" header would
%% presumably leave "\r" on the last column name - confirm inputs.
[Header2, <<>>] = binary:split(Header1, [<<"\n">>]),
Header = binary:split(Header2, [<<",">>], [global]),
Pattern = compile_pattern(Header, Options),
- LoadFun = proplists:get_value(loader, Options, fun(Lines) ->
- Parent ! {csv, self(), length(Lines)}
- end),
-
Loader1 = #loader{
file = Path,
header = Header,
%% offset is the byte size of the header line including its newline
offset = size(Header1),
cols = length(Header),
pattern = Pattern,
- parent = Parent,
fd = F,
- loader = LoadFun,
+ total = Total,
options = Options
},
-
- case OpenOptions of
- [compressed] ->
- single_thread_load(Loader1);
- _ ->
- multiple_thread_load(Loader1)
- end.
+ {ok, Loader1}.
-multiple_thread_load(#loader{fd = F, offset = FileOffset, parent = Parent} = Loader1) ->
-
- LoaderCount = 4,
- {ok, FileSize} = file:position(F, eof),
- ChunkSize = FileSize div LoaderCount,
- Chunks = detect_chunks(F, FileOffset, ChunkSize, FileSize, [], LoaderCount),
- file:close(F),
-
- Loader = Loader1#loader{
- parent = self()
- },
-
- Loaders = [
- proc_lib:start_link(?MODULE, start_subloader, [Loader#loader{offset = Offset, limit = Limit}]) % , client = self()
- || {Offset, Limit} <- Chunks],
-
- [erlang:monitor(process, Pid) || Pid <- Loaders],
-
- Counts = [receive {eof, Pid, Count} -> Count end || Pid <- Loaders],
- [receive {'DOWN', _, _, Pid, _} -> ok end || Pid <- Loaders],
- TotalCount = lists:sum(Counts) + 1,
- Parent ! {eof, self(), TotalCount},
- ok.
-
-single_thread_load(#loader{fd = F, buffer = Buffer, pattern = Pattern, loader = Fun, count = Count, parent = Parent} = Loader) ->
- case file:read(F, 8192) of
%% next(Loader) -> {ok, Lines, NewLoader} | {eof, Count} | {error, Error}
%%
%% Pull-style reader step: reads up to 65536 bytes from the loader's file
%% descriptor, prepends the bytes buffered by the previous step and passes
%% the result to split_lines/2.
%%
%%   {ok, Lines, NewLoader} - parsed lines plus the loader to use for the
%%                            next call (buffer and count updated)
%%   {eof, Count}           - end of file with nothing left in the buffer;
%%                            Count is the running total of returned lines
%%   {error, Error}         - file:read/2 failed; passed through unchanged
%%
%% NOTE(review): nothing visible here closes the file descriptor once eof
%% is reached - confirm whether the caller is expected to do it.
+next(#loader{fd = F, buffer = Buffer, pattern = Pattern, count = Count} = Loader) ->
+ case file:read(F, 65536) of
{ok, Bin} ->
%% Rest is kept as the buffer for the next call (presumably the
%% trailing incomplete line - see split_lines/3).
{Lines, Rest} = split_lines(<<Buffer/binary, Bin/binary>>, Pattern),
- Fun(Lines),
- single_thread_load(Loader#loader{buffer = Rest, count = Count + length(Lines)});
- {error, _Error} ->
- Parent ! {eof, self(), Count},
- ok;
- eof ->
+ {ok, Lines, Loader#loader{buffer = Rest, count = Count + length(Lines)}};
+ {error, Error} ->
+ {error, Error};
+ eof when Buffer == <<>> ->
+ {eof, Count};
+ eof ->
%% Data is still buffered at end of file: append a newline so the
%% final line is emitted, then clear the buffer so the following call
%% takes the {eof, Count} branch above.
{Lines, _} = split_lines(<<Buffer/binary, "\n">>, Pattern),
- Fun(Lines),
- Parent ! {eof, self(), Count + length(Lines)},
- ok
+ {ok, Lines, Loader#loader{buffer = <<>>, count = Count + length(Lines)}}
end.
-detect_chunks(_F, Offset, _ChunkSize, FileSize, Acc, 1) ->
- lists:reverse([{Offset, FileSize}|Acc]);
-
-detect_chunks(F, Offset, ChunkSize, FileSize, Acc, LoaderCount) when LoaderCount > 1 ->
- file:position(F, Offset + ChunkSize),
- file:read_line(F),
- {ok, Pos} = file:position(F, {cur, 0}),
- % ?D({chunk, Offset, Pos - Offset}),
- detect_chunks(F, Pos, ChunkSize, FileSize, [{Offset, Pos}|Acc], LoaderCount - 1).
-
-start_subloader(#loader{file = Path, parent = Parent} = Loader) ->
- proc_lib:init_ack(Parent, self()),
- {ok, F} = file:open(Path, [binary,read,raw]),
- % ?D({start, self()}),
- loader(Loader#loader{file = F}).
-
-loader(#loader{offset = Offset, count = Count, limit = Limit, parent = Parent} = _Loader) when Offset >= Limit ->
- % ?D({loader, self(), finish, Count}),
- Parent ! {eof, self(), Count},
- ok;
-
-loader(#loader{file = F, offset = Offset, limit = Limit, pattern = Pattern, loader = Fun, count = Count, parent = Parent} = Loader) ->
- Size = lists:min([256*1024, Limit - Offset]),
- case file:pread(F, Offset, Size) of
- {ok, Bin} ->
- {Lines, Rest} = split_lines(Bin, Pattern),
- Fun(Lines),
- if Size == Limit - Offset andalso size(Rest) > 0 ->
- {Lines1, _} = split_lines(<<Rest/binary, "\n">>, Pattern),
- Fun(Lines1),
- ?D({refetched, Rest, Lines1}),
- Count1 = Count + length(Lines) + length(Lines1),
- ?D({loader, self(), eof, Count1}),
- Parent ! {eof, self(), Count1},
- ok;
- true ->
- loader(Loader#loader{offset = Offset + size(Bin) - size(Rest), count = length(Lines) + Count})
- end;
- eof ->
- Parent ! {eof, self(), Count},
- ?D({loader, self(), eof, Count}),
- ok
- end.
-
-
split_lines(Bin, Pattern) -> split_lines(Bin, [], Pattern).
split_lines(Bin, Acc, Pattern) ->
@@ -231,25 +131,3 @@ split_lines(Bin, Acc, Pattern) ->
% ?D(Line),
split_lines(Rest, [Line|Acc], Pattern)
end.
-
-
-wait(Reader) ->
- erlang:monitor(process, Reader),
- receive
- {'DOWN', _, _, Reader, _} -> ok
- end,
- Result = receive
- {csv_result, Reader, Res} -> Res
- after
- 0 -> undefined
- end,
- Result1 = receive
- {eof, Reader, Count} -> {ok, Count}
- after
- 0 -> undefined
- end,
- case Result1 of
- undefined -> Result;
- _ -> Result1
- end.
-

0 comments on commit 3b64b28

Please sign in to comment.
Something went wrong with that request. Please try again.