In [1]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.context import SparkConf
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import pyspark.sql.types as T 
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, size
from operator import add
from functools import reduce
from bio_spark.io.fasta_reader import FASTAReader, FASTAQReader
import collections
import numpy as np
import sys

from pathlib import Path

from operator import add

# Sobre este Notebook

Este notebook executa uma clusterização de seuência de Aminoácidos usando a ML lib dp Spark. Clustrização é um método que pode auxiliar os pesquisadores a descobrir relações filogenéticas e/ou relações de similaridade entre sequências sem a necessidade de comparar com uma base de referência. O fluxo é composto dos seguintes passos:

1. Leutra e parsing do arquivos fasta de entrada
2. Cálculo dos Kmers a partir das sequências encontradas nos arquivos de entrada
3. Uso do método de Elbow para encontrar clusters coesos.

___

## Cluster local

Para fins de desenvolvimento, utilizamos imagens Docker para criar um cluster spark local. Esse cluster deve estar rodadndo para que o notebook funcione como esperado. Na raiz do projeto:

```shell
docker-compose up
```

In [2]:
sConf = SparkConf("spark://localhost:7077")
sc = SparkContext(conf=sConf)
spark = SparkSession(sc)

## Data Input

Tdoso os arquivos de entrada serão tratados em único Dataframe

```shell
INPUT_DIR_PATH: caminho para o diretório com os arquivs .fna (FASTA)
```

In [496]:
INPUT_DIR_PATH = Path("/home/thiago/Dados/sparkAAI-1/data/genomes/")
OUTPUT_DIR_PATH = Path("/home/thiago/Dados/sparkAAI-1/output/")
files_to_process = [str(f) for f in INPUT_DIR_PATH.iterdir()]
print("Files to process :", len(files_to_process))

Files to process : 10


In [286]:
fasta_plain_df = sc.textFile(','.join(files_to_process))\
            .map(lambda x: Row(row=x))\
            .zipWithIndex()\
            .toDF(["row","idx"])

print("raw file lines to process", fasta_plain_df.count())

raw file lines to process 86243


inspecionando o dataframe lido

In [287]:
fasta_plain_df.show()

