# Введение в Apache Spark (Практика PySpark)

## Часть 1

### Постановка задачи для разработчика по генератору синтетики создания датафреймов

#### Цель:
Разработать генератор синтетических данных для создания трех DataFrame в PySpark, включающих ФИО, города России, хобби на русском языке и телефонные номера в формате +7XXXXXXXXXX.

#### Требования к данным:
- ФИО, города, хобби и даты должны быть на русском языке.
- Телефонные номера должны соответствовать формату +7XXXXXXXXXX.
- Количество строк в каждом DataFrame должно быть равно `num_rows` (заданный параметр).

#### Тестирование:
- Провести тестирование кода, чтобы убедиться, что данные генерируются корректно и DataFrame создаются без ошибок.
- Проверить корректность форматов данных, особенно телефонных номеров.

#### Документация:
- Сопроводить код комментариями для объяснения работы функций.
- Подготовить краткую инструкцию по запуску и тестированию скрипта.
---

## Часть 2

1. **Создание RDD из различных источников данных:**
   - Загрузите текстовый файл созданный генератором текста (напишите его сами) и создайте RDD из его строк. Файл сохраняем в рабочую директорию блокнота, но в каталог `lesson2_hw`
   - Преобразуйте коллекцию Python в RDD.

2. **Операции над RDD:**
   - Используйте операцию `map` для преобразования каждой строки в RDD, например, разделите строки на слова.
   - Примените операцию `filter` для выборки строк из RDD, которые удовлетворяют определённому условию:
        * строки длиннее 10 символов;
        * строки содержащие числа: 0 или 1 или 2;
        * строки без слога "ка"

3. **Трансформационные операции: map, filter, flatMap:**
   - Создайте RDD, содержащий несколько предложений, с помощью генератора синтетики данных из задания 2.1, и используйте `flatMap` для преобразования каждого предложения в список слов.
   - Примените `filter` для извлечения слов:
       - начинающихся на букву "а";
       - без буквы "ъ" и "ь" знаков

4. **Действия: count, collect, reduce:**
   - Используйте `count` для определения количества элементов в RDD.
   - Примените `collect` для получения всех элементов RDD в виде списка.
   - Используйте `reduce` для агрегации элементов RDD:
       - для суммирования чисел;
       - для квадрата чисел

5. **Создание DataFrame из различных источников данных:**
   - Загрузите CSV-файл и создайте DataFrame с помощью генератора синтетики
   - Преобразуйте RDD, созданный в предыдущих заданиях, в DataFrame.

6. **Операции над DataFrame:**
   - Выполните операции `select` для выбора определенных столбцов из DataFrame-ов.
   - Используйте `filter` для фильтрации строк по определенному условию:
       - ФИО содержит слово "Петр";
       - все люди живут только города;
       - возраст от 20 до 30 лет

7. **Использование методов select, filter, groupBy:**
   - Примените `groupBy` для группировки данных по определенному столбцу и выполнения агрегации (например, подсчета).

8. **Применение функций для обработки данных в DataFrame:**
   - Создайте пользовательскую функцию для обработки данных в столбце и примените её к DataFrame. (**ПОВЫШЕННЫЙ УРОВЕНЬ**)

9. **Соединение и агрегация данных:**
   - Выполните соединение (inner join / left join) нескольких DataFrame по общему ключу.
   - Примените агрегационные функции (например, среднее, максимум) к результатам соединения.

10. **Объединение нескольких DataFrame и применение агрегационных функций:**
    - Объедините (union) два или более DataFrame.
    - Используйте агрегационные функции для анализа объединенных данных.
    


----

## Импорт библиотек

In [1]:
import os
import sys

import pandas as pd
from pyspark.sql import SparkSession, functions as F, types
from faker import Faker

fake = Faker('ru_RU')

## Настройка и запуск Spark приложения

In [4]:
spark = (SparkSession
         .builder.master('local')
         .enableHiveSupport()
         .appName('lesson2_hw')
         .getOrCreate())

23/12/23 16:31:58 WARN Utils: Your hostname, resolves to a loopback address: 127.0.0.1; using 192.168.94.184 instead (on interface en0)
23/12/23 16:31:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/23 16:31:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
sc = spark.sparkContext

