Skip to content

Commit

Permalink
attempt 8: read file and process it in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
shraddhaag committed Jan 29, 2024
1 parent e5213a8 commit 067f2a4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 38 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
|4|Instead of sending each line to the channel, now sending 100 lines chunked together. Also, to minimise garbage collection, not freeing up memory when resetting a slice. |3:41.76|-161.07|[b7b1781](https://github.com/shraddhaag/1brc/commit/b7b1781f58fd258a06940bd6c05eb404c8a14af6)|
|5|Read file in chunks of 100 MB instead of reading line by line. |3:32.62|-9.14|[c26fea4](https://github.com/shraddhaag/1brc/commit/c26fea40019552a7e4fc1c864236f433b1b686f0)|
|6|Convert temperature from `string` to `int64`, process in `int64` and convert to `float64` at the end. |2:51.50|-41.14|[7812da4](https://github.com/shraddhaag/1brc/commit/7812da4d0be07dd4686d5f9b9df1e93b08cd0dd1)|
|7|In the city <> temperatures map, replaced the value for each key (city) to preprocessed min, max, count and sum of all temperatures instead of storing all recorded temperatures for the city.|1:39.81|-71.79||
|7|In the city <> temperatures map, replaced the value for each key (city) to preprocessed min, max, count and sum of all temperatures instead of storing all recorded temperatures for the city.|1:39.81|-71.79|[e5213a8](https://github.com/shraddhaag/1brc/commit/e5213a836b17bec0a858474a11f07c902e724bba)|
|8|Use producer consumer pattern to read file in chunks and process the chunks in parallel.|1:43.82|+14.01||
96 changes: 59 additions & 37 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"errors"
"flag"
"fmt"
Expand All @@ -14,6 +15,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
)

var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
Expand Down Expand Up @@ -47,7 +49,7 @@ func main() {
defer pprof.StopCPUProfile()
}

fmt.Println(evaluate(*input))
evaluate(*input)

if *memprofile != "" {
f, err := os.Create("./profiles/" + *memprofile)
Expand All @@ -62,11 +64,6 @@ func main() {
}
}

type result struct {
city string
temp string
}

func evaluate(input string) string {
mapOfTemp, err := readFileLineByLineIntoAMap(input)
if err != nil {
Expand Down Expand Up @@ -104,39 +101,59 @@ func readFileLineByLineIntoAMap(filepath string) (map[string]cityTemperatureInfo
if err != nil {
panic(err)
}
defer file.Close()

mapOfTemp := make(map[string]cityTemperatureInfo)

chanOwner := func() <-chan []string {
resultStream := make(chan []string, 100)
toSend := make([]string, 100)
// reading 100MB per request
chunkSize := 100 * 1024 * 1024
buf := make([]byte, chunkSize)
var stringsBuilder strings.Builder
stringsBuilder.Grow(500)
var count int
resultStream := make(chan []string, 100)
chunkStream := make(chan []byte, 15)
chunkSize := 64 * 1024 * 1024
var wg sync.WaitGroup

// spawn workers to process the file chunks read from the chunk channel
for i := 0; i < runtime.NumCPU()-1; i++ {
wg.Add(1)
go func() {
defer close(resultStream)
for {
readTotal, err := file.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
count = processReadChunk(buf, readTotal, count, &stringsBuilder, toSend, resultStream)
break
}
panic(err)
}
count = processReadChunk(buf, readTotal, count, &stringsBuilder, toSend, resultStream)
}
if count != 0 {
resultStream <- toSend[:count]
for chunk := range chunkStream {
processReadChunk(chunk, resultStream)
}
wg.Done()
}()
return resultStream
}

resultStream := chanOwner()
// spawn a goroutine to read file in chunks and send it to the chunk channel for further processing
go func() {
buf := make([]byte, chunkSize)
leftover := make([]byte, 0, chunkSize)
for {
readTotal, err := file.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
panic(err)
}
buf = buf[:readTotal]

toSend := make([]byte, readTotal)
copy(toSend, buf)

lastNewLineIndex := bytes.LastIndex(buf, []byte{'\n'})

toSend = append(leftover, buf[:lastNewLineIndex+1]...)
leftover = make([]byte, len(buf[lastNewLineIndex+1:]))
copy(leftover, buf[lastNewLineIndex+1:])

chunkStream <- toSend

}
close(chunkStream)

// wait for all chunks to be processed before closing the result stream
wg.Wait()
close(resultStream)
}()

// process all city temperatures derived after processing the file chunks
for t := range resultStream {
for _, text := range t {
index := strings.Index(text, ";")
Expand Down Expand Up @@ -166,7 +183,7 @@ func readFileLineByLineIntoAMap(filepath string) (map[string]cityTemperatureInfo
}
}
}
// fmt.Println(mapOfTemp)

return mapOfTemp, nil
}

Expand All @@ -176,8 +193,12 @@ func convertStringToInt64(input string) int64 {
return output
}

func processReadChunk(buf []byte, readTotal, count int, stringsBuilder *strings.Builder, toSend []string, resultStream chan<- []string) int {
for _, char := range buf[:readTotal] {
func processReadChunk(buf []byte, resultStream chan<- []string) {
var count int
var stringsBuilder strings.Builder
toSend := make([]string, 100)

for _, char := range buf {
if char == '\n' {
if stringsBuilder.Len() != 0 {
toSend[count] = stringsBuilder.String()
Expand All @@ -195,8 +216,9 @@ func processReadChunk(buf []byte, readTotal, count int, stringsBuilder *strings.
stringsBuilder.WriteByte(char)
}
}

return count
if count != 0 {
resultStream <- toSend[:count]
}
}

func round(x float64) float64 {
Expand Down
Binary file added profiles/cpu-parallel.prof
Binary file not shown.

0 comments on commit 067f2a4

Please sign in to comment.