In [34]:
from sqlalchemy import create_engine
from sqlalchemy import text
import redshift_connector
import pandas as pd
import psycopg2
import yaml

### Subscription_price_changes

In [33]:
# Read the sample price-change CSV directly from the advanced-sql GitHub repository.
url = 'https://raw.githubusercontent.com/clrcrl/advanced-sql/main/subscription-price-changes/subscription_price_changes.csv'
supscription_price_changes = pd.read_csv(url)
# Keep a local copy. to_csv is called without index=False, so the row index is
# written as an extra, unnamed first column.
supscription_price_changes.to_csv(r'supscription_price_changes.csv')
# NOTE(review): "supscription" looks like a typo for "subscription"; the name is
# left unchanged because later cells reference this exact variable.
supscription_price_changes

Unnamed: 0,change_id,subscription_id,price,changed_at
0,1,1,50,2020-01-10
1,2,1,60,2020-01-15
2,3,2,40,2020-01-12


In [32]:
# Hand-written rebilling rows (three rebillings across subscriptions 1 and 3).
# The dates are plain 'YYYY-MM-DD' strings; nothing in this cell parses them.
data = {
    'rebilling_id': [1, 2, 3], 
    'subscription_id': [1, 1, 3],
    'rebilled_at': ['2020-02-01', '2020-03-01','2020-02-01']
}  

rebillings = pd.DataFrame(data)
# Local CSV copy; the row index is written too because index=False is not passed.
rebillings.to_csv(r'rebillings.csv')
rebillings

Unnamed: 0,rebilling_id,subscription_id,rebilled_at
0,1,1,2020-02-01
1,2,1,2020-03-01
2,3,3,2020-02-01


In [29]:
# Show the price-change frame again before joining it with the rebillings.
supscription_price_changes

Unnamed: 0,change_id,subscription_id,price,changed_at
0,1,1,50,2020-01-10
1,2,1,60,2020-01-15
2,3,2,40,2020-01-12


In [30]:
# Show the rebillings frame again before joining it with the price changes.
rebillings

Unnamed: 0,rebilling_id,subscription_id,rebilled_at
0,1,1,2020-02-01
1,2,1,2020-03-01
2,3,3,2020-02-01


In [31]:
# Full outer join on subscription_id: a subscription that appears in only one
# frame is kept, with NaN in the columns coming from the other frame.
# indicator=False is the pandas default, so no `_merge` column is added.
res = pd.merge(supscription_price_changes, rebillings, how = 'outer', on = 'subscription_id', indicator = False)
res

Unnamed: 0,change_id,subscription_id,price,changed_at,rebilling_id,rebilled_at
0,1.0,1,50.0,2020-01-10,1.0,2020-02-01
1,1.0,1,50.0,2020-01-10,2.0,2020-03-01
2,2.0,1,60.0,2020-01-15,1.0,2020-02-01
3,2.0,1,60.0,2020-01-15,2.0,2020-03-01
4,3.0,2,40.0,2020-01-12,,
5,,3,,,3.0,2020-02-01


In [257]:
# Warehouse version of the join above. For each subscription the joined
# (price change, rebilling) rows are numbered with the latest change first and,
# within a change, the earliest rebilling first; only row 1 is kept, and only
# when it actually has a rebilling.
# NOTE(review): the join does not require rebilled_at >= changed_at, so a
# rebilling that predates the latest price change could be returned as
# effective_at — confirm this matches the intended business rule.
sql = '''
with subscriptions as (

  select * from spreadsheet_raw.subscription_price_changes

),

rebillings as (

  select * from spreadsheet_raw.rebillings

),

subscription_ranking as (

  select
    *,
    row_number() over(partition by subscription_id order by changed_at desc, rebilled_at) as ranking

  from subscriptions
  left join rebillings using (subscription_id)

)

select
  subscription_id,
  price as new_price,
  changed_at,
  rebilled_at as effective_at

from subscription_ranking
where ranking = 1
  and rebilled_at is not null
'''
# NOTE(review): `aon` is not defined in any cell visible here; presumably it is
# a SQLAlchemy engine/connection created elsewhere — confirm it exists on a
# fresh kernel before relying on Run All.
data = pd.read_sql_query(text(sql), aon)
data

Unnamed: 0,subscription_id,new_price,changed_at,effective_at
0,1,60,2020-01-15,2020-02-01


### Payments

