-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathbulk_create.exs
113 lines (106 loc) · 2.75 KB
/
bulk_create.exs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
alias AshPostgres.Test.{Domain, Post}

# Benchmark fixtures: lists of plain attribute maps, one per row to insert,
# sized to exercise small / medium / large batches.
ten_rows =
  Enum.map(1..10, fn i -> %{title: "Title: #{i}"} end)

thousand_rows =
  Enum.map(1..1000, fn i -> %{title: "Title: #{i}"} end)

# BUG FIX: previously generated only 1..1000 rows, duplicating
# `thousand_rows` despite the name.
# NOTE(review): this list is not currently referenced in the Benchee
# `inputs` map below — add it there if a 100k-row scenario is wanted.
hundred_thousand_rows =
  Enum.map(1..100_000, fn i -> %{title: "Title: #{i}"} end)
# Warm up both code paths (Ash bulk_create and raw Ecto insert_all) once
# before measuring, so one-time setup cost doesn't skew the first scenario.
Ash.bulk_create(ten_rows, Post, :create, batch_size: 10, max_concurrency: 2)
AshPostgres.TestRepo.insert_all(Post, ten_rows)

# Shared tuning knobs used by every benchmarked scenario below.
max_concurrency = 16
batch_size = 200
# Benchmark matrix: Ash.bulk_create vs. raw Ecto insert_all, in both
# synchronous and concurrent variants, with and without `assume_casted?`.
Benchee.run(
  %{
    "ash sync": fn rows ->
      %{error_count: 0} =
        Ash.bulk_create(rows, Post, :create,
          batch_size: batch_size,
          transaction: false
        )
    end,
    "ash sync assuming casted": fn rows ->
      %{error_count: 0} =
        Ash.bulk_create(rows, Post, :create,
          batch_size: batch_size,
          transaction: false,
          assume_casted?: true
        )
    end,
    "ecto sync": fn rows ->
      rows
      |> Stream.chunk_every(batch_size)
      |> Enum.each(&AshPostgres.TestRepo.insert_all(Post, &1))
    end,
    "ash async stream": fn rows ->
      rows
      |> Stream.chunk_every(batch_size)
      |> Task.async_stream(
        fn chunk ->
          %{error_count: 0} = Ash.bulk_create(chunk, Post, :create, transaction: false)
        end,
        max_concurrency: max_concurrency,
        timeout: :infinity
      )
      |> Stream.run()
    end,
    "ash async stream assuming casted": fn rows ->
      rows
      |> Stream.chunk_every(batch_size)
      |> Task.async_stream(
        fn chunk ->
          %{error_count: 0} =
            Ash.bulk_create(chunk, Post, :create,
              transaction: false,
              assume_casted?: true
            )
        end,
        max_concurrency: max_concurrency,
        timeout: :infinity
      )
      |> Stream.run()
    end,
    "ash using own async option": fn rows ->
      %{error_count: 0} =
        Ash.bulk_create(rows, Post, :create,
          transaction: false,
          max_concurrency: max_concurrency,
          batch_size: batch_size
        )
    end,
    "ash using own async option assuming casted": fn rows ->
      %{error_count: 0} =
        Ash.bulk_create(rows, Post, :create,
          transaction: false,
          assume_casted?: true,
          max_concurrency: max_concurrency,
          batch_size: batch_size
        )
    end,
    "ecto async stream": fn rows ->
      rows
      |> Stream.chunk_every(batch_size)
      |> Task.async_stream(
        &AshPostgres.TestRepo.insert_all(Post, &1),
        max_concurrency: max_concurrency,
        timeout: :infinity
      )
      |> Stream.run()
    end
  },
  # Empty the table between scenarios so earlier inserts don't skew timings.
  after_scenario: fn _ ->
    AshPostgres.TestRepo.query!("TRUNCATE posts CASCADE")
  end,
  inputs: %{
    "10 rows" => ten_rows,
    "1000 rows" => thousand_rows
  }
)