Import Libraries

In [1]:
import pandas as pd
import datetime
from datetime import datetime, timedelta
import os
import polars as pl
import gc
import numpy as np
from tqdm import tqdm
import pyarrow as pa
import pyarrow.parquet as pq

import s3fs
import boto3
from io import BytesIO as bo

In [2]:
# Date anchors for this run. Everything is derived from the Monday of the week
# in which the notebook executes and is formatted as YYYYMMDD.
this_day = datetime.today()
# ### FOR TESTING - REMOVE LATER ###
# this_day = this_day - timedelta(days=7)
# ####

# weekday() returns 0 for Monday, so it equals the number of days since Monday.
days_to_monday = this_day.weekday()
monday = this_day - timedelta(days=days_to_monday)

# Monday of the current processing week.
CUR_PROC_WK = monday.strftime("%Y%m%d")

# Monday of the previous processing week (7 days earlier).
PRE_PROC_WK0 = monday - timedelta(days=7)
PRE_PROC_WK = PRE_PROC_WK0.strftime("%Y%m%d")

# 17 days before the current Monday.
CUR_WK0 = monday - timedelta(days=17)
CUR_WK = CUR_WK0.strftime("%Y%m%d")

# 24 days before the current Monday.
PRE_WK0 = monday - timedelta(days=24)
PRE_WK = PRE_WK0.strftime("%Y%m%d")


In [3]:
# S3 bucket that every s3:// path in this notebook is built from.
bucket = "vortex-staging-a65ced90"

Library names and paths

In [4]:
# Key prefixes inside the bucket; each ends with '/' so a file name can be appended directly.

# Dated locations for the current processing week.
raw_path = f"PYADM/raw/{CUR_PROC_WK}/inbound/"
curwk = f"PYADM/raw/{CUR_PROC_WK}/dataframes/"

# Dataframes location for the previous processing week.
prewk = f"PYADM/raw/{PRE_PROC_WK}/dataframes/"

# Xponent staging locations (not dated).
wkxpn = "PYADM/weekly/staging/xponent/"
mthxpn = "PYADM/monthly/staging/xponent/"

# Weekly xponent archive, keyed by PRE_WK.
pwkxpn = f"PYADM/weekly/archive/{PRE_WK}/xponent/"

In [5]:
# Pick up the weekly data produced by Rx_2 from the current week's dataframes location.
# The source for this is subject to change; a full version may be added in future.
wkxpn_LAX_N = pl.read_parquet(
    f's3://{bucket}/{curwk}wkxpn_LAX_N.parquet'
)

In [6]:
# Date parameters written by the previous code; joined in below on WK_END_DATE.
# WK_END_DATE is reduced to a plain date so the join key dtypes line up.
date_parm_wk = pl.read_parquet(
    f's3://{bucket}/{curwk}curwk_DATE_PARM_WK.parquet'
).with_columns(pl.col('WK_END_DATE').dt.date())

# Keep only rows whose PROD_CD is not the empty string, then cast WK_END_DATE
# to Date (dtype fix: the join did not work without it).
wkxpn_LAX_N = (
    wkxpn_LAX_N
    .filter(wkxpn_LAX_N['PROD_CD'] != "")
    .with_columns(pl.col('WK_END_DATE').cast(pl.Date))
)

# Left join keeps every Rx row, and then drop the columns that are not
# pertinent to the creation of lax_dn.
# TODO: open question from the author - should this be an inner join instead?
LAX_N_1 = (
    wkxpn_LAX_N
    .join(date_parm_wk, on='WK_END_DATE', how='left')
    .drop(['MKT_CD','MarketName','G_B','PFAM_NAME','PROD_NAME','WK_END_DATE','RO_TYPE'])
)

# A family-to-product mapping might be useful when creating buckets; the segment
# definition could also be used directly, or the mapping hard coded.
#fam_prod_mapping = LAX_N_1.select(['PFAM_CD', 'PROD_CD']).unique()

In [7]:
# Metric columns carried through the transpose.
metrics = ['TRX','NRX','TUN','NUN','TUF','NUF']

# PROD_WK = PROD_CD + "P_" + I, i.e. product code and the value of column I in
# one key. This is the column the data is transposed on.
prod_wk_key = pl.col("PROD_CD").cast(pl.Utf8) + "P_" + pl.col("I").cast(pl.Utf8)
df_with_new_cols = LAX_N_1.with_columns([prod_wk_key.alias("PROD_WK")])

