In [1]:
import os

In [2]:
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-1.8.0-openjdk-amd64/'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

In [3]:
os.environ["SPARK_HOME"] = '/home/mdldml/spark-2.2.0-bin-hadoop2.7/'

In [4]:
import sys

sys.path.append(os.environ['SPARK_HOME']+"/python")

In [5]:
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [6]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext

In [9]:
# reduced max resources because i don't have much RAM
conf = (SparkConf().setMaster("local[4]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "1g"))

In [10]:
sc = SparkContext(conf=conf)

In [11]:
sqlcontext = SQLContext(sc)

In [12]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
import numpy as np

In [13]:
from pyspark.ml.classification import GBTClassificationModel

In [14]:
data=[
    LabeledPoint(0.0,[0.0]),
    LabeledPoint(1.0,[1.0]),
    LabeledPoint(3.0,[2.0]),
    LabeledPoint(2.0,[3.0])
]
lrm=LinearRegressionWithSGD.train(sc.parallelize(data),iterations=10,initialWeights=np.array([1.0]))
print(lrm.predict(np.array([1.0])))



0.928638123469


In [206]:
df = sqlcontext.read.format(
    'com.databricks.spark.csv').options(
    header='true').load('train.csv')

In [17]:
df.head(3)

[Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=u'Braund, Mr. Owen Harris', Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A/5 21171', Fare=u'7.25', Cabin=None, Embarked=u'S'),
 Row(PassengerId=u'2', Survived=u'1', Pclass=u'1', Name=u'Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex=u'female', Age=u'38', SibSp=u'1', Parch=u'0', Ticket=u'PC 17599', Fare=u'71.2833', Cabin=u'C85', Embarked=u'C'),
 Row(PassengerId=u'3', Survived=u'1', Pclass=u'3', Name=u'Heikkinen, Miss. Laina', Sex=u'female', Age=u'26', SibSp=u'0', Parch=u'0', Ticket=u'STON/O2. 3101282', Fare=u'7.925', Cabin=None, Embarked=u'S')]

In [207]:
from pyspark.sql.functions import udf

from pyspark.sql import types



def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))
df.select('Embarked').distinct().collect()

[Row(Embarked=u'Q'), Row(Embarked=u'C'), Row(Embarked=u'S'), Row(Embarked=u'')]

In [208]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df = encoder.transform(indexed)


In [174]:
df

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, Title: string, TitleIndex: double, TitleVec: vector, EmbarkedIndex: double, EmbarkedVec: vector]

In [209]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [210]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
        ] + list(r.EmbarkedVec.toArray())
    )

In [211]:
data = df_t.rdd.map(transf)

In [212]:
train, test = data.randomSplit([0.7, 0.3], seed=42)

In [213]:
train.cache()

PythonRDD[1896] at RDD at PythonRDD.scala:48

In [26]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
rfc = RandomForest.trainClassifier(train, numClasses=2,
                             categoricalFeaturesInfo={},
                            numTrees=100)

In [233]:
def acc(m, test):
    values = test.map(lambda x: x.features)
    yhat = m.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

In [115]:
acc(rfc, test)

0.8076923076923077

In [None]:
# добавить 5 новых фичей
# 3 фичи высчитываются из имеющихся
# хотя бы одна использует udf

# попробовать 3 новых модели

# f1 меру

In [182]:
from pyspark.sql.functions import col, when

Дискретизируем стоимость проезда:

In [214]:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import DoubleType

df = df.withColumn('FareNumeric', df.Fare.cast(DoubleType()))
bucketizer = Bucketizer(splits=[-float("inf"), 0.0, 10.0, 25.0, 50.0, 100.0, 250.0, float("inf")], inputCol="FareNumeric", outputCol="FareBucket")

df = bucketizer.transform(df)


Также добавим вместо количества братьев/сестер и родителей две новых фичи: плыл ли человек на корабле в одиночку, и была ли у него большая (4 человека или больше) семья.

In [215]:
def sails_alone(row):
    return row.SibSp + row.Parch > 0

def has_big_family(row):
    return row.SibSp + row.Parch >= 4

Теперь возраст, вместо числа выделим несколько отдельных фич: был ли пассажир младенцем, несовершеннолетним или относительно молодым (до 40 лет) человеком:

In [216]:
def is_baby(str_age):
    try:
        return float(str_age) < 3
    except:
        return False

