# Computation API

The computation API allows you to send computation queries to KAWA using a syntax similar to Spark or Snowpark.
It was built to work with any data warehouse. 
It lets you compose complex workloads to be executed on the warehouse. In that way, only the necessary data will be loaded
in the memory of your Python runtime as a pandas dataframe for further manipulation.

It exposes operators such as:
- Grouping
- Sampling
- Filtering (Both at row and group level)
- Aggregating
- Sorting
- Limiting
- Arithmetic, logic etc operations both on row and group levels
- ...



## 1. Setup

### 1.a Connect to your KAWA instance

In [1]:
from kywy.client.kawa_client import KawaClient as K

kawa = K.load_client_from_environment()
cmd = kawa.commands

Authentication successful on http://localhost:8080, in workspace 75



### 1.b Generate a sample data set

This cell will load some demo data into your KAWA instance to illustrate the Computation API's capacities.
Feel free to adapt the follwing cells to your own dataset.

In [27]:
# Creates a sample dataset to illustrate the API behaviour
import pandas as pd
import zoneinfo

from uuid import uuid4
from datetime import date, datetime, time

def utc(year, month, day, hour, minute, second):
    return datetime(year, month, day, hour, minute, second, tzinfo=zoneinfo.ZoneInfo('UTC'))

sample_orders_df = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'flag': [True, False, False, False, False],
    'comment': ['Sold 10 items', 'Sold 20 items', 'Sold 100 items', 'Sold 1 items', 'Sold 1 items'],
    'price': [1.124, 2.228, 10.124, 0.1, 0.1],
    'order_date': [date(2035, 1, 1), date(2035, 1, 2), date(2035, 3, 3), date(2035, 3, 6), date(2035, 3, 7)],
    'update': [utc(2035, 1, 1, 23, 45, 6), utc(2035, 1, 2, 3, 45, 6), utc(2035, 3, 3, 5, 2, 1), utc(2035, 1, 2, 3, 45, 6), utc(2035, 3, 7, 7, 2, 7)],
    'client': ['Wayne Enterprises', 'Wayne Enterprises', 'Wonka', 'Cyberdyne Systems', 'Cyberdyne Systems']
})

loader = kawa.new_data_loader(
    df=sample_orders_df, 
    datasource_name='Computation API sandbox' 
)

loader.create_datasource(primary_keys=['id'])
ds = loader.load_data( create_sheet=True, reset_before_insert=True)

sample_orders_df
datasource_id = ds['id']

Starting an ingestion session with id=666848b2-3934-41d9-8526-3de92b5e99fe
> Exporting the dataframe into 1 parquet file
> Starting 1 loading threads
> Streaming file /var/folders/rl/6bqlws416nz6z2298zxq22zc0000gn/T/b8dc173a-09a5-42db-84ff-d93495652a58/9f6290f05ba94d2ab9b7835fa6a66643-0.parquet to KAWA
> 5 rows were imported in 0.03616476058959961ms
> Import was successfully finalized
Sheet Computation API sandbox was created: http://localhost:8080/workspaces/75/sheets/2518/views/22266


## 2. The computation DSL

This will perform computations on a given sheet, inheriting all the row/column level security, as well as the datasource and sheet level filters.

 ### 2.a Simple example

In its simplest form, this DSL can be used to select all the rows and all the columns of a sheet as below:

In [None]:
# Build the query (no computation is done at this point)
query = (kawa
         .sheet(
             sheet_name='Computation API sandbox', 
             force_tz='UTC',
         )
         .select(K.cols()))

# Send the query to KAWA and returns a pandas df
query.compute()

It is also possible to query data by `sheet_id` and `datasource_id`. This is recommended for additional stability, in particular when many users can change the name of the sheet you are working with.

In [None]:
# Compute all rows and columns from a sheet.
# You can retrieve the id of the sheet by using the GUI and looking at the id in the URL
# For example: https://wayne.kawa.ai/workspaces/75/sheets/2425/views/22127 --> This is sheet 2425
kawa.sheet(sheet_id='.. sheet id here ..').select(K.cols()).no_limit().compute()


