# Exploring Window Functions with SQL and Pandas
Based on http://www.helenanderson.co.nz/sql-window-functions-part-1/

In [195]:
import numpy as np
import pandas as pd
np.random.seed(20190904)

In [196]:
import psycopg2
import psycopg2.extras



In [197]:
import os

# Generate sample dataset

In [198]:
N = 100_000
NB_YEARS = 1
START_DATE = '2018-01-01'

In [199]:
pd.Series(pd.to_datetime([1,2,3], origin='2018-01-01', unit='D')).dt.strftime('%Y')

0    2018
1    2018
2    2018
dtype: object

In [200]:
def sample_dataset(start_date=START_DATE, nb_years=NB_YEARS, size=N, precision=1):
    sale_date = pd.Series(
        pd.to_datetime(
            np.random.randint(0, 365 * nb_years, size=size), 
            unit='D', 
            origin=start_date
        )
    )
    ordermonth = sale_date.dt.strftime('%Y-%m')
    territoryid = pd.Series(np.random.randint(0, 10, size))
    subtotal = pd.Series(np.random.uniform(500, 2500, size=size)).round(precision)
    taxamt = (0.075 * subtotal).round(precision)
    freight = (0.025 * subtotal).round(precision)
    totaldue = subtotal + taxamt
    customerid = pd.Series(np.random.randint(1, size//10 + 1, size) + 2000)
    salesorderid = np.arange(4000, 4000 + size)
    df = pd.DataFrame({
        'salesorderid': salesorderid,
        'sale_date': sale_date,
        'ordermonth': ordermonth,
        'territoryid': territoryid,
        'subtotal': subtotal,
        'taxamt': taxamt,
        'freight': freight,
        'totaldue': totaldue,
        'customerid': customerid
    })
    # turn all floats into decimals 
    
    return df

In [201]:
dataset['subtotal'].round(2)

0        1299.37
1        1311.49
2         650.67
3        1112.70
4         798.64
          ...   
99995    2398.01
99996     943.49
99997    1793.18
99998    2211.79
99999    2449.33
Name: subtotal, Length: 100000, dtype: float64

In [202]:
dataset = sample_dataset()
dataset.head()

Unnamed: 0,salesorderid,sale_date,ordermonth,territoryid,subtotal,taxamt,freight,totaldue,customerid
0,4000,2018-10-15,2018-10,2,955.2,71.6,23.9,1026.8,2102
1,4001,2018-08-27,2018-08,5,834.3,62.6,20.9,896.9,8187
2,4002,2018-08-19,2018-08,6,1940.4,145.5,48.5,2085.9,3033
3,4003,2018-08-30,2018-08,9,1611.1,120.8,40.3,1731.9,8565
4,4004,2018-03-25,2018-03,3,579.3,43.4,14.5,622.7,7144


In [203]:
dataset.to_csv('sales.csv', index=False)

In [204]:
pwd

'/Users/sinayoks/dev/scratch-sql/sql'

In [205]:
!head sales.csv

salesorderid,sale_date,ordermonth,territoryid,subtotal,taxamt,freight,totaldue,customerid
4000,2018-10-15,2018-10,2,955.2,71.6,23.9,1026.8,2102
4001,2018-08-27,2018-08,5,834.3,62.6,20.9,896.9,8187
4002,2018-08-19,2018-08,6,1940.4,145.5,48.5,2085.9,3033
4003,2018-08-30,2018-08,9,1611.1,120.8,40.3,1731.8999999999999,8565
4004,2018-03-25,2018-03,3,579.3,43.4,14.5,622.6999999999999,7144
4005,2018-05-09,2018-05,8,1697.9,127.3,42.4,1825.2,4831
4006,2018-07-15,2018-07,8,747.5,56.1,18.7,803.6,11079
4007,2018-05-28,2018-05,4,1936.3,145.2,48.4,2081.5,3052
4008,2018-11-16,2018-11,5,2050.5,153.8,51.3,2204.3,6906


# create postgres database

In [206]:
def connect_db(dbname, user=None, **kwargs):
    user = user or 'postgres'
    con = psycopg2.connect(database=dbname, user=user, **kwargs)
    con.autocommit = True
    return con

In [207]:
def create_db(dbname='sales', drop=True):
    con = connect_db('postgres')
    cur = con.cursor()
    cur.execute(f'DROP DATABASE IF EXISTS {dbname};')
    cur.execute(f'DROP USER IF EXISTS {dbname};')
    cur.execute(f'CREATE DATABASE {dbname};')
    cur.execute(f'CREATE USER {dbname};')
    con.close()

def create_table(dbname='sales', filename='sales.csv'):
    """Import data from csv into postgres database.
    
    Uses psycopg2 and sql COPY. 
    Could also use pandas.to_sql but this removes dependency on pandas and is also more scaleable.
    """
    filename = os.path.abspath(filename)
    assert os.path.exists(filename), f'csv file {filname} not found'
    con = connect_db(dbname)
    try:
        with con.cursor() as cur:
            cur.execute(f'DROP TABLE IF EXISTS {dbname};')
            cur.execute(f"""
    CREATE TABLE {dbname} (
    salesorderid INTEGER UNIQUE NOT NULL,
    sale_date DATE NOT NULL,
    ordermonth VARCHAR (7) NOT NULL,
    territoryid FLOAT NOT NULL,
    subtotal FLOAT NOT NULL,
    taxamt FLOAT NOT NULL,
    freight FLOAT NOT NULL,
    totaldue FLOAT NOT NULL,
    customerid INTEGER NOT NULL
    );""")
        with con.cursor() as cur:
            cur.execute(
            f"COPY {dbname} FROM '{filename}' WITH (FORMAT csv, HEADER true);"
            )
            cur.execute(f'GRANT SELECT ON {dbname} TO {dbname};')
    finally:
        con.close()

def query(query, dbname='sales', columns=dataset.columns):
    con = connect_db(dbname, user=dbname)
    with con.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cur:
        cur.execute(query)
        return cur.fetchall()

def queryframe(*args, **kwargs):
    data = query(*args, **kwargs)
    return pd.DataFrame(data)
    
    

In [209]:
create_db()

In [210]:
create_table()

In [211]:
query('select * from sales limit 2')

[Record(salesorderid=4000, sale_date=datetime.date(2018, 10, 15), ordermonth='2018-10', territoryid=2.0, subtotal=955.2, taxamt=71.6, freight=23.9, totaldue=1026.8, customerid=2102),
 Record(salesorderid=4001, sale_date=datetime.date(2018, 8, 27), ordermonth='2018-08', territoryid=5.0, subtotal=834.3, taxamt=62.6, freight=20.9, totaldue=896.9, customerid=8187)]

In [212]:
queryframe('select * from sales limit 2')

Unnamed: 0,salesorderid,sale_date,ordermonth,territoryid,subtotal,taxamt,freight,totaldue,customerid
0,4000,2018-10-15,2018-10,2.0,955.2,71.6,23.9,1026.8,2102
1,4001,2018-08-27,2018-08,5.0,834.3,62.6,20.9,896.9,8187


# Running Aggregations
Running sum, min, max or average by day. 


In [213]:
def running_aggs_with_sql():
    running_agg_query = """
-- running total	
select
  sale_date,
  salesorderid,
  subtotal,
  sum(subtotal) 
  over(
  	partition by sale_date 
	order by salesorderid
  ) as total_sales,
  min(subtotal)
  over(
    partition by sale_date 
    order by salesorderid 
  ) min_sales, 
  avg(subtotal)
  over(
    partition by sale_date 
    order by salesorderid 
  ) avg_sales
from 
  sales
where 
  sale_date between 
'2018-01-01' and '2018-12-31'
order by sale_date;
    """
    res = queryframe(running_agg_query)
    return res

In [214]:
aggs_sql = running_aggs_with_sql()
aggs_sql

Unnamed: 0,sale_date,salesorderid,subtotal,total_sales,min_sales,avg_sales
0,2018-01-01,4065,2120.0,2120.0,2120.0,2120.000000
1,2018-01-01,4572,2285.0,4405.0,2120.0,2202.500000
2,2018-01-01,4625,573.7,4978.7,573.7,1659.566667
3,2018-01-01,5840,1947.8,6926.5,573.7,1731.625000
4,2018-01-01,5926,1490.7,8417.2,573.7,1683.440000
...,...,...,...,...,...,...
99995,2018-12-31,102632,1003.8,406016.8,504.9,1465.764621
99996,2018-12-31,102880,1990.2,408007.0,504.9,1467.651079
99997,2018-12-31,103118,650.7,408657.7,504.9,1464.722939
99998,2018-12-31,103504,1639.4,410297.1,504.9,1465.346786


In [215]:
aggs_pandas = dataset[['sale_date', 'salesorderid', 'subtotal']].assign(
    total_sales=dataset.groupby(['sale_date'])['subtotal'].cumsum(),
    min_sales=dataset.groupby(['sale_date'])['subtotal'].cummin(),
    avg_sales=dataset.sort_values(['sale_date', 'salesorderid']).groupby(['sale_date'])['subtotal'].expanding().mean().reset_index(0)['subtotal'],
).sort_values(
    ['sale_date', 'salesorderid']
).reset_index(
    drop=True
).astype({'sale_date':'str'})
aggs_pandas

Unnamed: 0,sale_date,salesorderid,subtotal,total_sales,min_sales,avg_sales
0,2018-01-01,4065,2120.0,2120.0,2120.0,2120.000000
1,2018-01-01,4572,2285.0,4405.0,2120.0,2202.500000
2,2018-01-01,4625,573.7,4978.7,573.7,1659.566667
3,2018-01-01,5840,1947.8,6926.5,573.7,1731.625000
4,2018-01-01,5926,1490.7,8417.2,573.7,1683.440000
...,...,...,...,...,...,...
99995,2018-12-31,102632,1003.8,406016.8,504.9,1465.764621
99996,2018-12-31,102880,1990.2,408007.0,504.9,1467.651079
99997,2018-12-31,103118,650.7,408657.7,504.9,1464.722939
99998,2018-12-31,103504,1639.4,410297.1,504.9,1465.346786


In [216]:
for col in ['total_sales', 'min_sales', 'avg_sales']:
    pd.util.testing.assert_almost_equal(
        aggs_sql[col], 
        aggs_pandas[col], 
        check_less_precise=False
    )

# Rankings
Rank rows by some column or some partition.  


In [257]:
def ranking_with_sql():
    sql_query = """
-- top sales in year by subtotal
-- can be used to identify the customers that put in 
-- the largest orders in hte previous year 
select
  sale_date,
  salesorderid,
  subtotal,
  rank() over(order by subtotal desc) as sales_rank,
  dense_rank() over(order by subtotal desc) as sales_dense_rank
from 
  sales
where 
  sale_date between 
'2018-01-01' and '2018-12-31'
order by sales_rank, sale_date, salesorderid;
    """
    res = queryframe(sql_query)
    return res

In [258]:
ranks_sql = ranking_with_sql()
ranks_sql

Unnamed: 0,sale_date,salesorderid,subtotal,sales_rank,sales_dense_rank
0,2018-01-21,91151,2500.0,1,1
1,2018-11-23,60595,2500.0,1,1
2,2018-05-01,29112,2499.9,3,2
3,2018-07-03,44417,2499.9,3,2
4,2018-08-27,101346,2499.9,3,2
...,...,...,...,...,...
99995,2018-08-28,71329,500.0,99994,19870
99996,2018-09-01,18954,500.0,99994,19870
99997,2018-09-07,6150,500.0,99994,19870
99998,2018-09-09,57375,500.0,99994,19870


In [232]:
# ranking with Pandas

In [267]:
def ranking_with_pandas():
    df = dataset[
        ['sale_date', 'salesorderid', 'subtotal']
    ].assign(**{
        'sales_rank': lambda x: x['subtotal'].rank(method='min', ascending=False).astype(int),
        'sales_dense_rank': lambda x: x['subtotal'].rank(method='dense', ascending=False).astype(int),
    }).sort_values(
        ['sales_rank', 'sale_date', 'salesorderid'], 
    ).reset_index(drop=True)
    return df

In [268]:
ranks_pandas = ranking_with_pandas()
ranks_pandas    

Unnamed: 0,sale_date,salesorderid,subtotal,sales_rank,sales_dense_rank
0,2018-01-21,91151,2500.0,1,1
1,2018-11-23,60595,2500.0,1,1
2,2018-05-01,29112,2499.9,3,2
3,2018-07-03,44417,2499.9,3,2
4,2018-08-27,101346,2499.9,3,2
...,...,...,...,...,...
99995,2018-08-28,71329,500.0,99994,19870
99996,2018-09-01,18954,500.0,99994,19870
99997,2018-09-07,6150,500.0,99994,19870
99998,2018-09-09,57375,500.0,99994,19870


In [269]:
for col in ['sales_rank', 'sales_dense_rank']:
    pd.util.testing.assert_almost_equal(
        ranks_sql[col], 
        ranks_pandas[col], 
        check_less_precise=False
    )