<a href="https://colab.research.google.com/github/mov-q/kmer-spark/blob/main/sparktest0_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# K-mer counting con pySpark #

### Introduzione al problema ###
Negli ultimi anni gli avanzamenti nell'ambito della bioinformatica sono spesso stati identificabili e ascrivibili ai grandi avanzamenti delle tecnologie di sequenziamento: il sequenziamento è l'operazione attraverso la quale, a partire da un campione biologico, è possibile andare ad estrarre la sequenza di DNA (genoma) o RNA (trascrittoma). 

Con l'affermarsi delle tecnologie NGS (Next Generation Sequencing), l'approccio tecnologico che si è affermato è quello di sequenze che vengono lette in forma di short reads: centinaia di migliaia (nel caso di piccoli genomi batterici) o decine di milioni di stringhe di dimensione più o meno variabile. Una delle maggiori sfide dal punto di vista computazionale è quella di generare un "assembly" del genoma o del trascrittoma: andare ad unire queste stringhe per ricostruire la sequenza reale.

Tra le tecniche degli algoritmi di assembly, quella prevalente è basata sulla generazione, a partire dalle short reads, di k-mer: sottostringhe di k basi nucleotidiche calcolate da ogni read utilizzando una sliding window di dimensione k e a partire dal primo carattere. 

La pipeline completa di processamento prevede delle fasi di trimming degli adapter (sequenze utilizzate dai kit di preparamento campioni sperimentali), analisi di qualità delle short-reads (e ad esempio delezione delle sequenze troppo corte o di qualità troppo bassa), generazione dei k-mer dalle reads e conteggio delle occorrenze di ogni k-mer. 

Tutti questi passaggi sono il preambolo alla creazione di una struttura grafo (grafo di de Brujin) che viene poi utilizzata come base per l'assembly definitivo del genoma (o di frammenti di esso, come singoli contigs e scaffolds). 

### Il progetto ###
Questo progetto si propone di implementare attraverso l'uso di Spark, più precisamente dei wrapper python pySpark, un k-mer counter che a partire da un file in formato FASTQ vada ad effettuare estrazione delle sequenze, una pulizia piuttosto grezza (data solo ai fini di esempio di trattamento del formato), la generazione dei k-mer e il conto delle loro occorrenze. 
Al contempo, al termine di questa fase di conto occorrenze, avremo anche per ogni sequenza il calcolo dei prefissi e dei suffissi del k-mer generato, elementi fondamentali per poter costruire il grafo di De Brujin. 

In [1]:
import os
import subprocess
import string
import random

# settiamo il k-mer size
KSIZE=55
# definiamo se stiamo usando un dataset "brief" o se scaricheremo intere run da 
# NCBI SRA 
BRIEF=True
# indichiamo quante run vogliamo processare
# 0 significa che il file di input verrà processato interamente
BRIEF_MULTIPLIER=0
BRIEF_NOSEQ=4*BRIEF_MULTIPLIER
# id delle run da scaricare da NCBI SRA (in caso che BRIEF sia settato a False)
sraRunIDlist = ["SRR16693264", "SRR16693265"]
# binari dello sratoolkit utilizzati per estrarre i file fastq dal formato binario sra
PREFETCH_BASEPATH="/content"
NCBI_PREFETCH_BIN="/content/sratoolkit.2.11.2-ubuntu64/bin/prefetch-orig.2.11.2"
NCBI_FASTQDUMP_BIN="/content/sratoolkit.2.11.2-ubuntu64/bin/fastq-dump-orig.2.11.2"

# calcoliamo una stringa casuale da aggiungere come suffisso al percorso di output
def generateRandomOutputSuffix(suff_length):
  temp = random.sample(string.ascii_letters,suff_length)
  return "".join(temp)

OUTPUT_SUFFIX = generateRandomOutputSuffix(4)
OUTPUT_PATH = "/content/output_"+OUTPUT_SUFFIX
print("Output path: "+OUTPUT_PATH)

Output path: /content/output_jhFe


In [2]:
exMemory = '10g'
PYSPARK_SUBMIT_ARGS = ' --driver-memory ' + exMemory + ' pyspark-shell --driver-maxResultSize 10g'
os.environ["PYSPARK_SUBMIT_ARGS"] = PYSPARK_SUBMIT_ARGS

