Import data from CSV files to Cassandra using Akka Streams

Akka Streams in Practice

This is a sample Akka Streams project which uses the library to import data from a number of Gzipped CSV files into a Cassandra table.

The CSV files contain readings, i.e. (id, value) pairs, where every id has two associated values and the records for a given id appear on consecutive lines in the file. Any of the values may occasionally be an invalid number. Example:

93500;0.5287942176336127
93500;0.3404326895942348
961989;invalid_value
961989;0.27452559752437566
136308;0.07525660747531115
136308;0.6509485024097678

The importer streams the Gzipped files and decompresses them on the fly, then converts every line to a domain object representing either a valid or an invalid reading. The next step is to compute an average value for the readings under a given id whenever at least one of the readings is valid. When both readings for a given id are invalid, the average is assumed to be -1. Finally, the computed average values are written to Cassandra.
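The parsing and averaging steps described above can be sketched in plain Scala, without Akka Streams. This is only an illustration, not the project's actual code: the names `Reading`, `ValidReading`, `InvalidReading`, `parseLine` and `average` are hypothetical, and the sketch assumes that when only one of the two readings is valid, the average is simply that one value.

```scala
import scala.util.Try

object ReadingsSketch {
  // Hypothetical domain model: a line is either a valid or an invalid reading.
  sealed trait Reading { def id: Int }
  final case class ValidReading(id: Int, value: Double) extends Reading
  final case class InvalidReading(id: Int) extends Reading

  // Parse one "id;value" line; a value that is not a number gives an InvalidReading.
  def parseLine(line: String): Reading = {
    val fields = line.split(";")
    val id = fields(0).trim.toInt
    fields.lift(1).flatMap(v => Try(v.trim.toDouble).toOption) match {
      case Some(value) => ValidReading(id, value)
      case None        => InvalidReading(id)
    }
  }

  // Average the valid values of one pair; -1 when no reading in the pair is valid.
  // Assumption: a single valid reading is averaged on its own.
  def average(pair: Seq[Reading]): ValidReading = {
    val values = pair.collect { case ValidReading(_, v) => v }
    val avg = if (values.isEmpty) -1.0 else values.sum / values.size
    ValidReading(pair.head.id, avg)
  }

  // Relies on the two records for an id being on consecutive lines.
  def averages(lines: Iterator[String]): List[ValidReading] =
    lines.map(parseLine).grouped(2).map(average).toList
}
```

Grouping by two only works because the input format guarantees that both records for an id are adjacent; otherwise the readings would have to be grouped by id explicitly.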

Prerequisites

CSV files

You can generate the CSV data yourself using the provided RandomDataGenerator. There are a few configurable properties of the generator in application.conf:

generator {
  number-of-files = 100
  number-of-pairs = 1000
  invalid-line-probability = 0.005
}

They are pretty self-explanatory: number-of-files is the number of files to be generated; number-of-pairs is the number of (id, value) pairs in each file (since two values are generated for each id); invalid-line-probability is the probability that the generator inserts a line with a value that is not a valid number.
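To make the effect of these settings concrete, here is a rough sketch of how the lines of one file could be produced. It is not the code of RandomDataGenerator: the `generateLines` function, the id range and the `invalid_value` marker are assumptions, and the sketch reads number-of-pairs as the number of ids, each written on two consecutive lines.

```scala
import scala.util.Random

object GeneratorSketch {
  // Produce 2 * numberOfPairs lines: two consecutive "id;value" lines per id.
  // Each line independently becomes invalid with the given probability.
  def generateLines(numberOfPairs: Int,
                    invalidLineProbability: Double,
                    random: Random): Seq[String] =
    (1 to numberOfPairs).flatMap { _ =>
      val id = random.nextInt(1000000) // hypothetical id range
      Seq.fill(2) {
        val value =
          if (random.nextDouble() < invalidLineProbability) "invalid_value"
          else random.nextDouble().toString
        s"$id;$value"
      }
    }
}
```

Passing a seeded `Random` makes the output reproducible, which is handy when comparing import runs.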

To generate the CSV files, run:

sbt "runMain org.kunicki.akka_streams.RandomDataGenerator"

Note that the importer expects the files to be compressed with Gzip. You can compress the generated files by running the following command in the ./data directory:

find . -type f -exec gzip "{}" \;
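Because the importer expects Gzip-compressed input, it may help to see what reading such a file line by line involves. The sketch below uses only the JDK's `GZIPInputStream` and `GZIPOutputStream` rather than Akka Streams; `writeGzipped` and `readGzipped` are hypothetical helper names, not part of the project.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipSketch {
  // Write the lines to a Gzip-compressed file, one line per record.
  def writeGzipped(path: Path, lines: Seq[String]): Unit = {
    val writer = new OutputStreamWriter(
      new GZIPOutputStream(Files.newOutputStream(path)), StandardCharsets.UTF_8)
    try lines.foreach(line => writer.write(line + "\n"))
    finally writer.close() // closing also finishes the Gzip stream
  }

  // Decompress while reading, so the file is never extracted to disk.
  def readGzipped(path: Path): List[String] = {
    val reader = new BufferedReader(new InputStreamReader(
      new GZIPInputStream(Files.newInputStream(path)), StandardCharsets.UTF_8))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally reader.close()
  }
}
```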

Cassandra

Probably the easiest way to have Cassandra up and running is to use a Docker image. Then all you need to do is run the following command:

docker run -d --name cassandra cassandra

and in a while you should have Cassandra ready at port 9042.

Depending on your setup, you might want to bind the container's port directly to port 9042 on the host instead:

docker run --name cassandra -p 127.0.0.1:9042:9042 -d cassandra

When the container has started, it's time to create a keyspace and a table for our data.

First you need to run the CQL shell:

docker exec -it cassandra cqlsh

Then, in cqlsh you create an akka_streams keyspace:

CREATE KEYSPACE akka_streams WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

Finally, let's create the readings table:

CREATE TABLE akka_streams.readings (id int PRIMARY KEY, value float);  

Running

Before running the import you may wish to tweak some configuration settings in application.conf:

importer {
  import-directory = "./data"
  lines-to-skip = 0
  concurrent-files = 10
  concurrent-writes = 5
  non-io-parallelism = 42
}

The import-directory is the directory with the CSV files; lines-to-skip allows you to optionally skip a number of lines from the top of each file (e.g. CSV headers, if you had any); concurrent-files tells the importer how many files to read in parallel; concurrent-writes determines the number of parallel inserts to Cassandra; non-io-parallelism defines the number of threads for in-memory calculations.
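As a rough illustration of what two of these settings control, the sketch below skips leading lines and caps the number of simultaneous writes with a fixed-size thread pool. It deliberately avoids Akka Streams and is not how the importer is implemented; a stream would more likely express the same limit through an operator such as `mapAsync`, which takes a parallelism argument. The names `skipLines` and `writeAll` are hypothetical.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ImporterSettingsSketch {
  // lines-to-skip: drop the first lines of a file, e.g. a CSV header.
  def skipLines(lines: Iterator[String], linesToSkip: Int): Iterator[String] =
    lines.drop(linesToSkip)

  // concurrent-writes: run `write` for every item on a pool of that many threads.
  // Returns the highest number of writes observed in flight at the same time.
  def writeAll[A](items: Seq[A], concurrentWrites: Int)(write: A => Unit): Int = {
    val pool = Executors.newFixedThreadPool(concurrentWrites)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)
    try {
      val writes = items.map { item =>
        Future {
          val now = inFlight.incrementAndGet()
          maxInFlight.accumulateAndGet(now, (a, b) => math.max(a, b))
          try write(item) finally inFlight.decrementAndGet()
        }
      }
      Await.result(Future.sequence(writes), 30.seconds)
      maxInFlight.get()
    } finally pool.shutdown()
  }
}
```

Raising the limit only helps while the database keeps up, so concurrent-writes is worth tuning against the Cassandra instance you actually run.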

With the configuration tweaked, the test data generated and a Cassandra instance running, you can now run the actual import:

sbt "runMain org.kunicki.akka_streams.Importer"