In [6]:
sc.setLogLevel('WARN')

In [7]:
spark

# Часть 1

In [8]:
def generate_data(num_rows):
    """
    Функция для генерации синтетических данных.

    Args:
    num_rows (int): Количество строк данных для генерации.

    Returns:
    DataFrame: DataFrame с синтетическими данными.
    """
    data = {
        # Генерирует случайное имя и фамилию
        'ФИО': [fake.name() for _ in range(num_rows)],
        # Генерирует случайный возраст
        'Возраст': [fake.ssn()[:2] for _ in range(num_rows)],
        # Генерирует название города
        'Город': [fake.city_name() for _ in range(num_rows)],
        # Генерирует случайное "бизнес-выражение" в качестве хобби
        'Хобби': [fake.bs() for _ in range(num_rows)],
        # Создает телефонный номер c добавлением префикса +7
        'Телефон': [f'+7{fake.msisdn()[3:]}' for _ in range(num_rows)]
    }

    # Создание DataFrame с помощью Pandas и конвертация в PySpark DataFrame
    pd_df = pd.DataFrame(data)
    spark_df = spark.createDataFrame(pd_df)

    return spark_df


def generate_text(filepath, num_paragraphs=5):
    """Генерирует текст с датами и записывает в файл."""
    # Открытие файла на запись
    with open(filepath, 'w') as file:
        # Генерация и запись текста в файл
        for _ in range(num_paragraphs):
            file.write(fake.date()+' ')
            file.write(fake.text())
            
    # Вывод сообщения об успешном выполнении
    print(f'Текстовый файл сгенерирован и сохранен в: {filepath}')

In [9]:
# Тестирование функции
num_rows = 5
test_df = generate_data(num_rows)
test_df.show(truncate=False)

# Проверка формата телефонных номеров
correct_format = all(test_df.select('Телефон').rdd.flatMap(lambda x: x).collect()[i].startswith('+7') and
                     len(test_df.select('Телефон').rdd.flatMap(lambda x: x).collect()[i]) == 12
                     for i in range(num_rows))

# Вывод результатов проверки
print(f'Формат телефонных номеров корректен: {correct_format}')

# Сохранение DataFrame в CSV
csv_filepath = './lesson2_hw/generated_csv'
generate_data(num_rows=5000).write.csv(csv_filepath, mode='overwrite', header=True)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+----------------------------+-------+------------+----------------------------------+------------+
|ФИО                         |Возраст|Город       |Хобби                             |Телефон     |
+----------------------------+-------+------------+----------------------------------+------------+
|Юдин Нестор Ярославович     |37     |Лабинск     |Сравнение интуитивных платформ    |+76756767423|
|Орехова Нина Борисовна      |93     |Верхотурье  |Адаптация совместных каналов      |+70549222318|
|Антонова Ангелина Викторовна|76     |Новомосковск|Конструирование целостных парадигм|+79536391638|
|Гордеев Галактион Чеславович|92     |Каргасок    |Ускорение интуитивных систем      |+75700613975|
|Надежда Игоревна Одинцова   |20     |Избербаш    |Формирование передовых интерфейсов|+73668311770|
+----------------------------+-------+------------+----------------------------------+------------+

Формат телефонных номеров корректен: True


----

# Часть 2

### 1. Создание RDD из различных источников данных

In [10]:
generate_text(txt_filepath:='./lesson2_hw/generated_text.txt')

Текстовый файл сгенерирован и сохранен в: ./lesson2_hw/generated_text.txt


In [11]:
text_rdd = sc.textFile(txt_filepath)
print('RDD:', text_rdd.collect())

RDD: ['2004-02-07 Недостаток табак выкинуть художественный. Следовательно домашний приходить расстегнуть. Миф зеленый командир выраженный.1979-02-09 Изредка металл что непривычный сутки чувство недостаток сопровождаться. Да промолчать художественный миг труп.2000-10-22 Расстегнуть мальчишка человечек ход мусор военный. Угодный за выдержать стакан сохранять космос достоинство.2022-10-29 Инструкция терапия командующий чувство дыхание художественный заведение. Приятель падаль угроза радость манера куча поймать.1991-09-25 Социалистический рота прощение руководитель призыв. Задержать роскошный мальчишка куча постоянный применяться.']