def is_child(str_age):
    try:
        return float(str_age) < 18
    except:
        return False

def is_young(str_age):
    try:
        return float(str_age) < 40
    except:
        return False    

Теперь извлечем из имен обращения:

In [217]:
def parse_title(name):
    try:
        return name.split(", ")[1].split(".")[0]
    except:
        return ''

title_udf = udf(parse_title, types.StringType())
df = df.withColumn('Title', title_udf(df['Name']))
df.select('Title').groupBy('Title').count().collect()

[Row(Title=u'Don', count=1),
 Row(Title=u'Miss', count=182),
 Row(Title=u'Col', count=2),
 Row(Title=u'Rev', count=6),
 Row(Title=u'Lady', count=1),
 Row(Title=u'Master', count=40),
 Row(Title=u'Mme', count=1),
 Row(Title=u'Capt', count=1),
 Row(Title=u'Mr', count=517),
 Row(Title=u'Dr', count=7),
 Row(Title=u'Mrs', count=125),
 Row(Title=u'Sir', count=1),
 Row(Title=u'Jonkheer', count=1),
 Row(Title=u'Mlle', count=2),
 Row(Title=u'Major', count=2),
 Row(Title=u'Ms', count=1),
 Row(Title=u'the Countess', count=1)]

Будем использовать только 4 самых популярных, остальное добавим в пятую группу:

In [218]:
stringIndexer = StringIndexer(inputCol="Title", outputCol="TitleIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
# удалим редко встречающиеся титулы:
indexed = indexed.withColumn('TitleIndex', when(col("TitleIndex") <= 3, col("TitleIndex")).otherwise(4.0))
encoder = OneHotEncoder(inputCol="TitleIndex", outputCol="TitleVec")
df = encoder.transform(indexed)

In [220]:
df_train, df_test = df.randomSplit([0.7, 0.3], seed=42)

In [221]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            # уберем старые фичи, на основе которых были созданы новые
            #float(r.Fare),
            #int(r.SibSp),
            #int(r.Parch),
            #parse_age(r.Age),
            
            
        ] 
        + list(r.EmbarkedVec.toArray())
        
        +  # добавленные фичи
        [
            sails_alone(r),
            
            has_big_family(r),
            
            is_baby(r.Age),
            is_child(r.Age),
            is_young(r.Age),
            
            r.FareBucket,
        ]
        + list(r.TitleVec.toArray())
    )

In [222]:
train, test = df_train.rdd.map(transf), df_test.rdd.map(transf)
rfc = RandomForest.trainClassifier(train, numClasses=2,
                             categoricalFeaturesInfo={},
                            numTrees=100)
acc(rfc, test)

0.8302583025830258

Попробуем использовать другие классификаторы. Логистическая регрессия:

In [225]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

In [228]:
lrc = LogisticRegressionWithSGD.train(train)

  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


In [234]:
acc(lrc, test)

0.8081180811808117

SVM:

In [230]:
from pyspark.mllib.classification import SVMWithSGD

In [231]:
svmc = SVMWithSGD.train(train)

In [235]:
acc(svmc, test)

0.8154981549815499

Наивный байесовский:

In [236]:
from pyspark.mllib.classification import NaiveBayes

In [237]:
nbc = NaiveBayes.train(train)

In [238]:
acc(nbc, test)

0.8302583025830258

Вычислим теперь также F1-меру для всех классификаторов:

In [268]:
def f1(m, test):
    values = test.map(lambda x: x.features)
    yhat = m.predict(values).map(lambda x: float(x))
    y = test.map(lambda x: x.label)    
    comp = yhat.zip(y)
    
    tp = comp.map(lambda x: 1 if x[0] == 1 and x[1] == 1 else 0).sum()
    fp = comp.map(lambda x: 1 if x[0] == 1 and x[1] == 0 else 0).sum()
    fn = comp.map(lambda x: 1 if x[0] == 0 and x[1] == 1 else 0).sum()

    precision = 1.0 * tp / (tp + fp)
    recall    = 1.0 * tp / (tp + fn)
    
    return 2.0 * precision * recall / (precision + recall)

In [269]:
f1(rfc, test)

0.7578947368421053

In [270]:
f1(lrc, test)

0.7142857142857143

In [271]:
f1(svmc, test)

0.7311827956989247

In [272]:
f1(nbc, test)

0.7628865979381443