In [54]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.context import SparkConf
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import pyspark.sql.types as T 
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, size
from operator import add
from functools import reduce
from bio_spark.io.fasta_reader import FASTAReader, FASTAQReader
import collections
import numpy as np
import sys

from pathlib import Path

from operator import add

# Sobre este Notebook

Este notebook executa uma clusterização de seuência de Aminoácidos usando a ML lib dp Spark. Clustrização é um método que pode auxiliar os pesquisadores a descobrir relações filogenéticas e/ou relações de similaridade entre sequências sem a necessidade de comparar com uma base de referência. O fluxo é composto dos seguintes passos:

1. Leutra e parsing do arquivos fasta de entrada
2. Cálculo dos Kmers a partir das sequências encontradas nos arquivos de entrada
3. Uso do método de Elbow para encontrar clusters coesos.

___

## Cluster local

Para fins de desenvolvimento, utilizamos imagens Docker para criar um cluster spark local. Esse cluster deve estar rodadndo para que o notebook funcione como esperado. Na raiz do projeto:

```shell
docker-compose up
```

In [3]:
sConf = SparkConf("spark://localhost:7077")
sc = SparkContext(conf=sConf)
spark = SparkSession(sc)

## Data Input

Tdoso os arquivos de entrada serão tratados em único Dataframe

```shell
INPUT_DIR_PATH: caminho para o diretório com os arquivs .fna (FASTA)
```

In [25]:
INPUT_DIR_PATH = Path("/home/thiago/Dados/sparkAAI-1/data/genomes/")
files_to_process = [str(f) for f in INPUT_DIR_PATH.iterdir()]
print("Files to process :", len(files_to_process))

Files to process : 10


In [30]:
fasta_plain_df = sc.textFile(','.join(files_to_process))\
            .map(lambda x: Row(row=x))\
            .zipWithIndex()\
            .toDF(["row","idx"])

print("raw file lines to process", fasta_plain_df.count())

raw file lines to process 86243


inspecionando o dataframe lido

In [31]:
fasta_plain_df.show()

