In [1]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.context import SparkConf
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import pyspark.sql.types as T 
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, size
from operator import add
from os import path
from functools import reduce
from bio_spark.io.fasta_reader import FASTAReader, FASTAQReader
import collections
import numpy as np
import sys

from pathlib import Path

from operator import add

# Sobre este Notebook

Este notebook será dividido em duas tarefas: contagem de *k*-mers em genomas bacterianos e clusterização hierárquica do uso de *k*-mers nesses genomas. Para essa análise, vamos montar um cluster local de Spark usando Docker Compose. Essa análise permite uma análise *de novo* de similaridade entre os genomas, ou seja, ela pode ser feita sem uma referência externa.

O fluxo é composto dos seguintes passos:

1. Leitura e parsing do arquivos fasta de entrada;
2. Cálculo dos *k*-mers a partir das sequências encontradas nos arquivos de entrada;
3. Uso do método de Elbow para encontrar clusters coesos.

___

## Cluster local

Para fins de desenvolvimento, utilizamos imagens Docker para criar um cluster Spark local. Esse cluster deve estar rodando para que o notebook funcione como esperado. Na raiz do projeto:

```shell
$ docker-compose up -d
```

In [2]:
sConf = SparkConf("spark://localhost:7077")
sc = SparkContext(conf=sConf)
spark = SparkSession(sc)

## Data Input

Tdoso os arquivos de entrada serão tratados em único Dataframe

```shell
INPUT_DIR_PATH: caminho para o diretório com os arquivs .fna (FASTA)
```

In [3]:
home = "/Users/viniWS/Bio"

In [4]:
INPUT_DIR_PATH = Path(home, "sparkAAI/data/genomes/synechococcus")
files_to_process = [str(f) for f in INPUT_DIR_PATH.iterdir()]
print("Files to process :", len(files_to_process))

Files to process : 171


In [5]:
fasta_plain_df = sc.textFile(','.join(files_to_process))\
            .map(lambda x: Row(row=x))\
            .zipWithIndex()\
            .toDF(["row","idx"])

print("raw file lines to process", fasta_plain_df.count())

raw file lines to process 4490766


inspecionando o dataframe lido

In [None]:
fasta_plain_df.show()

### Parse dos arquivos FASTA

os arquivos [FASTA]([FASTA](https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Web&PAGE_TYPE=BlastDocs&DOC_TYPE=BlastHelp)), tem o seguinte formato:

```
>ID.CONTIG
ATTC....
GCG...
CCG...
>ID2.CONTIG
GGC...
...
```

nesta primeira sessão fazermos um parse desses arquivos para agrupar as sequẽncias por ID, calcular os kmers para esses contigs e obter um map com as freqências dos kmers em todos os contigs de uma sequẽncia.

In [None]:
def parse_fasta_id_line(l):
    """
    Desejamos extrair os IDs das sequências da linhas que começarem pelo caracter ''>'. Pelo padrão
    FASTA, o ID é a primeira palavra e é um campo composto por ID.CONTIG
    
    Input>
        l: Uma linha de um arquivo FASTA
    Return:
        ID: da sequência ignorando o número de contigs, ou None caso não seja uma linha de ID
    """
    if l[0][0] == ">":
        heaer_splits = l[0][1:].split(" ")[0]
        seq_id_split = heaer_splits.split(".")
        return seq_id_split[0]
    else:
        return None
seq2kmer_udf = udf(parse_fasta_id_line, T.StringType())

In [None]:
fasta_null_ids_df = fasta_plain_df.withColumn("seqID_wNull", seq2kmer_udf("row"))

inspecionar o resultado

In [None]:
fasta_null_ids_df.show()

In [None]:
num_ids = fasta_null_ids_df.where(F.col("seqID_wNull").isNotNull()).count()
print("número de seuências para serem processadas", num_ids)

desejamos fazer um "fillna" com o último valor não nulo encontrado na coluna de sequência, para isso usaremos um operador de janela deslizante em cima do índice que serve para manter a ordem original das linhas

In [None]:
fasta_n_filter_df = fasta_null_ids_df.withColumn(
    "seqID", F.last('seqID_wNull', ignorenulls=True)\
    .over(Window\
    .orderBy('idx')\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)))

A seguir devemos excluir as linhas de header e renomear as colunas excluíndo as que não foram utilizadas

In [None]:
fasta_df = fasta_n_filter_df\
                .where(F.col("seqID_wNull").isNull())\
                .select("seqID","row")\
                .toDF("seqID","seq")

O Dataframe tratado tem o seguinte esquema

In [None]:
fasta_df.printSchema()

inspeção do daframe

In [None]:
fasta_df.show()

### Calculate Kmers

