In [None]:
import pandas
import lithops
import io
import boto3
import time
import matplotlib.pyplot as plt

from dataplug import CloudObject
from dataplug.formats.generic.csv import CSV, partition_chunk_size, partition_num_chunks

# Resolve AWS credentials through boto3's default credential chain and expose
# them in the dictionary layout passed to CloudObject.from_s3 via `s3_config`.
session = boto3.Session(region_name="us-east-1")
resolved_credentials = session.get_credentials()
if resolved_credentials is None:
    # get_credentials() returns None when nothing is configured; fail early
    # with an actionable message instead of an AttributeError on None.
    raise RuntimeError(
        "No AWS credentials found. Configure them (environment variables, "
        "shared credentials file, or an attached role) before running this notebook."
    )
# Take a consistent snapshot of access key, secret key and token.
creds = resolved_credentials.get_frozen_credentials()

s3_config = {
    "credentials": {
        "AccessKeyId": creds.access_key,
        "SecretAccessKey": creds.secret_key,
        "SessionToken": creds.token,
    }
}

In [None]:
# Ask for the participant ID; it is used as the suffix of the bucket we write to.
my_id = input()
print(f"Your ID is {my_id}")

# Input data is read from a fixed bucket; results/metadata go to a per-ID bucket.
read_bucket_name = 'scipy-tutorial-data'
write_bucket_name = f'scipy-tutorial-{my_id}'

In [None]:
# Reference the CSV file stored in the read bucket as a dataplug CloudObject.
# The per-user write bucket is passed as the metadata bucket.
co = CloudObject.from_s3(CSV, f"s3://{read_bucket_name}/yellow_tripdata_2015-01.csv", s3_config=s3_config,
                         metadata_bucket=write_bucket_name)


# NOTE(review): presumably builds the dataplug metadata needed before
# partitioning — confirm against the dataplug documentation.
co.preprocess()
# Split the object into fixed-size slices using the chunk-size strategy.
data_slices = co.partition(partition_chunk_size, chunk_size=25*1024*1024)  # 25 MiB (25 * 1024 * 1024; presumably bytes)

def count_total_passengers(data_slice):
    """Sum the ``passenger_count`` column of a single CSV data slice.

    Args:
        data_slice: Object whose ``get()`` method returns the slice's CSV
            content as text.

    Returns:
        The sum of the ``passenger_count`` column for this slice.
    """
    slice_text = data_slice.get()
    # pandas reads from a file-like object, so wrap the UTF-8 bytes in a buffer.
    buffer = io.BytesIO(slice_text.encode('utf-8'))
    frame = pandas.read_csv(buffer)
    passenger_column = frame['passenger_count']
    return passenger_column.sum()

def reduce_fn(total_passengers):
    """Combine the per-slice passenger counts into one grand total.

    Args:
        total_passengers: Iterable of partial sums, one per map task.

    Returns:
        The sum of all partial counts (``0`` for an empty iterable).
    """
    grand_total = 0
    for partial in total_passengers:
        grand_total = grand_total + partial
    return grand_total


# Step 3: Create a Lithops executor, run the map function over every data
# slice and combine the partial sums with the reduce function
executor = lithops.FunctionExecutor()
futures = executor.map_reduce(
    map_function=count_total_passengers,
    map_iterdata=data_slices,
    reduce_function=reduce_fn,
    map_runtime_memory=1769,
)

# Wait for the job to finish and fetch the reduced value
result = futures.get_result()
print(f"Total passengers: {result}")

#### Performance Comparison
From here on, we test the execution of the pipeline with different numbers of workers to evaluate how scalability affects both execution time and cost. The configurations tested are:

12 workers

24 workers

48 workers

96 workers

For each configuration, we measure the elapsed time required to complete the pipeline and the associated execution cost. These metrics allow us to analyze the trade-offs between speed and expense when increasing parallelism. Our goal is to identify the optimal number of workers that balances performance improvements with cost efficiency.


In [None]:
# Helper functions for computing the execution metrics (cost, time, performance)
def calc_cost(futures, memory, price_per_gb_second=0.00001667):
    """Estimate the compute cost (USD) of a set of finished invocations.

    The cost is the total worker run time in seconds, multiplied by the
    configured memory in GB and by the price of one GB-second.

    Args:
        futures: Iterable of finished futures; each must expose a ``stats``
            mapping with ``worker_start_tstamp`` and ``worker_end_tstamp``.
        memory: Runtime memory in MB (converted to GB by dividing by 1024).
        price_per_gb_second: Price in USD of one GB-second. The default
            matches the AWS Lambda (x86) on-demand rate.

    Returns:
        Estimated cost in USD (``0`` when ``futures`` is empty).
    """
    total_seconds = sum(
        future.stats["worker_end_tstamp"] - future.stats["worker_start_tstamp"]
        for future in futures
    )
    # The rate is per GB-second, so the duration must stay in seconds
    # (converting it to hours would under-report the cost by 3600x).
    return total_seconds * (memory / 1024) * price_per_gb_second


