In [10]:
import os

import findspark
# Machine-specific locations; change these two constants for another environment.
SPARK_HOME = '/home/joadmin/spark-2.1.2-bin-hadoop2.7/'
DATA_DIR = '/home/joadmin/BTC'
# findspark has to locate the Spark install before pyspark can be imported.
findspark.init(SPARK_HOME)
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # NOTE(review): only `lit` appears to be used; narrow once confirmed
# Only add builder .config(key, value) calls with an explicit value: a key given
# without a value is stored as the string "None".
spark = SparkSession.builder.appName('gdelt_1').getOrCreate()

# # 1. Data Loading and Parsing
# Load both CSV files as DataFrames. header=True takes column names from the
# first row; inferSchema is not set, so every column is read as a string.
train_path = os.path.join(DATA_DIR, 'train.csv')
test_path = os.path.join(DATA_DIR, 'test.csv')
train_df = spark.read.csv(train_path, sep=',', header=True)
test_df = spark.read.csv(test_path, sep=',', header=True)

In [11]:
# Quick look at both raw frames: first rows, then schemas, then row counts.
for rawFrame in (train_df, test_df):
    rawFrame.show(5)
for rawFrame in (train_df, test_df):
    rawFrame.printSchema()
for rawFrame in (train_df, test_df):
    print(rawFrame.count())

+----------+-------+------------+------------+------------+------------+-----------+------------+------------+-----------+
|nextUpDown|pUpDown|   ratiotxin|  ratiotxout|ratiotxinout|  ratiotxBTC|  ratioopen|   ratiohigh|    ratiolow| ratioclose|
+----------+-------+------------+------------+------------+------------+-----------+------------+------------+-----------+
|         1|      1|-0.164619165|-0.181481481| 0.766917293| 0.040888661|          0| 0.166666667|           0|0.166666667|
|         0|      1| 0.067647059| 0.113122172| 1.264705882| 0.188267056|0.166666667| 0.285714286|-0.833333333|0.285714286|
|         1|      0| 0.104683196| 0.130081301| 1.201550388| 0.203346836|0.285714286| 0.333333333|           6|          0|
|         0|      1|-0.094763092| -0.14028777| 0.741935484|-0.342087427|          0| 0.083333333| 0.142857143|0.111111111|
|         0|      0| -0.05785124|-0.052301255| 0.965217391|-0.506795192|0.111111111|-0.230769231|       0.125|       -0.1|
+----------+----

In [13]:
# Tag every row with its origin ('train' / 'test') so the two sets can be
# separated again after they are unioned into a single frame.
train_df = train_df.withColumn('Mark', lit('train'))
train_df.show(3)
# test_df presumably has no target column: add a placeholder 'nextUpDown' of
# '0' as a string, the same type as the CSV-read train column, so the union
# does not depend on implicit type widening. Then add the 'test' tag.
test_df = test_df.withColumn('nextUpDown', lit('0')).withColumn('Mark', lit('test'))
test_df.show(3)
# union() matches columns by position, so reorder test_df's columns to exactly
# the order of train_df's columns.
test_df = test_df[train_df.columns]
test_df.show(3)

+----------+-------+------------+------------+------------+-----------+-----------+-----------+------------+-----------+-----+
|nextUpDown|pUpDown|   ratiotxin|  ratiotxout|ratiotxinout| ratiotxBTC|  ratioopen|  ratiohigh|    ratiolow| ratioclose| Mark|
+----------+-------+------------+------------+------------+-----------+-----------+-----------+------------+-----------+-----+
|         1|      1|-0.164619165|-0.181481481| 0.766917293|0.040888661|          0|0.166666667|           0|0.166666667|train|
|         0|      1| 0.067647059| 0.113122172| 1.264705882|0.188267056|0.166666667|0.285714286|-0.833333333|0.285714286|train|
|         1|      0| 0.104683196| 0.130081301| 1.201550388|0.203346836|0.285714286|0.333333333|           6|          0|train|
+----------+-------+------------+------------+------------+-----------+-----------+-----------+------------+-----------+-----+
only showing top 3 rows

+-------+------------+------------+------------+------------+------------+------------

In [14]:
## Append Test data to Train data
# union() (the non-deprecated replacement for unionAll) pairs columns by
# position, not by name, so fail loudly if the orders ever diverge.
assert train_df.columns == test_df.columns, 'train/test column order differs'
df = train_df.union(test_df)
print(df.count())
df.printSchema()

2745
root
 |-- nextUpDown: string (nullable = true)
 |-- pUpDown: string (nullable = true)
 |-- ratiotxin: string (nullable = true)
 |-- ratiotxout: string (nullable = true)
 |-- ratiotxinout: string (nullable = true)
 |-- ratiotxBTC: string (nullable = true)
 |-- ratioopen: string (nullable = true)
 |-- ratiohigh: string (nullable = true)
 |-- ratiolow: string (nullable = true)
 |-- ratioclose: string (nullable = true)
 |-- Mark: string (nullable = false)



