In [33]:
import pandas as pd
from sdv.single_table import GaussianCopulaSynthesizer
from sdv.metadata import Metadata 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, unix_timestamp
import time


In [29]:
df = pd.read_csv('electronic_devices.csv')

metadata = Metadata.detect_from_dataframe(df, table_name='electronic_devices')
# metadata.save_to_json(filepath='electronic_devices_v1.json')


In [31]:
synthesizer = GaussianCopulaSynthesizer(metadata)
synthesizer.fit(data=df)

In [32]:
synthetic_data = synthesizer.sample(
    num_rows=1_000_000,
    batch_size=1_000
)

synthetic_data

Sampling rows: 100%|██████████| 1000000/1000000 [00:30<00:00, 32723.79it/s]


Unnamed: 0,customer_id,age,gender,loyalty_member,product_type,sku,rating,order_status,payment_method,total_price,unit_price,quantity,purchase_date,shipping_type,addons,addons_cnt
0,12207,33,Female,No,Headphones,TBL345,3,Cancelled,PayPal,2345.66,301.26,10,2024-05-10,Expedited,"Extended Warranty, Impulse Item, Accessory",47.67
1,16280,43,Female,No,Smartphone,SKU1003,5,Completed,Debit Card,3144.41,739.83,2,2024-09-08,Expedited,"Impulse Item,Accessory,Extended Warranty",35.26
2,11924,73,Male,No,Laptop,SMP234,2,Completed,Cash,4033.30,1063.98,5,2024-09-12,Same Day,"Accessory, Accessory",75.75
3,3878,49,Female,No,Laptop,LTP123,2,Completed,Credit Card,2956.60,220.16,2,2024-03-31,Same Day,"Impulse Item, Extended Warranty, Extended Warr...",1.02
4,1628,58,Female,No,Laptop,SKU1004,5,Cancelled,Credit Card,3717.24,879.88,3,2023-11-22,Express,,2.97
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999995,7111,59,Female,No,Laptop,SMP234,2,Completed,Bank Transfer,1465.85,515.44,1,2024-09-01,Standard,Extended Warranty,7.82
999996,1185,57,Male,No,Smartwatch,SKU1004,2,Cancelled,Credit Card,2261.26,536.00,4,2024-07-04,Same Day,Accessory,0.04
999997,3741,53,Male,No,Tablet,SKU1005,5,Completed,Paypal,120.45,131.05,1,2024-06-15,Express,"Extended Warranty, Extended Warranty, Impulse ...",31.99
999998,19661,47,Female,No,Headphones,HDP456,3,Completed,Paypal,11337.56,1139.68,9,2024-07-06,Expedited,"Accessory, Accessory, Accessory",101.58


In [35]:
synthetic_data.to_csv('synthetic_generated_data.csv', index=False)

In [34]:
spark = SparkSession.builder.master("local[*]").appName("DataAggregation").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/01 21:49:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [36]:
data = spark.read.csv('synthetic_generated_data.csv', header=True, inferSchema=True)

                                                                                

In [None]:
data = data.withColumn("purchase_date", unix_timestamp("purchase_date", "yyyy-MM-dd").cast("timestamp"))

In [41]:
filtered_data = data.filter(
    (col('purchase_date') == '2024-09-03') &
    (col('addons').isNotNull()) &
    (col('addons').rlike('^[^,]+$'))
)
filtered_data.head()

Row(customer_id=17296, age=64, gender='Female', loyalty_member='No', product_type='Tablet', sku='HDP456', rating=4, order_status='Completed', payment_method='Paypal', total_price=644.0, unit_price=774.88, quantity=2, purchase_date=datetime.datetime(2024, 9, 3, 0, 0), shipping_type='Expedited', addons='Accessory', addons_cnt=3.62)

In [45]:
price_diff_data = filtered_data.select(
    min('unit_price').alias('min_unit_price'),
    max('unit_price').alias('max_unit_price'),
    (max('unit_price') - min('unit_price')).alias('unit_price_diff'),
    min('total_price').alias('min_total_price'),
    max('total_price').alias('max_total_price'),
    (max('total_price') - min('total_price')).alias('total_price_diff')
)
price_diff_data.show()

+--------------+--------------+---------------+---------------+---------------+----------------+
|min_unit_price|max_unit_price|unit_price_diff|min_total_price|max_total_price|total_price_diff|
+--------------+--------------+---------------+---------------+---------------+----------------+
|        124.47|       1139.68|        1015.21|          22.08|        11396.8|        11374.72|
+--------------+--------------+---------------+---------------+---------------+----------------+



In [44]:
grouped_data_gender = filtered_data.groupBy("gender").agg(
    min('unit_price').alias('min_unit_price'),
    max('unit_price').alias('max_unit_price'),
    (max('unit_price') - min('unit_price')).alias('unit_price_diff'),
    min('total_price').alias('min_total_price'),
    max('total_price').alias('max_total_price'),
    (max('total_price') - min('total_price')).alias('total_price_diff')
)
grouped_data_gender.show()

+------+--------------+--------------+---------------+---------------+---------------+----------------+
|gender|min_unit_price|max_unit_price|unit_price_diff|min_total_price|max_total_price|total_price_diff|
+------+--------------+--------------+---------------+---------------+---------------+----------------+
|Female|        124.84|       1139.68|        1014.84|          22.08|        11396.8|        11374.72|
|  Male|        124.47|       1139.66|        1015.19|          22.63|        11396.8|        11374.17|
+------+--------------+--------------+---------------+---------------+---------------+----------------+