+--------------------+---+
|                 row|idx|
+--------------------+---+
|[>ALPH01000001.1 ...|  0|
|[TCTCCCAGCACTTAGG...|  1|
|[CAACCTCTTTAGAGTT...|  2|
|[ATATTAGAAAGTACTT...|  3|
|[AATTCCCGCACTTCTT...|  4|
|[CAGGACTTGTATCAAG...|  5|
|[CCTGCAGTAACACATG...|  6|
|[TCTTATTTCTCTCCAA...|  7|
|[ATTCTACTTCTTGAAT...|  8|
|[CAACCTCCTGTTTTTA...|  9|
|[CCACATTAAATCTATA...| 10|
|[AATCTTGATTCAATTT...| 11|
|[CCACCAAATCTCCTAT...| 12|
|[ATCCGTTATATAAATT...| 13|
|[GCAAGTCAGGATCTTG...| 14|
|[CCTGAGATTGACTTCC...| 15|
|[TGTAAATTGATCATTA...| 16|
|[CGCCAATAAATTTGAT...| 17|
|[AGAAATTTCACCTCTT...| 18|
|[TTTAGAAACTTTAATT...| 19|
+--------------------+---+
only showing top 20 rows



### Parse dos arquivos FASTA

os arquivos [FASTA]([FASTA](https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Web&PAGE_TYPE=BlastDocs&DOC_TYPE=BlastHelp)), tem o seguinte formato:

```
>ID.CONTIG
ATTC....
GCG...
CCG...
>ID2.CONTIG
GGC...
...
```

nesta primeira sessão fazermos um parse desses arquivos para agrupar as sequẽncias por ID, calcular os kmers para esses contigs e obter um map com as freqências dos kmers em todos os contigs de uma sequẽncia.

In [32]:
def parse_fasta_id_line(l):
    """
    Desejamos extrair os IDs das sequências da linhas que começarem pelo caracter ''>'. Pelo padrão
    FASTA, o ID é a primeira palavra e é um campo composto por ID.CONTIG
    
    Input>
        l: Uma linha de um arquivo FASTA
    Return:
        ID: da sequência ignorando o número de contigs, ou None caso não seja uma linha de ID
    """
    if l[0][0] == ">":
        heaer_splits = l[0][1:].split(" ")[0]
        seq_id_split = heaer_splits.split(".")
        return seq_id_split[0]
    else:
        return None
seq2kmer_udf = udf(parse_fasta_id_line, T.StringType())

In [34]:
fasta_null_ids_df = fasta_plain_df.withColumn("seqID_wNull", seq2kmer_udf("row"))

inspecionar o resultado

In [35]:
fasta_null_ids_df.show()

+--------------------+---+------------+
|                 row|idx| seqID_wNull|
+--------------------+---+------------+
|[>ALPH01000001.1 ...|  0|ALPH01000001|
|[TCTCCCAGCACTTAGG...|  1|        null|
|[CAACCTCTTTAGAGTT...|  2|        null|
|[ATATTAGAAAGTACTT...|  3|        null|
|[AATTCCCGCACTTCTT...|  4|        null|
|[CAGGACTTGTATCAAG...|  5|        null|
|[CCTGCAGTAACACATG...|  6|        null|
|[TCTTATTTCTCTCCAA...|  7|        null|
|[ATTCTACTTCTTGAAT...|  8|        null|
|[CAACCTCCTGTTTTTA...|  9|        null|
|[CCACATTAAATCTATA...| 10|        null|
|[AATCTTGATTCAATTT...| 11|        null|
|[CCACCAAATCTCCTAT...| 12|        null|
|[ATCCGTTATATAAATT...| 13|        null|
|[GCAAGTCAGGATCTTG...| 14|        null|
|[CCTGAGATTGACTTCC...| 15|        null|
|[TGTAAATTGATCATTA...| 16|        null|
|[CGCCAATAAATTTGAT...| 17|        null|
|[AGAAATTTCACCTCTT...| 18|        null|
|[TTTAGAAACTTTAATT...| 19|        null|
+--------------------+---+------------+
only showing top 20 rows



In [242]:
num_ids = fasta_null_ids_df.where(F.col("seqID_wNull").isNotNull()).count()
print("número de seuências para serem processadas", num_ids)

número de seuências para serem processadas 1864


desejamos fazer um "fillna" com o último valor não nulo encontrado na coluna de sequência, para isso usaremos um operador de janela deslizante em cima do índice que serve para manter a ordem original das linhas

In [39]:
fasta_n_filter_df = fasta_null_ids_df.withColumn(
    "seqID", F.last('seqID_wNull', ignorenulls=True)\
    .over(Window\
    .orderBy('idx')\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)))

A seguir devemos excluir as linhas de header e renomear as colunas excluíndo as que não foram utilizadas

In [40]:
fasta_df = fasta_n_filter_df\
                .where(F.col("seqID_wNull").isNull())\
                .select("seqID","row")\
                .toDF("seqID","seq")

O Dataframe tratado tem o seguinte esquema

In [41]:
fasta_df.printSchema()

root
 |-- seqID: string (nullable = true)
 |-- seq: struct (nullable = true)
 |    |-- row: string (nullable = true)



inspeção do daframe

In [42]:
fasta_df.show()

+------------+--------------------+
|       seqID|                 seq|
+------------+--------------------+
|ALPH01000001|[TCTCCCAGCACTTAGG...|
|ALPH01000001|[CAACCTCTTTAGAGTT...|
|ALPH01000001|[ATATTAGAAAGTACTT...|
|ALPH01000001|[AATTCCCGCACTTCTT...|
|ALPH01000001|[CAGGACTTGTATCAAG...|
|ALPH01000001|[CCTGCAGTAACACATG...|
|ALPH01000001|[TCTTATTTCTCTCCAA...|
|ALPH01000001|[ATTCTACTTCTTGAAT...|
|ALPH01000001|[CAACCTCCTGTTTTTA...|
|ALPH01000001|[CCACATTAAATCTATA...|
|ALPH01000001|[AATCTTGATTCAATTT...|
|ALPH01000001|[CCACCAAATCTCCTAT...|
|ALPH01000001|[ATCCGTTATATAAATT...|
|ALPH01000001|[GCAAGTCAGGATCTTG...|
|ALPH01000001|[CCTGAGATTGACTTCC...|
|ALPH01000001|[TGTAAATTGATCATTA...|
|ALPH01000001|[CGCCAATAAATTTGAT...|
|ALPH01000001|[AGAAATTTCACCTCTT...|
|ALPH01000001|[TTTAGAAACTTTAATT...|
|ALPH01000001|[CCCATCTTCCATTACC...|
+------------+--------------------+
only showing top 20 rows



### Calculate Kmers

Nesta sessão faremos o cálculo dos [kmers](https://en.wikipedia.org/wiki/K-mer) de tambo ```K```. O objetivo é associar cada ID de sequência ao conjunto de kmers distiontos presentes em todos os seus motifs

In [129]:
K = 3

In [130]:
Seq2kmerTy = T.ArrayType(T.StringType())
def seq2kmer(seq_):
    global K
    value = seq_[0].strip()
    num_kmers = len(value) - K + 1
    kmers_list = [value[n*K:K*(n+1)] for n in range(0, num_kmers)]
    
    # return len(value)
    return kmers_list

seq2kmer_udf = udf(seq2kmer,Seq2kmerTy)

In [131]:
fasta_kmers_df = fasta_df\
        .withColumn("kmers", seq2kmer_udf("seq"))\

inspeção do daframe

In [132]:
fasta_kmers_df.printSchema()

root
 |-- seqID: string (nullable = true)
 |-- seq: struct (nullable = true)
 |    |-- row: string (nullable = true)
 |-- kmers: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [133]:
fasta_kmers_df.show()

+------------+--------------------+--------------------+
|       seqID|                 seq|               kmers|
+------------+--------------------+--------------------+
|ALPH01000001|[TCTCCCAGCACTTAGG...|[TCT, CCC, AGC, A...|
|ALPH01000001|[CAACCTCTTTAGAGTT...|[CAA, CCT, CTT, T...|
|ALPH01000001|[ATATTAGAAAGTACTT...|[ATA, TTA, GAA, A...|
|ALPH01000001|[AATTCCCGCACTTCTT...|[AAT, TCC, CGC, A...|
|ALPH01000001|[CAGGACTTGTATCAAG...|[CAG, GAC, TTG, T...|
|ALPH01000001|[CCTGCAGTAACACATG...|[CCT, GCA, GTA, A...|
|ALPH01000001|[TCTTATTTCTCTCCAA...|[TCT, TAT, TTC, T...|
|ALPH01000001|[ATTCTACTTCTTGAAT...|[ATT, CTA, CTT, C...|
|ALPH01000001|[CAACCTCCTGTTTTTA...|[CAA, CCT, CCT, G...|
|ALPH01000001|[CCACATTAAATCTATA...|[CCA, CAT, TAA, A...|
|ALPH01000001|[AATCTTGATTCAATTT...|[AAT, CTT, GAT, T...|
|ALPH01000001|[CCACCAAATCTCCTAT...|[CCA, CCA, AAT, C...|
|ALPH01000001|[ATCCGTTATATAAATT...|[ATC, CGT, TAT, A...|
|ALPH01000001|[GCAAGTCAGGATCTTG...|[GCA, AGT, CAG, G...|
|ALPH01000001|[CCTGAGATTGACTTCC

Para validação, podemos obter estatísticas básicas dso kmers obtidos. Para isso vamos contar o número de kmers por ID de sequência e obter um describe da coluna

In [134]:
n_kmers_df = fasta_kmers_df\
                    .withColumn("n_kmers", size(col("kmers")))\
                    .select("n_kmers")\

In [135]:
n_kmers_df.describe().show()

+-------+-----------------+
|summary|          n_kmers|
+-------+-----------------+
|  count|            84379|
|   mean|77.11937804430012|
| stddev|6.794605271811715|
|    min|                0|
|    max|               78|
+-------+-----------------+



## Análise das Sequências 

A seguir analisaremos as sequẽncias a partir dos kmers obtidos. O profile de uma seuquência é um mapeamento ```kmer->num ocorrencias``` que pode ser utilizado em análises de similaridade entre sequências.

In [136]:
KmerFreqTuple = T.MapType(T.StringType(), T.IntegerType())

def kmers_list2kmers_freq_dict(kmers_list):
    """
    Cálcula as frequências absolutas de cda kmer no dataframe
    Retorna:
        Um onjeto map("kmer" -> número de ocorrências ) para cada sequência
    """
    unique, counts = np.unique(kmers_list[0], return_counts=True)
    kmers_map = {str(k):int(v) for k, v in zip(unique, counts) if k}
    return kmers_map

kmers_list2kmers_freq_dict_udf = udf(kmers_list2kmers_freq_dict)

> esse dataframe foi criado apenas para inspeção, como utilizaremos o VectorCounter para criar features, o map em si tornou-se desnecessário

In [150]:
%%time
kmers_pofile_df = fasta_kmers_df\
            .groupby("seqID")\
            .agg(F.collect_list('kmers').alias('kmers_list'))\
            .withColumn('kmers_freq', kmers_list2kmers_freq_dict_udf('kmers_list'))

CPU times: user 4.86 ms, sys: 5.86 ms, total: 10.7 ms
Wall time: 34.6 ms


In [139]:
kmers_pofile_df.select("seqID", "kmers_freq").show()

+------------+--------------------+
|       seqID|          kmers_freq|
+------------+--------------------+
|ALPC01000098|{TGT=1, AA=1, AAA...|
|ALPG01000168|{TTA=1, TT=1, TGT...|
|ALPH01000049|{TTA=1, TT=1, ATT...|
|ALPH01000154|{ATT=3, AAA=1, TT...|
|ALPI01000077|{TTA=1, CCA=1, AA...|
|ALPJ01000168|{TGT=1, GGA=1, AG...|
|ALPK01000207|{TT=1, ATT=2, CGG...|
|ALPD01000057|{TTA=2, GGA=1, AT...|
|ALPH01000006|{GGA=1, CCA=2, CC...|
|ALPH01000151|{TTA=1, ATT=1, TC...|
|ALPH01000275|{TGT=1, ATT=1, AC...|
|ALPI01000038|{TTA=1, TGT=1, GG...|
|ALPJ01000066|{TTA=1, AGG=2, AA...|
|ALPJ01000133|{TTA=2, TT=1, ATT...|
|ALPJ01000183|{CCA=1, ATT=1, AC...|
|ALPK01000019|{TTA=1, TT=1, GGA...|
|ALPK01000076|{TTA=2, ATT=2, AA...|
|ALPK01000121|{TTA=1, TGT=1, GG...|
|ALPC01000044|{CCA=2, ATT=1, AA...|
|ALPC01000071|{TTA=2, AGG=1, CC...|
+------------+--------------------+
only showing top 20 rows



### Extração de features

O número de K que defie o tamanho dos k-mers define um espaço de features de dimensão $4^K$, para codificar essas features podemos usar a classe ```CountVectorizer```. Essa codificação atribui ordinais a cada kmer único e cria duas listas para representar a presença e o frequência absoluta dos mesmos

In [140]:
from pyspark.ml.feature import CountVectorizer

In [141]:
kmers_df = fasta_kmers_df.select("seqID", "kmers")

In [227]:
%%time
kmers_pofile_df = fasta_kmers_df.rdd\
            .map(lambda r: (r.seqID, r.kmers))\
            .reduceByKey(lambda x,y: x+y)\
            .toDF(["seqID", "kmers_list"])

CPU times: user 31.1 ms, sys: 5.86 ms, total: 37 ms
Wall time: 6.74 s


In [226]:
kmers_pofile_df.printSchema()

root
 |-- seqID: string (nullable = true)
 |-- kmers_list: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [228]:
kmers_pofile_df.show()

+------------+--------------------+
|       seqID|          kmers_list|
+------------+--------------------+
|ALPH01000001|[TCT, CCC, AGC, A...|
|ALPH01000002|[CCT, TGC, TTA, T...|
|ALPH01000003|[ATT, CTT, CTT, C...|
|ALPH01000004|[AAT, ATC, ATT, T...|
|ALPH01000005|[AAC, TTT, TAA, T...|
|ALPH01000006|[CCA, CTA, CTA, A...|
|ALPH01000007|[CTT, GGC, TTG, T...|
|ALPH01000008|[CTG, AGT, CCT, A...|
|ALPH01000009|[CGA, TGT, AAT, G...|
|ALPH01000010|[TCT, CAC, TAG, A...|
|ALPH01000011|[GTT, TTT, ATC, A...|
|ALPH01000012|[AGG, GTG, TCG, G...|
|ALPH01000013|[TTT, TCA, TCT, A...|
|ALPH01000014|[AAT, GTT, GTG, A...|
|ALPH01000015|[ACT, GCA, GCA, T...|
|ALPH01000016|[GCA, ATA, CCT, C...|
|ALPH01000017|[GAC, TCT, GAA, A...|
|ALPH01000018|[AGA, CTC, ATT, G...|
|ALPH01000019|[CTT, CTA, TAT, C...|
|ALPH01000020|[AGG, ATT, TTT, T...|
+------------+--------------------+
only showing top 20 rows



In [229]:
%%time
cv = CountVectorizer(inputCol="kmers_list", outputCol="features")

model = cv.fit(kmers_pofile_df)

features_df = model.transform(kmers_pofile_df)

CPU times: user 7.12 ms, sys: 11.9 ms, total: 19 ms
Wall time: 2.4 s


In [231]:
features_df.show()

+------------+--------------------+--------------------+
|       seqID|          kmers_list|            features|
+------------+--------------------+--------------------+
|ALPH01000001|[TCT, CCC, AGC, A...|(93,[0,1,2,3,4,5,...|
|ALPH01000002|[CCT, TGC, TTA, T...|(93,[0,1,2,3,4,5,...|
|ALPH01000003|[ATT, CTT, CTT, C...|(93,[0,1,2,3,4,5,...|
|ALPH01000004|[AAT, ATC, ATT, T...|(93,[0,1,2,3,4,5,...|
|ALPH01000005|[AAC, TTT, TAA, T...|(93,[0,1,2,3,4,5,...|
|ALPH01000006|[CCA, CTA, CTA, A...|(93,[0,1,2,3,4,5,...|
|ALPH01000007|[CTT, GGC, TTG, T...|(93,[0,1,2,3,4,5,...|
|ALPH01000008|[CTG, AGT, CCT, A...|(93,[0,1,2,3,4,5,...|
|ALPH01000009|[CGA, TGT, AAT, G...|(93,[0,1,2,3,4,5,...|
|ALPH01000010|[TCT, CAC, TAG, A...|(93,[0,1,2,3,4,5,...|
|ALPH01000011|[GTT, TTT, ATC, A...|(93,[0,1,2,3,4,5,...|
|ALPH01000012|[AGG, GTG, TCG, G...|(93,[0,1,2,3,4,5,...|
|ALPH01000013|[TTT, TCA, TCT, A...|(93,[0,1,2,3,4,5,...|
|ALPH01000014|[AAT, GTT, GTG, A...|(93,[0,1,2,3,4,5,...|
|ALPH01000015|[ACT, GCA, GCA, T

In [243]:
%%time
unique_features_count = features_df.select("features").distinct().count()
print("Número de features únicas ",unique_features_count )

Número de features únicas  1864
CPU times: user 21.5 ms, sys: 3.89 ms, total: 25.4 ms
Wall time: 2.63 s


In [244]:
print("%d das %d sequências tem features únicas" % (unique_features_count, num_ids))

1864 das 1864 sequências tem features únicas


## Clustering

Para o ajuste dos hiperparâmetros da clusterização devemos fazer um parameter sweep para achar o número ideal de clusters. A avaliação da qualidade do cluster é dada pela [Métreica de Silhouette](https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/ml/evaluation/ClusteringEvaluator.html)

In [245]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [246]:
bkm = BisectingKMeans()
# model = bkm.fit(features_df)
clustering_pipeline = Pipeline(stages=[bkm])

In [261]:
%%time
paramGrid = ParamGridBuilder() \
    .addGrid(bkm.k, [2, 5, 10, 20, 50, 70, 100]) \
    .build()

crossval = CrossValidator(estimator=clustering_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=ClusteringEvaluator(),
                          numFolds=5)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel= crossval.fit(features_df)

CPU times: user 1.54 s, sys: 665 ms, total: 2.21 s
Wall time: 2min 32s


In [262]:
cluster_df = cvModel.transform(features_df)

In [263]:
cluster_df.show()

+------------+--------------------+--------------------+----------+
|       seqID|          kmers_list|            features|prediction|
+------------+--------------------+--------------------+----------+
|ALPH01000001|[TCT, CCC, AGC, A...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000002|[CCT, TGC, TTA, T...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000003|[ATT, CTT, CTT, C...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000004|[AAT, ATC, ATT, T...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000005|[AAC, TTT, TAA, T...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000006|[CCA, CTA, CTA, A...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000007|[CTT, GGC, TTG, T...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000008|[CTG, AGT, CCT, A...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000009|[CGA, TGT, AAT, G...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000010|[TCT, CAC, TAG, A...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000011|[GTT, TTT, ATC, A...|(93,[0,1,2,3,4,5,...|         0|
|ALPH01000012|[AGG, GTG, TCG, G...|(93,[0,1,2,3,

In [264]:
cluster_df.select("prediction").describe().show()

+-------+--------------------+
|summary|          prediction|
+-------+--------------------+
|  count|                1864|
|   mean|0.032188841201716736|
| stddev|  0.1765486944363821|
|    min|                   0|
|    max|                   1|
+-------+--------------------+

