### Packages and username

In [1]:
import pandas as pd
import os
import numpy as np
import glob
from pathlib import Path
from datetime import datetime as dt
from datetime import timedelta
import pyodbc
import shutil
import getpass

# path 
home = Path.home()
today = dt.today().date()

# username for collaboration
username = getpass.getuser()
username

'QiXu260'

### Define path and load data

In [2]:
# dateragnes
dateRange = [today - timedelta(days = x) for x in range(365)]
# dateRange = [i.strftime("%Y%m%d") for i in dateRange]

# change type
today = today.strftime("%Y%m%d")

ODMdict = {
    'FWH' : 'WHFXN',
    'FXN' : 'WHFXN',
    'Foxconn WH' : 'WHFXN',
    'Compal' : 'KSCEI',
    'CEI' : 'KSCEI',
    'Wistron' : 'CQWIS',
    'wistron' : 'CQWIS',
    'Wistron CQ': 'CQWCQ',
    'WCQ': 'CQWIS',
    'Inventec' : 'CQIEC',
    'Inventec CQ' : 'CQIEC',
    'Quanta' : 'CQQCI',
    'Quanta CQ' : 'CQQCI',
    'Pegatron' : 'CQPCQ'
}

# reverse date range to correct sort
dateRange.reverse()

In [3]:
# define path
if username == 'panj':
    path = os.path.join(home,'New OneDrive', 'HP Inc','GPS TW Innovation - 文件','Project team','Project RiXin - Shortage management') # Jesse 
else:
    path = os.path.join(home, 'New OneDrive','HP Inc','GPS TW Innovation - Documents','Project team','Project RiXin - Shortage management') # Dustin

# define path
if username == 'panj':
    PNFVPath = Path(home, 'New OneDrive','HP Inc', 'GPS TW Innovation - 文件', 'Users', 'BSP','Shortage management related (Ri Xin)', 'PN FV description mapping table_ALL.xlsx') # Jesse
else:
    PNFVPath = Path(home, 'New OneDrive','HP Inc', 'GPS TW Innovation - Documents', 'Users', 'BSP','Shortage management related (Ri Xin)', 'PN FV description mapping table_ALL.xlsx') # Dustin

# read critical shortage file, external report, and PNFV table
target = Path (path,'Single shortage')
ExternalReportFolder = Path(path, 'External_Destination', 'today')
PNFVFile = pd.read_excel(PNFVPath)
PNFVFile = PNFVFile [['PN', 'Descr']]
PNFVFile = PNFVFile.rename(columns = {'PN': 'HP PN'})

### Clean up dataframe

In [4]:
# this function is tool clean the external 
def clean(fname: str, file : pd.DataFrame, externalReportDate : str) -> pd.DataFrame:

    currentYear = dt.now().year
    currentday = fname.split('\\')[-1][-13:-5]
    file = file.assign(LastSGreportDate = currentday)
    
    file['LastSGreportDate'] = file['LastSGreportDate'].apply(lambda x: dt.strptime(x, '%Y%m%d'))
    file['LastSGreportDate'] = pd.to_datetime(file['LastSGreportDate'])

    file = file.assign(reportDate = externalReportDate)

    file['reportDate'] = file['reportDate'].apply(lambda x: dt.strptime(x, '%Y%m%d'))
    file['reportDate'] = pd.to_datetime(file['reportDate'])

    file['HP PN'] = file['HP PN'].apply(lambda x: x[:10])

    # clean
    file.columns = file.columns.str.strip()
    
    # replace ODM name
    file['ODM'] = file['ODM'].replace(ODMdict)
    return file   

### Create date key for lookup

In [5]:
# get today's file from archieved folder
fileList = [str(x) for x in Path(target,'Archive').glob("*xlsx")]
fileListDateNum = [dt.strptime(x[-13:-5], "%Y%m%d").date() for x in fileList]

errorList = []
resultList = []
lookupSGdateList = []

# create the tuple list looking for critical shortage
i = 0
for _ in dateRange:
    if i == len(fileListDateNum)-1:
        lookupSGdateList.append(fileListDateNum[i])
        continue
    
    if _ < fileListDateNum[i+1]:
        lookupSGdateList.append(fileListDateNum[i])
    else:
        i = i + 1
        lookupSGdateList.append(fileListDateNum[i])

