### Setup database connectivity

We'll reuse our module from the previous notebook (***`00_database_connectivity_setup.ipynb`***) to establish connectivity to the database

In [1]:
%run '00_database_connectivity_setup.ipynb'
IPython.display.clear_output()

### Load UCI Wine Quality dataset 

We'll use the [UCI Wine Quality dataset](https://archive.ics.uci.edu/ml/datasets/Wine+Quality) for our demos.
```
Attribute Information:

For more information, read [Cortez et al., 2009]. 
Input variables (based on physicochemical tests): 
1 - fixed acidity 
2 - volatile acidity 
3 - citric acid 
4 - residual sugar 
5 - chlorides 
6 - free sulfur dioxide 
7 - total sulfur dioxide 
8 - density 
9 - pH 
10 - sulphates 
11 - alcohol 
Output variable (based on sensory data): 
12 - quality (score between 0 and 10)
```

In [2]:
%%time
%%bash
mkdir -p /tmp/postgresopen_2017
if [ ! -f /tmp/postgresopen_2017/winequality-red.csv ]; then
    echo "Fetching UCI Wine Quality dataset"
    curl https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv > /tmp/postgresopen_2017/winequality-red.csv
fi
ls -l /tmp/postgresopen_2017/winequality-red.csv

-rw-rw-r-- 1 vatsan vatsan 84199 Sep  8 10:18 /tmp/postgresopen_2017/winequality-red.csv
CPU times: user 0 ns, sys: 8 ms, total: 8 ms
Wall time: 25.5 ms


In [3]:
%%time
%%execsql
drop table if exists wine_sample;
create table wine_sample
(
    id serial,
    fixed_acidity float8,
    volatile_acidity float8,
    citric_acid float8,
    residual_sugar float8,
    chlorides  float8,
    free_sulfur_dioxide float8,
    total_sulfur_dioxide float8,
    density float8,
    ph float8,
    sulphates float8,
    alcohol float8,
    quality float8
);
copy 
wine_sample
(
    fixed_acidity,
    volatile_acidity,
    citric_acid,
    residual_sugar,
    chlorides,
    free_sulfur_dioxide,
    total_sulfur_dioxide,
    density,
    ph,
    sulphates,
    alcohol,
    quality
) from '/tmp/postgresopen_2017/winequality-red.csv' WITH DELIMITER ';' CSV HEADER;

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 20.8 ms


In [4]:
%%time
%%showsql
select
    *
from
    wine_sample
limit 10;

Unnamed: 0,id,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,ph,sulphates,alcohol,quality
0,1,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5.0
1,2,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5.0
2,3,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5.0
3,4,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6.0
4,5,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5.0
5,6,7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5.0
6,7,7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5.0
7,8,7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7.0
8,9,7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7.0
9,10,7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5.0


CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 45.7 ms


In [5]:
%%time
%%showsql
select
    id,
    ARRAY[
        fixed_acidity,
        volatile_acidity,
        citric_acid,
        residual_sugar,
        chlorides,
        free_sulfur_dioxide,
        total_sulfur_dioxide,
        density,
        ph,
        sulphates,
        alcohol
    ] as features,
    quality
from
    wine_sample
limit 10;

Unnamed: 0,id,features,quality
0,1,"[7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4]",5.0
1,2,"[7.8, 0.88, 0.0, 2.6, 0.098, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8]",5.0
2,3,"[7.8, 0.76, 0.04, 2.3, 0.092, 15.0, 54.0, 0.997, 3.26, 0.65, 9.8]",5.0
3,4,"[11.2, 0.28, 0.56, 1.9, 0.075, 17.0, 60.0, 0.998, 3.16, 0.58, 9.8]",6.0
4,5,"[7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4]",5.0
5,6,"[7.4, 0.66, 0.0, 1.8, 0.075, 13.0, 40.0, 0.9978, 3.51, 0.56, 9.4]",5.0
6,7,"[7.9, 0.6, 0.06, 1.6, 0.069, 15.0, 59.0, 0.9964, 3.3, 0.46, 9.4]",5.0
7,8,"[7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 21.0, 0.9946, 3.39, 0.47, 10.0]",7.0
8,9,"[7.8, 0.58, 0.02, 2.0, 0.073, 9.0, 18.0, 0.9968, 3.36, 0.57, 9.5]",7.0
9,10,"[7.5, 0.5, 0.36, 6.1, 0.071, 17.0, 102.0, 0.9978, 3.35, 0.8, 10.5]",5.0


CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 33.7 ms


In [6]:
%%time
%%execsql
--1) Define model return type record
drop type if exists host_mdl_coef_intercept CASCADE;
create type host_mdl_coef_intercept
AS
(
    hostname text, -- hostname on which the model was built
    coef float[], -- model coefficients
    intercept float, -- intercepts
    r_square float -- training data fit
);

--2) Define a UDA to concatenate arrays
drop aggregate if exists array_agg_array(anyarray) CASCADE;
create aggregate array_agg_array(anyarray)
(
    SFUNC = array_cat,
    STYPE = anyarray
);

--3) Define PL/Python function to train ridge regression model
create or replace function sklearn_ridge_regression(
    features_mat float8[],
    n_features int,
    labels float8[]
)
returns host_mdl_coef_intercept
as
$$
    import os
    from sklearn import linear_model, preprocessing
    import numpy as np
    X_unscaled = np.array(features_mat).reshape(int(len(features_mat)/n_features), int(n_features))
    # Scale the input (zero mean, unit variance)
    X = preprocessing.scale(X_unscaled)
    y = np.array(labels).transpose()
    mdl = linear_model.Ridge(alpha = .5)
    mdl.fit(X, y)
    result = [
        os.popen('hostname').read().strip(), 
        mdl.coef_, 
        mdl.intercept_, 
        mdl.score(X, y)
    ] 
    return result
$$language plpython3u;

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 53.3 ms


In [7]:
%%showsql
select
    (
        sklearn_ridge_regression(
            features_mat,
            n_features,
            labels
        )
    ).*
from
(
    select
        -- Convert rows of features into a large linear array
        array_agg_array(features order by id) as features_mat,
        -- Number of features
        max(array_upper(features, 1)) as n_features,
        -- Gather all the Labels
        array_agg(quality order by id) as labels
    from
    (
        select
            id,
            1 as grouping,
            -- Create a feature vector of independent variables
            ARRAY[
                fixed_acidity,
                volatile_acidity,
                citric_acid,
                residual_sugar,
                chlorides,
                free_sulfur_dioxide,
                total_sulfur_dioxide,
                density,
                ph,
                sulphates,
                alcohol
            ] as features,
            quality
        from
            wine_sample
    )q1
    group by
        grouping
)q2

Unnamed: 0,hostname,coef,intercept,r_square
0,vatsan-ubuntu,"[0.0436426798019139, -0.193877662040999, -0.0354155984098491, 0.0230721425304759, -0.0881619827715223, 0.0455546585850756, -0.107311407147557, -0.0339500461485907, -0.0636873180606666, 0.155255807765603, 0.294035322852005]",5.636023,0.360552


### MAX_FIELD_SIZE (1GB) Limitation

One limitation with using PL/Python for in-database machine learning in PostgreSQL, is the max_field_size limit of Postgres which disallows UDFs from accepting inputs that exceed 1 GB (ex: a float8[]). While your database table itself could be of the order of hundreds of terabytes in size, no single field (cell) can exceed 1 GB in size. Each row of data could be composed of several hundred columns and collectively a row of data could be really large, but no single column’s value, in a given row can exceed 1 GB.

