<a href="https://colab.research.google.com/github/zoya-ivanova/BigData/blob/main/SparkApache5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Spark Apache

### Cоздай csv файл с содержимым:

title,author,genre,sales,year

"1984", "George Orwell", "Science Fiction", 5000, 1949

"The Lord of the Rings", "J.R.R. Tolkien", "Fantasy", 3000, 1954

"To Kill a Mockingbird", "Harper Lee", "Southern Gothic", 4000, 1960

"The Catcher in the Rye", "J.D. Salinger", "Novel", 2000, 1951

"The Great Gatsby", "F. Scott Fitzgerald", "Novel", 4500, 1925

### Используя Spark:

— Прочитайте данные из файла csv.

— Фильтруйте данные, чтобы оставить только книги, продажи которых превышают 3000 экземпляров.

— Сгруппируйте данные по жанру и вычислите общий объем продаж для каждого жанра.

— Отсортируйте данные по общему объему продаж в порядке убывания.

— Выведите результаты на экран.

In [28]:
import csv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Данные для CSV файла передаем в переменную books
books = [
    ["1984", "George Orwell", "Science Fiction", 5000, 1949],
    ["The Lord of the Rings", "J.R.R. Tolkien", "Fantasy", 3000, 1954],
    ["To Kill a Mockingbird", "Harper Lee", "Southern Gothic", 4000, 1960],
    ["The Catcher in the Rye", "J.D. Salinger", "Novel", 2000, 1951],
    ["The Great Gatsby", "F. Scott Fitzgerald", "Novel", 4500, 1925]
]