In [259]:
# Read the payments sample from the advanced-sql repository and keep a local
# CSV copy (row index included, since index=False is not passed).
url = 'https://raw.githubusercontent.com/clrcrl/advanced-sql/main/apportioning-payments/payments.csv'
payments = pd.read_csv(url)
payments.to_csv(r'payments.csv')
# Inspect only the payments of task 5.
payments[payments['task_id'] == 5]

Unnamed: 0,payment_id,task_id,payment_type,amount
10,11,5,inbound,20
11,12,5,inbound,40
12,13,5,inbound,30
13,14,5,payout,25
14,15,5,refund,65


In [318]:
# Draft of the apportioning query, limited to tasks with more than one inbound
# payment. The following cell repeats these CTEs and adds the single-invoice case.
#
# CTE walk-through:
#   ranking_payments         - splits `amount` into inbound/payout/refund columns
#                              and numbers ALL payments of a task by payment_id
#                              (not only the inbound ones).
#   number_of_invoices       - counts inbound payments per task.
#   raw_payments             - ranking_payments plus that count.
#   multiple_invoices_totals - total payout / refund per task with > 1 inbound.
#   cte_payout_amount        - rank 1 is paid out its full inbound amount, rank 2
#                              gets total_payout minus the rank-1 amount, every
#                              other rank gets 0.
#   cte_refund_amount        - rank 2 is refunded inbound minus payout, rank 3 gets
#                              total_refund minus the rank-2 refund, others 0.
#   multiple_invoices        - keeps only rows with inbound_amount > 0.
#
# NOTE(review): the apportioning is hard-coded for ranks 1, 2 and 3 and appears
# to assume the inbound payments have the lowest payment_ids within a task —
# confirm before using it on other data.
# NOTE(review): the `--` lines inside the string are leftover debugging SQL.
sql = '''
with ranking_payments as (

  select
    task_id,
    payment_id as inbound_payment_id,
    
    case 
      when payment_type = 'inbound' then amount 
      else 0
    end as inbound_amount,
    
    case 
      when payment_type = 'payout' then amount 
      else 0
    end as payout_amount,
    
    case 
      when payment_type = 'refund' then amount 
      else 0
    end as refund_amount,
    
    row_number() over(partition by task_id order by payment_id) as ranking

  from spreadsheet_raw.payments

),

number_of_invoices as (

  select 
    task_id,
    
    sum(
      case 
        when payment_type = 'inbound' then 1 
        else 0 
      end
    ) as number_of_invoices
  
  from spreadsheet_raw.payments
  group by 1

),
 
raw_payments as (

  select * from ranking_payments
  left join number_of_invoices using (task_id)

),

--select * from raw_payments
--where number_of_invoices > 1
--order by task_id, ranking


multiple_invoices_totals as (

  select 
    task_id,
    sum(payout_amount) as total_payout,
    sum(refund_amount) as total_refund

  from raw_payments
  where number_of_invoices > 1
  group by 1

),

cte_payout_amount as (

  select
    task_id,
    inbound_payment_id,
    inbound_amount,

    case
      when ranking = 1 then inbound_amount
      else 0
    end as payout_amount_aux,

    case
      when ranking = 2 then total_payout - lag(payout_amount_aux) over(partition by task_id order by ranking)
      else payout_amount_aux
    end as payout_amount,

    ranking,
    total_refund

  from raw_payments
  inner join multiple_invoices_totals using (task_id)

),

cte_refund_amount as (

  select 
    *,
    case
      when ranking = 2 then inbound_amount - payout_amount
      else 0
    end as refund_amount_aux,
  
    case
      when ranking = 3 then total_refund - lag(refund_amount_aux) over(partition by task_id order by ranking)
      else refund_amount_aux
    end as refund_amount
  
  from cte_payout_amount

),

multiple_invoices as (

  select 
    task_id, 
    inbound_payment_id, 
    inbound_amount, 
    payout_amount, 
    refund_amount 

  from cte_refund_amount
  where inbound_amount > 0

)

select * from multiple_invoices
'''
# NOTE(review): `aon` is not defined in any cell visible here — confirm the
# connection object exists on a fresh kernel.
raw_payments = pd.read_sql_query(text(sql), aon)
raw_payments

Unnamed: 0,task_id,inbound_payment_id,inbound_amount,payout_amount,refund_amount
0,5,13,30,0,30
1,5,12,40,5,35
2,5,11,20,20,0
3,3,6,20,20,0
4,3,5,40,40,0


