# Решение с помощью pandas

In [72]:
import pandas as pd

## 1. Получение данных

Датасет вот отсюда: http://archive.ics.uci.edu/ml/datasets/Adult

In [113]:
# датасет не включает имена колонок, поэтому задаем их вручную
column_names = [
    'age',
    'workclass',
    'fnlwgt',
    'education',
    'education-num',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
    'native-country',
    'salary'
]

# загружаем из файлов обучающую и тестовую выборки
train_df = pd.read_csv('./adult.data', names=column_names)
test_df = pd.read_csv('./adult.test', names=column_names)

## 2. Подготовка данных

In [74]:
train_df['native-country'].value_counts()

Unnamed: 0_level_0,count
native-country,Unnamed: 1_level_1
United-States,29170
Mexico,643
?,583
Philippines,198
Germany,137
Canada,121
Puerto-Rico,114
El-Salvador,106
India,100
Cuba,95


In [75]:
test_df['native-country'].value_counts()

Unnamed: 0_level_0,count
native-country,Unnamed: 1_level_1
United-States,14662
Mexico,308
?,274
Philippines,97
Puerto-Rico,70
Germany,69
Canada,61
India,51
El-Salvador,49
China,47


In [76]:
# во всех строковых данных удаляем незначимые пробелы
train_df = train_df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)

#train_df_cp = train_df.copy()
# в тестовой выборке отсутствуют записи со значением 'Holand-Netherlands'
# в поле 'native-country', поэтому удаляем их также из обучающей выборки
#train_df_cp = train_df_cp.loc[train_df_cp['native-country'] != 'Holand-Netherlands']

# удалим запись с Голландией из обучающей выборки
train_df.drop(train_df[train_df['native-country'] == 'Holand-Netherlands'].index, inplace = True)


# удаляем незначимые пробелы в строках также и в тестовой выборке
test_df = test_df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
# в тестовой выборке есть лишние строки с пустыми данными для кросс-валидации
# удалим их
test_df.dropna(inplace = True)

In [77]:
print(train_df['salary'].value_counts())
print(test_df['salary'].value_counts())

salary
<=50K    24719
>50K      7841
Name: count, dtype: int64
salary
<=50K.    12435
>50K.      3846
Name: count, dtype: int64


In [78]:
print('Размер обучающей выборки: ', train_df.shape)
train_df.head()

Размер обучающей выборки:  (32560, 15)


Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


In [79]:
print('Размер тестовой выборки: ', test_df.shape)
test_df.head()

Размер тестовой выборки:  (16281, 15)


Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
1,25,Private,226802.0,11th,7.0,Never-married,Machine-op-inspct,Own-child,Black,Male,0.0,0.0,40.0,United-States,<=50K.
2,38,Private,89814.0,HS-grad,9.0,Married-civ-spouse,Farming-fishing,Husband,White,Male,0.0,0.0,50.0,United-States,<=50K.
3,28,Local-gov,336951.0,Assoc-acdm,12.0,Married-civ-spouse,Protective-serv,Husband,White,Male,0.0,0.0,40.0,United-States,>50K.
4,44,Private,160323.0,Some-college,10.0,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,7688.0,0.0,40.0,United-States,>50K.
5,18,?,103497.0,Some-college,10.0,Never-married,?,Own-child,White,Female,0.0,0.0,30.0,United-States,<=50K.


In [80]:
# проверим, что пропусков нет в обучающей выборке
train_df.isnull().sum()

Unnamed: 0,0
age,0
workclass,0
fnlwgt,0
education,0
education-num,0
marital-status,0
occupation,0
relationship,0
race,0
sex,0


In [81]:
train_df.dtypes

Unnamed: 0,0
age,int64
workclass,object
fnlwgt,int64
education,object
education-num,int64
marital-status,object
occupation,object
relationship,object
race,object
sex,object


In [82]:
test_df.dtypes

Unnamed: 0,0
age,object
workclass,object
fnlwgt,float64
education,object
education-num,float64
marital-status,object
occupation,object
relationship,object
race,object
sex,object


In [83]:
# почему-то возраст в тестовой определился как строка, переведем в число
test_df.age = pd.to_numeric(test_df.age)
# также числовые поля, которые в обучающей были целочисленными, здесь стали
# отчего-то вещественными, это может привести к проблемам в дальнейшем
test_df.fnlwgt = test_df.fnlwgt.astype('int64')
test_df['education-num'] = test_df['education-num'].astype('int64')
test_df['capital-gain'] = test_df['capital-gain'].astype('int64')
test_df['capital-loss'] = test_df['capital-loss'].astype('int64')
test_df['hours-per-week'] = test_df['hours-per-week'].astype('int64')

In [84]:
test_df.dtypes

