In [2]:
# Встановлення PySpark
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType
from pyspark.sql.functions import col, split, explode, lower, regexp_replace, count, sum, avg, max, min, desc, year, month, format_number
import pyspark.sql.functions as F

# Створення SparkSession
spark = SparkSession.builder.master('local').appName('Lab4').getOrCreate()

print("="*80)
print("ЧАСТИНА 1: Основні операції з DataFrame")
print("="*80)

ЧАСТИНА 1: Основні операції з DataFrame


In [3]:
print("\n" + "="*80)
print("Завдання 1: Створення DataFrame з складною структурою")
print("="*80)

# Дані з вкладеними структурами
data = [
    (("Іван", "Петрович", "Коваль"), "001", ("Київ", "вул. Хрещатик, 1"), 35000, ["Python", "Spark", "SQL"]),
    (("Марія", "Олександрівна", "Петренко"), "002", ("Львів", "вул. Стрийська, 25"), 42000, ["Java", "Scala"]),
    (("Олександр", "", "Сидоренко"), "003", ("Одеса", "вул. Дерибасівська, 10"), 38000, ["Python", "ML"]),
    (("Анна", "Василівна", "Іваненко"), "004", ("Харків", "просп. Науки, 5"), 45000, ["Data Science", "Python", "R"]),
    (("Дмитро", "Іванович", "Мельник"), "005", ("Дніпро", "вул. Набережна, 15"), 40000, ["Big Data", "Spark"])
]

# Визначення схеми з вкладеними структурами
schema = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True)
    ])),
    StructField('employee_id', StringType(), True),
    StructField('address', StructType([
        StructField('city', StringType(), True),
        StructField('street', StringType(), True)
    ])),
    StructField('salary', IntegerType(), True),
    StructField('skills', StringType(), True)
])

df1 = spark.createDataFrame(data=data, schema=schema)

print("\nСхема DataFrame:")
df1.printSchema()

print("\nВміст DataFrame:")
df1.show(truncate=False)

# Демонстрація роботи з collect()
print("\n--- Демонстрація функції collect() ---")
print("\n1. Отримання всіх елементів:")
all_data = df1.collect()
print(f"Кількість рядків: {len(all_data)}")

print("\n2. Перший рядок:")
first_row = df1.collect()[0]
print(first_row)

print("\n3. Перша комірка (об'єкт name):")
first_cell = df1.collect()[0][0]
print(first_cell)

print("\n4. Ім'я першого співробітника:")
first_name = df1.collect()[0][0][0]
print(f"Ім'я: {first_name}")

print("\n5. Вибірка певних стовпців:")
df1.select("name.firstname", "name.lastname", "address.city", "salary").show()


Завдання 1: Створення DataFrame з складною структурою

Схема DataFrame:
root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- employee_id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- skills: string (nullable = true)


Вміст DataFrame:
+--------------------------------+-----------+-------------------------------+------+-------------------------+
|name                            |employee_id|address                        |salary|skills                   |
+--------------------------------+-----------+-------------------------------+------+-------------------------+
|{Іван, Петрович, Коваль}        |001        |{Київ, вул. Хрещатик, 1}       |35000 |[Python, Spark, SQL]     |
|{Марія, Олександрівна, Петренко}|002 

In [4]:
print("\n" + "="*80)
print("Завдання 2: Мінімальна температура для кожної станції")
print("="*80)

# Читання даних з файлу problem2_data set.txt
df_temp = spark.read.format("csv") \
    .option("header", "false") \
    .option("inferSchema", "true") \
    .load("problem2_data set.txt")

# Перейменування колонок
df_temp = df_temp.select(
    col("_c0").alias("stationId"),
    col("_c1").alias("date"),
    col("_c2").alias("readingType"),
    col("_c3").alias("temperature")
)

print("\nПерші 10 рядків вхідних даних:")
df_temp.show(10)