In [322]:
# Full apportioning query: one output row per inbound payment, covering both
# single-invoice and multi-invoice tasks.
#
# CTE walk-through:
#   ranking_payments / number_of_invoices - same as in the draft cell above:
#       split amounts by payment_type, number every payment of a task by
#       payment_id, and count inbound payments per task.
#   raw_payments             - adds in_escrow, true when the task has more than
#                              one inbound payment.
#   simple_invoices_agg      - per-task totals for tasks with exactly one inbound.
#   simple_invoices          - joins those totals back to the task's rank-1 row
#                              (ranking <= number_of_invoices, i.e. ranking = 1);
#                              when neither a payout nor a refund was recorded,
#                              the whole inbound amount is reported as payout.
#   multiple_invoices_totals - total payout / refund per in_escrow task.
#   cte_payout_amount / cte_refund_amount / multiple_invoices
#                            - rank-based split of payout and refund across the
#                              inbound rows, hard-coded for ranks 1, 2 and 3.
#   final                    - union of the simple and multiple branches.
#
# NOTE(review): tasks with zero inbound payments fall in neither branch and are
# therefore absent from the result — confirm that is intended.
# NOTE(review): the rank-based logic appears to assume inbound payments carry
# the lowest payment_ids within a task — confirm against the source data.
sql = '''
with ranking_payments as (

  select
    task_id,
    payment_id as inbound_payment_id,
    
    case 
      when payment_type = 'inbound' then amount 
      else 0
    end as inbound_amount,
    
    case 
      when payment_type = 'payout' then amount 
      else 0
    end as payout_amount,
    
    case 
      when payment_type = 'refund' then amount 
      else 0
    end as refund_amount,
    
    row_number() over(partition by task_id order by payment_id) as ranking

  from spreadsheet_raw.payments

),

number_of_invoices as (

  select 
    task_id,
    
    sum(
      case 
        when payment_type = 'inbound' then 1 
        else 0 
      end
    ) as number_of_invoices
  
  from spreadsheet_raw.payments
  group by 1

),
 
raw_payments as (

  select 
    *,
    case 
      when number_of_invoices > 1 then True
      else False
    end as in_escrow

  from ranking_payments
  left join number_of_invoices using (task_id)

),

simple_invoices_agg as (

  select 
    task_id,
    sum(inbound_amount) as inbound_amount,
    sum(payout_amount) as payout_amount,
    sum(refund_amount) as refund_amount
  
  from raw_payments
  where number_of_invoices = 1
  group by 1

),

simple_invoices as (

  select
    simple_invoices_agg.task_id,
    raw_payments.inbound_payment_id,
    raw_payments.in_escrow,
    simple_invoices_agg.inbound_amount,
    
    case
      when simple_invoices_agg.payout_amount = 0
        and simple_invoices_agg.refund_amount = 0
        and simple_invoices_agg.inbound_amount != 0 then simple_invoices_agg.inbound_amount
      else simple_invoices_agg.payout_amount
    end as payout_amount,
    
    simple_invoices_agg.refund_amount

  from raw_payments
  inner join simple_invoices_agg using (task_id)
  where ranking <= number_of_invoices

),

multiple_invoices_totals as (

  select 
    task_id,
    sum(payout_amount) as total_payout,
    sum(refund_amount) as total_refund

  from raw_payments
  where in_escrow
  group by 1

),

cte_payout_amount as (

  select
    task_id,
    inbound_payment_id,
    inbound_amount,
    in_escrow,

    case
      when ranking = 1 then inbound_amount
      else 0
    end as payout_amount_aux,

    case
      when ranking = 2 then total_payout - lag(payout_amount_aux) over(partition by task_id order by ranking)
      else payout_amount_aux
    end as payout_amount,

    ranking,
    total_refund

  from raw_payments
  inner join multiple_invoices_totals using (task_id)

),

cte_refund_amount as (

  select 
    *,
    case
      when ranking = 2 then inbound_amount - payout_amount
      else 0
    end as refund_amount_aux,
  
    case
      when ranking = 3 then total_refund - lag(refund_amount_aux) over(partition by task_id order by ranking)
      else refund_amount_aux
    end as refund_amount
  
  from cte_payout_amount

),

multiple_invoices as (

  select 
    task_id, 
    inbound_payment_id,
    in_escrow,
    inbound_amount, 
    payout_amount, 
    refund_amount 

  from cte_refund_amount
  where inbound_amount > 0

),

final as (

  select * from simple_invoices

  union all

  select * from multiple_invoices
)

select * from final
order by 1
'''
# NOTE(review): `aon` is not defined in any cell visible here — confirm the
# connection object exists on a fresh kernel.
# `data` is the frame the comparison cells below merge against `result`.
data = pd.read_sql_query(text(sql), aon)
data