Unnamed: 0,0
age,int64
workclass,object
fnlwgt,int64
education,object
education-num,int64
marital-status,object
occupation,object
relationship,object
race,object
sex,object


In [85]:
# сохраним результат в csv, чтобы потом можно было легко использовать выборки
train_df.to_csv('train.csv', index=False, header=False)
test_df.to_csv('test.csv', index=False, header=False)

In [86]:
# для категориальных полей выведем количество категорий
train_df.select_dtypes('object').apply(pd.Series.nunique, axis=0)

Unnamed: 0,0
workclass,9
education,16
marital-status,7
occupation,15
relationship,6
race,5
sex,2
native-country,41
salary,2


In [87]:
# и для тестовой выборки тоже
test_df.select_dtypes('object').apply(pd.Series.nunique, axis=0)

Unnamed: 0,0
workclass,9
education,16
marital-status,7
occupation,15
relationship,6
race,5
sex,2
native-country,41
salary,2


In [88]:
# переводим все категориальные переменные в One Hot Encoding
# предварительно подставим более удобные для обработки значения в поле salary
train_df['salary'] = train_df['salary'].apply(lambda x: 0 if x == '<=50K' else 1)
test_df['salary'] = test_df['salary'].apply(lambda x: 0 if x == '<=50K.' else 1)

train_df = pd.get_dummies(train_df)
test_df = pd.get_dummies(test_df)

print('Размер признаков в обучающей выборке: ', train_df.shape)
print('Размер признаков в тестовой выборке: ', test_df.shape)

Размер признаков в обучающей выборке:  (32560, 108)
Размер признаков в тестовой выборке:  (16281, 108)


ИСПРАВЛЕНО: Здесь есть проблема с тем, что в тестовой выборке не хватает записей с Голландией в качестве родной страны. Используем соединение таблиц для выравнивания колонок обучающей и тестовой выборок.

In [89]:
# train_df, test_df = train_df.align(test_df, join = 'inner', axis = 1)

# print('Размер признаков в обучающей выборке: ', train_df.shape)
# print('Размер признаков в тестовой выборке: ', test_df.shape)

In [90]:
train_df.head()

Unnamed: 0,age,fnlwgt,education-num,capital-gain,capital-loss,hours-per-week,salary,workclass_?,workclass_Federal-gov,workclass_Local-gov,...,native-country_Portugal,native-country_Puerto-Rico,native-country_Scotland,native-country_South,native-country_Taiwan,native-country_Thailand,native-country_Trinadad&Tobago,native-country_United-States,native-country_Vietnam,native-country_Yugoslavia
0,39,77516,13,2174,0,40,0,False,False,False,...,False,False,False,False,False,False,False,True,False,False
1,50,83311,13,0,0,13,0,False,False,False,...,False,False,False,False,False,False,False,True,False,False
2,38,215646,9,0,0,40,0,False,False,False,...,False,False,False,False,False,False,False,True,False,False
3,53,234721,7,0,0,40,0,False,False,False,...,False,False,False,False,False,False,False,True,False,False
4,28,338409,13,0,0,40,0,False,False,False,...,False,False,False,False,False,False,False,False,False,False


In [91]:
# определимся с целевым признаком
X_train = train_df.drop('salary', axis=1)
y_train = train_df['salary']

X_test = test_df.drop('salary', axis=1)
y_test = test_df['salary']

In [92]:
# нормализуем данные
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler(feature_range = (0, 1))
scaler.fit(X_train)

X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

In [93]:
X_test[:5]