# Фільтрація тільки TMIN і пошук мінімуму для кожної станції
result_temp = df_temp.filter(col("readingType") == "TMIN") \
    .groupBy("stationId") \
    .agg(min("temperature").alias("min_temperature"))

print("\nМінімальна температура для кожної станції:")
result_temp.show()


Завдання 2: Мінімальна температура для кожної станції

Перші 10 рядків вхідних даних:
+-----------+--------+-----------+-----------+
|  stationId|    date|readingType|temperature|
+-----------+--------+-----------+-----------+
|ITE00100554|18000101|       TMAX|        -75|
|ITE00100554|18000101|       TMIN|       -148|
|GM000010962|18000101|       PRCP|          0|
|EZE00100082|18000101|       TMAX|        -86|
|EZE00100082|18000101|       TMIN|       -135|
|ITE00100554|18000102|       TMAX|        -60|
|ITE00100554|18000102|       TMIN|       -125|
|GM000010962|18000102|       PRCP|          0|
|EZE00100082|18000102|       TMAX|        -44|
|EZE00100082|18000102|       TMIN|       -130|
+-----------+--------+-----------+-----------+
only showing top 10 rows


Мінімальна температура для кожної станції:
+-----------+---------------+
|  stationId|min_temperature|
+-----------+---------------+
|ITE00100554|           -148|
|EZE00100082|           -135|
+-----------+---------------+



In [5]:
print("\n" + "="*80)
print("Завдання 3: Топ-10 найпопулярніших слів")
print("="*80)

# Читання даних з файлу problem3_data set.txt
df_text = spark.read.text("problem3_data set.txt")

print("\nКількість рядків у файлі:", df_text.count())

# Розбиття тексту на слова, приведення до нижнього регістру
words_df = df_text.select(explode(split(lower(col("value")), "\\s+")).alias("word"))

# Видалення порожніх слів
words_df = words_df.filter(col("word") != "")

word_counts = words_df.groupBy("word") \
    .count() \
    .orderBy(desc("count")) \
    .limit(10)

print("\nТоп-10 найпопулярніших слів:")
word_counts.show(truncate=False)


Завдання 3: Топ-10 найпопулярніших слів

Кількість рядків у файлі: 611

Топ-10 найпопулярніших слів:
+---------+-----+
|word     |count|
+---------+-----+
|data     |361  |
|big      |285  |
|in       |171  |
|training |114  |
|course   |105  |
|hadoop   |100  |
|online   |58   |
|courses  |53   |
|spark    |42   |
|bangalore|40   |
+---------+-----+



In [6]:
print("\n" + "="*80)
print("Завдання 4: Топ-10 клієнтів з максимальними витратами")
print("="*80)

# Читання даних з файлу problem4_data set.txt
df_customers = spark.read.format("csv") \
    .option("header", "false") \
    .option("inferSchema", "true") \
    .load("problem4_data set.txt")

# Перейменування колонок
df_customers = df_customers.select(
    col("_c0").alias("CustomerId"),
    col("_c1").alias("ProductId"),
    col("_c2").alias("Amount")
)

print("\nПерші 10 рядків вхідних даних:")
df_customers.show(10)

# Групування за клієнтами та підрахунок загальної суми витрат
top_customers = df_customers.groupBy("CustomerId") \
    .agg(sum("Amount").alias("TotalSpent")) \
    .orderBy(desc("TotalSpent")) \
    .limit(10)

print("\nТоп-10 клієнтів за витратами:")
top_customers.show()


Завдання 4: Топ-10 клієнтів з максимальними витратами

Перші 10 рядків вхідних даних:
+----------+---------+------+
|CustomerId|ProductId|Amount|
+----------+---------+------+
|        44|     8602| 37.19|
|        35|     5368| 65.89|
|         2|     3391| 40.64|
|        47|     6694| 14.98|
|        29|      680| 13.08|
|        91|     8900| 24.59|
|        70|     3959| 68.68|
|        85|     1733| 28.53|
|        53|     9900| 83.55|
|        14|     1505|  4.32|
+----------+---------+------+
only showing top 10 rows