# clean up key format
dateRange = [i.strftime("%Y%m%d") for i in dateRange]
lookupSGdateList = [i.strftime("%Y%m%d") for i in lookupSGdateList]
zip(dateRange, lookupSGdateList)

<zip at 0x1bf3167c8c0>

In [6]:
def addKey(res: pd.DataFrame) -> tuple[list, pd.DataFrame]:
    LatestSGMaterial = res
    LatestSGMaterial = LatestSGMaterial.merge(PNFVFile, on = 'HP PN', how = 'left')
    LatestSGMaterial['Key'] = LatestSGMaterial['ODM'] + LatestSGMaterial['Descr']
    KeyList = LatestSGMaterial['Key'].tolist()
    print("addKey done!")
    return KeyList, res

### Concat current day external report

In [7]:
def concatExternal(extReportPathList: list, KeyList: list) -> pd.DataFrame:
    externalResultDFList = []
    if not extReportPathList:
        return 

    for _ in extReportPathList:
        try: 
            temp = pd.read_excel(_)
            temp['ODM'] = temp['ODM'].ffill()
            temp['ODM'] = temp['ODM'].replace(ODMdict)
            temp['FV/Des'] = temp['FV/Des'].ffill()
            #temp['ETA'] = temp['ETA'].ffill()
            temp['key'] = temp['ODM'] + temp['FV/Des']
            temp = temp[temp.key.isin(KeyList)]
            
            try:
                temp = temp[['ODM', 'FV/Des', 'HP_PN', 'ETA', 'GPS Remark']]
            except:
                temp = temp[['ODM', 'FV/Des', 'HP PN', 'ETA', 'GPS Remark']]
                temp = temp.rename(columns = {'HP PN' : 'HP_PN'})
                print("Rocky wrong format!")
                
            temp = temp.groupby(['ODM', 'FV/Des']).agg({'ETA' : lambda x: '\n'.join(set(x.dropna())),
                                                        'GPS Remark': lambda x: '\n'.join(set(x.dropna()))})
            temp = temp.reset_index()
            if len(temp) > 0:
                print(len(temp))
                externalResultDFList.append(temp)
            else:
                pass
        except Exception as e:
            print(e)
            print(_)
    try:
        externalResultDF = pd.concat(externalResultDFList)
    except Exception as e:
        print(e)
        print("No single shortage match!")
        return pd.DataFrame(columns = ['ODM', 'FV/Des', 'HP_PN', 'ETA', 'GPS Remark'])
    print('External process done!')
    return externalResultDF

### Lookup PNFV and merge external reportm then output

In [8]:
def mergeNoutput(SGres: pd.DataFrame, extRes: pd.DataFrame, dateStr: str) -> None:

    # give new FV based on PN on single shortage 
    SGres = SGres.merge(PNFVFile.rename(columns = {'PN': 'HP PN'}), on = 'HP PN', how = 'left')
    # merge FD on FV
    SGres = SGres.merge(extRes.rename(columns = {'FV/Des' : 'Descr'}), on = ['ODM', 'Descr'], how = 'left')
    SGres = SGres.drop_duplicates()
    # 
    SGres.to_excel(Path(target, 'test','total singal shortage_' + dateStr +'.xlsx'), index = False)
    print("Output done!")
    return SGres

### Output result

In [9]:
# reverse for sorting
dateRange.reverse()
lookupSGdateList.reverse()

# for i, j in zip(dateRange[0], lookupSGdateList[0]):
#     print(i, j)

# glob all file in single shortage to check which file
[f for f in glob.glob(str(Path(target, 'Single shortage ' + '*')))][-1]

'C:\\Users\\QiXu260\\New OneDrive\\HP Inc\\GPS TW Innovation - Documents\\Project team\\Project RiXin - Shortage management\\Single shortage\\Single shortage 20231230.xlsx'

In [10]:
et = today
# et = '20231229'
# sg = '20230209'

fname = [f for f in glob.glob(str(Path(target, 'Single shortage ' + '*')))][-1]
ExternalReport = [f for f in glob.glob(str(Path(ExternalReportFolder, et + '*')))]

if not ExternalReport:
    print("No external on " + et)
    # exit()
