 ### Работа с различными источниками данных

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
from pyspark.sql.functions import count, when, col

spark = SparkSession.builder \
    .appName("Module_2_DataFrame_Basics") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", 4) \
    .getOrCreate()

# 1. Прочитайте CSV файл с явной схемой

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("department", StringType(), True),
    StructField("hire_date", StringType(), True),
    StructField("is_active", BooleanType(), True)
])

path_file = "..//../data/employees.csv"

df = spark.read.csv(
    path_file,
    header=True,
    schema=schema
)

# 2. Выведите информацию о DataFrame
print(f"Количество строк: {df.count()}")
print(f"Количество столбцов: {len(df.columns)}")
df.printSchema()
df.describe().show()

# 3. Проверьте наличие NULL значений
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


Количество строк: 10000
Количество столбцов: 7
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- is_active: boolean (nullable = true)

+-------+------------------+---------+----------------+-----------------+----------+----------+
|summary|                id|     name|             age|           salary|department| hire_date|
+-------+------------------+---------+----------------+-----------------+----------+----------+
|  count|             10000|    10000|           10000|            10000|     10000|     10000|
|   mean|            5000.5|     NULL|         48.8047|       89558.4295|      NULL|      NULL|
| stddev|2886.8956799071675|     NULL|18.2766124149454|34605.27441500447|      NULL|      NULL|
|    min|                 1|   User_1|              18|            30000|   Finance|2020-11-07|
|   

### Основные операции с DataFrame

In [8]:
# 1. Выберите только нужные столбцы
selected_df = df.select("name", "age", "salary", "department")

# 2. Отфильтруйте данные
filtered_df = selected_df.filter((col("age") > 25) & (col("salary") > 50000))

# 3. Отсортируйте по зарплате
sorted_df = filtered_df.orderBy(col("salary").desc())

# 4. Покажите результат
sorted_df.show(10)

# 5. Найдите уникальные департаменты
df.select("department").distinct().show()

+---------+---+------+----------+
|     name|age|salary|department|
+---------+---+------+----------+
|User_3787| 30|149988|     Sales|
|User_6004| 37|149970|     Sales|
|User_1446| 26|149967|   Finance|
|User_3659| 52|149964|        HR|
|User_6422| 34|149945|   Finance|
|User_6161| 60|149909|   Finance|
|User_7886| 30|149901|     Sales|
|User_1407| 68|149885|        IT|
|User_4109| 68|149882|        IT|
|User_3945| 44|149869|        IT|
+---------+---+------+----------+
only showing top 10 rows

+----------+
|department|
+----------+
|     Sales|
|        HR|
|        IT|
|   Finance|
| Marketing|
+----------+



### Работа с NULL значениями

In [9]:
# Создайте DataFrame с NULL значениями для практики
# Добавим искусственные NULL значения
df_with_nulls = df.withColumn("bonus", 
    when(col("salary") > 80000, col("salary") * 0.1)
    .otherwise(None)
)

# Покажем строки с NULL в bonus
df_with_nulls.filter(col("bonus").isNull()).show()

# Заполним NULL значениями
df_filled = df_with_nulls.na.fill({"bonus": 0})
df_filled.show()

# Удалим строки с NULL в важных столбцах
df_cleaned = df_with_nulls.na.drop(subset=["name", "age"])
print(f"Оригинальный размер: {df_with_nulls.count()}")
print(f"После очистки: {df_cleaned.count()}")

+---+-------+---+------+----------+----------+---------+-----+
| id|   name|age|salary|department| hire_date|is_active|bonus|
+---+-------+---+------+----------+----------+---------+-----+
|  2| User_2| 76| 52377|        IT|2024-08-13|    false| NULL|
|  3| User_3| 43| 50963|   Finance|2023-08-05|     true| NULL|
|  6| User_6| 34| 33522|        HR|2024-12-08|     true| NULL|
|  7| User_7| 73| 47237|        HR|2025-06-18|    false| NULL|
|  8| User_8| 41| 40584|        HR|2023-09-15|    false| NULL|
|  9| User_9| 42| 62464|   Finance|2021-09-29|    false| NULL|
| 10|User_10| 18| 79601|   Finance|2023-06-07|    false| NULL|
| 11|User_11| 79| 61546|     Sales|2025-08-31|     true| NULL|
| 14|User_14| 27| 49382| Marketing|2022-10-31|     true| NULL|
| 15|User_15| 56| 69706|   Finance|2023-12-17|     true| NULL|
| 19|User_19| 74| 49461|        IT|2022-06-11|     true| NULL|
| 20|User_20| 35| 42436|   Finance|2021-12-15|     true| NULL|
| 24|User_24| 37| 63200|   Finance|2021-06-06|     true

### Работа с различными форматами данных

In [11]:
# 1. Прочитайте CSV и преобразуйте в Parquet
df_csv = spark.read.csv("../../data/employees_extended.csv", header=True, inferSchema=True)
df_csv.write.mode("overwrite").parquet("../../data/employees.parquet")

# 2. Прочитайте Parquet и выполните операции
df_parquet = spark.read.parquet("../../data/employees.parquet")

# 3. Проверьте производительность
import time

start_time = time.time()
csv_count = df_csv.count()
csv_time = time.time() - start_time

start_time = time.time()
parquet_count = df_parquet.count()
parquet_time = time.time() - start_time

print(f"CSV count time: {csv_time:.2f}s, Count: {csv_count}")
print(f"Parquet count time: {parquet_time:.2f}s, Count: {parquet_count}")

CSV count time: 0.06s, Count: 10000
Parquet count time: 0.14s, Count: 10000


### Продвинутая фильтрация и выбор столбцов

In [12]:
from pyspark.sql.functions import col, when, expr, substring

# Работа с данными продаж
sales_df = spark.read.csv("../../data/sales_extended.csv", header=True, inferSchema=True)

# Сложная фильтрация
high_value_sales = sales_df.filter(
    (col("total_amount") > 1000) & 
    (col("quantity") >= 2) & 
    (col("region").isin(["North", "South"]))
)

# Выбор и преобразование столбцов
processed_df = high_value_sales.select(
    col("sale_id"),
    col("product"),
    col("total_amount"),
    col("region"),
    # Создание нового столбца на основе условий
    when(col("total_amount") > 5000, "Premium").otherwise("Regular").alias("customer_tier"),
    # Вычисление на основе других столбцов
    (col("total_amount") / col("quantity")).alias("avg_price_per_unit")
)

processed_df.show(10)

+-------+----------+-----------------+------+-------------+------------------+
|sale_id|   product|     total_amount|region|customer_tier|avg_price_per_unit|
+-------+----------+-----------------+------+-------------+------------------+
|      2|  Keyboard|8256.900000000001| South|      Premium| 825.6900000000002|
|      3|   Monitor|          12304.8| North|      Premium|            1538.1|
|      7|     Mouse|          15964.0| North|      Premium|            1596.4|
|      8|  Keyboard|           3295.4| South|      Regular|            823.85|
|     14|Headphones|7787.200000000001| North|      Premium|           1557.44|
|     15|   Printer|           4836.0| North|      Regular|             806.0|
|     20|     Phone|          5777.64| North|      Premium|           1925.88|
|     25|   Printer|          1299.24| North|      Regular|            649.62|
|     27|  Keyboard|         11883.78| North|      Premium|           1980.63|
|     29|    Camera|9736.230000000001| South|      P