# Курсовой проект по курсу "Потоковая обработка данных"

#### Описание задачи: Спрогнозировать, будет ли кандидат будет работать в компании-нанимателе.

Ссылка на набор даных: https://www.kaggle.com/arashnic/hr-analytics-job-change-of-data-scientists

**Описание датасета:**
Компания, занимающаяся Big Data и Data Science, хочет нанять специалистов по данным из числа людей, успешно прошедших курсы, проводимые компанией. Многие люди записываются на обучение. Компания хочет знать, кто из этих кандидатов действительно хочет работать в компании после обучения или ищет новую работу, потому что это помогает снизить затраты и время, а также качество обучения или планирование курсов и категоризацию кандидатов . Информация, связанная с демографическими данными, образованием, опытом, поступает от кандидатов при регистрации и зачислении.

Вся выборка разделена на обучающую и тестовую. Цель (`target`) не включена в тестовый датасет.

**NB!** Набор данных несбалансирован. Большинство функций являются категориальными (номинальные, порядковые, двоичные), некоторые имеют высокую мощность.

Признаки:
- enrollee_id: уникальный идентификатор кандидата
- city: Код города
- city_ development_index: индекс развития города (в масштабе)
- gender: Пол кандидата
- relvent_experience: Соответствующий опыт кандидата
- enrolled_university: Тип зачисленных университетских курсов, если таковые имеются.
- education_level: Уровень образования кандидата
- major_discipline: Обучение основной дисциплине кандидата
- experience: Кандидатский общий стаж в годах
- company_size: Количество сотрудников в компании текущего работодателя
- company_type: Тип текущего работодателя
- last_new_job: разница в годах между предыдущей работой и текущей работой
- training_hours: завершенные часы обучения
- tagret: 0 - Не ищу смены работы, 1 - Ищу смену работы

Данная задача является задачей бинарной классификации

#### Описание работы

Цель состоит в демонстрации возможностей совместной работы spark, kafka и cassandra на примере задачи по предсаказанию (см.выше).

Обучаем на тренировочном набое данных (далее - train) модель и затем делаем предсказания на на потоке данных, которые предварительно аггрегируются в таблицей из cassandra.

## Подготовка файлов перед загрузкой на сервер

#### Импорт библиотек

In [1]:
import pandas as pd
import numpy as np

### EDA

In [11]:
data = pd.read_csv('./data/aug_train.csv')