# Compute all rows and columns from a datasource.
# You can retrieve the id of the datasource by using the GUI and looking at the id in the URL
# For example: https://wayne.kawa.ai/workspaces/75/datasources/2425 --> This is datasource 2425
kawa.datasource(datasource_id='.. datasource id here ..').select(K.cols()).no_limit().compute()



### 2.b using group_by and aggregations

The `group_by` operator lets you specify which columns you want to group your data on.

When using this operator, you can specify aggregations on the columns you select.

The available aggregations are the following:
The __first__ aggregation is available for all types.

__decimal and integer:__
- sum 
- avg, median
- min, max. min_abs, max_abs
- var_sample, var_pop, std_dev_sample, std_dev_pop
- lowest_decile, lowest_quartile, highest_decile, highest_quartile

__text and boolean:__
- count, count_unique, percent_filled, percent_empty, count_empty
- identical, identical_ignore_empty

__date and date_time:__
- min, max
- identical
- count_unique

In [None]:
# Example with one level of grouping
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      force_tz='UTC',
  )
  .select(
      # Add an aggregation on the columns
      K.col('id').count(),
      K.col('price').sum(),
      K.col('update').max(),
      K.col('order_date').first(),
  )
  .group_by('client')
 
).compute()

In [None]:
# Example with two levels of grouping
# Notice the grouping(N) columns that are added to the resulting dataframe
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      force_tz='UTC',
  )
  .select(
      K.col('id').count(),
      K.col('price').sum(),
      K.col('update').max(),
      K.col('order_date').first(),
  )
  .group_by('client', 'flag')
 
).compute(use_group_names=True)

### 2.c using upsampling and data binning

When grouping by __date__, __date_time__, __integer__ or __decimal__ columns, you can specify upsampling/binning.

Here are the available samplers:


__date:__ 
- WEEK
- MONTH
- QUARTER
- SEMESTER
- YEAR
- YEAR_AND_WEEK
- YEAR_AND_MONTH
- YEAR_AND_QUARTER
- YEAR_AND_SEMESTER
- DAY_OF_YEAR
- DAY_OF_WEEK

__date_time:__ 
Same as for date plus:
- DAY
- TWELVE_HOURS
- SIX_HOURS
- HOUR
- THIRTY_MINUTES
- TWENTY_MINUTES
- FIFTEEN_MINUTES
- TEN_MINUTES
- FIVE_MINUTES
- MINUTE
- THIRTY_SECONDS


When extracting the date from datetime objects, the result will depend on the Timezone. Cf examples below.

Those are computed in your local timezone by default or in the timezone you specify when building the sheet object.


__decimal and integer:__ (data binning)
- FIXED_NUMBER_OF_BINS (with extra argument: `how_many_buckets`)
- LIST_OF_BINS (with extra argument: `buckets`)
- FIXED_SIZE_BINS (with extra argument: `bucket_size`)



In [None]:
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      force_tz='UTC',
  )
  .select(
      K.col('id').count(),
      K.col('price').sum(),
      K.col('order_date').max(),
      K.col('flag').first(),
  )
  # Add upsampling per month on the order date
  .sample(
      sampler='YEAR_AND_MONTH', 
      column_name='order_date',
  )
 .group_by('order_date')
 
).compute()

In [None]:
# Upsampling that is TZ dependant: in UTC
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      force_tz='UTC',
  )
  .select(K.col('update').count())
  .sample(
      sampler='DAY', 
      column_name='update',
  )
 .group_by('update')
 
).compute()

In [None]:
# Upsampling that is TZ dependant: in NY
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      force_tz='America/New_York',
  )
  .select(K.col('update').count())
  .sample(
      sampler='DAY', 
      column_name='update',
  )
 .group_by('update')
 
).compute()

In [None]:
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      # When doing date_time upsampling, the usage force_tz is recommended
      # to explicietely defining in which TZ the sampling will be defined
      force_tz='UTC',
  )
  .select(
      K.col('id').count(),
      K.col('price').avg(),
      K.col('order_date').max(),
      K.col('flag').first(),
  )
  .sample(
      sampler='TWENTY_MINUTES', 
      column_name='update',
  )
 .group_by('update')
 
).compute()

In [None]:
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      force_tz='UTC',
  )
  .select(
      K.col('id').count(),
      K.col('price').avg(),
      K.col('order_date').max(),
      K.col('flag').first(),
  )
  # specify a list of buckets
  .sample(
      sampler='LIST_OF_BINS', 
      column_name='price',
      buckets=[0,1,10],
  )
 .group_by('price')
 
).compute()

