In [1]:
%load_ext autoreload
%autoreload 2
%pylab inline

import os.path as op
import findspark
import os
findspark.init()

import pyspark
sc = pyspark.SparkContext()

Populating the interactive namespace from numpy and matplotlib


In [2]:
import shortuuid
shortuuid.uuid()

'xgmYXZ6ybNvyBZhm4ohRqU'

## Utility functions

In [3]:
assembly = 'mm9'

In [10]:
data_dir = op.expanduser("~/data")
output_dir = op.join(data_dir, assembly)   # where all of the intermediate output will be stored
base_ucsc_dir = op.join(data_dir, 'ucsc-data/{}'.format(assembly))  # where all of the files downloaded from UCSC will be stored

import shutil

# create a directory to store intermediate output files
def get_outfile(table_name):
    outfile = op.join(output_dir, 'genbank-output/{}'.format(table_name))
    if op.exists(outfile):
        shutil.rmtree(outfile)
    return outfile

## Load the chromosome lengths

In [11]:
def get_chrom_lengths(base_dir):
    '''
    Get the cumulative start positions for the chromosomes in an assembly. The chromosomes
    will be sorted alphabetically by their names.
    
    :param base_dir: A directory containing meta data about a genome assembly
    :return: A dictionary of the from { 'chr2': 234323432 }, showing at which position
             chromosomes start.
    '''
    chromLengths = (sc.textFile(op.join(base_dir, 'chromInfo.txt.gz'))
                    .map(lambda x: x.split('\t'))
                    .map(lambda x: {'chrom': x[0], 'length': int(x[1]) })
                    .collect())
    
    cum_chrom_lengths = {}
    curr_cum_lengths = 0
    
    for x in sorted(chromLengths, key=lambda x: -x['length']):
        cum_chrom_lengths[x['chrom']] = curr_cum_lengths
        curr_cum_lengths += x['length']
        
    return cum_chrom_lengths

cum_chrom_lengths = get_chrom_lengths(base_ucsc_dir)

print cum_chrom_lengths['chr1'], cum_chrom_lengths['chr2'], cum_chrom_lengths['chr3']

0 197195432 545593815


## Loading the refgene data

In [12]:
from pyspark import SparkContext, SparkConf


def parse_exon_positions(exon_positions_str):
    return map(int, exon_positions_str.strip(",").split(','))

def load_refgene_data(base_dir):
    '''
    Load the UCSC refgene data for a particular assembly.
    
    :param base_dir: The directory which contains the refGene.txt.gz file.
    '''
    refGene = (sc.textFile(op.join(base_dir, 'refGene.txt.gz'))
               .map(lambda x: x.split('\t'))
               .map(lambda x: {'name': x[1],
                               'chrom': x[2],
                               'strand': x[3],
                               'txStart': x[4],
                               'txEnd': x[5],
                               'cdsStart': x[6],
                               'cdsEnd': x[7],
                               'exonCount': x[8],
                               'exonStarts': x[9].strip(','),
                               'exonEnds': x[10].strip(','),
                               'chromOffset': cum_chrom_lengths[x[2]],
                               'genomeTxStart': cum_chrom_lengths[x[2]] + int(x[4]),
                               'genomeTxEnd': cum_chrom_lengths[x[2]] + int(x[5]),
                               'geneName': x[12],
                               'geneLength': int(x[5]) - int(x[4]),
                               })
               .filter(lambda x: x['chrom'].find('_') == -1)
            )
    
    return refGene

refGene = load_refgene_data(base_ucsc_dir)
### add the genomic position
print refGene.take(1)

[{'exonEnds': u'3207049,3411982,3661579', 'geneName': u'Xkr4', 'chromOffset': 0, 'name': u'NM_001011874', 'txStart': u'3204562', 'exonCount': u'3', 'strand': u'-', 'cdsEnd': u'3661429', 'genomeTxStart': 3204562, 'geneLength': 457017, 'cdsStart': u'3206102', 'chrom': u'chr1', 'genomeTxEnd': 3661579, 'txEnd': u'3661579', 'exonStarts': u'3204562,3411782,3660632'}]


## Count the references

For this exercise, the result should be a tsv with the following columns:

```
<taxId> <geneId> <refSeqID> <citation count>
```

