## Set up database connectivity

We'll reuse the module from the previous notebook (***`00_database_connectivity_setup.ipynb`***) to establish a connection to the database.

```python
%run '00_database_connectivity_setup.ipynb'
# Hide the (verbose) output of the setup notebook
from IPython.display import clear_output
clear_output()
```

Your connection object is ***`conn`***:
1. Queries: You can run your queries using ***```psql.read_sql("""<YOUR SQL>""", conn)```***. Alternatively, if you don't need a handle to the resulting dataframe, you can run the query inline using the cell magic we defined previously: ***```%%showsql```***.
2. Create/delete/update statements: You can run these using ***```psql.execute("""<YOUR SQL>""", conn)```***, followed by ***```conn.commit()```*** to commit your transaction; otherwise your changes will be rolled back when you terminate your kernel. Alternatively, you can use the cell magic we defined previously: ***```%%execsql```***.
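The commit-or-rollback behavior described above is standard Python DB-API semantics. Here is a minimal sketch that uses the standard-library `sqlite3` module as a stand-in for the Greenplum connection `conn` (an assumption purely for illustration; the table name `demo` is hypothetical). An insert that is not committed before the connection closes is lost, while a committed one survives.

```python
import os
import sqlite3
import tempfile

def count_rows(path):
    """Open a fresh connection and count the rows in the demo table."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("select count(*) from demo").fetchone()[0]
    finally:
        conn.close()

path = os.path.join(tempfile.mkdtemp(), "demo.db")

# Create the (hypothetical) demo table and commit it
conn = sqlite3.connect(path)
conn.execute("create table demo (x integer)")
conn.commit()

# Insert without committing, then close: the insert is rolled back
conn.execute("insert into demo values (1)")
conn.close()
uncommitted = count_rows(path)

# Insert and commit: the row survives the connection being closed
conn = sqlite3.connect(path)
conn.execute("insert into demo values (2)")
conn.commit()
conn.close()
committed = count_rows(path)

print(uncommitted, committed)  # 0 1
```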

If you created a new connection object (say, to connect to a new cluster) as shown in the last section of the `00_database_connectivity_setup.ipynb` notebook, use that connection object where needed.

## Can PL/Python UDFs work on inputs exceeding max_field_size (1 GB)?

[Greenplum](http://greenplum.org/) is an MPP database that was forked from Postgres many years ago. Just like Postgres, Greenplum has a [max field size of 1 GB](http://www.postgresql.org/about/): no single column value can exceed 1 GB.

Greenplum is well suited for data science problems at massive scale, thanks to in-database machine learning libraries like [MADlib](http://madlib.net) and procedural languages like PL/Python and PL/R, which let data scientists harness the vast ecosystem of machine learning models and statistical functions in Python and R. When parallelizing data science models, data scientists encounter two flavors of problems:
1. Embarrassingly parallel (data-parallel) problems.
2. Model-parallel problems.

Data-parallel problems typically involve building the same machine learning model on different subsets of the full dataset or, for instance, running a grid search over model parameters on the same input dataset. These are relatively easy to parallelize on Greenplum and other MPP databases like [HAWQ](http://hawq.incubator.apache.org/), using user-defined functions (UDFs) in PL/Python, PL/R or any other procedural language supported by these platforms. More information can be found in [All things Python @ Pivotal](http://www.slideshare.net/SrivatsanRamanujam/all-thingspythonpivotal) (slides 22-30) and in [gp-xgboost-gridsearch](https://github.com/vatsan/gp_xgboost_gridsearch).
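The data-parallel pattern (on Greenplum, a `GROUP BY` feeding a UDF) can be sketched in plain Python without a database. Rows are grouped by a key and the same model is fit independently on each group; a hypothetical one-feature least-squares fit stands in for an arbitrary ML model. Because nothing is shared between groups, each fit could run on a different segment.

```python
from collections import defaultdict

def fit_line(xs, ys):
    """Ordinary least squares for y = slope * x + intercept (one feature)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x

def fit_per_group(rows):
    """rows: iterable of (group_key, x, y); returns {group_key: (slope, intercept)}."""
    groups = defaultdict(lambda: ([], []))
    for key, x, y in rows:
        groups[key][0].append(x)
        groups[key][1].append(y)
    # Each group is fit independently: this is the data-parallel part
    return {key: fit_line(xs, ys) for key, (xs, ys) in groups.items()}

rows = [("a", x, 2 * x + 1) for x in range(5)] + [("b", x, -x + 3) for x in range(5)]
models = fit_per_group(rows)
print(models)  # {'a': (2.0, 1.0), 'b': (-1.0, 3.0)}
```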

Model-parallel problems typically involve building a single machine learning model, on a distributed cluster, from a dataset that does not fit into the memory of one machine. The [MADlib library](http://madlib.incubator.apache.org/) (slides 31-52 in [All things Python @ Pivotal](http://www.slideshare.net/SrivatsanRamanujam/all-thingspythonpivotal)) explicitly parallelizes such models by splitting them into sub-tasks that can be executed in parallel and combining the results of these sub-tasks to fit the original model.
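The split-and-combine idea can be illustrated with linear regression, which is commonly parallelized this way: each worker computes partial sums over its own chunk of rows, the partial sums are added up, and the model is solved once from the totals. The sketch below is a simplified one-feature illustration in plain Python, not MADlib's implementation; the function names are hypothetical.

```python
def partial_stats(chunk):
    """Sub-task: sufficient statistics (n, sum x, sum y, sum x^2, sum xy) of one chunk."""
    n = len(chunk)
    sx = sum(x for x, _ in chunk)
    sy = sum(y for _, y in chunk)
    sxx = sum(x * x for x, _ in chunk)
    sxy = sum(x * y for x, y in chunk)
    return n, sx, sy, sxx, sxy

def combine(stats_list):
    """Merge step: sufficient statistics simply add up across chunks."""
    return tuple(sum(values) for values in zip(*stats_list))

def solve(stats):
    """Final step: closed-form least-squares slope and intercept."""
    n, sx, sy, sxx, sxy = stats
    slope = (n * sxy - sx * sy) / (n * sxx - sx * sx)
    intercept = (sy - slope * sx) / n
    return slope, intercept

data = [(x, 3 * x - 2) for x in range(10)]
chunks = [data[0:4], data[4:7], data[7:10]]  # e.g. rows held by 3 segments
slope, intercept = solve(combine([partial_stats(c) for c in chunks]))
print(slope, intercept)  # 3.0 -2.0
```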

One limitation when working on data-parallel problems is the max_field_size limit of Greenplum/Postgres, which prevents UDFs from accepting inputs that exceed 1 GB (e.g. a float8[]). In data science, one often has to work with datasets represented as matrices (or large linear arrays) that well exceed the max_field_size limit. For instance, in [gp_xgboost_gridsearch](https://github.com/vatsan/gp_xgboost_gridsearch), several models are built in parallel, one for each combination of the input parameters. If the serialized input dataset exceeds 1 GB, these UDFs error out because they violate the max_fieldsize_limit.

This prevents users from harnessing the full power of an MPP cluster, even though segment hosts typically have much more memory (e.g. 48-128 GB, depending on the cluster) than the max_field_size limit. Granted, a segment host typically runs 6-8 segment instances, and in a multi-user environment one has to be mindful of not eating into memory that other users need to execute their queries. Still, it would be useful to be able to build machine learning models with popular Python & R libraries on inputs that exceed the max_fieldsize_limit.

In this notebook, we'll demonstrate how to work around the max_fieldsize_limit using UDFs. 

## Warning

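This should only be considered a workaround, as it relies on how the data is distributed across segments: with the appropriate distribution key, all records pertaining to a given key reside on the same segment and hence can be processed by a UDF on that segment. It also relies on the rows reaching the UDF in a known order, so that the UDF can tell when it has seen the last row of its group. Finally, because the in-memory cache used below is shared by every row a UDF processes on a segment, the UDFs in this notebook assume that each segment holds the rows of only one group; rows of two groups that hash to the same segment would be mixed in the same cache.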
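The distribution-key property this workaround depends on can be sketched as follows. Greenplum uses its own hash function; the `zlib.crc32` hash and the segment count below are assumptions purely for illustration. The sketch shows that all rows with the same key land on the same segment, and that different keys can also collide on one segment, which is exactly the case where a per-segment cache would mix two groups.

```python
import zlib
from collections import defaultdict

NUM_SEGMENTS = 4  # hypothetical number of segments

def segment_for(key):
    """Map a distribution-key value to a segment (illustrative hash, not Greenplum's)."""
    return zlib.crc32(str(key).encode("utf-8")) % NUM_SEGMENTS

def distribute(rows, key_index=0):
    """Place each row on the segment chosen by hashing its distribution key."""
    segments = defaultdict(list)
    for row in rows:
        segments[segment_for(row[key_index])].append(row)
    return segments

# 10 models x 3 rows each, distributed by model (column 0)
rows = [(model, i) for model in range(1, 11) for i in range(3)]
segments = distribute(rows)

# Which segments hold rows of each model?
model_segments = defaultdict(set)
for seg, seg_rows in segments.items():
    for model, _ in seg_rows:
        model_segments[model].add(seg)

# Models that share a segment would also share that segment's in-memory cache
models_per_segment = {seg: sorted({m for m, _ in seg_rows}) for seg, seg_rows in segments.items()}
print(models_per_segment)
```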

## 1. Create a table with rows containing a field close to max_fieldsize (~ 1 GB)

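```sql
%%execsql
-- An array of 120000000 float8 values (8 bytes each) = 960 MB,
-- which is just under the 1 GB field-size limit

--1) Define a UDF to generate large arrays
create or replace function gen_array(x int)
returns float8[]
as
$$
    import itertools
    from random import random
    # itertools.repeat avoids first materializing a list of x integers
    # (range() builds a full list under Python 2)
    return [random() for _ in itertools.repeat(None, x)]
$$ language plpythonu;

--2) Create a table with 3 rows that share the same distribution key (row = 1),
--   so that all of them land on the same segment
drop table if exists cellsize_test;
create table cellsize_test
as
(
    select
        1 as row,
        y,
        gen_array(120000000) as arr
    from
        generate_series(1, 3) y
) distributed by (row);
```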

## 2. Attempt to pass an input > 1 GB to a UDF to demonstrate how it fails due to the max_fieldsize_limit

```sql
%%showsql

--1) Define a UDA to concatenate arrays
DROP AGGREGATE IF EXISTS array_agg_array(anyarray) CASCADE;
CREATE ORDERED AGGREGATE array_agg_array(anyarray)
(
    SFUNC = array_cat,
    STYPE = anyarray
);


--2) Define a UDF to consume a really large array and return its size
create or replace function consume_large_array(x float8[])
returns text
as
$$
    return 'size of x:{0}'.format(len(x))
$$ language plpythonu;

--3) Invoke the UDF & UDA to demonstrate failure due to max_fieldsize_limit
select
    row,
    consume_large_array(arr)
from
(

    select
        row,
        array_agg_array(arr) as arr
    from
        cellsize_test
    group by
        row
)q;
```

```
DatabaseError: Execution failed on sql '...': array size exceeds the maximum allowed (134217727)  (seg42 slice1 sdw1:40000 pid=25165)
```


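Notice the error `array size exceeds the maximum allowed (134217727)  (seg42 slice1 sdw1:40000 pid=25165)`, which stems from the max_fieldsize_limit: concatenating the three rows produces an array of 360,000,000 elements, far more than the 134,217,727 elements allowed in a single array.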
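The number in the error message can be reproduced with a little arithmetic. In PostgreSQL's source, the largest single allocation is `MaxAllocSize` = 2^30 - 1 bytes, and the largest array has `MaxAllocSize / sizeof(Datum)` elements, which on a 64-bit build is 134,217,727. A short Python check of the arrays in `cellsize_test` against that cap (the 64-bit `Datum` size is an assumption about the build):

```python
MAX_ALLOC_SIZE = 2**30 - 1   # PostgreSQL MaxAllocSize: 1 GB minus 1 byte
SIZEOF_DATUM = 8             # assumes a 64-bit build
MAX_ARRAY_SIZE = MAX_ALLOC_SIZE // SIZEOF_DATUM

ROW_LENGTH = 120000000       # elements per row, as created by gen_array(120000000)
NUM_ROWS = 3                 # rows per group in cellsize_test

def fits_in_one_array(num_elements):
    """True if a single array with this many elements is within the cap."""
    return num_elements <= MAX_ARRAY_SIZE

print(MAX_ARRAY_SIZE)                            # 134217727
print(fits_in_one_array(ROW_LENGTH))             # True: each row on its own is fine
print(fits_in_one_array(NUM_ROWS * ROW_LENGTH))  # False: the concatenation is not
```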

## 3. Using the static dictionary SD, demonstrate how to use a UDF that processes inputs exceeding max_field_size

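Note:
All PL/Python functions have access to two dictionaries, SD and GD, that can be used to cache data in memory (see the [PL/Python functions documentation](http://www.postgresql.org/docs/8.2/static/plpython-funcs.html)):
1. SD is private to a UDF; it is used to cache data between calls of that function.
2. GD is a global dictionary; it is shared by all PL/Python UDFs within a session.

Both dictionaries live in the memory of the process that executes the function, so in Greenplum each segment has its own, independent copy, and their contents can persist across queries in the same session.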
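To make the scoping concrete, here is a small plain-Python emulation of the two dictionaries. It is a simplified model of PL/Python's behavior, not its implementation: each function gets its own private `SD`, while a single `GD` is shared by all functions in the same session. The `Session` class and `count_calls` function are hypothetical.

```python
class Session:
    """Hypothetical stand-in for one backend process (one segment's session)."""

    def __init__(self):
        self.GD = {}  # shared by every function defined in this session

    def define(self, body):
        """Wrap body(SD, GD, *args) so each defined function keeps a private SD."""
        SD = {}  # private to this one function, survives between calls

        def udf(*args):
            return body(SD, self.GD, *args)

        return udf

def count_calls(SD, GD, _):
    SD["calls"] = SD.get("calls", 0) + 1  # per-function counter
    GD["calls"] = GD.get("calls", 0) + 1  # session-wide counter
    return SD["calls"], GD["calls"]

session = Session()
f = session.define(count_calls)
g = session.define(count_calls)

print(f(None))  # (1, 1)
print(f(None))  # (2, 2)
print(g(None))  # (1, 3): g has its own SD but shares GD with f
```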

```sql
%%showsql
--1) Define a UDF that caches each row of data in the static dictionary (SD)
--   and, once the final row has been seen, chains all accumulated arrays
--   and returns the size of the combined list
create or replace function consume_large_array_w_gd(x float8[], y int, final_value int)
returns text
as
$$
    import itertools
    import sys
    # Accumulate this row's array in SD (persists across calls on this segment)
    SD.setdefault('large_array', []).append(x)
    if y == final_value:
        # Pop the cache so that re-running the query starts from scratch
        chunks = SD.pop('large_array')
        result = list(itertools.chain(*chunks))
        # sys.getsizeof counts only the list object (its element pointers),
        # not the float objects it references
        return 'Size: {0}  MB'.format(sys.getsizeof(result)*1.0/(1024*1024))
    else:
        return 'Processing...'
$$ language plpythonu;

--2) Invoke the UDF to demonstrate the workaround for the max_fieldsize_limit
select
    row,
    y,
    consume_large_array_w_gd(arr, y, 3) as result
from
    cellsize_test
order by
    y;
```

| row | y | result |
|---|---|---|
| 1 | 1 | Processing... |
| 1 | 2 | Processing... |
| 1 | 3 | Size: 2841.24237061 MB |


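Notice from the results above that our UDF was able to concatenate 3 arrays of 120,000,000 float8 values each (about 960 MB apiece) into a single list of 360,000,000 elements. `sys.getsizeof` reports about 2.8 GB (2841 MiB) for that list object alone, since it counts only the list's element pointers and not the float objects they reference; either way, this is well above the max_fieldsize_limit. By extending this idea, we can write UDFs that build machine learning models on more than 1 GB of data.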
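The accumulate-then-finalize pattern used by `consume_large_array_w_gd` can be tried outside the database with tiny arrays. In this sketch a plain dict stands in for `SD` (an assumption), and the final comparison shows that `sys.getsizeof` measures only the list object itself, so adding the sizes of the referenced float objects gives a much larger total.

```python
import itertools
import sys

SD = {}  # plain dict standing in for PL/Python's per-function SD

def consume(x, y, final_value):
    """Cache x; once y == final_value, combine the cached arrays and return the result."""
    SD.setdefault("large_array", []).append(x)
    if y != final_value:
        return None
    # pop() clears the cache, so a second run starts from scratch
    return list(itertools.chain.from_iterable(SD.pop("large_array")))

arrays = [[float(i * 1000 + j) for j in range(1000)] for i in range(3)]
for y, arr in enumerate(arrays, start=1):
    combined = consume(arr, y, final_value=3)

shallow = sys.getsizeof(combined)                          # list object only
deep = shallow + sum(sys.getsizeof(v) for v in combined)   # plus each float object
print(len(combined), shallow, deep)
```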

## 4. Extend the workaround above to demonstrate building data parallel ML models on datasets exceeding 1 GB

Let's work with the [Wine quality dataset](https://archive.ics.uci.edu/ml/datasets/Wine+Quality) from the UCI Machine Learning repository. The modeling goal is to predict wine quality from 13 properties such as alcohol content, malic acid, ash, alcalinity of ash and magnesium (the full list of fields appears in the query below). Note that the 13 features and 178 samples used here correspond to the UCI *Wine* dataset rather than the linked Wine Quality dataset.

The original dataset has only 178 samples. We'll replicate them to create a dataset whose rows, when combined, come to about 1 GB: with 13 float8 features and 178 samples, repeating each row 55,000 times yields 9,790,000 rows and about 1.02 × 10^9 bytes of feature data per model. We'll do this for 10 models, which will be built in parallel across the segments.
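The sizes quoted here follow from simple arithmetic. The sketch below computes the feature-matrix size for a given replication factor, and the smallest factor that would push the float8 features over 10^9 bytes, over the 1 GiB (2^30 bytes) field limit, and over the 134,217,727-element array cap. Treating the field limit as exactly 2^30 bytes (ignoring the small array header) is a simplifying assumption.

```python
N_SAMPLES = 178                    # rows in the original wine dataset
N_FEATURES = 13                    # float8 features per row
FLOAT8_BYTES = 8
FIELD_LIMIT = 2**30                # 1 GiB; ignores the array header (assumption)
MAX_ARRAY_SIZE = (2**30 - 1) // 8  # 134,217,727 elements (64-bit build)

def matrix_bytes(repetitions):
    """Bytes of float8 feature data for one model after replication."""
    return N_SAMPLES * repetitions * N_FEATURES * FLOAT8_BYTES

def min_repetitions(limit, per_repetition):
    """Smallest replication factor whose total strictly exceeds limit."""
    return limit // per_repetition + 1

bytes_per_rep = N_SAMPLES * N_FEATURES * FLOAT8_BYTES   # 18512 bytes
elements_per_rep = N_SAMPLES * N_FEATURES               # 2314 elements

print(N_SAMPLES * 55000)                                 # 9790000 rows per model
print(matrix_bytes(55000))                               # 1018160000 bytes
print(min_repetitions(10**9, bytes_per_rep))             # 54020
print(min_repetitions(FIELD_LIMIT, bytes_per_rep))       # 58003
print(min_repetitions(MAX_ARRAY_SIZE, elements_per_rep)) # 58003
```

With 55,000 repetitions the features come to just over 10^9 bytes, i.e. slightly more than 1 GB in decimal units but still slightly below 2^30 bytes.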

```sql
%%execsql
drop table if exists wine_large cascade;
create table wine_large
as
(
    select
        model,
        row_number() over(partition by model order by random()) as row_indx,
        features,
        quality
    from
    (
        select
            ARRAY[
                alcohol,
                mmalic_acid,
                ash,
                alcalinity_of_ash,
                magnesium,
                total_phenols,
                flavanoids,
                nonflavanoid_phenols,
                proanthocyanins,
                color_intensity,
                hue,
                od280,
                proline
            ] as features,
            quality
        from
            wine
    )q,
    generate_series(1, 10) model,
    generate_series(1, 55000) repetition
) distributed by (model);
```

```sql
%%showsql
-- Size of table on disk
select 
    schemaname,
    tablename,
    sz,
    pg_size_pretty(sz) as table_size
from
(
     select 
        schemaname,
        tablename,
        pg_relation_size(schemaname||'.'||tablename) as sz
     from 
        pg_tables
     where 
        tablename = 'wine_large'
)q
order by sz desc;
```

| schemaname | tablename | sz (bytes) | table_size |
|---|---|---|---|
| public | wine_large | 12151685120 | 11 GB |


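Each row of the sample dataset is a float8[] of 13 features, and there are 9,790,000 such rows per model; since the table is distributed by `model`, all rows of a model reside on a single segment. That's a matrix of 13 × 9,790,000 × 8 bytes ≈ 1.02 × 10^9 bytes (1.02 GB in decimal units, about 0.95 GiB), which is right at the max_fieldsize_limit. Strictly speaking, because the limit is 1 GiB (2^30 bytes), this matrix would still fit in a single float8[], but inside the UDF, where it is held as Python lists of float objects, it occupies considerably more memory than that. 10 such models will be built in parallel, each on its own segment, using the workaround demonstrated before.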

### Define UDF to build ridge regression model in sklearn on each dataset. 10 models will be built in parallel.

```sql
%%execsql

--1) Create a return type for model results
DROP TYPE IF EXISTS host_mdl_coef_intercept CASCADE;
CREATE TYPE host_mdl_coef_intercept
AS
(
    hostname text, -- hostname on which the model was built
    coef float[], -- model coefficients
    intercept float, -- intercept
    r_square float -- training data fit (R^2)
);


--2) UDF to build ridge regression models
create or replace function sklearn_w_sd(
    features float8[],
    label float8,
    current_row int,
    final_row int
)
returns host_mdl_coef_intercept
as
$$
    import socket
    import numpy as np
    from sklearn import linear_model
    # Cache features and labels in SD (persists across calls on this segment)
    SD.setdefault('wine_large', []).append(features)
    SD.setdefault('wine_large_label', []).append(label)
    if current_row == final_row:
        # Pop the caches so that re-running the query starts from scratch
        input_matrix = np.array(SD.pop('wine_large'))
        # Keep labels 2-D (n x 1) so that coef_ has shape (1, n_features)
        labels = np.array(SD.pop('wine_large_label')).reshape(-1, 1)
        mdl = linear_model.Ridge(alpha = .5)
        mdl.fit(input_matrix, labels)
        r_square = mdl.score(input_matrix, labels)
        plpy.info('input_matrix: {0}'.format(input_matrix.shape))
        plpy.info('labels: {0}'.format(labels.shape))
        plpy.info('Results:{0}, {1}, {2}'.format(mdl.coef_[0], mdl.intercept_[0], r_square))
        return [socket.gethostname(), mdl.coef_[0].tolist(), float(mdl.intercept_[0]), r_square]
    else:
        return None
$$ language plpythonu;
```
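At the final row, the UDF fits `sklearn.linear_model.Ridge(alpha=.5)` on the cached rows. For intuition, here is what that fit computes in the one-feature case, as a plain-Python sketch rather than sklearn's code: with `fit_intercept=True` the data are centered, the penalty applies only to the slope, and the intercept is recovered from the means. `r_square` mirrors what `Ridge.score()` reports.

```python
def ridge_1d(xs, ys, alpha):
    """Ridge regression y ~ w * x + b with an unpenalized intercept b."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    w = sxy / (sxx + alpha)  # minimizes sum((y_c - w * x_c)^2) + alpha * w^2
    b = mean_y - w * mean_x
    return w, b

def r_square(xs, ys, w, b):
    """Coefficient of determination of the fitted line."""
    mean_y = sum(ys) / len(ys)
    ss_res = sum((y - (w * x + b)) ** 2 for x, y in zip(xs, ys))
    ss_tot = sum((y - mean_y) ** 2 for y in ys)
    return 1 - ss_res / ss_tot

xs = [0.0, 1.0, 2.0, 3.0, 4.0]
ys = [1.0, 3.0, 5.0, 7.0, 9.0]  # exactly y = 2x + 1
print(ridge_1d(xs, ys, alpha=0.0))  # (2.0, 1.0): no penalty gives plain least squares
print(ridge_1d(xs, ys, alpha=0.5))  # slope shrunk towards 0
```

Replicating every row k times multiplies `sxx` and `sxy` by k while `alpha` stays fixed, so on heavily replicated data such as `wine_large` the `alpha=.5` penalty has very little effect.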

```sql
%%showsql

select
    gp_segment_id,
    model,
    row_indx as current_row,
    final_row,
    (result).*
from
(
    select
        gp_segment_id,
        t1.model,
        t1.row_indx,
        t2.final_row,
        sklearn_w_sd(
            t1.features,
            t1.quality,
            t1.row_indx::int,
            t2.final_row::int
        ) as result
    from
        wine_large t1,
        (
            select
                model,
                max(row_indx) as final_row
            from
                wine_large
            group by 
                model
        ) t2
    where 
        t1.model = t2.model
    -- Ordering is important: the UDF fits the model when it sees final_row,
    -- so that row must be processed last
    order by 
        t1.row_indx
)q
where 
    result is not null;
```

| gp_segment_id | model | current_row | final_row | hostname | coef | intercept | r_square |
|---|---|---|---|---|---|---|---|
| 40 | 7 | 9790000 | 9790000 | sdw16 | [-318.618679571, 66.1469191956, -0.377781308715, 132.786910594, -8.29728552732, 1.99415717897, 64.3852933288, -100.286970978, -162.290601023, 23.1318911356, 44.6320358897, 121.367548182, -67.0307163178] | 64.754602 | 0.724984 |
| 67 | 6 | 9790000 | 9790000 | sdw5 | [-318.618679571, 66.1469191956, -0.377781308697, 132.786910593, -8.29728552731, 1.99415717897, 64.3852933288, -100.286970978, -162.290601023, 23.1318911355, 44.6320358897, 121.367548182, -67.0307163179] | 64.754602 | 0.724984 |
| 15 | 2 | 9790000 | 9790000 | sdw12 | [-318.618679571, 66.1469191955, -0.3777813087, 132.786910593, -8.29728552729, 1.99415717897, 64.385293329, -100.286970979, -162.290601023, 23.1318911355, 44.6320358898, 121.367548182, -67.0307163178] | 64.754602 | 0.724984 |
| 25 | 4 | 9790000 | 9790000 | sdw14 | [-318.618679571, 66.1469191954, -0.377781308656, 132.786910593, -8.29728552728, 1.99415717897, 64.3852933288, -100.286970978, -162.290601023, 23.1318911356, 44.6320358898, 121.367548182, -67.030716318] | 64.754602 | 0.724984 |
| 42 | 1 | 9790000 | 9790000 | sdw1 | [-318.618679571, 66.1469191955, -0.377781308684, 132.786910593, -8.29728552728, 1.99415717897, 64.385293329, -100.286970979, -162.290601023, 23.1318911356, 44.6320358898, 121.367548182, -67.030716318] | 64.754602 | 0.724984 |
| 94 | 5 | 9790000 | 9790000 | sdw9 | [-318.618679571, 66.1469191955, -0.37778130868, 132.786910593, -8.29728552727, 1.99415717897, 64.385293329, -100.286970979, -162.290601023, 23.1318911356, 44.6320358898, 121.367548182, -67.0307163179] | 64.754602 | 0.724984 |
| 23 | 10 | 9790000 | 9790000 | sdw13 | [-318.61867957, 66.1469191957, -0.377781308713, 132.786910594, -8.29728552735, 1.99415717897, 64.3852933285, -100.286970978, -162.290601023, 23.1318911354, 44.6320358897, 121.367548182, -67.0307163177] | 64.754602 | 0.724984 |
| 50 | 9 | 9790000 | 9790000 | sdw2 | [-318.618679571, 66.1469191957, -0.37778130871, 132.786910594, -8.29728552734, 1.99415717897, 64.3852933285, -100.286970978, -162.290601023, 23.1318911356, 44.6320358897, 121.367548182, -67.0307163177] | 64.754602 | 0.724984 |
| 77 | 8 | 9790000 | 9790000 | sdw6 | [-318.618679571, 66.1469191956, -0.377781308687, 132.786910593, -8.29728552729, 1.99415717897, 64.3852933289, -100.286970979, -162.290601023, 23.1318911358, 44.6320358897, 121.367548182, -67.030716318] | 64.754602 | 0.724984 |
| 52 | 3 | 9790000 | 9790000 | sdw2 | [-318.618679571, 66.1469191955, -0.37778130866, 132.786910594, -8.29728552731, 1.99415717897, 64.3852933293, -100.286970979, -162.290601024, 23.1318911357, 44.6320358897, 121.367548182, -67.030716318] | 64.754602 | 0.724984 |


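From the above results, we can see that we were able to build 10 models in parallel, each on a different segment, where each segment held a feature matrix of roughly 1 GB in memory. As expected, the coefficients agree across models up to floating-point round-off, since all 10 models were trained on the same replicated data (in different random row orders). This UDF can thus be extended to other data-parallel problems.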

## Acknowledgments

This is joint work with Heikki Linnakangas and Ivan Novick of Pivotal.

Srivatsan Ramanujam <vatsan.cs@utexas.edu>, Feb-29, 2016.