In [50]:
grouped_data_age = filtered_data.groupBy("age").agg(
    min('unit_price').alias('min_unit_price'),
    max('unit_price').alias('max_unit_price'),
    (max('unit_price') - min('unit_price')).alias('unit_price_diff'),
    min('total_price').alias('min_total_price'),
    max('total_price').alias('max_total_price'),
    (max('total_price') - min('total_price')).alias('total_price_diff')
).orderBy("age")
grouped_data_age.show()

+---+--------------+--------------+------------------+---------------+---------------+------------------+
|age|min_unit_price|max_unit_price|   unit_price_diff|min_total_price|max_total_price|  total_price_diff|
+---+--------------+--------------+------------------+---------------+---------------+------------------+
| 19|        694.62|       1101.17|406.55000000000007|         221.86|       10610.57|          10388.71|
| 20|        127.14|       1116.98|            989.84|          71.53|        6785.45|           6713.92|
| 21|        181.03|       1137.69| 956.6600000000001|         629.43|        8136.03| 7506.599999999999|
| 22|        138.85|        1134.4| 995.5500000000001|          404.4|        7863.89| 7459.490000000001|
| 23|        124.84|       1134.69|           1009.85|          29.36|        9873.19|           9843.83|
| 24|        313.13|       1139.68| 826.5500000000001|        1047.24|       10719.56|           9672.32|
| 25|        158.54|       1087.19| 928.650000

In [57]:
grouped_data_gender_age = filtered_data.groupBy("gender", "age").agg(
    min('unit_price').alias('min_unit_price'),
    max('unit_price').alias('max_unit_price'),
    (max('unit_price') - min('unit_price')).alias('unit_price_diff'),
    min('total_price').alias('min_total_price'),
    max('total_price').alias('max_total_price'),
    (max('total_price') - min('total_price')).alias('total_price_diff')
).orderBy('age')
grouped_data_gender_age.show()

+------+---+--------------+--------------+------------------+---------------+---------------+------------------+
|gender|age|min_unit_price|max_unit_price|   unit_price_diff|min_total_price|max_total_price|  total_price_diff|
+------+---+--------------+--------------+------------------+---------------+---------------+------------------+
|Female| 19|        972.06|       1101.17|129.11000000000013|        6649.99|       10610.57|           3960.58|
|  Male| 19|        694.62|        694.62|               0.0|         221.86|         221.86|               0.0|
|Female| 20|        127.14|       1116.98|            989.84|         281.07|        6785.45|           6504.38|
|  Male| 20|        659.87|        991.55|331.67999999999995|          71.53|        5045.73|            4974.2|
|  Male| 21|        181.03|       1137.69| 956.6600000000001|         629.43|        8136.03| 7506.599999999999|
|Female| 21|        863.64|        863.64|               0.0|        4918.55|        4918.55|   

In [59]:
aggregated_data_gender = grouped_data_gender.collect()
aggregated_data_age = grouped_data_age.collect()
aggregated_data_gender_age = grouped_data_gender_age.collect()

aggregated_data_gender

[Row(gender='Female', min_unit_price=124.84, max_unit_price=1139.68, unit_price_diff=1014.84, min_total_price=22.08, max_total_price=11396.8, total_price_diff=11374.72),
 Row(gender='Male', min_unit_price=124.47, max_unit_price=1139.66, unit_price_diff=1015.19, min_total_price=22.63, max_total_price=11396.8, total_price_diff=11374.17)]

In [61]:
start_time = time.time()
grouped_data_gender.collect()
grouped_data_age.collect()
grouped_data_gender_age.collect()
no_cache_time = time.time() - start_time

no_cache_time

0.058648109436035156

In [63]:
spark.sparkContext.setCheckpointDir("/Users/tmanspace/Files/test")

In [64]:
filtered_data.cache()
filtered_data.checkpoint()

24/12/01 22:04:21 WARN CacheManager: Asked to cache already cached data.


DataFrame[customer_id: int, age: int, gender: string, loyalty_member: string, product_type: string, sku: string, rating: int, order_status: string, payment_method: string, total_price: double, unit_price: double, quantity: int, purchase_date: timestamp, shipping_type: string, addons: string, addons_cnt: double]

In [81]:
start_time = time.time()
grouped_data_gender.collect()
grouped_data_age.collect()
grouped_data_gender_age.collect()
cache_time = time.time() - start_time

cache_time

0.054679155349731445

In [82]:
print(f"Время без кэширования: {no_cache_time:.4f} секунд")
print(f"Время с кэшированием: {cache_time:.4f} секунд")

Время без кэширования: 0.0586 секунд
Время с кэшированием: 0.0547 секунд


In [85]:
print("Агрегированные данные по возрасту:")
for row in aggregated_data_age[:5]:
    print(row)

Агрегированные данные по возрасту:
Row(age=19, min_unit_price=694.62, max_unit_price=1101.17, unit_price_diff=406.55000000000007, min_total_price=221.86, max_total_price=10610.57, total_price_diff=10388.71)
Row(age=20, min_unit_price=127.14, max_unit_price=1116.98, unit_price_diff=989.84, min_total_price=71.53, max_total_price=6785.45, total_price_diff=6713.92)
Row(age=21, min_unit_price=181.03, max_unit_price=1137.69, unit_price_diff=956.6600000000001, min_total_price=629.43, max_total_price=8136.03, total_price_diff=7506.599999999999)
Row(age=22, min_unit_price=138.85, max_unit_price=1134.4, unit_price_diff=995.5500000000001, min_total_price=404.4, max_total_price=7863.89, total_price_diff=7459.490000000001)
Row(age=23, min_unit_price=124.84, max_unit_price=1134.69, unit_price_diff=1009.85, min_total_price=29.36, max_total_price=9873.19, total_price_diff=9843.83)