# PROD_WK now carries the product and week information, so the source columns can go.
df_dropped = df_with_new_cols.drop(['PFAM_CD','PROD_CD','I'])

# Sort at IID level ahead of the chunked filtering.
df_sorted = df_dropped.sort('IID')

# Memory protection: release the joined frame now that it is no longer needed.
del LAX_N_1
gc.collect()

0

In [8]:
# 'full_unique_vals' lists every column name that can exist after transposing.
# It is used to give every chunk the same shape and column order so the chunks
# can be stacked on each iteration.

# Taken from the full data so that every PROD_WK value is seen.
# NOTE : some weeks of data may well be missing; those columns might have to be added manually at some point.
unique_vals = list(df_sorted['PROD_WK'].unique())

full_unique_vals = []
def unique_vals_prod_wk(col_name):
    """Expand one PROD_WK value into its per-metric column names.

    The value is split on '_'; the first and last pieces are joined around each
    entry of `metrics` (giving names like LI1PTUF1) and the results are appended
    to the module-level list `full_unique_vals`. Nothing is returned.
    """
    pieces = col_name.split('_')
    head, tail = pieces[0], pieces[-1]
    full_unique_vals.extend(head + metric_name + tail for metric_name in metrics)

for prod_wk_value in unique_vals:
    unique_vals_prod_wk(prod_wk_value)

# Could add a modifier here to check for and enforce the full 105 weeks of data ?

full_unique_vals.sort()
# 'IID' goes first because this list also fixes the column order of each chunk.
full_unique_vals.insert(0,'IID')

###### QC only

In [9]:
# QC: list the products whose columns do not cover all 105 weeks.
# Start from every expected column name except the leading 'IID'.
wk_tst = pl.DataFrame().with_columns(
    pl.Series(name='col_names_raw', values=full_unique_vals[1:])
)

def split_col_names(value):
    """Split a column name into (prod, metric, wknum) by fixed positions.

    Characters 0-2 are the product, character 3 is skipped, characters 4-6 are
    the metric and everything from position 7 on is the week number.
    """
    return value[:3], value[4:7], value[7:]

# Hold the tuple in an Object column, then unpack it into three string columns.
wk_tst = wk_tst.with_columns([
    pl.col("col_names_raw")
    .map_elements(split_col_names, return_dtype=pl.Object)
    .alias("split_values")
])
wk_tst = wk_tst.with_columns([
    pl.col("split_values").map_elements(lambda parts: parts[0], return_dtype=pl.Utf8).alias("prod"),
    pl.col("split_values").map_elements(lambda parts: parts[1], return_dtype=pl.Utf8).alias("metric"),
    pl.col("split_values").map_elements(lambda parts: parts[2], return_dtype=pl.Utf8).alias("wknum"),
])
wk_tst = wk_tst.drop(["split_values","col_names_raw"])

# Number of distinct weeks per (product, metric); anything other than 105 is reported.
res = wk_tst.group_by(['prod','metric']).agg([pl.col('wknum').n_unique().alias('num_of_wks')])
missing_wknum = res.filter(pl.col('num_of_wks') != 105).sort(by='prod')
missmps = list(missing_wknum['prod'].unique())
print("Number of products in data which do not have 105 weeks of data : ",len(missmps))
print(missmps)

missing_wknum_print = (
    missing_wknum
    .select(pl.col(['prod','num_of_wks']))
    .unique(subset=['prod','num_of_wks'])
)
print(missing_wknum_print)

# One row per (product, week) for the affected products, with the week as an integer.
wk_conti = (
    wk_tst
    .filter(pl.col('prod').is_in(missmps))
    .drop('metric')
    .unique(subset=['prod','wknum'])
    .with_columns(pl.col("wknum").cast(pl.Int32))
)

print('BUT !  - ')
for prod_code in missmps:
    prod_weeks = wk_conti.filter(pl.col('prod') == prod_code)['wknum']
    distinct_week_count = len(prod_weeks.unique())
    # NOTE(review): comparing the distinct count with the max week assumes the
    # weeks start at 1; a contiguous run starting later is also reported - confirm intent.
    if distinct_week_count != prod_weeks.max():
        print(prod_code," has gaps in weeks")
    

Number of products in data which do not have 105 weeks of data :  2
['MRB', 'ZEL']
shape: (2, 2)
┌──────┬────────────┐
│ prod ┆ num_of_wks │
│ ---  ┆ ---        │
│ str  ┆ u32        │
╞══════╪════════════╡
│ MRB  ┆ 82         │
│ ZEL  ┆ 58         │
└──────┴────────────┘
BUT !  - 
MRB  has gaps in weeks
ZEL  has gaps in weeks