In [12]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19158 entries, 0 to 19157
Data columns (total 14 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   enrollee_id             19158 non-null  int64  
 1   city                    19158 non-null  object 
 2   city_development_index  19158 non-null  float64
 3   gender                  14650 non-null  object 
 4   relevent_experience     19158 non-null  object 
 5   enrolled_university     18772 non-null  object 
 6   education_level         18698 non-null  object 
 7   major_discipline        16345 non-null  object 
 8   experience              19093 non-null  object 
 9   company_size            13220 non-null  object 
 10  company_type            13018 non-null  object 
 11  last_new_job            18735 non-null  object 
 12  training_hours          19158 non-null  int64  
 13  target                  19158 non-null  float64
dtypes: float64(2), int64(2), object(10)
me

In [13]:
data.head(3)

Unnamed: 0,enrollee_id,city,city_development_index,gender,relevent_experience,enrolled_university,education_level,major_discipline,experience,company_size,company_type,last_new_job,training_hours,target
0,8949,city_103,0.92,Male,Has relevent experience,no_enrollment,Graduate,STEM,>20,,,1,36,1.0
1,29725,city_40,0.776,Male,No relevent experience,no_enrollment,Graduate,STEM,15,50-99,Pvt Ltd,>4,47,0.0
2,11561,city_21,0.624,,No relevent experience,Full time course,Graduate,STEM,5,,,never,83,0.0


In [36]:
numerical_features = data.select_dtypes(include=[np.number]).drop('target',
                                                                  axis=1)
categorical_features = data.select_dtypes(include=[np.object])

pd.DataFrame({'all features': len(data.columns.tolist()[2:]),
              'numeric features': numerical_features.shape[1],
              'categorical features': categorical_features.shape[1]}, index=['count'])

Unnamed: 0,all features,numeric features,categorical features
count,12,3,10


In [41]:
print('numerical_features: ', numerical_features.columns.tolist(), '\n')
print('categorical_features: ', categorical_features.columns.tolist())

numerical_features:  ['enrollee_id', 'city_development_index', 'training_hours'] 

categorical_features:  ['city', 'gender', 'relevent_experience', 'enrolled_university', 'education_level', 'major_discipline', 'experience', 'company_size', 'company_type', 'last_new_job']


### Подготовка для загрузки

In [42]:
train = pd.read_csv('./data/aug_train.csv')
test = pd.read_csv('./data/aug_test.csv')

#### Для кассандры объединяем train и test

In [43]:
aug_all_for_cassandra = train.append(test)

#### Для кафки выделяем из test уникальные `enrollee_id` и добавляем к ним синтетический столбец с `datetime`, имитирующий поступление данных по времени.

In [44]:
enrollee_id_for_kafka = pd.DataFrame(test['enrollee_id'])
start_date = pd.to_datetime('1/4/2021')
enrollee_id_for_kafka['datetime'] = pd \
    .to_timedelta(enrollee_id_for_kafka.index - 1, unit='H') + start_date
enrollee_id_for_kafka.head(n=3)

Unnamed: 0,enrollee_id,datetime
0,32403,2021-01-03 23:00:00
1,9858,2021-01-04 00:00:00
2,31806,2021-01-04 01:00:00


#### Сохранение датафреймов в файлы

In [32]:
aug_all_for_cassandra.to_csv('./data/aug_all_for_cassandra.csv')
enrollee_id_for_kafka.to_csv('./data/enrollee_id_for_kafka.csv')

### Загружаем необходимые файлы на удаленный сервер с помошью mobaxtern и копируем файл на hdfs

```bash
hdfs dfs -put for_stream/aug_train.csv input_csv_for_stream
hdfs dfs -put for_stream/aug_all_for_cassandra.csv input_csv_for_stream
hdfs dfs -put for_stream/enrollee_id_for_kafka.csv input_csv_for_stream

hdfs dfs -ls input_csv_for_stream
```

![Title](img/2021-04-05_223454.jpg)

## Загрузка данных в kafka и cassandra

### Загрузка в kafka

#### Создадим новый топик `enrollee_id`

```bash
/kafka/bin/kafka-topics.sh \
    --create --topic enrollee_id \
    --zookeeper 10.0.0.6:2181 \
    --partitions 1 \
    --replication-factor 2 \
    --config retention.ms=-1
```

#### Запись в kafka статичного DataFrame

```python
# %load aug_to_kafka.py
"""
export SPARK_KAFKA_VERSION=0.10
/spark2.4/bin/pyspark \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 \
    --driver-memory 512m \
    --num-executors 1 \
    --executor-memory 512m \
    --master local[1]
"""

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField

spark = SparkSession.builder.appName("gogin_spark").getOrCreate()

kafka_brokers = "10.0.0.6:6667"

enrollee_schema = StructType([StructField('c0', StringType()),
                              StructField('enrollee_id', StringType()),
                              StructField('datetime', StringType())])

enrollee_id_df = spark.read \
    .options(delimiter=',', schema=enrollee_schema, header=True) \
    .csv(path="input_csv_for_stream/enrollee_id_for_kafka.csv") \
    .select('enrollee_id', 'datetime')

enrollee_id_df \
    .selectExpr("CAST(null AS STRING) as key", "CAST(to_json(struct(*)) AS STRING) as value") \
    .write \
    .format(source="kafka") \
    .option("topic", "enrollee_id") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("checkpointLocation", "checkpoints/enrollee_id_for_kafka") \
    .save()
```

#### Проверка записанныз данных

```python
def console_output(df, freq, truncate=True, numRows=20):
    """
    Вывод на консоль вместо show()
    :param numRows: number of lines to display
    :param df: spark DataFrame
    :param freq: frequency in seconds
    :param truncate: truncate values, bool
    """
    return df.writeStream \
        .format(source="console") \
        .trigger(processingTime='%s seconds' % freq) \
        .options(truncate=truncate, numRows=numRows) \
        .start()


# Проверка, прочитали потихоньку
raw_data = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", "enrollee_id") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", "1") \
    .load() \
    .select(F.col("value").cast("String"), "offset") \
    .select(F.from_json(F.col("value"), enrollee_schema).alias("value"), "offset") \
    .select("value.*", "offset") \
    .select('enrollee_id', 'datetime', 'offset')

out = console_output(df=raw_data, freq=5, truncate=False)
out.stop()
```

<img src="img/2021-04-05_224640.jpg" alt="drawing" width="700"/>

### Загрузка в cassandra

#### Создадим таблицу в кассандре

```bash
/cassandra/bin/cqlsh  # (подключиться к касандре через консоль)

# создать схему
CREATE  KEYSPACE IF NOT EXISTS koryagin 
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1 };

USE koryagin;

# создание таблицы
CREATE TABLE IF NOT EXISTS aug_all (
    enrollee_id int,
    city text,
    city_development_index double,
    gender text,
    relevent_experience text,
    enrolled_university text,
    education_level text,
    major_discipline text,
    experience text,
    company_size text,
    company_type text,
    last_new_job text,
    training_hours int,
    target double,
    primary key (enrollee_id));
```

#### Запись dataframe в cassandra

```python
# %load aug_to_cassandra.py
"""
export SPARK_KAFKA_VERSION=0.10
/spark2.4/bin/pyspark \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,com.datastax.spark:spark-cassandra-connector_2.11:2.4.2 \
    --driver-memory 512m \
    --driver-cores 1 \
    --master local[1]
"""

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("gogin_spark").getOrCreate()

# для начала готовим DataFrame
aug_all_df = spark.read \
    .options(delimiter=',', inferschema=True, header=True) \
    .csv(path="input_csv_for_stream/aug_all_for_cassandra.csv") \
    .select('enrollee_id', 'city', 'city_development_index', 'gender', 'relevent_experience',
            'enrolled_university', 'education_level', 'major_discipline', 'experience', 'company_size',
            'company_type', 'last_new_job', 'training_hours', 'target')

# пишем
aug_all_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="aug_all", keyspace="koryagin") \
    .mode("append") \
    .save()
```

```python
# Проверяем - читаем большой большой датасет по ключу
cass_big_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="aug_all", keyspace="koryagin") \
    .load()

cass_big_df.filter(F.col("enrollee_id") == "27107").show()
```

![Title](img/2021-04-05_225607.jpg)

## Обучение модели

#### Для обучения модели запустить следующий код

```python
# %load aug_ml_train.py
"""
export SPARK_KAFKA_VERSION=0.10
/spark2.4/bin/pyspark \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,com.datastax.spark:spark-cassandra-connector_2.11:2.4.2 \
    --driver-memory 512m \
    --driver-cores 1 \
    --master local[1]
"""

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, StructField, FloatType, DoubleType
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler, StringIndexer, IndexToString

spark = SparkSession.builder.appName("gogin_spark").getOrCreate()

enrollee_known = spark.read \
    .options(delimiter=',', inferschema=True, header=True) \
    .csv(path="input_csv_for_stream/aug_train.csv")

# в общем - все анализируемые колонки заносим в колонку-вектор features
categoricalColumns = ['city', 'gender', 'relevent_experience', 'enrolled_university', 'education_level',
                      'major_discipline', 'experience', 'company_type', 'last_new_job']
numericCols = ['city_development_index', 'training_hours']

stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], 
                                     outputCols=[categoricalCol + "classVec"]).setHandleInvalid("keep")
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol='target', outputCol='label').setHandleInvalid("keep")
stages += [label_stringIdx]

assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid("keep")
stages += [assembler]

lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)
stages += [lr]

label_stringIdx_fit = label_stringIdx.fit(dataset=enrollee_known)
indexToStringEstimator = IndexToString() \
    .setInputCol("prediction") \
    .setOutputCol("prediction_target") \
    .setLabels(label_stringIdx_fit.labels)

stages += [indexToStringEstimator]

pipeline = Pipeline().setStages(stages)
pipelineModel = pipeline.fit(dataset=enrollee_known)

# сохраняем модель на HDFS
pipelineModel.write().overwrite().save("ml_models/my_LR_model_enrollees")
```

#### Для наглядности можно посчитать процент полной сходимости

```python
pipelineModel.transform(enrollee_known).select("prediction_target", "target").show(n=10)
```

![Title](img/2021-04-05_230936.jpg)

## Предсказание на стриме

#### Написание функций и dataframes

```python
# %load aug_ml_predict.py
"""
export SPARK_KAFKA_VERSION=0.10
/spark2.4/bin/pyspark \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,com.datastax.spark:spark-cassandra-connector_2.11:2.4.2 \
    --driver-memory 512m \
    --driver-cores 1 \
    --master local[1]
"""

from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("gogin_spark").getOrCreate()

kafka_brokers = "10.0.0.6:6667"

# читаем кафку по одной записи, но можем и по 1000 за раз
enrollee_schema = StructType([StructField('c0', StringType()),
                              StructField('enrollee_id', StringType()),
                              StructField('datetime', StringType())])

enrollees_df = spark.readStream \
    .format(source="kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", "enrollee_id") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", "1") \
    .load() \
    .select(F.col("value").cast("String"), "offset") \
    .select(F.from_json(F.col("value"), enrollee_schema).alias("value"), "offset") \
    .select("value.*", "offset") \
    .select('enrollee_id', 'datetime', 'offset')

enrollees_df.printSchema()


def console_output(df, freq, truncate=True, numRows=20):
    """
    Вывод на консоль вместо show()
    :param df: spark DataFrame
    :param freq: frequency in seconds
    :param truncate: truncate values, bool
    """
    return df.writeStream \
        .format("console") \
        .trigger(processingTime='%s seconds' % freq) \
        .options(truncate=truncate) \
        .options(numRows=numRows) \
        .start()


# out = console_output(df=enrollees_df, freq=5, truncate=False)
# out.stop()

###############
# подготавливаем DataFrame для запросов к касандре с историческими данными
cassandra_features_raw = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="aug_all", keyspace="koryagin") \
    .load()

cassandra_features_selected = cassandra_features_raw.drop("target")
# cassandra_features_selected.show(n=5)

# подгружаем ML из HDFS
pipeline_model = PipelineModel.load(path="ml_models/my_LR_model_enrollees")


##########
# вся логика в этом foreachBatch
def writer_logic(df, epoch_id):
    """ Function for foreachBatch
    :param df: stream DataFrame from kafka
    :param epoch_id: --
    """
    df.persist()
    print("---------I've got new batch--------")
    print("This is what I've got from Kafka:")
    df.show()
    features_from_kafka = df.select("enrollee_id")
    #
    # оставим только уникальные "enrollee_id"
    users_list_df = features_from_kafka.distinct()
    # превращаем DataFrame(Row) в Array(Row)
    users_list_rows = users_list_df.collect()
    # превращаем Array(Row) в Array(String)
    users_list = map(lambda x: str(x.__getattr__("enrollee_id")), users_list_rows)
    # print(users_list)
    where_string = " enrollee_id = " + " or enrollee_id = ".join(users_list)
    print("I'm gonna select this from Cassandra:")
    print(where_string)
    features_from_cassandra = cassandra_features_selected \
        .where(where_string) \
        .na.fill(0) \
        .drop("enrollee_id")
    features_from_cassandra.persist()
    # объединяем микробатч из кафки и микробатч касандры
    cassandra_kafka_aggregation = features_from_cassandra
    predict = pipeline_model.transform(cassandra_kafka_aggregation)
    # predict_short = predict.select('city', 'city_development_index', 'gender', 'relevent_experience',
    #                                'enrolled_university', 'education_level', 'major_discipline', 'experience',
    #                                'comp,any_size', 'company_type', 'last_new_job', 'training_hours', 'prediction',
    #                                F.col("prediction_target").alias("target"))
    predict_short_short = features_from_kafka \
        .crossJoin(predict.select('prediction_target'))
    print("Here is what I've got after model transformation:")
    predict_short_short.show()
    # TODO: доделать сохранение предсказания обратно в Cassandra
    # обновляем исторический агрегат в касандре
    # predict_short.write \
    #     .format("org.apache.spark.sql.cassandra") \
    #     .options(table="aug_all", keyspace="koryagin") \
    #     .mode("append") \
    #     .save()
    # print("I saved the prediction and aggregation in Cassandra. Continue...")
    features_from_cassandra.unpersist()
    df.unpersist()


# связываем источник Кафки и foreachBatch функцию
stream = enrollees_df \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .foreachBatch(func=writer_logic) \
    .option("checkpointLocation", "checkpoints/sales_unknown_checkpoint")
```

#### Поехали

```python
# поехали
s = stream.start()

s.stop()
```

![Title](img/2021-04-05_233452.jpg)

In [63]:
# -- The End --