### 2.d using order_by and limit

Those two operators can help selecting TOP or WORST performers according to a given metric.
Please note that if you omit the `limit` operator, it will be automacically set to 100.
If you want to load data without limit, use the `no_limit()` operator (or alternatively, specify a negative number in the limit operator).

In [None]:
# Example of query to get the top 1 week for total price
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      force_tz='UTC',
  )
  .select(
      K.col('id').count(),
      K.col('price').sum(),
      K.col('order_date').max(),
      K.col('flag').first(),
  )
  .sample(
      sampler='YEAR_AND_WEEK', 
      column_name='order_date',
  )
 .group_by('order_date')
 .limit(1)
 .order_by(column_name='price', ascending=False)
 
).compute()

### 2.e special syntax: group_by(1)

Use this syntax to retrieve the global aggregation of the entire dataset.
You can also use aliases when you select multiple times the same column with different aggregations

In [None]:
(kawa
  .sheet(
      sheet_name='Computation API sandbox', 
      force_tz='UTC',
  )
  .select(
      K.col('price').sum().alias('total price'),
      K.col('price').avg().alias('avg price'),
      K.col('price').median().alias('median price'),
  )
 .group_by('1')
).compute()

### 2.f filtering

In order to filter your data, use the `filter` operator.

#### 2.f.1 Text filtering

Text filters are case-insensitive.
Multiple operators are available to filter text columns:

- `eq`
- `ne`
- `ends_with` / `does_not_end_with`
- `starts_with` / `does_not_start_with`
- `contains` / `does_not_contain`
- `in_list` (Is case sensitive)


In [None]:
(kawa
  .sheet(sheet_name='Computation API sandbox')
  .select(K.col('client'))
  .filter(K.col('client').eq('wonka'))
).compute()

In [None]:
(kawa
  .sheet(sheet_name='Computation API sandbox')
  .select(K.col('client'))
  .filter(K.col('client').ne('wonka'))
).compute()

In [None]:
(kawa
  .sheet(sheet_name='Computation API sandbox')
  .select(K.col('client'))
  .filter(K.col('client').ends_with('s'))
).compute()

In [None]:
(kawa
  .sheet(sheet_name='Computation API sandbox')
  .select(K.col('client'))
  .filter(K.col('client').in_list(['Wonka', 'Wayne Enterprises']))
).compute()

#### 2.f.2 Number filtering

Multiple operators are available to filter numeric columns:

- `eq`
- `ne`
- `lt` / `lte` (Lesser than / Lesser than or equal)
- `gt` / `gte` (Greater than / Greater than or equal)

In [None]:
(kawa
  .sheet(sheet_name='Computation API sandbox')
  .select(K.col('price'))
  .filter(K.col('price').gt(2))
  .filter(K.col('price').lt(10))
).compute()

#### 2.f.3 Temporal filtering

The following three operators are available to filter temporal columns:

- `time_range()` (Time range filters depend on the timezone!)
- `date_range()`
- `datetime_range()`

When computing, the timezone of your python runtime will be assumed (ouptput of `tzlocal.get_localzone()`), unless you specify it explicitely in the `force_tz` argument (see below for examples).



In [None]:
# Time filter in the UTC timezone
(kawa
  .sheet(
      sheet_name='Computation API sandbox',
      force_tz='UTC',
  )
  .select(K.cols())
  .filter(K.col('update').time_range(
      from_inclusive=time(3,0), 
      to_inclusive=time(4,0))
  )
  .order_by('update')
).compute()

In [None]:
# Time filter in the New York timezone
(kawa
  .sheet(
      sheet_name='Computation API sandbox',
       force_tz='America/New_York',
  )
  .select(K.cols())
  .filter(K.col('update').time_range(
      from_exclusive=time(22,0), 
      to_exclusive=time(23,0))
  )
  .order_by('update')
).compute()

In [None]:
# Date time filter in the UTC timezone
(kawa
  .sheet(
      sheet_name='Computation API sandbox',
      force_tz='UTC',
  )
  .select(K.cols())
  .filter(K.col('update').datetime_range(
      from_inclusive=utc(2035, 1, 1, 23, 45, 6), 
      to_inclusive=utc(2035, 1, 2, 3, 45, 6))
  )
  .order_by('update')
).compute()