Nesta sessão faremos o cálculo dos [kmers](https://en.wikipedia.org/wiki/K-mer) de tambo ```K```. O objetivo é associar cada ID de sequência ao conjunto de kmers distiontos presentes em todos os seus motifs

In [None]:
K = 3

In [None]:
Seq2kmerTy = T.ArrayType(T.StringType())
def seq2kmer(seq_):
    global K
    value = seq_[0].strip()
    num_kmers = len(value) - K + 1
    kmers_list = [value[n*K:K*(n+1)] for n in range(0, num_kmers)]
    
    # return len(value)
    return kmers_list

seq2kmer_udf = udf(seq2kmer,Seq2kmerTy)

In [None]:
fasta_kmers_df = fasta_df\
        .withColumn("kmers", seq2kmer_udf("seq"))\

inspeção do daframe

In [None]:
fasta_kmers_df.printSchema()

In [None]:
fasta_kmers_df.show()

Para validação, podemos obter estatísticas básicas dso kmers obtidos. Para isso vamos contar o número de kmers por ID de sequência e obter um describe da coluna

In [None]:
n_kmers_df = fasta_kmers_df\
                    .withColumn("n_kmers", size(col("kmers")))\
                    .select("n_kmers")\

In [None]:
n_kmers_df.describe().show()

## Análise das Sequências 

A seguir analisaremos as sequẽncias a partir dos kmers obtidos. O profile de uma seuquência é um mapeamento ```kmer->num ocorrencias``` que pode ser utilizado em análises de similaridade entre sequências.

In [None]:
KmerFreqTuple = T.MapType(T.StringType(), T.IntegerType())

def kmers_list2kmers_freq_dict(kmers_list):
    """
    Cálcula as frequências absolutas de cda kmer no dataframe
    Retorna:
        Um onjeto map("kmer" -> número de ocorrências ) para cada sequência
    """
    unique, counts = np.unique(kmers_list[0], return_counts=True)
    kmers_map = {str(k):int(v) for k, v in zip(unique, counts) if k}
    return kmers_map

kmers_list2kmers_freq_dict_udf = udf(kmers_list2kmers_freq_dict)

> esse dataframe foi criado apenas para inspeção, como utilizaremos o VectorCounter para criar features, o map em si tornou-se desnecessário

In [None]:
kmers_pofile_df = fasta_kmers_df\
            .groupby("seqID")\
            .agg(F.collect_list('kmers').alias('kmers_list'))\
            .withColumn('kmers_freq', kmers_list2kmers_freq_dict_udf('kmers_list'))

In [None]:
kmers_pofile_df.select("seqID", "kmers_freq").show()

### Extração de features

O número de K que defie o tamanho dos k-mers define um espaço de features de dimensão $4^K$, para codificar essas features podemos usar a classe ```CountVectorizer```. Essa codificação atribui ordinais a cada kmer único e cria duas listas para representar a presença e o frequência absoluta dos mesmos

In [None]:
from pyspark.ml.feature import CountVectorizer

In [None]:
kmers_df = fasta_kmers_df.select("seqID", "kmers")

In [None]:
%%time
kmers_pofile_df = fasta_kmers_df.rdd\
            .map(lambda r: (r.seqID, r.kmers))\
            .reduceByKey(lambda x,y: x+y)\
            .toDF(["seqID", "kmers_list"])

In [None]:
kmers_pofile_df.printSchema()

In [None]:
kmers_pofile_df.show()

In [None]:
%%time
cv = CountVectorizer(inputCol="kmers_list", outputCol="features")

model = cv.fit(kmers_pofile_df)

features_df = model.transform(kmers_pofile_df)

In [None]:
features_df.show()

In [None]:
%%time
unique_features_count = features_df.select("features").distinct().count()
print("Número de features únicas ",unique_features_count )

In [None]:
print("%d das %d sequências tem features únicas" % (unique_features_count, num_ids))

## Clustering

Para o ajuste dos hiperparâmetros da clusterização devemos fazer um parameter sweep para achar o número ideal de clusters. A avaliação da qualidade do cluster é dada pela [Métreica de Silhouette](https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/ml/evaluation/ClusteringEvaluator.html)

In [None]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
bkm = BisectingKMeans()
# model = bkm.fit(features_df)
clustering_pipeline = Pipeline(stages=[bkm])

In [None]:
%%time
paramGrid = ParamGridBuilder() \
    .addGrid(bkm.k, [2, 5, 10, 20, 50, 70, 100]) \
    .build()

crossval = CrossValidator(estimator=clustering_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=ClusteringEvaluator(),
                          numFolds=5)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel= crossval.fit(features_df)

In [None]:
cluster_df = cvModel.transform(features_df)

In [None]:
cluster_df.show()

In [None]:
cluster_df.select("prediction").describe().show()