In [3]:
%autosave 300

Autosaving every 300 seconds


In [5]:
import seaborn as sns
import numpy as np
import pandas as pd
import collections #for frequeny counting
import findspark
findspark.init("./spark2/")

import pyspark
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql.functions import lit #create columns of *literal* value
from pyspark.sql.functions import col #Returns columns given on column name
from pyspark.ml.feature import StringIndexer #label encoding
from pyspark.ml import Pipeline

sc = pyspark.SparkContext(appName="helloworld")

In [6]:
import matplotlib.pyplot as plt
import warnings #for filtering warnings

#constants
%matplotlib inline
sns.set_style("dark")
#to ignore warnings in output
warnings.filterwarnings('ignore')
#global information settings
sigLev = 2 #three significant digits
percentMul = 100 #for percentage multiplication
figWidth = figHeight = 8

In [8]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml import Pipeline
from pyspark.ml.regression import *
from pyspark.ml.feature import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.classification import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import random
import shelve
from sklearn import linear_model
#from sklearn.model_selection import cross_val_score


In [9]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [12]:
trainDF = spark.read.csv("data/train.csv", header="true")
testDF = spark.read.csv("data/test.csv", header="true")

In [13]:
## Add Survived column to test, and dataset name as a column
trainDF = trainDF.withColumn('Mark', lit('train'))
testDF = (testDF.withColumn('loss',lit(0))
                .withColumn('Mark', lit('test')))
testDF = testDF[trainDF.columns]

## Append Test data to Train data
df = trainDF.unionAll(testDF)

In [14]:
# Let's define function
def to_anytype(df, colnames, typename):
    for colname in colnames:
        df = df.withColumn("tmp", df[colname].cast(typename)) \
        .drop(colname) \
        .withColumnRenamed("tmp", colname)
    return(df)


In [95]:
numVars = ['id','cont1','cont2','cont3','cont4','cont5','cont6','cont7','cont8','cont9','cont10','cont11',
             'cont12','cont13','cont14','loss']

In [96]:
catVars = ['cat1','cat9','cat10','cat11','cat12',
              'cat13','cat14','cat15','cat16','cat17','cat18','cat19','cat20','cat21','cat22','cat23',
              'cat24','cat25','cat26','cat27','cat28','cat29','cat30','cat31','cat32','cat33','cat34',
              'cat35','cat36','cat37','cat38','cat39','cat40','cat41','cat42','cat43','cat44','cat45',
              'cat46','cat47','cat48','cat49','cat50','cat51','cat52','cat53','cat54','cat55','cat56',
              'cat57','cat58','cat59','cat60','cat61','cat62','cat63','cat64','cat65','cat66','cat67',
              'cat68','cat69','cat70','cat71','cat72','cat73','cat74','cat75','cat76','cat77','cat78',
              'cat79','cat80','cat81','cat82','cat83','cat84','cat85','cat86','cat87','cat88'
              ]

In [97]:
df = to_anytype(df, numVars, "float")

In [98]:
# make use of pipeline to index all categorical variables
def indexer(df, col):
    si = StringIndexer(inputCol = col, outputCol = col+'_indexed').fit(df)
    return si
 
indexers = [indexer(df, col) for col in catVars]

In [99]:
pipeline = Pipeline(stages = indexers)
df_indexed = pipeline.fit(df).transform(df)

In [101]:
df_indexed.select('cat44','cat44_indexed').show(3)

+-----+-------------+
|cat44|cat44_indexed|
+-----+-------------+
|    A|          0.0|
|    A|          0.0|
|    A|          0.0|
+-----+-------------+
only showing top 3 rows



In [102]:
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

In [103]:
catVarsIndexed = [i + '_indexed' for i in catVars]

In [104]:
featuresCol = numVars + catVarsIndexed

In [105]:
featuresCol.remove('id')
featuresCol.remove('loss')
#featuresCol

