-
Notifications
You must be signed in to change notification settings - Fork 6
/
pileup.go
458 lines (404 loc) · 14.3 KB
/
pileup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
package sam
import (
"fmt"
"log"
"github.com/vertgenlab/gonomics/chromInfo"
"github.com/vertgenlab/gonomics/cigar"
"github.com/vertgenlab/gonomics/dna"
)
// Pile stores the number of each base observed across multiple reads.
// The number of a particular base can be retrieved by using the dna.Base
// as the index for Pile.Count. E.g. the number of reads with A is Count[dna.A].
//
// Linked list mechanics:
// In the pileup function the Pile struct is part of a circular doubly-linked
// list in which Pile.next/prev is the next/prev contiguous position. Discontiguous
// positions are not allowed. When adding base information to the pile struct, the
// pointer to the start of the read position should be kept, and a second pointer
// will increment along the list as bases are added. The pointer to the beginning
// of the list should only be incremented when a position is passed and set to zero.
type Pile struct {
RefIdx int
Pos uint32 // 1-base (like Sam)
CountF [13]int // Count[dna.Base] == Number of observed dna.Base, forward reads
CountR [13]int // Count[dna.Base] == Number of observed dna.Base
InsCountF map[string]int // key is insertion sequence as string, value is number of observations, forward reads. Real base appears to be before insert sequence.
InsCountR map[string]int // key is insertion sequence as string, value is number of observations. Note that key is forward strand sequence.
// Note that DelCountF/R DO NOT contribute to the total depth of a particular base.
// They are only included to preserve multi-base deletion structure for downstream use.
// Further note that DelCount is only recorded for the 5'-most base in the deletion.
DelCountF map[int]int // key is the number of contiguous bases that are deleted, value is number of observations, forward reads
DelCountR map[int]int // key is the number of contiguous bases that are deleted, value is number of observations, reverse reads
touched bool // true if Count or InsCount has been modified
// touched is used as a quick check to see whether a pile struct
// has been used. This value is used to avoid unnecessary allocations
// in resetPile, and is used in checkSend to avoid sending zeroed structs
// that happen to pass the user-defined filters.
next *Pile
prev *Pile
}
// GoSyncPileups inputs a slice of channels receiving Pile structs and syncs
// their outputs to a new output channel that organizes Piles into a slice.
// The returned slice maintains the order of the input slice. Any of the input
// samples that have no data for the returned position will have the RefIdx
// field set to -1.
func GoSyncPileups(samples ...<-chan Pile) <-chan []Pile {
synced := make(chan []Pile, 1000)
go syncPileups(samples, synced)
return synced
}
// syncPileups performs syncs positions from an input slice of pile channels
// to an output channel of []Pile.
func syncPileups(samples []<-chan Pile, output chan<- []Pile) {
var allClosed, isOpen bool
buf := make([]Pile, len(samples))
// fill buffer with 1 element from each channel
for i := range samples {
buf[i], isOpen = <-samples[i]
if !isOpen {
buf[i].RefIdx = -1
samples[i] = nil
}
}
// loop until all channels are closed
var minRefIdx int
var minPos uint32
for !allClosed {
data := make([]Pile, len(samples))
minRefIdx, minPos = getMinPileCoords(buf)
for i := range buf { // check for matching records in the buffer
if buf[i].RefIdx != minRefIdx || buf[i].Pos != minPos || samples[i] == nil {
data[i].RefIdx = -1 // mark as no data in output
continue
}
data[i] = buf[i] // save matching Pile to output data
buf[i], isOpen = <-samples[i] // replace Pile in buf
if !isOpen { // set channel to nil once closed
buf[i].RefIdx = -1
samples[i] = nil
if allChannelsClosed(samples) { // each time a channel is closed, check if they are all closed
allClosed = true
}
}
}
output <- data
}
close(output)
}
// getMinPileCoords returns the RefIdx and Pos of the Pile with the lowest coordinates.
func getMinPileCoords(p []Pile) (int, uint32) {
var minIdx int
for i := 1; i < len(p); i++ {
if p[minIdx].RefIdx == -1 { // handles case where first index has no data
minIdx = i
continue
}
if (p[i].RefIdx >= 0 && p[i].RefIdx < p[minIdx].RefIdx) || (p[i].RefIdx == p[minIdx].RefIdx && p[i].Pos < p[minIdx].Pos) {
minIdx = i
}
}
//fmt.Println(minIdx, p)
return p[minIdx].RefIdx, p[minIdx].Pos
}
// allChanelsClosed returns true if all input channels are set to nil (closed).
func allChannelsClosed(c []<-chan Pile) bool {
for i := range c {
if c[i] != nil {
return false
}
}
return true
}
// GoPileup inputs a channel of coordinate sorted Sam structs and generates
// a Pile for each base. The Pile is sent through the return channel when
// the Pile position is no longer being updated.
//
// The input readFilter functions must all be true for a read to be included.
// The input pileFilter functions can be likewise be used to filter the output.
// All reads/piles are included if filters == nil.
func GoPileup(reads <-chan Sam, header Header, includeNoData bool, readFilters []func(s Sam) bool, pileFilters []func(p Pile) bool) <-chan Pile {
if header.Metadata.SortOrder[0] != Coordinate {
log.Fatal("ERROR: (GoPileup) input sam/bam must be coordinate sorted")
}
pileChan := make(chan Pile, 1000)
go pileupLinked(pileChan, reads, header, includeNoData, readFilters, pileFilters)
return pileChan
}
// passesReadFilters returns true if input Sam is true for all functions in filters.
func passesReadFilters(s Sam, filters []func(s Sam) bool) bool {
for _, f := range filters {
if !f(s) {
return false
}
}
return true
}
// passesPileFilters returns true if input Sam is true for all functions in filters.
func passesPileFilters(p Pile, filters []func(p Pile) bool) bool {
for _, f := range filters {
if !f(p) {
return false
}
}
return true
}
func pileupLinked(send chan<- Pile, reads <-chan Sam, header Header, includeNoData bool, readFilters []func(s Sam) bool, pileFilters []func(p Pile) bool) {
var lastSentRefIdx int
var lastSentPos uint32
start := newLinkedPileBuffer(300) // initialize to 2x std read size
refmap := chromInfo.SliceToMap(header.Chroms)
var read Sam
for read = range reads {
if read.Cigar == nil || read.Cigar[0].Op == '*' {
continue // skip unmapped reads
}
if !passesReadFilters(read, readFilters) {
continue
}
sclipTerminalIns(&read)
start, lastSentRefIdx, lastSentPos = sendPassedLinked(start, read, includeNoData, refmap, send, pileFilters, lastSentRefIdx, lastSentPos)
updateLinkedPile(start, read, refmap)
}
sendRemaining(start, send, includeNoData, pileFilters, lastSentRefIdx, lastSentPos, refmap[read.RName])
close(send)
}
// newLinkedPilebuffer creates size Pile structs as a doubly-linked list and returns the start.
func newLinkedPileBuffer(size int) (start *Pile) {
buf := make([]Pile, size)
for i := range buf {
buf[i].RefIdx = -1
switch i {
case 0: // first in slice
buf[i].next = &buf[i+1]
buf[i].prev = &buf[len(buf)-1]
case len(buf) - 1: // last in slice
buf[i].next = &buf[0]
buf[i].prev = &buf[i-1]
default:
buf[i].next = &buf[i+1]
buf[i].prev = &buf[i-1]
}
}
return &buf[0]
}
// expandLinkedPileBuffer adds toAdd new Piles to the list directly after start.
func expandLinkedPileBuffer(start *Pile, toAdd int) {
end := start.next
newStart := newLinkedPileBuffer(toAdd)
newEnd := newStart.prev
start.next = newStart
newStart.prev = start
end.prev = newEnd
newEnd.next = end
}
// updateLinkedPile updates the linked list with the data in s.
func updateLinkedPile(start *Pile, s Sam, refmap map[string]chromInfo.ChromInfo) {
var seqPos int
var readIsForward bool = !IsPaired(s) || IsForwardRead(s) // record unpaired as forward
refPos := s.Pos
refidx := refmap[s.RName].Order
for i := range s.Cigar {
switch s.Cigar[i].Op {
case 'M', '=', 'X': // Match
addMatchLinked(start, refidx, refPos, s.Seq[seqPos:seqPos+s.Cigar[i].RunLength], readIsForward)
refPos += uint32(s.Cigar[i].RunLength)
seqPos += s.Cigar[i].RunLength
case 'D': // Deletion
addDeletionLinked(start, refidx, refPos, s.Cigar[i].RunLength, readIsForward)
refPos += uint32(s.Cigar[i].RunLength)
case 'I': // Insertion
addInsertionLinked(start, refidx, refPos-1, s.Seq[seqPos:seqPos+s.Cigar[i].RunLength], readIsForward)
seqPos += s.Cigar[i].RunLength
default:
if cigar.ConsumesReference(s.Cigar[i].Op) {
refPos += uint32(s.Cigar[i].RunLength)
}
if cigar.ConsumesQuery(s.Cigar[i].Op) {
seqPos += s.Cigar[i].RunLength
}
}
}
}
func addMatchLinked(start *Pile, refidx int, startPos uint32, seq []dna.Base, readIsForward bool) *Pile {
for i := range seq {
start = getPile(start, refidx, startPos+uint32(i))
if readIsForward {
start.CountF[seq[i]]++
} else {
start.CountR[seq[i]]++
}
start.touched = true
}
return start
}
func addDeletionLinked(start *Pile, refidx int, startPos uint32, length int, readIsForward bool) *Pile {
start = getPile(start, refidx, startPos)
if start.DelCountF == nil { // only make the map when we need it
start.DelCountF = make(map[int]int)
}
if start.DelCountR == nil { // only make the map when we need it
start.DelCountR = make(map[int]int)
}
if readIsForward {
start.DelCountF[length]++
} else {
start.DelCountR[length]++
}
var i uint32
for i = 0; i < uint32(length); i++ {
start = getPile(start, refidx, startPos+i)
if readIsForward {
start.CountF[dna.Gap]++
} else {
start.CountR[dna.Gap]++
}
start.touched = true
}
return start
}
func addInsertionLinked(start *Pile, refidx int, startPos uint32, seq []dna.Base, readIsForward bool) *Pile {
start = getPile(start, refidx, startPos)
if start.InsCountF == nil { // only make the map when we need it
start.InsCountF = make(map[string]int)
}
if start.InsCountR == nil { // only make the map when we need it
start.InsCountR = make(map[string]int)
}
if readIsForward {
start.InsCountF[dna.BasesToString(seq)]++
} else {
start.InsCountR[dna.BasesToString(seq)]++
}
start.touched = true
return start
}
func getPile(start *Pile, refidx int, pos uint32) *Pile {
if pos < start.Pos {
log.Panicf("sent a record for writing before all the data was present. something went horribly wrong")
}
for start.RefIdx != refidx || start.Pos != pos {
switch {
case start.prev.RefIdx == -1 && start.RefIdx == -1: // no data in buffer, start at pos
start.RefIdx = refidx
start.Pos = pos
return start
case start.RefIdx == -1: // previous has data, current is empty. update from previous
start.RefIdx = start.prev.RefIdx
start.Pos = start.prev.Pos + 1
case start.Pos < start.prev.Pos: // looped to start of buffer. need to expand
start = start.prev // back up to end of buffer
// TODO: POTENTIAL MEMORY LEAK. HARD CAP???
expandLinkedPileBuffer(start, 300)
// move into start of newly added buffer
start = start.next
default:
start = start.next
}
}
return start
}
func sendRemaining(start *Pile, send chan<- Pile, includeNoData bool, pileFilters []func(p Pile) bool, lastRefIdx int, lastPos uint32, ref chromInfo.ChromInfo) {
for start.RefIdx != -1 {
if (start.touched || includeNoData) && passesPileFilters(*start, pileFilters) {
send <- *start
}
start.RefIdx = -1
start = start.next
}
// if we have not returned by this point, there are positions with no data
// between the last position send, and the beginning of buf
if !includeNoData {
return
}
// send missing chromosome data
dummyPile := Pile{}
for i := lastPos + 1; i <= uint32(ref.Size); i++ {
dummyPile.RefIdx = lastRefIdx
dummyPile.Pos = i
send <- dummyPile
}
}
func sendPassedLinked(start *Pile, s Sam, includeNoData bool, refmap map[string]chromInfo.ChromInfo, send chan<- Pile, pileFilters []func(p Pile) bool, lastRefIdx int, lastPos uint32) (newStart *Pile, lastSentRefIdx int, lastSentPos uint32) {
for start.RefIdx != refmap[s.RName].Order || start.Pos < s.Pos-1 { // the -1 on s.Pos is to handle cases where a read begins with an insertion
if start.RefIdx == -1 {
break
}
if (start.touched || includeNoData) && passesPileFilters(*start, pileFilters) {
lastRefIdx = start.RefIdx
lastPos = start.Pos
send <- *start
}
resetPile(start)
start = start.next
}
// if we have not returned by this point, there are positions with no data
// between the last position send, and the beginning of buf
if !includeNoData {
return start, lastRefIdx, lastPos
}
// send missing chromosome data
dummyPile := Pile{}
for lastRefIdx < refmap[s.RName].Order {
for i := lastPos + 1; int(i) < refmap[s.RName].Size; i++ {
dummyPile.RefIdx = lastRefIdx
dummyPile.Pos = i
send <- dummyPile
}
lastPos = 0
lastRefIdx++
}
for lastRefIdx == refmap[s.RName].Order {
for i := lastPos + 1; i < s.Pos; i++ {
dummyPile.RefIdx = lastRefIdx
dummyPile.Pos = i
send <- dummyPile
}
lastRefIdx++
}
return start, lastRefIdx, lastPos
}
// resetPile sets a Pile to the default state.
func resetPile(p *Pile) {
p.RefIdx = -1
p.Pos = 0
if !p.touched {
return
}
for i := range p.CountF {
p.CountF[i] = 0
p.CountR[i] = 0
}
p.InsCountF = nil // only make maps when we need them
p.InsCountR = nil
p.DelCountF = nil
p.DelCountR = nil
p.touched = false
}
// sclipTerminalIns will convert an insertion on the left or right end of the read to a soft clip.
func sclipTerminalIns(s *Sam) {
if len(s.Cigar) == 0 || s.Cigar[0].Op == '*' {
return
}
if s.Cigar[0].Op == 'I' {
s.Cigar[0].Op = 'S'
}
if s.Cigar[len(s.Cigar)-1].Op == 'I' {
s.Cigar[len(s.Cigar)-1].Op = 'S'
}
// catch case where beginning/end of read is already soft clipped
if len(s.Cigar) >= 2 && s.Cigar[0].Op == 'S' && s.Cigar[1].Op == 'I' {
s.Cigar[1].Op = 'S'
s.Cigar[1].RunLength += s.Cigar[0].RunLength
s.Cigar = s.Cigar[1:]
}
if len(s.Cigar) >= 2 && s.Cigar[len(s.Cigar)-1].Op == 'S' && s.Cigar[len(s.Cigar)-2].Op == 'I' {
s.Cigar[len(s.Cigar)-2].Op = 'S'
s.Cigar[len(s.Cigar)-2].RunLength += s.Cigar[len(s.Cigar)-1].RunLength
s.Cigar = s.Cigar[:len(s.Cigar)-1]
}
}
// String for debug.
func (p *Pile) String() string {
return fmt.Sprintf("RefIdx: %d\tPos: %d\tCountF: %v\tCountR: %v\tInsCountF: %v\tInsCountR: %v\tDelCountF: %v\tDelCountR: %v\tNext: %p\tPrev: %p",
p.RefIdx, p.Pos, p.CountF, p.CountR, p.InsCountF, p.InsCountR, p.DelCountF, p.DelCountR, p.next, p.prev)
}