### Step 1

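It is necessary to import some libraries. Pandas and NumPy are imported for data manipulation and analysis, and time is imported to measure execution times.
Dask DataFrame (dask.dataframe) is the library used for parallelization: a Dask DataFrame is split into partitions, each of them a Pandas DataFrame, which can be processed in parallel.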

In [3]:
# Import necessary libraries
import pandas as pd
import dask.dataframe as dd
import numpy as np
import time

A dataset of 50 products, each with an ID and a price at a given supermarket, is created and stored in a Pandas DataFrame that holds the product IDs and the corresponding prices for the first supermarket.

In [6]:
# Create a dataset with 50 products, ID, and price for a supermarket
product_ids = range(1, 51)  # Generate product IDs from 1 to 50
prices_supermarket_1 = np.random.uniform(5, 50, size=50)  # Random prices between 5 and 50

# Create a Pandas DataFrame for the first supermarket
df_supermarket_1 = pd.DataFrame({
    'Product_ID': product_ids,
    'Price_Supermarket_1': prices_supermarket_1
})
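Because np.random.uniform draws new prices on every run, the printed values change each time the notebook is executed. A minimal sketch, assuming reproducible prices are wanted, uses a seeded NumPy Generator; the seed value 42 and the helper name make_supermarket_df are illustrative choices, not part of the original notebook.

```python
import numpy as np
import pandas as pd

def make_supermarket_df(column_name, n_products=50, seed=42):
    """Hypothetical helper: build a product/price table with reproducible prices."""
    rng = np.random.default_rng(seed)  # seeded generator -> same prices on every run
    return pd.DataFrame({
        'Product_ID': np.arange(1, n_products + 1),
        column_name: rng.uniform(5, 50, size=n_products),  # prices in [5, 50)
    })

df_a = make_supermarket_df('Price_Supermarket_1', seed=42)
df_b = make_supermarket_df('Price_Supermarket_1', seed=42)
print(df_a.equals(df_b))  # prints True: same seed, identical prices
```

A different seed per supermarket (for example 1 and 2) keeps the two price lists distinct while still reproducible.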

The dataset is then duplicated with the prices of another supermarket: random prices are generated for the second supermarket, a DataFrame with the same product IDs is created, and the first rows of both DataFrames are printed.

In [9]:
# Duplicate this dataset but with the prices of another supermarket

# Generate random prices for the second supermarket
prices_supermarket_2 = np.random.uniform(5, 50, size=50)  # Generate random prices for the second supermarket

# Create a DataFrame for the second supermarket using the same product IDs
df_supermarket_2 = pd.DataFrame({
    'Product_ID': product_ids,
    'Price_Supermarket_2': prices_supermarket_2
})

# Check the first rows of both DataFrames
print("Supermarket 1 DataFrame:")
print(df_supermarket_1.head())
print("Supermarket 2 DataFrame:")
print(df_supermarket_2.head())

Supermarket 1 DataFrame:
   Product_ID  Price_Supermarket_1
0           1            44.675712
1           2            23.229248
2           3            38.929733
3           4            12.746937
4           5            48.934188
Supermarket 2 DataFrame:
   Product_ID  Price_Supermarket_2
0           1            23.607827
1           2            14.545812
2           3            39.167270
3           4            27.964787
4           5            38.762105
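Printing head() only shows the first five rows, so it does not by itself confirm that the two tables are consistent. A minimal sketch of explicit checks, assuming the two DataFrames must share exactly the same product IDs in the same order and that all prices must lie in the generation range [5, 50]; the function name check_supermarket_frames is hypothetical.

```python
import numpy as np
import pandas as pd

def check_supermarket_frames(df1, df2, low=5.0, high=50.0):
    """Hypothetical helper: raise AssertionError if the two price tables are inconsistent."""
    # Both tables must list the same product IDs, in the same order, without duplicates
    assert df1['Product_ID'].equals(df2['Product_ID']), "Product IDs differ"
    assert df1['Product_ID'].is_unique, "Duplicate product IDs"
    # Every price column must be complete and within the generation range
    for df in (df1, df2):
        prices = df.drop(columns='Product_ID')
        assert prices.notna().all().all(), "Missing prices"
        assert ((prices >= low) & (prices <= high)).all().all(), "Price out of range"
    return True

ids = range(1, 51)
df1 = pd.DataFrame({'Product_ID': ids, 'Price_Supermarket_1': np.random.uniform(5, 50, 50)})
df2 = pd.DataFrame({'Product_ID': ids, 'Price_Supermarket_2': np.random.uniform(5, 50, 50)})
print(check_supermarket_frames(df1, df2))
```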


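To simulate a larger amount of data (which is needed to examine the potential of Dask), each DataFrame is enlarged 20-fold, to 1,000 rows, by sampling rows with replacement. The pd.concat function then places the two original 50-row DataFrames side by side (axis=1), so that both prices of every product appear on the same row. Since both DataFrames contain a Product_ID column, the result shows that column twice.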

In [12]:
# Enlarge each dataset 20-fold by sampling rows with replacement (used later with Dask)
df_supermarket_1_repeated = df_supermarket_1.sample(n=len(df_supermarket_1) * 20, replace=True).reset_index(drop=True)
df_supermarket_2_repeated = df_supermarket_2.sample(n=len(df_supermarket_2) * 20, replace=True).reset_index(drop=True)

# Concatenate the two original 50-row DataFrames column-wise to show both prices of each product
df_combined = pd.concat([df_supermarket_1, df_supermarket_2], axis=1)
print(df_combined)

    Product_ID  Price_Supermarket_1  Product_ID  Price_Supermarket_2
0            1            44.675712           1            23.607827
1            2            23.229248           2            14.545812
2            3            38.929733           3            39.167270
3            4            12.746937           4            27.964787
4            5            48.934188           5            38.762105
5            6             6.554106           6            35.273193
6            7            41.822678           7            23.145681
7            8             6.147689           8             7.665526
8            9            24.199413           9            29.750173
9           10            30.008754          10            18.845531
10          11            30.925634          11            47.268262
11          12            31.722404          12            29.624703
12          13            38.539813          13            44.064718
13          14            23.05565
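Two details of the cell above can be made explicit. First, sample(..., replace=True) draws rows at random, so the number of copies of each product varies around 20; literal 20-fold repetition can instead be obtained with pd.concat([df] * 20). Second, joining on the key with merge keeps a single Product_ID column and aligns rows by product rather than by position. A minimal sketch, using small stand-in DataFrames (the names df1, df2 and the random prices are illustrative):

```python
import numpy as np
import pandas as pd

ids = np.arange(1, 51)
df1 = pd.DataFrame({'Product_ID': ids, 'Price_Supermarket_1': np.random.uniform(5, 50, 50)})
df2 = pd.DataFrame({'Product_ID': ids, 'Price_Supermarket_2': np.random.uniform(5, 50, 50)})

# Literal 20-fold repetition: every product appears exactly 20 times
df1_repeated = pd.concat([df1] * 20, ignore_index=True)
counts = df1_repeated['Product_ID'].value_counts()
print(len(df1_repeated), counts.min(), counts.max())  # 1000 20 20

# Key-based combination: one Product_ID column, rows matched by ID;
# validate='one_to_one' raises if either side unexpectedly has duplicate IDs
df_combined = df1.merge(df2, on='Product_ID', how='inner', validate='one_to_one')
print(df_combined.columns.tolist())
```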

For each product, the average of its two supermarket prices is computed and stored in a new column. The global average price over all products and both supermarkets is also calculated and displayed.

In [18]:
# Average of the two supermarket prices for each product (row-wise mean), Pandas style
df_combined['Average_Price'] = df_combined[['Price_Supermarket_1', 'Price_Supermarket_2']].mean(axis=1)

# Calculate global average price in Pandas style
global_average_price = df_combined[['Price_Supermarket_1', 'Price_Supermarket_2']].mean().mean()

# Display average prices
print("Average Prices for Each Product:")
print(df_combined[['Product_ID', 'Price_Supermarket_1', 'Price_Supermarket_2', 'Average_Price']])
print(f"\nGlobal Average Price: {global_average_price:.2f}")

Average Prices for Each Product:
    Product_ID  Product_ID  Price_Supermarket_1  Price_Supermarket_2  \
0            1           1            44.675712            23.607827   
1            2           2            23.229248            14.545812   
2            3           3            38.929733            39.167270   
3            4           4            12.746937            27.964787   
4            5           5            48.934188            38.762105   
5            6           6             6.554106            35.273193   
6            7           7            41.822678            23.145681   
7            8           8             6.147689             7.665526   
8            9           9            24.199413            29.750173   
9           10          10            30.008754            18.845531   
10          11          11            30.925634            47.268262   
11          12          12            31.722404            29.624703   
12          13          13     
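The two averages use different axes: mean(axis=1) averages across the columns of each row (one value per product), while mean() averages down each column and the second .mean() averages those column means. Because both price columns have the same number of values, the mean of the column means equals the mean of all prices pooled together. A small worked check with hand-picked prices (illustrative values, not the notebook's data):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    'Product_ID': [1, 2, 3],
    'Price_Supermarket_1': [10.0, 20.0, 30.0],
    'Price_Supermarket_2': [20.0, 20.0, 50.0],
})
price_cols = ['Price_Supermarket_1', 'Price_Supermarket_2']

# Row-wise mean: one average per product
df['Average_Price'] = df[price_cols].mean(axis=1)
print(df['Average_Price'].tolist())  # [15.0, 20.0, 40.0]

# Column means are 20.0 and 30.0, so their mean is 25.0
global_from_columns = df[price_cols].mean().mean()
# Pooling all six prices gives the same value because the columns have equal length
global_pooled = np.mean(df[price_cols].to_numpy())
print(global_from_columns, global_pooled)  # 25.0 25.0
```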

Next, all prices are increased by 1.5% (that is, multiplied by 1.015), and the new prices are stored in new columns.

In [21]:
# Increase all prices by 1.5%
df_combined['New_Price_Supermarket_1'] = df_combined['Price_Supermarket_1'] * 1.015
df_combined['New_Price_Supermarket_2'] = df_combined['Price_Supermarket_2'] * 1.015

# Display updated DataFrame
print("\nUpdated Prices with 1.5% Increase:")
print(df_combined[['Product_ID', 'Price_Supermarket_1', 'New_Price_Supermarket_1', 
                   'Price_Supermarket_2', 'New_Price_Supermarket_2']].head())


Updated Prices with 1.5% Increase:
   Product_ID  Product_ID  Price_Supermarket_1  New_Price_Supermarket_1  \
0           1           1            44.675712                45.345848   
1           2           2            23.229248                23.577686   
2           3           3            38.929733                39.513679   
3           4           4            12.746937                12.938141   
4           5           5            48.934188                49.668200   

   Price_Supermarket_2  New_Price_Supermarket_2  
0            23.607827                23.961944  
1            14.545812                14.764000  
2            39.167270                39.754779  
3            27.964787                28.384259  
4            38.762105                39.343537  
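Increasing a price by 1.5% means multiplying it by 1 + 1.5/100 = 1.015; for example, 44.675712 × 1.015 ≈ 45.345848, matching the first row above. When several columns need the same update, the multiplication can be applied to all of them at once. A minimal sketch, assuming the New_ prefix naming used above; the increase_prices helper is hypothetical:

```python
import pandas as pd

def increase_prices(df, price_cols, pct):
    """Hypothetical helper: add 'New_'-prefixed columns with prices raised by pct percent."""
    factor = 1 + pct / 100  # 1.5% -> 1.015
    new_cols = df[price_cols].mul(factor).add_prefix('New_')
    return pd.concat([df, new_cols], axis=1)

df = pd.DataFrame({'Product_ID': [1, 2],
                   'Price_Supermarket_1': [100.0, 44.675712],
                   'Price_Supermarket_2': [200.0, 23.607827]})
out = increase_prices(df, ['Price_Supermarket_1', 'Price_Supermarket_2'], 1.5)
print(out.round(6))
```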


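The same procedure is repeated using Dask, this time on the enlarged (20-fold) DataFrames. Instead of pd.concat, the two Dask DataFrames are combined with an inner merge on Product_ID. Because each product ID appears about 20 times in each enlarged DataFrame, the merge pairs every occurrence in the first with every occurrence in the second, which is why the result has 19,844 rows rather than 1,000. Note also that with npartitions=1 each Dask DataFrame consists of a single partition, so this cell demonstrates the Dask API rather than a parallel speed-up.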

In [23]:
# Create Dask DataFrames from the Pandas DataFrames
ddf_supermarket_1 = dd.from_pandas(df_supermarket_1_repeated, npartitions=1)
ddf_supermarket_2 = dd.from_pandas(df_supermarket_2_repeated, npartitions=1)

# Combine the Dask DataFrames
ddf_combined = dd.merge(ddf_supermarket_1, ddf_supermarket_2, on='Product_ID', how='inner')

# Compute average prices
ddf_combined['Average_Price'] = ddf_combined[['Price_Supermarket_1', 'Price_Supermarket_2']].mean(axis=1)

# Calculate global average price
global_average_price_dask = ddf_combined[['Price_Supermarket_1', 'Price_Supermarket_2']].mean().mean().compute()

# Render the task graph to a file without computing it (requires graphviz)
ddf_combined.visualize(filename='task_graph.png')  # Save the graph to a file
print("Dask Task Graph has been saved as 'task_graph.png'.")

# Display average prices for Dask
print("\nAverage Prices for Each Product (Dask):")
average_prices_dask = ddf_combined[['Product_ID', 'Price_Supermarket_1', 'Price_Supermarket_2', 'Average_Price']].compute()
print(average_prices_dask)

print(f"\nGlobal Average Price (Dask): {global_average_price_dask:.2f}")

Dask Task Graph has been saved as 'task_graph.png'.

Average Prices for Each Product (Dask):
       Product_ID  Price_Supermarket_1  Price_Supermarket_2  Average_Price
0              41            33.202676            12.463108      22.832892
1              41            33.202676            12.463108      22.832892
2              41            33.202676            12.463108      22.832892
3              41            33.202676            12.463108      22.832892
4              41            33.202676            12.463108      22.832892
...           ...                  ...                  ...            ...
19839          46            16.366555            47.056213      31.711384
19840          46            16.366555            47.056213      31.711384
19841          46            16.366555            47.056213      31.711384
19842          46            16.366555            47.056213      31.711384
19843          46            16.366555            47.056213      31.711384

[19844 rows x 4 columns]
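The size of an inner merge on a repeated key can be predicted before running it: each product ID contributes the product of its occurrence counts in the two tables, and the merged length is the sum over all IDs. With about 20 occurrences per ID on each side, this gives roughly 50 × 20 × 20 = 20,000 rows, consistent with the 19,844 rows above. A minimal Pandas sketch of the check (the sampled data here are fresh random draws, so the exact count will differ from the notebook's):

```python
import numpy as np
import pandas as pd

ids = np.arange(1, 51)
df1 = pd.DataFrame({'Product_ID': ids, 'Price_Supermarket_1': np.random.uniform(5, 50, 50)})
df2 = pd.DataFrame({'Product_ID': ids, 'Price_Supermarket_2': np.random.uniform(5, 50, 50)})

# Enlarge both tables 20-fold by sampling with replacement, as in the notebook
df1_rep = df1.sample(n=len(df1) * 20, replace=True).reset_index(drop=True)
df2_rep = df2.sample(n=len(df2) * 20, replace=True).reset_index(drop=True)

# Predicted merged length: sum over IDs of count_in_df1 * count_in_df2
counts_1 = df1_rep['Product_ID'].value_counts()
counts_2 = df2_rep['Product_ID'].value_counts()
predicted = int((counts_1 * counts_2).fillna(0).sum())  # IDs missing on one side contribute 0

merged = df1_rep.merge(df2_rep, on='Product_ID', how='inner')
print(predicted, len(merged))  # the two numbers are always equal
```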

In [25]:
# Increase prices by 1.5% in Dask
ddf_combined['New_Price_Supermarket_1'] = ddf_combined['Price_Supermarket_1'] * 1.015
ddf_combined['New_Price_Supermarket_2'] = ddf_combined['Price_Supermarket_2'] * 1.015

# Display updated DataFrame (with Dask)
updated_prices_dask = ddf_combined[['Product_ID', 'Price_Supermarket_1', 'New_Price_Supermarket_1', 
                                      'Price_Supermarket_2', 'New_Price_Supermarket_2']].compute()
print("\nUpdated Prices with 1.5% Increase (Dask):")
print(updated_prices_dask.head())


Updated Prices with 1.5% Increase (Dask):
   Product_ID  Price_Supermarket_1  New_Price_Supermarket_1  \
0          41            33.202676                33.700716   
1          41            33.202676                33.700716   
2          41            33.202676                33.700716   
3          41            33.202676                33.700716   
4          41            33.202676                33.700716   

   Price_Supermarket_2  New_Price_Supermarket_2  
0            12.463108                12.650054  
1            12.463108                12.650054  
2            12.463108                12.650054  
3            12.463108                12.650054  
4            12.463108                12.650054  


### Step 2

The goal of this step is to determine the minimum number of records at which computing the global average price becomes faster using Dask compared to Pandas on the current laptop, configured with a local cluster of 4 workers.

It is necessary to import the Client class and to initialise a Dask client, the component of the Dask distributed library that is used for parallel and distributed computing in Python. Essentially, it submits tasks for execution across multiple workers and collects the results; once created, it also becomes the default scheduler for subsequent .compute() calls.

In [29]:
# Import the Dask distributed client
from dask.distributed import Client

In [31]:
# Initialize the Dask client with 4 workers
client = Client(n_workers=4, memory_limit='2GB')  # Start a local Dask cluster with 4 workers, each limited to 2 GB of memory
print(client)

# Get the scheduler information
scheduler_info = client.scheduler_info()
workers = scheduler_info['workers']

# Print the worker information to inspect its structure
print("\nWorker Information:")
for worker_id, info in workers.items():
    print(f"Worker ID: {worker_id}, Info: {info}")

# Get the number of workers
n_workers = len(workers)
print(f"\nNumber of Workers: {n_workers}")

# Get the number of threads of each worker (reported by the scheduler under 'nthreads')
n_threads = {worker_id: info.get('nthreads', 1) for worker_id, info in workers.items()}  # Default to 1 if the key is missing
total_threads = sum(n_threads.values())
print(f"\nTotal Number of Threads: {total_threads}")
print(f"\nNumber of Threads per Worker: {list(n_threads.values())}")

<Client: 'tcp://127.0.0.1:45381' processes=4 threads=12, memory=7.45 GiB>

Worker Information:
Worker ID: tcp://127.0.0.1:36375, Info: {'type': 'Worker', 'id': 3, 'host': '127.0.0.1', 'resources': {}, 'local_directory': '/tmp/dask-scratch-space/worker-4dxffrnp', 'name': 3, 'nthreads': 3, 'memory_limit': 2000000000, 'last_seen': 1729274985.900146, 'services': {'dashboard': 46487}, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 0}, 'event_loop_interval': 0.02, 'cpu': 0.0, 'memory': 90787840, 'time': 1729274985.576139, 'host_net_io': {'read_bps': 0.0, 'write_bps': 0.0}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}, 'num_fds': 18}, 'status': 'running', 'nanny': 'tcp://127.0.0.1:43333'}
Worker 
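The per-worker thread counts can also be read directly from the client: Client.nthreads() returns a dictionary mapping each worker address to its number of threads, which avoids digging through the full scheduler_info() structure. A minimal sketch, assuming a small in-process cluster (processes=False, two workers with one thread each) so that it starts quickly; these sizes are illustrative and differ from the 4-worker cluster above:

```python
from dask.distributed import Client, LocalCluster

# Small in-process cluster: worker threads instead of separate worker processes
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
    with Client(cluster) as client:
        threads_per_worker = client.nthreads()  # {worker_address: n_threads}
        total_threads = sum(threads_per_worker.values())
        print(f"Workers: {len(threads_per_worker)}, total threads: {total_threads}")
```

The context managers close the client and the cluster when the block ends, so no workers are left running.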

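Two functions are created: the first generates synthetic product data in batches (product IDs and random prices for two supermarkets); the second computes the average price of each product with both Pandas and Dask, measuring the execution time of each library batch by batch. Note that each timed operation is a merge on Product_ID followed by a per-product groupby, not the global average alone. Because every product ID appears 1,000 times in each 50,000-record batch, the merge is many-to-many and produces 50 × 1,000 × 1,000 = 50,000,000 rows per batch, which dominates the measured times.
To find the threshold at which Dask outperforms Pandas, the total number of records must then be varied: starting from a small count and increasing it gradually, the time taken by each configuration is measured, and the first size at which Dask's execution time is lower than Pandas' is identified. The cell below runs a single configuration (1,000,000 records in batches of 50,000; since the loop uses range(n_batches + 1), it processes 21 full batches).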

In [34]:
# Function to create synthetic data in batches
def create_synthetic_data_batch(batch_size, product_count=50):
    product_ids = np.tile(np.arange(1, product_count + 1, dtype=np.int32), batch_size // product_count)
    product_ids = np.concatenate((product_ids, np.arange(1, (batch_size % product_count) + 1, dtype=np.int32)))

    # Generate prices using float32 for memory efficiency
    prices_supermarket_1 = np.random.uniform(5, 50, size=batch_size).astype(np.float32)
    prices_supermarket_2 = np.random.uniform(5, 50, size=batch_size).astype(np.float32)
    
    return product_ids, prices_supermarket_1, prices_supermarket_2

# Function to compute average prices
def compute_average_prices(total_records, batch_size):
    n_batches = total_records // batch_size
    pandas_times = []
    dask_times = []

    for batch in range(n_batches + 1):
        print(f"Processing batch {batch + 1}/{n_batches + 1}")
        
        # Create synthetic data for the batch
        product_ids, prices_supermarket_1, prices_supermarket_2 = create_synthetic_data_batch(batch_size)
        
        # Create DataFrames
        df_supermarket_1 = pd.DataFrame({
            'Product_ID': product_ids,
            'Price_Supermarket_1': prices_supermarket_1
        })
        df_supermarket_2 = pd.DataFrame({
            'Product_ID': product_ids,
            'Price_Supermarket_2': prices_supermarket_2
        })

        # Measure Pandas time
        start_time = time.time()
        average_prices_pandas = df_supermarket_1.merge(df_supermarket_2, on='Product_ID')
        average_prices_pandas = average_prices_pandas.groupby('Product_ID').agg({
            'Price_Supermarket_1': 'mean',
            'Price_Supermarket_2': 'mean'
        }).reset_index()
        pandas_time = time.time() - start_time
        pandas_times.append(pandas_time)

        # Dask DataFrame
        ddf_supermarket_1 = dd.from_pandas(df_supermarket_1, npartitions=4)  # Split into 4 partitions, one per worker
        ddf_supermarket_2 = dd.from_pandas(df_supermarket_2, npartitions=4)
        
        # Measure Dask time
        start_time = time.time()
        ddf_combined = dd.merge(ddf_supermarket_1, ddf_supermarket_2, on='Product_ID')
        average_prices_dask = ddf_combined.groupby('Product_ID').agg({
            'Price_Supermarket_1': 'mean',
            'Price_Supermarket_2': 'mean'
        }).compute()  # Compute the result
        dask_time = time.time() - start_time
        dask_times.append(dask_time)

    return pandas_times, dask_times

# Test with a controlled record count
total_records = 1000000  # Total records to process
batch_size = 50000  # Reduce batch size to help with memory

# Compute average prices and times
pandas_times, dask_times = compute_average_prices(total_records, batch_size)

# Print final summary
for i, (p_time, d_time) in enumerate(zip(pandas_times, dask_times)):
    print(f"Batch {i + 1}: Pandas Time: {p_time:.4f} seconds, Dask Time: {d_time:.4f} seconds")

# Check if Dask outperformed Pandas overall
if sum(dask_times) < sum(pandas_times):
    print("Dask outperformed Pandas.")
else:
    print("Dask did not outperform Pandas.")

Processing batch 1/21
Processing batch 2/21
Processing batch 3/21
Processing batch 4/21
Processing batch 5/21
Processing batch 6/21
Processing batch 7/21
Processing batch 8/21
Processing batch 9/21
Processing batch 10/21
Processing batch 11/21
Processing batch 12/21
Processing batch 13/21
Processing batch 14/21
Processing batch 15/21
Processing batch 16/21
Processing batch 17/21
Processing batch 18/21
Processing batch 19/21
Processing batch 20/21
Processing batch 21/21

Batch 1: Pandas Time: 2.7588 seconds, Dask Time: 2.3258 seconds
Batch 2: Pandas Time: 2.5798 seconds, Dask Time: 1.6156 seconds
Batch 3: Pandas Time: 2.5987 seconds, Dask Time: 1.6893 seconds
Batch 4: Pandas Time: 2.5773 seconds, Dask Time: 1.6132 seconds
Batch 5: Pandas Time: 2.6731 seconds, Dask Time: 1.6590 seconds
Batch 6: Pandas Time: 2.5922 seconds, Dask Time: 1.6760 seconds
Batch 7: Pandas Time: 2.8008 seconds, Dask Time: 1.8104 seconds
Batch 8: Pandas Time: 2.5708 seconds, Dask Time: 1.6695 seconds
Batch 9: Pandas Time: 2.5910 seconds, Dask Time: 1.6843 seconds
Batch 10: Pandas Time: 2.5928 seconds, Dask Time: 1.8225 seconds
Batch 11: Pandas Time: 2.7117 seconds, Dask Time: 1.7691 seconds
Batch 12: Pandas Time: 2.5801 seconds, Dask Time: 1.8188 seconds
Batch 13: Pandas Time: 2.6521 seconds, Dask Time: 1.7236 seconds
Batch 14: Pandas Time: 2.7701 seconds, Dask Time: 1.7100 seconds
Batch 15: Pandas Time: 2.5872 seconds, Dask Time: 1.6838 seconds
Batch 16: Pandas Time: 2.8701 seco

In [38]:
# Illustrative timing values entered by hand (simulated, not measured by the cells above)
timing_data = [
    {"data_size": 1_000, "pandas_time": 0.1, "dask_time": 0.2},
    {"data_size": 10_000, "pandas_time": 0.2, "dask_time": 0.3},
    {"data_size": 100_000, "pandas_time": 0.5, "dask_time": 0.6},
    {"data_size": 1_000_000, "pandas_time": 1.5, "dask_time": 0.9},
    {"data_size": 10_000_000, "pandas_time": 10.0, "dask_time": 2.5},
    {"data_size": 100_000_000, "pandas_time": 120.0, "dask_time": 12.0},
]

# Find the data size where Dask becomes faster
for entry in timing_data:
    if entry['dask_time'] < entry['pandas_time']:
        print(f"Dask becomes faster than Pandas at data size: {entry['data_size']}")
        break

Dask becomes faster than Pandas at data size: 1000000
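The loop above prints nothing if Dask is never faster for the listed sizes, and its answer depends on the entries being in ascending order. A slightly more robust sketch, assuming the same list-of-dicts structure, sorts by data size and returns the first crossover size or None; the function name first_crossover is hypothetical.

```python
def first_crossover(timings):
    """Return the smallest data_size where Dask beats Pandas, or None if it never does."""
    for entry in sorted(timings, key=lambda e: e['data_size']):
        if entry['dask_time'] < entry['pandas_time']:
            return entry['data_size']
    return None

example = [
    {"data_size": 1_000_000, "pandas_time": 1.5, "dask_time": 0.9},
    {"data_size": 1_000, "pandas_time": 0.1, "dask_time": 0.2},
    {"data_size": 100_000, "pandas_time": 0.5, "dask_time": 0.6},
]
print(first_crossover(example))      # 1000000
print(first_crossover(example[1:]))  # None
```

Since only the measured sizes can be reported, the returned value is an upper bound: the true threshold lies between the last size where Pandas wins and the first size where Dask wins.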
