In [6]:
!pip install pyspark transliterate



Импорт требуемых библиотек для работы

In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
import random
import uuid
import datetime
from transliterate import translit

Определения списка данных для генерации имен, городов и доменов

In [8]:
NAMES = ["Михаил", "София", "Александр", "Анастасия", "Артем", "Мария", "Матвей", "Анна", "Максим", "Виктория", None]
CITIES = ["Москва", 'Санкт-Петербург', 'Екатеринбург', 'Новосибирск', 'Казань', None]
DOMAINS = ['mail.ru', 'bk.ru', 'yandex.ru', 'gmail.com', 'hotmail.com']

Класс для генерации данных

In [9]:
class DataGenerator:

    def __init__(self, num=10, spark=None):
        self.num = num
        self.spark = spark

    def generate_name(self):
        return random.choice(NAMES)

    def generate_email(self, name):
        if name is None:
            return None
        try:
            transliterated = translit(name, 'ru', reversed=True).lower()
            return f'{transliterated}{random.randint(0, 100)}@{random.choice(DOMAINS)}'
        except:
            return f'user{random.randint(1000, 9999)}@{random.choice(DOMAINS)}'

    def generate_city(self):
        return random.choice(CITIES)

    def generate_age(self):
        return random.randint(18, 95)

    def generate_salary(self):
        return random.randint(23000, 150000)

    def generate_date(self, age):
        # Дата регистрации не раньше, чем человеку исполнилось 18 лет
        max_years_ago = age - 18
        if max_years_ago <= 0:
            days_ago = 0
        else:
            days_ago = random.randint(0, max_years_ago * 365)

        return datetime.date.today() - datetime.timedelta(days=days_ago)

    def generate_objects(self):
        # Генерируем данные как список Python объектов
        data = []
        for _ in range(self.num):
            age = self.generate_age()
            name = self.generate_name()
            city = self.generate_city()

            record = (
                str(uuid.uuid4()),  # id
                name,  # name
                self.generate_email(name),  # email
                age,  # age
                city,  # city
                self.generate_salary(),  # salary
                self.generate_date(age)  # registration_date
            )
            data.append(record)

        # Определяем схему DataFrame
        schema = StructType([
            StructField("id", StringType(), True),
            StructField("name", StringType(), True),
            StructField("email", StringType(), True),
            StructField("age", IntegerType(), True),
            StructField("city", StringType(), True),
            StructField("salary", IntegerType(), True),
            StructField("registration_date", DateType(), True)
        ])

        # Создаем DataFrame из сгенерированных данных
        return self.spark.createDataFrame(data, schema=schema)

Функции для преобразования данных (очистка, обработка null, шифрование)

In [10]:
def clean_data(df):
    """
    Преобразование сгенерированных данных:
    - Обработка NULL/None значений (удаление)
    - Обработка дубликатов (удаление строки-дубликата)
    - Проверка даты регистрации
    """
    df_cleaned = df.filter(
        (f.col("name").isNotNull()) &
        (f.length(f.col("name")) >= 5) &
        (f.col("city").isNotNull()) &
        (f.length(f.col("city")) >= 7)
    )

    df_cleaned = df_cleaned.filter(
        (f.col("email").isNotNull()) &
        (f.col("email").contains("@")) &
        ((f.col("email").endswith(".ru")) | (f.col("email").endswith(".com")))
    )
    df_cleaned = df_cleaned.dropDuplicates()\
                           .filter(f.col("age") >= 18)

    current_date = datetime.date.today()
    df_cleaned = df_cleaned.filter(
        f.col("registration_date") <= current_date
    )

    return df_cleaned

def encrypt_data(df):
    """
    Простое шифрование чувствительных данных (для примера, используем email)
    """
    df_encrypted = df.withColumn(
        "email_encrypted",
        f.sha2(f.col("email"), 256)
    )

    return df_encrypted

Основной блок кода

In [11]:
spark = SparkSession.builder\
    .appName('DataGenerationApp')\
    .master('local[*]')\
    .getOrCreate()


data_generator = DataGenerator(100, spark)
df_raw = data_generator.generate_objects()

print("Сгенерированные данные:")
df_raw.show(10)

df_cleaned = clean_data(df_raw)

print("\nДанные после очистки:")
df_cleaned.show(10)

print(f"\nСтатистика:\n"
      f"Исходные записи: {df_raw.count()}\n"
      f"После очистки: {df_cleaned.count()}\n"
      f"Удалено записей: {df_raw.count() - df_cleaned.count()}")

df_final = encrypt_data(df_cleaned)

print("\nДанные с шифрованием:")
df_final.show(10)

# Запись в файл
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"/content/{current_time}-dev.csv"

df_final.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_path)

print(f"\nДанные сохранены в: {output_path}")

spark.stop()

Сгенерированные данные:
+--------------------+---------+--------------------+---+------------+------+-----------------+
|                  id|     name|               email|age|        city|salary|registration_date|
+--------------------+---------+--------------------+---+------------+------+-----------------+
|a89236f9-3c91-414...|   Максим|  maksim89@yandex.ru| 33|      Москва|137423|       2022-10-25|
|86c8bc9e-8ab2-4e8...|    Мария|  marija84@gmail.com| 38|      Казань| 30593|       2008-01-28|
|e26413de-e712-47e...|   Матвей|  matvej92@gmail.com| 84|      Москва|114976|       1962-07-16|
|e8450050-1aea-47f...|Анастасия|anastasija22@hotm...| 89|      Казань|109368|       2012-06-22|
|f13da429-e03b-465...|   Матвей|      matvej93@bk.ru| 81|Екатеринбург|137425|       2012-03-05|
|e3f5c45f-9684-4ee...|    Артем|       artem80@bk.ru| 26|Екатеринбург|134437|       2019-02-28|
|9fa685ab-7df5-409...|   Матвей|  matvej18@yandex.ru| 41|      Казань|138815|       2010-06-08|
|9a2b64f3-14b5-4