In [1]:
from mitzu import *

# Explore mitzu on a local sample parquet file before switching to Postgres.
sample_parquet_connection = Connection(
    url="./samples/simple_big_data.snappy.parquet",
    connection_type=ConnectionType.FILE,
    connection_params={"file_type": "parquet"},
)
sample_data_source = EventDataSource(connection=sample_parquet_connection)
m = init_notebook_project(source=sample_data_source)

Found notebook context
Initializing project ...
Finished project initialization


In [9]:
# `app_launched` / `workspace_opened` are not defined in this notebook's code;
# presumably injected by init_notebook_project in the first cell — TODO confirm.
launch_to_workspace = app_launched >> workspace_opened
launch_to_workspace.config(time_group="week")



In [2]:
!pip install psycopg2

Collecting psycopg2
  Downloading psycopg2-2.9.3.tar.gz (380 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 380.6/380.6 KB 886.7 kB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: psycopg2
  Building wheel for psycopg2 (setup.py) ... done
  Created wheel for psycopg2: filename=psycopg2-2.9.3-cp39-cp39-macosx_12_0_x86_64.whl size=141877 sha256=20494da73c1a02ea591defac9e35dea864aa3957ab81c2aaf23e2fae47b515ce
  Stored in directory: /Users/istvanmeszaros/Library/Caches/pip/wheels/b3/a1/6e/5a0e26314b15eb96a36263b80529ce0d64382540ac7b9544a9
Successfully built psycopg2
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.3
You should consider upgrading via the '/Users/istvanmeszaros/Personal/mitzu-io/mitzu/.venv/bin/python -m pip install --upgrade pip' command.


In [None]:
!pip install mitzu[postgres]

In [3]:
# Star import stays first so the explicit imports below win on any name clash.
from mitzu import *

import os
from typing import Any, Dict, List

import pandas as pd
import sqlalchemy as SA
from sqlalchemy.orm import aliased

import mitzu.adapters.generic_adapter as GA
import mitzu.common.model as M
from mitzu.adapters.slqalchemy_adapter import SQLAlchemyAdapter

class PostgresAdapter(SQLAlchemyAdapter):
    """mitzu adapter that runs event queries against PostgreSQL via SQLAlchemy.

    The database URL is read from the environment variable named by
    ``CONNECTION_URL_ENV_VAR`` so that no credentials are stored in the notebook.
    NOTE(review): a database password used to be hard-coded in this notebook;
    it should be rotated.
    """

    # Environment variable holding the SQLAlchemy URL, e.g.
    # "postgresql://USER:PASSWORD@HOST:5432/DBNAME".
    CONNECTION_URL_ENV_VAR = "MITZU_POSTGRES_URL"

    def __init__(self, source: M.EventDataSource):
        super().__init__(source)

    def get_engine(self) -> Any:
        """Return the SQLAlchemy engine, creating it on the first call.

        The engine (and its connection pool) is cached on ``self._engine`` and
        reused by later calls. ``pool_pre_ping`` tests pooled connections on
        checkout so connections dropped by the server are replaced, which
        matters for a long-lived notebook kernel.

        Raises:
            RuntimeError: if the ``CONNECTION_URL_ENV_VAR`` variable is unset
                or empty.
        """
        engine = getattr(self, "_engine", None)
        if engine is None:
            url = os.environ.get(self.CONNECTION_URL_ENV_VAR)
            if not url:
                raise RuntimeError(
                    f"Set the {self.CONNECTION_URL_ENV_VAR} environment variable "
                    "to the PostgreSQL SQLAlchemy URL before running queries."
                )
            engine = SA.create_engine(url, pool_pre_ping=True)
            self._engine = engine
        return engine

    def map_type(self, sa_type: Any) -> M.DataType:
        """Map a SQLAlchemy column type instance to a mitzu ``DataType``.

        ``isinstance`` checks include subclasses:
        - ``SA.Integer`` covers SmallInteger/BigInteger.
        - ``SA.Numeric`` covers Float/REAL/DOUBLE PRECISION and NUMERIC/DECIMAL.
        - ``SA.String`` covers VARCHAR/CHAR/Text/Enum.
        - ``SA.DateTime`` covers TIMESTAMP.

        Raises:
            ValueError: for any other column type.
        """
        if isinstance(sa_type, (SA.Integer, SA.Numeric)):
            return M.DataType.NUMBER
        if isinstance(sa_type, SA.String):
            return M.DataType.STRING
        if isinstance(sa_type, SA.DateTime):
            return M.DataType.DATETIME
        raise ValueError(f"{sa_type} is not supported.")

    def _get_datetime_interval(
        self, table_column: SA.Column, timewindow: M.TimeWindow
    ) -> Any:
        """Return the SQL expression ``table_column + interval '<timewindow>'``.

        Relies on ``str(timewindow)`` rendering as a PostgreSQL interval
        literal such as ``1 day``.
        """
        return table_column + SA.text(f"interval '{timewindow}'")

    def _get_column_values_df(
        self, fields: List[M.Field], event_specific: bool
    ) -> Dict[str, Dict[str, Any]]:
        """Collect the distinct values of low-cardinality fields.

        For every field other than the event name field, the query yields
        ``array_agg(DISTINCT field)`` when the field's distinct count is below
        ``source.max_enum_cardinality``, and NULL otherwise.

        Args:
            fields: the fields to inspect.
            event_specific: when True, group by the event name column; when
                False, produce a single group labelled ``M.ANY_EVENT_NAME``.

        Returns:
            ``{event_name: {field_name: aggregated values or None}}``. Despite
            the ``_df`` suffix this is a dict built from the query result.
        """
        source = self.source
        table = self._table

        if event_specific:
            name_col = table.columns.get(source.event_name_field)
        else:
            name_col = SA.literal(M.ANY_EVENT_NAME).label(source.event_name_field)

        value_cols = []
        for f in fields:
            if f._name == source.event_name_field:
                continue
            field_col = table.columns.get(f._name)
            value_cols.append(
                SA.case(
                    (
                        SA.func.count(field_col.distinct())
                        < source.max_enum_cardinality,
                        SA.func.array_agg(field_col.distinct()),
                    ),
                    else_=SA.literal(None),
                ).label(f._name)
            )

        query = SA.select(
            # Positional GROUP BY 1: the event-name column selected first.
            group_by=SA.literal(1),
            columns=[name_col] + value_cols,
        )
        df = self.execute_query(query)

        return df.set_index(source.event_name_field).to_dict("index")

    def _get_segmentation_select(self, metric: M.SegmentationMetric) -> Any:
        """Build the SELECT for a segmentation metric.

        Produces one row per (time bucket, group value) with the distinct-user
        count and event count of rows that match ``metric._segment`` within the
        metric's time window. With ``TimeGroup.TOTAL`` the datetime column is
        NULL; without a ``group_by`` the group column is NULL.
        """
        table = aliased(self.get_table())
        columns = table.columns
        source = self.source
        user_id_col = columns.get(source.user_id_field)

        if metric._time_group != M.TimeGroup.TOTAL:
            evt_time_group = self._get_date_trunc(
                table_column=columns.get(source.event_time_field),
                time_group=metric._time_group,
            )
        else:
            evt_time_group = SA.literal(None)

        if metric._group_by is not None:
            group_by = columns.get(metric._group_by._field._name)
        else:
            group_by = SA.literal(None)

        return SA.select(
            columns=[
                evt_time_group.label(GA.DATETIME_COL),
                group_by.label(GA.GROUP_COL),
                SA.func.count(user_id_col.distinct()).label(GA.USER_COUNT_COL),
                SA.func.count(user_id_col).label(GA.EVENT_COUNT_COL),
            ],
            whereclause=(
                self._get_segment_where_clause(table, metric._segment)
                & self._get_timewindow_where_clause(table, metric)
            ),
            # Group by output positions, as _get_conversion_select does. With
            # TimeGroup.TOTAL the datetime column is a NULL literal, and
            # PostgreSQL rejects a literal "GROUP BY NULL" (non-integer constant).
            group_by=[SA.literal(1), SA.literal(2)],
        )

    def _get_conversion_select(self, metric: M.ConversionMetric) -> Any:
        """Build the SELECT for a multi-step conversion (funnel) metric.

        Step 1 is the base table, filtered by the first segment and the
        metric's time window. Each later step is LEFT OUTER JOINed to the
        previous step when all of the following hold:
        - same user id;
        - strictly later event time;
        - event time no later than the FIRST step's event time plus
          ``metric._conv_window``;
        - that step's segment filter matches.

        Output columns:
        - datetime bucket and group;
        - conversion rate: distinct users at the last step / distinct users at
          step 1;
        - per-step distinct-user and event counts, suffixed via
          ``_fix_col_index`` with the 1-based step number.

        Rows are grouped by output positions 1 and 2.
        """
        base_table = self.get_table()
        base_cols = base_table.columns
        source = self.source
        first_segment = metric._conversion._segments[0]
        later_segments = metric._conversion._segments[1:]
        user_id_col = base_cols.get(source.user_id_field)
        event_time_col = base_cols.get(source.event_time_field)
        time_group = metric._time_group

        if time_group != M.TimeGroup.TOTAL:
            evt_time_group = self._get_date_trunc(
                table_column=event_time_col,
                time_group=time_group,
            )
        else:
            evt_time_group = SA.literal(None)

        group_by = (
            base_cols.get(metric._group_by._field._name)
            if metric._group_by is not None
            else SA.literal(None)
        )

        steps = [base_table]
        step_count_cols = []
        joined_source = base_table
        for step_no, seg in enumerate(later_segments, start=2):
            prev_cols = steps[-1].columns
            curr_table = self.get_table()
            curr_cols = curr_table.columns
            curr_user_id_col = curr_cols.get(source.user_id_field)
            curr_event_time_col = curr_cols.get(source.event_time_field)

            steps.append(curr_table)

            step_count_cols.extend(
                [
                    SA.func.count(curr_user_id_col.distinct()).label(
                        self._fix_col_index(step_no, GA.USER_COUNT_COL)
                    ),
                    SA.func.count(curr_user_id_col).label(
                        self._fix_col_index(step_no, GA.EVENT_COUNT_COL)
                    ),
                ]
            )
            joined_source = joined_source.join(
                curr_table,
                (
                    (prev_cols.get(source.user_id_field) == curr_user_id_col)
                    & (curr_event_time_col > prev_cols.get(source.event_time_field))
                    # The window is anchored at the FIRST step's event time.
                    & (
                        curr_event_time_col
                        <= self._get_datetime_interval(
                            event_time_col, metric._conv_window
                        )
                    )
                    & self._get_segment_where_clause(curr_table, seg)
                ),
                isouter=True,
            )

        last_user_id_col = steps[-1].columns.get(source.user_id_field)
        select_cols = [
            evt_time_group.label(GA.DATETIME_COL),
            group_by.label(GA.GROUP_COL),
            (
                SA.func.count(last_user_id_col.distinct())
                * 1.0
                / SA.func.count(user_id_col.distinct())
            ).label(GA.CVR_COL),
            SA.func.count(user_id_col.distinct()).label(
                self._fix_col_index(1, GA.USER_COUNT_COL)
            ),
            SA.func.count(user_id_col).label(
                self._fix_col_index(1, GA.EVENT_COUNT_COL)
            ),
        ]
        select_cols.extend(step_count_cols)

        return SA.select(
            columns=select_cols,
            whereclause=(
                self._get_segment_where_clause(base_table, first_segment)
                & self._get_timewindow_where_clause(base_table, metric)
            ),
            group_by=[SA.literal(1), SA.literal(2)],
        ).select_from(joined_source)

# Event source backed by the PostgreSQL table "event_fixed".
sample_data_source = EventDataSource(
    table_name="event_fixed",
    user_id_field="session_id",
    event_time_field="created_at",
    event_name_field="event_value_fixed",
    event_specific_fields=["url"],
    connection=Connection(
        connection_type=ConnectionType.POSTGRES,
        # NOTE(review): these parquet settings look copied from the file-based
        # source in the first cell. PostgresAdapter.get_engine does not read
        # them; remove once it's confirmed Connection does not require them.
        connection_params={"file_type": "parquet"},
        url="/work/sample_user_event_data.parquet",
    ),
)

# Route all queries through the custom PostgresAdapter class in this notebook.
sample_data_source.adapter = PostgresAdapter(sample_data_source)

# Same keyword form as the first cell's init_notebook_project call.
m = init_notebook_project(source=sample_data_source)



Found notebook context
Initializing project ...
Finished project initialization


In [7]:
# Print the SQL generated for the submit_url -> select_reason conversion.
submit_to_reason = m.submit_url >> select_reason
submit_to_reason.print_sql()

SELECT date_trunc('DAY', event_fixed_1.created_at) as datetime,
       null as "group",
       (count(distinct event_fixed_2.session_id) * 1.0) / count(distinct event_fixed_1.session_id) as conversion_rate,
       count(distinct event_fixed_1.session_id) as unique_user_count_1,
       count(event_fixed_1.session_id) as event_count_1,
       count(distinct event_fixed_2.session_id) as unique_user_count_2,
       count(event_fixed_2.session_id) as event_count_2
FROM   event_fixed as event_fixed_1 left
    OUTER JOIN event_fixed as event_fixed_2
        ON event_fixed_1.session_id = event_fixed_2.session_id and
           event_fixed_2.created_at > event_fixed_1.created_at and
           event_fixed_2.created_at <= event_fixed_1.created_at + interval '1 day' and
           event_fixed_2.event_value_fixed = 'select_reason'
WHERE  event_fixed_1.event_value_fixed = 'submit_url'
   and event_fixed_1.created_at >= '2021-03-23 17:37:18.559168'
   and event_fixed_1.created_at <= '2022-03-23 17:3