In [13]:
def load_gene_counts(genbank_dir):
    gene2pubmed = (sc.textFile(op.join(genbank_dir, "gene2pubmed"))
                     .filter(lambda x: x[0] != '#')
                     .map(lambda x: x.split('\t'))
                     .map(lambda p: {'taxid': int(p[0]), 'geneid': int(p[1]), 'pmid': int(p[2]), 'count': 1})
                     .map(lambda x: ((x['taxid'], x['geneid']), {'count': x['count']}))
                     )
    
    def reduce_count(r1, r2):
        '''
        A reduce function that simply counts the number of elements in the table.
        
        @param r1: A Row
        @param r2: A Row
        @return: A new Row, equal to the first Row with a summed count.
        '''
        #print >>sys.stderr, "r1:", r1
        r1['count'] += r2['count']
        return r1

    print gene2pubmed.take(1)
    reduced_gene2pubmed = gene2pubmed.reduceByKey(reduce_count)
    
    outfile = get_outfile('taxid-geneid-count')

    (reduced_gene2pubmed
        .map(lambda x: "{}\t{}\t{}".format(x[0][0], x[0][1], x[1]['count']))
        .saveAsTextFile(outfile)
        )
    
    return reduced_gene2pubmed


taxid_geneid_count = load_gene_counts(op.join(data_dir, 'genbank-data/'))
print taxid_geneid_count.take(1)

[((9, 1246500), {'count': 1})]
[((6279, 6095747), {'count': 1})]


## Load the gene2refseq annotations

In [14]:
def take_one(r1, r2):
    return r1

def load_refseq2gene(genbank_base_dir):
    '''
    Get the mapping from refseq IDs to gene IDs
    
    :param genbank_base_dir: The directory that contains all of the genbank files.
    :return: A set of tuples of the form (refseq_id, (taxid, geneid))
    '''
    gene2refseq = (sc.textFile(op.join(genbank_base_dir, 'gene2refseq'))
                   .filter(lambda x: x[0] != '#')
                   .map(lambda x: x.split('\t'))
                   .map(lambda p: {'taxid': int(p[0]), 'geneid': int(p[1]), 'refseqid': p[3] })
                   .map(lambda x: (x['refseqid'].split('.')[0], (x['taxid'], x['geneid'])))
                   )
    
    def reduce_by_refseq_id(r1, r2):
        # because we're just looking for a mapping from geneId to refseqId, we just need to throw
        # away single entries with identical refseq ids
        return r1
    
    print gene2refseq.take(10)
    refseq2gene = gene2refseq.reduceByKey(take_one)
    print refseq2gene.take(1)
    
    outfile = get_outfile('refseqid-taxid-geneid')

    (refseq2gene.map(lambda x: "{}\t{}\t{}".format(x[0], x[1][0], x[1][1]))
         .saveAsTextFile(outfile)
    )
    return refseq2gene

refseqid_taxid_geneid = load_refseq2gene(op.join(data_dir, 'genbank-data'))

[(u'-', (9, 1246500)), (u'-', (9, 1246501)), (u'-', (9, 1246502)), (u'-', (9, 1246503)), (u'-', (9, 1246504)), (u'-', (9, 1246505)), (u'-', (9, 1246509)), (u'-', (9, 1246510)), (u'-', (9, 3722426)), (u'-', (9, 8655732))]
[(u'XM_009054708', (225164, 20243866))]


## Join the reference counts and refseq to geneid translations

In [15]:
print refseqid_taxid_geneid.take(1)

def take_one(r1, r2):
    return r1

def take_max(r1, r2):
    if r1 > r2:
        return r1
    else:
        return r2

[(u'XM_009054708', (225164, 20243866))]


In [16]:
def join_counts_and_ids(refseqid_taxid_geneid, taxid_geneid_count):
    taxid_geneid_refseq = refseqid_taxid_geneid.map(lambda x: (x[1], x[0]))
    print "count1:", taxid_geneid_refseq.count()
    taxid_geneid_refseq = taxid_geneid_refseq.reduceByKey(take_one)
    print "count2:", taxid_geneid_refseq.count()
    
    
    '''    
    taxid_geneid_refseq = (sc.textFile(op.join(output_dir, 'genbank-output/refseqid-taxid-geneid'))
                   .map(lambda x: x.split())
                   .map(lambda x: ((int(x[1]), int(x[2])), x[0]))
                        )
    '''
    print taxid_geneid_refseq.take(1)
    
    '''
    (sc.textFile(op.join(output_dir, 'genbank-output/taxid-geneid-count'))
                          .map(lambda x: x.split())
                          .map(lambda x: ((int(x[0]), int(x[1])), int(x[2])))
                          )
    '''
    print "taxid_geneid_count", taxid_geneid_count.take(1)
    print "1. taxid_geneid_count.count():", taxid_geneid_count.count()
    taxid_geneid_count = taxid_geneid_count.reduceByKey(take_max)
    print "2. taxid_geneid_count.count():", taxid_geneid_count.count()
    
    taxid_geneid_count_refseq = taxid_geneid_count.join(taxid_geneid_refseq)
    print taxid_geneid_count_refseq.take(1)
    print taxid_geneid_count.take(1)
    
    outfile = get_outfile('taxid-geneid-refseqid-count')

    (taxid_geneid_count_refseq.map(lambda x: "{}\t{}\t{}\t{}".format(x[0][0],
                                                                  x[0][1],
                                                                  x[1][1],
                                                                  x[1][0]))
     .saveAsTextFile(outfile)
     )
    return taxid_geneid_count_refseq