```
(http://www.postgresql.org/about/)
Limit Value
Maximum Database Size Unlimited
Maximum Table Size  32 TB
Maximum Row Size  1.6 TB
Maximum Field Size  1 GB
Maximum Rows per Table  Unlimited
Maximum Columns per Table 250 - 1600 depending on column types
Maximum Indexes per Table Unlimited
```

In [8]:
%%time
%%execsql
-- An array of 120000000 float8(8 bytes) types = 960 MB
--1) Define UDF to generate large arrays
create or replace function gen_array(x int)
returns float8[]
as
$$
    from random import random
    return [random() for _ in range(x)]
$$language plpython3u;

--2) Create a table
drop table if exists cellsize_test;
create table cellsize_test
as
(
    select
        1 as row,
        y,
        gen_array(12000) as arr
    from
        generate_series(1, 3) y
);

--3) Define a UDA to concatenate arrays
DROP AGGREGATE IF EXISTS array_agg_array(anyarray) CASCADE;
CREATE AGGREGATE array_agg_array(anyarray)
(
    SFUNC = array_cat,
    STYPE = anyarray
);

--4) Define a UDF to consume a really large array and return its size
create or replace function consume_large_array(x float8[])
returns text
as
$$
    return 'size of x:{0}'.format(len(x))
$$language plpython3u;

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 126 ms


NOTE: The following query will fail if we had allocated > 1GB for the field 'arr' (for the purpose of this talk & avoid my system from hanging/crashing) I'm not demonstrating the memory error. Feel free to clone the repo and reproduce this for yourself.

In [9]:
%%time
%%showsql
--5) Invoke the UDF & UDA to demonstrate failure due to max_fieldsize_limit
select
    row,
    consume_large_array(arr)
from
(

    select
        row,
        array_agg_array(arr) as arr
    from
        cellsize_test
    group by
        row
)q;

Unnamed: 0,row,consume_large_array
0,1,size of x:36000


CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 19.6 ms


For data science problems we often work with datasets/matrices that exceed the max_field_size limit. For instance, in the example here: [gp_xgboost_gridsearch](https://github.com/vatsan/gp_xgboost_gridsearch), several XGBoost models are built in parallel for every possible combination of the input parameters. If the input dataset exceeds 1 GB in size, these UDFs would error out due to the violation of the max_fieldsize_limit. This limitation prevents users from harnessing the full power of the database (or MPP cluster) even when the host(s) have a lot more memory than the max_field_size limit.

### Using SD and GD with UDAs to bypass max_fieldsize_limit for building large scale ML models

We’ll next demonstrate how to work-around the max_fieldsize_limit, to write PL/Python UDFs that harness popular machine learning libraries like scikit-learn on datasets several tens to hundreds of gigabytes in size.
All PL/Python UDFs have two dictionaries, `SD` and `GD`, that can be used to cache data in memory.

1. `SD` is private to a UDF, it is used to cache data between function calls in a given transaction.
2. `GD` is global dictionary, it is available to all UDFs within a transaction.

Here is an illustration of how it works: ![SD & GD with UDAs](https://github.com/vatsan/postgresopen-2017/blob/master/docs/images/largescale_sklearn_models_mpp.png?raw=true)

In [10]:
%%time
%%execsql
--1) SFUNC: State transition function, part of a User-Defined-Aggregate definition
-- This function will merely stack every row of input, into the GD variable
drop function if exists stack_rows(
    text,
    text[],
    float8[],
    float8
) cascade;

create or replace function stack_rows(
    key text,
    header text[], -- name of the features column and the dependent variable column
    features float8[], -- independent variables (as array)
    label float8 -- dependent variable column
)
returns text
as
$$
    if 'header' not in GD:
        GD['header'] = header
    if not key:
        gd_key = 'stack_rows'
        GD[gd_key] = [[features, label]]
        return gd_key
    else:
        GD[key].append([features, label])
        return key
$$language plpython3u;

--2) Define the User-Defined Aggregate (UDA) consisting of a state-transition function (SFUNC), a state variable and a FINALFUNC (optional)
drop aggregate if exists stack_rows( 
    text[], -- header (feature names)
    float8[], -- features (feature values),
    float8 -- labels
) cascade;
create aggregate stack_rows(
        text[], -- header (feature names)
        float8[], -- features (feature values),
        float8 -- labels
    )