#### The following transposes the data in chunks of 50,000 HCPs (`chunk_size`) and exports them

In [10]:
# Distinct IIDs; the final result has one row per entry.
unique_iids = df_sorted['IID'].unique()

# Number of HCPs (IIDs) per chunk. This counts HCPs, NOT rows: the number of
# transaction rows behind each chunk may differ.
chunk_size = 50000

# Consecutive slices of unique_iids, each holding up to chunk_size IIDs;
# the number of slices is the number of chunks processed below.
iid_chunks = [
    unique_iids[start:start + chunk_size]
    for start in range(0, len(unique_iids), chunk_size)
]

Basic logic for the following section (double-click for better readability)
'df_sorted' is the base data (lax_n from the previous code) with:
1) Records with no prod_cd removed.
2) Redundant columns dropped and a new column 'PROD_WK' added, containing the product code and the week number (from 1 to 105).
3) Rows sorted on IID to make filtering faster (the sort may actually be redundant, but it does not hurt as it is pretty fast).

We need to transpose it on PROD_WK, but we cannot do the operation on the whole dataframe in one go, as it would be too memory
intensive for the operations we need to do. Hence we break the task down into distinct chunks and export the result part by part.

When we transpose the data across a certain column (in our case PROD_WK), the resulting data must be unique on IID.
Since IID will be the index, we need to make sure that all the records for a given IID are not spread across multiple chunks;
otherwise we will get multiple records with the same IID in the transposed dataset.
Hence:
1) 'df_chunk' - each chunk taken from df_sorted contains ALL the records for the given 50,000 HCPs (`chunk_size`). This is done with a simple is_in() filter and
the previously made list 'iid_chunks'.
2) 'df_pivot_chunk' - uses the Polars library's built-in pivot (transpose) function.
3) A lambda function is applied to rename the columns after transposing.
4) Since all the chunks eventually need to fit in one file, we need to make sure their structure is the same.
Their shapes will be inherently different, as each chunk may not have the same number of products or weeks' worth of transactions.
To fix this we use the list of all distinct possible column names created earlier, 'full_unique_vals', and loop over it
to add any column not present in a given chunk.
NOTE: This step on its own is pretty time consuming. Things to consider:
We are forcefully populating each missing column with a series of 'None' values of the same length as the chunk (up to 50,000 rows).
Polars stores 'None' values a bit differently; it is a distinct datatype holding a non-zero amount of space in memory.
https://pola-rs.github.io/polars/py-polars/html/reference/datatypes.html
This action alone will cause the size of the chunk to jump. Alternative options were explored, like passing blanks [] in the series,
but that causes their length to truncate to 0, making them unusable as a column.
Using libraries other than Polars could also be a long shot, but probably won't be worth the trade-off in speed and stability.

5) A simple column reorder is done using the full_unique_vals list, which is now possible because all of the missing columns were manually added.
The Polars built-in function 'select' is used to facilitate this.

The chunk of data 'df_pivot_chunk' is now fully processed and is ready to be exported.
After testing a number of variations, I've come to the conclusion that:
- Holding 5 chunks' worth of data in memory (facilitated by appending / concatenation using the vstack() function)
- And then writing to an external file
will be the most stable and time efficient. (Note: the loop below currently writes whenever `loop_counter % 3 == 0` and `loop_counter != 0`.)
Loops that only process a chunk and don't do I/O take about 5 to 7 seconds; loops where 'df_final' is being exported to an external parquet file
take 25 to 30 seconds. (Note: the system can handle up to 10 chunks in memory if background usage is low, but that causes a trade-off by making
the I/O loops take longer. Consider min-maxing the iteration cycle if system memory changes.)

6) 'df_final' holds the accumulated chunks and is exported to the curwk location using the pyarrow library.
pyarrow seems to be the most stable compared to the Polars or Pandas parquet export functions.

Future Note:
The tqdm library is used to track progress: the first recorded runtime, on 13th Dec 2023, was 15~16 mins.
The most time-consuming parts of this loop are adding the null values and exporting the chunks.
Space inefficiency caused by the null values can also be looked into.

In [11]:
# Pivot df_sorted to one row per IID, one IID chunk at a time, and stream the
# result into a single parquet file in the current week's dataframes location.
df_final_uri = f's3://{bucket}/{curwk}df_final.parquet'

writer = None  # pyarrow ParquetWriter; created lazily because it needs the first table's schema
df_final = pl.DataFrame()  # accumulates processed chunks between writes
loop_counter = 0  # iteration counter that drives the write cadence


