# Calculate GC-content of some DNA with a Scatter/Gather workflow

***N.B.: See slides 10 and 11 in [these slides](http://www.slideshare.net/SamuelLampa/scipipe-a-lightweight-workflow-library-inspired-by-flowbased-programming) for a graphical description of this workflow!***

This workflow (or pipeline) calculates the [GC-content](https://en.wikipedia.org/wiki/GC-content) (the proportion of G and C letters among all A, T, G and C DNA letters) of a piece of human DNA (the Y chromosome, to be precise).

It does this in parallel: it first splits the input DNA file into splits of 100,000 lines each (around 10 splits), then counts the G/C and A/T letters in each of these files in parallel, and finally sums the per-split counts to compute a single GC ratio for the whole chromosome Y sequence.

The raw DNA sequence is downloaded as part of the workflow, so you don't need to worry about that.

## Requirements

- This notebook is supposed to be run with the [GopherNotes](https://github.com/gophergala2016/gophernotes) Jupyter kernel for Go, developed by [Daniel Whitenack](http://www.datadan.io/).

In [3]:
:import "fmt"

In [11]:
:import "github.com/scipipe/scipipe"

In [12]:
:import "github.com/scipipe/scipipe/components"

In [13]:
// === INITIALIZE TASKS =======================================================================

// Download a zipped Chromosome Y fasta file
fastaURL := "ftp://ftp.ensembl.org/pub/release-84/fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.chromosome.Y.fa.gz"
wget := scipipe.NewProc("wget", "wget "+fastaURL+" -O {o:chry_zipped}")
wget.SetPathStatic("chry_zipped", "chry.fa.gz")

// Ungzip the fasta file
unzip := scipipe.NewProc("ungzip", "gunzip -c {i:gzipped} > {o:ungzipped}")
unzip.SetPathReplace("gzipped", "ungzipped", ".gz", "")

// Split the fasta file in to parts with 100000 lines in each
linesPerSplit := 100000
scipipelit := components.NewFileSplitter(linesPerSplit)

// Create a 2-way multiplexer that can be used to provide the same
// file target to two downstream processes
dupl := components.NewFanOut()

// Count GC & AT characters in the fasta file
charCountCommand := "cat {i:infile} | fold -w 1 | grep '[%s]' | wc -l | awk '{ print $1 }' > {o:%s}"
gccnt := scipipe.NewProc("gccount", fmt.Sprintf(charCountCommand, "GC", "gccount"))
gccnt.SetPathExtend("infile", "gccount", ".gccnt")
atcnt := scipipe.NewProc("atcount", fmt.Sprintf(charCountCommand, "AT", "atcount"))
atcnt.SetPathExtend("infile", "atcount", ".atcnt")

// Concatenate GC & AT counts
gccat := components.NewConcatenator("gccounts.txt")
atcat := components.NewConcatenator("atcounts.txt")

// Sum up the GC & AT counts on the concatenated file
sumCommand := "awk '{ SUM += $1 } END { print SUM }' {i:in} > {o:sum}"
gcsum := scipipe.NewProc("gcsum", sumCommand)
gcsum.SetPathExtend("in", "sum", ".sum")
atsum := scipipe.NewProc("atsum", sumCommand)
atsum.SetPathExtend("in", "sum", ".sum")

// Finally, calculate the ratio between GC chars, vs. GC+AT chars
gcrat := scipipe.NewProc("gcratio", "gc=$(cat {i:gcsum}); at=$(cat {i:atsum}); calc \"$gc/($gc+$at)\" > {o:gcratio}")
gcrat.SetPathStatic("gcratio", "gcratio.txt")

// A sink, to drive the network
asink := scipipe.NewSink()

// === CONNECT DEPENDENCIES ===================================================================

unzip.In["gzipped"].Connect(wget.Out["chry_zipped"])
scipipelit.InFile.Connect(unzip.Out["ungzipped"])
dupl.InFile.Connect(scipipelit.OutSplitFile)
gccnt.In["infile"].Connect(dupl.GetOutPort("gccnt"))
atcnt.In["infile"].Connect(dupl.GetOutPort("atcnt"))
gccat.In.Connect(gccnt.Out["gccount"])
atcat.In.Connect(atcnt.Out["atcount"])
gcsum.In["in"].Connect(gccat.Out)
atsum.In["in"].Connect(atcat.Out)
gcrat.In["gcsum"].Connect(gcsum.Out["sum"])
gcrat.In["atsum"].Connect(atsum.Out["sum"])

asink.Connect(gcrat.Out["gcratio"])

// === RUN PIPELINE ===========================================================================

piperunner := scipipe.NewPipelineRunner()
piperunner.AddProcesses(wget, unzip, scipipelit, dupl, gccnt, atcnt, gccat, atcat, gcsum, atsum, gcrat, asink)
piperunner.Run()

AUDIT   2016/06/23 01:59:27 FileSplitter      Now processing input file  chry.fa ...
AUDIT   2016/06/23 01:59:27 FileSplitter      Created split file chry.fa.split_1
AUDIT   2016/06/23 01:59:27 Task:atcount      Executing command: cat chry.fa.split_1 | fold -w 1 | grep '[AT]' | wc -l | awk '{ print $1 }' > chry.fa.split_1.atcnt.tmp
AUDIT   2016/06/23 01:59:27 Task:gccount      Executing command: cat chry.fa.split_1 | fold -w 1 | grep '[GC]' | wc -l | awk '{ print $1 }' > chry.fa.split_1.gccnt.tmp
AUDIT   2016/06/23 01:59:27 FileSplitter      Created split file chry.fa.split_2
AUDIT   2016/06/23 01:59:27 Task:atcount      Executing command: cat chry.fa.split_2 | fold -w 1 | grep '[AT]' | wc -l | awk '{ print $1 }' > chry.fa.split_2.atcnt.tmp
AUDIT   2016/06/23 01:59:27 Task:gccount      Executing command: cat chry.fa.split_2 | fold -w 1 | grep '[GC]' | wc -l | awk '{ print $1 }' > chry.fa.split_2.gccnt.tmp
AUDIT   2016/06/23 01:59:28 FileSplitter      Created split file chry.fa.split_3