Топ-10 клієнтів за витратами:
+----------+-----------------+
|CustomerId|       TotalSpent|
+----------+-----------------+
|        68|6375.449999999997|
|        73|6206.199999999999|
|        39|6193.109999999999|
|        54|6065.389999999999|
|        71|5995.660000000003|
|         2|          5994.59|
|        97|5977.189999999995|
|        46|5963.109999999999|
|        42|5696.840000000003|
|        59|          5642.89|
+----------+-----------------+



In [7]:
print("\n" + "="*80)
print("Завдання 5: Аналіз рейтингів фільмів")
print("="*80)

# Створення тестових даних (або завантажте problem5_data set.txt якщо є)
ratings_data = [
    (1, 101, 5, 1234567890),
    (1, 102, 4, 1234567891),
    (2, 101, 5, 1234567892),
    (2, 103, 3, 1234567893),
    (3, 104, 2, 1234567894),
    (3, 105, 1, 1234567895),
    (4, 106, 5, 1234567896),
    (4, 107, 4, 1234567897),
    (5, 108, 3, 1234567898),
    (5, 109, 5, 1234567899),
    (1, 110, 5, 1234567900),
    (2, 111, 4, 1234567901),
    (3, 112, 3, 1234567902),
    (4, 113, 2, 1234567903),
    (5, 114, 1, 1234567904),
]

ratings_columns = ["userid", "movieid", "rating", "timestamp"]
df_ratings = spark.createDataFrame(ratings_data, ratings_columns)

print("\nВхідні дані:")
df_ratings.show(10)

# Підрахунок кількості оцінок для кожної зірки
rating_counts = df_ratings.groupBy("rating") \
    .count() \
    .orderBy(desc("rating"))

print("\nСтатистика рейтингів:")
rating_counts.show()

# Відповіді на питання
for r in range(5, 0, -1):
    count_val = df_ratings.filter(col("rating") == r).count()
    print(f"Фільми оцінювались {r} зірками: {count_val} разів")


Завдання 5: Аналіз рейтингів фільмів

Вхідні дані:
+------+-------+------+----------+
|userid|movieid|rating| timestamp|
+------+-------+------+----------+
|     1|    101|     5|1234567890|
|     1|    102|     4|1234567891|
|     2|    101|     5|1234567892|
|     2|    103|     3|1234567893|
|     3|    104|     2|1234567894|
|     3|    105|     1|1234567895|
|     4|    106|     5|1234567896|
|     4|    107|     4|1234567897|
|     5|    108|     3|1234567898|
|     5|    109|     5|1234567899|
+------+-------+------+----------+
only showing top 10 rows


Статистика рейтингів:
+------+-----+
|rating|count|
+------+-----+
|     5|    5|
|     4|    3|
|     3|    3|
|     2|    2|
|     1|    2|
+------+-----+

Фільми оцінювались 5 зірками: 5 разів
Фільми оцінювались 4 зірками: 3 разів
Фільми оцінювались 3 зірками: 3 разів
Фільми оцінювались 2 зірками: 2 разів
Фільми оцінювались 1 зірками: 2 разів


In [8]:
print("\n" + "="*80)
print("ЧАСТИНА 2: Аналіз даних фондового ринку (Yahoo Finance)")
print("="*80)

