bigcsv

bigcsv makes it convenient to parse and process large CSV files in a streaming fashion.

It is a fairly thin wrapper around encoding/csv that lets you reuse a parsing function and, optionally, process rows in parallel.

Usage

Below is a fairly complete usage example showing processing over HTTP, with multiple workers and a cancellable context.

As with csv.Reader, first create the parser, then change any settings before reading.
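
For instance, a minimal sketch of that order (it reuses the Place type and CSV_URL constant from the full example below, and assumes parser.Reader exposes the underlying csv.Reader, as the header-skipping code in that example suggests):

parser, err := bigcsv.New[Place](bigcsv.HTTPStream(CSV_URL))
if err != nil {
	panic(err)
}
// Adjust encoding/csv settings before any rows are read.
parser.Reader.Comma = ';'       // e.g. for semicolon-separated input
parser.Reader.LazyQuotes = true // tolerate sloppy quoting
parser.Parse = ParsePlace       // then attach the parsing function and run, as shown below.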

The basic steps are:

  1. define a type to hold the data you want from a single row.

  2. provide a function which parses the row ([]string) and returns your data type (plus an error): func(row []string) (T, error)

  3. provide a function which defines what to do with your parsed data: func(data T) error

  4. optionally, define an error handling function.

  5. set up the code which creates your parser and runs it.

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"strconv"
	"sync/atomic"

	"github.com/typeduck/bigcsv" // import path assumed from the repository name
)

// The URL below gives information about the processed CSV file:
// https://www.epa.gov/smartgrowth/national-walkability-index-user-guide-and-methodology
const CSV_URL = "https://edg.epa.gov/EPADataCommons/public/OA/EPA_SmartLocationDatabase_V3_Jan_2021_Final.csv"

// A simple struct with part of the data from the CSV.
type Place struct {
	Name        string
	Population  int
	Walkability float64
	Area        float64
}

// ParsePlace is the CSV to struct parsing function.
func ParsePlace(row []string) (Place, error) {
	var err error
	var e error
	place := Place{}
	if len(row) < 117 {
		return place, fmt.Errorf("got %d columns, need at least 117", len(row))
	}
	place.Name = row[10]
	if place.Population, e = strconv.Atoi(row[18]); e != nil {
		err = errors.Join(err, e)
	}
	if place.Walkability, e = strconv.ParseFloat(row[114], 64); e != nil {
		err = errors.Join(err, e)
	}
	if place.Area, e = strconv.ParseFloat(row[116], 64); e != nil {
		err = errors.Join(err, e)
	}
	return place, err
}

func main() {
	// create the parser, note the use of generics to insert our type.
	parser, err := bigcsv.New[Place](bigcsv.HTTPStream(CSV_URL))
	if err != nil {
		panic(err)
	}
	ctx, cancel := context.WithCancel(context.Background()) // we're going to cancel early.
	defer cancel()                                           // release the context even if we never cancel early.
	processedRows := &atomic.Int32{}                         // atomic Int due to parallel workers.
	parser.Parse = ParsePlace // Set the parsing function.
	parser.OnData = func(p Place) error { // data handler (demo: just log it)
		log.Printf(
			"Place '%s', pop. %d, area %.1f, walkability %.1f\n",
			p.Name, p.Population, p.Area, p.Walkability,
		)
		if processedRows.Add(1) > 100 { // cancel early, this stops the HTTP stream, too.
			cancel()
		}
		return nil
	}
	// Handle parsing / data errors.
	parser.OnError = func(err error) { // just log and keep processing.
		log.Println(err)
	}
	// Ignore CSV headers (first line).
	headers, err := parser.Reader.Read()
	if err != nil {
		panic(err)
	}
	if len(headers) < 117 {
		panic("CSV did not have enough headers")
	}
	// Run the parser with 5 parallel workers. Note: this is for demonstration;
	// workers are unlikely to speed things up for HTTP streams.
	if err = parser.Run(ctx, 5); err != nil {
		panic(err)
	}
}

TODO

Before this gets to v1, I’d like to change the API to be more similar to csv.Reader, possibly turning FileStream and HTTPStream into functions that simply return wrapped io.Reader instances.

Ideally, rather than setting functions on the Parser and calling Run, there would be a Read() (T, error) method and a more BYOP (bring your own parallelism) approach; a rough sketch of what that might look like follows.
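
For illustration only, here is a hypothetical caller loop under that proposed API. None of this exists today: Read, and HTTPStream returning a plain io.Reader, are assumptions about the future shape, and the snippet reuses Place, ParsePlace, and CSV_URL from the example above.

// Hypothetical future API sketch; would also need the "io" package in addition to the imports above.
parser, err := bigcsv.New[Place](bigcsv.HTTPStream(CSV_URL)) // HTTPStream as a plain io.Reader (proposed)
if err != nil {
	panic(err)
}
parser.Parse = ParsePlace
for {
	place, err := parser.Read() // proposed Read() (T, error), mirroring csv.Reader.Read
	if errors.Is(err, io.EOF) {
		break
	}
	if err != nil {
		log.Println(err) // handle row-level errors inline instead of via OnError
		continue
	}
	// Bring your own parallelism: hand place off to a worker pool, channel, etc.
	log.Printf("Place %q, walkability %.1f", place.Name, place.Walkability)
}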