In [3]:
!apt-get install openjdk-8-jdk-headless -qq 

In [4]:
if (os.path.exists('spark-3.2.0-bin-hadoop3.2.tgz') == False):
  !wget  https://downloads.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
  !tar xf spark-3.2.0-bin-hadoop3.2.tgz

In [5]:
if (os.path.exists('sratoolkit.2.11.2-ubuntu64.tar.gz') == False):
  !wget  https://ftp-trace.ncbi.nlm.nih.gov/sra/sdk/2.11.2/sratoolkit.2.11.2-ubuntu64.tar.gz
  !tar xf sratoolkit.2.11.2-ubuntu64.tar.gz

In [6]:
!pip install -q findspark

In [7]:
if (os.path.exists("/content/inputbrief") == False):
  !mkdir /content/inputbrief
  !wget -O brief.fastq.gz https://sra-download.ncbi.nlm.nih.gov/traces/sra57/SRZ/016777/SRR16777190/MD_2_D8.r1.fastq.gz 
  !gunzip brief.fastq.gz
if BRIEF_NOSEQ == 0:
  !cp brief.fastq /content/inputbrief/brief.fastq
else:
  !head -n $BRIEF_NOSEQ  brief.fastq > /content/inputbrief/brief.fastq


In [8]:
# setup variabili d'ambiente
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"
from pathlib import Path

import findspark
findspark.init()
# verifica delle variabili d'ambiente
findspark.find()

'/content/spark-3.2.0-bin-hadoop3.2'

In [9]:


def downloadFastq(runList):
  for singleId in runList:
    if os.path.exists(PREFETCH_BASEPATH+'/'+singleId):
      print("Prefetch delle run già completato")
    else:
      ncbiPrefetchProc = NCBI_PREFETCH_BIN + ' ' + singleId
      print("Eseguo prefetch: " + ncbiPrefetchProc)
      subprocess.check_call(ncbiPrefetchProc, shell=True)
      #ncbiFastqDump = "/content/sratoolkit.2.11.2-ubuntu64/bin/fastq-dump-orig.2.11.2 --outdir /content/input/fastq/ --gzip --skip-technical  --readids --read-filter pass --dumpbase --split-3 --clip /content/" + singleId + "/"+singleId+".sra"
      ncbiFastqDumpProc = NCBI_FASTQDUMP_BIN+" --outdir /content/input/fastq/ --skip-technical  --readids --read-filter pass --dumpbase --split-3 --clip /content/" + singleId + "/"+singleId+".sra"
      subprocess.check_call(ncbiFastqDumpProc, shell=True)

if not BRIEF:
  downloadFastq(sraRunIDlist)

In [10]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit
import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType


spark = SparkSession.builder\
        .master('local')\
        .appName('km-ark')\
        .config('spark.ui.port', '4050')\
        .config('spark.driver.maxResultSize', '10G')\
        .getOrCreate()

spark

In [11]:
if not BRIEF:
  datasetPath = Path("/content/input/fastq/")
  datasetFiles = [str(sf) for sf in datasetPath.iterdir()]
else:
  datasetPath = Path("/content/inputbrief/")
  datasetFiles = [str(sf) for sf in datasetPath.iterdir()]

print("File in input: ", len(datasetFiles))

sc = SparkContext.getOrCreate()

# read data from text file and split each line into words
#words = sc.textFile("/content/input.txt").flatMap(lambda line: line.split(" "))

def preProcess(s):
  return s

def stripFastQ(s):
  if (s[0] != '@') and (s[0] != '+'):
    return True
  else:
    return False
#rowDataFrame = sc.textFile(','.join(datasetFiles)).\
#                  map(lambda x: Row(row=stripId(x))).\
#                  zipWithIndex().\
#                  toDF(["fastqRow","index"])

# per ritornare tuple:    map(lambda x: (preProcess(x),1)).\
rawData = None

rawData = sc.textFile(','.join(datasetFiles)).zipWithIndex()
#rawData.collect()


File in input:  1


In [12]:


#filtered = rowData.filter(lambda line: stripFastQ(line)).collect()
rawDataFrame = rawData.map(lambda line: preProcess(line)).toDF(["fastqRow","index"])
rawDataFrame.show(truncate=False)
# aggiungiamo una chiave per il raggruppamento in 4 righe
gSeq = rawDataFrame.withColumn("group", col("index")/4)
gSeq = gSeq.withColumn("group", col("group").cast(IntegerType()))
# mostriamo il dataframe risultante
gSeq.show(truncate=False)
gSeqRDD = gSeq.rdd
#gSeqRDD.collect()