(
    SFUNC = stack_rows,
    STYPE = text -- the key in GD used to hold the data across calls
);

--3) Create a return type for model results
DROP TYPE IF EXISTS host_mdl_coef_intercept CASCADE;
CREATE TYPE host_mdl_coef_intercept
AS
(
    hostname text, -- hostname on which the model was built
    coef float[], -- model coefficients
    intercept float, -- intercepts
    r_square float -- training data fit
);

--4) Define a UDF to run ridge regression by retrieving the data from the key in GD and returning results
drop function if exists run_ridge_regression(text) cascade;
create or replace function run_ridge_regression(key text)
returns host_mdl_coef_intercept
as
$$
    import os
    import numpy as np   
    import pandas as pd
    from sklearn import linear_model, preprocessing
    
    if key in GD:
        df = pd.DataFrame(GD[key], columns=GD['header'])
        mdl = linear_model.Ridge(alpha = .5)
        X_unscaled = np.mat(df[GD['header'][0]].values.tolist())
        # Scale the input (zero mean, unit variance)
        X = preprocessing.scale(X_unscaled)
        y = np.mat(df[GD['header'][1]].values.tolist()).transpose()
        mdl.fit(X, y)
        result = [
            os.popen('hostname').read().strip(), 
            mdl.coef_[0], 
            mdl.intercept_[0], 
            mdl.score(X, y)
        ]   
        GD[key] = result        
        result = GD[key]
        del GD[key]
        return result
    else:
        plpy.info('returning None')
        return None
$$ language plpython3u;

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 3.13 ms


As seen above, we first defined a state-transition function, which is one of the building blocks of a User-Defined Aggregate. This function takes a row of input (in this case it is a record consisting of a float8[] and a float8 corresponding to the feature vector and the dependent variable) and stacks in the `GD` variable, using a user-specified key. This key in `GD` is accessible by any other UDFs in the same transaction, thus the UDF run_ridge_regression retrieves all the stacked rows from the `GD` variable, constructs the input matrix required for the ridge regression model of `sklearn` and returns a set of rows as result, where each row of output corresponds to the hostname on which the model was built, the coefficients of the model, the intercept and the [coefficient of determination](https://en.wikipedia.org/wiki/Coefficient_of_determination) of the model on the training dataset. All these building blocks were combined in the definition of the User-Defined Aggregate.

We can invoke our UDAs and UDFs like so:

In [11]:
%%time
%%showsql
select
    model,
    (results).*
from
(
    select
        model,
        run_ridge_regression(
            stacked_input_key
        ) as results
    from
    (
        select
            model,
            stack_rows(
                ARRAY['features', 'quality'], --header or names of input fields
                features, -- feature vector input field
                quality -- label column
            ) as stacked_input_key
        from
        (
            select
                id,
                1 as model,
                -- Create a feature vector of independent variables
                ARRAY[
                    fixed_acidity,
                    volatile_acidity,
                    citric_acid,
                    residual_sugar,
                    chlorides,
                    free_sulfur_dioxide,
                    total_sulfur_dioxide,
                    density,
                    ph,
                    sulphates,
                    alcohol
                ] as features,
                quality
            from
                wine_sample
        ) q1
        group by
            model
    )q2
)q3;

Unnamed: 0,model,hostname,coef,intercept,r_square
0,1,vatsan-ubuntu,"[0.0436426798019139, -0.193877662040999, -0.0354155984098491, 0.0230721425304759, -0.0881619827715223, 0.0455546585850756, -0.107311407147557, -0.0339500461485907, -0.0636873180606666, 0.155255807765603, 0.294035322852005]",5.636023,0.360552


CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 512 ms
