# Data Exploration and Pipeline

The database contains `Users`, `Orders`, and `Providers` tables. 

Partners are the companies that sell surplus items on the marketplace.
A cohort consists of customers who made their first order within the same month (M0).
M1 retention is the share of customers who made at least one purchase in the month after their first purchase month.

Explore the data with SQL to investigate the following:

- The top 10 partners by sales
- Customers’ favourite partner segments (default offer types)
- The M1 retention for any given customer cohort

## Connect to the database 
The connection to the SQLite database is made through the `jupysql` Python library, which allows querying the database directly from a Jupyter notebook. An alternative tool is SQLMagic.

In [1]:
import sqlite3
con = sqlite3.connect("./data/mock_resq.db") 

%load_ext sql
%config SqlMagic.displaylimit = None

%sql sqlite:///data/mock_resq.db

### How many users are in the database?

In [2]:
%%sql
SELECT COUNT(*) AS user_count
FROM users

user_count
358366


### How many countries do the users come from?

In [3]:
%%sql
SELECT COUNT(DISTINCT country) AS country_count
FROM users

country_count
111


### Which 10 countries have the most users?

In [4]:
%%sql 
SELECT country, COUNT(*) AS user_count
FROM users
GROUP BY country
ORDER BY user_count DESC
LIMIT 10

country,user_count
FI,339573
SE,8961
EE,6505
DE,512
AX,434
ES,209
FR,200
GB,190
AT,182
NL,152


#### Inference

- There are **358,366** users in the database from **111** different countries.
- The top 10 countries where the users come from are Finland (FI), Sweden (SE), Estonia (EE), Germany (DE), Åland (AX), Spain (ES), France (FR), Great Britain (GB), Austria (AT), and the Netherlands (NL).
- Roughly 94.8% of users (**339,573** of **358,366**) are from Finland, followed by Sweden with **8,961** users.
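
The shares quoted above can be reproduced with a few lines of arithmetic. This is only a minimal sketch: the counts are copied from the query outputs above, and the helper name `share` is introduced here purely for illustration.

```python
# Counts copied from the query outputs above.
total_users = 358_366
users_by_country = {"FI": 339_573, "SE": 8_961, "EE": 6_505}


def share(count: int, total: int) -> float:
    """Return count as a percentage of total."""
    return 100.0 * count / total


for code, count in users_by_country.items():
    print(f"{code}: {share(count, total_users):.1f}%")
```

Rounding to one decimal keeps the Finnish share distinguishable from a flat 94% or 95%.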

### How many providers are in the database?

In [5]:
%%sql
SELECT COUNT(id) AS provider_count
FROM providers

provider_count
4337


### Do providers have multiple offer types?

In [6]:
%%sql
SELECT id AS provider, COUNT(defaultoffertype) AS offer_type_count
FROM providers
GROUP BY id
ORDER BY offer_type_count DESC
LIMIT 5

provider,offer_type_count
9222930112446389796,1
9217379655006460479,1
9215371507696178188,1
9214584721622525154,1
9212615296993900753,1


### How many providers are there per country?

In [7]:
%%sql 
SELECT country, COUNT(*) AS provider_count
FROM providers
GROUP BY country
ORDER BY provider_count DESC

country,provider_count
fin,4095
est,154
swe,84
pol,2
deu,2


#### Inference

- There are **4,337** providers in the database, with each provider having exactly one offer type (meal, snack, dessert, ingredients, flowers, etc.).
- The providers are from Finland (FIN), Estonia (EST), Sweden (SWE), Poland (POL), and Germany (DEU).
- Over **94% (4,095)** of the providers are from Finland, followed by Estonia with **154** providers.
- Poland and Germany each have two (2) providers.

`Questions:`

1. Are **partners** the same as **providers**? If yes, why is there a difference in nomenclature between the database and the instructions?
   
2. The **Users** table uses two-letter [ISO-3166 country codes](https://en.wikipedia.org/wiki/List_of_ISO_3166_country_codes) (e.g., FI, SE, ES), while the **Providers** table uses three-letter country codes (e.g., FIN, SWE, POL). Is there a particular reason for this?

3. All providers have exactly one offer type in the database, but the nomenclature includes terms like **defaultOfferType** and **partner segment**. Do providers have multiple offer types? Could the nomenclature be harmonized?
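
Until the country codes are harmonized at the source, one possible workaround is to normalise both tables to two-letter codes before comparing them. The sketch below assumes a hand-written lookup that covers only the five provider codes seen above; `ALPHA3_TO_ALPHA2` and `to_alpha2` are hypothetical names, not part of the database or any library.

```python
# Hypothetical lookup covering only the five provider country codes seen above.
ALPHA3_TO_ALPHA2 = {"fin": "FI", "est": "EE", "swe": "SE", "pol": "PL", "deu": "DE"}


def to_alpha2(code: str) -> str:
    """Normalise a two- or three-letter country code to upper-case alpha-2."""
    cleaned = code.strip().lower()
    if len(cleaned) == 2:
        return cleaned.upper()
    # Raises KeyError for codes that are not in the lookup.
    return ALPHA3_TO_ALPHA2[cleaned]


print(to_alpha2("fin"), to_alpha2("SE"))
```

Failing loudly on an unmapped code is a deliberate choice here, so that a new provider country does not silently drop out of a join.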


## `Now answering the Analyst's questions`

## Top 10 partners by sales

In [8]:
%%sql 
SELECT providerid, SUM(sales) AS total_sales
FROM orders
GROUP BY providerid
ORDER BY total_sales DESC
LIMIT 10

providerId,total_sales
7198110370745783236,10917800
8312310143652755348,7467750
8097235958083241788,2383700
3865474760205653333,2223400
8084884958338058541,1868140
4734853230275691017,1702100
5305286819167536850,1690500
1066258454353124935,1568100
7642201963087705313,1472000
4014236829817167297,1457000


How many currencies are there in the database?

In [9]:
%%sql 
SELECT DISTINCT currency
FROM orders

currency
eur
sek


There are only two currencies in the database: **euro (EUR)** and **Swedish krona (SEK)**.

`Question:`  
Is the conversion rate to the base currency (most likely Euro) saved during the order payment? This will be useful for transforming sales figures into a common currency for consistent reporting.

How about the **top 10 partners by sales in their respective sales currencies**?

In [10]:
%%sql 
SELECT providerid, currency, SUM(sales) AS total_sales
FROM orders
GROUP BY providerid, currency
ORDER BY total_sales DESC
LIMIT 10

providerId,currency,total_sales
7198110370745783236,sek,10917800
8312310143652755348,sek,7467750
8097235958083241788,sek,2383700
3865474760205653333,sek,2223400
8084884958338058541,eur,1868140
4734853230275691017,sek,1702100
5305286819167536850,sek,1690500
1066258454353124935,sek,1568100
7642201963087705313,sek,1472000
4014236829817167297,sek,1457000


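The same providers are in the top 10, now shown with their sales currency: nine sell in SEK and one in EUR, so the raw totals are not yet comparable.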
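
Whether any provider actually sells in more than one currency can be checked directly before choosing a conversion approach. The sketch below runs against a small in-memory table with made-up rows, not the real `orders` table; only the column names `providerId`, `currency`, and `sales` are taken from the schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (providerId, currency, sales)")
# Toy rows: provider 3 is the only one with sales in two currencies.
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "sek", 100), (1, "sek", 50), (2, "eur", 30), (3, "eur", 20), (3, "sek", 10)],
)

# Providers with sales recorded in more than one currency.
multi_currency = conn.execute(
    """
    SELECT providerId, COUNT(DISTINCT currency) AS currency_count
    FROM orders
    GROUP BY providerId
    HAVING COUNT(DISTINCT currency) > 1
    """
).fetchall()
print(multi_currency)
conn.close()
```

An empty result on the real table would support the single-currency assumption.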

Converting SEK (Swedish krona) to euro, assuming that each provider sells in only one currency:

In [11]:
%%sql
-- with the assumption that a provider sells in only one currency
SELECT providerid, 
       SUM(CASE 
               WHEN currency = 'sek' THEN sales * 0.088
               ELSE sales 
           END) AS total_sales_euro
FROM orders
GROUP BY providerid
ORDER BY total_sales_euro DESC
LIMIT 10

providerId,total_sales_euro
8084884958338058541,1868140.0
2329385876751836948,1113610.0
7781996256202689245,1089737.0
3314909292283785248,987080.0
7198110370745783236,960766.4
864133707167331065,844570.0
644313756025543931,784250.0
8671992772042032524,724210.0
251523446593852416,720007.0
5617133624212883839,707950.0
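
As a quick check of the conversion, the largest SEK total can be converted by hand and compared with the EUR provider. This is a sketch under the same assumed fixed rate of 0.088; the two totals are copied from the per-currency output above, and `to_eur` is a hypothetical helper.

```python
SEK_TO_EUR = 0.088  # assumed rate, same as in the query above

# (providerId, currency, total_sales) copied from the per-currency output above.
rows = [
    ("7198110370745783236", "sek", 10_917_800),
    ("8084884958338058541", "eur", 1_868_140),
]


def to_eur(amount: float, currency: str) -> float:
    """Convert an amount to euro using the assumed fixed SEK rate."""
    return amount * SEK_TO_EUR if currency == "sek" else amount


converted = {pid: round(to_eur(total, cur), 1) for pid, cur, total in rows}
print(converted)
```

Under this rate the top SEK seller comes out at roughly 960,766 euro, below the EUR provider, which is why the ranking changes after conversion.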


Converting SEK (Swedish krona) to euro, allowing for providers that sell in multiple currencies:

In [12]:
%%sql
-- With the assumption that a provider sells in multiple currencies

WITH provider_sales_currency AS (
    SELECT providerid, currency, SUM(sales) AS total_sales
    FROM orders
    GROUP BY providerid, currency
)

SELECT 
    providerid, 
    SUM(
        CASE  
            WHEN currency = 'sek' THEN total_sales * 0.088 -- assumed conversion rate
            ELSE total_sales
        END
    ) AS total_sales_euro
FROM provider_sales_currency
GROUP BY providerid
ORDER BY total_sales_euro DESC
LIMIT 10

providerid,total_sales_euro
8084884958338058541,1868140.0
2329385876751836948,1113610.0
7781996256202689245,1089737.0
3314909292283785248,987080.0
7198110370745783236,960766.4
864133707167331065,844570.0
644313756025543931,784250.0
8671992772042032524,724210.0
251523446593852416,720007.0
5617133624212883839,707950.0


## Customers’ favourite partner segments (default offer types).

In [13]:
%%sql 
SELECT P.defaultoffertype AS partner_segment, 
       SUM(O.quantity) AS sum_order_quantity
FROM orders O
JOIN providers P ON O.providerid = P.id
GROUP BY P.defaultoffertype
ORDER BY sum_order_quantity DESC
LIMIT 1

partner_segment,sum_order_quantity
meal,305254


The customers' favourite partner segment is **meal**, with a total ordered quantity of **305,254**.

## What is the M1 retention for any given customer cohort. 

Checking table definition and order table column types

In [14]:
%%sql 
SELECT * 
FROM sqlite_master 
WHERE type = 'table'

type,name,tbl_name,rootpage,sql
table,orders,orders,2,"CREATE TABLE orders (id, createdAt, userId, quantity, refunded, currency, sales, providerId)"
table,providers,providers,4420,"CREATE TABLE providers (id, defaultOfferType, country, registeredDate)"
table,users,users,4465,"CREATE TABLE users (id, country, registeredDate)"


In [15]:
%%sql 
-- pragma table_info('orders')
SELECT name, type 
FROM pragma_table_info('orders')

name,type
id,
createdAt,
userId,
quantity,
refunded,
currency,
sales,
providerId,


No column types? This is quite strange, although SQLite does allow columns to be declared without a type. The date fields need to be converted before date and aggregation functions can be used on them.
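
The behaviour of untyped columns can be reproduced on a throwaway in-memory table. This sketch assumes that `createdAt` holds ISO-8601 text such as `2022-09-15 10:30:00`; the real column format was not inspected here, and the row is made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Same style of definition as the orders table: column names without types.
conn.execute("CREATE TABLE orders (id, createdAt, sales)")
conn.execute("INSERT INTO orders VALUES (1, '2022-09-15 10:30:00', 250)")

# Declared types come back as empty strings.
declared = conn.execute("SELECT name, type FROM pragma_table_info('orders')").fetchall()
# Each stored value still carries its own storage class.
stored = conn.execute(
    "SELECT typeof(createdAt), typeof(sales), strftime('%Y-%m-01', createdAt) FROM orders"
).fetchone()
print(declared)
print(stored)
conn.close()
```

Because the timestamp is stored as text, `strftime` can still truncate it to the first day of its month, which is what the cohort queries rely on.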

Now creating cohorts and computing the M1 retention share.

- A cohort consists of customers who made their first order within the same month (M0).
- M1 retention is the share of customers who made at least one purchase in the month after their first purchase month.
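
The two definitions can be illustrated on a handful of made-up orders before running the SQL. This is a minimal pure-Python sketch; the data and the helper `month_index` are hypothetical and only mirror the year-times-twelve-plus-month arithmetic.

```python
from datetime import date

# Toy orders as (user_id, order_date); hypothetical data.
orders = [
    ("a", date(2022, 9, 3)), ("a", date(2022, 10, 20)),
    ("b", date(2022, 9, 28)),
    ("c", date(2022, 9, 10)), ("c", date(2022, 11, 2)),
    ("d", date(2022, 12, 5)), ("d", date(2023, 1, 9)),
]


def month_index(d: date) -> int:
    """Map a date to a running month count so that differences are month offsets."""
    return d.year * 12 + d.month


# M0 for each user is the month of their earliest order.
first_month = {}
for user, day in orders:
    first_month[user] = min(first_month.get(user, month_index(day)), month_index(day))

# Users with at least one order exactly one calendar month after their M0.
retained_m1 = {user for user, day in orders if month_index(day) - first_month[user] == 1}
m1_retention = 100.0 * len(retained_m1) / len(first_month)
print(sorted(retained_m1), m1_retention)
```

User `d` shows that the offset still works across a year boundary, and user `c` shows that an order in M2 does not count towards M1.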

In [16]:
%%sql

WITH FIRSTORDERDATES AS (
        SELECT 
            USERID, 
            strftime('%Y-%m-01', MIN(DATE(CREATEDAT)))  AS COHORT_DATE
        FROM ORDERS
        GROUP BY USERID
    ),
    AT_LEAST_ONE_ORDER_PER_MONTH AS (
        SELECT 
            USERID, 
            MAX(CREATEDAT) AS PURCHASE_DATE
        FROM ORDERS
        GROUP BY USERID, strftime('%Y-%m-01', CREATEDAT)
    ),
    RETENTION_TABLE AS (
        SELECT 
            O.USERID, 
            FOD.COHORT_DATE,
            O.PURCHASE_DATE, 
            ((strftime('%Y', O.PURCHASE_DATE) - strftime('%Y', FOD.COHORT_DATE)) * 12) + 
            (strftime('%m', O.PURCHASE_DATE) - strftime('%m', FOD.COHORT_DATE)) AS MONTHS_SINCE_FIRST_PURCHASE
        FROM AT_LEAST_ONE_ORDER_PER_MONTH O
        LEFT JOIN FIRSTORDERDATES FOD ON O.USERID = FOD.USERID
        ORDER BY FOD.COHORT_DATE, O.USERID, O.PURCHASE_DATE
    )

    SELECT 
    (
        CAST(
        (
            SELECT COUNT(*) 
            FROM RETENTION_TABLE 
            WHERE MONTHS_SINCE_FIRST_PURCHASE = 1
        ) AS REAL) / 
        CAST(
            (
                SELECT COUNT(DISTINCT(USERID)) FROM RETENTION_TABLE
            ) AS REAL)
    ) * 100 AS M1_SHARE_PERCENTAGE

M1_SHARE_PERCENTAGE
14.587526783975068


Provider retention

In [17]:
%%sql

WITH FIRSTORDERDATES AS (
        SELECT 
            providerId, 
            strftime('%Y-%m-01', MIN(DATE(CREATEDAT)))  AS COHORT_DATE
        FROM ORDERS
        GROUP BY providerId
    ),
    AT_LEAST_ONE_ORDER_PER_MONTH AS (
        SELECT 
            providerId, 
            MAX(CREATEDAT) AS PURCHASE_DATE
        FROM ORDERS
        GROUP BY providerId, strftime('%Y-%m-01', CREATEDAT)
    ),
    RETENTION_TABLE AS (
        SELECT 
            O.providerId, 
            FOD.COHORT_DATE,
            O.PURCHASE_DATE, 
            ((strftime('%Y', O.PURCHASE_DATE) - strftime('%Y', FOD.COHORT_DATE)) * 12) + 
            (strftime('%m', O.PURCHASE_DATE) - strftime('%m', FOD.COHORT_DATE)) AS MONTHS_SINCE_FIRST_PURCHASE
        FROM AT_LEAST_ONE_ORDER_PER_MONTH O
        LEFT JOIN FIRSTORDERDATES FOD ON O.providerId = FOD.providerId
        ORDER BY FOD.COHORT_DATE, O.providerId, O.PURCHASE_DATE
    )

    SELECT 
    (
        CAST(
        (
            SELECT COUNT(*) 
            FROM RETENTION_TABLE 
            WHERE MONTHS_SINCE_FIRST_PURCHASE = 1
        ) AS REAL) / 
        CAST(
            (
                SELECT COUNT(DISTINCT(providerId)) FROM RETENTION_TABLE
            ) AS REAL)
    ) * 100 AS M1_SHARE_PERCENTAGE

M1_SHARE_PERCENTAGE
77.21661054994388


`Answer:` 

The M1 customer retention rate is **14.59%**: **17,973** out of **123,203** customers made at least one purchase in the month after their first purchase month. The same calculation for providers gives an M1 retention rate of **77.22%**.

## Building the data pipeline

First, install **dbt-core** and **dbt-sqlite** with:

```bash
pip install --quiet dbt-core dbt-sqlite
```

Make sure that the version of [dbt-sqlite](https://github.com/codeforkjeff/dbt-sqlite) aligns with that of dbt-core for the setup to work properly.

If the project is set up with the exported virtual environment file **environment.yml**, there is no need to reinstall these dependencies. Check that they are installed with:

```bash
pip show dbt-core dbt-sqlite
```

The `dbt init` command was used to initialize the dbt project in the subdirectory **ResqDbPipeline**. Therefore, all dbt commands should be run in that subdirectory.

### Connection profile for the SQLite database

In [18]:
import os

dbt_project = "./ResqDbPipeline"
os.makedirs(dbt_project, exist_ok=True)

In [19]:
%%writefile {dbt_project}/profiles.yml
ResqDbPipeline:
  target: dev
  outputs:
    dev:
      type: sqlite
      threads: 1
      database: 'database'
      schema: 'main'
      schemas_and_paths:
        main: '../data/mock_resq.db'
      schema_directory: '../data'

Overwriting ./ResqDbPipeline/profiles.yml


### Testing the SQLite database connection

In [20]:
!cd {dbt_project} && dbt debug

10:06:41  Running with dbt=1.5.0
10:06:41  dbt version: 1.5.0
10:06:41  python version: 3.12.6
10:06:41  python path: /opt/anaconda3/envs/resq/bin/python
10:06:41  os info: macOS-14.6-arm64-arm-64bit
10:06:41  Using profiles.yml file at /Users/asare/source/ResqCLV/ResqDbPipeline/profiles.yml
10:06:41  Using dbt_project.yml file at /Users/asare/source/ResqCLV/ResqDbPipeline/dbt_project.yml
10:06:41  Configuration:
10:06:41    profiles.yml file [OK found and valid]
10:06:41    dbt_project.yml file [OK found and valid]
10:06:41  Required dependencies:
10:06:41   - git [OK found]

10:06:41  Connection:
10:06:41    database: database
10:06:41    schema: main
10:06:41    schemas_and_paths: {'main': '../data/mock_resq.db'}
10:06:41    schema_directory: ../data
10:06:41    Connection test: [OK connection ok]

10:06:41  All checks passed!


In [21]:
!cd {dbt_project} && dbt run --full-refresh

10:06:43  Running with dbt=1.5.0
10:06:43  Unable to do partial parsing because saved manifest not found. Starting full parse.
10:06:43  Encountered an error:
Compilation Error
  dbt found two macros named "materialization_table_default" in the project
  "dbt".
   To fix this error, rename or remove one of the following macros:
      - macros/materializations/models/table/table.sql
      - macros/materializations/models/table.sql


This error seems strange and resembles bugs reported in the following threads:

- [dbt compilation error: materialization table default](https://discourse.getdbt.com/t/dbt-compilation-error-materialization-table-default/16375)
- [Materialization table default macro occurs](https://discourse.getdbt.com/t/materialization-table-default-macro-occurs/14973/2)

### Next Action
Since this is a **`simple pipeline`**, the work continues with plain Python scripts instead of dbt. There is only one data source, and it is already in the SQLite database, so no extraction step is needed. The pipeline still has a transformation step that runs queries and joins and then creates views.
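
The transform-then-create-views step can be sketched with the standard `sqlite3` module alone. The example below uses an in-memory database, made-up rows, and a hypothetical view name `demo_sales_by_provider`; it is an illustration of the pattern, not one of the project's scripts.

```python
import sqlite3

# A drop-and-create script can be re-run without failing on an existing view.
VIEW_SCRIPT = """
DROP VIEW IF EXISTS demo_sales_by_provider;
CREATE VIEW demo_sales_by_provider AS
    SELECT providerId, SUM(sales) AS total_sales
    FROM orders
    GROUP BY providerId;
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (providerId, sales)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 10), (1, 5), (2, 7)])

conn.executescript(VIEW_SCRIPT)
conn.executescript(VIEW_SCRIPT)  # second run succeeds because of DROP VIEW IF EXISTS

# Sort when querying the view rather than relying on an ORDER BY inside it.
top = conn.execute(
    "SELECT providerId, total_sales FROM demo_sales_by_provider "
    "ORDER BY total_sales DESC LIMIT ?",
    (1,),
).fetchall()
print(top)
conn.close()
```

Sorting in the outer query is the more cautious choice, since the order of rows returned from a view is safest to state explicitly where the rows are consumed.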

### Top partners view script

In [22]:
%%writefile data/sql/top_partners.sql
DROP VIEW IF EXISTS TOP_PARTNERS;

CREATE VIEW TOP_PARTNERS 
AS 
    SELECT 
        PROVIDERID, 
        CURRENCY,
        SUM(SALES) AS TOTAL_SALES
    FROM ORDERS
    GROUP BY PROVIDERID, CURRENCY
    ORDER BY TOTAL_SALES DESC;

Overwriting data/sql/top_partners.sql


### Partner segment order quantity view script

In [23]:
%%writefile data/sql/partner_segment_order_quantity.sql

DROP VIEW IF EXISTS PARTNET_SEGMENT_ORDER_QUANTIY;

CREATE VIEW PARTNET_SEGMENT_ORDER_QUANTIY
AS 
    SELECT 
        P.DEFAULTOFFERTYPE AS PARTNER_SEGMENT, 
        SUM(O.QUANTITY) AS SUM_ORDER_QUANTITY
    FROM ORDERS O
    JOIN PROVIDERS P ON O.PROVIDERID = P.ID
    GROUP BY P.DEFAULTOFFERTYPE
    ORDER BY SUM_ORDER_QUANTITY DESC;

Overwriting data/sql/partner_segment_order_quantity.sql


### Customer cohort view script

In [24]:
%%writefile data/sql/customer_cohort.sql
DROP VIEW IF EXISTS CUSTOMER_COHORT;

CREATE VIEW CUSTOMER_COHORT
AS
    WITH FIRSTORDERDATES AS (
        SELECT 
            USERID, 
            strftime('%Y-%m-01', MIN(DATE(CREATEDAT)))  AS COHORT_DATE
        FROM ORDERS
        GROUP BY USERID
    ),
    AT_LEAST_ONE_ORDER_PER_MONTH AS (
        SELECT 
            USERID, 
            MAX(CREATEDAT) AS PURCHASE_DATE
        FROM ORDERS
        GROUP BY USERID, strftime('%Y-%m-01', CREATEDAT)
    )

    SELECT 
        O.USERID, 
        FOD.COHORT_DATE,
        O.PURCHASE_DATE, 
        ((strftime('%Y', O.PURCHASE_DATE) - strftime('%Y', FOD.COHORT_DATE)) * 12) + 
        (strftime('%m', O.PURCHASE_DATE) - strftime('%m', FOD.COHORT_DATE)) AS MONTHS_SINCE_FIRST_PURCHASE
    FROM AT_LEAST_ONE_ORDER_PER_MONTH O
    LEFT JOIN FIRSTORDERDATES FOD ON O.USERID = FOD.USERID
    ORDER BY FOD.COHORT_DATE, O.USERID, O.PURCHASE_DATE;


Overwriting data/sql/customer_cohort.sql


### Provider cohort view script

In [25]:
%%writefile data/sql/provider_cohort.sql
DROP VIEW IF EXISTS PROVIDER_COHORT;

CREATE VIEW PROVIDER_COHORT
AS
    WITH FIRSTORDERDATES AS (
            SELECT 
                providerId, 
                strftime('%Y-%m-01', MIN(DATE(CREATEDAT)))  AS COHORT_DATE
            FROM ORDERS
            GROUP BY providerId
        ),
        AT_LEAST_ONE_ORDER_PER_MONTH AS (
            SELECT 
                providerId, 
                MAX(CREATEDAT) AS PURCHASE_DATE
            FROM ORDERS
            GROUP BY providerId, strftime('%Y-%m-01', CREATEDAT)
        )
       
        SELECT 
            O.providerId, 
            FOD.COHORT_DATE,
            O.PURCHASE_DATE, 
            ((strftime('%Y', O.PURCHASE_DATE) - strftime('%Y', FOD.COHORT_DATE)) * 12) + 
            (strftime('%m', O.PURCHASE_DATE) - strftime('%m', FOD.COHORT_DATE)) AS MONTHS_SINCE_FIRST_PURCHASE
        FROM AT_LEAST_ONE_ORDER_PER_MONTH O
        LEFT JOIN FIRSTORDERDATES FOD ON O.providerId = FOD.providerId
        ORDER BY FOD.COHORT_DATE, O.providerId, O.PURCHASE_DATE
       


Overwriting data/sql/provider_cohort.sql


### Frequency, lifespan, average sales view script

In [26]:
%%writefile data/sql/lifespan_frequency_sales_value.sql
DROP VIEW IF EXISTS LIFESPAN_FREQUENCY_SALES;

CREATE VIEW LIFESPAN_FREQUENCY_SALES
AS
    WITH FIRSTORDERDATES AS (
            SELECT 
                USERID, 
                strftime('%Y-%m-01', MIN(DATE(CREATEDAT)))  AS COHORT_DATE
            FROM ORDERS
            GROUP BY USERID
        )

    SELECT 
        O.USERID,
        FOD.COHORT_DATE,
        strftime('%Y-%m-01', MAX(DATE(CREATEDAT)))  AS purchase_month,
        ((strftime('%Y', MAX(DATE(CREATEDAT))) - strftime('%Y', FOD.COHORT_DATE)) * 12) + (strftime('%m', MAX(DATE(CREATEDAT))) - strftime('%m', FOD.COHORT_DATE)) as lifespan,
        COUNT(O.SALES) AS frequency,
        O.currency,
        AVG(O.SALES) AS average_sales
    FROM ORDERS O
    LEFT JOIN FIRSTORDERDATES FOD ON O.USERID = FOD.USERID
    GROUP BY O.USERID, O.currency

Overwriting data/sql/lifespan_frequency_sales_value.sql


### The Data Pipeline script

In [27]:
#%%writefile resq_pipeline/data_pipeline.py
import os
import sqlite3
import pandas as pd
from datetime import datetime

class DataPipeline(object):

    def __init__(self, db_and_scripts_path="./data"):
        super().__init__()
        self._log("Initializing pipeline")

        self.db_path = os.path.join(db_and_scripts_path, "mock_resq.db")
        self._script_path = os.path.join(db_and_scripts_path, "sql")
        os.makedirs(os.path.join(db_and_scripts_path, "sql"), exist_ok=True)

        self._create_connection()
        self._create_customer_cohorts_view()
        self._create_provider_cohorts_view()
        self._create_top_partners_by_sales_view()
        self._create_partner_segment_order_quantity_view()
        self._create_lifespan_frequency_sales_view()
        self._close_connection()

        self._log("Done")

    def top_n_partners(self, top_n=5):
        """
        Return top n partners by sales
        """

        if not isinstance(top_n, int):
            raise ValueError("top_n must be an int")

        sql = "SELECT * FROM TOP_PARTNERS LIMIT ?"
        return self.execute_query(query=sql, param=(top_n,))

    def customers_top_partner_segment(self, top_n=None):
        """
        Returns the order quantity of the top n partner segment.
        If top_n is not specified, it returns the favourite partner segment
        """

        if (top_n is not None) and (not isinstance(top_n, int)):
            raise ValueError("top_n must be an int or None")

        top_n = 1 if top_n is None else top_n

        sql = "SELECT * FROM PARTNET_SEGMENT_ORDER_QUANTIY LIMIT ?"

        return self.execute_query(query=sql, param=(top_n,))

    def m_customer_retention_rate(self, month=1):
        """
        Returns the month-M customer retention rate. Defaults to the M1 retention rate.
        """

        if not isinstance(month, int):
            raise ValueError("month must be an int")

        sql = """
                SELECT 
                (
                    CAST(
                    (
                        SELECT COUNT(*) 
                        FROM CUSTOMER_COHORT 
                        WHERE MONTHS_SINCE_FIRST_PURCHASE = ?
                    ) AS REAL) / 
                    CAST(
                        (
                            SELECT COUNT(DISTINCT(USERID)) FROM CUSTOMER_COHORT
                        ) AS REAL)
                ) * 100 AS M_RETENTION
              """
        return self.execute_query(query=sql, param=(month,))
    
    def m_provider_retention_rate(self, month=1):
        """
        Returns the month-M provider retention rate. Defaults to the M1 retention rate.
        """

        if not isinstance(month, int):
            raise ValueError("month must be an int")

        sql = """
                SELECT 
                (
                    CAST(
                    (
                        SELECT COUNT(*) 
                        FROM PROVIDER_COHORT 
                        WHERE MONTHS_SINCE_FIRST_PURCHASE = ?
                    ) AS REAL) / 
                    CAST(
                        (
                            SELECT COUNT(DISTINCT(providerId)) FROM PROVIDER_COHORT
                        ) AS REAL)
                ) * 100 AS M_RETENTION
              """
        return self.execute_query(query=sql, param=(month,))
    
    def m_customer_retention_rate_by_cohort(self, month=1, cohort=None):
        """
        Returns the month-M customer retention rate for a cohort. Defaults to the M1 retention rate across all customers.

        Keyword arguments:

        :cohort: -- the cohort in the form yyyy-mm-01
        """

        if not isinstance(month, int):
            raise ValueError("month must be an int")

        if not cohort:
            return self.m_customer_retention_rate(month=month)

        error_message = "cohort must be a date in the format yyyy-mm-01"
        try:
            cohort_date = datetime.fromisoformat(cohort)

            if cohort_date.day != 1:
                raise ValueError(error_message)
        except (TypeError, ValueError):
            raise ValueError(error_message)

        sql = """
                SELECT 
                (
                    CAST(
                    (
                        SELECT COUNT(*) 
                        FROM CUSTOMER_COHORT 
                        WHERE MONTHS_SINCE_FIRST_PURCHASE = ? AND COHORT_DATE = ?
                    ) AS REAL) / 
                    CAST(
                        (
                            SELECT COUNT(DISTINCT(USERID)) 
                            FROM CUSTOMER_COHORT
                            WHERE COHORT_DATE = ?
                        ) AS REAL)
                ) * 100 AS M_RETENTION
              """
        return self.execute_query(
            query=sql,
            param=(
                month,
                cohort,
                cohort,
            ),
        )

    def m_provider_retention_rate_by_cohort(self, month=1, cohort=None):
        """
        Returns the month-M provider retention rate for a cohort. Defaults to the M1 retention rate across all providers.

        Keyword arguments:

        :cohort: -- the cohort in the form yyyy-mm-01
        """

        if not isinstance(month, int):
            raise ValueError("month must be an int")

        if not cohort:
            return self.m_provider_retention_rate(month=month)

        error_message = "cohort must be a date in the format yyyy-mm-01"
        try:
            cohort_date = datetime.fromisoformat(cohort)

            if cohort_date.day != 1:
                raise ValueError(error_message)
        except (TypeError, ValueError):
            raise ValueError(error_message)

        sql = """
                SELECT 
                (
                    CAST(
                    (
                        SELECT COUNT(*) 
                        FROM PROVIDER_COHORT 
                        WHERE MONTHS_SINCE_FIRST_PURCHASE = ? AND COHORT_DATE = ?
                    ) AS REAL) / 
                    CAST(
                        (
                            SELECT COUNT(DISTINCT(providerId)) 
                            FROM PROVIDER_COHORT
                            WHERE COHORT_DATE = ?
                        ) AS REAL)
                ) * 100 AS M_RETENTION
              """
        return self.execute_query(
            query=sql,
            param=(
                month,
                cohort,
                cohort,
            ),
        )
    
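    def execute_query(self, query: str, param: tuple | None = None):
        """
        Execute a SQL query with optional parameters and return a DataFrame
        """

        self._create_connection()
        try:
            return pd.read_sql_query(sql=query, con=self.conn, params=param)
        finally:
            # Close the connection so that repeated calls do not leave handles open
            self._close_connection()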

    def _create_top_partners_by_sales_view(self):
        """
        Creates top partners by sales view
        """

        self._log("Creating top partners by sales view in database")
        script_path = os.path.join(self._script_path,"top_partners.sql")
        self._execute_script(script_path)

    def _create_partner_segment_order_quantity_view(self):
        """
        Creates partner segments and order quantity view
        """

        self._log("Creating partner segments and order quantity view in database")
        script_path = os.path.join(self._script_path,"partner_segment_order_quantity.sql")
        self._execute_script(script_path)

    def _create_customer_cohorts_view(self):
        """
        Creates customer cohorts view
        """

        self._log("Creating customer cohorts view in database")
        script_path = os.path.join(self._script_path,"customer_cohort.sql")
        self._execute_script(script_path)
    
    def _create_provider_cohorts_view(self):
        """
        Creates provider cohorts view
        """

        self._log("Creating provider cohorts view in database")
        script_path = os.path.join(self._script_path,"provider_cohort.sql")
        self._execute_script(script_path)

    def _create_lifespan_frequency_sales_view(self):
        """
        Creates Lifespan, Frequency, sales value view
        """

        self._log("Creating Lifespan, Frequency, sales value view in database")
        script_path = os.path.join(self._script_path,"lifespan_frequency_sales_value.sql")
        self._execute_script(script_path)

    def _create_connection(self):
        """
        Creates a database connection
        """

        self.conn = sqlite3.connect(self.db_path)

    def _close_connection(self):
        """
        Closes existing database connection.
        """

        if self.conn:
            self.conn.commit()
            self.conn.close()

    def _execute_script(self, script_path):
        """
        Execute SQL script from file
        """

        with open(script_path, "r") as file:
            sql_script = file.read()

        cursor = self.conn.cursor()
        cursor.executescript(sql_script)

    def _log(self, text):
        print(datetime.now().strftime("%H:%M:%S"), text)

### Executing the pipeline

In [28]:
pipeline = DataPipeline()

13:06:44 Initializing pipeline
13:06:44 Creating customer cohorts view in database
13:06:44 Creating provider cohorts view in database
13:06:44 Creating top partners by sales view in database
13:06:44 Creating partner segments and order quantity view in database
13:06:44 Creating Lifespan, Frequency, sales value view in database
13:06:44 Done


The Analyst can now use the pipeline to create presentation tables, as shown below.


Favourite partner segment:

In [29]:
pipeline.customers_top_partner_segment()

Unnamed: 0,PARTNER_SEGMENT,SUM_ORDER_QUANTITY
0,meal,305254


Top 3 favourite partner segments:

In [30]:
pipeline.customers_top_partner_segment(top_n = 3)

Unnamed: 0,PARTNER_SEGMENT,SUM_ORDER_QUANTITY
0,meal,305254
1,snack,63182
2,grocery-bag,29884


Top N partners by sales:

In [31]:
pipeline.top_n_partners(top_n=10)

Unnamed: 0,providerId,currency,TOTAL_SALES
0,7198110370745783236,sek,10917800
1,8312310143652755348,sek,7467750
2,8097235958083241788,sek,2383700
3,3865474760205653333,sek,2223400
4,8084884958338058541,eur,1868140
5,4734853230275691017,sek,1702100
6,5305286819167536850,sek,1690500
7,1066258454353124935,sek,1568100
8,7642201963087705313,sek,1472000
9,4014236829817167297,sek,1457000


M1 customer retention rate:

In [32]:
pipeline.m_customer_retention_rate()

Unnamed: 0,M_RETENTION
0,14.587527


M2 customer retention rate:

In [33]:
pipeline.m_customer_retention_rate(month=2)

Unnamed: 0,M_RETENTION
0,12.001656


M1 customer retention rate per cohort:

In [34]:
pipeline.m_customer_retention_rate_by_cohort(month=1, cohort="2022-09-01")

Unnamed: 0,M_RETENTION
0,29.511369


M1 provider retention rate:

In [35]:
pipeline.m_provider_retention_rate(month=1)

Unnamed: 0,M_RETENTION
0,77.216611


M2 provider retention rate:

In [36]:
pipeline.m_provider_retention_rate(month=2)

Unnamed: 0,M_RETENTION
0,71.380471


M1 provider retention rate per cohort:

In [37]:
pipeline.m_provider_retention_rate_by_cohort(month=1, cohort="2022-09-01")

Unnamed: 0,M_RETENTION
0,93.338323
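
Instead of calling the retention method once per month, a retention curve for all months could be computed in a single grouped query. The sketch below uses an in-memory stand-in with made-up rows; the table name `cohort_demo` is hypothetical, and only its column layout mirrors the `CUSTOMER_COHORT` view.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Toy stand-in for the CUSTOMER_COHORT view: one row per user and active month.
conn.execute("CREATE TABLE cohort_demo (userId, cohort_date, months_since_first_purchase)")
conn.executemany(
    "INSERT INTO cohort_demo VALUES (?, ?, ?)",
    [
        (1, "2022-09-01", 0), (1, "2022-09-01", 1), (1, "2022-09-01", 2),
        (2, "2022-09-01", 0), (2, "2022-09-01", 1),
        (3, "2022-09-01", 0),
        (4, "2022-10-01", 0), (4, "2022-10-01", 2),
    ],
)

# Share of all users who were active N months after their first purchase month.
curve = conn.execute(
    """
    SELECT months_since_first_purchase AS month,
           100.0 * COUNT(DISTINCT userId)
               / (SELECT COUNT(DISTINCT userId) FROM cohort_demo) AS retention_pct
    FROM cohort_demo
    GROUP BY months_since_first_purchase
    ORDER BY month
    """
).fetchall()
print(curve)
conn.close()
```

Adding `cohort_date` to the grouping, with a per-cohort denominator, would give the same curve split by cohort.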
