# Gorilla Data Engineer Assessment

Use **pandas** to calculate a transportation distribution charge for four gas meters in
the United Kingdom. While solving this exercise, focus on efficiency: use vectorised operations and avoid loops!
> Transportation distribution charges are levied by gas distribution companies for the use of their
lower pressure pipelines; they cover the cost of physically transporting the gas through the
pipeline. The rate depends on a meter's exit zone (gas network region) and its
estimated annual quantity (AQ), and it changes over time.

* The daily charge is calculated by finding the correct rate for each meter and day in the
forecast and multiplying this rate (in p/kWh) by the day's forecast (in kWh).
* Calculate the total cost per meter by summing its daily charges for the full forecast
period and converting to pounds (1p = £0.01).
* Calculate the total consumption per meter by summing its daily consumption
forecast for the full period.
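
In symbols, a compact restatement of the three bullets above: let $q_{m,d}$ be the forecast for meter $m$ on day $d$ (kWh), and $r_{m,d}$ the rate in p/kWh that applies to that meter on that day (determined by its exit zone, its AQ band, and the rate in force on that date). Then the daily charge $c_{m,d}$, total cost $C_m$ and total consumption $Q_m$ are:

$$
c_{m,d} = r_{m,d}\,q_{m,d}\ \text{(p)}, \qquad
C_m = \frac{1}{100}\sum_{d} c_{m,d}\ \text{(GBP)}, \qquad
Q_m = \sum_{d} q_{m,d}\ \text{(kWh)}
$$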

In [3]:
import pandas as pd

# Daily Charge Calculation
def daily_charge(r, f):
    """
    I: rate (r) and forecast (f)
        r = rate in p/kWh (pence per kWh), pandas Series
        f = consumption forecast in kWh, pandas Series (same index as r)
    O: cost of consumption per day in pence, pandas Series
    C: rate and forecast must be non-negative
    E:
        zero values give a zero daily charge
        very large values may lose floating-point precision
        non-Series inputs raise a TypeError
        negative values are invalid and raise a ValueError
    """
    # Type check: both inputs must be pandas Series
    if not isinstance(r, pd.Series) or not isinstance(f, pd.Series):
        raise TypeError("Rate and forecast must be pandas Series.")
    # Negative values are invalid
    if (r < 0).any():
        raise ValueError("Rate must be non-negative.")
    if (f < 0).any():
        raise ValueError("Forecast must be non-negative.")

    # Vectorised element-wise product (pandas aligns the two Series on their index)
    return r * f

# Tests
try:
    r = pd.Series([15.5, 16.0, 14.8, 15.2, 16.5])
    f = pd.Series([20.0, 25.0, 30.0, 15.0, 10.0])
    print(daily_charge(r,f))
except (ValueError, TypeError) as error:
    print(error)

0    310.0
1    400.0
2    444.0
3    228.0
4    165.0
dtype: float64
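
Because `daily_charge` returns `r * f`, the product is taken label by label on the Series index, not by position. If the two Series carry different indexes (for example, one indexed by date and one by position), pandas aligns them and fills unmatched labels with NaN instead of raising an error. A small illustration with made-up values; the index-equality guard at the end is a suggested addition, not part of the original function:

```python
import pandas as pd

# illustrative values only: the forecast index is shifted by one label
r = pd.Series([15.5, 16.0], index=[0, 1])  # rate in p/kWh
f = pd.Series([20.0, 25.0], index=[1, 2])  # forecast in kWh

aligned = r * f
print(aligned)
# label 1 is the only label present in both Series, so it is the only non-NaN value;
# labels 0 and 2 have no partner and become NaN

# a cheap guard that could be added to daily_charge (an assumption, not in the original)
print(r.index.equals(f.index))  # False -> align or reset the indexes before multiplying
```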


In [4]:
# Total Cost per Meter Calculation
def meter_cost(r, f):
    """
    I: rate (p/kWh) and forecast (kWh), pandas Series
    O: total cost in pounds, float
    """
    daily_charges = daily_charge(r, f)  # daily charges in pence (also validates the inputs)
    total = daily_charges.sum()         # sum of the daily charges, in pence
    return total / 100                  # convert pence to pounds

# Tests
try:
    r = pd.Series([15.5, 16.0, 14.8, 15.2, 16.5])
    f = pd.Series([20.0, 25.0, 30.0, 15.0, 10.0])
    print(meter_cost(r,f))
except (TypeError,ValueError) as e:
    print(e)

15.47


In [5]:
# Total Consumption per Meter Calculation
def meter_consumption(f):
    """
    I: forecast, pandas Series of kWh values
    O: total consumption in kWh, float
    C: values must be non-negative
    E: this does not go through daily_charge, so it repeats the input checks itself
    """
    # Type check: input must be a pandas Series
    if not isinstance(f, pd.Series):
        raise TypeError("Forecast must be a pandas Series of numeric values.")
    # Negative values are invalid
    if (f < 0).any():
        raise ValueError("Forecast must be non-negative.")

    # total consumption over the forecast period
    return f.sum()

# Tests
try:
    f = pd.Series([20.0,25.0,30.0,15.0,10.0])
    print(meter_consumption(f))
except (TypeError,ValueError) as e:
    print(e)

100.0
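
`meter_consumption` (and `meter_cost`) rely on `Series.sum()`, which skips NaN by default (`skipna=True`), and the `(f < 0).any()` check is False for NaN. So a forecast with missing days passes validation and is summed as if those days were zero. A short illustration with made-up values; raising when `f.isna().any()` is True would be one way to make gaps explicit (an assumption, not part of the original checks):

```python
import numpy as np
import pandas as pd

f = pd.Series([20.0, np.nan, 30.0])  # illustrative forecast with one missing day

print(f.sum())              # 50.0: the NaN day is silently skipped
print((f < 0).any())        # False: NaN < 0 evaluates to False, so validation passes
print(f.sum(skipna=False))  # nan: propagate the gap instead of hiding it
print(f.isna().any())       # True: an explicit check that could raise a ValueError
```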


In [10]:
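# retrieve csv files
df_rate = pd.read_csv("./data/rate.csv")
df_forecast = pd.read_csv("./data/forecast.csv")
df_meter = pd.read_csv("./data/meter.csv")

# attach every rate row for the meter's exit zone (all AQ bands, all effective dates)
df_combined = pd.merge(df_meter, df_rate, on='exit_zone')

print(df_combined)

# keep only the AQ band each meter falls into; bands are treated as [aq_min_kwh, aq_max_kwh)
# and a missing aq_max_kwh means the band has no upper limit
in_band = (df_combined['aq_kwh'] >= df_combined['aq_min_kwh']) & (
    df_combined['aq_kwh'] < df_combined['aq_max_kwh'].fillna(float('inf'))
)
df_meter_rate = df_combined.loc[in_band, ['meter_id', 'date', 'rate_p_per_kwh']].copy()

# rate dates are effective-from dates, so an exact join on date would drop most forecast days;
# instead, match each forecast day to the latest rate that took effect on or before it
df_meter_rate['date'] = pd.to_datetime(df_meter_rate['date'])
df_forecast['date'] = pd.to_datetime(df_forecast['date'])
df_data = pd.merge_asof(
    df_forecast.sort_values('date'),
    df_meter_rate.sort_values('date'),
    on='date', by='meter_id', direction='backward',
)

# calculate the daily charge in pence (vectorised, which makes the helper functions above redundant)
df_data['daily_charge'] = df_data['rate_p_per_kwh'] * df_data['kwh']

# calculate the total cost (pence -> pounds) and the total consumption per meter
cost_total = df_data.groupby('meter_id')['daily_charge'].sum() / 100
forecast_total = df_data.groupby('meter_id')['kwh'].sum()

# combine into one table; reset_index turns meter_id (the groupby key) back into a column
df_total = pd.merge(forecast_total, cost_total, on='meter_id').reset_index()

# rename the columns for display
columns = {
    'meter_id': 'Meter ID',
    'kwh': 'Total Estimated Consumption (kWh)',
    'daily_charge': 'Total Cost (£)'
    }
df_result = df_total.rename(columns=columns).to_string(index=False)

# see the magic
print(df_result)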

     meter_id  aq_kwh exit_zone                 date  aq_min_kwh  aq_max_kwh  \
0    14676236   28978       EA1  2020-04-01 00:00:00           0     73200.0   
1    14676236   28978       EA1  2020-04-01 00:00:00       73200    732000.0   
2    14676236   28978       EA1  2020-04-01 00:00:00      732000         NaN   
3    14676236   28978       EA1  2020-10-01 00:00:00           0     73200.0   
4    14676236   28978       EA1  2020-10-01 00:00:00       73200    732000.0   
..        ...     ...       ...                  ...         ...         ...   
115  88357331  484399       SE2  2024-04-01 00:00:00       73200    732000.0   
116  88357331  484399       SE2  2024-04-01 00:00:00      732000         NaN   
117  88357331  484399       SE2  2024-10-01 00:00:00           0     73200.0   
118  88357331  484399       SE2  2024-10-01 00:00:00       73200    732000.0   
119  88357331  484399       SE2  2024-10-01 00:00:00      732000         NaN   

     rate_p_per_kwh  
0            0.26
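
The rate table lists each rate with the date it takes effect (e.g. 2020-04-01, 2020-10-01), while the forecast has one row per day, so joining on exact dates only matches forecast days that fall on an effective date. A sketch of an as-of join with `pd.merge_asof`, assuming each rate stays in force until the next effective date (the 0.30 rate is a made-up value). On the real tables you would first keep each meter's AQ band and then pass `by='meter_id'`:

```python
import pandas as pd

# rates are assumed to apply from their effective date until the next one
rates = pd.DataFrame({
    "date": pd.to_datetime(["2020-04-01", "2020-10-01"]),
    "rate_p_per_kwh": [0.26, 0.30],  # 0.30 is an illustrative value
})
forecast = pd.DataFrame({
    "date": pd.to_datetime(["2020-09-30", "2020-10-01", "2020-10-02"]),
    "kwh": [10.0, 10.0, 10.0],
})

# exact-date join: only the forecast day that coincides with an effective date survives
exact = forecast.merge(rates, on="date", how="inner")

# as-of join: each day takes the latest rate whose date is on or before it
# (both frames must be sorted by the "on" column)
asof = pd.merge_asof(forecast, rates, on="date", direction="backward")
print(asof)
```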

Write a function that generates a list of random meters of any size. Examples of valid exit zones can be found in the rate table. You may randomly generate the annual quantity.

In [56]:
import numpy as np

# Generate list of random meters of any size
def gen_meter_list(m, ez, aq_min=0, aq_max=5000, seed=None):
    """
    I - m: number of random meters to generate (non-negative int); ez: valid exit zones
    O - DataFrame with meter_id, exit_zone, annual_quantity
    C - m is non-negative, exit zones come from the rate table, aq lies in [aq_min, aq_max)
    E - zero or negative m, extreme values, empty exit zone list, duplicate meter_ids
    """
    if m < 0:
        raise ValueError("Number of meters must be non-negative.")
    if len(ez) == 0:
        raise ValueError("At least one exit zone is required.")

    rng = np.random.default_rng(seed)
    # unique meter IDs between 1 and 9,999,999 (sampling without replacement avoids duplicates)
    meter_ids = rng.choice(9_999_999, size=m, replace=False) + 1
    exit_zones = rng.choice(ez, size=m)          # random exit zone per meter
    aq = rng.uniform(aq_min, aq_max, size=m)     # random annual quantity in kWh

    return pd.DataFrame({
        'meter_id': meter_ids,
        'exit_zone': exit_zones,
        'annual_quantity': aq
    })

ez = df_rate['exit_zone'].unique()  # only the unique exit zones in the existing rate data
meter = gen_meter_list(len(ez), ez)

print(meter)

    meter_id exit_zone  annual_quantity
0    8276707       EM1      2664.863915
1    5312113       SW2      4797.135236
2    1723148       SC4      4159.920141
3    6882917        LW      2922.460423
4    5257473       NE3       777.937439
5     396586        LC      2569.256066
6     698492       SW2      2741.462981
7      45018       NT1      2372.323874
8    7532211       EM2      1974.130425
9    6784081       SO1       819.566946
10   6536258       EA4       724.533025
11   1997295       NT1       561.089328
12   6315088       WA1      3052.169282
13    514696       EM3      3469.447003
14   1892924       SW2      1334.878899
15    646908       EA3      1758.885660
16   1942755       SC2      3150.755610
17   3313778       SE1      2902.760151
18   9827145       SW1      1182.314799
19   1086859       NE2       850.719957
20   8972542       NO1      4100.992166
21   8277718       NE3      3892.818985
22   1783933       NW1      4972.715113
23   9513049        LW      4674.849710


Write a function that generates mock consumption data given a list of meters and a start date and duration (number of days in the forecast). The data may be completely random and it doesn't have to match with the meters' annual quantities either.

In [57]:
# Generate mock consumption data
def gen_consumption_list(meter_list, start_date, periods):
    """
    I - meter list (DataFrame with a meter_id column), start date (str, YYYY-MM-DD),
        periods (int, number of days in the forecast)
    O - consumption data (DataFrame) with meter_id, date, kwh
    C - meter list must be valid, start date must be a valid date, periods must be a
        positive number of days, consumption values should be in a realistic range
    E - empty meter list, single meter, 1-day forecast, negative duration, non-DataFrame meters
    """
    if not isinstance(meter_list, pd.DataFrame):
        raise TypeError("Meter list must be a DataFrame with a meter_id column.")
    # parse the start date into a pandas Timestamp
    if not isinstance(start_date, str):
        raise TypeError("Start date must be a string in YYYY-MM-DD format.")
    start_date = pd.Timestamp(start_date)
    if periods < 1:
        raise ValueError("Duration must be at least one day.")

    # one date per forecast day
    dates = pd.date_range(start=start_date, periods=periods, freq='D')

    # every meter gets every day: repeat each meter ID `periods` times and tile the dates to match
    meters = meter_list['meter_id'].to_numpy()
    # random daily consumption in kWh; not tied to the meters' AQ (the task allows this)
    consumption = np.random.uniform(0, 5000, size=len(meters) * periods)

    return pd.DataFrame({
        'meter_id': np.repeat(meters, periods),
        'date': np.tile(dates, len(meters)),
        'kwh': consumption
    })

consumption = gen_consumption_list(df_meter, '2024-01-01', 30)

print(consumption)

     meter_id       date          kwh
0    14676236 2024-01-01  4244.891949
1    14676236 2024-01-02  1886.730295
2    14676236 2024-01-03  3126.518441
3    14676236 2024-01-04  2760.589339
4    14676236 2024-01-05  1141.098338
..        ...        ...          ...
115  88357331 2024-01-26     7.268142
116  88357331 2024-01-27  2154.016813
117  88357331 2024-01-28  3084.564174
118  88357331 2024-01-29  2296.692587
119  88357331 2024-01-30  4825.896520

[120 rows x 3 columns]


Write a function that takes as an input a meter list and a consumption forecast table and that calculates the transportation cost table (i.e., best take your logic from task 1 and wrap it in a function). Benchmark this function using meter lists of different sizes and consumption forecasts for periods of different lengths.

In [66]:
# Function to calculate the transportation cost table
def format_value(x):
    # round floats to 2 decimal places for display; integers such as meter IDs are left as-is
    if isinstance(x, float):
        return f"{x:.2f}"
    return x

def calc_cost(meter, consumption):
    """
    I - meter list (meter_id, exit_zone, annual_quantity), consumption table (meter_id, date, kwh)
    O - transportation cost table (meter_id, total consumption, total cost)
    C - valid meter list, valid consumption forecast, valid rate table
    E - empty data frames, non-unique meters, missing data (NaN), consumption rows for
        meters not in the meter list, negative rates or consumption
    """
    # TODO: validate the edge cases and constraints above (not implemented yet)

    # combine meter attributes with each day of the forecast
    meters_and_consumption = pd.merge(meter, consumption, on='meter_id')

    # NOTE: placeholder rate, not a tariff lookup. annual_quantity / kwh makes the daily
    # charge equal to annual_quantity on every day; the real rate must come from the rate
    # table, matched on exit_zone, AQ band and effective date (as in task 1).
    meters_and_consumption['rate_p_per_kwh'] = meters_and_consumption['annual_quantity'] / meters_and_consumption['kwh']

    # daily charge in pence
    meters_and_consumption['daily_charge'] = meters_and_consumption['rate_p_per_kwh'] * meters_and_consumption['kwh']

    # total cost (pence -> pounds) and total consumption per meter
    cost_total = meters_and_consumption.groupby('meter_id')['daily_charge'].sum() / 100
    consumption_total = meters_and_consumption.groupby('meter_id')['kwh'].sum()

    # one table; reset_index turns the meter_id groupby key back into a column
    df_total = pd.merge(consumption_total, cost_total, on='meter_id').reset_index()
    df_total = df_total.map(format_value)  # DataFrame.map needs pandas >= 2.1 (applymap before)

    # rename the columns for display
    columns = {
        'meter_id': 'Meter ID',
        'kwh': 'Total Estimated Consumption (kWh)',
        'daily_charge': 'Total Cost (£)'
    }

    return df_total.rename(columns=columns)

m = gen_meter_list(len(ez),ez)
c = gen_consumption_list(m, '2024-01-01', 30)

# see the magic
calc_cost(m, c)

| | Meter ID | Total Estimated Consumption (kWh) | Total Cost (£) |
|---|---|---|---|
| 0 | 11527.0 | 73656.19 | 70.03 |
| 1 | 796003.0 | 71855.63 | 958.27 |
| 2 | 1185113.0 | 61092.5 | 782.29 |
| 3 | 1217646.0 | 79178.23 | 539.46 |
| 4 | 1306132.0 | 67235.51 | 200.67 |
| 5 | 1806971.0 | 68028.7 | 538.52 |
| 6 | 1836195.0 | 66228.48 | 369.1 |
| 7 | 2024000.0 | 67849.3 | 78.1 |
| 8 | 2402508.0 | 70517.51 | 269.62 |
| 9 | 2462313.0 | 68544.07 | 184.06 |
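
`calc_cost` above derives its rate from `annual_quantity / kwh` instead of the rate table. A sketch of the same calculation wrapped around the rate table, combining an AQ-band filter with an as-of join on the effective date. The name `calc_cost_with_rates` and the `rates` parameter are hypothetical. It assumes the meter list uses `annual_quantity` (as produced by `gen_meter_list`), that bands are half-open `[aq_min_kwh, aq_max_kwh)` with a missing `aq_max_kwh` meaning no upper limit, and that forecast days before a meter's first effective date get no rate (NaN, which the sum then skips):

```python
import numpy as np
import pandas as pd


def calc_cost_with_rates(meter, consumption, rates):
    """Total consumption (kWh) and total cost (GBP) per meter from a dated, banded rate table."""
    # 1. every rate row for the meter's exit zone, then keep only the meter's own AQ band
    m = meter.merge(rates, on="exit_zone")
    upper = m["aq_max_kwh"].fillna(np.inf)  # missing upper bound = open-ended band (assumption)
    in_band = (m["annual_quantity"] >= m["aq_min_kwh"]) & (m["annual_quantity"] < upper)
    meter_rates = m.loc[in_band, ["meter_id", "date", "rate_p_per_kwh"]].copy()
    meter_rates["date"] = pd.to_datetime(meter_rates["date"])

    # 2. match each forecast day to the latest rate effective on or before that day
    daily = consumption.copy()
    daily["date"] = pd.to_datetime(daily["date"])
    daily = pd.merge_asof(
        daily.sort_values("date"),
        meter_rates.sort_values("date"),
        on="date",
        by="meter_id",
        direction="backward",
    )

    # 3. daily charge in pence, then per-meter totals with pence converted to pounds
    daily["daily_charge"] = daily["rate_p_per_kwh"] * daily["kwh"]
    totals = daily.groupby("meter_id").agg(
        total_kwh=("kwh", "sum"), total_cost_gbp=("daily_charge", "sum")
    )
    totals["total_cost_gbp"] /= 100
    return totals.reset_index()
```

With the generated inputs from above, a call would look like `calc_cost_with_rates(m, c, df_rate)`; the display formatting (`format_value`, renamed columns) can be applied to the result afterwards.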


In [77]:
import timeit
import pandas as pd
import numpy as np
from memory_profiler import memory_usage

# large sample data for benchmarking; separate names so the gen_meter_list and
# gen_consumption_list functions defined earlier are not overwritten
def gen_bench_meter_list(size, seed):
    np.random.seed(seed)
    meter_ids = np.arange(1, size + 1)          # unique, sequential meter IDs
    aq = np.random.randint(1000, 5000, size)    # random annual quantity
    return pd.DataFrame({
        'meter_id': meter_ids,
        'annual_quantity': aq
    })

def gen_bench_consumption_list(meter_df, start_date, days):
    meter_ids = meter_df['meter_id'].values
    date_range = pd.date_range(start=start_date, periods=days)
    return pd.DataFrame({
        # every meter once per day (random choice here would duplicate some meters and skip others)
        'meter_id': np.repeat(meter_ids, days),
        'date': np.tile(date_range, len(meter_ids)),
        'kwh': np.random.randint(10, 100, size=days * len(meter_ids))
    })

def benchmark(fn, size, seed=42, days=30, number=10):
    m = gen_bench_meter_list(size, seed)
    c = gen_bench_consumption_list(m, '2024-01-01', days)

    # average execution time over `number` runs
    avg_time = timeit.timeit(lambda: fn(m, c), number=number) / number

    # peak memory increase during one run of fn (memory_usage samples every 0.1 s by
    # default, so very short runs can report 0.00 MiB)
    mem = memory_usage((fn, (m, c)), max_iterations=1)
    peak_mem = max(mem) - min(mem)

    return avg_time, peak_mem

# run the benchmark for increasing numbers of meters
def benchmark_test(fn, days=30):
    sizes = [100, 1000, 10000, 100000, 500000, 1000000]

    for size in sizes:
        runtime, mem_usage = benchmark(fn, size, days=days)
        print(f"Size: {size}, Avg Runtime: {runtime:.2f} seconds, Memory Usage: {mem_usage:.2f} MiB")

benchmark_test(calc_cost)

Size: 100, Avg Runtime: 0.00 seconds, Memory Usage: 0.00 MiB
Size: 1000, Avg Runtime: 0.00 seconds, Memory Usage: 1.22 MiB
Size: 10000, Avg Runtime: 0.04 seconds, Memory Usage: 0.00 MiB
Size: 100000, Avg Runtime: 0.44 seconds, Memory Usage: 15.72 MiB
Size: 500000, Avg Runtime: 2.36 seconds, Memory Usage: 101.38 MiB
Size: 1000000, Avg Runtime: 5.11 seconds, Memory Usage: 185.53 MiB
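
The task also asks for forecast periods of different lengths, while `benchmark_test` varies only the number of meters at a fixed 30 days. A sketch of a two-dimensional grid (meters × days). `make_inputs` and `benchmark_grid` are hypothetical helpers; the grid reports the best of a few repeats, which tends to be less sensitive to background noise than a single average:

```python
import timeit

import numpy as np
import pandas as pd


def make_inputs(n_meters, n_days, seed=42):
    """Hypothetical helper: n_meters meters, each with one consumption row per day."""
    rng = np.random.default_rng(seed)
    meter_ids = np.arange(1, n_meters + 1)
    meters = pd.DataFrame({
        "meter_id": meter_ids,
        "annual_quantity": rng.integers(1000, 5000, n_meters),
    })
    dates = pd.date_range("2024-01-01", periods=n_days, freq="D")
    consumption = pd.DataFrame({
        "meter_id": np.repeat(meter_ids, n_days),
        "date": np.tile(dates, n_meters),
        "kwh": rng.uniform(10, 100, n_meters * n_days),
    })
    return meters, consumption


def benchmark_grid(fn, meter_sizes, day_counts, repeats=3):
    """Best-of-`repeats` runtime of fn(meters, consumption) for every (meters, days) pair."""
    rows = []
    for n_meters in meter_sizes:
        for n_days in day_counts:
            m, c = make_inputs(n_meters, n_days)
            times = timeit.repeat(lambda: fn(m, c), number=1, repeat=repeats)
            rows.append({"meters": n_meters, "days": n_days,
                         "rows": len(c), "seconds": min(times)})
    return pd.DataFrame(rows)
```

For example, `benchmark_grid(calc_cost, [1_000, 10_000, 100_000], [30, 90, 365])` returns one row per (meters, days) pair, which makes it easy to check whether runtime tracks the total row count rather than either dimension alone.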


> **How does this function scale for larger sets of data?**

Both the runtime and the memory usage grow with the input size. The cost is dominated by merging the data frames, the per-row arithmetic, and the group-by aggregation. Each of these touches every (meter, day) row, so the work grows with the number of meters multiplied by the number of forecast days.

> **What are your observations after benchmarking?**

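The execution time grows roughly linearly with the size of the data set: 10× more meters (100,000 → 1,000,000) took about 11.6× longer (0.44 s → 5.11 s). Small inputs finish in well under a second, but at one million meters (30 million rows) a single run takes about five seconds. The memory readings are less consistent (e.g. 0.00 MiB at 10,000 meters), probably because memory_profiler samples every 0.1 s by default and misses short runs.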

> **Are there any steps in the cost calculation that can be improved?**

My considerations on steps to improve the cost calculation:
* Merging large data frames is time-consuming and memory-intensive.
* Vectorised calculations in pandas significantly improved performance in my initial findings.
* Avoiding redundant work (computing each intermediate result once and reusing it) reduces computation time.

> **How would you go about improving the performance of this calculation?**

I'd like to look at memory usage more closely for further improvements, but memory_profiler did not work reliably for me (I'm curious to figure out why).
1. Make sure every operation is vectorised with pandas and avoid loops over rows.
2. Optimise the merges: join only on the key columns, and check whether indexing those keys (e.g. with `set_index`) speeds them up.
3. Drop columns that are not needed for the calculation before merging and grouping.
4. Compute both totals in a single `groupby().agg()` instead of two separate groupbys.
5. Possibly use Dask DataFrames to process partitions in parallel.
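
Items 3 and 4 can be sketched as: select only the columns the aggregation needs, and compute both totals in one `groupby().agg()` pass instead of two separate groupbys. This is an illustration on random data (`totals_two_groupbys` and `totals_one_groupby` are hypothetical names, and 0.26 p/kWh is just a sample rate); any speed-up depends on data size and pandas version, so it should be confirmed with the benchmark above:

```python
import numpy as np
import pandas as pd


def totals_two_groupbys(df):
    # pattern used in calc_cost: one groupby per total
    cost = df.groupby("meter_id")["daily_charge"].sum() / 100
    kwh = df.groupby("meter_id")["kwh"].sum()
    return pd.concat([kwh, cost], axis=1)


def totals_one_groupby(df):
    # keep only the needed columns, then aggregate both totals in a single pass
    out = df[["meter_id", "kwh", "daily_charge"]].groupby("meter_id").agg(
        kwh=("kwh", "sum"), daily_charge=("daily_charge", "sum")
    )
    out["daily_charge"] /= 100  # pence -> pounds
    return out


# illustrative data: 1,000 meters x 30 days, plus a column the totals never use
rng = np.random.default_rng(0)
n = 30_000
df = pd.DataFrame({
    "meter_id": np.repeat(np.arange(1_000), 30),
    "annual_quantity": rng.integers(1_000, 5_000, n),
    "kwh": rng.uniform(10, 100, n),
})
df["daily_charge"] = 0.26 * df["kwh"]
```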

In [81]:
import dask.dataframe as dd

def calc_cost_dask(meter, consumption):
    # split both tables into 4 partitions that Dask can process in parallel
    meter_dask = dd.from_pandas(meter, npartitions=4)
    consumption_dask = dd.from_pandas(consumption, npartitions=4)

    meters_and_consumption = dd.merge(meter_dask, consumption_dask, on='meter_id')

    # same placeholder rate as calc_cost (AQ / kWh), kept so both benchmarks do the same work
    meters_and_consumption['rate_p_per_kwh'] = meters_and_consumption['annual_quantity'] / meters_and_consumption['kwh']
    meters_and_consumption['daily_charge'] = meters_and_consumption['rate_p_per_kwh'] * meters_and_consumption['kwh']

    # one groupby for both totals; compute() returns a pandas DataFrame
    grouped = meters_and_consumption.groupby('meter_id').agg({
        'daily_charge': 'sum',
        'kwh': 'sum'
    }).compute()

    grouped['daily_charge'] /= 100  # pence -> pounds

    # rename by column name rather than position, so the column order cannot mix up the labels
    grouped = grouped.reset_index().rename(columns={
        'meter_id': 'Meter ID',
        'daily_charge': 'Total Cost (£)',
        'kwh': 'Total Estimated Consumption (kWh)',
    })

    return grouped.map(format_value)

# Run benchmark for Dask function
benchmark_test(calc_cost_dask)

Size: 100, Avg Runtime: 0.04 seconds, Memory Usage: 0.00 MiB
Size: 1000, Avg Runtime: 0.04 seconds, Memory Usage: 0.00 MiB
Size: 10000, Avg Runtime: 0.08 seconds, Memory Usage: 5.14 MiB
Size: 100000, Avg Runtime: 0.45 seconds, Memory Usage: 0.00 MiB
Size: 500000, Avg Runtime: 2.25 seconds, Memory Usage: 71.78 MiB
Size: 1000000, Avg Runtime: 4.94 seconds, Memory Usage: 2245.55 MiB


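Observations with Dask: at the larger sizes the runtime drops only slightly (5.11 s → 4.94 s at 1,000,000 meters, about 3%), while small inputs are slower (0.04 s vs 0.00 s at 100 meters), most likely due to Dask's task-scheduling overhead. The difference may become clearer with larger datasets.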