In [None]:
# Date time filter in the New York timezone. The same rows will be returned, shown in the correct TZ
# (The datetimes of the filter do not depend on the computation TZ, they are defined in UTC)
(kawa
  .sheet(
      sheet_name='Computation API sandbox',
      force_tz='America/New_York',
  )
  .select(K.cols())
  .filter(K.col('update').datetime_range(
      from_inclusive=utc(2035, 1, 1, 23, 45, 6), 
      to_inclusive=utc(2035, 1, 2, 3, 45, 6))
  )
  .order_by('update')
).compute()

In [None]:
# Date filters, they do not depend on the Timezone. Dates are local dates.
(kawa
  .sheet(
      sheet_name='Computation API sandbox',
      force_tz='UTC',
  )
  .select(K.cols())
  .filter(K.col('order_date').date_range(
      from_inclusive=date(2035, 1, 1), 
      to_inclusive=date(2035, 1, 2))
  )
  .order_by('update')
).compute()

#### 2.f.4 Filter composition

Filters can be chained in tow ways to obtain AND/OR compositions.
Filters can be excluded using the `exclude` operator. 

In [None]:
# OR composition
(kawa
  .sheet(sheet_name='Computation API sandbox')
  .select(K.col('client'))
  # Client contains 'wonka' OR client contains 'systems' 
  .filter(K.col('client').contains('wonka').contains('systems'))
).compute()

In [None]:
# AND composition
(kawa
  .sheet(sheet_name='Computation API sandbox')
  .select(K.col('client'))
  # Client begins  with 'w' AND ands with  's' 
  .filter(K.col('client').starts_with('w'))
  .filter(K.col('client').ends_with('s'))
).compute()

In [None]:
(kawa
  .sheet(
      sheet_name='Computation API sandbox',
      force_tz='UTC',
  )
  .select(K.cols())
  # Use of the exclude operator
  .filter(K.col('client').starts_with('w').exclude())
  .order_by('update')
).compute()

## 3 Run KAWA tools

You can use the kawa client to test your tools against your data. You have two options to do so.

Here is an example of a simple KAWA tool that computes the length of a text in Python, adding a fix offset to it.


In [8]:
from kywy.client.kawa_decorators import kawa_tool
@kawa_tool(
    inputs={'text':str},
    outputs={'length':float},
    parameters={'offset':{'type':float}},
)
def kawa_tool(dataframe, offset):        
    dataframe['length'] = dataframe['text'].apply(lambda x: int(offset) + len(x))
    return dataframe

### 3.1 Run KAWA tools using the computation DSL

In order to do so, add the `kawa_tool` stage in your query.
This stage takes at least two arguments:

- The tool itself (any python function with the @kawa_tool decorator
- If the tool has some inputs, you need to specify a mapping (similar to the one in the GUI). It will be used to map columns from your dataframe to the actual inputs of the script. Here, we are mapping the `comment` column to the `text` input of the script.

If the tool has parameters, you need to give a value for each of them. Here, we are setting a value for the `offset` parameter.

In [23]:
query = (kawa
    .sheet(sheet_name='Computation API sandbox')
    .order_by('id', ascending=True)
    .select(K.col('comment'), K.col('id'))
    .kawa_tool(kawa_tool=kawa_tool, mapping={'comment': 'text'}, offset=100)
)

query.compute()

Unnamed: 0,text,id,length
0,Sold 10 items,1,113
1,Sold 20 items,2,113
2,Sold 100 items,3,114
3,Sold 1 items,4,112
4,Sold 1 items,5,112


### 3.2 Run KAWA tools directly with the kawa client

This is a simpler syntax to run the tool on the raw data (either sheet or directly the datasource)



In [33]:
kawa.run_tool(
    datasource_id=datasource_id, # You can also use: sheet_id
    mapping={'comment': 'text'},
    tool=kawa_tool,
    offset=8888,
)

Unnamed: 0,text,length
0,Sold 10 items,8901
1,Sold 20 items,8901
2,Sold 100 items,8902
3,Sold 1 items,8900
4,Sold 1 items,8900
