In [1]:
import pandas as pd
import time 

df = pd.read_csv('wroshop_small.csv')
print(df.head())



  transaction_id            timestamp  customer_id      city  product_id  \
0  TXN0000077275  2023-01-01 00:35:30  CUST0003778  Warszawa  PROD004174   
1  TXN0000001975  2023-01-01 01:16:48  CUST0017124  Warszawa  PROD001943   
2  TXN0000057581  2023-01-01 01:54:58  CUST0010032    Krakow  PROD001245   
3  TXN0000072149  2023-01-01 03:02:28  CUST0013656     Radom  PROD001143   
4  TXN0000019668  2023-01-01 04:18:18  CUST0016922  Warszawa  PROD000746   

             category   price  quantity  total_amount  
0  Artykuly_Spozywcze   30.23         2         60.46  
1         Elektronika  318.12         1        318.12  
2             Zabawki   59.10         5        295.50  
3             Zabawki  101.40         2        202.80  
4           Smartfony  426.73         2        853.46  


In [2]:
# a)

start_time = time.time()
result = df.groupby('category')['total_amount'].sum()
end_time = time.time()
mem = result.memory_usage(deep=True) / 1024**2
#print(result)
print(f"Grupowanie - Czas wykonania: {end_time - start_time:.4f} sekundy")
print(f"Pamięć: {mem:.4f} MB")


start_time1 = time.time()
result1 = df.groupby('city')['quantity'].sum().sort_values(ascending=False).head(10)
end_time1 = time.time()
mem1 = result1.memory_usage(deep=True) / 1024**2
#print(result1)
print(f"Top10 - Czas wykonania: {end_time1 - start_time1:.4f} sekundy")
print(f"Pamięć: {mem1:.4f} MB")

start_time2 = time.time()
result2 = df[df['quantity'] > 6]
end_time2 = time.time()
mem2 = result2.memory_usage(deep=True).sum() / 1024**2
#print(result2)
print(f"Filtrowanie - Czas wykonania: {end_time2 - start_time2:.4f} sekundy")
print(f"Pamięć: {mem2:.4f} MB")



Grupowanie - Czas wykonania: 0.0108 sekundy
Pamięć: 0.0012 MB
Top10 - Czas wykonania: 0.0091 sekundy
Pamięć: 0.0006 MB
Filtrowanie - Czas wykonania: 0.0068 sekundy
Pamięć: 1.4782 MB


In [3]:
import glob
# b)

file_pattern = "wroshop_medium/part_*.csv"
files = glob.glob(file_pattern)

files_sorted = sorted(files)

df = pd.concat((pd.read_csv(file) for file in files_sorted), ignore_index=True)

start_time = time.time()
result = df.groupby('category')['total_amount'].sum()
end_time = time.time()
mem = result.memory_usage(deep=True) / 1024**2
#print(result)
print(f"Grupowanie - Czas wykonania: {end_time - start_time:.4f} sekundy")
print(f"Pamięć: {mem:.4f} MB")


start_time1 = time.time()
result1 = df.groupby('city')['quantity'].sum().sort_values(ascending=False).head(10)
end_time1 = time.time()
mem1 = result1.memory_usage(deep=True) / 1024**2
#print(result1)
print(f"Top10 - Czas wykonania: {end_time1 - start_time1:.4f} sekundy")
print(f"Pamięć: {mem1:.4f} MB")

start_time2 = time.time()
result2 = df[df['quantity'] > 6]
end_time2 = time.time()
mem2 = result2.memory_usage(deep=True).sum() / 1024**2
#print(result2)
print(f"Filtrowanie - Czas wykonania: {end_time2 - start_time2:.4f} sekundy")
print(f"Pamięć: {mem2:.4f} MB")


Grupowanie - Czas wykonania: 0.0767 sekundy
Pamięć: 0.0012 MB
Top10 - Czas wykonania: 0.0686 sekundy
Pamięć: 0.0006 MB
Filtrowanie - Czas wykonania: 0.0190 sekundy
Pamięć: 14.9741 MB


In [7]:
# c)
import dask.dataframe as dd
import time

ddf = dd.read_csv('wroshop_medium/*.csv')
result = ddf.groupby('category')['total_amount'].sum()
result.visualize(filename='graph.png')

start_time = time.time()
result.compute()
end_time = time.time()
print(f"Grupowanie - Czas wykonania: {end_time - start_time:.4f} sekundy")




Grupowanie - Czas wykonania: 2.3601 sekundy


In [5]:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName("WroShop").getOrCreate()
df = spark.read.csv('wroshop_medium/', header = True, inferSchema = True)

start_time = time.time()
result = df.groupBy('category').avg('total_amount')
end_time = time.time()
print(f"Grupowanie - Czas wykonania: {end_time - start_time:.4f} sekundy")
print(result.explain())





Grupowanie - Czas wykonania: 0.0216 sekundy
== Physical Plan ==
*(2) HashAggregate(keys=[category#192], functions=[avg(total_amount#195)])
+- Exchange hashpartitioning(category#192, 200), ENSURE_REQUIREMENTS, [id=#139]
   +- *(1) HashAggregate(keys=[category#192], functions=[partial_avg(total_amount#195)])
      +- FileScan csv [category#192,total_amount#195] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/kornelorawczak/Documents/studia/sem5/ml/list7/wroshop_medium], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<category:string,total_amount:double>


None


                                                                                

25/12/01 03:44:58 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 271492 ms exceeds timeout 120000 ms
25/12/01 03:44:58 WARN SparkContext: Killing executors is not supported by current scheduler.


Pandas:
- Wszystkie dane muszą zmieścić się w pamięci RAM
- Procesowanie na pojedynczym rdzeniu CPU
- Brak dystrybucji - wszystko na jednej maszynie
- Dobre dla małych danych, jedna maszyna i proste transformacje


Dask:
- Lazy evaluation - tworzy graf zadań
- Dzieli dane na partycje
- Wykonuje równolegle na wielu rdzeniach
- Duże dane które nie mieszczą się na jednej maszynie i potrzeba wielordzeniowości w równoległych, niezależnych procesach

Pyspark:
- Dystrybuowane na wielu maszynach
- Resilient Distributed Datasets (RDD) - odporne na błędy
- Optimizer Catalyst - zaawansowana optymalizacja zapytań
- Lazy evaluation z zaawansowanym planowaniem
- Olbrzymie dane, zaawansowana optymalizacja