# Открытая школа Data Engineer
## Лекция 2 - Введение в Apache Spark (Практика PySpark)
## Домашняя работа

![image.png](attachment:image.png)

# Что вы должны знать по итогу первой лекции
## Часть 1: Введение в PySpark и настройка среды
1. Краткое введение в Apache Spark и роль PySpark в обработке данных.
2. Установка и настройка PySpark на Jupyter Notebook.
3. Запуск PySpark в Jupyter
4. Импорт необходимых модулей и инициализация SparkSession.

## Часть 2: Работа с RDD (Resilient Distributed Datasets)
1. Объяснение концепции RDD.
2. Создание RDD из различных источников данных.
3. Операции над RDD
4. Трансформационные операции: map, filter, flatMap.
5. Действия: count, collect, reduce.
6. Ленивые вычисления и устойчивость

## Часть 3: Работа с DataFrame
1. Ознакомление с концепцией DataFrame.
2. Создание DataFrame из различных источников данных.
3. Операции над DataFrame
4. Использование методов select, filter, groupBy.
5. Применение функций для обработки данных в DataFrame.
6. Соединение и агрегация данных
7. Объединение нескольких DataFrame.
8. Применение агрегационных функций.

# Домашнее задание
## Часть 1
1. Установите на локальную машину(можно в Docker) Anaconda и настройте pyspark вместе переменными средами
2. Установите PySpark с сайта и настройте его на локальной машине

---
***Перед выполненнием данной части задания, напишите генератор синтетики по следующей постановке***

### Постановка задачи для разработчика по генератору синтетики создания датафреймов

#### Цель:
Разработать генератор синтетических данных для создания трех DataFrame в PySpark, включающих ФИО, города России, хобби на русском языке и телефонные номера в формате +7XXXXXXXXXX.

#### Требования к данным:
- ФИО, города, хобби и даты должны быть на русском языке.
- Телефонные номера должны соответствовать формату +7XXXXXXXXXX.
- Количество строк в каждом DataFrame должно быть равно `num_rows` (заданный параметр).

#### Тестирование:
- Провести тестирование кода, чтобы убедиться, что данные генерируются корректно и DataFrame создаются без ошибок.
- Проверить корректность форматов данных, особенно телефонных номеров.

#### Документация:
- Сопроводить код комментариями для объяснения работы функций.
- Подготовить краткую инструкцию по запуску и тестированию скрипта.
---

## Часть 2

1. **Создание RDD из различных источников данных:**
   - Загрузите текстовый файл созданный генератором текста (напишите его сами) и создайте RDD из его строк. Файл сохраняем в рабочую директорию блокнота, но в каталог `lesson2_hw`
   - Преобразуйте коллекцию Python в RDD.

2. **Операции над RDD:**
   - Используйте операцию `map` для преобразования каждой строки в RDD, например, разделите строки на слова.
   - Примените операцию `filter` для выборки строк из RDD, которые удовлетворяют определённому условию:
        * строки длиннее 10 символов;
        * строки содержащие числа: 0 или 1 или 2;
        * строки без слога "ка"

3. **Трансформационные операции: map, filter, flatMap:**
   - Создайте RDD, содержащий несколько предложений, с помощью генератора синтетики данных из задания 2.1, и используйте `flatMap` для преобразования каждого предложения в список слов.
   - Примените `filter` для извлечения слов:
       - начинающихся на букву "а";
       - без буквы "ъ" и "ь" знаков

4. **Действия: count, collect, reduce:**
   - Используйте `count` для определения количества элементов в RDD.
   - Примените `collect` для получения всех элементов RDD в виде списка.
   - Используйте `reduce` для агрегации элементов RDD:
       - для суммирования чисел;
       - для квадрата чисел

5. **Создание DataFrame из различных источников данных:**
   - Загрузите CSV-файл и создайте DataFrame с помощью генератора синтетики
   - Преобразуйте RDD, созданный в предыдущих заданиях, в DataFrame.

