In [3]:
def encodeDNASeq(seq, encoding='OneHot'):
    """Encode a nucleotide sequence, producing one element per character.

    Using OneHot (the default) each nucleotide becomes a 4-dimensional
    SparseVector:
        A->1000; C->0100; G->0010; T->0001; other->0000
    Using Index each nucleotide becomes a float:
        A->1.0; C->2.0; G->3.0; T->4.0; other->0.0

    Matching is case-sensitive: lowercase letters are encoded as "other".

    @param: seq A string containing a sequence of nucleotides
    @param: encoding output encoding: "Index" selects Index encoding, any
            other value selects OneHot
    @return: a list with one encoded value per character of seq
    """
    if encoding == "Index":
        # The fallback must be a float too, otherwise unknown characters
        # would put SparseVectors into an otherwise numeric list.
        unknown = 0.0
        mymap = {'A': 1.0, 'C': 2.0, 'G': 3.0, 'T': 4.0, 'N': unknown}
    else:
        # All-zero vector shared by 'N' and by every unrecognised character.
        unknown = SparseVector(4, [0], [0])
        mymap = {'A': SparseVector(4, [0], [1]),
                 'C': SparseVector(4, [1], [1]),
                 'G': SparseVector(4, [2], [1]),
                 'T': SparseVector(4, [3], [1]),
                 'N': unknown}

    return [mymap.get(n, unknown) for n in seq]


#Initialize Spark: point findspark at the local Spark installation before importing pyspark
import findspark
findspark.init("/home/osboxes/spark-2.3.1-bin-hadoop2.7")

import os
import pyspark
from pyspark.ml.linalg import SparseVector
from pyspark.ml.feature import VectorAssembler
import time
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession

#Load external Jar
#The submit arguments are set before the session is created below.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' --jars /home/osboxes/brainscala/target/scala-2.11/brain-scala-utils_2.11-1.0.jar pyspark-shell'


#Create Spark session
spark = SparkSession \
    .builder \
    .appName("test") \
    .getOrCreate()
sc = spark.sparkContext

#Infer the data schema with a static read of the same files.
#The schema has to be passed explicitly to the streaming DataFrame.
sdf = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("streaming-seqs/*.csv")
sdf.createOrReplaceTempView("seqs")
staticSchema = sdf.schema

#Create the streaming DataFrame
#maxFilesPerTrigger=1: at most one new file is considered per trigger
streamingDataFrame = spark.readStream \
.schema(staticSchema)\
.option("maxFilesPerTrigger",1)\
.format("csv")\
.option("header","true")\
.load("streaming-seqs/*.csv")

#Count the occurrences of each distinct sequence
classified=streamingDataFrame.groupBy("seq").count()

#Stop a query left running by a previous execution of this cell:
#starting a second active query with the same name would fail.
for activeQuery in spark.streams.active:
    if activeQuery.name == "instances":
        activeQuery.stop()

#For each trigger the in-memory table "instances" is updated.
#The handle is kept so the query can be inspected or stopped later.
instancesQuery = classified.writeStream\
    .format("memory")\
    .queryName("instances")\
    .outputMode("complete")\
    .start()
instancesQuery

<pyspark.sql.streaming.StreamingQuery at 0x7f9b2b5563c8>

In [7]:
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
sqlContext = SQLContext(sc)

#Load the model "ipdata-model" with the Scala helper class, through the JVM gateway
model = sc._jvm.brain.scala.BrainScalaModel.loadModel("ipdata-model")

#Classify every sequence currently present in the in-memory table "instances"
for r in spark.sql(" select seq from instances").collect():
    seq = r["seq"]
    if not seq:
        #A null or empty sequence has nothing to encode or assemble
        continue
    encoded=encodeDNASeq(seq)
    #Single-row DataFrame with one vector column per nucleotide
    encodedDF = spark.createDataFrame([Row(*encoded)])
    #Concatenate all per-nucleotide vectors into one "features" vector
    assembler = VectorAssembler(inputCols=encodedDF.columns,outputCol="features")
    unClassified=assembler.transform(encodedDF).select("features")
    #model is a JVM object: pass it the underlying Java DataFrame, then wrap
    #the returned Java DataFrame back into a Python DataFrame.
    #NOTE: this rebinds the global name `classified` to the Java result.
    classified = model.transform(unClassified._jdf)
    dfTestClassified = DataFrame(classified,sqlContext)
    dfTestClassified.show()    


+--------------------+-----+
|            features|class|
+--------------------+-----+
|(240,[0,7,9,12,18...|  0.0|
+--------------------+-----+

+--------------------+-----+
|            features|class|
+--------------------+-----+
|(240,[3,7,9,12,18...|  0.0|
+--------------------+-----+

+--------------------+-----+
|            features|class|
+--------------------+-----+
|(240,[2,7,9,12,18...|  0.0|
+--------------------+-----+

+--------------------+-----+
|            features|class|
+--------------------+-----+
|(240,[1,7,9,12,18...|  0.0|
+--------------------+-----+