taxid_geneid_count_refseq = join_counts_and_ids(refseqid_taxid_geneid, taxid_geneid_count)

count1: 9519075
count2: 7364241
[((4577, 103654904), u'XR_566429')]
taxid_geneid_count [((6279, 6095747), {'count': 1})]
1. taxid_geneid_count.count(): 8516870
2. taxid_geneid_count.count(): 8516870
[((6279, 6095747), ({'count': 1}, u'XM_001892259'))]
[((880591, 9871835), {'count': 1})]


## Join the refgene data with the count data

In [17]:
def join_refgene_and_counts(refGene, taxid_geneid_count_refseq):
    '''
    Combine the refGene information about the genes with the citation
    count information.
    '''
    refseqid_refgene = refGene.map(lambda x: (x['name'], x))
    
    print refseqid_refgene.take(1)
    
    refseqid_count = taxid_geneid_count_refseq.map(lambda x: (x[1][1], x[1][0]))
    
    print refseqid_count.take(1)
    
    refseqid_refgene_count = refseqid_refgene.join(refseqid_count)

    print refseqid_refgene_count.take(1)
    

    return refseqid_refgene_count

refseqid_refgene_count = join_refgene_and_counts(refGene, taxid_geneid_count_refseq)

outfile = get_outfile('refgene-count')
(refseqid_refgene_count.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}\t{uid}"
                            .format(count=x[1][1]['count'],uid=shortuuid.uuid(), **x[1][0]))
 .saveAsTextFile(outfile))

[(u'NM_001011874', {'exonEnds': u'3207049,3411982,3661579', 'geneName': u'Xkr4', 'chromOffset': 0, 'name': u'NM_001011874', 'txStart': u'3204562', 'exonCount': u'3', 'strand': u'-', 'cdsEnd': u'3661429', 'genomeTxStart': 3204562, 'geneLength': 457017, 'cdsStart': u'3206102', 'chrom': u'chr1', 'genomeTxEnd': 3661579, 'txEnd': u'3661579', 'exonStarts': u'3204562,3411782,3660632'})]
[(u'XM_001892259', {'count': 1})]
[(u'NM_001113367', ({'exonEnds': u'55357029,55361470,55362567,55380565,55397444,55403296,55405779,55412583,55417603,55419551', 'geneName': u'Boll', 'chromOffset': 0, 'name': u'NM_001113367', 'txStart': u'55356912', 'exonCount': u'10', 'strand': u'-', 'cdsEnd': u'55397433', 'genomeTxStart': 55356912, 'exonStarts': u'55356912,55361341,55362519,55380493,55397316,55403220,55405724,55412491,55417459,55419325', 'cdsStart': u'55356912', 'chrom': u'chr1', 'genomeTxEnd': 55419551, 'txEnd': u'55419551', 'geneLength': 62639}, {'count': 12}))]


In [18]:
print refseqid_refgene_count.take(1)

[(u'NM_001113367', ({'exonEnds': u'55357029,55361470,55362567,55380565,55397444,55403296,55405779,55412583,55417603,55419551', 'geneName': u'Boll', 'chromOffset': 0, 'name': u'NM_001113367', 'txStart': u'55356912', 'exonCount': u'10', 'strand': u'-', 'cdsEnd': u'55397433', 'genomeTxStart': 55356912, 'exonStarts': u'55356912,55361341,55362519,55380493,55397316,55403220,55405724,55412491,55417459,55419325', 'cdsStart': u'55356912', 'chrom': u'chr1', 'genomeTxEnd': 55419551, 'txEnd': u'55419551', 'geneLength': 62639}, {'count': 12}))]


In [19]:
refseqid_refgene_count_plus = refseqid_refgene_count.filter(lambda x: x[1][0]['strand'] == '+')

outfile = get_outfile('refgene-count-plus')
(refseqid_refgene_count_plus.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}\t{uid}"
                            .format(count=x[1][1]['count'],uid=shortuuid.uuid(), **x[1][0]))
 .saveAsTextFile(outfile))

In [20]:
refseqid_refgene_count_minus = refseqid_refgene_count.filter(lambda x: x[1][0]['strand'] == '-')

outfile = get_outfile('refgene-count-minus')
(refseqid_refgene_count_minus.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}\t{uid}"
                            .format(count=x[1][1]['count'],uid=shortuuid.uuid(), **x[1][0]))
 .saveAsTextFile(outfile))

## Run the entire pipeline

