<div class="alert alert-block alert-info">
    <p><b>Курс: </b>Big Data for Data Science</p>
    <p><b>Проект</b> часть 2</p>
    <p><b>Раздел: </b>Data Engineering</p>
</div>

Мы с вами уже исследовали данные, которые у нас есть.

Теперь нам необходимо будет обработать наши данные и подготовить их к обучению моделей.

Для этого выделим ключевые параметры и определим целевую фичу (CTR).

При этом наши данные могут впоследствии изменяться, накапливаться и достигать больших объемов, поэтому нам необходимо реализовать задачу обработки данных, которую мы при необходимости сможем многократно выполнять для получения результата над любым объемом данных.

Структура данных:

- `date` - день, в который происходят события

- `time` - точное время события
- `event` - тип события, может быть или показ или клик по рекламе
- `platform` - платформа, на которой произошло рекламное событие
- `ad_id` - id рекламного объявления
- `client_union_id` - id рекламного клиента
- `campaign_union_id` - id рекламной кампании
- `ad_cost_type` - тип объявления с оплатой за клики (CPC) или за показы (CPM)
- `ad_cost` - стоимость объявления в рублях, для CPC объявлений - это цена за клик, для CPM - цена за 1000 показов
- `has_video` - есть ли у рекламного объявления видео
- `target_audience_count` - размер аудитории, на которую таргетируется объявление

Представьте, что все ваши инженеры по данным заболели, но и один в поле воин! 

У вас есть отличный шанс удивить их! Вам необходимо реализовать на PySpark задачу обработки данных для их подготовки к обучению моделей.

В результате выполнения вашей задачи, например, выполнив команду:

`spark-submit PySparkJob.py clickstream.parquet result`

или 

`python PySparkJob.py clickstream.parquet result`

Вы должны прочитать указанный в параметрах файл, обработать его и получить структуру папок вида:

```
- /result/train
- /result/test
- /result/validate
```

С наборами данных в следующем соотношении `train/test/validate = 0.5/0.25/0.25 (randomSplit)`.

Где в каждой папке должен находиться `parquet`-файл (число партиций не принципиально) со следующей структурой данных:

- `ad_id` [integer] - id рекламного объявления

- `target_audience_count` [decimal] -	размер аудитории, на которую таргетируется объявление
- `has_video` [integer] - 1 если есть видео, иначе 0
- `is_cpm` [integer] - 1 если тип объявления CPM, иначе 0
- `is_cpc` [integer] - 1 если тип объявления CPC, иначе 0
- `ad_cost` [double] - стоимость объявления в рублях
- `day_count` [integer] - Число дней, которое показывалась реклама
- `CTR`	[double] - Отношение числа кликов к числу просмотров