# Створення тестових даних для демонстрації
stock_data = [
    ("2010-01-04", 30.57, 30.74, 30.21, 30.57, 123432400, 27.73),
    ("2010-01-05", 30.66, 30.80, 30.46, 30.63, 150476200, 27.78),
    ("2010-01-06", 30.63, 30.75, 30.11, 30.14, 138040000, 27.33),
    ("2011-01-03", 45.52, 46.23, 45.12, 46.08, 111268600, 41.78),
    ("2012-01-03", 58.75, 59.39, 57.87, 58.95, 140129500, 53.47),
    ("2015-01-02", 111.39, 111.44, 107.35, 109.33, 53204600, 103.22),
    ("2016-01-04", 102.61, 105.37, 102.00, 105.35, 67649400, 99.52),
    ("2018-01-02", 170.16, 172.30, 169.26, 172.26, 118071600, 168.19),
    ("2020-01-02", 296.24, 300.60, 295.19, 300.35, 135647200, 297.43),
    ("2020-03-23", 224.37, 228.47, 214.88, 224.37, 178876600, 222.16),
]

stock_columns = ["Date", "Open", "High", "Low", "Close", "Volume", "Adj Close"]
df_stock = spark.createDataFrame(stock_data, stock_columns)

print("\nДані завантажено успішно!")


ЧАСТИНА 2: Аналіз даних фондового ринку (Yahoo Finance)

Дані завантажено успішно!


In [9]:
# Завдання 1: Схема DataFrame
print("\n1. Схема DataFrame:")
df_stock.printSchema()

# Завдання 2: Перші 10 рядків
print("\n2. Перші 10 рядків з набору даних:")
df_stock.show(10)

# Завдання 3: HV Ratio
print("\n3. DataFrame зі стовпцем HV Ratio:")
df_stock_hv = df_stock.withColumn("HV_Ratio", col("High") / col("Volume"))
df_stock_hv.select("Date", "High", "Volume", "HV_Ratio").show(10)

# Завдання 4: День з піковою ціною
print("\n4. День з піковою ціною Close:")
peak_day = df_stock.orderBy(desc("Close")).limit(1)
peak_day.select("Date", "Close").show()

# Завдання 5: Середнє значення Close
print("\n5. Середнє значення стовпця Close:")
avg_close = df_stock.agg(avg("Close").alias("Average_Close"))
avg_close.show()


1. Схема DataFrame:
root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Adj Close: double (nullable = true)


2. Перші 10 рядків з набору даних:
+----------+------+------+------+------+---------+---------+
|      Date|  Open|  High|   Low| Close|   Volume|Adj Close|
+----------+------+------+------+------+---------+---------+
|2010-01-04| 30.57| 30.74| 30.21| 30.57|123432400|    27.73|
|2010-01-05| 30.66|  30.8| 30.46| 30.63|150476200|    27.78|
|2010-01-06| 30.63| 30.75| 30.11| 30.14|138040000|    27.33|
|2011-01-03| 45.52| 46.23| 45.12| 46.08|111268600|    41.78|
|2012-01-03| 58.75| 59.39| 57.87| 58.95|140129500|    53.47|
|2015-01-02|111.39|111.44|107.35|109.33| 53204600|   103.22|
|2016-01-04|102.61|105.37| 102.0|105.35| 67649400|    99.52|
|2018-01-02|170.16| 172.3|169.26|172.26|118071600|   168.19|
|2020-01-

In [10]:
# Завдання 6: Максимальне та мінімальне значення Volume
print("\n6. Максимальне та мінімальне значення Volume:")
volume_stats = df_stock.agg(
    max("Volume").alias("Max_Volume"),
    min("Volume").alias("Min_Volume")
)
volume_stats.show()

# Завдання 7: Кількість днів з Close < 60
print("\n7. Кількість днів, коли Close < 60:")
days_below_60 = df_stock.filter(col("Close") < 60).count()
print(f"Днів з Close < 60: {days_below_60}")

# Завдання 8: Відсоток часу, коли High > 80
print("\n8. Відсоток часу, коли High > 80:")
total_days = df_stock.count()
days_high_above_80 = df_stock.filter(col("High") > 80).count()
percentage = (days_high_above_80 / total_days) * 100 if total_days > 0 else 0
print(f"Відсоток: {percentage:.2f}%")