In [29]:
# Создание CSV файла
with open('books.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(["title", "author", "genre", "sales", "year"])  # Заголовок
    writer.writerows(books)

In [31]:
# Создаем SparkSession
spark = SparkSession.builder.appName("BookSalesAnalysis").getOrCreate()

In [36]:
# Считываем данные из CSV файла
df = spark.read.csv("books.csv", header=True, inferSchema=True)
df.show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|                1984|      George Orwell|Science Fiction| 5000|1949|
|The Lord of the R...|     J.R.R. Tolkien|        Fantasy| 3000|1954|
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|The Catcher in th...|      J.D. Salinger|          Novel| 2000|1951|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
+--------------------+-------------------+---------------+-----+----+



In [37]:
# Фильтруем данные, чтобы оставить только книги, продажи которых превышают 3000 экземпляров
filtered_df = df.filter(col("sales") > 3000)
filtered_df.show()

+--------------------+-------------------+---------------+-----+----+
|               title|             author|          genre|sales|year|
+--------------------+-------------------+---------------+-----+----+
|                1984|      George Orwell|Science Fiction| 5000|1949|
|To Kill a Mocking...|         Harper Lee|Southern Gothic| 4000|1960|
|    The Great Gatsby|F. Scott Fitzgerald|          Novel| 4500|1925|
+--------------------+-------------------+---------------+-----+----+



In [39]:
# Сгруппируем данные по жанру и вычислим общий объем продаж для каждого жанра
grouped_df = filtered_df.groupBy("genre").agg(sum("sales").alias("total_sales"))
grouped_df.show()

+---------------+-----------+
|          genre|total_sales|
+---------------+-----------+
|Southern Gothic|       4000|
|          Novel|       4500|
|Science Fiction|       5000|
+---------------+-----------+



In [40]:
# Отсортируем данные по общему объему продаж в порядке убывания
sorted_df = grouped_df.orderBy(col("total_sales").desc())
sorted_df.show()

+---------------+-----------+
|          genre|total_sales|
+---------------+-----------+
|Science Fiction|       5000|
|          Novel|       4500|
|Southern Gothic|       4000|
+---------------+-----------+



In [41]:
# Остановка SparkSession
spark.stop()

### Spark on scala -->


In [1]:
!pip install pyspark >> None

In [11]:
!apt-get install -y scala

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
scala is already the newest version (2.11.12-5).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [20]:
!pip install spylon-kernel  # код для установки ядра Scala



In [21]:
!python -m spylon_kernel install

[InstallKernelSpec] Removing existing kernelspec in /usr/local/share/jupyter/kernels/spylon-kernel
[InstallKernelSpec] Installed kernelspec spylon-kernel in /usr/local/share/jupyter/kernels/spylon-kernel


In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, upper # импортируем функции Spark SQL

spark = SparkSession.builder \
    .appName("SparkSqlbasicexample") \
    .getOrCreate()

# Создаем данные
data = [
    ("John", 30, "2024-02-29", True),
    ("Jane", 25, "2024-02-28", False)
]

# Создаем DataFrame
df = spark.createDataFrame(data, ["Name", "Age", "BirthDate", "IsActive"])

# Выводим DataFrame
df.show()

+----+---+----------+--------+
|Name|Age| BirthDate|IsActive|
+----+---+----------+--------+
|John| 30|2024-02-29|    true|
|Jane| 25|2024-02-28|   false|
+----+---+----------+--------+



In [7]:
# Пример фильтрации
filteredDf = df.filter(col("Age") > 25)
filteredDf.show()

+----+---+----------+--------+
|Name|Age| BirthDate|IsActive|
+----+---+----------+--------+
|John| 30|2024-02-29|    true|
+----+---+----------+--------+



In [8]:
# Пример агрегации
avgAge = df.agg(avg("Age"))
avgAge.show()

+--------+
|avg(Age)|
+--------+
|    27.5|
+--------+



In [9]:
# Пример преобразования
withUppercaseName = df.withColumn("Name", upper(col("Name")))
withUppercaseName.show()

+----+---+----------+--------+
|Name|Age| BirthDate|IsActive|
+----+---+----------+--------+
|JOHN| 30|2024-02-29|    true|
|JANE| 25|2024-02-28|   false|
+----+---+----------+--------+



In [10]:
spark.stop()

In [17]:
!pip list | grep pyspark

pyspark                          3.5.1


### Задача 1:
У вас есть RDD со следующей структурой: (ключ, значение). Напишите программу на Apache Spark, которая найдет среднее значение для каждого уникального ключа.

Данный код выполняет следующие действия:

1. Импортирует необходимые модули из библиотеки PySpark: `SparkConf` и `SparkContext`.
2. Создает объект `SparkConf`, который представляет конфигурацию для создания `SparkContext`.
   - Метод `setAppName("Average By Key Example")` устанавливает имя приложения в "Average By Key Example".
   - Метод `setMaster("local[*]")` устанавливает локальный режим выполнения Spark с использованием всех доступных ядер процессора.
3. Создает объект `SparkContext` (sc) на основе конфигурации `SparkConf`.
4. Создает RDD (Resilient Distributed Dataset) под названием `dataRDD` с помощью метода `parallelize`, передавая список пар ключ-значение.
5. Преобразует RDD `dataRDD`, используя цепочку операций:
   - Метод `mapValues` применяет функцию `lambda value: (value, 1)` к каждому значению RDD, превращая его в кортеж (значение, 1).
   - Метод `reduceByKey` выполняет суммирование значений для каждого ключа, суммируя значения и соответствующие им счетчики.
   - Метод `mapValues` применяет функцию `lambda sum_count: sum_count[0] / sum_count[1]` к каждой сумме и счетчику, вычисляя среднее значение для каждого ключа.
6. Вызывает операцию `collect` на RDD `averageRDD` для получения всех результатов в локальном списке.
7. Итерируется по результатам и выводит каждую пару ключ-значение.
8. Вызывает метод `stop` у объекта `SparkContext` для остановки Spark контекста и освобождения ресурсов.

Таким образом, данный код вычисляет среднее значение для каждого ключа в RDD `dataRDD` с использованием Spark и выводит результаты./

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("Average By Key Example").setMaster("local[*]")
sc = SparkContext(conf=conf)

dataRDD = sc.parallelize([
    ("key1", 10),
    ("key2", 20),
    ("key1", 30),
    ("key2", 40),
    ("key1", 50)
])

averageRDD = dataRDD \
    .mapValues(lambda value: (value, 1)) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])

# Collect the data and print it
result = averageRDD.collect()
for key, value in result:
    print(key, value)

sc.stop()

key1 30.0
key2 30.0


### Задача 2

У вас есть RDD со следующей структурой: (ключ, значение). Напишите программу на Apache Spark, которая найдет максимальное значение для каждого уникального ключа.