6. **Операции над DataFrame:**
   - Выполните операции `select` для выбора определенных столбцов из DataFrame-ов.
   - Используйте `filter` для фильтрации строк по определенному условию:
       - ФИО содержит слово "Петр";
       - все люди живут только города;
       - возраст от 20 до 30 лет

7. **Использование методов select, filter, groupBy:**
   - Примените `groupBy` для группировки данных по определенному столбцу и выполнения агрегации (например, подсчета).

8. **Применение функций для обработки данных в DataFrame:**
   - Создайте пользовательскую функцию для обработки данных в столбце и примените её к DataFrame. (**ПОВЫШЕННЫЙ УРОВЕНЬ**)

9. **Соединение и агрегация данных:**
   - Выполните соединение (inner join / left join) нескольких DataFrame по общему ключу.
   - Примените агрегационные функции (например, среднее, максимум) к результатам соединения.

10. **Объединение нескольких DataFrame и применение агрегационных функций:**
    - Объедините (union) два или более DataFrame.
    - Используйте агрегационные функции для анализа объединенных данных.
    


<div style="background-color: red; color: white; padding: 10px; border: 2px solid black;">По итогу выполнения домашнего задания - сверься с чек-листом в конце</div>


----
# Практика

## Задание 1
### Введение в PySpark и настройка среды
1. Краткое введение в Apache Spark и роль PySpark в обработке данных.
2. Установка и настройка PySpark на Jupyter Notebook.
3. Запуск PySpark в Jupyter
4. Импорт необходимых модулей и инициализация SparkSession.



1. Первый шаг, нужно **скачать** spark c сайта - [Apache Spark](https://spark.apache.org/downloads.html). Выбирайте последнюю актуальную версию![image_spark_1.png](attachment:image_spark_1.png)

## Настройка Jupyter notebook

### 1 Установка PySpark в среде Anaconda:
Запустите Anaconda Navigator или откройте терминал и выполните следующую команду, чтобы установить PySpark:
```conda install -c conda-forge pyspark```
### 2 Установка Java:
PySpark требует установленной Java. Убедитесь, что у вас установлена Java, а проверить вы это можете при запуске кода, если уидите в логе проблему с JDK, то устрановите системную переменную и задайте PATH.
### 3 Настройка переменной PYSPARK_PYTHON:
Укажите Anaconda Python как Python, используемый PySpark. Для этого установите переменную окружения PYSPARK_PYTHON. Откройте терминал и выполните:
```export PYSPARK_PYTHON=/путь/к/вашему/python3.x```
Здесь ```/путь/к/вашему/python3.x``` - это полный путь к вашему исполняемому файлу Python из Anaconda. Например, если ваш Python находится в папке Anaconda, это может быть что-то вроде ```/anaconda3/bin/python```.
### 4 Запуск PySpark:
Теперь вы можете запустить PySpark из вашего терминала или Jupyter Notebook, используя следующую команду:
```pyspark```
Или, если вы используете Jupyter Notebook:

```pyspark --master local[2] --driver-memory 2g --executor-memory 1g --packages graphframes:graphframes:0.8.1-s_2.12```

Здесь ```--packages graphframes:graphframes:0.8.1-s_2.12``` - пример добавления дополнительного пакета GraphFrames, но можете изменить его в соответствии с вашими потребностями.

Эти шаги должны настроить Anaconda Python для успешного запуска PySpark. Убедитесь, что переменная PYSPARK_PYTHON установлена перед запуском PySpark.

<div style="background-color: blue; color: white; padding: 10px; border: 2px solid black; text-align: center;">

</div>


# Задание 2

## Импорт библиотек

In [1]:
import os
import sys

## Настройка сред

В рабочем каталоге Jupyter формируем файл init_spark_env.py,а дальше импортируем его. В этом файле вы настраиваете PySpark
* распоковали в директорию dir
* проверили наличие JDK
* запустили pyspark
* скопировали настройки в локальный файл init_spark_env.py
    *  для import
    * подложить в директорию ~/.ipython/profile_default/startup
   

In [4]:
#Для случая, если установили спарк с оф.сайта
#import init_spark_env

**Определение настроек из PySpark**
 * ```os.enviroment['SPARK_HOME']```
 * ```sys.path()```


## Настройка и запуск Spark приложения

***Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel)***