def calc_time(start, end):
    """Return the elapsed interval between two timestamps (``end - start``)."""
    elapsed = end - start
    return elapsed


def calc_perf(elapsed_time, cost):
    """Return a performance score: the product of inverse time and inverse cost.

    Higher values mean a run that was both faster and cheaper. Raises
    ``ZeroDivisionError`` if either argument is zero.
    """
    inverse_time = 1 / elapsed_time
    inverse_cost = 1 / cost
    return inverse_time * inverse_cost

In [None]:
# Launch the pipeline with different worker counts and collect the metrics

executor = lithops.FunctionExecutor()

workers_array = [12, 24, 48, 96]
memory = 1769
execution_metrics = {}

for num_workers in workers_array:
    # One data slice per worker, so the map phase launches `num_workers` tasks.
    data_slices = co.partition(partition_num_chunks, num_chunks=num_workers)
    print(f"Running with {num_workers} workers...")

    start = time.time()
    futures = executor.map_reduce(map_function=count_total_passengers, map_iterdata=data_slices,
                                  reduce_function=reduce_fn, map_runtime_memory=memory)
    # map_reduce only submits the job; block until this run's results are
    # available so the measured time covers the actual execution.
    executor.get_result(fs=futures)
    end = time.time()

    elapsed_time = calc_time(start, end)
    cost = calc_cost(futures, memory)
    perf = calc_perf(elapsed_time, cost)

    execution_metrics[str(num_workers)] = {
        "elapsed_time": elapsed_time,
        "cost": cost,
        "performance": perf
    }

In [None]:
# Bar plot: elapsed time for each worker configuration
workers = list(execution_metrics.keys())
elapsed_times = [execution_metrics[w]["elapsed_time"] for w in workers]

fig, ax = plt.subplots(figsize=(6, 5))
ax.bar(workers, elapsed_times)
ax.set_title('Execution Time vs Number of Workers')
ax.set_xlabel('Number of Workers')
ax.set_ylabel('Elapsed Time (seconds)')
ax.grid()
plt.show()

In [None]:
# Bar plot: execution cost for each worker configuration
workers = list(execution_metrics.keys())
costs = [execution_metrics[w]["cost"] for w in workers]

fig, ax = plt.subplots(figsize=(6, 5))
ax.bar(workers, costs, color="orange")
ax.set_title('Execution Cost vs Number of Workers')
ax.set_xlabel('Number of Workers')
ax.set_ylabel('Execution Cost (USD)')
ax.grid()
plt.show()

In [None]:
# Scatter plot performance (time vs cost)
# Derive the configuration labels here so this cell does not depend on a
# `workers` variable left behind by another cell.
workers = list(execution_metrics.keys())
elapsed_times = [execution_metrics[w]["elapsed_time"] for w in workers]
costs = [execution_metrics[w]["cost"] for w in workers]

fig, ax = plt.subplots(figsize=(6, 5))
ax.scatter(elapsed_times, costs, color='green')

# Label every point with its worker count
for w, x, y in zip(workers, elapsed_times, costs):
    ax.text(x, y, str(w) + " wk", fontsize=14, ha='left', va='bottom')

ax.set_title('Performance: Cost vs Execution Time')
ax.set_xlabel('Elapsed Time (seconds)')
ax.set_ylabel('Execution Cost (USD)')
ax.grid()
plt.show()

In [None]:
# Find and print the performance differences between executions
# Derive the configuration labels here so this cell does not depend on a
# `workers` variable left behind by another cell.
workers = list(execution_metrics.keys())
perf_arr = [execution_metrics[w]["performance"] for w in workers]

min_val = min(perf_arr)
max_val = max(perf_arr)

# Min-max normalize the scores to [0, 1]; when every score is identical the
# range is zero, so all entries are reported as 0 instead of dividing by zero.
if max_val != min_val:
    perf_arr_normalized = [(x - min_val) / (max_val - min_val) for x in perf_arr]
else:
    perf_arr_normalized = [0 for _ in perf_arr]

# The highest normalized score marks the best time/cost trade-off
best_index = perf_arr_normalized.index(max(perf_arr_normalized))
best_worker = workers[best_index]
best_perf_norm = perf_arr_normalized[best_index]

print("Normalized perf. array",  perf_arr_normalized)
print(f"Best configuration: {best_worker} workers")