def _pivoted_col_name(col_name):
    """Rename a pivot output column to the <prod><metric><week> convention.

    Names containing 'PROD_WK_' are split on '_' and rebuilt as
    piece[3] + piece[0] + piece[-1]; any other name (e.g. 'IID') is returned as is.
    Presumably the pivot yields names like 'TRX_PROD_WK_LI1P_5' - verify against
    the installed polars version.
    """
    if 'PROD_WK_' not in col_name:
        return col_name
    pieces = col_name.split('_')
    return pieces[3] + pieces[0] + pieces[-1]


def _append_df_final(frame, current_writer):
    """Write `frame` to df_final_uri, opening the writer on first use, and return the writer."""
    table = frame.to_arrow()
    if current_writer is None:
        current_writer = pq.ParquetWriter(df_final_uri, table.schema)
    current_writer.write_table(table)
    return current_writer


try:
    for iid_chunk in tqdm(iid_chunks):

        # All rows of the IIDs in this chunk, so no IID is split across chunks.
        df_chunk = df_sorted.filter(pl.col('IID').is_in(iid_chunk))
        df_pivot_chunk = df_chunk.pivot(values=metrics,index='IID',columns='PROD_WK',maintain_order=True,sort_columns=True)
        df_pivot_chunk = df_pivot_chunk.select(pl.all().name.map(_pivoted_col_name))

        # Add every expected column this chunk lacks as an all-null Float64 column.
        # They are added in ONE with_columns call (the original added them one by
        # one, which was the slow part) and membership is tested against a set.
        present_cols = set(df_pivot_chunk.columns)
        missing_cols = [col for col in full_unique_vals if col not in present_cols]
        if missing_cols:
            null_values = [None]*len(df_pivot_chunk)
            df_pivot_chunk = df_pivot_chunk.with_columns(
                [pl.Series(col, null_values, dtype=pl.Float64) for col in missing_cols]
            )

        # Same columns in the same order for every chunk, so they can be stacked.
        df_pivot_chunk = df_pivot_chunk.select(full_unique_vals)

        df_final = df_final.vstack(df_pivot_chunk)

        # Write on every third iteration (never on the very first one).
        if loop_counter % 3 == 0 and loop_counter != 0:
            writer = _append_df_final(df_final, writer)
            df_final = pl.DataFrame() # Reset df_final after writing to file

        loop_counter += 1

    # Write any remaining chunks to the Parquet file
    if len(df_final) > 0:
        writer = _append_df_final(df_final, writer)
finally:
    # Close the ParquetWriter even when an iteration fails, so the handle is not
    # leaked. The exception still propagates; rerun the cell after a failure.
    if writer is not None:
        writer.close()


100%|██████████| 11/11 [02:44<00:00, 14.97s/it]


In [12]:
# Memory preservation - WIP, tweak as real-time runs progress.

# Optional diagnostic: print every polars DataFrame still alive and its estimated size.
# for obj in gc.get_objects():
#     if isinstance(obj, pl.dataframe.frame.DataFrame):
#         varnames = [varname for varname, varval in globals().items() if varval is obj]
#         size_gb = obj.estimated_size(unit='gb')
#         print(f"Variable names: {varnames}, Estimated size (GB): {size_gb}")

# Drop the intermediate frames of the transpose stage, then force a collection.
del (
    df_with_new_cols,
    df_dropped,
    df_sorted,
    df_chunk,
    df_pivot_chunk,
    df_final,
)

gc.collect()

0

#### May have to add a restart-kernel clause here because of system stability
##### Followed by re-declaring variables and paths 

Notes for next section -
The previous section processed data at IID level for all the products and metrics for 105 weeks (curwk, df_final).
But we still need to add columns for product families and markets.

- Groupings for product family (F) and product market (M) are hard-coded below.
- The pyarrow library is used to read data in batches from df_final.parquet.
- The same logic as the previous code is applied: read and process several batches (5 in the original notes; the loop writes whenever `loop_counter % 3 == 0` and `loop_counter != 0`), then export.

The main purpose of this section is to create all the product family and product market columns from 'prod_family_market_buckets'.
New columns are created by summing the columns present in the corresponding lists for a product family/market.

