In [None]:
import pandas as pd
import cudf

from sqlalchemy import create_engine
import urllib
import numpy as np
from numba import cuda

In [None]:
params = urllib.parse.quote_plus("DRIVER={ODBC Driver 17 for SQL Server};"
                                 "SERVER=localhost;"
                                 "DATABASE=<DB>;"
                                 "Uid=<user>;"
                                 "Pwd=<pwd>"
                                )

In [None]:
# Connect via current Windows credentials
conn = create_engine("mssql+pyodbc:///?odbc_connect={}".format(params))
conn

In [None]:
table_name = "AzureCSPPriceList"

In [None]:
pdf = pd.read_sql(f"SELECT ResourceID, Region, [Month], IncludedQuantities, MinimumValue, EUR  FROM {table_name} ORDER BY ResourceID, Region, Month, MinimumValue", 
                 conn)

In [None]:
pdf.info()

### Convert to cudf dataframe

In [None]:
cdf = cudf.from_pandas(pdf)

In [None]:
# Are cdf and pdf the same?
cdf.to_pandas().info()

### Check for nulls / nans and fix if so

In [None]:
cdf.to_pandas().isnull().any()

In [None]:
cdf['IncludedQuantities'].fillna(0, inplace=True)

In [None]:
cdf['EUR'].fillna(0, inplace=True)

In [None]:
cdf.to_pandas().isnull().any()

### Lowercase some fields

In [None]:
cdf['ResourceID'] = cdf['ResourceID'].str.lower()

In [None]:
cdf['Region'] = cdf['Region'].str.lower()

### Calculate hashes for grouping

In [None]:
cdf['ResourceHash'] = cdf.hash_columns(['ResourceID', 'Region', 'Month'])

In [None]:
cdf['ResourceIDHash'] = cdf.hash_columns(['ResourceID'])

### Group by ResourceHash to isolate the pricelist for a single resource in a single month

In [None]:
groups = cdf.groupby(['ResourceHash'], method='cudf')

df_groups = groups.as_df()

# DataFrame indexes of group starts
print(df_groups[1])

# DataFrame of groups itself
print(df_groups[0])

In [None]:
# How many chunks?
print(len(df_groups[1]))

### Calculate some extra info per price line

The below python function will be wrapped in below numba CUDA kernel function.

_Note that the CUDA grid and block parameters could be fed to the python function via the kwargs parameter, if desired._

```
def chunk_wise_kernel(nrows, chunks, __user_EUR, __user_MinimumValue, __user_UpperBound, __user_StartValue):
    blkid = cuda.blockIdx.x
    nblkid = cuda.gridDim.x
    tid = cuda.threadIdx.x
    ntid = cuda.blockDim.x
    for curblk in range(blkid, chunks.size, nblkid):
        start = chunks[curblk]
        stop = chunks[curblk + 1] if curblk + 1 < chunks.size else nrows
        inner(__user_EUR[start:stop], __user_MinimumValue[start:stop], __user_UpperBound[start:stop],
              __user_StartValue[start:stop])
```

### Implementation using cudf DataFrame.apply_chunks

In [None]:
# Define a function to apply to each group

def grpfunc(EUR, MinimumValue, upper_limit, interval_value, cum_value):
    if cuda.threadIdx.x == 0:
        count = len(EUR)

        if count == 1:
            upper_limit[0] = 1e100
            interval_value[0] = 0.0
            cum_value[0] = 0.0
        elif count > 1:
            for i in range(count-1):
                upper_limit[i] = MinimumValue[i+1]
                interval_value[i] = EUR[i] * \
                    (MinimumValue[i+1] - MinimumValue[i])

                if i == 0:
                    cum_value[i] = 0.0
                else:
                    cum_value[i] = interval_value[i-1] + cum_value[i-1]
            # Special handling of last entry
            upper_limit[count-1] = 1e100
            interval_value[count-1] = 1e100
            cum_value[count-1] = interval_value[count-1-1] + cum_value[count-1-1]
        else:
            # Should not happen, unless input column EUR contains nulls
            print("len(EUR): ", len(EUR))

# Run kernel function by group chunk
result = df_groups[0].apply_chunks(grpfunc,
                                   incols=['EUR', 'MinimumValue'],
                                   outcols={'upper_limit': np.float64, 'interval_value': np.float64, 'cum_value': np.float64},
                                   kwargs={},
                                   chunks=df_groups[1],
                                   # threads per block
                                   tpb=1)

print(result)

### Alternative implementation using GroupBy.apply_grouped

In [None]:
# Define a function to apply to each group

def grpfunc(EUR, MinimumValue, upper_limit, interval_value, cum_value):
    if cuda.threadIdx.x == 0:
        count = len(EUR)

        if count == 1:
            upper_limit[0] = 1e100
            interval_value[0] = 0.0
            cum_value[0] = 0.0
        elif count > 1:
            for i in range(count-1):
                upper_limit[i] = MinimumValue[i+1]
                interval_value[i] = EUR[i] * \
                    (MinimumValue[i+1] - MinimumValue[i])

                if i == 0:
                    cum_value[i] = 0.0
                else:
                    cum_value[i] = interval_value[i-1] + cum_value[i-1]
            # Special handling of last entry
            upper_limit[count-1] = 1e100
            interval_value[count-1] = 1e100
            cum_value[count-1] = interval_value[count-1-1] + cum_value[count-1-1]
        else:
            # Should not happen, unless input column EUR contains nulls
            print("len(EUR): ", len(EUR))

# Run kernel function by group chunk
result = groups.apply_grouped(grpfunc,
                                   incols=['EUR', 'MinimumValue'],
                                   outcols={'upper_limit': np.float64, 'interval_value': np.float64, 'cum_value': np.float64},
                                   # threads per block
                                   tpb=1)

print(result)

### Write to SQL table

In [None]:
tablename = 'AzureCSPPriceList_Extra'

In [None]:
result.to_pandas().to_sql(name=tablename, con=conn, index=False, if_exists="replace")