In [1]:
import os
import time
import pandas as pd
from collections import defaultdict
from datetime import datetime, timedelta
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base

In [2]:
SALES_TABLE_NAME = os.getenv('SALES_TABLE_NAME', 'rossman_sales')
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
DB_CONNECTION_URL = os.getenv('DB_CONNECTION_URL', f'postgresql://spark_user:SuperSecurePwdHere@postgres:{POSTGRES_PORT}/spark_pg_db')

In [3]:
def subtract_date(baseline_date, this_date):
    base = datetime.strptime(baseline_date, '%Y-%m-%d')
    current = datetime.strptime(this_date, '%Y-%m-%d')
    return (base - current).days

In [4]:
def date_from_baseline_back(baseline, n_days):
    return (baseline - timedelta(days=n_days)).strftime('%Y-%m-%d')

## Insert latest 5 months data to postgres

In [5]:
df = pd.read_csv('datasets/rossmann-store-sales/train_exclude_last_10d.csv')

  df = pd.read_csv('datasets/rossmann-store-sales/train_exclude_last_10d.csv')


In [6]:
ori_cols_order = df.columns

In [9]:
# get today with only Y-m-d
now = datetime.now()
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = today - timedelta(days=1)

# get latest 5 months data
df['Month'] = df['Date'].apply(lambda x: x[:x.rfind('-')])
df_sort = df.sort_values('Date', ascending=True)
last_months = df_sort['Month'].unique()[-5:]
last_months_df = df_sort[df_sort['Month'].isin(last_months)]

# convert to relative to time.now()
latest_day = last_months_df.iloc[-1]['Date']
last_months_df['days_from_latest'] = last_months_df['Date'].apply(lambda x: subtract_date(latest_day, x))
last_months_df['Relative date'] = last_months_df['days_from_latest'].apply(lambda x: date_from_baseline_back(yesterday, x))

# clean up
last_months_df = last_months_df.drop(['Date', 'days_from_latest', 'Month'], axis=1)
last_months_df = last_months_df.rename(columns={'Relative date': 'Date'})

# add a dummy item name as an example for extensibility
last_months_df['ProductName'] = "product_A"
# rearrange columns
last_months_df = last_months_df[list(ori_cols_order)+["ProductName"]]

# NOTE: We are not fucusing on model performance in this project, and of course, this time conversion 
# hurts the performance because it will mess up the holidays data (sales on holidays)
# time conversion is here for mimick the realistic usage scenario only

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  last_months_df['days_from_latest'] = last_months_df['Date'].apply(lambda x: subtract_date(latest_day, x))
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  last_months_df['Relative date'] = last_months_df['days_from_latest'].apply(lambda x: date_from_baseline_back(yesterday, x))


In [10]:
last_months_df.columns = map(lambda x: x.lower(), last_months_df.columns)

In [11]:
last_months_df

Unnamed: 0,store,dayofweek,date,sales,customers,open,promo,stateholiday,schoolholiday,productname
158701,372,7,2023-11-03,0,0,0,0,0,0,product_A
158697,368,7,2023-11-03,0,0,0,0,0,0,product_A
158698,369,7,2023-11-03,0,0,0,0,0,0,product_A
158699,370,7,2023-11-03,0,0,0,0,0,0,product_A
158700,371,7,2023-11-03,0,0,0,0,0,0,product_A
...,...,...,...,...,...,...,...,...,...,...
745,746,2,2024-03-24,4821,426,1,0,0,0,product_A
746,747,2,2024-03-24,6214,589,1,0,0,1,product_A
747,748,2,2024-03-24,4200,378,1,0,0,1,product_A
741,742,2,2024-03-24,6239,707,1,0,0,1,product_A


In [12]:
Base = declarative_base()

class RossmanSalesTable(Base):
    __tablename__ = SALES_TABLE_NAME
    id = Column(Integer, primary_key=True)
    store = Column(Integer)
    dayofweek = Column(Integer)
    date = Column(DateTime)
    sales = Column(Integer)
    customers = Column(Integer)
    open = Column(Integer)
    promo = Column(Integer)
    stateholiday = Column(String)
    schoolholiday = Column(String)
    productname = Column(String)