In [12]:
python_list = [1, 2, 3, 4, 5]
list_rdd = sc.parallelize(python_list)
print('RDD:', list_rdd.collect())

RDD: [1, 2, 3, 4, 5]


### 2. Операции над RDD

Преобразование строк:

In [13]:
convert_rdd = text_rdd.map(lambda x: x.replace('.', ' ')).flatMap(lambda x: x.split())
print('RDD:', convert_rdd.collect())

RDD: ['2004-02-07', 'Недостаток', 'табак', 'выкинуть', 'художественный', 'Следовательно', 'домашний', 'приходить', 'расстегнуть', 'Миф', 'зеленый', 'командир', 'выраженный', '1979-02-09', 'Изредка', 'металл', 'что', 'непривычный', 'сутки', 'чувство', 'недостаток', 'сопровождаться', 'Да', 'промолчать', 'художественный', 'миг', 'труп', '2000-10-22', 'Расстегнуть', 'мальчишка', 'человечек', 'ход', 'мусор', 'военный', 'Угодный', 'за', 'выдержать', 'стакан', 'сохранять', 'космос', 'достоинство', '2022-10-29', 'Инструкция', 'терапия', 'командующий', 'чувство', 'дыхание', 'художественный', 'заведение', 'Приятель', 'падаль', 'угроза', 'радость', 'манера', 'куча', 'поймать', '1991-09-25', 'Социалистический', 'рота', 'прощение', 'руководитель', 'призыв', 'Задержать', 'роскошный', 'мальчишка', 'куча', 'постоянный', 'применяться']


Фильтрация строк:

In [14]:
print('Длинные строки:', convert_rdd.filter(lambda x: len(x) > 10).collect())

Длинные строки: ['художественный', 'Следовательно', 'расстегнуть', 'непривычный', 'сопровождаться', 'художественный', 'Расстегнуть', 'достоинство', 'командующий', 'художественный', 'Социалистический', 'руководитель', 'применяться']


In [15]:
num_list = ['0', '1', '2']
print('Строки с числами:', convert_rdd.filter(lambda x: any(num in x for num in num_list)).collect())

Строки с числами: ['2004-02-07', '1979-02-09', '2000-10-22', '2022-10-29', '1991-09-25']


In [16]:
print('Строки без "ка":', convert_rdd.filter(lambda x: 'ка' not in x).collect())

Строки без "ка": ['2004-02-07', 'Недостаток', 'табак', 'выкинуть', 'художественный', 'Следовательно', 'домашний', 'приходить', 'расстегнуть', 'Миф', 'зеленый', 'командир', 'выраженный', '1979-02-09', 'металл', 'что', 'непривычный', 'сутки', 'чувство', 'недостаток', 'сопровождаться', 'Да', 'промолчать', 'художественный', 'миг', 'труп', '2000-10-22', 'Расстегнуть', 'человечек', 'ход', 'мусор', 'военный', 'Угодный', 'за', 'выдержать', 'сохранять', 'космос', 'достоинство', '2022-10-29', 'Инструкция', 'терапия', 'командующий', 'чувство', 'дыхание', 'художественный', 'заведение', 'Приятель', 'падаль', 'угроза', 'радость', 'манера', 'куча', 'поймать', '1991-09-25', 'Социалистический', 'рота', 'прощение', 'руководитель', 'призыв', 'Задержать', 'роскошный', 'куча', 'постоянный', 'применяться']


### 3. Трансформационные операции

In [17]:
convert_rdd = text_rdd.flatMap(lambda x: x.split())
print('RDD:', convert_rdd.collect())

