# Chain Store Budget Planner

## PART 0: INTRO

The app helps a convenience-store chain (5 stores in total) manage product supply and budget expenses. There are three types of users: the owner (1 in total), the managers (1 per store, 5 in total), and the workers (3 per store, 15 in total).

The code is divided into two parts: Part 1 defines the functions and classes; Part 2 shows the procedures and interfaces for the different types of users.

### Main functions:

- User registration and login system:
    - the owner can sign up new accounts for managers, and each manager can sign up new accounts for workers;
    - managers and workers are given usernames and passwords to log in;
    - workers cannot sign up new accounts.
- Different interfaces for owner/manager/worker:
    - worker:
        - keeps records of transactions;
    - manager:
        - gets a daily report of sold-out products and suggested amounts to buy for the next 7 days;
    - owner:
        - gets a monthly report of sales performance and profits;
        - decides manager bonuses according to sales performance and total profits.

### Table Reference:
- df_users: information on registered users (username/password/identity/leader); the owner's username and password are pre-given, and all other users are added during the sign-up process
- df_ptype: pre-given; contains each product ID (pid) and its corresponding type (so that workers don't have to enter the type every time)
- df_records: generated by workers; contains the transaction records entered by workers
- df_sale_speed: generated by a manager using the 'product supply management' function; contains today's sold-out products and their estimated sale speed (for that manager's store)

## PART 1: Define Class & Functions

In [1]:
from pyspark.sql import SparkSession
from datetime import date as dt
spark=SparkSession.builder.appName('Budget Planner').getOrCreate()

### Class 1: Sign-in

- input: username, password
- output:
    - signin(): whether the sign-in succeeded and, if it did, the user's identity
    - match_uname(): whether the username exists

In [2]:
class Sign():
    def __init__(self, uname,pwd):
        self.uname = uname
        self.pwd = pwd
    def signin(self,df): # df is the table of registered usernames and passwords (including their identity and leader)
        rows = df.collect()
        for i in range(len(rows)):
            if rows[i].username == self.uname and rows[i].password ==self.pwd:
                return True,rows[i].identity
        return False,''
    def match_uname(self,df):
        rows = df.collect()
        for i in range(len(rows)):
            if rows[i].username == self.uname:
                return True
        return False

### Class 2: Transaction Records

- add_record():
    - input: target table (which keeps the records), date, and other sale information
    - output: the updated target table
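
As a rough illustration of the sign convention used by add_record(), here is a plain-Python sketch that needs no Spark session. `PRODUCT_TYPES` mirrors the pre-given product table, while `make_record` is a hypothetical helper introduced only for this example: imports keep positive units, sales are stored as negative units, so the sum of units is the current stock.

```python
# Plain-Python sketch of the add_record() sign convention (no Spark needed).
# PRODUCT_TYPES mirrors the pre-given product/type table; make_record is hypothetical.
PRODUCT_TYPES = {'000': 'A', '001': 'A', '002': 'A',
                 '003': 'B', '004': 'B', '005': 'B',
                 '006': 'C', '007': 'C', '008': 'C'}

def make_record(date, in_out, pid, unit_price, units, inputer):
    """Build one record tuple; 'in' keeps units positive, anything else negates them."""
    ptype = PRODUCT_TYPES.get(pid, 'UNKNOWN')
    signed_units = int(units) if in_out == 'in' else -int(units)
    return (int(date), pid, float(unit_price), signed_units, ptype, inputer)

records = [
    make_record('20220304', 'in', '001', '10', '10', 'W1'),
    make_record('20220305', 'out', '001', '10', '10', 'W1'),
]
stock = sum(r[3] for r in records)  # imports minus sales
print(records)
print(stock)
```

Because sales are negative, a stock of 0 means the product is sold out, which is what the supply-management class relies on.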

In [3]:
# dataframe of product_id and their types are pre-given
vals = [('000','A'),('001','A'),('002','A'),\
       ('003','B'),('004','B'),('005','B'),\
        ('006','C'),('007','C'),('008','C')]
cols = ['pid','type']
df_ptype = spark.createDataFrame(vals,cols)
# df_ptype.coalesce(1).write.format("csv").option("header","True").save('./df')

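# initialize the record table
columns2 = ['date','product_id','unit_price', 'units','type','inputer']
# placeholder row whose value types match the real records (the string/float columns get ''/0.0);
# it only fixes the schema and is filtered out immediately
vals = [(0,'',0.0,0,'','')]
df_records = spark.createDataFrame(vals,columns2)
df_records = df_records.filter(df_records['date']!=0)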

class Transaction():
    def __init__(self):
        global df_ptype
        self.df_ptype = df_ptype
        self.record_cols = ['date','product_id','unit_price', 'units','type','inputer']
    def add_record(self,df,date,in_out,pid,unit_price,units,inputer):
        # firstly, match product type
        ptype ='UNKNOWN'
        for i in self.df_ptype.collect():
            if i.pid ==pid:
                ptype = i.type
                break
        # in means import products, out means sell products to customers
        if in_out == 'in':
            newRow = spark.createDataFrame([(int(date),pid,float(unit_price),int(units),ptype,inputer)],self.record_cols)
        else:
            newRow = spark.createDataFrame([(int(date),pid,float(unit_price),-int(units),ptype,inputer)],self.record_cols)
        df = df.union(newRow) 
        return df

### Class 3: Product Supply Management

- reports today's sold-out products and suggests amounts to buy for the next 7 days according to the historical sale speed
- sold_out():
    - input: target table (which keeps the records)
    - output: the products (product_ids) that are sold out today
- sale_speed():
    - input: sold-out product list
    - output: each sold-out product_id and the predicted import amount for the next 7 days
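
The two steps can be sketched in plain Python as follows. This is only an illustration under stated assumptions: the record tuples and the helper names (`parse_day`, `sold_out_products`, `week_sale_speed`) are hypothetical, and the gap between today and the last import is measured in calendar days with `datetime`. Subtracting two YYYYMMDD integers gives the same gap only when both dates fall in the same month.

```python
from datetime import date

def parse_day(yyyymmdd):
    """Turn an integer such as 20220301 into a datetime.date."""
    s = str(yyyymmdd)
    return date(int(s[:4]), int(s[4:6]), int(s[6:8]))

def sold_out_products(records, today):
    """records: (date, product_id, units) tuples; stock is the running sum of units."""
    stock = {}
    for day, pid, units in records:
        if day <= today:
            stock[pid] = stock.get(pid, 0) + units
    return sorted(pid for pid, left in stock.items() if left == 0)

def week_sale_speed(records, pid, today):
    """last_in_units / (today - last_in_date) * 7, with the gap in calendar days."""
    imports = [(day, units) for day, p, units in records
               if p == pid and units > 0 and day < today]
    if not imports:
        return None  # never imported before today, so no speed can be estimated
    last_in_date = max(day for day, _ in imports)
    last_in_units = sum(units for day, units in imports if day == last_in_date)
    gap_days = (parse_day(today) - parse_day(last_in_date)).days
    return round(last_in_units / gap_days * 7, 2)

# Hypothetical records for one store: (date, product_id, units)
records = [(20220226, '005', 10), (20220301, '005', -10), (20220226, '003', 5)]
today = 20220301
sold = sold_out_products(records, today)
speeds = {pid: week_sale_speed(records, pid, today) for pid in sold}
print(sold)
print(speeds)
```

In this example the last import was 3 calendar days before today, so the estimate is 10 / 3 * 7 units per week.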

In [4]:
class Supply():
    def __init__(self,leader):
        self.leader = leader
    def sold_out(self,df_records):
        today = int(dt.today().strftime('%Y%m%d'))
        df_records.registerTempTable('table_records')
        df_users.registerTempTable('table_users')
        today_stock = spark.sql("SELECT {} as today, leader, product_id, sum(units) as stock\
                  FROM table_records r\
                  left join table_users u\
                  on r.inputer = u.username\
                  WHERE date <= {} \
                  group by product_id,leader".format(today,today))
        today_stock = today_stock.filter(today_stock['leader']==self.leader)
        sold_out = today_stock.filter(today_stock['stock']==0).select('today','leader','product_id')
        return sold_out
    def sale_speed(self,df_sold_out):
        today = int(dt.today().strftime('%Y%m%d'))
        temp = spark.sql('select rank(date) over (partition by product_id order by date desc) as rank\
                            ,date, product_id, units\
                    from table_records \
                    where date <{} and units>0'.format(today))
        temp.registerTempTable('temp_table')
        # the last date you import the product and its units
        df_last_in = spark.sql('select date as last_in_date, product_id, sum(units) as units\
                        from temp_table\
                        where rank=1\
                        group by date,product_id')
        df_last_in.registerTempTable('table_last_in')
        df_sold_out.registerTempTable('table_sold_out')
        df_sale_speed = spark.sql('select s.leader, s.product_id, round((l.units/(s.today-l.last_in_date))*7,2) as week_sale_speed\
                    from table_sold_out s\
                    left join table_last_in l\
                    on s.product_id = l.product_id')
        spark.catalog.dropTempView("temp_table")
        spark.catalog.dropTempView("table_last_in")
        spark.catalog.dropTempView("table_sold_out")
        return df_sale_speed

### Class 4: Monthly report for Owner

- metrics()
    - input: month, dimension
        - there are three dimensions (group-by keys): product type, manager, total
    - output: sale performance metrics for the chosen dimension:
        - sale units, sale value, sale diversity, profit
            - if the dimension is product type, sale diversity is the number of distinct product_ids
            - if the dimension is manager or total, sale diversity is the number of distinct product types
- bonus():
    - after the owner chooses the weights for the metrics and the percentage of profits to pay as bonus, computes a mark for each manager, selects the top 3 managers, and gives each of them a bonus according to their mark
    - input: table of manager performance metrics, weights for the metrics, bonus percentage
    - output: shows the top 3 managers and their bonus amounts
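
To make the four metrics concrete, the following plain-Python sketch computes them for a single manager. The rows are the five W1 records that appear in the sample data later in this notebook; `monthly_metrics` is a hypothetical helper, not part of the class. It assumes, as the notebook does, that the month is the first six digits of the date and that value = -unit_price * units (negative for imports, positive for sales).

```python
# (date, product_id, unit_price, units, type) for worker W1, whose manager is M1
rows = [
    (20220201, 1, 10, 10, 'A'),
    (20220207, 1, 20, -5, 'A'),
    (20220214, 1, 20, -5, 'A'),
    (20220221, 1, 10, 20, 'A'),
    (20220224, 1, 10, -10, 'A'),
]

def value(row):
    """Cash flow of one record: imports cost money, sales bring money in."""
    return float(-row[2] * row[3])

def monthly_metrics(rows, month):
    in_month = [r for r in rows if str(r[0])[:6] == month]
    sales = [r for r in in_month if r[3] < 0]            # sales have negative units
    return {
        'sale units': sum(r[3] for r in sales),
        'sale value': sum(value(r) for r in sales),
        'sale diversity': len({r[4] for r in sales}),    # distinct product types sold
        'profit': sum(value(r) for r in in_month),       # sales minus import costs
    }

m1 = monthly_metrics(rows, '202202')
print(m1)
```

Note that sale units stay negative because sales are recorded with negative units.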

In [5]:
class Monthly_report():
    def __init__(self,df):
        self.df_records =df
    def metrics(self,month,dimension):
        df = self.df_records
        if dimension in ('manager','type'):
            sale_units = df.filter((df['month'] ==month) & (df['units']<0)).select('month',dimension,'units')\
                        .groupby(['month',dimension]).sum('units').withColumnRenamed('sum(units)','sale units')
            sale_value = df.filter((df['month'] ==month) & (df['units']<0)).select('month',dimension,'value')\
                        .groupby(['month',dimension]).sum('value').withColumnRenamed('sum(value)','sale value')
            if dimension == 'type':
                sale_diversity = df.filter((df['month'] ==month) & (df['units']<0))\
                        .select('month',dimension,'product_id').distinct().groupby(['month',dimension]).count()\
                        .withColumnRenamed('count','sale diversity')
            else:
                sale_diversity = df.filter((df['month'] ==month) & (df['units']<0))\
                        .select('month',dimension,'type').distinct().groupby(['month',dimension]).count()\
                        .withColumnRenamed('count','sale diversity')
            profit = df.filter(df['month'] ==month).select('month',dimension,'value')\
                    .groupby(['month',dimension]).sum('value').withColumnRenamed('sum(value)','profit')
            
            s1,s2,s3,s4 = sale_units,sale_value,sale_diversity,profit
            df1 = s1.join(s2,s1[dimension]==s2[dimension],'left').select(s1.month,s1[dimension],'sale units','sale value')
            df2 = df1.join(s3,df1[dimension]==s3[dimension],'left').select(df1.month,df1[dimension],'sale units','sale value','sale diversity')
            df3 = df2.join(s4,df2[dimension]==s4[dimension],'left').select(df2.month,df2[dimension],'sale units','sale value','sale diversity',s4['profit'])
        else:
            sale_units = df.filter((df['month'] ==month) & (df['units']<0)).select('month','units')\
                        .groupby('month').sum('units').withColumnRenamed('sum(units)','sale units')
            sale_value = df.filter((df['month'] ==month) & (df['units']<0)).select('month','value')\
                        .groupby('month').sum('value').withColumnRenamed('sum(value)','sale value')
            sale_diversity = df.filter((df['month'] ==month) & (df['units']<0)).select('month','type')\
                        .distinct().groupby('month').count().withColumnRenamed('count','sale diversity')
            profit = df.filter(df['month'] ==month).select('month','value').groupby('month').sum('value')\
                        .withColumnRenamed('sum(value)','profit')
            s1,s2,s3,s4 = sale_units,sale_value,sale_diversity,profit
            df1 = s1.join(s2,s1['month']==s2['month'],'left').select(s1.month,'sale units','sale value')
            df2 = df1.join(s3,df1['month']==s3['month'],'left').select(df1.month,'sale units','sale value','sale diversity')
            df3 = df2.join(s4,df2['month']==s4['month'],'left').select(df2.month,'sale units','sale value','sale diversity',s4['profit'])
        return df3
    def bonus(self,manager_report,w,bonus_percentage): # w is a list of metric weights
        # imported here so that the method does not depend on a later cell
        from pyspark.sql.functions import lit, udf
        from pyspark.sql.types import FloatType
        mark_udf = udf(lambda a,b,c,d:a*w[0]+b*w[1]+c*w[2]+d*w[3],FloatType())
        manager_report = manager_report.withColumn('marks',mark_udf(manager_report['sale units'],manager_report['sale value']\
                                                            ,manager_report['sale diversity'],manager_report['profit']))\
                                .select('month','manager','marks').orderBy('marks',ascending=False)
        top3_manager_marks = manager_report.head(3)
        top3_marks = [i.marks for i in top3_manager_marks]
        total_marks = sum(top3_marks) # float
        month = top3_manager_marks[0].month # report month, read from the table instead of a global variable
        total_report = self.metrics(month,'total')
        total_profit = total_report.collect()[0].profit # float
        # bonus = (total_profit * percentage) * (marks / total_profit)
        bonus_udf = udf(lambda x:(total_profit*int(bonus_percentage)/100)*(x/total_profit),FloatType())
        if total_profit<=0: # no profit, no bonus (also avoids dividing by zero)
            manager_report.withColumn('bonus',lit(0.0)).show(3)
        else:
            manager_report.withColumn('bonus',bonus_udf(manager_report['marks'])).show(3)

## PART 2: Procedural Interfaces

### Interface 1: Sign-in & Sign-up

#### Step 1: Owner Sign-in

- First, the owner logs in and signs up the managers.
- The owner's username and password are pre-given, which gives the owner the right to sign up other users.
- If a user enters a wrong username or password 5 times in a row after the first attempt, the account is locked.
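
The retry rule can be sketched as a small function, shown here in plain Python so it runs without Spark. Everything in it is an assumption made for illustration: `login` is a hypothetical helper, `users` is a dictionary standing in for the user table, and `read_input` replaces `input()` so the behaviour can be checked with scripted answers. Like the notebook cell, it allows one initial attempt plus `max_retries` retries.

```python
def login(users, read_input, max_retries=5):
    """users: {username: (password, identity)}; read_input: a callable like input().

    Returns (signed_in, identity, username) for the last attempt made.
    """
    failures = 0
    while True:
        uname = read_input('Username: ')
        pwd = read_input('Password: ')
        record = users.get(uname)
        if record is not None and record[0] == pwd:
            return True, record[1], uname
        failures += 1
        if failures > max_retries:      # initial attempt + max_retries retries all failed
            return False, '', uname

users = {'BOSS': ('boss666', 'owner')}

# One wrong password, then the right one
answers = iter(['BOSS', 'wrong', 'BOSS', 'boss666'])
result = login(users, lambda prompt: next(answers))
print(result)
```

Returning the result from a function, rather than checking a counter afterwards, avoids reporting a lock-out when the last allowed retry actually succeeds.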

In [6]:
# Initialize DataFrame to store username and password
# the owner's account is pre-given
columns = ['username', 'password','identity','leader']
vals = [('BOSS','boss666','owner','BOSS')]
df_users = spark.createDataFrame(vals,columns)

In [7]:
print('Please log in to verify your identity.Enter:')
uname = input('Username: ')
pwd = input('Password: ')

user = Sign(uname,pwd)
signed_in,identity = user.signin(df_users)

t=0 # number of failed retries
while not signed_in and t<5:
    print('The username or password is incorrect, please re-enter.')
    uname = input('Username: ')
    pwd = input('Password: ')
    user = Sign(uname,pwd)
    signed_in,identity = user.signin(df_users)
    t+=1
# decide on the sign-in result itself, so a successful last retry is not reported as a lock-out
if signed_in:
    print('Welcome! {} {}.'.format(identity,uname))
else:
    print('You have tried 5 times, your account will be temporarily locked.')

Please log in to verify your identity.Enter:


Username:  BOSS
Password:  boss666


Welcome! owner BOSS.


#### Step 2: Owner Sign-up for Managers

- If you are the owner, you can sign up managers
- If you are a manager, you can sign up workers
- Rules for usernames and passwords:
    - Only letters and digits are allowed
    - Different users cannot have identical usernames
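
A minimal sketch of these rules in plain Python is given below. `username_problem` is a hypothetical helper and `taken` stands in for the usernames already stored in the user table. One caveat worth knowing: `str.isalnum()` also accepts non-ASCII letters and digits, so it is slightly more permissive than "A-Z, a-z, 0-9".

```python
def username_problem(uname, taken):
    """Return None if the username is acceptable, otherwise a short reason."""
    if not uname.isalnum():          # empty strings and symbols such as '*' fail here
        return 'contains characters other than letters and digits'
    if uname in taken:
        return 'already taken'
    return None

taken = {'BOSS', 'M1'}
checks = {name: username_problem(name, taken) for name in ['M2', 'M1', 'W_1', '']}
print(checks)
```

The same `isalnum()` test applies to passwords, together with the check that both entries of the password match.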

In [8]:
# check the identity, worker cannot sign up for others
if identity =='worker':
    print('Sorry, you have no access to the sign-up system.')
elif identity =='':
    print('Sorry, you have not signed in. Please log in to proceed.')
else:
    print('Proceeding to sign up...')
    print('[Kindly note that only alphabets and numbers are allowed in your username and password.]')
    uname = input('Please enter the username: ')
    
# firstly, check the format of username
    match_user = Sign(uname,'')
    while not uname.isalnum() or match_user.match_uname(df_users):
        print('Username is taken or contains non-alphabets/non-numbers. Please re-enter.')
        uname = input('Please enter the username: ')
        match_user = Sign(uname,'')       
# if username valid and not taken, input password and check the format
    pwd = input('Password: ')
    pwd2 = input('Re-enter the password to verify: ')
    while not pwd.isalnum() or pwd!=pwd2:
        print('Passwords are not the same or contain non-alphabets/non-numbers. Please re-enter.')
        pwd = input('Password: ')  
        pwd2 = input('Re-enter the password to verify: ')
        
# register
    if identity =='owner':
        newRow = spark.createDataFrame([(uname,pwd,'manager',user.uname)], columns)
    else:
        newRow = spark.createDataFrame([(uname,pwd,'worker',user.uname)], columns)
    print('You have successfully signed up for {} {}.'.format(newRow.collect()[0].identity,uname))
    df_users = df_users.union(newRow)   

Proceeding to sign up...
[Kindly note that only alphabets and numbers are allowed in your username and password.]


Please enter the username:  M1
Password:  111
Re-enter the password to verify:  111


You have successfully signed up for manager M1.


In [9]:
df_users.show()

+--------+--------+--------+------+
|username|password|identity|leader|
+--------+--------+--------+------+
|    BOSS| boss666|   owner|  BOSS|
|      M1|     111| manager|  BOSS|
+--------+--------+--------+------+



#### Step 3: Managers sign up for workers

- First, the manager signs in to verify their identity

In [11]:
print('Please log in to verify your identity.Enter:')
uname = input('Username: ')
pwd = input('Password: ')

user = Sign(uname,pwd)
signed_in,identity = user.signin(df_users)

t=0 # number of failed retries
while not signed_in and t<5:
    print('The username or password is incorrect, please re-enter.')
    uname = input('Username: ')
    pwd = input('Password: ')
    user = Sign(uname,pwd)
    signed_in,identity = user.signin(df_users)
    t+=1
# decide on the sign-in result itself, so a successful last retry is not reported as a lock-out
if signed_in:
    print('Welcome! {} {}.'.format(identity,uname))
else:
    print('You have tried 5 times, your account will be temporarily locked.')

Please log in to verify your identity.Enter:


Username:  M1
Password:  111


Welcome! manager M1.


- Then, the manager signs up the workers
- If you are logged in as a worker, you cannot sign up other users

In [12]:
# check the identity, worker cannot sign up for others
if identity =='worker':
    print('Sorry, you have no access to the sign-up system.')
elif identity =='':
    print('Sorry, you have not signed in. Please log in to proceed.')
else:
    print('Proceeding to sign up...')
    print('[Kindly note that only alphabets and numbers are allowed in your username and password.]')
    uname = input('Please enter the username: ')
    
# firstly, check the format of username
    match_user = Sign(uname,'')
    while not uname.isalnum() or match_user.match_uname(df_users):
        print('Username is taken or contains non-alphabets/non-numbers. Please re-enter.')
        uname = input('Please enter the username: ')
        match_user = Sign(uname,'')       
# if username valid and not taken, input password and check the format
    pwd = input('Password: ')
    pwd2 = input('Re-enter the password to verify: ')
    while not pwd.isalnum() or pwd!=pwd2:
        print('Passwords are not the same or contain non-alphabets/non-numbers. Please re-enter.')
        pwd = input('Password: ')  
        pwd2 = input('Re-enter the password to verify: ')
        
# register
    if identity =='owner':
        newRow = spark.createDataFrame([(uname,pwd,'manager',user.uname)], columns)
    else:
        newRow = spark.createDataFrame([(uname,pwd,'worker',user.uname)], columns)
    print('You have successfully signed up for {} {}.'.format(newRow.collect()[0].identity,uname))
    df_users = df_users.union(newRow)   

Proceeding to sign up...
[Kindly note that only alphabets and numbers are allowed in your username and password.]


Please enter the username:  W1
Password:  111
Re-enter the password to verify:  11*


Passwords are not the same or contain non-alphabets/non-numbers. Please re-enter.


Password:  111
Re-enter the password to verify:  111


You have successfully signed up for worker W1.


In [13]:
df_users.show()

+--------+--------+--------+------+
|username|password|identity|leader|
+--------+--------+--------+------+
|    BOSS| boss666|   owner|  BOSS|
|      M1|     111| manager|  BOSS|
|      W1|     111|  worker|    M1|
+--------+--------+--------+------+



- For convenience, assume the owner and the managers have already signed up all users; the results are kept in df_users

In [14]:
# Suppose owner has signed up for all managers and manager has signed up for all workers
# the list is generated in repeated processes of sign-up
df_users=spark.read.csv('df_users.csv',inferSchema=True,header=True)
df_users.show()

+--------+--------+--------+------+
|username|password|identity|leader|
+--------+--------+--------+------+
|    BOSS| boss666|   owner|  BOSS|
|      M1|     111| manager|  BOSS|
|      M2|     222| manager|  BOSS|
|      M3|     333| manager|  BOSS|
|      M4|      44| manager|  BOSS|
|      M5|     555| manager|  BOSS|
|      W1|     111|  worker|    M1|
|      W2|     222|  worker|    M1|
|      W3|     333|  worker|    M1|
|      W4|     444|  worker|    M2|
|      W5|     555|  worker|    M2|
|      W6|     666|  worker|    M2|
|      W7|     777|  worker|    M3|
|      W8|     888|  worker|    M3|
|      W9|     999|  worker|    M3|
|     W10|     101|  worker|    M4|
|     W11|     111|  worker|    M4|
|     W12|     121|  worker|    M4|
|     W13|     131|  worker|    M5|
|     W14|     141|  worker|    M5|
+--------+--------+--------+------+
only showing top 20 rows



- Note: if you are signed in as a worker, you cannot sign up other users

### Interface 2: Transaction Records (For Worker)

- First, the worker signs in to their own account (this is how the inputer field gets the worker's username)
- The records of all stores are kept in one global table (df_records)

In [15]:
print('Please log in to verify your identity.Enter:')
uname = input('Username: ')
pwd = input('Password: ')

user = Sign(uname,pwd)
signed_in,identity = user.signin(df_users)

t=0 # number of failed retries
while not signed_in and t<5:
    print('The username or password is incorrect, please re-enter.')
    uname = input('Username: ')
    pwd = input('Password: ')
    user = Sign(uname,pwd)
    signed_in,identity = user.signin(df_users)
    t+=1
# decide on the sign-in result itself, so a successful last retry is not reported as a lock-out
if signed_in:
    print('Welcome! {} {}.'.format(identity,uname))
else:
    print('You have tried 5 times, your account will be temporarily locked.')

Please log in to verify your identity.Enter:


Username:  W1
Password:  111


Welcome! worker W1.


In [17]:
if identity !='worker':
    print('Sorry. Only workers have access to transaction recording.')
else:
    in_out = input("Type 'in' to indicate product imports, 'out' to indicate product sales: ")
    while in_out not in ('in','out'):
        in_out = input("Only type 'in' or 'out'. Please re-try: ")
    # input procedure
    date = input('Date(YYYYMMDD):')
    while len(date)!=8 or not date.isnumeric():
        print('Please follow the format YYYYMMDD, only numbers are allowed.')
        date = input('Date(YYYYMMDD):')
    pid = input('product ID:')
# suppose the workers are trained to know that product_id are like '001','002', etc.
    while not pid.isnumeric():
        print('Invalid, Please check and re-enter.')
        pid = input('product ID: ')    
    unit_price = input('Unit Price: ')
    # accept plain numbers with at most one decimal point, e.g. '10' or '9.5'
    while not unit_price.replace('.','',1).isdecimal():
        print('Invalid, Please check and re-enter.')
        unit_price = input('Unit Price: ')
    units = input('Units: ')
    while not units.isnumeric():
        print('Invalid, Please check and re-enter.')
        units = input('Units: ')
    inputer = user.uname

    rec = Transaction()
    df_records = rec.add_record(df_records,date,in_out,pid,unit_price,units,inputer)
    print('Item successfully added. Press Shift+Enter to add more.')

Type 'in' to indicate product imports, 'out' to indicate product sales:  20220305
Only type 'in' or 'out'. Please re-try:  out
Date(YYYYMMDD): 20220305
product ID: 001
Unit Price:  10
Units:  10


Item successfully added. Press Shift+Enter to add more.


In [18]:
df_records.show()

+--------+----------+----------+-----+----+-------+
|    date|product_id|unit_price|units|type|inputer|
+--------+----------+----------+-----+----+-------+
|20220304|       001|      10.0|   10|   A|     W1|
|20220305|       001|      10.0|  -10|   A|     W1|
+--------+----------+----------+-----+----+-------+



- To demonstrate the data processing, assume that after the workers' input df_records looks as follows:

In [19]:
#suppose after workers have recorded transactions, the data now is:
df_records=spark.read.csv('df_records.csv',inferSchema=True,header=True)
df_records.show()

+--------+----------+----------+-----+----+-------+
|    date|product_id|unit_price|units|type|inputer|
+--------+----------+----------+-----+----+-------+
|20220201|         1|        10|   10|   A|     W1|
|20220207|         1|        20|   -5|   A|     W1|
|20220214|         1|        20|   -5|   A|     W1|
|20220221|         1|        10|   20|   A|     W1|
|20220224|         1|        10|  -10|   A|     W1|
|20220202|         4|         1|  100|   B|     W4|
|20220208|         4|         5|  -50|   B|     W4|
|20220215|         6|        20|   10|   C|     W4|
|20220222|         6|        50|   -2|   C|     W4|
|20220225|         6|        50|   -6|   C|     W4|
|20220201|         3|         1|  100|   B|    W10|
|20220207|         3|         5|  -10|   B|    W10|
|20220214|         3|         5|  -10|   B|    W10|
|20220221|         3|         5|  -10|   B|    W10|
|20220224|         3|         5|  -10|   B|    W10|
|20220304|         5|        20|   10|   B|     W7|
|20220214|  

### Interface 3: Product Supply Management (For Managers)

#### Step 1 : Sign in as manager

In [24]:
print('Please log in to verify your identity.Enter:')
uname = input('Username: ')
pwd = input('Password: ')

user = Sign(uname,pwd)
signed_in,identity = user.signin(df_users)

t=0 # number of failed retries
while not signed_in and t<5:
    print('The username or password is incorrect, please re-enter.')
    uname = input('Username: ')
    pwd = input('Password: ')
    user = Sign(uname,pwd)
    signed_in,identity = user.signin(df_users)
    t+=1
# decide on the sign-in result itself, so a successful last retry is not reported as a lock-out
if signed_in:
    print('Welcome! {} {}.'.format(identity,uname))
else:
    print('You have tried 5 times, your account will be temporarily locked.')

Please log in to verify your identity.Enter:


Username:  M3
Password:  333


Welcome! manager M3.


#### Step 2: Check sold-out products & Decide units to buy for next 7 days

- "Sold-out" means that the product's stock is 0 TODAY (e.g. on Mar. 6, M3 has sold out product 005, and its weekly sale speed is 70)
- If you run the program on a different date, the weekly sale speed computed from the same data will differ
- Note that only M3 has sold-out products (log in as M3 to check this function)

In [25]:
# Firstly, check the identity
if identity !='manager':
    print('Sorry, only managers have access to this function.')
else:
    psm = Supply(user.uname) # only manage the product supply of its managing store
    df_sold_out = psm.sold_out(df_records)
    df_sale_speed = psm.sale_speed(df_sold_out)
    df_sale_speed.show()
    print('The above table shows products sold-out today, and \
estimated units you need to import for the next 7 days.')
    print('Note: units_to_import = week_sale_speed = last_in_units/(today-last_in_date)*7\n')

+------+----------+---------------+
|leader|product_id|week_sale_speed|
+------+----------+---------------+
|    M3|         5|           70.0|
+------+----------+---------------+

The above table shows products sold-out today, and estimated units you need to import for the next 7 days.
Note: units_to_import = week_sale_speed = last_in_units/(today-last_in_date)*7



### Interface 4: Monthly report & Revenue allocations (For Owner)

#### Step 1: Sign in as Owner

In [26]:
print('Please log in to verify your identity.Enter:')
uname = input('Username: ')
pwd = input('Password: ')

user = Sign(uname,pwd)
signed_in,identity = user.signin(df_users)

t=0 # number of failed retries
while not signed_in and t<5:
    print('The username or password is incorrect, please re-enter.')
    uname = input('Username: ')
    pwd = input('Password: ')
    user = Sign(uname,pwd)
    signed_in,identity = user.signin(df_users)
    t+=1
# decide on the sign-in result itself, so a successful last retry is not reported as a lock-out
if signed_in:
    print('Welcome! {} {}.'.format(identity,uname))
else:
    print('You have tried 5 times, your account will be temporarily locked.')

Please log in to verify your identity.Enter:


Username:  BOSS
Password:  boss666


Welcome! owner BOSS.


#### Step 2: Monthly Report of Sale Performance and Revenue

- First, join the tables and add the columns needed for the calculations

In [27]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

In [28]:
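# add a month (YYYYMM) column and a value column (negative for imports, positive for sales),
# then join in the manager of each inputer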
month_udf = udf(lambda date:str(date)[:6])
value_udf = udf(lambda unit_price, units: float(-unit_price*units),FloatType())
df_records = df_records.withColumn("month", month_udf(df_records.date))
df_records = df_records.withColumn("value", value_udf(df_records['unit_price'],df_records.units))
spark.catalog.dropTempView('records')
spark.catalog.dropTempView('users')
df_records.registerTempTable('records')
df_users.registerTempTable('users')
df_records = spark.sql('''
select date,product_id,unit_price,units,type,inputer,month,value,u.leader as manager
from records r
left join users u
on r.inputer = u.username
''')

In [34]:
df_records.show()

+--------+----------+----------+-----+----+-------+------+------+-------+
|    date|product_id|unit_price|units|type|inputer| month| value|manager|
+--------+----------+----------+-----+----+-------+------+------+-------+
|20220201|         1|        10|   10|   A|     W1|202202|-100.0|     M1|
|20220207|         1|        20|   -5|   A|     W1|202202| 100.0|     M1|
|20220214|         1|        20|   -5|   A|     W1|202202| 100.0|     M1|
|20220221|         1|        10|   20|   A|     W1|202202|-200.0|     M1|
|20220224|         1|        10|  -10|   A|     W1|202202| 100.0|     M1|
|20220202|         4|         1|  100|   B|     W4|202202|-100.0|     M2|
|20220208|         4|         5|  -50|   B|     W4|202202| 250.0|     M2|
|20220215|         6|        20|   10|   C|     W4|202202|-200.0|     M2|
|20220222|         6|        50|   -2|   C|     W4|202202| 100.0|     M2|
|20220225|         6|        50|   -6|   C|     W4|202202| 300.0|     M2|
|20220201|         3|         1|  100|

In [33]:
if identity !='owner':
    print('Sorry, only the owner has access to this function.')
else:
    print('The system will generate the monthly report. Please select month and dimension to continue: ')
    month = input('Month (YYYYMM): ')
    while len(month) !=6 or not month.isnumeric():
        month = input('Invalid format. Please re-try. Month (YYYYMM): ')
    dimension = input('Choose the dimension in (manager, type, total): ')
    while dimension not in ('manager','type','total'):
        dimension = input('Invalid format. Choose the dimension in (manager, type, total): ')
    
    print('\n------{} MONTHLY REPORT------'.format(dimension.upper()))
    report = Monthly_report(df_records)
    res = report.metrics(month,dimension)
    res.orderBy('profit',ascending=False).show()

The system will generate the monthly report. Please select month and dimension to continue: 


Month (YYYYMM):  202202
Choose the dimension in (manager, type, total):  manager



------MANAGER MONTHLY REPORT------
+------+-------+----------+----------+--------------+------+
| month|manager|sale units|sale value|sale diversity|profit|
+------+-------+----------+----------+--------------+------+
|202202|     M2|       -58|     650.0|             2| 350.0|
|202202|     M4|       -40|     200.0|             1| 100.0|
|202202|     M3|        -6|      60.0|             1|  35.0|
|202202|     M1|       -20|     300.0|             1|   0.0|
|202202|     M5|        -5|     100.0|             1|   0.0|
+------+-------+----------+----------+--------------+------+



#### Step 3: Bonus Allocation

- Manager performance is evaluated with weighted metrics; a bonus is given to the top 3 managers
- The bonus is a percentage of total profit, allocated among the top 3 managers according to their marks

In [35]:
print('Choose weights for each performance metric (The weights must add up to 100): ')
a = input('Sale Units(%): ')
b = input('Sale Value(%): ')
c = input('Sale Diversity(%): ')
d = input('Profit(%): ')
p = input('And what percentage of total profits do you want to give to managers(%)?')

# check that every input is a whole number before converting, so int() cannot fail
while not (all(x.isdecimal() for x in (a,b,c,d,p)) \
        and int(a)+int(b)+int(c)+int(d) ==100 and int(p)<=100):
    print('The weights and bonus fraction must be numeric and add up to 100, please re-enter:')
    a = input('Sale Units(%): ')
    b = input('Sale Value(%): ')
    c = input('Sale Diversity(%): ')
    d = input('Profit(%): ')
    p = input('Bonus fraction(%): ')
metric_weights = [float(a)/100,float(b)/100,float(c)/100,float(d)/100]
manager_report = report.metrics(month,'manager')

print('\n----TOP3 Manager Mark & Bonus----')
report.bonus(manager_report,metric_weights,p)

Choose weights for each performance metric (The weights must add up to 100): 


Sale Units(%):  20
Sale Value(%):  40
Sale Diversity(%):  10
Profit(%):  50
And what percentage of total profits do you want to give to managers(%)? 50


The weights and bonus fraction must be numeric and add up to 100, please re-enter:


Sale Units(%):  20
Sale Value(%):  40
Sale Diversity(%):  10
Profit(%):  30
Bonus fraction(%):  40



----TOP3 Manager Mark & Bonus----
+------+-------+-----+------+
| month|manager|marks| bonus|
+------+-------+-----+------+
|202202|     M2|353.6|141.44|
|202202|     M1|116.1| 46.44|
|202202|     M4|102.1| 40.84|
+------+-------+-----+------+
only showing top 3 rows
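
The marks and bonuses in the table above can be checked by hand with a few lines of plain Python. The figures are copied from the manager monthly report for 202202 and from the weights entered above (20/40/10/30, bonus 40%); the variable names are only for this sketch. It assumes the total profit equals the sum of the managers' profits, and it uses the same bonus expression as bonus(), `(total_profit * p/100) * (marks / total_profit)`, which reduces to marks multiplied by the bonus percentage.

```python
# manager: (sale units, sale value, sale diversity, profit), from the 202202 manager report
report_rows = {
    'M2': (-58, 650.0, 2, 350.0),
    'M4': (-40, 200.0, 1, 100.0),
    'M3': (-6, 60.0, 1, 35.0),
    'M1': (-20, 300.0, 1, 0.0),
    'M5': (-5, 100.0, 1, 0.0),
}
weights = [0.2, 0.4, 0.1, 0.3]   # units, value, diversity, profit
bonus_percentage = 40

# mark = weighted sum of the four metrics
marks = {m: sum(v * w for v, w in zip(vals, weights)) for m, vals in report_rows.items()}
top3 = sorted(marks, key=marks.get, reverse=True)[:3]

total_profit = sum(vals[3] for vals in report_rows.values())
bonus = {m: (total_profit * bonus_percentage / 100) * (marks[m] / total_profit) for m in top3}

for m in top3:
    print(m, round(marks[m], 1), round(bonus[m], 2))
```

Because sale units are stored as negative numbers, the weight on sale units lowers a manager's mark rather than raising it.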