In [16]:
# Convert the CSV string columns (everything except the 'Mark' tag) to double.
for colName in ['nextUpDown', 'pUpDown', 'ratiotxin', 'ratiotxout', 'ratiotxinout',
                'ratiotxBTC', 'ratioopen', 'ratiohigh', 'ratiolow', 'ratioclose']:
    df = df.withColumn(colName, df[colName].cast("double"))
df.printSchema()

root
 |-- nextUpDown: double (nullable = true)
 |-- pUpDown: double (nullable = true)
 |-- ratiotxin: double (nullable = true)
 |-- ratiotxout: double (nullable = true)
 |-- ratiotxinout: double (nullable = true)
 |-- ratiotxBTC: double (nullable = true)
 |-- ratioopen: double (nullable = true)
 |-- ratiohigh: double (nullable = true)
 |-- ratiolow: double (nullable = true)
 |-- ratioclose: double (nullable = true)
 |-- Mark: string (nullable = false)



In [28]:
# Numeric columns; 'nextUpDown' is the target and is dropped from the
# feature list when features are assembled.
numVars = [
    'nextUpDown', 'pUpDown',
    'ratiotxin', 'ratiotxout', 'ratiotxinout', 'ratiotxBTC',
    'ratioopen', 'ratiohigh', 'ratiolow', 'ratioclose',
]
# Placeholder for categorical features; currently empty.
catVars = []

from pyspark.ml.feature import StringIndexer
## make use of pipeline to index all categorical variables
def indexer(df,col):
    """Fit a StringIndexer on column `col` of DataFrame `df`.

    Returns the fitted model; its output column is named `<col>_indexed`.
    """
    return StringIndexer(inputCol=col, outputCol=col + '_indexed').fit(df)

from pyspark.ml import Pipeline

# One fitted indexer per categorical column (an empty list while catVars is empty).
indexers = list(map(lambda catName: indexer(df, catName), catVars))
pipeline = Pipeline(stages=indexers)
# Each stage was already fitted by indexer(), so fit() here only chains them.
df_indexed = pipeline.fit(df).transform(df)
df_indexed.show(3)


+----------+-------+------------+------------+------------+-----------+-----------+-----------+------------+-----------+-----+
|nextUpDown|pUpDown|   ratiotxin|  ratiotxout|ratiotxinout| ratiotxBTC|  ratioopen|  ratiohigh|    ratiolow| ratioclose| Mark|
+----------+-------+------------+------------+------------+-----------+-----------+-----------+------------+-----------+-----+
|       1.0|    1.0|-0.164619165|-0.181481481| 0.766917293|0.040888661|        0.0|0.166666667|         0.0|0.166666667|train|
|       0.0|    1.0| 0.067647059| 0.113122172| 1.264705882|0.188267056|0.166666667|0.285714286|-0.833333333|0.285714286|train|
|       1.0|    0.0| 0.104683196| 0.130081301| 1.201550388|0.203346836|0.285714286|0.333333333|         6.0|        0.0|train|
+----------+-------+------------+------------+------------+-----------+-----------+-----------+------------+-----------+-----+
only showing top 3 rows



In [29]:
# Convert to label/features format.
# Spark ML estimators need the predictors packed into a single vector column,
# so each row becomes (mark, label, DenseVector of features).
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

catVarsIndexed = [c + '_indexed' for c in catVars]
# Every numeric/indexed column except the target is a predictor. Built without
# mutating a list in place so the cell can be re-run safely.
featuresCol = [c for c in numVars + catVarsIndexed if c != 'nextUpDown']
labelCol = ['Mark', 'nextUpDown']

row = Row('mark', 'label', 'features')
# Positional layout after this select: 0 = Mark, 1 = nextUpDown, 2.. = features.
# A new name is used so df_indexed keeps all of its columns.
df_selected = df_indexed[labelCol + featuresCol]
lf = df_selected.rdd.map(lambda r: row(r[0], r[1], DenseVector(r[2:]))).toDF()

# Index the label: StringIndexer attaches class metadata for the tree models,
# but in this Spark version it orders labels by descending frequency, NOT by
# value, so `index` need not equal `label` (in this cell's output, label 1.0
# maps to index 0.0). Keep the fitted model so predictions made on `index` can
# be mapped back to real labels via labelIndexer.labels.
labelIndexer = StringIndexer(inputCol='label', outputCol='index').fit(lf)
lf = labelIndexer.transform(lf)
print('index -> label: ' + str(list(enumerate(labelIndexer.labels))))