Данный код использует PySpark для выполнения операции поиска максимального значения для каждого ключа в параллельно распределенном наборе данных (RDD).

Вот пошаговое описание кода:

1. Импортируются необходимые модули: `SparkContext` и `SparkConf` из пакета `pyspark`.
2. Создается объект `SparkConf` с настройками для приложения Spark. В данном случае, устанавливается имя приложения "MaxValueByKeyExample".
3. Создается объект `SparkContext` (`sc`), который представляет соединение с кластером Spark с использованием настроек из `SparkConf`.
4. Создается список `data`, который содержит пары ключ-значение. Каждая пара представлена в виде кортежа.
5. Создается RDD (`rdd`) из списка `data` с помощью метода `parallelize()`. RDD - это распределенный неизменяемый набор данных, на котором можно выполнять параллельные операции.
6. Выполняется операция `reduceByKey(max)` на RDD `rdd`. Операция `reduceByKey()` объединяет значения для каждого ключа и применяет функцию `max` для нахождения максимального значения.
7. Результат операции `reduceByKey()` сохраняется в переменной `max_values`.
8. Вызывается метод `collect()` на `max_values` для сбора всех результатов в драйвере программы.
9. Затем происходит итерация по результатам (`results`), и для каждого результата выводится ключ и соответствующее максимальное значение с помощью оператора `print()`.
10. Наконец, вызывается метод `stop()` на объекте `SparkContext` (`sc`), чтобы закрыть соединение с кластером Spark.

Таким образом, данный код выполняет операцию поиска максимального значения для каждого ключа в RDD с использованием Spark.

In [None]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("MaxValueByKeyExample")
sc = SparkContext(conf=conf)

data = [("key1", 10), ("key2", 5), ("key1", 20), ("key2", 15), ("key3", 8)]
rdd = sc.parallelize(data)

max_values = rdd.reduceByKey(max)

results = max_values.collect()
for result in results:
    print(result[0], result[1])

sc.stop()

key1 20
key2 15
key3 8


### Задача 3

Напишите программу на Apache Spark, которая считает сумму всех чисел в заданном RDD (Resilient Distributed Dataset).

Данный код использует PySpark для вычисления суммы элементов в распределенном наборе данных (RDD).

Вот пошаговое описание кода:

1. Импортируются необходимые модули: `SparkContext` и `SparkConf` из пакета `pyspark`.
2. Создается объект `SparkConf` с настройками для приложения Spark. В данном случае, устанавливается имя приложения "SumRDDExample".
3. Создается объект `SparkContext` (`sc`), который представляет соединение с кластером Spark с использованием настроек из `SparkConf`.
4. Создается список `data`, который содержит числа.
5. Создается RDD (`rdd`) из списка `data` с помощью метода `parallelize()`. RDD - это распределенный неизменяемый набор данных, на котором можно выполнять параллельные операции.
6. Выполняется операция `sum()` на RDD `rdd`. Операция `sum()` вычисляет сумму всех элементов в RDD.
7. Результат операции `sum()` сохраняется в переменной `total_sum`.
8. Выводится сообщение с помощью оператора `print()`, которое содержит сумму всех чисел.
9. Вызывается метод `stop()` на объекте `SparkContext` (`sc`), чтобы закрыть соединение с кластером Spark.

Таким образом, данный код вычисляет сумму всех чисел в RDD с использованием Spark и выводит эту сумму на экран.

In [None]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("SumRDDExample")
sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

total_sum = rdd.sum()

print("Сумма всех чисел:", total_sum)

sc.stop()

Сумма всех чисел: 15


### Задача 4

У вас есть RDD со следующей структурой: (ключ, значение). Напишите программу на Apache Spark, которая найдет топ N ключей с наибольшим значением, где N - заданное число.

Данный код использует PySpark для нахождения N ключей с наибольшими значениями в параллельно распределенном наборе данных (RDD).

Вот пошаговое описание кода:

1. Импортируются необходимые модули: `SparkContext` и `SparkConf` из пакета `pyspark`.
2. Создается объект `SparkConf` с настройками для приложения Spark. В данном случае, устанавливается имя приложения "TopNKeysExample".
3. Создается объект `SparkContext` (`sc`), который представляет соединение с кластером Spark с использованием настроек из `SparkConf`.
4. Создается список `data`, который содержит пары ключ-значение. Каждая пара представлена в виде кортежа.
5. Создается RDD (`rdd`) из списка `data` с помощью метода `parallelize()`. RDD - это распределенный неизменяемый набор данных, на котором можно выполнять параллельные операции.
6. Устанавливается значение переменной `N`, которая определяет количество ключей с наибольшими значениями, которые будут выбраны.
7. Выполняется операция `takeOrdered(N, key=lambda x: -x[1])` на RDD `rdd`. Операция `takeOrdered()` возвращает N элементов RDD, отсортированных в порядке возрастания на основе заданного ключа. В данном случае, ключом является второй элемент кортежа (значение), и сортировка осуществляется в порядке убывания, указав `-x[1]`.
8. Результат операции `takeOrdered()` сохраняется в переменной `top_N_keys`.
9. Затем происходит итерация по результатам (`top_N_keys`), и для каждой пары ключ-значение выводится ключ и значение с помощью оператора `print()`.
10. Наконец, вызывается метод `stop()` на объекте `SparkContext` (`sc`), чтобы закрыть соединение с кластером Spark.

Таким образом, данный код находит N ключей с наибольшими значениями в RDD и выводит их на экран в порядке убывания значений.

In [None]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("TopNKeysExample")
sc = SparkContext(conf=conf)

data = [("key1", 10), ("key2", 5), ("key3", 20), ("key4", 15), ("key5", 8)]
rdd = sc.parallelize(data)

N = 3

top_N_keys = rdd.takeOrdered(N, key=lambda x: -x[1])

for key, value in top_N_keys:
    print(key, value)

sc.stop()

key3 20
key4 15
key1 10


### Задача 5

Напишите программу на Apache Spark, которая читает CSV-файл и выводит количество строк в файле.

Данный код использует PySpark для подсчета количества строк в CSV-файле с использованием Spark SQL и DataFrame API.

Вот пошаговое описание кода:

1. Импортируются необходимые модули: `SparkContext`, `SparkConf` из пакета `pyspark`, а также `SparkSession` из модуля `pyspark.sql`.
2. Создается объект `SparkConf` с настройками для приложения Spark. В данном случае, устанавливается имя приложения "CountCSVLinesExample".
3. Создается объект `SparkContext` (`sc`), который представляет соединение с кластером Spark с использованием настроек из `SparkConf`.
4. Создается объект `SparkSession` (`spark`) с помощью метода `SparkSession.builder.getOrCreate()`. `SparkSession` предоставляет доступ к функциональности Spark SQL.
5. Устанавливается путь к CSV-файлу, который будет использоваться для подсчета количества строк. Путь указывается в переменной `csv_file_path`.
6. С помощью метода `read.csv()` на объекте `spark` загружается CSV-файл. Методу передаются следующие аргументы: `csv_file_path` - путь к файлу, `header=True` - указывает, что первая строка файла содержит заголовки столбцов, `inferSchema=True` - указывает на автоматическое определение схемы данных.
7. Результат чтения CSV-файла сохраняется в переменной `df` в виде DataFrame. DataFrame - это распределенная коллекция данных, организованная в именованные столбцы.
8. Выполняется операция `count()` на объекте `df`. Операция `count()` возвращает количество строк в DataFrame.
9. Результат операции `count()` сохраняется в переменной `line_count`.
10. Выводится сообщение с помощью оператора `print()`, которое содержит количество строк в файле.
11. Вызывается метод `stop()` на объекте `SparkContext` (`sc`), чтобы закрыть соединение с кластером Spark.

Таким образом, данный код загружает CSV-файл с использованием Spark SQL, подсчитывает количество строк в файле и выводит это число на экран.

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("CountCSVLinesExample")
sc = SparkContext(conf=conf)

spark = SparkSession.builder.getOrCreate()

csv_file_path = "path/to/your/csv/file.csv"

df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

line_count = df.count()

print("Количество строк в файле:", line_count)

sc.stop()

### Задача 6

У вас есть RDD со следующей структурой: (ключ, значение). Напишите программу на Apache Spark, которая найдет сумму значений для каждого уникального ключа, но только для ключей, значение которых больше заданного порога.