RDD: ['2004-02-07', 'Недостаток', 'табак', 'выкинуть', 'художественный.', 'Следовательно', 'домашний', 'приходить', 'расстегнуть.', 'Миф', 'зеленый', 'командир', 'выраженный.1979-02-09', 'Изредка', 'металл', 'что', 'непривычный', 'сутки', 'чувство', 'недостаток', 'сопровождаться.', 'Да', 'промолчать', 'художественный', 'миг', 'труп.2000-10-22', 'Расстегнуть', 'мальчишка', 'человечек', 'ход', 'мусор', 'военный.', 'Угодный', 'за', 'выдержать', 'стакан', 'сохранять', 'космос', 'достоинство.2022-10-29', 'Инструкция', 'терапия', 'командующий', 'чувство', 'дыхание', 'художественный', 'заведение.', 'Приятель', 'падаль', 'угроза', 'радость', 'манера', 'куча', 'поймать.1991-09-25', 'Социалистический', 'рота', 'прощение', 'руководитель', 'призыв.', 'Задержать', 'роскошный', 'мальчишка', 'куча', 'постоянный', 'применяться.']


In [18]:
print('Начинающиеся на букву "а":', convert_rdd.filter(lambda x: x.startswith('а')).collect())

Начинающиеся на букву "а": []


In [19]:
print('Без букв "ъ" и "ь":', convert_rdd.filter(lambda x: 'ъ' not in x and 'ь' not in x).collect())

Без букв "ъ" и "ь": ['2004-02-07', 'Недостаток', 'табак', 'художественный.', 'домашний', 'Миф', 'зеленый', 'командир', 'выраженный.1979-02-09', 'Изредка', 'металл', 'что', 'непривычный', 'сутки', 'чувство', 'недостаток', 'Да', 'художественный', 'миг', 'труп.2000-10-22', 'человечек', 'ход', 'мусор', 'военный.', 'Угодный', 'за', 'стакан', 'космос', 'достоинство.2022-10-29', 'Инструкция', 'терапия', 'командующий', 'чувство', 'дыхание', 'художественный', 'заведение.', 'угроза', 'манера', 'куча', 'Социалистический', 'рота', 'прощение', 'призыв.', 'роскошный', 'куча', 'постоянный']


### 4. Действия: count, collect, reduce

In [20]:
sc.parallelize([1, 2, 3, 4, 5]).count()

5

In [21]:
sc.parallelize([1, 2, 3, 4, 5]).collect()

[1, 2, 3, 4, 5]

In [22]:
sc.parallelize([1, 2, 3, 4, 5]).reduce(lambda x, y: x + y)

15

In [23]:
sc.parallelize([1, 2, 3, 4, 5]).map(lambda x: x**2).reduce(lambda x, y: x + y)

55

### 5. Создание DataFrame из различных источников данных

Создание DataFrame из CSV:

In [24]:
schema = types.StructType([
    types.StructField('ФИО', types.StringType(), True),
    types.StructField('Возраст', types.IntegerType(), True),
    types.StructField('Город', types.StringType(), True),
    types.StructField('Хобби', types.StringType(), True),
    types.StructField('Телефон', types.StringType(), True),
])

spark_df = spark.read.csv(csv_filepath, header=True, schema=schema)

spark_df.printSchema()
spark_df.show(5, truncate=False)

root
 |-- ФИО: string (nullable = true)
 |-- Возраст: integer (nullable = true)
 |-- Город: string (nullable = true)
 |-- Хобби: string (nullable = true)
 |-- Телефон: string (nullable = true)

+-----------------------------+-------+----------+-----------------------------------+------------+
|ФИО                          |Возраст|Город     |Хобби                              |Телефон     |
+-----------------------------+-------+----------+-----------------------------------+------------+
|Феврония Сергеевна Степанова |61     |Волгоград |Производство революционных платформ|+72241564256|
|Сидоров Трофим Феликсович    |71     |Прохладный|Шкалирование сенсационных ниш      |+73822310631|
|Глафира Станиславовна Павлова|76     |Челябинск |Переход онлайн и офлайн аудиторий  |+76017942750|
|Азарий Захарьевич Орехов     |2      |Кинешма   |Максимизация стратегических методик|+70021939530|
|Орехов Лучезар Якубович      |88     |Ребриха   |Культивация распространенных систем|+79077797748|
+-----

Преобразование RDD в DataFrame:

