In [1]:
import os

In [2]:
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-openjdk-amd64/'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

In [3]:
os.environ["SPARK_HOME"] = '/home/sony/Oleg/hse/big_data/spark-2.2.0-bin-hadoop2.7/'

In [4]:
import sys

sys.path.append(os.environ['SPARK_HOME']+"/python")

In [5]:
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [6]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext

In [7]:
conf = (SparkConf().setMaster("local[8]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "2g"))

In [8]:
sc = SparkContext(conf=conf)

In [9]:
sqlcontext = SQLContext(sc)

In [10]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np

In [11]:
from pyspark.ml.classification import GBTClassificationModel

In [12]:
data=[
    LabeledPoint(0.0,[0.0]),
    LabeledPoint(1.0,[1.0]),
    LabeledPoint(3.0,[2.0]),
    LabeledPoint(2.0,[3.0])
]
lrm=LinearRegressionWithSGD.train(sc.parallelize(data),iterations=10,initialWeights=np.array([1.0]))
print(lrm.predict(np.array([1.0])))



0.928638123469


In [13]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('/home/sony/Oleg/hse/big_data/train.csv')

In [14]:
df.head(3)

[Row(PassengerId='1', Survived='0', Pclass='3', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S'),
 Row(PassengerId='2', Survived='1', Pclass='1', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', Embarked='C'),
 Row(PassengerId='3', Survived='1', Pclass='3', Name='Heikkinen, Miss. Laina', Sex='female', Age='26', SibSp='0', Parch='0', Ticket='STON/O2. 3101282', Fare='7.925', Cabin=None, Embarked='S')]

### How to one-hot encode using build-in functions and remove None using udf

In [15]:
from pyspark.sql.functions import udf

from pyspark.sql import types



def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
print("New Embarked values = ", df.select('Embarked').distinct().collect())# print values of new column

from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)
print("\nNew df = ",df_t)

New Embarked values =  [Row(Embarked='Q'), Row(Embarked='C'), Row(Embarked='S'), Row(Embarked='')]

New df =  DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector]


### Hw udf feature

In [16]:
import re
def cabin_transform(x):
    """select unique letters from the feature
    """
    if x != None:
        uq_letters = set("".join(re.findall("[a-zA-Z]+", x)))
        uq_letters = sorted(list(uq_letters))
        return ''.join(uq_letters)
    else:
        return 'NA'

cabin_udf = udf(cabin_transform, types.StringType())
df_t = df_t.withColumn('Cabin', cabin_udf(df_t['Cabin']))
print("New Cabin values = ", df_t.select('Cabin').distinct().collect())

stringIndexerC = StringIndexer(inputCol="Cabin", outputCol="CabinIndex")
modelC = stringIndexerC.fit(df_t)
indexedC = modelC.transform(df_t)
encoderC = OneHotEncoder(inputCol="CabinIndex", outputCol="CabinVec")
df_t = encoderC.transform(indexedC)
print("\nNew df = ", df_t)

New Cabin values =  [Row(Cabin='F'), Row(Cabin='FG'), Row(Cabin='NA'), Row(Cabin='E'), Row(Cabin='T'), Row(Cabin='B'), Row(Cabin='D'), Row(Cabin='EF'), Row(Cabin='C'), Row(Cabin='A'), Row(Cabin='G')]

New df =  DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, EmbarkedIndex: double, EmbarkedVec: vector, CabinIndex: double, CabinVec: vector]


### Building data

In [19]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [30]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
            
            parse_age(r.Age) > 18,
            int(r.SibSp) > 1,
            int(r.Pclass) > 3,
            int(r.Parch) > 0,
            
        ] + list(r.EmbarkedVec.toArray())+ list(r.CabinVec.toArray())
    )

In [31]:
data = df_t.rdd.map(transf)

In [32]:
train, test = data.randomSplit([0.7, 0.3])

In [33]:
train.cache()

PythonRDD[178] at RDD at PythonRDD.scala:48

### Defining models

In [36]:
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.tree import GradientBoostedTrees,DecisionTree
from pyspark.mllib.tree import RandomForest, RandomForestModel

models = {}
models["Random_forest"] = RandomForest.trainClassifier(train, numClasses=2,
                             categoricalFeaturesInfo={},
                            numTrees=100)
models["SVM"] = SVMWithSGD.train(train, regType='l2',regParam=0.01, iterations=200,intercept=True)
models["Gradient_Boosted_Trees"] = GradientBoostedTrees.trainClassifier(train,
                                                                      categoricalFeaturesInfo={}, numIterations=200)
models["Decision_tree"] = DecisionTree.trainClassifier(train, numClasses=2,categoricalFeaturesInfo={})

### Calculating metrics

In [37]:
def acc(model, test):
    values = test.map(lambda x: x.features)
    yhat = model.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

def f1_score(model, test):
    values = test.map(lambda x: x.features)
    yhat = model.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    tp = comp.map(lambda x: (x[0] == 1 and x[1] == 1)).sum()
    prec = tp/yhat.sum()
    recall = tp/y.sum()
    return 2 * prec*recall/(prec + recall)

In [38]:
for name,model in models.items():
    print("Testing "+name+":")
    print("accuracy = ",acc(model,test))
    print("f1 score = ",f1_score(model,test))

Testing Gradient_Boosted_Trees:
accuracy =  0.8065693430656934
f1 score =  0.7336683417085428
Testing Random_forest:
accuracy =  0.8065693430656934
f1 score =  0.7135135135135134
Testing SVM:
accuracy =  0.6313868613138687
f1 score =  0.12173913043478259
Testing Decision_tree:
accuracy =  0.781021897810219
f1 score =  0.7


In [None]:
# добавить 5 новых фичей
# 3 фичи высчитываются из имеющихся
# хотя бы одна использует udf

# попробовать 3 новых модели

# f1 меру