In [62]:
from pyspark.sql import SparkSession
from pyspark import SparkContext


sc =SparkContext.getOrCreate()
#загружаем файлы
dfTrain = spark.read.format('csv').options(header='true', 
                                            inferSchema='true').load('/home/zorro/VM/BigData/Spark/titanic/train.csv')
dfTest = spark.read.format('csv').options(header='true', 
                                            inferSchema='true').load('/home/zorro/VM/BigData/Spark/titanic/test.csv')


In [63]:
#Выбираем влияющие на выживаемость показатели
train_df = dfTrain.select(['Survived', 'Pclass', 'Sex', 'Age', 'Fare', 'Embarked'])
#удалим поля null
train_df = train_df.na.drop()
train_df.describe().show()


+-------+------------------+------------------+------+-----------------+------------------+--------+
|summary|          Survived|            Pclass|   Sex|              Age|              Fare|Embarked|
+-------+------------------+------------------+------+-----------------+------------------+--------+
|  count|               712|               712|   712|              712|               712|     712|
|   mean|0.4044943820224719| 2.240168539325843|  null|29.64209269662921| 34.56725140449432|    null|
| stddev|0.4911389472541192|0.8368543166903446|  null|14.49293290032352|52.938648174710906|    null|
|    min|                 0|                 1|female|             0.42|               0.0|       C|
|    max|                 1|                 3|  male|             80.0|          512.3292|       S|
+-------+------------------+------------------+------+-----------------+------------------+--------+



In [68]:
# преобразуем текстовые столбцы в числовые для распознавания машинныи мозгом
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, OneHotEncoder
sex_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')
sex_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

embarked_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embarked_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVec')

In [71]:
res = VectorAssembler(inputCols=['Pclass', 'SexVec', 'Age', 'Fare', 'EmbarkedVec'], outputCol='AllFeatures')
#используем логистическую регрессию
from pyspark.ml.classification import LogisticRegression
logistic_reg_model = LogisticRegression(featuresCol='AllFeatures', labelCol='Survived')
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[sex_indexer, embarked_indexer, sex_encoder, embarked_encoder, 
                            res, logistic_reg_model])

In [73]:
#Создаем модель для обучения
model_fill = pipeline.fit(train_df)

In [74]:
#Подготовка данных для тестового набора и результат будет из двух полей Survived по PassengerId
test_df = dfTest.select(['PassengerId', 'Pclass', 'Sex', 'Age', 'Fare', 'Embarked'])
#расчет средних значений
age_mean = test_df.agg({'Age': 'mean'}).first()[0]
fare_mean = test_df.agg({'Fare': 'mean'}).first()[0]
#проставим вместо null средние значения для того, чтобы все столбцы были равны по строкам
test_df = test_df.fillna(age_mean, subset=['Age'])
test_df = test_df.fillna(fare_mean, subset=['Fare'])
test_df.describe().show()


+-------+------------------+------------------+------+------------------+------------------+--------+
|summary|       PassengerId|            Pclass|   Sex|               Age|              Fare|Embarked|
+-------+------------------+------------------+------+------------------+------------------+--------+
|  count|               418|               418|   418|               418|               418|     418|
|   mean|            1100.5|2.2655502392344498|  null|30.272590361445815|  35.6271884892086|    null|
| stddev|120.81045760473994|0.8418375519640503|  null|12.634534168325061|55.840500479541056|    null|
|    min|               892|                 1|female|              0.17|               0.0|       C|
|    max|              1309|                 3|  male|              76.0|          512.3292|       S|
+-------+------------------+------------------+------+------------------+------------------+--------+



In [79]:
#Передаем тестовые данные в модель для анализап модели
results = model_fill.transform(test_df)

In [88]:
# Результаты по ID выживет или нет (1/0)
#out_results = results.select('PassengerId', 'prediction', 'probability')
out_results = results.select('PassengerId', 'prediction')
out_results.show()

+-----------+----------+
|PassengerId|prediction|
+-----------+----------+
|        892|       0.0|
|        893|       0.0|
|        894|       0.0|
|        895|       0.0|
|        896|       1.0|
|        897|       0.0|
|        898|       0.0|
|        899|       0.0|
|        900|       1.0|
|        901|       0.0|
|        902|       0.0|
|        903|       0.0|
|        904|       1.0|
|        905|       0.0|
|        906|       1.0|
|        907|       1.0|
|        908|       0.0|
|        909|       0.0|
|        910|       1.0|
|        911|       1.0|
+-----------+----------+
only showing top 20 rows



In [89]:
# Сохранение результата в CSV файл
import pandas as pd
out_results.toPandas().to_csv(r'/home/zorro/VM/BigData/Spark/titanic/titanic_results.csv')