Данный код использует PySpark для фильтрации и суммирования значений, превышающих заданный порог, в распределенном наборе данных (RDD).

Вот пошаговое описание кода:

1. Импортируются необходимые модули: `SparkContext` и `SparkConf` из пакета `pyspark`.
2. Создается объект `SparkConf` с настройками для приложения Spark. В данном случае, устанавливается имя приложения "SumValuesAboveThreshold".
3. Создается объект `SparkContext` (`sc`), который представляет соединение с кластером Spark с использованием настроек из `SparkConf`.
4. Создается список `data`, который содержит пары ключ-значение. Каждая пара представлена в виде кортежа.
5. Создается RDD (`rdd`) из списка `data` с помощью метода `parallelize()`. RDD - это распределенный неизменяемый набор данных, на котором можно выполнять параллельные операции.
6. Устанавливается значение переменной `threshold`, которое определяет пороговое значение для фильтрации.
7. Выполняется операция `filter(lambda x: x[1] > threshold)` на RDD `rdd`. Операция `filter()` фильтрует элементы RDD с использованием заданного условия, в данном случае, значения второго элемента кортежа должны быть больше порогового значения.
8. Результат фильтрации сохраняется в переменной `filtered_rdd`.
9. Выполняется операция `reduceByKey(lambda a, b: a + b)` на `filtered_rdd`. Операция `reduceByKey()` суммирует значения для каждого ключа в RDD. В данном случае, значения суммируются с использованием лямбда-функции `lambda a, b: a + b`.
10. Результат операции `reduceByKey()` сохраняется в переменной `result`.
11. Затем происходит итерация по результатам (`result`), и для каждой пары ключ-значение выводится ключ и значение с помощью оператора `print()`.
12. Наконец, вызывается метод `stop()` на объекте `SparkContext` (`sc`), чтобы закрыть соединение с кластером Spark.

Таким образом, данный код фильтрует значения в RDD, оставляя только те, которые превышают заданный порог, а затем суммирует значения для каждого ключа и выводит результаты на экран.

In [None]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("SumValuesAboveThreshold")
sc = SparkContext(conf=conf)

data = [("key1", 10), ("key2", 20), ("key1", 30), ("key3", 40), ("key2", 50)]

rdd = sc.parallelize(data)

threshold = 25

filtered_rdd = rdd.filter(lambda x: x[1] > threshold)

result = filtered_rdd.reduceByKey(lambda a, b: a + b)

for key, value in result.collect():
    print(f"{key}: {value}")

sc.stop()

key1: 30
key2: 50
key3: 40


### Задача 7

У вас есть RDD со следующей структурой: (ключ, список значений). Напишите программу на Apache Spark, которая найдет среднюю длину списка значений для каждого ключа.

Данный код использует PySpark для нахождения средней длины списков значений для каждого ключа в распределенном наборе данных (RDD) с использованием Spark SQL и DataFrame API.

Вот пошаговое описание кода:

1. Импортируется модуль `SparkSession` из пакета `pyspark.sql`.
2. Создается объект `spark` с помощью метода `SparkSession.builder.appName("AverageListLength").getOrCreate()`. `SparkSession` предоставляет доступ к функциональности Spark SQL.
3. Создается список `data`, который содержит пары ключ-список значений. Каждая пара представлена в виде кортежа.
4. Создается RDD (`rdd`) из списка `data` с помощью метода `spark.sparkContext.parallelize(data)`. RDD - это распределенный неизменяемый набор данных, на котором можно выполнять параллельные операции.
5. Выполняется операция `mapValues(lambda x: len(x))` на RDD `rdd`. Операция `mapValues()` применяет заданную функцию к каждому значению в RDD, в данном случае, функция `lambda x: len(x)` возвращает длину списка значений.
6. Результат операции `mapValues()` сохраняется в переменной `rdd_key_length`.
7. Выполняется операция `combineByKey()` на `rdd_key_length`. Операция `combineByKey()` используется для агрегации значений по ключу. В данном случае, используются три функции:
   - Первая функция `lambda x: (x, 1)` преобразует каждое значение в кортеж `(значение, 1)`.
   - Вторая функция `lambda acc, x: (acc[0] + x, acc[1] + 1)` объединяет значения для каждого ключа, складывая длины и подсчитывая количество списков значений.
   - Третья функция `lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])` объединяет результаты агрегации для разных партиций или ключей.
