Skip to content

Commit

Permalink
{AH} pep8
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreasHeger committed Nov 23, 2017
1 parent eb51ad3 commit cecff8d
Show file tree
Hide file tree
Showing 22 changed files with 484 additions and 551 deletions.
259 changes: 135 additions & 124 deletions pysam/Pileup.py
Expand Up @@ -2,62 +2,67 @@
import collections
import pysam

PileupSubstitution = collections.namedtuple( "PileupSubstitution",
" ".join( (\
"chromosome",
"pos",
"reference_base",
"genotype",
"consensus_quality",
"snp_quality",
"mapping_quality",
"coverage",
"read_bases",
"base_qualities" ) ) )

PileupIndel = collections.namedtuple( "PileupIndel",
" ".join( (\
"chromosome",
"pos",
"reference_base",
"genotype",
"consensus_quality",
"snp_quality",
"mapping_quality",
"coverage",
"first_allele",
"second_allele",
"reads_first",
"reads_second",
"reads_diff" ) ) )

def iterate( infile ):
PileupSubstitution = collections.namedtuple("PileupSubstitution",
" ".join((
"chromosome",
"pos",
"reference_base",
"genotype",
"consensus_quality",
"snp_quality",
"mapping_quality",
"coverage",
"read_bases",
"base_qualities")))

PileupIndel = collections.namedtuple("PileupIndel",
" ".join((
"chromosome",
"pos",
"reference_base",
"genotype",
"consensus_quality",
"snp_quality",
"mapping_quality",
"coverage",
"first_allele",
"second_allele",
"reads_first",
"reads_second",
"reads_diff")))


def iterate(infile):
'''iterate over ``samtools pileup -c`` formatted file.
*infile* can be any iterator over a lines.
The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution`
or :class:`pysam.Pileup.PileupIndel`.
.. note::
.. note::
The parser converts to 0-based coordinates
'''

conv_subst = (str,lambda x: int(x)-1,str,str,int,int,int,int,str,str)
conv_indel = (str,lambda x: int(x)-1,str,str,int,int,int,int,str,str,int,int,int)

conv_subst = (str, lambda x: int(x) - 1, str,
str, int, int, int, int, str, str)
conv_indel = (str, lambda x: int(x) - 1, str, str, int,
int, int, int, str, str, int, int, int)

for line in infile:
d = line[:-1].split()
if d[2] == "*":
try:
yield PileupIndel( *[x(y) for x,y in zip(conv_indel,d) ] )
yield PileupIndel(*[x(y) for x, y in zip(conv_indel, d)])
except TypeError:
raise pysam.SamtoolsError( "parsing error in line: `%s`" % line)
raise pysam.SamtoolsError("parsing error in line: `%s`" % line)
else:
try:
yield PileupSubstitution( *[x(y) for x,y in zip(conv_subst,d) ] )
yield PileupSubstitution(*[x(y) for x, y in zip(conv_subst, d)])
except TypeError:
raise pysam.SamtoolsError( "parsing error in line: `%s`" % line)
raise pysam.SamtoolsError("parsing error in line: `%s`" % line)


ENCODE_GENOTYPE = {
'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T',
Expand All @@ -68,7 +73,7 @@ def iterate( infile ):
'GT': 'k', 'TG': 'K',
'CG': 's', 'GC': 'S',
'AT': 'w', 'TA': 'W',
}
}

DECODE_GENOTYPE = {
'A': 'AA',
Expand All @@ -81,59 +86,67 @@ def iterate( infile ):
'k': 'GT', 'K': 'GT',
's': 'CG', 'S': 'CG',
'w': 'AT', 'W': 'AT',
}
}

# ------------------------------------------------------------


##------------------------------------------------------------
def encodeGenotype( code ):
def encodeGenotype(code):
'''encode genotypes like GG, GA into a one-letter code.
The returned code is lower case if code[0] < code[1], otherwise
it is uppercase.
'''
return ENCODE_GENOTYPE[ code.upper() ]
return ENCODE_GENOTYPE[code.upper()]

def decodeGenotype( code ):

def decodeGenotype(code):
'''decode single letter genotypes like m, M into two letters.
This is the reverse operation to :meth:`encodeGenotype`.
'''
return DECODE_GENOTYPE[ code ]
return DECODE_GENOTYPE[code]


def translateIndelGenotypeFromVCF( vcf_genotypes, ref ):
def translateIndelGenotypeFromVCF(vcf_genotypes, ref):
'''translate indel from vcf to pileup format.'''

# indels
def getPrefix( s1, s2 ):
def getPrefix(s1, s2):
'''get common prefix of strings s1 and s2.'''
n = min( len( s1), len( s2 ) )
for x in range( n ):
if s1[x] != s2[x]: return s1[:x]
n = min(len(s1), len(s2))
for x in range(n):
if s1[x] != s2[x]:
return s1[:x]
return s1[:n]

def getSuffix( s1, s2 ):
def getSuffix(s1, s2):
'''get common sufix of strings s1 and s2.'''
n = min( len( s1), len( s2 ) )
if s1[-1] != s2[-1]: return ""
for x in range( -2, -n - 1, -1 ):
if s1[x] != s2[x]: return s1[x+1:]
n = min(len(s1), len(s2))
if s1[-1] != s2[-1]:
return ""
for x in range(-2, -n - 1, -1):
if s1[x] != s2[x]:
return s1[x + 1:]
return s1[-n:]

def getGenotype( variant, ref ):
def getGenotype(variant, ref):

if variant == ref:
return "*", 0

if variant == ref: return "*", 0

