In [1]:
from datetime import datetime
from modelhub import ModelHub
from bach import display_sql_as_markdown

In [2]:
# strftime-style format handed to ModelHub; presumably yields daily buckets (date-only) — confirm
TIME_AGGREGATION = '%Y-%m-%d'
modelhub = ModelHub(time_aggregation=TIME_AGGREGATION)

In [3]:
START_DATE = '2022-02-02'

df = modelhub.get_objectiv_dataframe(start_date=START_DATE)
# readable label for each event's location stack; used below as the "step" name
df['feature_nice_name'] = df['location_stack'].ls.nice_name

In [4]:
# Build a `steps` column: for every session, the JSON array of its feature nice names.
# NOTE(review): the order of items inside each array presumably follows the DataFrame's
# sort order — confirm it is chronological, since n-grams depend on step order.
session_steps = (
    df.groupby('session_id')['feature_nice_name']
    .to_json_array()
    .reset_index()
    .rename(columns={'feature_nice_name': 'steps'})
)
df_steps = df.merge(session_steps, on='session_id')[['session_id', 'steps']].drop_duplicates()
df_steps.to_pandas()

Unnamed: 0_level_0,session_id,steps
event_id,Unnamed: 1_level_1,Unnamed: 2_level_1
6307e080-d6a4-4e7d-b47f-a029b90cf6e1,97,"[Root Location: blog, Media Player: objectiv-i..."
03bfb487-a6ae-4175-bf3a-db127a75a85f,1651,"[Root Location: home, Media Player: 2-minute-v..."
21629b85-c17a-458b-a5ab-ff4c074a963c,2872,"[Root Location: home, Media Player: 2-minute-v..."
0f3b8b89-e4ff-4256-9beb-c3f252216699,2116,[Content: post-meet-objectiv-open-source-produ...
02ad3934-665c-47e9-8d76-c2a7a79f2c60,3600,[Pressable: hamburger located at Root Location...
...,...,...
0f0fc271-a3ca-4952-bed7-90ca7656dbb1,3836,[Pressable: after located at Root Location: ho...
1ba7c155-0521-431d-8f5d-564aa3e8daa6,3852,"[Root Location: home, Link: docs located at Ro..."
0d74fe99-d796-423b-bccb-f7c64a155aa6,3720,[Expandable: Reference located at Root Locatio...
044ec938-81c0-496a-9a11-f35412f81614,3975,"[Root Location: home, Link: docs located at Ro..."


## Custom n-gram helper (`func_ngram`)

In [5]:
def func_ngram(data, n):
    """
    Return every contiguous sublist (n-gram) of length ``n`` of ``data``, in order.

    When ``n`` is larger than ``len(data)``, the result is empty.
    Note: ``n == 0`` yields ``len(data) + 1`` empty lists here. The SQL implementation
    in this notebook yields ``len(data)`` empty lists for that case.

    :param data: any sliceable sequence (list, tuple, str).
    :param n: n-gram length; must be non-negative.
    :returns: list of slices ``data[i:i + n]``.
    :raises ValueError: if ``n`` is negative, because slicing would silently return garbage.
    """
    if n < 0:
        raise ValueError(f'n must be non-negative, got {n}')
    return [data[i: i + n] for i in range(len(data) - n + 1)]


func_ngram(['a', 'b', 'c', 'd', 'e', 'f'], 3)

[['a', 'b', 'c'], ['b', 'c', 'd'], ['c', 'd', 'e'], ['d', 'e', 'f']]

In [6]:
# Degenerate case: an n-gram length of 0
x = list('abcdef')
func_ngram(x, 0)

[[], [], [], [], [], [], []]

In [7]:
# Slicing from the end of the list yields an empty list instead of raising
x[slice(6, None)]

[]

### Apply `func_ngram` to each row of `df_steps.steps`

In [8]:
df_steps_pandas = df_steps.to_pandas()
# n-gram length; this value will eventually be provided by the user
n_gram = 3

# forward n_gram through `args` rather than wrapping func_ngram in a lambda
df_steps_pandas['desired_col'] = df_steps_pandas['steps'].apply(func_ngram, args=(n_gram,))

In [9]:
df_steps_pandas['steps'].iat[0]

['Root Location: blog',
 'Media Player: objectiv-in-2-minutes located at Root Location: blog => Content: post-meet-objectiv-open-source-product-analytics-designed-for-data-sc']

In [10]:
df_steps_pandas['desired_col'].iat[0]

[]

In [11]:
df_steps_pandas['steps'].apply(func_ngram, args=(n_gram,))

event_id
6307e080-d6a4-4e7d-b47f-a029b90cf6e1                                                   []
03bfb487-a6ae-4175-bf3a-db127a75a85f    [[Root Location: home, Media Player: 2-minute-...
21629b85-c17a-458b-a5ab-ff4c074a963c    [[Root Location: home, Media Player: 2-minute-...
0f3b8b89-e4ff-4256-9beb-c3f252216699    [[Content: post-meet-objectiv-open-source-prod...
02ad3934-665c-47e9-8d76-c2a7a79f2c60    [[Pressable: hamburger located at Root Locatio...
                                                              ...                        
0f0fc271-a3ca-4952-bed7-90ca7656dbb1    [[Pressable: after located at Root Location: h...
1ba7c155-0521-431d-8f5d-564aa3e8daa6    [[Root Location: home, Link: docs located at R...
0d74fe99-d796-423b-bccb-f7c64a155aa6    [[Expandable: Reference located at Root Locati...
044ec938-81c0-496a-9a11-f35412f81614    [[Root Location: home, Link: docs located at R...
110f27ac-183f-44d6-8800-ad8655825699    [[Expandable: Reference located at Root Locati...
N

In [22]:
import bach
from sql_models.util import is_postgres, is_bigquery, DatabaseNotSupportedException
import operator
from abc import ABC, abstractmethod


class ListShifterOperation(ABC):
    """
    Base class for engine-specific operations on a JSON-array series. The operation
    replaces every array with the array of its contiguous sublists of length
    ``shifting_n`` (right-shifted n-grams).

    Subclasses provide the SQL for one database dialect. Deriving from ``ABC`` makes
    the ``@abstractmethod`` markers effective, so this base class cannot be
    instantiated directly.
    """
    # Wraps an ARRAY(...) subquery and renders the resulting items as a '[item, item, ...]' string.
    ARRAY_TO_STR_STMT = "'[' || ARRAY_TO_STRING(ARRAY({}), ', ') || ']'"

    # SQL alias for the position of each element of the original list. Every position
    # is a candidate first element of one sublist.
    FIRST_ELEMENT_SUBLIST_OFFSET = '__first_sublist_offset'
    # SQL alias for the generated positions of the items that form one sublist.
    GENERATED_OFFSET_SUBLIST = '__sublist_item_offset'

    def __init__(self, list_series: bach.SeriesJson, shifting_n: int) -> None:
        """
        :param list_series: series whose values are JSON arrays.
        :param shifting_n: length of every generated sublist. It is interpolated directly
            into the generated SQL, so it must be a non-negative integer.
        :raises TypeError: if shifting_n is not an integer (int-like values such as numpy ints are accepted).
        :raises ValueError: if shifting_n is negative.
        """
        # operator.index guarantees a real Python int, so nothing but digits reaches the SQL text
        shifting_n = operator.index(shifting_n)
        if shifting_n < 0:
            raise ValueError(f'shifting_n must be non-negative, got {shifting_n}.')
        self._list_series = list_series
        self._shifting_n = shifting_n

    @abstractmethod
    def r_shift(self) -> bach.SeriesJson:
        """
        Steps for generating shifted sublists:
            1. Cast list_series expression to array (JSON_QUERY_ARRAY on BigQuery)
            2. Generate index of items per sublist
            3. Iterate over generated indexes and extract items from original array
            4. Generate expression for unnesting original array and creating array
                with final sub-lists
        """
        raise NotImplementedError()

    @abstractmethod
    def count_sublist_occurrence(self, shift_type='right') -> bach.SeriesInt64:
        """
        Count how often each distinct sublist occurs across all rows of the list series.

        :param shift_type: direction of the shift; subclasses only support 'right'.
        """
        raise NotImplementedError()


class PostgresListShifterOperation(ListShifterOperation):
    """PostgreSQL implementation of the right-shift sublist operation, built on jsonb functions."""

    def r_shift(self) -> bach.SeriesJson:
        """
        Return a series where every JSON array is replaced by the array of all its
        contiguous sublists of length ``shifting_n``, in list order.
        """
        # item indexes are ordinal (1-based) for PG while `->` on jsonb is 0-based,
        # therefore we need to subtract 1
        start_gen_series = f'{self.FIRST_ELEMENT_SUBLIST_OFFSET} - 1'
        stop_gen_series = f'{start_gen_series} + {self._shifting_n - 1}'

        # generates a series with the indexes to extract and creates sublist
        # containing items based on generated indexes.
        # ORDER BY is required: without it SQL does not guarantee the item order inside ARRAY(...).
        sub_list_gen_fmt = (
            f"""
            SELECT {{}} -> cast({self.GENERATED_OFFSET_SUBLIST} as int)
            FROM GENERATE_SERIES({start_gen_series}, {stop_gen_series})
            as {self.GENERATED_OFFSET_SUBLIST}
            ORDER BY {self.GENERATED_OFFSET_SUBLIST}
            """
        )
        sub_list_gen_expr = bach.expression.Expression.construct(sub_list_gen_fmt, self._list_series)

        # unnest list series and select the generated sublists.
        # Only positions where a full sublist still fits are kept: start <= array_length - n.
        # ORDER BY the ordinality keeps the sublists in the order of the original list.
        unnest_main_list_fmt = (
            f'SELECT {self.ARRAY_TO_STR_STMT} FROM jsonb_array_elements({{}}) '
            f'WITH ORDINALITY rshift(elem, {self.FIRST_ELEMENT_SUBLIST_OFFSET}) '
            f'WHERE {start_gen_series} <= {{}} - {self._shifting_n} '
            f'ORDER BY {self.FIRST_ELEMENT_SUBLIST_OFFSET}'
        )
        agg_sublists_expr = bach.expression.Expression.construct(
            unnest_main_list_fmt,
            sub_list_gen_expr,
            self._list_series,
            self._list_series.json.get_array_length(),
        )

        # format the final agg list from array to string
        return self._list_series.copy_override(
            expression=bach.expression.Expression.construct(
                self.ARRAY_TO_STR_STMT,
                agg_sublists_expr,
            )
        )

    def count_sublist_occurrence(self, shift_type='right') -> bach.SeriesInt64:
        """
        Count how often each distinct sublist occurs across all rows.

        :param shift_type: only 'right' is supported.
        :returns: series indexed by the sublist (cast to text) holding its occurrence count.
        :raises NotImplementedError: for any shift_type other than 'right'.
        """
        if shift_type != 'right':
            raise NotImplementedError(f'{shift_type} is not implemented. "right" shift is supported only.')

        shifted_list = self.r_shift()
        # materialize the shifted arrays into their own node before unnesting them
        shifted_list = shifted_list.materialize(node_name='shifted_list')

        # expand every generated sublist into its own row
        unnested_sublist_series = shifted_list.copy_override(
            expression=bach.expression.Expression.construct(
                'unnest(array(select sublist :: text from jsonb_array_elements({}::jsonb) as sublist))',
                shifted_list
            ),
        )
        df = unnested_sublist_series.to_frame()
        df['count'] = 1
        return df.groupby(self._list_series.name)['count'].count()


class BigQueryListShifterOperation(ListShifterOperation):
    """BigQuery implementation of the right-shift sublist operation, built on JSON_QUERY_ARRAY."""

    def r_shift(self) -> bach.SeriesJson:
        """
        Return a series where every JSON array is replaced by the array of all its
        contiguous sublists of length ``shifting_n``, in list order.
        """
        # WITH OFFSET and OFFSET(...) are both 0-based, so the element position is used as-is
        start_gen_series = f'{self.FIRST_ELEMENT_SUBLIST_OFFSET}'
        stop_gen_series = f'{start_gen_series} + {self._shifting_n - 1}'

        # generates a series with the indexes to extract and creates sublist
        # containing items based on generated indexes.
        # BigQuery's UNNEST does not preserve element order, so the subquery orders explicitly.
        sub_list_gen_fmt = (
            f"""
            SELECT JSON_QUERY_ARRAY({{}})[OFFSET({self.GENERATED_OFFSET_SUBLIST})]
            FROM UNNEST(GENERATE_ARRAY({start_gen_series}, {stop_gen_series}))
            as {self.GENERATED_OFFSET_SUBLIST}
            ORDER BY {self.GENERATED_OFFSET_SUBLIST}
            """
        )
        sub_list_gen_expr = bach.expression.Expression.construct(sub_list_gen_fmt, self._list_series)

        # unnest list series and select the generated sublists.
        # Only positions where a full sublist still fits are kept: start <= array_length - n.
        # ORDER BY the offset keeps the sublists in the order of the original list.
        unnest_main_list_fmt = (
            f'SELECT {self.ARRAY_TO_STR_STMT} FROM UNNEST(JSON_QUERY_ARRAY({{}})) '
            f'WITH OFFSET AS {self.FIRST_ELEMENT_SUBLIST_OFFSET} '
            f'WHERE {start_gen_series} <= {{}} - {self._shifting_n} '
            f'ORDER BY {self.FIRST_ELEMENT_SUBLIST_OFFSET}'
        )
        agg_sublists_expr = bach.expression.Expression.construct(
            unnest_main_list_fmt,
            sub_list_gen_expr,
            self._list_series,
            self._list_series.json.get_array_length(),
        )

        # format the final agg list from array to string
        return self._list_series.copy_override(
            expression=bach.expression.Expression.construct(
                self.ARRAY_TO_STR_STMT,
                agg_sublists_expr,
            )
        )

    def count_sublist_occurrence(self, shift_type='right') -> bach.SeriesInt64:
        """
        Count how often each distinct sublist occurs across all rows.

        :param shift_type: only 'right' is supported.
        :returns: series indexed by the sublist holding its occurrence count.
        :raises NotImplementedError: for any shift_type other than 'right'.
        """
        if shift_type != 'right':
            raise NotImplementedError(f'{shift_type} is not implemented. "right" shift is supported only.')

        from sql_models.model import CustomSqlModelBuilder
        from sql_models.util import quote_identifier

        shifted_list = self.r_shift()
        # convert to a real array, so its items can be turned into rows with an implicit
        # cross join: FROM {base_node}, {base_node}.shifted_list
        shifted_list = shifted_list.copy_override(
            expression=bach.expression.Expression.construct('JSON_QUERY_ARRAY({})', shifted_list)
        )
        shifted_list = shifted_list.materialize(node_name='shifted_list')

        dialect = shifted_list.engine.dialect
        sublist_identifier = quote_identifier(dialect, name='sublist')
        shifted_list_identifier = quote_identifier(dialect, name=shifted_list.name)
        column_stmt = (
            f"{sublist_identifier} as {shifted_list_identifier}, "
            f"count(1) as {quote_identifier(dialect, 'count')}"
        )

        # `{{{{current_node}}}}` renders as `{{current_node}}`, the reference placeholder
        # that is filled in by the model builder below
        sql = (
            f"SELECT {column_stmt} FROM "
            f"{{{{current_node}}}}, {{{{current_node}}}}.{shifted_list_identifier} AS {sublist_identifier} "
            f" GROUP BY {sublist_identifier}"
        )
        model_builder = CustomSqlModelBuilder(sql=sql, name='sublist_counter')
        sql_model = model_builder(current_node=shifted_list.base_node)

        df = bach.DataFrame.from_model(
            engine=shifted_list.engine,
            model=sql_model,
            index=[shifted_list.name],
            all_dtypes={shifted_list.name: bach.SeriesJson.dtype, 'count': bach.SeriesInt64.dtype}
        )
        return df['count']


In [23]:
def bach_func_gram(list_series: bach.SeriesJson, n: int) -> bach.SeriesJson:
    """
    SQL counterpart of ``func_ngram``: build the n-grams of every JSON array in ``list_series``.

    Dispatches to the shifter operation matching the series' engine.

    :raises DatabaseNotSupportedException: if the engine is neither PostgreSQL nor BigQuery.
    """
    engine = list_series.engine
    # checked in order; the first dialect that matches wins
    dialect_operations = (
        (is_postgres, PostgresListShifterOperation),
        (is_bigquery, BigQueryListShifterOperation),
    )
    for matches_dialect, operation_cls in dialect_operations:
        if matches_dialect(engine):
            return operation_cls(list_series=list_series, shifting_n=n).r_shift()

    raise DatabaseNotSupportedException(engine)


In [24]:
import os
from sqlalchemy import create_engine

# Connection settings can be overridden through environment variables. This keeps
# credentials and machine-specific paths out of the notebook; the defaults match the
# previous hardcoded values.
PG_DATABASE_URL = os.environ.get('PG_DATABASE_URL', 'postgresql://objectiv:@localhost:5432/objectiv')


def _default_bq_credentials_path(cwd: str) -> str:
    """
    Map the notebooks working directory to the BigQuery secrets file in the sibling modelhub directory.

    Only the last occurrence of 'notebooks' is replaced. Parent directories whose names
    happen to contain 'notebooks' are therefore left untouched. If 'notebooks' does not
    occur at all, cwd is used unchanged.
    """
    head, sep, tail = cwd.rpartition('notebooks')
    base_dir = f'{head}modelhub{tail}' if sep else cwd
    return f'{base_dir}/.secrets/objectiv-production--bigquery-read-only.json'


BQ_CREDENTIALS_PATH = os.environ.get('BQ_CREDENTIALS_PATH') or _default_bq_credentials_path(os.getcwd())

pg_engine = create_engine(PG_DATABASE_URL)
bq_engine = create_engine(
    "bigquery://objectiv-production/snowplow",
    credentials_path=BQ_CREDENTIALS_PATH,
)

In [25]:
import pandas as pd
import json

# Test inputs: strings, integers, an empty list and a single-element list
samples = [
    list('abcdef'),
    list(range(1, 10)),
    [],
    ['a'],
]


In [26]:
sample_pdf = pd.DataFrame({'list_col': samples})
# serialize every Python list to a JSON string so the column can be cast to json in bach
sample_pdf['list_col'] = sample_pdf['list_col'].apply(json.dumps)


def _to_bach_json_frame(engine, pdf: pd.DataFrame) -> bach.DataFrame:
    """Load ``pdf`` into a bach DataFrame on ``engine`` and cast its ``list_col`` to json."""
    bach_df = bach.DataFrame.from_pandas(engine=engine, df=pdf, convert_objects=True)
    bach_df['list_col'] = bach_df['list_col'].astype('json')
    return bach_df


pg_sample_df = _to_bach_json_frame(pg_engine, sample_pdf)
bq_sample_df = _to_bach_json_frame(bq_engine, sample_pdf)

In [27]:
N_GRAM_SIZES = range(0, 10)

# Reference results from the pure-Python implementation, keyed '<sample index>_n_<n>'.
# TODO: compare these against the SQL columns below instead of only displaying them.
expected = {
    f'{idx}_n_{n}': func_ngram(s, n)
    for n in N_GRAM_SIZES
    for idx, s in enumerate(samples)
}

# SQL-side results: one column per n-gram size on each engine
for n in N_GRAM_SIZES:
    pg_sample_df[f'func_ngram_{n}'] = bach_func_gram(pg_sample_df['list_col'], n=n)
    bq_sample_df[f'func_ngram_{n}'] = bach_func_gram(bq_sample_df['list_col'], n=n)

    

In [31]:
pg_sample_pdf = pg_sample_df.to_pandas()
pg_sample_pdf

Unnamed: 0_level_0,list_col,func_ngram_0,func_ngram_1,func_ngram_2,func_ngram_3,func_ngram_4,func_ngram_5,func_ngram_6,func_ngram_7,func_ngram_8,func_ngram_9
_index_0,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,"[a, b, c, d, e, f]","[[], [], [], [], [], []]","[[""a""], [""b""], [""c""], [""d""], [""e""], [""f""]]","[[""a"", ""b""], [""b"", ""c""], [""c"", ""d""], [""d"", ""e""...","[[""a"", ""b"", ""c""], [""b"", ""c"", ""d""], [""c"", ""d"", ...","[[""a"", ""b"", ""c"", ""d""], [""b"", ""c"", ""d"", ""e""], [...","[[""a"", ""b"", ""c"", ""d"", ""e""], [""b"", ""c"", ""d"", ""e...","[[""a"", ""b"", ""c"", ""d"", ""e"", ""f""]]",[],[],[]
1,"[1, 2, 3, 4, 5, 6, 7, 8, 9]","[[], [], [], [], [], [], [], [], []]","[[1], [2], [3], [4], [5], [6], [7], [8], [9]]","[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7...","[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [...","[[1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4,...","[[1, 2, 3, 4, 5], [2, 3, 4, 5, 6], [3, 4, 5, 6...","[[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4...","[[1, 2, 3, 4, 5, 6, 7], [2, 3, 4, 5, 6, 7, 8],...","[[1, 2, 3, 4, 5, 6, 7, 8], [2, 3, 4, 5, 6, 7, ...","[[1, 2, 3, 4, 5, 6, 7, 8, 9]]"
2,[],[],[],[],[],[],[],[],[],[],[]
3,[a],[[]],"[[""a""]]",[],[],[],[],[],[],[],[]


In [28]:
pg_trigram_shifter = PostgresListShifterOperation(list_series=pg_sample_df['list_col'], shifting_n=3)
pg_sublist_counts = pg_trigram_shifter.count_sublist_occurrence()
pg_sublist_counts.to_pandas()

list_col
[1, 2, 3]          1
[2, 3, 4]          1
[3, 4, 5]          1
[4, 5, 6]          1
[5, 6, 7]          1
[6, 7, 8]          1
[7, 8, 9]          1
["a", "b", "c"]    1
["b", "c", "d"]    1
["c", "d", "e"]    1
["d", "e", "f"]    1
Name: count, dtype: int64

In [32]:
bq_sample_pdf = bq_sample_df.to_pandas()
bq_sample_pdf

Unnamed: 0_level_0,list_col,func_ngram_0,func_ngram_1,func_ngram_2,func_ngram_3,func_ngram_4,func_ngram_5,func_ngram_6,func_ngram_7,func_ngram_8,func_ngram_9
_index_0,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,"[a, b, c, d, e, f]","[[], [], [], [], [], []]","[[a], [b], [c], [d], [e], [f]]","[[a, b], [b, c], [c, d], [d, e], [e, f]]","[[a, b, c], [b, c, d], [c, d, e], [d, e, f]]","[[a, b, c, d], [b, c, d, e], [c, d, e, f]]","[[a, b, c, d, e], [b, c, d, e, f]]","[[a, b, c, d, e, f]]",[],[],[]
1,"[1, 2, 3, 4, 5, 6, 7, 8, 9]","[[], [], [], [], [], [], [], [], []]","[[1], [2], [3], [4], [5], [6], [7], [8], [9]]","[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7...","[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [...","[[1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4,...","[[1, 2, 3, 4, 5], [2, 3, 4, 5, 6], [3, 4, 5, 6...","[[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4...","[[1, 2, 3, 4, 5, 6, 7], [2, 3, 4, 5, 6, 7, 8],...","[[1, 2, 3, 4, 5, 6, 7, 8], [2, 3, 4, 5, 6, 7, ...","[[1, 2, 3, 4, 5, 6, 7, 8, 9]]"
2,[],[],[],[],[],[],[],[],[],[],[]
3,[a],[[]],[[a]],[],[],[],[],[],[],[],[]


In [30]:
bq_trigram_shifter = BigQueryListShifterOperation(list_series=bq_sample_df['list_col'], shifting_n=3)
bq_sublist_counts = bq_trigram_shifter.count_sublist_occurrence()
bq_sublist_counts.to_pandas()

list_col
[a, b, c]    1
[b, c, d]    1
[c, d, e]    1
[d, e, f]    1
[1, 2, 3]    1
[2, 3, 4]    1
[3, 4, 5]    1
[4, 5, 6]    1
[5, 6, 7]    1
[6, 7, 8]    1
[7, 8, 9]    1
Name: count, dtype: int64