['cont1',
 'cont2',
 'cont3',
 'cont4',
 'cont5',
 'cont6',
 'cont7',
 'cont8',
 'cont9',
 'cont10',
 'cont11',
 'cont12',
 'cont13',
 'cont14',
 'cat1_indexed',
 'cat9_indexed',
 'cat10_indexed',
 'cat11_indexed',
 'cat12_indexed',
 'cat13_indexed',
 'cat14_indexed',
 'cat15_indexed',
 'cat16_indexed',
 'cat17_indexed',
 'cat18_indexed',
 'cat19_indexed',
 'cat20_indexed',
 'cat21_indexed',
 'cat22_indexed',
 'cat23_indexed',
 'cat24_indexed',
 'cat25_indexed',
 'cat26_indexed',
 'cat27_indexed',
 'cat28_indexed',
 'cat29_indexed',
 'cat30_indexed',
 'cat31_indexed',
 'cat32_indexed',
 'cat33_indexed',
 'cat34_indexed',
 'cat35_indexed',
 'cat36_indexed',
 'cat37_indexed',
 'cat38_indexed',
 'cat39_indexed',
 'cat40_indexed',
 'cat41_indexed',
 'cat42_indexed',
 'cat43_indexed',
 'cat44_indexed',
 'cat45_indexed',
 'cat46_indexed',
 'cat47_indexed',
 'cat48_indexed',
 'cat49_indexed',
 'cat50_indexed',
 'cat51_indexed',
 'cat52_indexed',
 'cat53_indexed',
 'cat54_indexed',
 'cat55_ind

In [106]:
labelCol = ['Mark','loss']
labelCol

['Mark', 'loss']

In [107]:
row = Row('mark','label','features') 
row

<Row(mark, label, features)>

In [108]:
df_indexed = df_indexed[labelCol + featuresCol]
#df_indexed

DataFrame[Mark: string, loss: float, cont1: float, cont2: float, cont3: float, cont4: float, cont5: float, cont6: float, cont7: float, cont8: float, cont9: float, cont10: float, cont11: float, cont12: float, cont13: float, cont14: float, cat1_indexed: double, cat9_indexed: double, cat10_indexed: double, cat11_indexed: double, cat12_indexed: double, cat13_indexed: double, cat14_indexed: double, cat15_indexed: double, cat16_indexed: double, cat17_indexed: double, cat18_indexed: double, cat19_indexed: double, cat20_indexed: double, cat21_indexed: double, cat22_indexed: double, cat23_indexed: double, cat24_indexed: double, cat25_indexed: double, cat26_indexed: double, cat27_indexed: double, cat28_indexed: double, cat29_indexed: double, cat30_indexed: double, cat31_indexed: double, cat32_indexed: double, cat33_indexed: double, cat34_indexed: double, cat35_indexed: double, cat36_indexed: double, cat37_indexed: double, cat38_indexed: double, cat39_indexed: double, cat40_indexed: double, cat41

In [109]:
lf = df_indexed.rdd.map(lambda r: (row(r[0], r[1],DenseVector(r[2:])))).toDF()
lf.show()

+-----+------------------+--------------------+
| mark|             label|            features|
+-----+------------------+--------------------+
|train| 2213.179931640625|[0.72630000114440...|
|train|1283.5999755859375|[0.33051401376724...|
|train| 3005.090087890625|[0.26184099912643...|
|train| 939.8499755859375|[0.32159399986267...|
|train|  2763.85009765625|[0.27320399880409...|
|train|   5142.8701171875|[0.54667001962661...|
|train| 1132.219970703125|[0.47144699096679...|
|train|           3585.75|[0.82659101486206...|
|train|  10280.2001953125|[0.33051401376724...|
|train|     6184.58984375|[0.72630000114440...|
|train|  6396.85009765625|[0.49606299400329...|
|train|  5965.72998046875|[0.52069801092147...|
|train| 1193.050048828125|[0.32159399986267...|
|train|  1071.77001953125|[0.35135799646377...|
|train| 585.1799926757812|[0.89433300495147...|
|train| 1395.449951171875|[0.47289198637008...|
|train|  6609.31982421875|[0.42416200041770...|
|train| 2658.699951171875|[0.83474701642

In [110]:
# index label
# convert numeric label to categorical, which is required by
# decisionTree and randomForest
model = StringIndexer(inputCol = 'label', outputCol='index').fit(lf)
lf = model.transform(lf)
 
lf.show(3)

+-----+------------------+--------------------+--------+
| mark|             label|            features|   index|
+-----+------------------+--------------------+--------+
|train| 2213.179931640625|[0.72630000114440...| 22259.0|
|train|1283.5999755859375|[0.33051401376724...|132074.0|
|train| 3005.090087890625|[0.26184099912643...|106882.0|
+-----+------------------+--------------------+--------+
only showing top 3 rows



In [111]:
lf.printSchema()

root
 |-- mark: string (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- index: double (nullable = true)



In [112]:
train = lf.where(lf.mark =='train')
test = lf.where(lf.mark =='test')

In [65]:
# random split further to get train/validate
#train, validate = train.randomSplit([0.7,0.3], seed =121)

In [113]:
# Evaluate model based on auc ROC(default for binary classification)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
def testModel(model, validate = validate):
    pred = model.transform(validate)
    evaluator = BinaryClassificationEvaluator(labelCol = 'label')
    return evaluator.evaluate(pred)

In [114]:
from pyspark.ml.regression import RandomForestRegressor

In [115]:
rf = RandomForestRegressor(numTrees=20, maxDepth=8, maxMemoryInMB=512, seed=142, labelCol = 'label')

In [116]:
model = rf.fit(train)

In [117]:
model.featureImportances.values,model.featureImportances.indices

(array([  1.36562422e-02,   5.69502299e-02,   1.58322610e-02,
          1.14801508e-02,   4.02885024e-03,   9.06710967e-03,
          4.39887957e-02,   5.19509966e-03,   5.08359114e-03,
          8.50822868e-03,   2.12329725e-02,   2.35456355e-02,
          4.90439090e-03,   7.69191045e-03,   1.42280533e-02,
          4.54697969e-03,   3.26044916e-02,   9.07192315e-03,
          5.99028670e-02,   2.41502350e-03,   9.79931364e-04,
          6.56895658e-07,   5.93005566e-04,   2.61677691e-04,
          6.11494424e-05,   4.61614237e-04,   7.83558759e-05,
          2.02807333e-05,   1.29534728e-05,   1.82964431e-03,
          4.21814831e-04,   1.66551204e-03,   2.46511485e-03,
          2.71117498e-03,   6.61502071e-04,   3.02149654e-04,
          1.92254325e-04,   7.68516017e-04,   1.91066915e-04,
          3.55872148e-04,   1.40022452e-04,   5.11584636e-05,
          2.49484467e-03,   2.46245495e-03,   5.96703856e-03,
          6.66929953e-04,   1.64598187e-03,   5.42290851e-04,
        

In [82]:
#model.transform(validate).head()

Row(mark='train', label=36.0, features=DenseVector([0.5851, 0.4222, 0.5714, 0.5547, 0.5942, 0.4992, 0.4438, 0.4352, 0.662, 0.5772, 0.644, 0.6309, 0.3543, 0.4073, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 2.0, 0.0, 0.0, 8.0, 0.0, 0.0, 0.0, 1.0, 2.0, 0.0, 0.0, 0.0, 0.0, 4.0, 3.0, 17.0, 24.0, 0.0, 2.0, 0.0]), index=97543.0, prediction=2448.4054668797735)

In [118]:
prediction_test = model.transform(test)
prediction_test.head()

Row(mark='test', label=0.0, features=DenseVector([0.3216, 0.2991, 0.2469, 0.4029, 0.2811, 0.4666, 0.3177, 0.6123, 0.3437, 0.3802, 0.3777, 0.3699, 0.7041, 0.3926, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0]), index=0.0, prediction=1904.7742873124164)

In [122]:
prediction_test.createOrReplaceTempView('pred_test')
testDF.createOrReplaceTempView("test")

In [123]:
prediction_col = spark.sql(
    "SELECT prediction from pred_test")
id_col = spark.sql(
    "SELECT id from test")

In [124]:
id = id_col.toPandas().values.T.tolist()
prediction = prediction_col.toPandas().values.T.tolist()

In [125]:
result = pd.DataFrame({"id" : id[0], "loss" : prediction[0]})

In [126]:
result.to_csv("submission_s2.csv", index=False)

### GBT

In [127]:
from pyspark.ml.regression import GeneralizedLinearRegression

from pyspark.ml.regression import GBTRegressor

In [128]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [134]:
gbt = GBTRegressor(labelCol = 'label', maxDepth=8, maxBins=15,stepSize=0.8, maxIter=5,  maxMemoryInMB=512)

In [135]:
model = gbt.fit(train)

In [136]:
model.transform(test).head().prediction

2155.8509524294614

In [137]:
prediction_test = model.transform(test)

In [140]:
prediction_test.createOrReplaceTempView('pred_test')
testDF.createOrReplaceTempView("test")

In [141]:
prediction_col = spark.sql(
    "SELECT prediction from pred_test")
id_col = spark.sql(
    "SELECT id from test")

In [142]:
id = id_col.toPandas().values.T.tolist()
prediction = prediction_col.toPandas().values.T.tolist()

In [143]:
result = pd.DataFrame({"id" : id[0], "loss" : prediction[0]})

In [144]:
result.to_csv("submission_s3.csv", index=False)

In [138]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="index", predictionCol="prediction", metricName="accuracy")


In [139]:
accuracy = evaluator.evaluate(prediction_test)
print("Test Error = %g" % (accuracy))

Test Error = 0


In [56]:
prediction_test.createOrReplaceTempView('pred_test')
testDF.createOrReplaceTempView("test")

In [57]:
prediction_col = spark.sql(
    "SELECT prediction from pred_test")
id_col = spark.sql(
    "SELECT id from test")

In [58]:
id = id_col.toPandas().values.T.tolist()
prediction = prediction_col.toPandas().values.T.tolist()

In [59]:
result = pd.DataFrame({"id" : id[0], "loss" : prediction[0]})

In [60]:
result.to_csv("submission_gbt.csv", index=False)