*Это предупреждение сообщает вам, что уровень логирования установлен на "WARN" по умолчанию. Если вам нужны более подробные логи, вы можете использовать sc.setLogLevel("newLevel") для установки другого уровня логирования. Например, "INFO".*

----

***NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable***

*Это предупреждение указывает на то, что Spark не может загрузить нативную библиотеку Hadoop для вашей платформы. В этом случае, Spark будет использовать встроенные классы Java, что может повлиять на производительность, но не должно вызывать критические проблемы.*

----

***GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors***

*Это предупреждение указывает на то, что Spark предупреждает, что не включены некоторые не встроенные (non-built-in) сборщики мусора. Если вам необходимо собирать метрики по сборке мусора для нестандартных сборщиков мусора (например, G1 Concurrent GC), вы должны настроить их в файлах конфигурации Spark.*

---

```
23/12/13 23:17:00 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/12/13 23:17:00 INFO SharedState: Warehouse path is 'file:/Users/lovcovvladimir/spark-warehouse'.
```
Эти строки лога говорят о том, что Spark устанавливает значение параметра hive.metastore.warehouse.dir в значение параметра `spark.sql.warehouse.dir`. В данном случае `hive.metastore.warehouse.dir` устанавливается в `'null'`, а `spark.sql.warehouse.dir` имеет значение `'file:/Users/lovcovvQWAladimir/spark-warehouse'`.

В `Spark hive.metastore.warehouse.dir` используется для определения расположения хранилища данных Hive (если используется Hive). В нашем случае, Hive не используется, поэтому устанавливается значение `'null'`.

Следующая строка информирует нас о том, что директория, используемая для хранения данных Spark, установлена в `'file:/Users/lovcovvQWAladimir/spark-warehouse'`.

In [5]:
spark

23/12/18 22:48:38 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


# Часть 2

### 1. Создание RDD из различных источников данных

### 2. Операции над RDD

Преобразование строк:

Фильтрация строк:

### 3. Трансформационные операции

### 4. Действия: count, collect, reduce

### 5. Создание DataFrame из различных источников данных

Создание DataFrame из CSV:

Преобразование RDD в DataFrame:

### 6. Операции над DataFrame

### 7. Использование методов select, filter, groupBy

### 8. Применение функций для обработки данных в DataFrame

### 9. Соединение и агрегация данных

### 10. Объединение нескольких DataFrame и применение агрегационных функций

# Чек-лист домашнего задания
## Часть 1
- [ ] Установите на локальную машину(можно в Docker) Anaconda и настройте pyspark вместе переменными средами
- [ ] Установите PySpark с сайта и настройте его на локальной машине

## Часть 2

- [ ] Создание RDD из различных источников данных
- [ ] Операции над RDD
- [ ] Трансформационные операции: map, filter, flatMap
- [ ] Действия: count, collect, reduce
- [ ] Создание DataFrame из различных источников данных
- [ ] Операции над DataFrame
- [ ] Использование методов select, filter, groupBy
- [ ] Применение функций для обработки данных в DataFrame
- [ ] Соединение и агрегация данных
- [ ] Объединение нескольких DataFrame и применение агрегационных функций

## Дополнительные задания
- [ ] написан Генератор синтетики
- [ ] выполненно дополнительное задание




![image.png](attachment:image.png)

---
<footer style="text-align: center; padding: 20px 0; background-color: #f8f8f8; border-top: 1px solid #e7e7e7;">
  <p style="margin: 0; font-size: 16px; color: #777;">&copy; 2023 Владимир Ловцов, ГК Иннотех</p>
  <p style="margin: 0; font-size: 14px; color: #555;">Больше информации <a href="https://t.me/it_underside" target="_blank">Мой телеграм канал - Изнанка ИТ</a></p>
</footer>
