
# Splicing Site Prediction using Apache Spark

## Notebook Info
This notebook shows how to use Apache Spark's MLlib to address the problem of splicing site prediction.
## Author Info
Valerio Morfino [Linkedin page](https://www.linkedin.com/in/valerio-morfino/)

## Notebook Prerequisites
Prerequisites: 
- Python 3.5+
- Apache Spark 2.2+ (required by `LinearSVC`)
- Jupyter Notebook 

In order to run this notebook you need the following Python libraries:
- findspark  (pip3 install findspark)
- pandas     (pip3 install pandas)
- pandas_ml  (pip3 install pandas_ml)
- matplotlib (pip3 install matplotlib)
- seaborn    (pip3 install seaborn)

## How to run the notebook
Set the SPARK_HOME constant (first code cell) to the path of your local Spark installation.
Make sure the dataset folder is in the same directory as the notebook.
Enjoy the notebook!

## Dataset Info
The dataset used is IPDATA (Irvine Primate splice-junction data set). It is a data set of human splice sites, and it consists of 767 donor splice sites, 765 acceptor splice sites, and 1654 false splice sites.


## More Info
For more info about Apache Spark and MLLib please visit:
- [Apache Spark Home Page](https://spark.apache.org/)
- [Apache Spark Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html)



In [None]:
# Location of the local Apache Spark installation used by findspark below.
# Change this path to match your own machine.
SPARK_HOME = "/home/osboxes/spark-2.2.1-bin-hadoop2.7"

In [None]:
#
# Helper functions used later in the notebook
#

#Custom function to encode nucleotides according to Brain format 
def encodeDNASeq(seq, encoding='OneHot'):
    """Encode a sequence of nucleotides as numeric features.

    With ``encoding='OneHot'`` (the default; any value other than
    ``'Index'`` selects it) each nucleotide becomes a 4-element SparseVector:
    A->1000; C->0100; G->0010; T->0001; any other character->0000.

    With ``encoding='Index'`` each nucleotide becomes a float:
    A->1.0; C->2.0; G->3.0; T->4.0; any other character->0.0.

    @param seq: iterable of single-character nucleotides (a string or a
        list of characters). Matching is case-sensitive (upper case only).
    @param encoding: output encoding, 'OneHot' or 'Index'.
    @return: list with one encoded element per input character.
    """
    if encoding == "Index":
        mymap = {'A': 1.0, 'C': 2.0, 'G': 3.0, 'T': 4.0}
        # Unknown symbols must have the same type as known ones; otherwise
        # a row would mix floats and vectors.
        unknown = 0.0
    else:
        mymap = {'A': SparseVector(4, [0], [1]),
                 'C': SparseVector(4, [1], [1]),
                 'G': SparseVector(4, [2], [1]),
                 'T': SparseVector(4, [3], [1])}
        # All-zero 4-element vector for unknown symbols.
        unknown = SparseVector(4, [0], [0])

    # TODO: the selection of the observation window could be applied here.
    return [mymap.get(n, unknown) for n in seq]

#Split each line in single features
#encode each nucleotide using function encodeDNASeq
def load_dna_dataset(file_name, label_value, nrows=0, encoding='OneHot'):
    """Read a file of nucleotide sequences into a labelled Spark DataFrame.

    Each non-blank line of ``file_name`` is one instance; each character of
    the line becomes one feature column, encoded with ``encodeDNASeq``.
    A ``label`` column holding ``label_value`` is appended as the last column.

    @param file_name: path of the text file (read with ``sc.textFile``).
    @param label_value: value stored in the ``label`` column of every row.
    @param nrows: if > 0, keep only the first ``nrows`` sequences;
        0 (the default) loads all of them.
    @param encoding: 'OneHot' or 'Index', forwarded to ``encodeDNASeq``.
    @return: DataFrame with one column per nucleotide followed by ``label``.
    """
    # Strip surrounding whitespace: a trailing '\r' from CRLF files would
    # otherwise become an extra feature. Drop blank lines, which would
    # produce rows with a different number of columns.
    lines = (sc.textFile(file_name)
               .map(lambda line: line.strip())
               .filter(lambda line: line))

    if nrows > 0:
        lines = (lines.zipWithIndex()
                      .filter(lambda pair: pair[1] < nrows)
                      .map(lambda pair: pair[0]))

    rdd = lines.map(lambda s: encodeDNASeq(s, encoding))

    #Insert Label Column and convert Rdd into Dataframe in order to apply ML Algorithm
    return rdd.toDF().withColumn("label", lit(label_value))



#Plot Confusion Matrix and print all related indicators
def confusion_matrix(predictions_and_labels, print_heat_cm=True, print_Pandas_cm=False, print_summary=False ):
    """Summarize model predictions with a confusion matrix and related statistics.

    @param predictions_and_labels: Spark DataFrame with numeric ``label`` and
        ``prediction`` columns (e.g. the output of a fitted model's transform).
    @param print_heat_cm: draw the matrix as a seaborn heatmap.
    @param print_Pandas_cm: draw the matrix with pandas_ml's own ``plot``.
    @param print_summary: print pandas_ml's statistics for the matrix.
    @return: the ``pandas_ml.ConfusionMatrix`` object.
    """
    import seaborn as sn
    import matplotlib.pyplot as plt
    from pandas_ml import ConfusionMatrix

    # Collect only the two columns needed. Model output also carries
    # feature/rawPrediction/probability vectors that would otherwise be
    # shipped to the driver and converted for nothing.
    df = predictions_and_labels.select("label", "prediction").toPandas()
    cm = ConfusionMatrix(df['label'], df['prediction'])

    if print_Pandas_cm:
        cm.plot()

    if print_heat_cm:
        # Take row/column labels from the matrix itself instead of a
        # hard-coded [0, 1], so the shape always matches the data.
        df_cm = cm.to_dataframe()
        plt.figure(figsize=(10, 7))
        sn.heatmap(df_cm, annot=True, fmt="d")
        plt.ylabel("Actual")
        plt.xlabel("Predicted")

    if print_summary:
        cm.print_stats()
    return cm


In [None]:
#Set the Spark Home
# findspark.init() makes the Spark installation at SPARK_HOME importable, so it
# must run before any of the pyspark imports below.
import findspark
findspark.init(SPARK_HOME)

from pyspark.ml.linalg import SparseVector
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
import time
from pyspark.sql.functions import lit

#Initialize the Spark session (getOrCreate reuses an existing one) and expose
#its SparkContext as `sc`, which load_dna_dataset uses to read the text files.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("splicing_site_prediction").getOrCreate()
sc = spark.sparkContext



In [None]:
# Training data.
# The dataset is split over four files: positive and negative instances,
# each for training and for test. Every line is a string of nucleotides
# (A, G, C, T); a line becomes one instance and each character one feature.
#
# Positive instances get label 1.0, negative ones 0.0; the two DataFrames are
# then stacked into a single training set.
training_set, neg_tra = (
    load_dna_dataset("dataset/ipdata_tra_t_2018.txt", label_value=1.0),
    load_dna_dataset("dataset/ipdata_tra_f_2018.txt", label_value=0.0),
)
training_set = training_set.union(neg_tra)

In [None]:
# Quick look at the training data.
# Other handy checks while exploring:
#   training_set.head()
#   training_set.describe().show()
#   training_set.count()
#   training_set.filter("label=1.0").count()   # positives
#   training_set.filter("label=0.0").count()   # negatives
training_set.printSchema()


In [None]:
# Test data: positives labelled 1.0 and negatives 0.0, stacked together.
test_set, neg_test = (
    load_dna_dataset("dataset/ipdata_test_t_2018.txt", label_value=1.0),
    load_dna_dataset("dataset/ipdata_test_f_2018.txt", label_value=0.0),
)
test_set = test_set.union(neg_test)

In [None]:
# Column layout of the test DataFrame (nucleotide columns followed by label)
test_set.printSchema()

In [None]:
# Spark ML estimators expect all features in a single vector column, so every
# nucleotide column (everything except "label") is merged into "features".
feature_cols = [col for col in training_set.columns if col != "label"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

training = assembler.transform(training_set).select("label", "features")
# The very same assembler is applied to the test set so both share one layout.
test = assembler.transform(test_set).select("label", "features")

In [None]:
# Peek at the first assembled training row.
# (training.printSchema() / training.show() are useful alternatives.)
training.head()

In [None]:
# Spark never persists an RDD or DataFrame on its own. Both sets are reused by
# every classifier trained below, so keep them in memory.
for _df in (training, test):
    _df.cache()

print("Training Tot instances: %s" % training.count())
print("Test Tot instances: %s" % test.count())


In [None]:
#
#Decision TREE CLASSIFIER
#
alg_label="DECISION TREE CLASSIFIER"

from pyspark.ml.classification import DecisionTreeClassifier

start_time = time.time()

dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=4)
dt_fitted = dt.fit(training)

print("%s Training time: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))
start_time = time.time()

# transform() is lazy: without an action, the "Test" time below would only
# measure building the query plan. cache() + count() forces every prediction
# to be computed here and keeps them for the confusion matrix.
dt_predictions_and_labels = dt_fitted.transform(test).cache()
dt_predictions_and_labels.count()

print("%s Test: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))


In [None]:
confusion_matrix(dt_predictions_and_labels, print_heat_cm=True, print_Pandas_cm=False, print_summary=True)

In [None]:
# Textual dump of the learned decision tree.
# (dt_fitted.featureImportances is also available for inspection.)
print(dt_fitted.toDebugString)

In [None]:
#Random Forest Classifier TRAINING
alg_label="RANDOM FOREST CLASSIFIER"
from pyspark.ml.classification import RandomForestClassifier
start_time = time.time()
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100, maxDepth=15)
rf_fitted = rf.fit(training)
print("%s Training time: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))
start_time = time.time()
# transform() is lazy: cache() + count() forces the predictions to be computed
# here, so the "Test" time is real, and reuses them in the confusion matrix.
rf_predictions_and_labels = rf_fitted.transform(test).cache()
rf_predictions_and_labels.count()
print("%s Test: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))
#Evaluation
confusion_matrix(rf_predictions_and_labels, print_summary=True)

In [None]:
#Linear SVM TRAINING
alg_label="LINEAR SUPPORT VECTOR MACHINE"
from pyspark.ml.classification import LinearSVC
start_time = time.time()
lsvc = LinearSVC()
lsvc_fitted = lsvc.fit(training)
print("%s Training time: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))
start_time = time.time()
# transform() is lazy: cache() + count() forces the predictions to be computed
# here, so the "Test" time is real, and reuses them in the confusion matrix.
lsvc_predictions_and_labels = lsvc_fitted.transform(test).cache()
lsvc_predictions_and_labels.count()
print("%s Test: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))
#Evaluation
confusion_matrix(lsvc_predictions_and_labels, print_summary=True)

In [None]:
#Naive Bayes
alg_label="Naive Bayes"
from pyspark.ml.classification import NaiveBayes
start_time = time.time()
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nb_fitted = nb.fit(training)
print("%s Training time: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))
start_time = time.time()
# transform() is lazy: cache() + count() forces the predictions to be computed
# here, so the "Test" time is real, and reuses them in the confusion matrix.
nb_predictions_and_labels = nb_fitted.transform(test).cache()
nb_predictions_and_labels.count()
print("%s Test: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))
#Evaluation
confusion_matrix(nb_predictions_and_labels, print_summary=True)


In [None]:
#Multilayer Perceptron Classifier
alg_label="MLP Classifier"

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Layers of the neural network:
# - input layer: one neuron per feature, i.e. the length of the assembled
#   feature vector (4 values per nucleotide with the OneHot encoding used here).
#   It is read from the data instead of being hard-coded, so it stays correct
#   if the sequence length or the encoding changes;
# - two hidden layers of 40 and 60 neurons;
# - output layer of size 2 (classes).
num_features = len(training.first()["features"])
layers = [num_features, 40, 60, 2]

start_time = time.time()

# create the trainer and set parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(training)
print("%s Training time: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))

# predict on the test set
start_time = time.time()
# Use a dedicated name so the Naive Bayes predictions are not overwritten.
# transform() is lazy: cache() + count() forces the predictions to be computed
# here, so the "Test" time is real, and reuses them in the confusion matrix.
mlp_predictions_and_labels = model.transform(test).cache()
mlp_predictions_and_labels.count()
print("%s Test: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))

#Print Confusion Matrix
confusion_matrix(mlp_predictions_and_labels, print_summary=True)