6. Максимальне та мінімальне значення Volume:
+----------+----------+
|Max_Volume|Min_Volume|
+----------+----------+
| 178876600|  53204600|
+----------+----------+


7. Кількість днів, коли Close < 60:
Днів з Close < 60: 5

8. Відсоток часу, коли High > 80:
Відсоток: 50.00%


In [11]:
# Завдання 9: Щорічний максимум High
print("\n9. Щорічний максимум High:")
# Конвертуємо стовпець Date у тип Date
df_stock_date = df_stock.withColumn("Date", col("Date").cast("date"))
yearly_max = df_stock_date.withColumn("Year", year("Date")) \
    .groupBy("Year") \
    .agg(max("High").alias("Max_High")) \
    .orderBy("Year")
yearly_max.show()

# Завдання 10: Середнє Close для кожного місяця
print("\n10. Середнє Close для кожного календарного місяця:")
monthly_avg = df_stock_date.withColumn("Month", month("Date")) \
    .groupBy("Month") \
    .agg(avg("Close").alias("Avg_Close")) \
    .orderBy("Month")
monthly_avg.show()


9. Щорічний максимум High:
+----+--------+
|Year|Max_High|
+----+--------+
|2010|    30.8|
|2011|   46.23|
|2012|   59.39|
|2015|  111.44|
|2016|  105.37|
|2018|   172.3|
|2020|   300.6|
+----+--------+


10. Середнє Close для кожного календарного місяця:
+-----+-----------------+
|Month|        Avg_Close|
+-----+-----------------+
|    1|98.18444444444444|
|    3|           224.37|
+-----+-----------------+



In [12]:
print("\n" + "="*80)
print("ВИСНОВКИ")
print("="*80)
print("""
У ході виконання лабораторної роботи було:

1. Освоєно основні операції створення DataFrame в PySpark різними способами:
   - Ручне створення з складними вкладеними структурами
   - Завантаження з файлів
   - Перетворення з RDD

2. Продемонстровано роботу з функцією collect() для отримання даних з DataFrame:
   - Отримання всіх елементів
   - Доступ до окремих рядків та комірок
   - Робота зі складними структурами даних

3. Виконано практичні завдання з аналізу даних:
   - Агрегування даних за групами (groupBy)
   - Фільтрація та сортування даних
   - Робота з текстовими даними та підрахунок частоти слів
   - Обчислення статистичних показників

4. Проведено аналіз фінансових даних фондового ринку:
   - Створення нових обчислюваних стовпців
   - Знаходження максимальних, мінімальних та середніх значень
   - Робота з датами та часовими рядами
   - Групування даних за роками та місяцями

PySpark DataFrame надає потужний та зручний інтерфейс для роботи з великими
обсягами даних, підтримує SQL-подібний синтаксис та автоматично оптимізує
виконання запитів завдяки вбудованому оптимізатору Catalyst.
""")


ВИСНОВКИ

У ході виконання лабораторної роботи було:

1. Освоєно основні операції створення DataFrame в PySpark різними способами:
   - Ручне створення з складними вкладеними структурами
   - Завантаження з файлів
   - Перетворення з RDD

2. Продемонстровано роботу з функцією collect() для отримання даних з DataFrame:
   - Отримання всіх елементів
   - Доступ до окремих рядків та комірок
   - Робота зі складними структурами даних

3. Виконано практичні завдання з аналізу даних:
   - Агрегування даних за групами (groupBy)
   - Фільтрація та сортування даних
   - Робота з текстовими даними та підрахунок частоти слів
   - Обчислення статистичних показників

4. Проведено аналіз фінансових даних фондового ринку:
   - Створення нових обчислюваних стовпців
   - Знаходження максимальних, мінімальних та середніх значень
   - Робота з датами та часовими рядами
   - Групування даних за роками та місяцями

PySpark DataFrame надає потужний та зручний інтерфейс для роботи з великими
обсягами даних,