In [13]:
engine = create_engine(DB_CONNECTION_URL)

In [14]:
# drop if exsist and create table
if engine.has_table(RossmanSalesTable.__tablename__):
    Base.metadata.drop_all(engine, tables=[RossmanSalesTable.__table__])
engine = create_engine(DB_CONNECTION_URL)
Base.metadata.create_all(engine)

  if engine.has_table(RossmanSalesTable.__tablename__):


In [15]:
start = time.time()
# Create a brand new empty table before if_exists='append' is important
# otherwise, the table will be created with the wrong schema
last_months_df.to_sql(SALES_TABLE_NAME, engine, if_exists='append', index=False)
print(f'Putting df to postgres took {time.time()-start:.3f} s')

Putting df to postgres took 3.867 s


## Get last 3 months data from postgres

In [16]:
import sqlalchemy
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker

In [17]:
def open_db_session(engine: sqlalchemy.engine) -> sqlalchemy.orm.Session:
    Session = sessionmaker(bind=engine)
    session = Session()
    return session

In [18]:
def query_last_rows(session, table, date_col='date', last_days=None, last_n=None):
    q = session.query(table)
    table_date = getattr(table, date_col)
    if last_days:
        days_ago = datetime.utcnow() - timedelta(days=last_days)
        days_ago_str = days_ago.strftime('%Y-%m-%d %H:%M:%S')
        # Query the rows added in the last 7 days regardless of database time zone
        q = q.filter(
                func.timezone('UTC', table_date) >= days_ago_str
            )
        if last_n:
            q = q.limit(last_n)
        ret = q.all()
    elif last_n:
        ret = q.order_by(table_date.desc()).limit(last_n).all()
    else:
        ret = q.order_by(table_date.desc()).all()
    return ret

In [19]:
def df_from_query(sql_ret, use_cols) -> pd.DataFrame:
    data = defaultdict(list)
    for row in sql_ret:
        for col in use_cols:
            data[col].append(getattr(row, col))
    df = pd.DataFrame(data).set_index('id')
    return df

In [20]:
engine = create_engine(DB_CONNECTION_URL)

In [21]:
session = open_db_session(engine)

In [22]:
ret = query_last_rows(session, RossmanSalesTable, last_days=(3*30))
# ret = query_last_rows(session, RossmanSalesTable, last_n=10000)

In [23]:
all_cols = [column.name for column in RossmanSalesTable.__table__.columns]

In [24]:
ret_df = df_from_query(ret, all_cols)

In [25]:
ret_df = ret_df.sort_values('date')
ret_df

Unnamed: 0_level_0,store,dayofweek,date,sales,customers,open,promo,stateholiday,schoolholiday,productname
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
60211,369,5,2023-12-27,5727,507,1,0,0,0,product_A
60946,1113,5,2023-12-27,6150,648,1,0,0,0,product_A
60947,1112,5,2023-12-27,6857,646,1,0,0,0,product_A
60948,1111,5,2023-12-27,3584,369,1,0,0,0,product_A
60949,1110,5,2023-12-27,3788,487,1,0,0,0,product_A
...,...,...,...,...,...,...,...,...,...,...
158699,5,2,2024-03-24,3497,445,1,0,0,1,product_A
158698,4,2,2024-03-24,9176,1149,1,0,0,1,product_A
158697,3,2,2024-03-24,5600,661,1,0,0,1,product_A
158703,8,2,2024-03-24,5168,673,1,0,0,1,product_A


### Verify with SQL
SELECT *   
FROM rossman_sales  
WHERE date >= (NOW() AT TIME ZONE 'UTC') - INTERVAL '3 months'  
ORDER BY date DESC;  

Note: Result of INTERVAL '90 days' is an exact match, but a day or 2 diff when using '3 month'