In [25]:
spark.createDataFrame(text_rdd, types.StringType()).show()

+--------------------+
|               value|
+--------------------+
|2004-02-07 Недост...|
+--------------------+



### 6. Операции над DataFrame

In [26]:
spark_df.select('ФИО', 'Телефон').show(5, truncate=False)

+-----------------------------+------------+
|ФИО                          |Телефон     |
+-----------------------------+------------+
|Феврония Сергеевна Степанова |+72241564256|
|Сидоров Трофим Феликсович    |+73822310631|
|Глафира Станиславовна Павлова|+76017942750|
|Азарий Захарьевич Орехов     |+70021939530|
|Орехов Лучезар Якубович      |+79077797748|
+-----------------------------+------------+
only showing top 5 rows



In [27]:
spark_df.filter(F.col('ФИО').contains('Петр ')).show(5, truncate=False)

+------------------------+-------+----------+-------------------------------------------+------------+
|ФИО                     |Возраст|Город     |Хобби                                      |Телефон     |
+------------------------+-------+----------+-------------------------------------------+------------+
|Николаев Петр Даниилович|71     |Териберка |Оцифровка мультимедийных интернет-магазинов|+75437097015|
|Петр Арсенович Мясников |95     |Белоярский|Оцифровка стандартных областей интереса    |+73654320466|
|Петр Викторович Тимофеев|76     |Эльбрус   |Внедрение открытых архитектур              |+78662120453|
|Гордеев Петр Дорофеевич |42     |Кулунда   |Управление расширяемых приложений          |+74174946964|
|Петр Анисимович Петров  |15     |Тимашевск |Шкалирование наглядных рынков              |+73582750999|
+------------------------+-------+----------+-------------------------------------------+------------+
only showing top 5 rows



In [28]:
spark_df.filter((F.col('Возраст') >= 20) & (F.col('Возраст') <= 30)).show(5, truncate=False)

+------------------------------+-------+--------+---------------------------------------+------------+
|ФИО                           |Возраст|Город   |Хобби                                  |Телефон     |
+------------------------------+-------+--------+---------------------------------------+------------+
|Александра Робертовна Калинина|29     |Макушино|Перезагрузка прибыльных инфопосредников|+70199708751|
|Стрелкова Феврония Аркадьевна |20     |Токсово |Производство совместных отношений      |+76592973086|
|Аксенов Пров Дмитриевич       |30     |Хасан   |Сравнение корпоративных действий       |+71331115721|
|Соколов Гостомысл Ерофеевич   |25     |Химки   |Объединение популярных инфопосредников |+78443712965|
|Ширяева Алла Вячеславовна     |29     |Аргаяш  |Использование открытых приложений      |+73954908836|
+------------------------------+-------+--------+---------------------------------------+------------+
only showing top 5 rows



### 7. Использование методов select, filter, groupBy

In [29]:
spark_df.groupBy('Город').count().filter(F.col('count') > 3).select('Город', 'count').show(5)

+------------+-----+
|       Город|count|
+------------+-----+
|   Волгоград|    9|
|Заводоуковск|   10|
|       Муром|   14|
|     Чусовой|    6|
|    Куртамыш|    5|
+------------+-----+
only showing top 5 rows



In [30]:
spark_df.groupBy('Город').agg(F.avg('Возраст').alias('Средний_возраст')).show(5)

+------------+------------------+
|       Город|   Средний_возраст|
+------------+------------------+
|   Волгоград| 49.77777777777778|
|Заводоуковск|              37.5|
|       Муром|54.357142857142854|
|     Чусовой|61.333333333333336|
|    Куртамыш|              19.8|
+------------+------------------+
only showing top 5 rows



### 8. Применение функций для обработки данных в DataFrame

In [31]:
# Регистрация функции как UDF
extract_surname_udf = F.udf(lambda x: x.split()[0], types.StringType())

# Пример новой колонки "Фамилия" в DataFrame
spark_df.withColumn('Фамилия', extract_surname_udf(spark_df['ФИО'])).show(5)

