# Задание 2

Генерация синтетических данных с использованием Apache Spark

Цель задания: Использовать Apache Spark для создания синтетического набора данных, который имитирует информацию о покупках в интернет-магазине. 
- Набор данных должен включать в себя информацию о заказах, включая дату заказа, идентификатор пользователя, название товара, количество и цену. Сгенерированные данные будут использованы для последующего анализа покупательской активности и понимания потребительских трендов.

Шаги выполнения:
1. Генерация данных:

- Создать DataFrame с полями: Дата, UserID, Продукт, Количество, Цена.

- Данные для поля Продукт генерируются из списка возможных товаров ( не меньше 5 товаров )

- Количество и Цена должны генерироваться случайно в заданных пределах.

- Дата должна быть в пределах последнего года.

- UserID представляет собой случайное число, имитирующее идентификаторы пользователей.

- Обратите внимание, что должна быть возможности изменять количество сгенерированных строк. Минимальное количество - 1000 строк.

2. Сохранение данных:

- Сохранить сгенерированный DataFrame в формате CSV для последующего анализа.

- Результат выполнения задания необходимо выложить в github/gitlab и указать ссылку на Ваш репозиторий (не забудьте: репозиторий должен быть публичным).

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

from pyspark.sql.functions import rand, col
import random
from datetime import datetime, timedelta

import findspark
findspark.init()
findspark.find()

'/home/anton/.local/lib/python3.10/site-packages/pyspark'

In [3]:
spark = SparkSession.builder \
    .appName("task_3.6_2") \
    .getOrCreate()

24/06/30 13:27:30 WARN Utils: Your hostname, ubuntu-prox resolves to a loopback address: 127.0.1.1; using 192.168.1.30 instead (on interface enp6s18)
24/06/30 13:27:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/30 13:27:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Read CSV
# df = spark.read.csv("/tmp/resources/zipcodes.csv")

1. Генерация данных:

- Создать DataFrame с полями: Дата, UserID, Продукт, Количество, Цена.

- Данные для поля Продукт генерируются из списка возможных товаров ( не меньше 5 товаров )

- Количество и Цена должны генерироваться случайно в заданных пределах.

- Дата должна быть в пределах последнего года.

- UserID представляет собой случайное число, имитирующее идентификаторы пользователей.

- Обратите внимание, что должна быть возможности изменять количество сгенерированных строк. Минимальное количество - 1000 строк.

In [5]:
num_records = 1000
products = ["Молоко", "Ноутбук", "Книга", "Сноу-Борд", "Кружка"]
min_quantity = 1
max_quantity = 10
min_price = 10.0
max_price = 100.0

In [6]:
# Создать DataFrame с полями: Дата, UserID, Продукт, Количество, Цена.

data = [(datetime.now() - timedelta(days=random.randint(0, 365)), # Дата должна быть в пределах последнего года (365 дней).
        random.randint(1, 1000), # UserID представляет собой случайное число, имитирующее идентификаторы пользователей.
        random.choice(products), # Продукт. Данные для поля Продукт генерируются из списка возможных товаров ( не меньше 5 товаров )
        random.randint(min_quantity, max_quantity), # Количество. Генерируется случайно в заданных пределах min_quantity-max_quantity.
        round(random.uniform(min_price, max_price),2)) for _ in range(num_records)] # Цена. Генерируется случайно в заданных пределах min_price-max_price.

df = spark.createDataFrame(data, ["Дата", "UserID", "Продукт", "Количество", "Цена"])

2. Сохранение данных:

- Сохранить сгенерированный DataFrame в формате CSV для последующего анализа.

- Результат выполнения задания необходимо выложить в github/gitlab и указать ссылку на Ваш репозиторий (не забудьте: репозиторий должен быть публичным).

In [7]:
df.show(10)

                                                                                

+--------------------+------+---------+----------+-----+
|                Дата|UserID|  Продукт|Количество| Цена|
+--------------------+------+---------+----------+-----+
|2023-10-01 13:27:...|   550|Сноу-Борд|         8|12.92|
|2024-01-01 13:27:...|   815|Сноу-Борд|         6|25.94|
|2023-11-18 13:27:...|   313|    Книга|         4|21.39|
|2023-12-01 13:27:...|   950|   Кружка|         3|52.92|
|2024-01-25 13:27:...|   518|    Книга|         6|56.83|
|2024-05-18 13:27:...|   461|    Книга|         9|43.67|
|2024-06-03 13:27:...|   450|  Ноутбук|         3|22.46|
|2023-10-01 13:27:...|   733|   Кружка|         5|94.72|
|2024-06-26 13:27:...|   626|Сноу-Борд|         3|76.14|
|2023-10-21 13:27:...|   801|   Кружка|         6|10.94|
+--------------------+------+---------+----------+-----+
only showing top 10 rows



https://sparkbyexamples.com/pyspark/pyspark-write-dataframe-to-csv-file/

In [11]:
# создается папка 3.6_2.csv в ней файлы экспорта
df.coalesce(1) \
  .write \
  .format("csv") \
  .options(header='True', delimiter=',') \
  .mode('overwrite') \
  .save("3.6_2.csv")

In [9]:
# df.write \
#     .format("csv") \
#     .options(header='True', delimiter=',') \
#     .mode('overwrite') \
#     .save("3.6_2.csv")

In [10]:
# csv_file_path = "data.csv"
# df.write \
#         .options(header='True', delimiter=',') \
#         .mode('overwrite') \
#         .csv(csv_file_path)