lf.show()
print(lf.count())

+-----+-----+--------------------+-----+
| mark|label|            features|index|
+-----+-----+--------------------+-----+
|train|  1.0|[1.0,-0.164619165...|  0.0|
|train|  0.0|[1.0,0.067647059,...|  1.0|
|train|  1.0|[0.0,0.104683196,...|  0.0|
|train|  0.0|[1.0,-0.094763092...|  1.0|
|train|  0.0|[0.0,-0.05785124,...|  1.0|
|train|  1.0|[0.0,0.029239766,...|  0.0|
|train|  0.0|[1.0,0.034090909,...|  1.0|
|train|  0.0|[0.0,0.008241758,...|  1.0|
|train|  0.0|[0.0,-0.10626703,...|  1.0|
|train|  0.0|[0.0,0.344512195,...|  1.0|
|train|  0.0|[0.0,0.036281179,...|  1.0|
|train|  0.0|[0.0,0.266958425,...|  1.0|
|train|  0.0|[0.0,0.595854922,...|  1.0|
|train|  1.0|[0.0,-0.228354978...|  0.0|
|train|  0.0|[1.0,-0.225806452...|  1.0|
|train|  1.0|[0.0,0.09057971,0...|  0.0|
|train|  0.0|[1.0,-0.141196013...|  1.0|
|train|  1.0|[0.0,0.067698259,...|  0.0|
|train|  1.0|[1.0,-0.257246377...|  0.0|
|train|  1.0|[1.0,0.263414634,...|  0.0|
+-----+-----+--------------------+-----+
only showing top

In [30]:
# Split back into the labeled (train) rows and the test rows using the origin
# tag. Test rows only carry the placeholder nextUpDown value added before the
# union, so they are usable for prediction only.
labeled = lf.where(lf.mark == 'train')
test = lf.where(lf.mark == 'test')

# Random 80/20 split of the labeled rows into train/validate. The full labeled
# set keeps its own name instead of being overwritten by its 80% subset.
SPLIT_SEED = 121
train, validate = labeled.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

print('Train Data Number of Row: ' + str(train.count()))
print('Validate Data Number of Row: ' + str(validate.count()))
print('Test Data Number of Row: ' + str(test.count()))

Train Data Number of Row: 2142
Validate Data Number of Row: 553
Test Data Number of Row: 50


In [31]:
# 4. Apply Models from ML/MLLIB
# The ML package is built on DataFrames, while mllib is built on RDDs.
# Logistic regression, decision tree and random forest are all fitted from ML.

#Logistic Regression
from pyspark.ml.classification import LogisticRegression

# regParam is the regularisation strength. With elasticNetParam = 0.0 (Spark's
# default, passed explicitly here) the penalty is pure L2 (ridge), not L1/lasso;
# elasticNetParam = 1.0 would make it L1.
lr = LogisticRegression(maxIter = 100, regParam = 0.05, elasticNetParam = 0.0,
                        labelCol='index').fit(train)
print (type(lr))
# Evaluate model based on auc ROC(default for binary classification)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def testModel(model, validate = validate, metricName = 'areaUnderROC'):
    """Score a fitted classification model on a held-out DataFrame.

    Args:
        model: fitted Spark ML model; its transform() output is evaluated.
        validate: DataFrame to score. NOTE: the default is bound to the global
            `validate` at the moment this cell runs. After re-splitting the
            data, re-run this cell or pass the frame explicitly.
        metricName: BinaryClassificationEvaluator metric, 'areaUnderROC'
            (default, as before) or 'areaUnderPR'.

    Returns:
        The evaluator's metric value, computed against the 'index' label column.
    """
    pred = model.transform(validate)
    evaluator = BinaryClassificationEvaluator(labelCol = 'index', metricName = metricName)
    return evaluator.evaluate(pred)
 
# Validation AUC ROC of the logistic regression model.
print('AUC ROC of Logistic Regression model is: {}'.format(testModel(lr)))

<class 'pyspark.ml.classification.LogisticRegressionModel'>
AUC ROC of Logistic Regression model is: 0.535529715762274


In [35]:
#Decision Tree and Random Forest
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

dt = DecisionTreeClassifier(maxDepth = 3, labelCol ='index').fit(train)
rf = RandomForestClassifier(numTrees = 100, labelCol = 'index').fit(train)

# Display name -> fitted model; every model is scored by the same testModel().
models = {'LogisticRegression':lr,
          'DecisionTree':dt,
          'RandomForest':rf}

modelPerf = {k:testModel(v) for k,v in models.items()}

print (modelPerf)

{'RandomForest': 0.5428202288667405, 'DecistionTree': 0.5257672836576491, 'LogisticRegression': 0.535529715762274}
