#### Connect to the server

```bash
ssh 305_koryagin@37.139.32.56 -i ./id_rsa_305_koryagin.txt
```

#### Start Spark

```bash
/spark2.4/bin/pyspark \
    --driver-memory 512m \
    --driver-cores 1
```

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, DoubleType, DateType, FloatType

spark = SparkSession.builder.appName("gogin_spark").getOrCreate()
```

## Creating the DataFrame

```python
# first, load the CSV file into a DataFrame
data = spark.read \
    .options(delimiter=',', inferschema=True, header=True) \
    .csv(path="input_csv_for_recommend_system/data.csv")

data.printSchema()
```

<details>
    <summary> → Spark console output</summary>

```bash
root
 |-- sale_date_date: string (nullable = true)
 |-- contact_id: string (nullable = true)
 |-- shop_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- product_sub_category_id: string (nullable = true)
 |-- product_category_id: string (nullable = true)
 |-- brand_id: string (nullable = true)
 |-- quantity: string (nullable = true)
```
</details>

## Overview

#### Count the missing values in each column.


```python
for col in data.columns:
    print(col, "count of null values = ", data.filter(data[col].isNull()).count())
```


<details>
    <summary> → Spark console output</summary>

```bash
...
('sale_date_date', 'count of null values = ', 2)
('contact_id', 'count of null values = ', 2)
('shop_id', 'count of null values = ', 3)
('product_id', 'count of null values = ', 3)
('name', 'count of null values = ', 3)
('product_sub_category_id', 'count of null values = ', 3)
('product_category_id', 'count of null values = ', 3)
('brand_id', 'count of null values = ', 3)
('quantity', 'count of null values = ', 3)
```
</details>

#### Count the -1 values in each column.

```python
for col in data.columns:
    print(col, "count of -1 values = ", data.filter(data[col] == '-1').count())
```

<details>
    <summary> → Spark console output</summary>

```bash
...
('sale_date_date', 'count of -1 values = ', 0)
('contact_id', 'count of -1 values = ', 0)
('shop_id', 'count of -1 values = ', 0)
('product_id', 'count of -1 values = ', 193)
('name', 'count of -1 values = ', 0)
('product_sub_category_id', 'count of -1 values = ', 647720)
('product_category_id', 'count of -1 values = ', 649395)
('brand_id', 'count of -1 values = ', 16169893)
('quantity', 'count of -1 values = ', 13874)
```
</details>

#### Look at the tail of the table in two columns

```python
data.select('sale_date_date', 'contact_id') \
    .sort(F.col("sale_date_date").asc()) \
    .show(n=5, truncate=False)
```


<details>
    <summary> → Spark console output</summary>

```bash
+--------------+-----------+
|sale_date_date|contact_id |
+--------------+-----------+
|null          |null       |
|null          |null       |
|(затронуто стр|к: 20000000|
|2018-01-01    |1970794    |
|2018-01-01    |850958     |
+--------------+-----------+
only showing top 5 rows
```
</details>

## Type conversion

#### Remove the last 3 rows.

```python
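# "(затронуто стр" is the start of the export footer ("rows affected ...") that leaked into the CSV
data = data.where(F.col("sale_date_date") != "(затронуто стр")
data.count()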
```
```bash
20000000
```
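
Only one row matches the footer text, yet the filter removes three. The likely reason is SQL three-valued logic: comparing a null with `!=` gives null rather than true, and a filter keeps only the rows where the predicate is true, so the two rows with a null `sale_date_date` disappear as well. The sketch below imitates that rule in plain Python (no Spark involved); `sql_not_equal` and the sample list are illustrative assumptions.

```python
def sql_not_equal(left, right):
    """Three-valued `!=`: a comparison that involves NULL yields NULL (None here)."""
    if left is None or right is None:
        return None
    return left != right


FOOTER = "(затронуто стр"
# The five sale_date_date values shown in the console output above.
column = [None, None, FOOTER, "2018-01-01", "2018-01-01"]

# A filter keeps a row only when the predicate is exactly True.
kept = [value for value in column if sql_not_equal(value, FOOTER) is True]
print(kept)
```

Only the two dated rows remain in `kept`, which is consistent with the count of 20000000 above.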

#### Cast `sale_date_date` to *DateType*

```python
data = data.withColumn(colName="sale_date_date", col=data["sale_date_date"].cast(DateType()))
```

#### Cast `contact_id`, `shop_id`, `product_id`, `product_sub_category_id`, `product_category_id`, `brand_id` to *IntegerType*

```python
data = data \
    .withColumn(colName="contact_id", col=data["contact_id"].cast(IntegerType())) \
    .withColumn(colName="shop_id", col=data['shop_id'].cast(IntegerType())) \
    .withColumn(colName='product_id', col=data['product_id'].cast(IntegerType())) \
    .withColumn(colName='product_sub_category_id', col=data['product_sub_category_id'].cast(IntegerType())) \
    .withColumn(colName='product_category_id', col=data['product_category_id'].cast(IntegerType())) \
    .withColumn(colName='brand_id', col=data['brand_id'].cast(IntegerType()))
```

#### Cast `quantity` to *FloatType*

```python
data = data.withColumn(colName='quantity', col=F.regexp_replace(str='quantity', pattern=',', replacement='.'))
data = data.withColumn(colName='quantity', col=data['quantity'].cast(FloatType()))
```
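
The two steps above first replace a decimal comma with a dot and then cast the text to a number. As a rough illustration of what happens to a single value, here is a plain-Python sketch; `parse_quantity` and the sample strings are assumptions made for the example, and Python's `float` is 64-bit and slightly more lenient than Spark's cast to the 32-bit *FloatType*.

```python
def parse_quantity(raw):
    """Mimic the two Spark steps for one value: swap ',' for '.', then cast."""
    if raw is None:
        return None
    try:
        return float(raw.replace(",", "."))
    except ValueError:
        # Spark's cast returns null for text it cannot parse; None plays that role here.
        return None


samples = ["1,5", "-1", "10000", "abc", None]
parsed = [parse_quantity(value) for value in samples]
print(parsed)
```

Without the replacement step, a value such as `"1,5"` would fail to parse and end up as null after the cast.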

## Overview, continued

#### Summary statistics for the `quantity` column

```python
data.select('quantity').describe().show()
```
```bash
+-------+-----------------+
|summary|         quantity|
+-------+-----------------+
|  count|         20000000|
|   mean|4.538670170527493|
| stddev|91.54221194709096|
|    min|             -1.0|
|    max|          10000.0|
+-------+-----------------+

```

#### Number of distinct values in each column

```python
data.select(*[F.countDistinct(col).alias(col) for col in data.columns]) \
    .show()
```

```bash
+--------------+----------+-------+----------+-----+-----------------------+-------------------+--------+--------+
|sale_date_date|contact_id|shop_id|product_id| name|product_sub_category_id|product_category_id|brand_id|quantity|
+--------------+----------+-------+----------+-----+-----------------------+-------------------+--------+--------+
|           214|   1642379|    851|     36549|36113|                    440|                145|    1617|    1296|
+--------------+----------+-------+----------+-----+-----------------------+-------------------+--------+--------+
```

## Re-saving the file in .parquet format

```python
data.write.parquet(path="input_csv_for_recommend_system/data.parquet", mode='overwrite')
```

#### Check

```bash
-bash-4.2$ hdfs dfs -du -h input_csv_for_recommend_system/

2.2 G    4.3 G    input_csv_for_recommend_system/data.csv
429.3 M  858.5 M  input_csv_for_recommend_system/data.parquet
```
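
The listing allows a quick estimate of how much space the conversion saves. The arithmetic below uses the first size column and assumes that `-h` reports binary units (1 G = 1024 M); since 2.2 G is itself a rounded figure, the result is approximate. The second column, roughly twice the first, appears to be the space consumed including replicas.

```python
# Sizes copied from the first column of the `hdfs dfs -du -h` output.
csv_size_mb = 2.2 * 1024   # 2.2 G, assuming binary units (1 G = 1024 M)
parquet_size_mb = 429.3    # 429.3 M

compression_ratio = csv_size_mb / parquet_size_mb
saved_fraction = 1 - parquet_size_mb / csv_size_mb

print(round(compression_ratio, 1))   # how many times smaller the Parquet copy is
print(round(saved_fraction * 100))   # percentage of space saved
```

By this estimate the Parquet copy is about five times smaller than the CSV, a saving of roughly 80%.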