8. Результат операции `combineByKey()` сохраняется в переменной `rdd_sum_count`.
9. Выполняется операция `mapValues(lambda x: x[0] / x[1])` на `rdd_sum_count`. Операция `mapValues()` применяет заданную функцию к каждому значению в RDD, в данном случае, функция `lambda x: x[0] / x[1]` вычисляет среднюю длину путем деления суммы длин на количество списков значений.
10. Результат операции `mapValues()` сохраняется в переменной `rdd_avg_length`.
11. Выполняется операция `collect()` на `rdd_avg_length`, чтобы получить все результаты.
12. Затем происходит итерация по результатам (`result`), и для каждого ключа и средней длины выводится сообщение с помощью оператора `print()`.
13. Вызывается метод `stop()` на объекте `spark`, чтобы закрыть соединение с кластером Spark.

Таким образом, данный код находит среднюю длину списков значений для каждого ключа в RDD и выводит результаты на экран.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AverageListLength").getOrCreate()

data = [("key1", [1, 2, 3]),
        ("key2", [4, 5]),
        ("key1", [6, 7, 8, 9]),
        ("key2", [10, 11, 12])]

rdd = spark.sparkContext.parallelize(data)

rdd_key_length = rdd.mapValues(lambda x: len(x))

rdd_sum_count = rdd_key_length.combineByKey(
    lambda x: (x, 1),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)

rdd_avg_length = rdd_sum_count.mapValues(lambda x: x[0] / x[1])

result = rdd_avg_length.collect()
for key, avg_length in result:
    print("Key: {}, Average Length: {}".format(key, avg_length))

spark.stop()

Key: key1, Average Length: 3.5
Key: key2, Average Length: 2.5


### Задача 8

У вас есть RDD, содержащий записи о посещениях веб-сайта со следующей структурой: (пользователь, дата, время). Напишите программу на Apache Spark, которая найдет количество уникальных пользователей для каждой даты.

Данный код использует PySpark для подсчета уникальных пользователей для каждой даты в распределенном наборе данных (RDD) с использованием Spark SQL и DataFrame API.

Вот пошаговое описание кода:

1. Импортируется модуль `SparkSession` из пакета `pyspark.sql`.
2. Создается объект `spark` с помощью метода `SparkSession.builder.appName("UniqueUsersPerDate").getOrCreate()`. `SparkSession` предоставляет доступ к функциональности Spark SQL.
3. Создается список `data`, который содержит записи о пользователях с указанием имени пользователя, даты и времени.
4. Создается RDD (`rdd`) из списка `data` с помощью метода `spark.sparkContext.parallelize(data)`. RDD - это распределенный неизменяемый набор данных, на котором можно выполнять параллельные операции.
5. Выполняется операция `map(lambda x: (x[1], x[0]))` на RDD `rdd`. Операция `map()` применяет заданную функцию к каждому элементу в RDD, в данном случае, функция `lambda x: (x[1], x[0])` меняет местами значение даты и имени пользователя, чтобы ключом стала дата.
6. Результат операции `map()` сохраняется в переменной `rdd_date_user`.
7. Выполняется операция `distinct()` на `rdd_date_user`. Операция `distinct()` удаляет дублирующиеся элементы из RDD и оставляет только уникальные значения.
8. Результат операции `distinct()` сохраняется в переменной `rdd_unique_users`.
9. Выполняется операция `countByKey()` на `rdd_unique_users`. Операция `countByKey()` подсчитывает количество значений для каждого ключа в RDD и возвращает словарь, где ключами являются уникальные даты, а значениями - количество уникальных пользователей для каждой даты.
10. Результат операции `countByKey()` сохраняется в переменной `result`.
11. Затем происходит итерация по результатам (`result`), и для каждой даты и количества уникальных пользователей выводится сообщение с помощью оператора `print()`.
12. Вызывается метод `stop()` на объекте `spark`, чтобы закрыть соединение с кластером Spark.