In [21]:
assembly = 'hg19'
output_dir = op.join(data_dir, assembly)    # where all of the intermediate output will be stored
base_ucsc_dir = op.join(data_dir, 'ucsc-data/{}'.format(assembly))  # where all of the files downloaded from UCSC will be stored

cum_chrom_lengths = get_chrom_lengths(base_ucsc_dir)
refGene = load_refgene_data(base_ucsc_dir)
taxid_geneid_count = load_gene_counts(op.join(data_dir, 'genbank-data/'))
refseqid_taxid_geneid = load_refseq2gene(op.join(data_dir, 'genbank-data'))
taxid_geneid_count_refseq = join_counts_and_ids(refseqid_taxid_geneid, taxid_geneid_count)
refseqid_refgene_count = join_refgene_and_counts(refGene, taxid_geneid_count_refseq)

outfile = get_outfile('refgene-count')
(refseqid_refgene_count.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}\t{uid}"
                            .format(count=x[1][1]['count'],uid=shortuuid.uuid(), **x[1][0]))
 .saveAsTextFile(outfile))

[((9, 1246500), {'count': 1})]
[(u'-', (9, 1246500)), (u'-', (9, 1246501)), (u'-', (9, 1246502)), (u'-', (9, 1246503)), (u'-', (9, 1246504)), (u'-', (9, 1246505)), (u'-', (9, 1246509)), (u'-', (9, 1246510)), (u'-', (9, 3722426)), (u'-', (9, 8655732))]
[(u'XM_009054708', (225164, 20243866))]
count1: 9519075
count2: 7364241
[((4577, 103654904), u'XR_566429')]
taxid_geneid_count [((956149, 14564489), {'count': 1})]
1. taxid_geneid_count.count(): 8516870
2. taxid_geneid_count.count(): 8516870
[((6279, 6095747), ({'count': 1}, u'XM_001892259'))]
[((903510, 11565178), {'count': 1})]
[(u'NM_033083', {'exonEnds': u'15469389,15471514,15473730,15476045,15478082,15484120', 'geneName': u'EAF1', 'chromOffset': 492449994, 'name': u'NM_033083', 'txStart': u'15469063', 'exonCount': u'6', 'strand': u'+', 'cdsEnd': u'15480662', 'genomeTxStart': 507919057, 'geneLength': 15057, 'cdsStart': u'15469286', 'chrom': u'chr3', 'genomeTxEnd': 507934114, 'txEnd': u'15484120', 'exonStarts': u'15469063,15471419,1547

In [22]:
refseqid_refgene_count_plus = refseqid_refgene_count.filter(lambda x: x[1][0]['strand'] == '+')

outfile = get_outfile('refgene-count-plus')
print outfile
print refseqid_refgene_count_plus.take(1)
(refseqid_refgene_count_plus.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}\t{uid}"
                            .format(count=x[1][1]['count'],uid=shortuuid.uuid(), **x[1][0]))
 .saveAsTextFile(outfile))

/Users/peter/data/hg19/genbank-output/refgene-count-plus
[(u'NM_001466', ({'exonEnds': u'42638630', 'geneName': u'FZD2', 'chromOffset': 2655442424, 'name': u'NM_001466', 'txStart': u'42634811', 'exonCount': u'1', 'strand': u'+', 'cdsEnd': u'42636754', 'genomeTxStart': 2698077235, 'exonStarts': u'42634811', 'cdsStart': u'42635056', 'chrom': u'chr17', 'genomeTxEnd': 2698081054, 'txEnd': u'42638630', 'geneLength': 3819}, {'count': 23}))]


In [23]:
refseqid_refgene_count_plus = refseqid_refgene_count.filter(lambda x: x[1][0]['strand'] == '-')

outfile = get_outfile('refgene-count-minus')
print outfile
print refseqid_refgene_count_plus.take(1)
(refseqid_refgene_count_plus.map(lambda x: "{name}\t{chrom}\t{strand}\t{txStart}\t{txEnd}\t{genomeTxStart}\t{genomeTxEnd}\t{cdsStart}\t{cdsEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{geneName}\t{count}\t{uid}"
                            .format(count=x[1][1]['count'],uid=shortuuid.uuid(), **x[1][0]))
 .saveAsTextFile(outfile))

/Users/peter/data/hg19/genbank-output/refgene-count-minus
[(u'NR_031592', ({'exonEnds': u'10514214', 'geneName': u'MIR1181', 'chromOffset': 2937113968, 'name': u'NR_031592', 'txStart': u'10514133', 'exonCount': u'1', 'strand': u'-', 'cdsEnd': u'10514214', 'genomeTxStart': 2947628101, 'exonStarts': u'10514133', 'cdsStart': u'10514214', 'chrom': u'chr19', 'genomeTxEnd': 2947628182, 'txEnd': u'10514214', 'geneLength': 81}, {'count': 3}))]