+--------------------+---+
|                 row|idx|
+--------------------+---+
|[>ALPH01000001.1 ...|  0|
|[TCTCCCAGCACTTAGG...|  1|
|[CAACCTCTTTAGAGTT...|  2|
|[ATATTAGAAAGTACTT...|  3|
|[AATTCCCGCACTTCTT...|  4|
|[CAGGACTTGTATCAAG...|  5|
|[CCTGCAGTAACACATG...|  6|
|[TCTTATTTCTCTCCAA...|  7|
|[ATTCTACTTCTTGAAT...|  8|
|[CAACCTCCTGTTTTTA...|  9|
|[CCACATTAAATCTATA...| 10|
|[AATCTTGATTCAATTT...| 11|
|[CCACCAAATCTCCTAT...| 12|
|[ATCCGTTATATAAATT...| 13|
|[GCAAGTCAGGATCTTG...| 14|
|[CCTGAGATTGACTTCC...| 15|
|[TGTAAATTGATCATTA...| 16|
|[CGCCAATAAATTTGAT...| 17|
|[AGAAATTTCACCTCTT...| 18|
|[TTTAGAAACTTTAATT...| 19|
+--------------------+---+
only showing top 20 rows



### Parse dos arquivos FASTA

os arquivos [FASTA]([FASTA](https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Web&PAGE_TYPE=BlastDocs&DOC_TYPE=BlastHelp)), tem o seguinte formato:

```
>ID.CONTIG
ATTC....
GCG...
CCG...
>ID2.CONTIG
GGC...
...
```

nesta primeira sessão fazermos um parse desses arquivos para agrupar as sequẽncias por ID, calcular os kmers para esses contigs e obter um map com as freqências dos kmers em todos os contigs de uma sequẽncia.

In [288]:
def parse_fasta_id_line(l):
    """
    Desejamos extrair os IDs das sequências da linhas que começarem pelo caracter ''>'. Pelo padrão
    FASTA, o ID é a primeira palavra e é um campo composto por ID.CONTIG
    
    Input>
        l: Uma linha de um arquivo FASTA
    Return:
        ID: da sequência ignorando o número de contigs, ou None caso não seja uma linha de ID
    """
    if l[0][0] == ">":
        heaer_splits = l[0][1:].split(" ")[0]
        seq_id_split = heaer_splits.split(".")
        return seq_id_split[0]
    else:
        return None
seq2kmer_udf = udf(parse_fasta_id_line, T.StringType())

In [289]:
fasta_null_ids_df = fasta_plain_df.withColumn("seqID_wNull", seq2kmer_udf("row"))

inspecionar o resultado

In [290]:
fasta_null_ids_df.show()

+--------------------+---+------------+
|                 row|idx| seqID_wNull|
+--------------------+---+------------+
|[>ALPH01000001.1 ...|  0|ALPH01000001|
|[TCTCCCAGCACTTAGG...|  1|        null|
|[CAACCTCTTTAGAGTT...|  2|        null|
|[ATATTAGAAAGTACTT...|  3|        null|
|[AATTCCCGCACTTCTT...|  4|        null|
|[CAGGACTTGTATCAAG...|  5|        null|
|[CCTGCAGTAACACATG...|  6|        null|
|[TCTTATTTCTCTCCAA...|  7|        null|
|[ATTCTACTTCTTGAAT...|  8|        null|
|[CAACCTCCTGTTTTTA...|  9|        null|
|[CCACATTAAATCTATA...| 10|        null|
|[AATCTTGATTCAATTT...| 11|        null|
|[CCACCAAATCTCCTAT...| 12|        null|
|[ATCCGTTATATAAATT...| 13|        null|
|[GCAAGTCAGGATCTTG...| 14|        null|
|[CCTGAGATTGACTTCC...| 15|        null|
|[TGTAAATTGATCATTA...| 16|        null|
|[CGCCAATAAATTTGAT...| 17|        null|
|[AGAAATTTCACCTCTT...| 18|        null|
|[TTTAGAAACTTTAATT...| 19|        null|
+--------------------+---+------------+
only showing top 20 rows



In [291]:
num_ids = fasta_null_ids_df.where(F.col("seqID_wNull").isNotNull()).count()
print("número de seuências para serem processadas", num_ids)

número de seuências para serem processadas 1864


desejamos fazer um "fillna" com o último valor não nulo encontrado na coluna de sequência, para isso usaremos um operador de janela deslizante em cima do índice que serve para manter a ordem original das linhas

In [292]:
fasta_n_filter_df = fasta_null_ids_df.withColumn(
    "seqID", F.last('seqID_wNull', ignorenulls=True)\
    .over(Window\
    .orderBy('idx')\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)))

A seguir devemos excluir as linhas de header e renomear as colunas excluíndo as que não foram utilizadas

In [293]:
fasta_n_filter_df.show()

+--------------------+---+------------+------------+
|                 row|idx| seqID_wNull|       seqID|
+--------------------+---+------------+------------+
|[>ALPH01000001.1 ...|  0|ALPH01000001|ALPH01000001|
|[TCTCCCAGCACTTAGG...|  1|        null|ALPH01000001|
|[CAACCTCTTTAGAGTT...|  2|        null|ALPH01000001|
|[ATATTAGAAAGTACTT...|  3|        null|ALPH01000001|
|[AATTCCCGCACTTCTT...|  4|        null|ALPH01000001|
|[CAGGACTTGTATCAAG...|  5|        null|ALPH01000001|
|[CCTGCAGTAACACATG...|  6|        null|ALPH01000001|
|[TCTTATTTCTCTCCAA...|  7|        null|ALPH01000001|
|[ATTCTACTTCTTGAAT...|  8|        null|ALPH01000001|
|[CAACCTCCTGTTTTTA...|  9|        null|ALPH01000001|
|[CCACATTAAATCTATA...| 10|        null|ALPH01000001|
|[AATCTTGATTCAATTT...| 11|        null|ALPH01000001|
|[CCACCAAATCTCCTAT...| 12|        null|ALPH01000001|
|[ATCCGTTATATAAATT...| 13|        null|ALPH01000001|
|[GCAAGTCAGGATCTTG...| 14|        null|ALPH01000001|
|[CCTGAGATTGACTTCC...| 15|        null|ALPH010

In [294]:
fasta_df = fasta_n_filter_df\
                .where(F.col("seqID_wNull").isNull())\
                .select("seqID","row")\
                .toDF("seqID","seq")

O Dataframe tratado tem o seguinte esquema

In [295]:
fasta_df.printSchema()

root
 |-- seqID: string (nullable = true)
 |-- seq: struct (nullable = true)
 |    |-- row: string (nullable = true)



inspeção do daframe

In [296]:
fasta_per_seq_df = fasta_df.rdd\
            .map(lambda r: (r.seqID, r.seq[0]))\
            .reduceByKey(lambda x,y:x+y)\
            .map(lambda x: Row(seqID=x[1],seq=x[0]))\
            .toDF(["seqID", "seq"])

In [297]:
fasta_per_seq_df.printSchema()

root
 |-- seqID: string (nullable = true)
 |-- seq: string (nullable = true)



In [298]:
fasta_per_seq_df.show()

+------------+--------------------+
|       seqID|                 seq|
+------------+--------------------+
|ALPH01000001|TCTCCCAGCACTTAGGC...|
|ALPH01000002|CCTTGCTTATTTAGAAA...|
|ALPH01000003|ATTCTTCTTCATCATCC...|
|ALPH01000004|AATATCATTTCTTACTT...|
|ALPH01000005|AACTTTTAATTGGCAAA...|
|ALPH01000006|CCACTACTAACAATTTC...|
|ALPH01000007|CTTGGCTTGTTTTTATC...|
|ALPH01000008|CTGAGTCCTATTTAAAT...|
|ALPH01000009|CGATGTAATGGCTATGC...|
|ALPH01000010|TCTCACTAGAAGAAAAT...|
|ALPH01000011|GTTTTTATCAGTAGCTT...|
|ALPH01000012|AGGGTGTCGGTTAAAAG...|
|ALPH01000013|TTTTCATCTAATAAGTA...|
|ALPH01000014|AATGTTGTGAGCTTTAA...|
|ALPH01000015|ACTGCAGCATTATTTAT...|
|ALPH01000016|GCAATACCTCCAACAAT...|
|ALPH01000017|GACTCTGAAAGTAAATA...|
|ALPH01000018|AGACTCATTGGACATAT...|
|ALPH01000019|CTTCTATATCACTAGCG...|
|ALPH01000020|AGGATTTTTTATTTTTA...|
+------------+--------------------+
only showing top 20 rows



### Calculate Kmers

Nesta sessão faremos o cálculo dos [kmers](https://en.wikipedia.org/wiki/K-mer) de tambo ```K```. O objetivo é associar cada ID de sequência ao conjunto de kmers distiontos presentes em todos os seus motifs

In [299]:
K = 3

In [300]:
Seq2kmerTy = T.ArrayType(T.StringType())
def seq2kmer(seq_):
    global K
    value = seq_.strip()
    num_kmers = len(value) - K + 1
    kmers_list = [value[n:K+n] for n in range(0, num_kmers)]
    
    # return len(value)
    return kmers_list

seq2kmer_udf = udf(seq2kmer,Seq2kmerTy)

In [301]:
fasta_kmers_df = fasta_per_seq_df\
        .withColumn("kmers", seq2kmer_udf("seq"))\

In [302]:
fasta_kmers_df.printSchema()

root
 |-- seqID: string (nullable = true)
 |-- seq: string (nullable = true)
 |-- kmers: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [303]:
fasta_kmers_df.show()

+------------+--------------------+--------------------+
|       seqID|                 seq|               kmers|
+------------+--------------------+--------------------+
|ALPH01000001|TCTCCCAGCACTTAGGC...|[TCT, CTC, TCC, C...|
|ALPH01000002|CCTTGCTTATTTAGAAA...|[CCT, CTT, TTG, T...|
|ALPH01000003|ATTCTTCTTCATCATCC...|[ATT, TTC, TCT, C...|
|ALPH01000004|AATATCATTTCTTACTT...|[AAT, ATA, TAT, A...|
|ALPH01000005|AACTTTTAATTGGCAAA...|[AAC, ACT, CTT, T...|
|ALPH01000006|CCACTACTAACAATTTC...|[CCA, CAC, ACT, C...|
|ALPH01000007|CTTGGCTTGTTTTTATC...|[CTT, TTG, TGG, G...|
|ALPH01000008|CTGAGTCCTATTTAAAT...|[CTG, TGA, GAG, A...|
|ALPH01000009|CGATGTAATGGCTATGC...|[CGA, GAT, ATG, T...|
|ALPH01000010|TCTCACTAGAAGAAAAT...|[TCT, CTC, TCA, C...|
|ALPH01000011|GTTTTTATCAGTAGCTT...|[GTT, TTT, TTT, T...|
|ALPH01000012|AGGGTGTCGGTTAAAAG...|[AGG, GGG, GGT, G...|
|ALPH01000013|TTTTCATCTAATAAGTA...|[TTT, TTT, TTC, T...|
|ALPH01000014|AATGTTGTGAGCTTTAA...|[AAT, ATG, TGT, G...|
|ALPH01000015|ACTGCAGCATTATTTAT

inspeção do daframe

Para validação, podemos obter estatísticas básicas dso kmers obtidos. Para isso vamos contar o número de kmers por ID de sequência e obter um describe da coluna

In [305]:
n_kmers_df = fasta_kmers_df\
                    .withColumn("n_kmers", size(col("kmers")))\
                    .select("n_kmers")\

In [306]:
kmers_pofile_df = fasta_kmers_df.select("seqID","kmers")

### Extração de features

O número de K que defie o tamanho dos k-mers define um espaço de features de dimensão $4^K$, para codificar essas features podemos usar a classe ```CountVectorizer```. Essa codificação atribui ordinais a cada kmer único e cria duas listas para representar a presença e o frequência absoluta dos mesmos

In [307]:
from pyspark.ml.feature import CountVectorizer

In [308]:
kmers_pofile_df.printSchema()

root
 |-- seqID: string (nullable = true)
 |-- kmers: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [309]:
%%time
cv = CountVectorizer(inputCol="kmers", outputCol="features")

model = cv.fit(kmers_pofile_df)

features_df = model.transform(kmers_pofile_df)

CPU times: user 14.9 ms, sys: 0 ns, total: 14.9 ms
Wall time: 3.46 s


In [310]:
## conferir resultado temporário
features_df.select("seqID","features").toPandas().to_csv('features.csv')

In [311]:
%%time
unique_features_count = features_df.select("features").distinct().count()
print("Número de features únicas ",unique_features_count )

Número de features únicas  1864
CPU times: user 16.5 ms, sys: 4.31 ms, total: 20.8 ms
Wall time: 4.21 s


In [276]:
print("%d das %d sequências tem features únicas" % (unique_features_count, num_ids))

108 das 108 sequências tem features únicas


In [313]:
features_df.printSchema()

root
 |-- seqID: string (nullable = true)
 |-- kmers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)



In [312]:
features_df.show()

+------------+--------------------+--------------------+
|       seqID|               kmers|            features|
+------------+--------------------+--------------------+
|ALPH01000001|[TCT, CTC, TCC, C...|(82,[0,1,2,3,4,5,...|
|ALPH01000002|[CCT, CTT, TTG, T...|(82,[0,1,2,3,4,5,...|
|ALPH01000003|[ATT, TTC, TCT, C...|(82,[0,1,2,3,4,5,...|
|ALPH01000004|[AAT, ATA, TAT, A...|(82,[0,1,2,3,4,5,...|
|ALPH01000005|[AAC, ACT, CTT, T...|(82,[0,1,2,3,4,5,...|
|ALPH01000006|[CCA, CAC, ACT, C...|(82,[0,1,2,3,4,5,...|
|ALPH01000007|[CTT, TTG, TGG, G...|(82,[0,1,2,3,4,5,...|
|ALPH01000008|[CTG, TGA, GAG, A...|(82,[0,1,2,3,4,5,...|
|ALPH01000009|[CGA, GAT, ATG, T...|(82,[0,1,2,3,4,5,...|
|ALPH01000010|[TCT, CTC, TCA, C...|(82,[0,1,2,3,4,5,...|
|ALPH01000011|[GTT, TTT, TTT, T...|(82,[0,1,2,3,4,5,...|
|ALPH01000012|[AGG, GGG, GGT, G...|(82,[0,1,2,3,4,5,...|
|ALPH01000013|[TTT, TTT, TTC, T...|(82,[0,1,2,3,4,5,...|
|ALPH01000014|[AAT, ATG, TGT, G...|(82,[0,1,2,3,4,5,...|
|ALPH01000015|[ACT, CTG, TGC, G

salva arquivo temporário com os kmers e suas frequências

In [491]:
def sparseVectorToColumsn(seqID, vector):
    global vocab
    
    vector_as_dict = {vocab[k]:str(v) for k,v in zip(vector.indices, vector.values)}
    vector_as_dict["seqID"] = seqID
    vector_as_row = Row(**vector_as_dict)
    return vector_as_row

In [492]:
pivoted_kmers_rdd = features_df.rdd\
            .map(lambda r: sparseVectorToColumsn(r.seqID, r.features))

In [501]:
pivoted_kmers_rdd.saveAsTextFile(str(OUTPUT_DIR_PATH.joinpath("kmers_freq")))

## Clustering

Para o ajuste dos hiperparâmetros da clusterização devemos fazer um parameter sweep para achar o número ideal de clusters. A avaliação da qualidade do cluster é dada pela [Métreica de Silhouette](https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/ml/evaluation/ClusteringEvaluator.html)

In [502]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [503]:
bkm = BisectingKMeans()
# model = bkm.fit(features_df)
clustering_pipeline = Pipeline(stages=[bkm])

In [504]:
%%time
paramGrid = ParamGridBuilder() \
    .addGrid(bkm.k, [5, 10, 20, 50, 70, 100]) \
    .build()

crossval = CrossValidator(estimator=clustering_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=ClusteringEvaluator(),
                          numFolds=5)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel= crossval.fit(features_df)

CPU times: user 1.53 s, sys: 544 ms, total: 2.08 s
Wall time: 6min 30s


In [505]:
cluster_df = cvModel.transform(features_df)

## Saving result

In [536]:
cluster_pandasDF = cluster_df.toPandas()

In [537]:
cluster_pandasDF.to_csv(str(OUTPUT_DIR_PATH.joinpath("result_cluster.csv")))

In [538]:
import pickle

In [539]:
# with open(str(OUTPUT_DIR_PATH.joinpath("cluster_model")),"w") as f:
#     pickle.dumps(cvModel, f)

In [540]:
cluster_df.show()

+------------+--------------------+--------------------+----------+
|       seqID|               kmers|            features|prediction|
+------------+--------------------+--------------------+----------+
|ALPH01000001|[TCT, CTC, TCC, C...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000002|[CCT, CTT, TTG, T...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000003|[ATT, TTC, TCT, C...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000004|[AAT, ATA, TAT, A...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000005|[AAC, ACT, CTT, T...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000006|[CCA, CAC, ACT, C...|(82,[0,1,2,3,4,5,...|         1|
|ALPH01000007|[CTT, TTG, TGG, G...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000008|[CTG, TGA, GAG, A...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000009|[CGA, GAT, ATG, T...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000010|[TCT, CTC, TCA, C...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000011|[GTT, TTT, TTT, T...|(82,[0,1,2,3,4,5,...|         0|
|ALPH01000012|[AGG, GGG, GGT, G...|(82,[0,1,2,3,

In [541]:
cluster_df.select("prediction").describe().show()

+-------+------------------+
|summary|        prediction|
+-------+------------------+
|  count|              1864|
|   mean|0.3063304721030043|
| stddev| 0.690432002592243|
|    min|                 0|
|    max|                 4|
+-------+------------------+