array([[0.10958904, 0.14569009, 0.4       , 0.        , 0.        ,
        0.39795918, 0.        , 0.        , 0.        , 0.        ,
        1.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 1.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        1.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        1.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 1.        , 0.        , 0.        , 0.        ,
        0.        , 1.        , 0.        , 0.        , 0.        ,
        1.        , 0.        , 0.        , 0.        , 0.        ,
        0.        , 0.        , 0.        , 0.  

## 3. Анализ данных

In [94]:
# используем модель логистической регрессии для классификации
from sklearn.linear_model import LogisticRegression

lr = LogisticRegression()
lr.fit(X_train, y_train)

In [95]:
# прогноз на первых пяти примерах из тестовой выборки
lr.predict(X_test[:5])

array([0, 0, 0, 1, 0])

In [96]:
lr_pred = lr.predict(X_test)

# для оценки используем метрику accuracy (точность)
from sklearn.metrics import accuracy_score, precision_score, recall_score

print(accuracy_score(y_test, lr_pred))
print(precision_score(y_test, lr_pred))
print(recall_score(y_test, lr_pred))

0.8510533750998096
0.7275056035862952
0.5907436297451898


In [97]:
print(lr_pred)
len(lr_pred)

[0 0 0 ... 1 0 1]


16281

In [98]:
# используем метод k ближайших соседей для классификации
from sklearn.neighbors import KNeighborsClassifier

lr = KNeighborsClassifier(n_neighbors=7)
lr.fit(X_train, y_train)

lr_pred = lr.predict(X_test)

print(accuracy_score(y_test, lr_pred))
print(precision_score(y_test, lr_pred))
print(recall_score(y_test, lr_pred))

0.8288802899084823
0.6568047337278107
0.5772230889235569


## Метрики классификации

Обозначим один из классов позитивным, затем посчитаем из всех предсказанных объектов количество:

TP - количество верно предсказанных позитивных исходов

TN - количество верно предсказанных негативных исходов

FP - количество неверно предсказанных позитивных исходов

FN - количество неверно предсказанных негативных исходов

Метрика accuracy (переводят как процент верно предсказанных) равна $\frac{TP+TN}{TP+TN+FP+FN}.$

Метрика precision (точность) равна $\frac{TP}{TP + FP}.$ Интуитивно ее можно воспринимать как способность алгоритма не называть исход позитивным, если он негативный.

Метрика recall (полнота) равна $\frac{TP}{TP + FN}.$ Интуитивно это способность алгоритма верно угадать все позитивные исходы.

In [99]:
lr_pred_train = lr.predict(X_train)

print(accuracy_score(y_train, lr_pred_train))
print(precision_score(y_train, lr_pred_train))
print(recall_score(y_train, lr_pred_train))

0.8671990171990172
0.7552620119030338
0.6635633210049738


# Решение с помощью PySpark

Сюда необходимо дописать аналогичный код для загрузки и предобработки данных с использованием датафреймов PySpark

In [100]:
!pip install pyspark



In [105]:
from pyspark import SparkConf, SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

Для тестирования результат можно запустить код ниже, который обучает модель логистической регрессии.

In [117]:
spark = SparkSession.builder.appName("Logistic-reg-adult").getOrCreate()

In [126]:
# Определение имен колонок
column_names = [
    'age',
    'workclass',
    'fnlwgt',
    'education',
    'education-num',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
    'native-country',
    'salary'
]

# Загрузка данных
train_df = spark.read.csv("adult.data", header=False, inferSchema=True, sep=",").toDF(*column_names)
train_df

DataFrame[age: int, workclass: string, fnlwgt: double, education: string, education-num: double, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: double, capital-loss: double, hours-per-week: double, native-country: string, salary: string]

In [127]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType


int_columns = ["fnlwgt", "education-num", "capital-gain", "capital-loss", "hours-per-week"]
for col_name in int_columns:
    train_df = train_df.withColumn(col_name, col(col_name).cast(IntegerType()))

In [130]:
train_df = train_df.filter(train_df["native-country"] != "Holand-Netherlands")

In [136]:
# Identify all string columns
string_columns = [field.name for field in train_df.schema.fields if isinstance(field.dataType, StringType)]

# Create a StringIndexer for each string column
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in string_columns]

# Create and fit the pipeline
pipeline = Pipeline(stages=indexers)
model = pipeline.fit(train_df)
indexed_df = model.transform(train_df)

# Replace original string columns with indexed integer values
indexed_df = indexed_df.select([
    col(c) if c not in string_columns
    else col(f"{c}_index").cast("integer").alias(c)
    for c in train_df.columns
])
indexed_df.show()

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native-country|salary|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
| 39|        4| 77516|        2|           13|             1|         3|           1|   0|  0|        2174|           0|            40|             0|     0|
| 50|        1| 83311|        2|           13|             0|         2|           0|   0|  0|           0|           0|            13|             0|     0|
| 38|        0|215646|        0|            9|             2|         9|           1|   0|  0|           0|           0|            40|             0|     0|
| 53|        0|234721|        5|            7|      

In [137]:
indexed_df

DataFrame[age: int, workclass: int, fnlwgt: int, education: int, education-num: int, marital-status: int, occupation: int, relationship: int, race: int, sex: int, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: int, salary: int]

In [141]:
assembler = VectorAssembler(inputCols=column_names, outputCol="features")

indexed_df = assembler.transform(indexed_df)
# обратим внимание, что используемая здесь для обучения модель логистической регрессии
# взята из библиотеки MLLib

lr = LogisticRegression(featuresCol="features", labelCol="salary")
model = lr.fit(indexed_df)

In [151]:
pred = model.transform(indexed_df)
pred

DataFrame[age: int, workclass: int, fnlwgt: int, education: int, education-num: int, marital-status: int, occupation: int, relationship: int, race: int, sex: int, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: int, salary: int, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [146]:
summary = model.summary

In [147]:
summary.accuracy

1.0

In [148]:
summary.precisionByLabel

[1.0, 1.0]

In [149]:
summary.recallByLabel

[1.0, 1.0]

In [150]:
summary.fMeasureByLabel()

[1.0, 1.0]