Пожалуйста, используйте шаблон [PySparkJob.py](https://raw.githubusercontent.com/AlexKbit/stepik-ds-course/master/Week3/Project/PySparkJob.py)

- [Документация](https://spark.apache.org/docs/latest/api/python/index.html)

- [Файл с данными](https://github.com/AlexKbit/stepik-ds-course/raw/master/Week3/Project/clickstream.parquet)

In [1]:
import io
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder.appName('PySparkJob').getOrCreate()

In [3]:
spark

In [4]:
spark.conf.set('spark.sql.session.timeZone', 'GMT+3')

In [5]:
df = spark.read.parquet('_data/clickstream.parquet')

In [6]:
df.show(2)

+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
|      date|               time|event|platform| ad_id|client_union_id|compaign_union_id|ad_cost_type|ad_cost|has_video|target_audience_count|
+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
|2019-04-01|2019-04-01 00:00:48| view| android| 45061|          34734|            45061|         CPM|  200.6|        0|              1955269|
|2019-04-01|2019-04-01 00:00:48| view|     web|121288|         121288|           121288|         CPM|  187.4|        0|               232011|
+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
only showing top 2 rows



In [7]:
df.columns

['date',
 'time',
 'event',
 'platform',
 'ad_id',
 'client_union_id',
 'compaign_union_id',
 'ad_cost_type',
 'ad_cost',
 'has_video',
 'target_audience_count']

In [8]:
df.count()

1000000

In [9]:
ad_cnt = df[['ad_id']].distinct().count()
ad_cnt

965

```
1 000 000 событий для 965 объявлений
```

In [10]:
def get_features(df):
    ndf = df.withColumn('is_cpm', F.when(col('ad_cost_type') =='CPM', 1).otherwise(0)) \
            .withColumn('is_cpc', F.when(col('ad_cost_type') =='CPC', 1).otherwise(0)) \
            .withColumn('is_view', F.when(col('event') =='view', 1).otherwise(0)) \
            .withColumn('is_click', F.when(col('event') =='click', 1).otherwise(0))
    
    result_df = ndf.groupBy('ad_id') \
        .agg(F.max(col('target_audience_count')), 
             F.max(col('has_video')), 
             F.max(col('is_cpm')), 
             F.max(col('is_cpc')),
             F.max(col('ad_cost')),
             F.sum(col('is_view')),
             F.sum(col('is_click')),
             F.countDistinct(col('date')).astype('int'))\
        .withColumnRenamed('max(target_audience_count)', 'target_audience_count') \
        .withColumnRenamed('max(has_video)', 'has_video') \
        .withColumnRenamed('max(is_cpm)', 'is_cpm') \
        .withColumnRenamed('max(is_cpc)', 'is_cpc') \
        .withColumnRenamed('max(ad_cost)', 'ad_cost') \
        .withColumnRenamed('sum(is_view)', 'views_cnt') \
        .withColumnRenamed('sum(is_click)', 'cliks_cnt') \
        .withColumnRenamed('CAST(count(DISTINCT date) AS INT)', 'day_count') \
        .withColumn('CTR', col('cliks_cnt') / col('views_cnt')) \
        .drop(col('views_cnt')) \
        .drop(col('cliks_cnt')) \
        .sort('ad_id', ascending=True)
        
    
    return result_df

In [11]:
result_df = get_features(df)
result_df.show(5)

+-----+---------------------+---------+------+------+-------+---------+--------------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|ad_cost|day_count|                 CTR|
+-----+---------------------+---------+------+------+-------+---------+--------------------+
|    2|                14841|        0|     1|     0|  186.4|        2|0.009900990099009901|
|    3|                45035|        0|     0|     1|   46.7|        2|0.016304347826086956|
| 1902|                 1970|        0|     0|     1|   42.5|        2|0.024390243902439025|
| 2064|              4145879|        0|     1|     0|  203.4|        2|                 0.0|
| 2132|               672432|        0|     0|     1|   47.5|        3|0.017241379310344827|
+-----+---------------------+---------+------+------+-------+---------+--------------------+
only showing top 5 rows



In [12]:
result_df.dtypes

[('ad_id', 'int'),
 ('target_audience_count', 'decimal(10,0)'),
 ('has_video', 'int'),
 ('is_cpm', 'int'),
 ('is_cpc', 'int'),
 ('ad_cost', 'double'),
 ('day_count', 'int'),
 ('CTR', 'double')]

```
типы результирующего датафрейма соответствуют заданным в условии
```

In [13]:
result_cnt = result_df.count()
result_cnt

965

In [14]:
result_cnt == ad_cnt

True

```
в результирующем датафрейме 965 записей, по 1 на каждый ad_id исходных данных
```

In [15]:
def split(df, train_size, test_size, valid_size):
    return df.randomSplit([train_size, test_size, valid_size])

In [16]:
train_df, test_df, validate_df = split(result_df, train_size=0.5, test_size=0.25, valid_size=0.25)

In [17]:
train_cnt = train_df.count()
test_cnt = test_df.count()
validate_cnt = validate_df.count()

train_cnt, test_cnt, validate_cnt

(468, 228, 269)

In [18]:
train_cnt + test_cnt + validate_cnt == result_cnt

True

```
разбиение на тренировочный, тестовый и валидационный датафреймы успешно
```

In [19]:
train_df.coalesce(1).write.parquet('_data/result/train')
test_df.coalesce(1).write.parquet('_data/result/test')
validate_df.coalesce(1).write.parquet('_data/result/validate')