[chore][pkg/stanza]: Use shared parser code for CSV parsing (#31302)
**Description:**
* Follow-up to #31081: refactor the Stanza CSV parser to use the CSV-parsing code shared with OTTL (a usage sketch of the shared helpers follows the testing notes below).

**Testing:**
* Existing unit tests cover this refactor
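For reference, a minimal sketch (not part of the commit) of how the shared helpers are called after this change. The helper names come from the diff below; the exact signatures, the sample headers and row, and the surrounding `main` function are assumptions for illustration, and `parseutils` lives under `internal/`, so this only compiles from within the collector-contrib module.

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils"
)

func main() {
	// Hypothetical headers and row, chosen only to exercise the helpers.
	headers := []string{"name", "sev", "msg"}
	row := `stanza,info,"hello, world"`

	// Signature inferred from the diff: reads the row (rejoining any multi-line
	// continuations) and returns a single []string record.
	fields, err := parseutils.ReadCSVRow(row, ',', false)
	if err != nil {
		panic(err)
	}

	// Signature inferred from the diff: maps headers[i] -> fields[i], erroring
	// on a header/field count mismatch.
	values, err := parseutils.MapCSVHeaders(headers, fields)
	if err != nil {
		panic(err)
	}

	fmt.Println(values["msg"]) // hello, world
}
```

Centralizing the read-and-join and header-mapping logic in `parseutils` keeps the Stanza and OTTL CSV parsers behaviorally aligned.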
BinaryFissionGames authored Feb 16, 2024
1 parent c403102 commit 5f81014
Showing 1 changed file with 6 additions and 61 deletions.
67 changes: 6 additions & 61 deletions pkg/stanza/operator/parser/csv/csv.go
@@ -4,14 +4,13 @@ package csv // import "github.com/open-telemetry/opentelemetry-collector-contrib

import (
"context"
csvparser "encoding/csv"
"errors"
"fmt"
"io"
"strings"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
@@ -158,52 +157,12 @@ func generateCSVParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool
return nil, err
}

reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = fieldDelimiter
reader.FieldsPerRecord = len(headers)
reader.LazyQuotes = lazyQuotes

// Typically only need one
lines := make([][]string, 0, 1)
for {
line, err := reader.Read()
if errors.Is(err, io.EOF) {
break
}

if err != nil && len(line) == 0 {
return nil, errors.New("failed to parse entry")
}

lines = append(lines, line)
}

/*
This parser is parsing a single value, which came from a single log entry.
Therefore, if there are multiple lines here, it should be assumed that each
subsequent line contains a continuation of the last field in the previous line.
Given a file w/ headers "A,B,C,D,E" and contents "aa,b\nb,cc,d\nd,ee",
expect reader.Read() to return bodies:
- ["aa","b"]
- ["b","cc","d"]
- ["d","ee"]
*/

joinedLine := lines[0]
for i := 1; i < len(lines); i++ {
nextLine := lines[i]

// The first element of the next line is a continuation of the previous line's last element
joinedLine[len(joinedLine)-1] += "\n" + nextLine[0]

// The remainder are separate elements
for n := 1; n < len(nextLine); n++ {
joinedLine = append(joinedLine, nextLine[n])
}
joinedLine, err := parseutils.ReadCSVRow(csvLine, fieldDelimiter, lazyQuotes)
if err != nil {
return nil, err
}

return headersMap(headers, joinedLine)
return parseutils.MapCSVHeaders(headers, joinedLine)
}
}

@@ -217,7 +176,7 @@ func generateSplitParseFunc(headers []string, fieldDelimiter rune) parseFunc {

// This parse function does not do any special quote handling; Splitting on the delimiter is sufficient.
fields := strings.Split(csvLine, string(fieldDelimiter))
return headersMap(headers, fields)
return parseutils.MapCSVHeaders(headers, fields)
}
}

@@ -235,17 +194,3 @@ func valueAsString(value any) (string, error) {

return s, nil
}

// headersMap creates a map of headers[i] -> fields[i].
func headersMap(headers []string, fields []string) (map[string]any, error) {
parsedValues := make(map[string]any)

if len(fields) != len(headers) {
return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields))
}

for i, val := range fields {
parsedValues[headers[i]] = val
}
return parsedValues, nil
}
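For context on the code being deleted: the block comment in the removed section describes how a single log entry whose fields contain literal newlines comes back from `encoding/csv` as several short records that must be rejoined into one row. Below is a minimal, standalone sketch of that joining behavior, using only the standard library and the exact input from the removed comment; per the commit's testing note, the shared `parseutils.ReadCSVRow` helper is expected to preserve this behavior.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

func main() {
	// Headers "A,B,C,D,E"; the body has newlines embedded in unquoted fields,
	// so the reader yields three short records: [aa b], [b cc d], [d ee].
	headers := []string{"A", "B", "C", "D", "E"}
	body := "aa,b\nb,cc,d\nd,ee"

	r := csv.NewReader(strings.NewReader(body))
	r.FieldsPerRecord = -1 // records here have varying lengths; skip the count check

	records, err := r.ReadAll()
	if err != nil {
		panic(err)
	}

	// Rejoin: the first field of each subsequent record continues the previous
	// record's last field; the remaining fields are separate elements.
	joined := records[0]
	for _, rec := range records[1:] {
		joined[len(joined)-1] += "\n" + rec[0]
		joined = append(joined, rec[1:]...)
	}

	fmt.Println(len(joined) == len(headers)) // true: aa, b\nb, cc, d\nd, ee
}
```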