In [13]:
# Hard-coded groupings: each key names a bucket (the section notes describe F as
# product family and M as product market) and maps to the product-code prefixes
# whose columns are summed to build that bucket's columns.
prod_family_market_buckets = {
    "MRXF": ["MRGP", "MRBP", "GLYP"],
    "LINF": ["LI1P", "LI2P", "LI3P"],
    "LUBF": ["AMTP", "LUBP"],
    "GENM": ["FLXP", "LACP", "LUBP", "MRGP", "GLYP"],
    "BRDM": [
        "AMTP", "MRBP", "LI1P", "LI2P", "LI3P",
        "TRUP", "MOTP", "ZELP", "IRLP",
    ],
    "LAXM": [
        "AMTP", "FLXP", "LACP", "LUBP", "MRGP", "MRBP", "LI1P",
        "LI2P", "LI3P", "TRUP", "GLYP", "MOTP", "ZELP", "IRLP",
    ],
}

# Metric names, re-declared here for this section.
metrics = ["TRX", "NRX", "TUN", "NUN", "TUF", "NUF"]

In [14]:
# Read df_final.parquet back in batches, add the bucket columns defined in
# prod_family_market_buckets, and stream the result to the weekly xponent
# staging location as LAX_DN.parquet.
lax_dn_uri = f's3://{bucket}/{wkxpn}LAX_DN.parquet'

#parquet_file = pq.ParquetFile(curwk+'\\df_final.parquet') 
parquet_file = pq.ParquetFile(f's3://{bucket}/{curwk}df_final.parquet') #25 minutes for this
writer = None  # pyarrow ParquetWriter; created lazily because it needs the first table's schema
wkxpn_LAX_DN = pl.DataFrame()  # accumulates processed batches between writes
loop_counter = 0  # iteration counter that drives the write cadence
bucket_exprs = None  # built from the first batch's columns and reused for every batch


def _bucket_sum_exprs(all_columns):
    """Build one row-wise sum expression per (bucket, metric, week).

    For each bucket and metric, the columns whose name contains
    <prod_code><metric> for any of the bucket's product codes are grouped by
    their trailing week number and summed into <bucket><metric><week>.
    Expressions are returned in bucket -> metric -> ascending week order, which
    is the order the new columns are appended in. They only reference columns
    from `all_columns`, so all of them can be evaluated in one with_columns call.
    """
    exprs = []
    for prod_family, prod_codes in prod_family_market_buckets.items():
        for metric in metrics:
            relevant_columns = [col for col in all_columns if any(prod_code + metric in col for prod_code in prod_codes)]
            week_numbers = sorted(set(int(col.split(metric)[-1]) for col in relevant_columns))
            for week_number in week_numbers:
                suffix = metric + str(week_number)
                week_columns = [col for col in relevant_columns if col.endswith(suffix)]
                # sum_horizontal is used on purpose: per the original author's
                # note, the plain sum(pl.col(c) for c in ...) form gave nulls.
                exprs.append(pl.sum_horizontal(week_columns).alias(prod_family + suffix))
    return exprs


def _append_lax_dn(frame, current_writer):
    """Write `frame` to lax_dn_uri, opening the writer on first use, and return the writer."""
    table = frame.to_arrow()
    if current_writer is None:
        #writer = pq.ParquetWriter(wkxpn+'\\LAX_DN.parquet', table.schema)
        current_writer = pq.ParquetWriter(lax_dn_uri, table.schema)
    current_writer.write_table(table)
    return current_writer


try:
    for batch in tqdm(parquet_file.iter_batches(batch_size=30000)):

        pl_batch = pl.from_arrow(batch)

        # Every batch comes from the same file and so has the same columns:
        # derive the expressions once instead of once per batch.
        if bucket_exprs is None:
            bucket_exprs = _bucket_sum_exprs(pl_batch.columns)

        # One with_columns call instead of one call per new column.
        pl_batch = pl_batch.with_columns(bucket_exprs)

        wkxpn_LAX_DN = wkxpn_LAX_DN.vstack(pl_batch)

        # Write on every third iteration (never on the very first one).
        if loop_counter % 3 == 0 and loop_counter != 0:
            writer = _append_lax_dn(wkxpn_LAX_DN, writer)
            wkxpn_LAX_DN = pl.DataFrame()

        loop_counter += 1

    # Write any remaining chunks to the Parquet file
    if len(wkxpn_LAX_DN) > 0:
        writer = _append_lax_dn(wkxpn_LAX_DN, writer)
finally:
    # Close the ParquetWriter even when a batch fails, so the handle is not
    # leaked. The exception still propagates; rerun the cell after a failure.
    if writer is not None:
        writer.close()

#LAX_DN FOR WEEKLY level is complete !

18it [10:41, 35.64s/it]