try:
    file = pd.read_excel(str(fname))
    sg_res = clean(str(fname), file, et)
    print(str(fname) + " process done!")
except Exception as e:
    errorList.append([str(fname), e])
    print(e)
    print(str(fname) + " process failed!")
    # exit()

k, sg_res = addKey(sg_res)
ext_res = concatExternal(ExternalReport, k)
if ext_res is None:
    print("No external on " + et)
    # exit()

sg_result = mergeNoutput(sg_res, ext_res, et)
sg_result

C:\Users\QiXu260\New OneDrive\HP Inc\GPS TW Innovation - Documents\Project team\Project RiXin - Shortage management\Single shortage\Single shortage 20231230.xlsx process done!
addKey done!
"['GPS Remark'] not in index"
C:\Users\QiXu260\New OneDrive\HP Inc\GPS TW Innovation - Documents\Project team\Project RiXin - Shortage management\External_Destination\today\20241210_AC ADAPTOR_CEI_CuiAimme(BSPCN).xlsx
"['GPS Remark'] not in index"
C:\Users\QiXu260\New OneDrive\HP Inc\GPS TW Innovation - Documents\Project team\Project RiXin - Shortage management\External_Destination\today\20241210_AC ADAPTOR_CQQCI_ChenYona(BSPCN).xlsx
"['GPS Remark'] not in index"
C:\Users\QiXu260\New OneDrive\HP Inc\GPS TW Innovation - Documents\Project team\Project RiXin - Shortage management\External_Destination\today\20241210_AC ADAPTOR_CQQCI_CuiAimme(BSPCN).xlsx
"['GPS Remark'] not in index"
C:\Users\QiXu260\New OneDrive\HP Inc\GPS TW Innovation - Documents\Project team\Project RiXin - Shortage management\Externa

Unnamed: 0,Commodity,Critical shortage qty,ODM,FV/Des,HP PN,Remark,LastSGreportDate,reportDate,Descr,HP_PN,ETA,GPS Remark
0,SSS,11,CQIEC,SSD 4TB 2280 M2 PCIE 4x4 NVME TLC,N12390-001,Pull-in ASAP,2023-12-30,2024-12-10,SSD 4TB 2280 M2 PCIE 4x4 NVME TLC,,,


In [11]:
# upload to single shortage table
conn = pyodbc.connect('Driver={SQL Server}; Server=g7w11206g.inc.hpicorp.net; Database=CSI; Trusted_Connection=Yes;')
cursor = conn.cursor()

# clean up to prevent error
sg_result['ETA'] = sg_result['ETA'].apply(lambda x: x.replace("'","") if type(x) == str else x)

# upload rows by rows
for index, row in sg_result.iterrows():
    sg_Commodity = row['Commodity']
    sg_Qty = row['Critical shortage qty']
    sg_ODM = row['ODM']
    sg_PN_all = row['HP PN']
    sg_reportDate = row['reportDate']
    sg_ETA = row['ETA']
    sg_gpsRemark = row['GPS Remark']

    # Corrected SQL INSERT statement
    cursor.execute(f"""
        INSERT INTO CSI.OPS.GPS_tbl_ops_Single_shortage 
        (Commodity, [Single Shortage QTY], ODM, [HP PN], reportDate, ETA, [GPS Remark])
        VALUES ('{sg_Commodity}', '{sg_Qty}', '{sg_ODM}', '{sg_PN_all}', '{sg_reportDate}', '{sg_ETA}', '{sg_gpsRemark}')
        """.replace("'NaT'", "NULL").replace("'nan'", "NULL"))

# remember to close
conn.commit()
conn.close()


In [12]:
# send external to archieved
ExternalreportArchiveFolder = os.path.join(path, 'External_Destination', 'Archive')

for f in os.listdir(ExternalReportFolder):
    if f.endswith('.xlsx'):
        shutil.move(os.path.join(ExternalReportFolder, f), os.path.join(ExternalreportArchiveFolder, f))

for f in os.listdir(os.path.join(ExternalReportFolder, 'amend')):
    if f.endswith('.xlsx'):
        shutil.move(os.path.join(ExternalReportFolder, 'amend', f), os.path.join(ExternalreportArchiveFolder, f))