+--------------------+-------+----------+--------------------+------------+--------+
|                 ФИО|Возраст|     Город|               Хобби|     Телефон| Фамилия|
+--------------------+-------+----------+--------------------+------------+--------+
|Феврония Сергеевн...|     61| Волгоград|Производство рево...|+72241564256|Феврония|
|Сидоров Трофим Фе...|     71|Прохладный|Шкалирование сенс...|+73822310631| Сидоров|
|Глафира Станислав...|     76| Челябинск|Переход онлайн и ...|+76017942750| Глафира|
|Азарий Захарьевич...|      2|   Кинешма|Максимизация стра...|+70021939530|  Азарий|
|Орехов Лучезар Як...|     88|   Ребриха|Культивация распр...|+79077797748|  Орехов|
+--------------------+-------+----------+--------------------+------------+--------+
only showing top 5 rows



### 9. Соединение и агрегация данных

In [32]:
# Создание двух DataFrame для примера
# DataFrame 1
data1 = [('Анна', 1000),
         ('Борис', 1500),
         ('Светлана', 2000)]
df1 = spark.createDataFrame(data1, schema=['Имя', 'Зарплата'])

# DataFrame 2
data2 = [('Анна', 'Маркетинг'),
         ('Борис', 'Продажи'),
         ('Алексей', 'Разработка')]
df2 = spark.createDataFrame(data2, schema=['Имя', 'Отдел'])

# Выполнение соединения
# Здесь используем inner join, но можно также использовать 'left' для left join
joined_df = df1.join(df2, 'Имя', 'inner')

# Агрегационные функции
# Вычислим для примера среднюю и максимальную зарплаты в каждом отделе
aggregated_df = joined_df.groupBy('Отдел').agg(
    F.avg('Зарплата').alias('Средняя_зарплата'),
    F.max('Зарплата').alias('Максимальная_зарплата'),
)

# Выведем результат
aggregated_df.show()

+---------+----------------+---------------------+
|    Отдел|Средняя_зарплата|Максимальная_зарплата|
+---------+----------------+---------------------+
|Маркетинг|          1000.0|                 1000|
|  Продажи|          1500.0|                 1500|
+---------+----------------+---------------------+



### 10. Объединение нескольких DataFrame и применение агрегационных функций

In [34]:
# Регистрация функции как UDF
extract_name_udf = F.udf(lambda x: x.split()[1], types.StringType())

# Создание примера DataFrame
df_one = generate_data(num_rows=1000)
df_two = generate_data(num_rows=1000)

# Объединение DataFrame
union_df = df_one.union(df_two)
union_df = union_df.withColumn('Имя', extract_name_udf(union_df['ФИО']))

# Агрегационные функции
result_df = (union_df
             .groupBy('Город')
             .agg(
                 F.avg('Возраст').alias('Средний_возраст'),
                 F.count('Имя').alias('Количество_человек'),
             )
             .orderBy(F.col('Количество_человек').desc())
             .show(5))

spark.stop()

+-------------------+-----------------+------------------+
|              Город|  Средний_возраст|Количество_человек|
+-------------------+-----------------+------------------+
|            Уварово|68.77777777777777|                 9|
|         Домодедово|66.11111111111111|                 9|
|              Ельня|48.77777777777778|                 9|
|Гремячинск (Бурят.)|           49.625|                 8|
|            Петушки|           52.125|                 8|
+-------------------+-----------------+------------------+
only showing top 5 rows



# Чек-лист домашнего задания
## Часть 1
- [x] Установите на локальную машину(можно в Docker) Anaconda и настройте pyspark вместе переменными средами
- [x] Установите PySpark с сайта и настройте его на локальной машине

## Часть 2

- [x] Создание RDD из различных источников данных
- [x] Операции над RDD
- [x] Трансформационные операции: map, filter, flatMap
- [x] Действия: count, collect, reduce
- [x] Создание DataFrame из различных источников данных
- [x] Операции над DataFrame
- [x] Использование методов select, filter, groupBy
- [x] Применение функций для обработки данных в DataFrame
- [x] Соединение и агрегация данных
- [x] Объединение нескольких DataFrame и применение агрегационных функций

## Дополнительные задания
- [x] написан Генератор синтетики
- [x] выполненно дополнительное задание