Unnamed: 0,task_id,inbound_payment_id,in_escrow,inbound_amount,payout_amount,refund_amount
0,1,1,False,50,50,0
1,2,3,False,30,0,30
2,3,6,True,20,20,0
3,3,5,True,40,40,0
4,4,8,False,20,15,5
5,5,13,True,30,0,30
6,5,12,True,40,5,35
7,5,11,True,20,20,0
8,6,16,False,15,0,15


In [293]:
# Load inbound_payment_states.csv from the same repository; presumably this is
# the expected answer the query output is checked against — confirm.
url = 'https://raw.githubusercontent.com/clrcrl/advanced-sql/main/apportioning-payments/inbound_payment_states.csv'
result = pd.read_csv(url)

In [294]:
# Outer-join the query output (`data`) with `result` on the key pair. Non-key
# columns present in both frames get pandas' default suffixes (_x for `data`,
# _y for `result`); indicator=True adds a `_merge` column naming the source side.
res = pd.merge(data, result, how = 'outer', on = ['task_id','inbound_payment_id'], indicator = True)

In [295]:
def mismatch_detector(merge, original, pk):
    """Return the index labels of rows in ``merge`` whose two sides disagree.

    Parameters
    ----------
    merge : pandas.DataFrame
        Result of ``pd.merge`` built with the default ``_x`` / ``_y`` suffixes.
    original : pandas.DataFrame
        One of the merged frames; its column names decide which ``_x`` / ``_y``
        pairs are compared.
    pk : collection of str
        Join-key column names; these are never compared.

    Returns
    -------
    set
        Index labels of ``merge`` for which at least one compared pair differs.
        Empty when every compared pair agrees.
    """
    mismatches = set()

    for col in original.columns:
        if col in pk:
            continue

        left_name, right_name = col + '_x', col + '_y'
        # pd.merge only suffixes columns that exist in BOTH inputs. A column
        # present on one side only has no counterpart to compare with, so it is
        # skipped instead of raising KeyError on the missing suffixed name.
        if left_name not in merge.columns or right_name not in merge.columns:
            continue

        left, right = merge[left_name], merge[right_name]
        # NaN != NaN evaluates to True, so a value missing on both sides would
        # otherwise be reported as a difference. Missing on one side only is
        # still a mismatch.
        differs = left.ne(right) & ~(left.isna() & right.isna())
        mismatches.update(merge.index[differs.to_numpy()])

    return mismatches
    

In [296]:
# Compare the non-key columns of `data` across the two sides of `res`; an empty
# set means no row was reported as differing.
mismatch_detector(res, data, ['task_id','inbound_payment_id'])

set()

In [314]:
def ordered_columns(original, pk):
    """Build a column order that puts each merged ``_x`` / ``_y`` pair side by side.

    The key columns in ``pk`` come first, in the order given. They are followed,
    for every other column of ``original`` in its own order, by that column's
    ``_x`` name and then its ``_y`` name.
    """
    key_names = list(pk)
    paired_names = [
        name + suffix
        for name in original.columns
        if name not in pk
        for suffix in ('_x', '_y')
    ]
    return key_names + paired_names

In [317]:
# Reorder so each _x / _y pair sits side by side. reindex keeps only the listed
# columns, so anything not in that list (for example `_merge`) is dropped.
res = res.reindex(columns = ordered_columns(result, ['task_id', 'inbound_payment_id']))
res

Unnamed: 0,task_id,inbound_payment_id,inbound_amount_x,inbound_amount_y,payout_amount_x,payout_amount_y,refund_amount_x,refund_amount_y
0,1,1,50,50,50,50,0,0
1,2,3,30,30,0,0,30,30
2,3,6,20,20,20,20,0,0
3,3,5,40,40,40,40,0,0
4,4,8,20,20,15,15,5,5
5,5,13,30,30,0,0,30,30
6,5,12,40,40,5,5,35,35
7,5,11,20,20,20,20,0,0
