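## Access to the DB

Run **either** the SQLite cell or the PostgreSQL cell below, not both. Each one creates the `engine` and `session` used by the rest of the notebook. Each also makes the table classes (`Subscription`, `ActivePeriod`, …) available.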

### SQLite

In [1]:
import os
import sqlalchemy as sa
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
import pandas as pd

# the following examples depend on an existing SQLite database file
sqlfile = "../churn.db"
# SQLite silently creates an empty database when the file does not exist, which
# would only surface later as confusing "no such table" errors; fail early instead.
if not os.path.isfile(sqlfile):
    raise FileNotFoundError(f"SQLite database not found: {os.path.abspath(sqlfile)}")
engine = create_engine(f"sqlite:///{sqlfile}")
session = sessionmaker(bind=engine)()

from churnmodels.schema import Observation,ActiveWeek, ActivePeriod, Account, Metric, MetricName, Subscription, Event, EventType


### PostgreSQL

In [1]:
import os
import sqlalchemy as sa
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import pandas as pd

from churnmodels.schema import get_schema, get_db_uri

# Connection options. The password is taken from the PGPASSWORD environment
# variable when it is set, so it does not have to be stored in the notebook.
# TODO: drop the hard-coded fallback once everybody sets PGPASSWORD.
options = {"user": "postgres",
           "pass": os.environ.get("PGPASSWORD", "password"),
           "dbname": "churn",
           "schema": "biznet1",
           # "host": "localhost",  # ...if needed
           # "port": "5432",       # ...if needed
           }

# T is a (dynamically built) module containing wrapper classes for our database tables
T = get_schema(options)

# connect to the database ("postgres" names the dialect we are using)
db_uri = get_db_uri(options, "postgres")
engine = create_engine(db_uri)
# the original cell computed this check but ignored its result; make a missing
# schema visible instead of failing later on the first query
if not engine.dialect.has_schema(engine, options["schema"]):
    import warnings
    warnings.warn(f"schema {options['schema']!r} not found in database {options['dbname']!r}")
session = sessionmaker(bind=engine)()

# bring every table class of T (names starting with an upper-case letter) into
# the global namespace, without building and exec'ing source code
for tbl in list(vars(T)):
    if tbl[:1].isupper():
        globals()[tbl] = getattr(T, tbl)


In [2]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
from sqlalchemy import func, or_
import pandas as pd

def days_between(d1, d2):
    """Return the absolute number of whole days between two dates.

    Both arguments are strings in ``YYYY-MM-DD`` form. Argument order does not
    matter because the absolute difference is returned.
    """
    fmt = "%Y-%m-%d"
    first, second = (datetime.strptime(day, fmt) for day in (d1, d2))
    return abs((second - first).days)


# analysis window (an earlier alternative was 2020-01-01 .. 2020-03-01)
d_start_date = "2020-03-01"
d_end_date = "2020-04-01"


In [3]:
from sqlalchemy import func
# Day arithmetic helper used as func.DATE(to_days(d) + n) in the cells below.
# SQLite has no native date arithmetic, so the date is converted to its Julian
# day number first; other dialects (e.g. PostgreSQL) are assumed to support
# `date +/- integer` directly and get the expression back unchanged.
if session.bind.dialect.name != "sqlite":
    def to_days(some_date):
        """Return the date expression unchanged (native date arithmetic)."""
        return some_date
else:
    def to_days(some_date):
        """Wrap a date expression in SQLite's julianday() so integer days can be added."""
        return func.julianday(some_date)


## ongoing active accounts (§4.1)

In [4]:
"""
with RECURSIVE active_period_params as    
(
    select interval '%gap_interval'  as allowed_gap,
    '%to_yyyy-mm-dd'::date as calc_date
),
active as  
(
	select distinct account_id, min(start_date) as start_date    
	from subscription inner join active_period_params 
on start_date <= calc_date    
		and (end_date > calc_date or end_date is null)
	group by account_id

	UNION

	select s.account_id, s.start_date  
	from subscription s 
	cross join active_period_params 
	inner join active e on s.account_id=e.account_id  
		and s.start_date < e.start_date  
		and s.end_date >= (e.start_date-allowed_gap)::date  

) 
insert into active_period (account_id, start_date, churn_date)     
select account_id, min(start_date) as start_date, NULL::date as churn_date  
from active
group by account_id, churn_date  
"""

from sqlalchemy import distinct, literal, and_, Float, cast, select


# allowed gap between consecutive subscriptions, in days (one week)
allowed_gap = 7
# accounts with a subscription covering this date have an ongoing active period
d_calc_date = "2020-05-10"

# anchor: earliest start of each account's subscriptions that cover d_calc_date
active = session.query(
        Subscription.account_id,
        func.min(Subscription.start_date).label("start_date"))\
    .filter(Subscription.start_date <= d_calc_date,
        or_(Subscription.end_date > d_calc_date, Subscription.end_date == None))\
    .group_by(Subscription.account_id)\
    .cte(recursive=True, name="active")

# recursive step: an earlier subscription that ends no more than `allowed_gap`
# days before the current period start extends the period backwards.
# The gap is computed through to_days()/DATE(), exactly like the churn cell:
# on SQLite a bare `start_date - 7` coerces the date text to its leading number
# (2020-03-01 -> 2020), so the comparison with end_date was always true and
# the gap was never enforced.
active_L = session.query(
        Subscription.account_id,
        Subscription.start_date)\
    .join(active, Subscription.account_id == active.c.account_id)\
    .filter(and_(Subscription.start_date < active.c.start_date,
        Subscription.end_date >= func.DATE(to_days(active.c.start_date) - allowed_gap)))

union_all = active.union_all(active_L)
subq = session.query(union_all).subquery()

# one row per account: earliest start of the ongoing period, churn_date NULL
qr = session.query(
        subq.c.account_id,
        func.min(subq.c.start_date).label("start_date"),
        func.DATE(None).label("churn_date"))\
    .group_by(subq.c.account_id, "churn_date")\
    .order_by("account_id")

# remove all previously computed active periods before re-inserting;
# end the session's transaction first so it cannot hold locks on the table
session.commit()
if session.bind.dialect.name == "postgresql":
    tablename = ActivePeriod.__table__.fullname
    engine.execute(sa.sql.text(f"TRUNCATE TABLE {tablename}").execution_options(autocommit=True))
else:
    session.query(ActivePeriod).delete()
    session.commit()

new_active_period_insert = qr.cte("new_active_period_insert")
select_stm = select([
        new_active_period_insert.c.account_id,
        new_active_period_insert.c.start_date,
        new_active_period_insert.c.churn_date])
target_columns = ['account_id', 'start_date', 'churn_date']
session.execute(ActivePeriod.__table__.insert().from_select(target_columns, select_stm))
session.commit()


## churned periods (§4.2)

In [5]:
"""
with RECURSIVE active_period_params as    
(
	select INTERVAL '%gap_interval' as allowed_gap,
	       '%to_yyyy-mm-dd'::date as observe_end,
	       '%from_yyyy-mm-dd'::date as observe_start
),
end_dates as    
(
	select distinct account_id, start_date, end_date, 
(end_date +  allowed_gap)::date as extension_max
	from subscription inner join active_period_params 
	on end_date between observe_start and observe_end    
), 
resignups as     
(
	select distinct e.account_id, e.end_date   
	from end_dates e inner join subscription s on e.account_id = s.account_id
		and s.start_date <= e.extension_max
		and (s.end_date > e.end_date or s.end_date is null)      
),
churns as    
(
	select e.account_id, e.start_date, e.end_date as churn_date    
	from end_dates e left outer join resignups r  
	on e.account_id = r.account_id    
		and e.end_date = r.end_date
	where r.end_date is null    

	UNION

	select s.account_id, s.start_date, e.churn_date    
	from subscription s 
	cross join active_period_params
	inner join churns e on s.account_id=e.account_id
		and s.start_date < e.start_date
		and s.end_date >= (e.start_date- allowed_gap)::date
) 
insert into active_period (account_id, start_date, churn_date)    
select account_id, min(start_date) as start_date, churn_date  
from churns
group by account_id, churn_date
"""
from sqlalchemy import and_, select
from sqlalchemy.orm import scoped_session, sessionmaker

#Session = scoped_session(sessionmaker(engine))
#Session.remove()

# Churned active periods. A subscription ending inside the observation window
# is a churn unless the account re-subscribes within `allowed_gap` days.
# Earlier subscriptions separated by no more than the gap are chained into the
# same period by the recursive CTE.
allowed_gap = 14  # days (two weeks)
d_observe_start, d_observe_end = "2020-02-09", "2020-05-10"

# every subscription ending inside the window, plus the last date on which a
# re-signup may start and still count as a continuation
end_dates = (
    session.query(
        Subscription.account_id,
        Subscription.start_date,
        Subscription.end_date,
        func.DATE(to_days(Subscription.end_date) + allowed_gap).label("extension_max"),
    )
    .filter(Subscription.end_date.between(d_observe_start, d_observe_end))
    .cte("end_dates")
)

# end dates that are followed by another subscription within the gap
resignups = (
    session.query(end_dates.c.account_id, end_dates.c.end_date)
    .join(Subscription, end_dates.c.account_id == Subscription.account_id)
    .filter(
        Subscription.start_date <= end_dates.c.extension_max,
        or_(Subscription.end_date > end_dates.c.end_date, Subscription.end_date.is_(None)),
    )
    .cte("resignups")
)

# anchor: end dates without a matching re-signup are churns (anti-join)
churns = (
    session.query(
        end_dates.c.account_id,
        end_dates.c.start_date,
        end_dates.c.end_date.label("churn_date"),
    )
    .outerjoin(resignups, and_(
        resignups.c.account_id == end_dates.c.account_id,
        resignups.c.end_date == end_dates.c.end_date,
    ))
    .filter(resignups.c.end_date.is_(None))
    .cte(recursive=True, name="churns")
)

# recursive step: earlier subscriptions ending within the gap before the
# current period start belong to the same churned period
churns_L = (
    session.query(Subscription.account_id, Subscription.start_date, churns.c.churn_date)
    .join(churns, and_(
        Subscription.account_id == churns.c.account_id,
        Subscription.start_date < churns.c.start_date,
        Subscription.end_date >= func.DATE(to_days(churns.c.start_date) - allowed_gap),
    ))
)

churns_top = churns.union_all(churns_L)
# one row per (account, churn_date) carrying the earliest start of the period
subq = (
    session.query(
        churns_top.c.account_id,
        func.min(churns_top.c.start_date).label("start_date"),
        churns_top.c.churn_date,
    )
    .group_by(churns_top.c.account_id, churns_top.c.churn_date)
    .order_by(churns_top.c.account_id, churns_top.c.churn_date)
)

# drop previously computed churned periods (rows that have a churn_date);
# periods with a NULL churn_date are left untouched
session.commit()
old_metrics = session.query(ActivePeriod).filter(ActivePeriod.churn_date.isnot(None))
old_metrics.delete()
session.commit()

new_active_period_insert = subq.cte("new_active_period_insert")
target_columns = ['account_id', 'start_date', 'churn_date']
select_stm = select([new_active_period_insert.c[col] for col in target_columns])
session.execute(ActivePeriod.__table__.insert().from_select(target_columns, select_stm))
session.commit()



## active event weeks (§4.3)

In [6]:
"""
WITH
   periods as (    
	select i::timestamp as period_start, i::timestamp + '7 day'::interval as period_end 
	from generate_series('%from_yyyy-mm-dd', '%to_yyyy-mm-dd', '7 day'::interval) i
)
insert into active_week (account_id, start_date, end_date)
select account_id, 
period_start::date,     
period_end::date
from event inner join periods on event_time>=period_start    
	and event_time < period_end     
group by account_id, period_start, period_end    

"""
from sqlalchemy import func, and_, select
from sqlalchemy import MetaData, Table, Column, Date, Integer
from churnmodels.helpers import make_day_interval

# weekly buckets covering the observation window
d_period_start = "2020-02-09"
d_period_end = "2020-05-10"

# make the date intervals available in the database: make_day_interval returns a
# DataFrame (presumably start_date/end_date columns matching the table below)
freq = "7D"  # a week has 7 days
# NOTE(review): passed positionally to make_day_interval; its exact meaning is
# defined there (an old comment said "4 weeks back in time") — TODO confirm
n_periods = 1
days_df = make_day_interval(d_period_start, d_period_end, n_periods, freq)

meta = MetaData(bind=engine)
TmpPeriodsVec = Table('tmp_periods_vec', meta,
                   Column('id', Integer, primary_key=True, autoincrement=True),
                   Column('start_date', Date), Column('end_date', Date))
# recreate the scratch table so re-running the cell starts from a clean slate
if TmpPeriodsVec.exists():
    TmpPeriodsVec.drop()
TmpPeriodsVec.create()
# store the intervals in the scratch table with pandas
days_df.to_sql("tmp_periods_vec", con=engine, if_exists='append', index=False)
session.commit()

periods = session.query(
        TmpPeriodsVec.c.start_date.label("period_start"),
        TmpPeriodsVec.c.end_date.label("period_end"))\
    .cte("periods")

# one row per account and week in which the account had at least one event.
# select_from() makes `periods` the explicit left side of the join; otherwise
# SQLAlchemy has to infer it, because the first selected entity is Event itself.
qr = session.query(
        Event.account_id,
        periods.c.period_start, periods.c.period_end)\
    .select_from(periods)\
    .join(Event, and_(Event.event_time >= periods.c.period_start,
            Event.event_time < periods.c.period_end))\
    .group_by(Event.account_id, periods.c.period_start, periods.c.period_end)

# delete existing active weeks first, like the other cells do, so that
# re-running this cell does not insert every week a second time
session.commit()
session.query(ActiveWeek).delete()
session.commit()

new_active_period_insert = qr.cte("new_active_period_insert")
select_stm = select([
        new_active_period_insert.c.account_id,
        new_active_period_insert.c.period_start,
        new_active_period_insert.c.period_end])
target_columns = ['account_id', 'start_date', 'end_date']
session.execute(ActiveWeek.__table__.insert().from_select(target_columns, select_stm))
session.commit()


## observation dates (§4.4)

In [28]:
"""
with RECURSIVE observation_params as    
(    
	select interval '%obs_interval' as obs_interval,
	       interval '%lead_time'  as lead_time,
	       '%from_yyyy-mm-dd'::date as obs_start,
	       '%to_yyyy-mm-dd'::date as obs_end
),observations as (    
	select  account_id,
	    start_date,
	    1 as obs_count,
	    (start_date+obs_interval-lead_time)::date as obs_date,
	    case 
	         when churn_date >= (start_date +   obs_interval-lead_time)::date 
		      and churn_date <  (start_date + 2*obs_interval-lead_time)::date
				then true 
		    else false 
		end as is_churn    
	from active_period inner join observation_params
	on (churn_date > (obs_start+obs_interval-lead_time)::date   
        or churn_date is null)

	UNION    

	SELECT  o.account_id,
	    o.start_date,
		 obs_count+1 as obs_count,
	    (o.start_date+(obs_count+1)*obs_interval-lead_time)::date as obs_date,
		case 
	        when churn_date >= (o.start_date + (obs_count+1)*obs_interval-lead_time)::date
		      and churn_date < (o.start_date + (obs_count+2)*obs_interval-lead_time)::date
				then true 
			else false 
		end as is_churn     
	from observations o inner join observation_params
	on   (o.start_date+(obs_count+1)*obs_interval-lead_time)::date <= obs_end
	inner join active_period s on s.account_id=o.account_id    
	and  (o.start_date+(obs_count+1)*obs_interval-lead_time)::date >= s.start_date
	and ((o.start_date+(obs_count+1)*obs_interval-lead_time)::date < s.churn_date or churn_date is null)
) 
insert into observation (account_id, observation_date, is_churn)
select distinct account_id, obs_date, is_churn
from observations
inner join observation_params on obs_date between obs_start and obs_end
"""
from sqlalchemy import cast, or_, literal, and_, case, select
# observation parameters, in days
obs_interval = 28
lead_time = 7
d_obs_start = "2020-02-09"
d_obs_end = "2020-05-10"
obs_start = func.DATE(d_obs_start)
obs_end = func.DATE(d_obs_end)

# anchor: the first observation of each active period lies
# `obs_interval - lead_time` days after the period start; it is a churn
# observation when the churn falls before the following observation date
first_obs_date = func.DATE(to_days(ActivePeriod.start_date) + obs_interval - lead_time)
churned_before_second_obs = and_(
    ActivePeriod.churn_date >= first_obs_date,
    ActivePeriod.churn_date < func.DATE(to_days(ActivePeriod.start_date) + (2*obs_interval) - lead_time),
)
observation = (
    session.query(
        ActivePeriod.account_id.label("account_id"),
        ActivePeriod.start_date,
        literal(1).label("obs_count"),
        first_obs_date.label("obs_date"),
        case([(churned_before_second_obs, literal(True))], else_=literal(False)).label("is_churn"),
    )
    .filter(or_(
        ActivePeriod.churn_date > func.DATE(to_days(obs_start) + obs_interval - lead_time),
        ActivePeriod.churn_date.is_(None),
    ))
    .cte(recursive=True, name="observation")
)

# recursive step: the next observation, one `obs_interval` later, as long as it
# is not after obs_end and falls inside an active period of the account
next_obs_date = func.DATE(
    to_days(observation.c.start_date) + (observation.c.obs_count + 1)*obs_interval - lead_time)
churned_before_following_obs = and_(
    ActivePeriod.churn_date >= next_obs_date,
    ActivePeriod.churn_date < func.DATE(
        to_days(observation.c.start_date) + (observation.c.obs_count + 2)*obs_interval - lead_time),
)
observation_L = (
    session.query(
        observation.c.account_id.label("account_id"),
        observation.c.start_date.label("start_date"),
        (observation.c.obs_count + 1).label("obs_count"),
        next_obs_date.label("obs_date"),
        case([(churned_before_following_obs, literal(True))], else_=literal(False)).label("is_churn"),
    )
    .join(ActivePeriod, ActivePeriod.account_id == observation.c.account_id)
    .filter(and_(
        next_obs_date <= obs_end,
        next_obs_date >= ActivePeriod.start_date,
        or_(next_obs_date < ActivePeriod.churn_date, ActivePeriod.churn_date.is_(None)),
    ))
)

# keep only observation dates inside [obs_start, obs_end], deduplicated
union_all = observation.union_all(observation_L)
subq = (
    session.query(union_all)
    .filter(union_all.c.obs_date.between(obs_start, obs_end))
    .subquery()
)
qr = session.query(subq).distinct()

# debug switch: pretty-print the generated SQL
if False:
    import sqlparse
    text1 = str(qr.statement.compile(engine, compile_kwargs={"literal_binds": True}))
    text2 = sqlparse.format(text1, reindent=True, keyword_case='upper')
    print(text2)

# replace all previously computed observations
session.commit()
old_metrics = session.query(Observation)
old_metrics.delete()
session.commit()

new_active_period_insert = qr.cte("new_active_period_insert")
select_stm = select([
        new_active_period_insert.c.account_id,
        new_active_period_insert.c.obs_date,
        new_active_period_insert.c.is_churn])
target_columns = ['account_id', 'observation_date', 'is_churn']
session.execute(Observation.__table__.insert().from_select(target_columns, select_stm))
session.commit()


## dataset (§4.5)

In [30]:
"""
with observation_params as     
(
    select  interval '%metric_interval' as metric_period,
    '%from_yyyy-mm-dd'::timestamp as obs_start,
    '%to_yyyy-mm-dd'::timestamp as obs_end
)
select m.account_id, o.observation_date, is_churn,
sum(case when metric_name_id=0 then metric_value else 0 end) as like_per_month,
sum(case when metric_name_id=1 then metric_value else 0 end) as newfriend_per_month,
sum(case when metric_name_id=2 then metric_value else 0 end) as post_per_month,
sum(case when metric_name_id=3 then metric_value else 0 end) as adview_per_month,
sum(case when metric_name_id=4 then metric_value else 0 end) as dislike_per_month,
sum(case when metric_name_id=5 then metric_value else 0 end) as unfriend_per_month,
sum(case when metric_name_id=6 then metric_value else 0 end) as message_per_month,
sum(case when metric_name_id=7 then metric_value else 0 end) as reply_per_month,
sum(case when metric_name_id=8 then metric_value else 0 end) as account_tenure
from metric m inner join observation_params
on metric_time between obs_start and obs_end    
inner join observation o on m.account_id = o.account_id
    and m.metric_time > (o.observation_date - metric_period)::timestamp    
    and m.metric_time <= o.observation_date::timestamp
group by m.account_id, metric_time, observation_date, is_churn    
order by observation_date,m.account_id

"""
from sqlalchemy import case, func, literal

# metrics are summed over the `metric_period` days up to each observation date
metric_period = 7
# NOTE(review): the reference SQL also restricts metric_time to
# [obs_start, obs_end]; these dates are kept for that purpose but the query
# below does not apply them
d_obs_start = "2020-02-09"
d_obs_end = "2020-05-10"

fields = [
    Metric.account_id,
    Observation.observation_date,
    Observation.is_churn,
]
# pivot: one summed column per metric name. Ordering by metric_name_id keeps the
# column order stable between runs (without ORDER BY the database may return the
# names in any order); the reference SQL lists the columns by id as well.
df_metricnames = pd.read_sql(
    session.query(MetricName).order_by(MetricName.metric_name_id).statement, engine)
for metric in df_metricnames.itertuples(index=False):
    fields.append(func.sum(case([
        (Metric.metric_name_id == metric.metric_name_id, Metric.metric_value)
        ], else_=0)).label(metric.metric_name))

# NOTE(review): on SQLite dates/timestamps compare as text; if metric_time has a
# time part, a metric stamped at midnight of the observation date compares
# greater than the bare date and is excluded — TODO confirm against the data
qr = session.query(*fields)\
    .join(Observation, Metric.account_id == Observation.account_id)\
    .filter(
        Metric.metric_time > func.DATE(to_days(Observation.observation_date) - metric_period),
        Metric.metric_time <= Observation.observation_date)\
    .group_by(Metric.account_id, Metric.metric_time,
              Observation.observation_date, Observation.is_churn)\
    .order_by(Observation.observation_date, Metric.account_id)

# debug switch: pretty-print the generated SQL
if False:
    import sqlparse
    text1 = str(qr.statement.compile(engine, compile_kwargs={"literal_binds": True}))
    text2 = sqlparse.format(text1, reindent=True, keyword_case='upper')
    print(text2)

ddf = pd.read_sql(qr.statement, engine)
print(ddf)


       account_id observation_date  is_churn  post_per_month  \
0              31       2020-03-02     False             4.0   
1              56       2020-03-02     False             7.0   
2              74       2020-03-02     False            10.0   
3              94       2020-03-02     False             0.0   
4             154       2020-03-02     False           124.0   
...           ...              ...       ...             ...   
12670       11899       2020-04-05     False             0.0   
12671       11928       2020-04-05     False             6.0   
12672       12003       2020-04-05     False            15.0   
12673       12014       2020-04-05     False             4.0   
12674       12070       2020-04-05      True             5.0   

       newfriend_per_month  like_per_month  adview_per_month  \
0                      1.0            10.0              12.0   
1                      3.0            39.0              26.0   
2                      4.0            2

## current customers (§4.6)

In [31]:
"""
with metric_date as
(
    select  max(metric_time) as last_metric_time from metric
)
select m.account_id, d.last_metric_time,
sum(case when metric_name_id=0 then metric_value else 0 end) as like_per_month,
sum(case when metric_name_id=1 then metric_value else 0 end) as newfriend_per_month,
sum(case when metric_name_id=2 then metric_value else 0 end) as post_per_month,
sum(case when metric_name_id=3 then metric_value else 0 end) as adview_feed_per_month,
sum(case when metric_name_id=4 then metric_value else 0 end) as dislike_per_month,
sum(case when metric_name_id=5 then metric_value else 0 end) as unfriend_per_month,
sum(case when metric_name_id=6 then metric_value else 0 end) as message_per_month,
sum(case when metric_name_id=7 then metric_value else 0 end) as reply_per_month,
sum(case when metric_name_id=8 then metric_value else 0 end) as account_tenure
from metric m inner join metric_date d on m.metric_time = d.last_metric_time
inner join subscription s on m.account_id=s.account_id
where s.start_date <= d.last_metric_time
and (s.end_date >= d.last_metric_time or s.end_date is null)
group by m.account_id, d.last_metric_time
order by m.account_id

"""
from sqlalchemy import case, func, literal, or_, Text

# day of the most recent metric; current customers are evaluated on this day.
# NOTE(review): falls back to 0 when the metric table is empty — TODO handle
# that case explicitly
last_metric_time = session.query(func.DATE(func.max(Metric.metric_time))).one()[0] or 0

fields = [
    Metric.account_id,
    func.DATE(last_metric_time).label("last_metric_time"),
]
# pivot: one summed column per metric name, in a stable (id) order
df_metricnames = pd.read_sql(
    session.query(MetricName).order_by(MetricName.metric_name_id).statement, engine)
for metric in df_metricnames.itertuples(index=False):
    fields.append(func.sum(case([
        (Metric.metric_name_id == metric.metric_name_id, Metric.metric_value)
        ], else_=0)).label(metric.metric_name))

# accounts with at least one subscription covering last_metric_time.
# Filtering with IN instead of joining `subscription` keeps every metric row
# exactly once: a join repeats an account's metric rows once per overlapping
# active subscription and therefore multiplies its sums.
active_accounts = sa.select([Subscription.account_id])\
    .where(Subscription.start_date <= last_metric_time)\
    .where(or_(Subscription.end_date >= last_metric_time, Subscription.end_date == None))

qr = session.query(*fields)\
    .filter(
        func.DATE(Metric.metric_time) == last_metric_time,
        Metric.account_id.in_(active_accounts))\
    .group_by(Metric.account_id, "last_metric_time")\
    .order_by(Metric.account_id)

ddf = pd.read_sql(qr.statement, engine)
print(ddf)



       account_id last_metric_time  post_per_month  newfriend_per_month  \
0               1       2020-03-29             3.0                  1.0   
1               4       2020-03-29            23.0                  3.0   
2               5       2020-03-29            36.0                  8.0   
3               7       2020-03-29            14.0                  3.0   
4               8       2020-03-29            11.0                  5.0   
...           ...              ...             ...                  ...   
10547       12096       2020-03-29            73.0                  2.0   
10548       12097       2020-03-29            12.0                  2.0   
10549       12098       2020-03-29             6.0                  5.0   
10550       12099       2020-03-29            49.0                 13.0   
10551       12100       2020-03-29           282.0                 19.0   

       like_per_month  adview_per_month  dislike_per_month  \
0                61.0               8