Таким образом, данный код находит уникальных пользователей для каждой даты в RDD и выводит результаты на экран.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UniqueUsersPerDate").getOrCreate()

data = [("user1", "2024-04-01", "10:00:00"),
        ("user2", "2024-04-01", "11:30:00"),
        ("user3", "2024-04-02", "09:45:00"),
        ("user1", "2024-04-02", "14:20:00"),
        ("user2", "2024-04-03", "16:30:00"),
        ("user3", "2024-04-03", "18:15:00")]

rdd = spark.sparkContext.parallelize(data)

rdd_date_user = rdd.map(lambda x: (x[1], x[0]))

rdd_unique_users = rdd_date_user.distinct()

result = rdd_unique_users.countByKey()

for date, count in result.items():
    print("Date: {}, Unique Users: {}".format(date, count))

spark.stop()

Date: 2024-04-02, Unique Users: 2
Date: 2024-04-03, Unique Users: 2
Date: 2024-04-01, Unique Users: 2


### Задача 9

У вас есть два RDD с одинаковой структурой: (ключ, значение). Напишите программу на Apache Spark, которая выполнит объединение (join) этих RDD по ключу и найдет сумму значений для каждого уникального ключа.

Данный код использует PySpark для объединения двух распределенных наборов данных (RDD) по ключу и вычисления суммы значений для каждого ключа с использованием Spark SQL и DataFrame API.

Вот пошаговое описание кода:

1. Импортируется модуль `SparkSession` из пакета `pyspark.sql`.
2. Создается объект `spark` с помощью метода `SparkSession.builder.appName("RDDJoinAndSum").getOrCreate()`. `SparkSession` предоставляет доступ к функциональности Spark SQL.
3. Создается список `data1`, который содержит пары ключ-значение.
4. Создается RDD (`rdd1`) из списка `data1` с помощью метода `spark.sparkContext.parallelize(data1)`.
5. Создается список `data2`, который содержит пары ключ-значение.
6. Создается RDD (`rdd2`) из списка `data2` с помощью метода `spark.sparkContext.parallelize(data2)`.
7. Выполняется операция `join(rdd2)` на RDD `rdd1`. Операция `join()` объединяет два RDD по ключу, в данном случае, по ключу "key". Результат операции сохраняется в переменной `rdd_join`.
8. Выполняется операция `mapValues(lambda x: x[0] + x[1])` на `rdd_join`. Операция `mapValues()` применяет заданную функцию к каждому значению в RDD, в данном случае, функция `lambda x: x[0] + x[1]` складывает значения по ключу. Результат операции сохраняется в переменной `rdd_sum`.
9. Выполняется операция `collect()` на `rdd_sum`, чтобы получить все результаты.
10. Затем происходит итерация по результатам (`result`), и для каждого ключа и суммы значений выводится сообщение с помощью оператора `print()`.
11. Вызывается метод `stop()` на объекте `spark`, чтобы закрыть соединение с кластером Spark.

Таким образом, данный код объединяет два RDD по ключу и вычисляет сумму значений для каждого ключа, выводя результаты на экран.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDDJoinAndSum").getOrCreate()

data1 = [("key1", 1),
         ("key2", 2),
         ("key3", 3)]

rdd1 = spark.sparkContext.parallelize(data1)

data2 = [("key1", 4),
         ("key2", 5),
         ("key4", 6)]

rdd2 = spark.sparkContext.parallelize(data2)

rdd_join = rdd1.join(rdd2)

rdd_sum = rdd_join.mapValues(lambda x: x[0] + x[1])

result = rdd_sum.collect()
for key, sum_value in result:
    print("Key: {}, Sum: {}".format(key, sum_value))

spark.stop()

Key: key1, Sum: 5
Key: key2, Sum: 7


### Задача 10

Подсчет количества уникальных слов в тексте с использованием DataFrame

Данный код использует PySpark для подсчета количества уникальных слов в заданном тексте с использованием Spark SQL и DataFrame API.

Вот пошаговое описание кода:

1. Импортируются модули `SparkSession`, `explode`, `split` и `StringType` из соответствующих пакетов.
2. Создается объект `spark` с помощью метода `SparkSession.builder`. Устанавливается имя приложения с помощью `appName("CountUniqueWords")`.
3. Указывается режим запуска приложения с помощью `master("local[*]")`, где `"local[*]"` означает запуск приложения локально с использованием всех доступных ядер процессора.
4. Вызывается метод `getOrCreate()` на объекте `spark` для создания или получения существующего сеанса Spark.
5. Задается текст в виде списка строк `text`, который будет использован для создания DataFrame.
6. Создается DataFrame `textDF` с помощью метода `spark.createDataFrame(text, StringType()).toDF("text")`. Здесь каждая строка текста представляется как отдельная запись в DataFrame.
7. Выполняется операция `split("text", " ")` на столбце `"text"` DataFrame `textDF`. Операция `split()` разделяет каждую строку текста на отдельные слова, используя пробел как разделитель.
8. Выполняется операция `explode(...).alias("word")` на результате операции `split()`. Операция `explode()` создает новую строку для каждого элемента в массиве слов. Результат операции сохраняется в DataFrame `wordsDF` с столбцом `"word"`.
9. Выполняется операция `distinct()` на DataFrame `wordsDF`. Операция `distinct()` удаляет дублирующиеся строки и оставляет только уникальные слова.
10. Результат операции `distinct()` сохраняется в DataFrame `uniqueWordsDF`.
11. Выполняется операция `count()` на DataFrame `uniqueWordsDF` для подсчета количества уникальных слов.
12. Результат подсчета сохраняется в переменной `uniqueWordCount`.
13. Выводится сообщение с помощью оператора `print()` с указанием количества уникальных слов.
14. Вызывается метод `stop()` на объекте `spark`, чтобы закрыть соединение с кластером Spark.

Таким образом, данный код создает DataFrame из заданного текста, разбивает текст на отдельные слова, находит уникальные слова и подсчитывает их количество, выводя результат на экран.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StringType

spark = SparkSession.builder \
    .appName("CountUniqueWords") \
    .master("local[*]") \
    .getOrCreate()

text = ["Hello Spark", "Hello Scala", "Spark is awesome"]
textDF = spark.createDataFrame(text, StringType()).toDF("text")
wordsDF = textDF.select(explode(split("text", " ")).alias("word"))
uniqueWordsDF = wordsDF.distinct()
uniqueWordCount = uniqueWordsDF.count()

print("Unique word count:", uniqueWordCount)

spark.stop()

Unique word count: 5


Данный код использует PySpark для создания объекта DataFrame из заданного списка словарей, фильтрации DataFrame по возрасту и вывода результата на экран.

Вот пошаговое описание кода:

1. Импортируется модуль `SparkSession` из пакета `pyspark.sql`.
2. Создается объект `spark` с помощью метода `SparkSession.builder`. Устанавливается имя приложения с помощью `appName("OOP Example")`.
3. Вызывается метод `getOrCreate()` на объекте `spark` для создания или получения существующего сеанса Spark.
4. Задается список словарей `data`, представляющий данные, из которых будет создан DataFrame.
5. Создается DataFrame `df` с помощью метода `spark.createDataFrame(data)`. DataFrame создается на основе заданного списка словарей, где каждый словарь представляет отдельную запись в DataFrame.
6. Выполняется операция фильтрации на DataFrame `df` с помощью метода `filter(df.age > 30)`. Здесь фильтр применяется к столбцу `age` и оставляет только записи, где значение столбца `age` больше 30.
7. Результат фильтрации сохраняется в DataFrame `df_filtered`.
8. Вызывается метод `show()` на DataFrame `df_filtered`, чтобы вывести результат фильтрации на экран.
9. Вызывается метод `stop()` на объекте `spark`, чтобы закрыть соединение с кластером Spark.

Таким образом, данный код создает DataFrame из заданного списка словарей, фильтрует записи по возрасту (>30) и выводит результат фильтрации на экран.

In [None]:
from pyspark.sql import SparkSession

# Создание объекта SparkSession
spark = SparkSession.builder \
    .appName("OOP Example") \
    .getOrCreate()

data = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
    {"name": "Charlie", "age": 35}
]
df = spark.createDataFrame(data)

df_filtered = df.filter(df.age > 30)
df_filtered.show()

spark.stop()