if len(ref) > len(variant):
# is a deletion
if ref.startswith(variant):
return "-%s" % ref[len(variant):], len(variant) - 1
elif ref.endswith( variant ):
elif ref.endswith(variant):
return "-%s" % ref[:-len(variant)], -1
else:
prefix = getPrefix( ref, variant )
suffix = getSuffix( ref, variant )
shared = len(prefix) + len(suffix) - len(variant)
prefix = getPrefix(ref, variant)
suffix = getSuffix(ref, variant)
shared = len(prefix) + len(suffix) - len(variant)
# print "-", prefix, suffix, ref, variant, shared, len(prefix), len(suffix), len(ref)
if shared < 0:
raise ValueError()
return "-%s" % ref[len(prefix):-(len(suffix)-shared)], len(prefix) - 1
return "-%s" % ref[len(prefix):-(len(suffix) - shared)], len(prefix) - 1

elif len(ref) < len(variant):
# is an insertion
Expand All @@ -142,47 +155,49 @@ def getGenotype( variant, ref ):
elif variant.endswith(ref):
return "+%s" % variant[:len(ref)], 0
else:
prefix = getPrefix( ref, variant )
suffix = getSuffix( ref, variant )
shared = len(prefix) + len(suffix) - len(ref)
prefix = getPrefix(ref, variant)
suffix = getSuffix(ref, variant)
shared = len(prefix) + len(suffix) - len(ref)
if shared < 0:
raise ValueError()

return "+%s" % variant[len(prefix):-(len(suffix)-shared)], len(prefix)
return "+%s" % variant[len(prefix):-(len(suffix) - shared)], len(prefix)
else:
assert 0, "snp?"

# in pileup, the position refers to the base
# after the coordinate, hence subtract 1
#pos -= 1
# pos -= 1

genotypes, offsets = [], []
is_error = True

for variant in vcf_genotypes:
try:
g, offset = getGenotype( variant, ref )
g, offset = getGenotype(variant, ref)
except ValueError:
break

genotypes.append( g )
if g != "*": offsets.append( offset )

else:
genotypes.append(g)
if g != "*":
offsets.append(offset)

else:
is_error = False

if is_error:
if is_error:
raise ValueError()

assert len(set(offsets )) == 1, "multiple offsets for indel"
assert len(set(offsets)) == 1, "multiple offsets for indel"
offset = offsets[0]

genotypes = "/".join( genotypes )
genotypes = "/".join(genotypes)
return genotypes, offset

def vcf2pileup( vcf, sample ):

def vcf2pileup(vcf, sample):
'''convert vcf record to pileup record.'''

chromosome = vcf.contig
pos = vcf.pos
reference = vcf.ref
Expand All @@ -193,79 +208,75 @@ def vcf2pileup( vcf, sample ):
# get genotype
genotypes = data["GT"]
if len(genotypes) > 1:
raise ValueError( "only single genotype per position, %s" % (str(vcf)))
raise ValueError("only single genotype per position, %s" % (str(vcf)))

genotypes = genotypes[0]

# not a variant
if genotypes[0] == ".": return None
if genotypes[0] == ".":
return None

genotypes = [ allelles[int(x)] for x in genotypes if x != "/" ]
genotypes = [allelles[int(x)] for x in genotypes if x != "/"]

# snp_quality is "genotype quality"
snp_quality = consensus_quality = data.get( "GQ", [0])[0]
mapping_quality = vcf.info.get( "MQ", [0])[0]
coverage = data.get( "DP", 0)
snp_quality = consensus_quality = data.get("GQ", [0])[0]
mapping_quality = vcf.info.get("MQ", [0])[0]
coverage = data.get("DP", 0)

if len(reference) > 1 or max([len(x) for x in vcf.alt] ) > 1:
if len(reference) > 1 or max([len(x) for x in vcf.alt]) > 1:
# indel
genotype, offset = translateIndelGenotypeFromVCF( genotypes, reference )

return PileupIndel( chromosome,
pos + offset,
"*",
genotype,
consensus_quality,
snp_quality,
mapping_quality,
coverage,
genotype,
"<" * len(genotype),
0,
0,
0 )

else:

genotype = encodeGenotype( "".join(genotypes) )
genotype, offset = translateIndelGenotypeFromVCF(genotypes, reference)

return PileupIndel(chromosome,
pos + offset,
"*",
genotype,
consensus_quality,
snp_quality,
mapping_quality,
coverage,
genotype,
"<" * len(genotype),
0,
0,
0)


else:
genotype = encodeGenotype("".join(genotypes))
read_bases = ""
base_qualities = ""

return PileupSubstitution( chromosome, pos, reference,
genotype,
consensus_quality,
snp_quality,
mapping_quality,
coverage, read_bases, base_qualities )
return PileupSubstitution(chromosome, pos, reference,
genotype, consensus_quality,
snp_quality, mapping_quality,
coverage, read_bases,
base_qualities)


def iterate_from_vcf( infile, sample ):
def iterate_from_vcf(infile, sample):
'''iterate over a vcf-formatted file.
*infile* can be any iterator over a lines.
The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution`
or :class:`pysam.Pileup.PileupIndel`.
The function yields named tuples of the type
:class:`pysam.Pileup.PileupSubstitution` or
:class:`pysam.Pileup.PileupIndel`.
Positions without a snp will be skipped.
Positions without a snp will be skipped.
This method is wasteful and written to support same
legacy code that expects samtools pileup output.
This method is wasteful and written to support same legacy code
that expects samtools pileup output.
Better use the vcf parser directly.
'''


vcf = pysam.VCF()
vcf.connect( infile )
vcf.connect(infile)

if sample not in vcf.getsamples():
raise KeyErorr( "sample %s not vcf file" )
raise KeyError("sample %s not vcf file")

for row in vcf.fetch():
result = vcf2pileup( row, sample )
if result: yield result

result = vcf2pileup(row, sample)
if result:
yield result

0 comments on commit cecff8d

Please sign in to comment.