+-------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|fastqRow                                                                                                                                               |index|
+-------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|@A00152:72:HFKKWDSXX:1:1101:24632:1000 1:N:0:0                                                                                                         |0    |
|CTTATAAGCTCAATGGTGTTTTCCACCGTGATCCGCCGCTGACGCAGCGTGCTCTCAAATATCTGCCGGAAGACACATTGCGGTTCGTTGATAATAAAGCTACAGGCGTTATGTCTTCCCGGCTCAGTAAAATCGACATCTGCAATTTGCG|1    |
|+                                                                                                                                                      |2    |
|,FFFFFFF:FFFFF,FFFFFFFFFFFFFFF:FFFFFFFF

In [13]:
gSeqRDDgroup = gSeqRDD.map(lambda line: (line.group, (line.fastqRow, line.index)))
#gSeqRDDgroup.collect()

In [14]:
seqGroups = gSeqRDDgroup.groupByKey()

def setScoreDict():

  valString = "!\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~"
  valDict = {}
  idx = 0
  for symbol in valString:
    valDict[symbol] = len(valString)-idx
    idx = idx+1

  return valDict
  
scoreTable = setScoreDict()

def parseSequenceId(sh):
  if (sh[0] == '@'):
    return sh.split()[0][1:]
       

def seqQuality(score_table, st):
  acc = 0
  for c in st:
    acc = acc + score_table[c]
  return acc

# seqCleaner per il momento implementa un metodo piuttosto rozzo per valutare la
# qualità dell'
def seqProcessor(rowGroup, st):
  score = 0  
  groupNumber = rowGroup[0]
  rowList = list(rowGroup[1])
  # ora abbiamo una lista di 4 elementi, il gruppo di righe
  seqHeader = rowList[0][0]
  rawSequence = rowList[1][0]
  seqScore = rowList[3][0]
  score = (seqQuality(scoreTable, rawSequence) / len(rawSequence))

  seqId = parseSequenceId(seqHeader)

  if (score > 54):
    return (groupNumber,(rawSequence, seqId))

cleanedSeq = seqGroups.map(lambda r: seqProcessor(r,scoreTable))
cleanedSeq = cleanedSeq.filter(lambda x: x != None)
#cleanedSeq.collect()

In [15]:
# Otteniamo un RDD delle sole sequenze filtrate al passaggio precedente
#seqRdd = cleanedSeq.map(lambda r: r[1][0])
seqRdd = cleanedSeq.map(lambda r: (r[1][0],r[1][1]))
#seqRdd.collect()

In [16]:
# Calcoliamo i k-mer di lunghezza KSIZE dalle sequenze
# Per ogni sequenza otteniamo una lista di k-mer che rendiamo in una sola string
def singleSeqKmer(l, ksize):
  sequence = l.strip()
  kmers = []

  for i in range(0,len(l)-ksize):
    kmers.append(sequence[i:i+ksize+1])

  kmerString = ' '.join(kmers)
  return kmerString

fullKmerSeq = seqRdd.map(lambda kmPair: (singleSeqKmer(kmPair[0],KSIZE),kmPair[1]))
#fullKmerSeq.collect()

In [17]:
# Dalla singola stringa andiamo ad effettuarle lo split finale
#kmerSplit = fullKmerSeq.map(lambda line: (line[0].split(" "),line[1]))
kmerSplit = fullKmerSeq.flatMap(lambda line: [(x, line[1]) for x in line[0].split(" ")])

#kmerSplit.collect()


In [None]:
def occReducer(lAccu, lCur):
  return (lAccu[0]+lCur[0],lAccu[1],lAccu[2],lAccu[3]+lCur[3])

kmerCounts = kmerSplit.map(lambda singleKmer: (singleKmer[0], (1,singleKmer[0][0:-1],singleKmer[0][1:],[singleKmer[1]]))).reduceByKey(lambda accu, cur: occReducer(accu, cur))
kmerCounts.saveAsTextFile(OUTPUT_PATH)

# stampiamo le prime 20 entries del RDD
kmerCounts.take(20)