## Version 3
- No Manual prescoring
  

In [None]:
import sys
from pathlib import Path
# Put the parent of the current working directory at the front of the
# import path (once), so modules located there can be imported.
root_pth = str(Path.cwd().parent)
if sys.path.count(root_pth) == 0:
    sys.path.insert(0, root_pth)

## General Preprocessing
- Load data
- Strip whitespace 
- Convert "Creation date" column to string
- Add year to "Name" column where missing, using the year from the "Deadline" column: "PDP |" -> "PDP 2020 |"
- Add delimiter to "Name" column where missing: "PDP 2020 |" -> "PDP | 2020 |"   
- Lower text in Name column
- Separate URL from Description


In [None]:
import re
import string
from datetime import datetime
from IPython.display import display_html
import pandas as pd

class TextPreprocessing():
    """Chainable clean-up steps for a PDP goals / key-results export.

    The working table is kept in ``self.data`` (loaded by ``get_data``).
    Most methods modify that table and return ``self`` so that calls can be
    chained. ``filter_by_value`` additionally creates ``self.Goals`` and
    ``self.Results``.
    """

    # Columns produced by splitting "Name" on "|"; "Tmp" holds the last part
    # until it is renamed back to "Name".
    _NAME_PARTS = ["Type", "Goal Year", "Goal Quarter", "Tmp"]

    def __init__(self,pth):
        """Store the input path and the matching settings.

        Args:
            pth: Path of the Excel file that ``get_data`` reads.
        """
        self.punctuation = list(string.punctuation)
        # A name "has a year" when it contains a four-digit number whose
        # first digit is 1-3.
        self.has_year_in_it = r".*([1-3][0-9]{3})"
        # A name "has a pipe" when it starts with "PDP", one whitespace
        # character and "|".
        self.has_pipe_in_it = r"\bPDP\b\s\|"
        # Width used to truncate/pad names in the verbose before/after log.
        self.mlen = 50
        self.pth = pth

    def get_data(self):
        """Read the Excel file at ``self.pth`` into ``self.data``."""
        self.data = pd.read_excel(self.pth)
        return self

    def _log_change(self, before, after):
        """Print one fixed-width ``before -> after`` line."""
        print(
            before[:self.mlen].ljust(self.mlen),
            "->",
            after[:self.mlen].ljust(self.mlen),
        )

    def add_missing_year(self,verbose=True):
        """Insert the deadline year into goal names that lack a year.

        Only rows whose "Goal / Key Result" is "Goal" are considered. The
        year is parsed from "Deadline" (format ``%d-%m-%Y``) and the name is
        rebuilt as ``"PDP <year> " + <text after the first "PDP ">``. Names
        that are not strings or do not contain "PDP " are left unchanged.

        Args:
            verbose: Print every changed name as ``before -> after``.

        Returns:
            self

        Raises:
            ValueError: If the deadline of a row that needs a year does not
                match ``%d-%m-%Y``.
        """
        print("--- Add missing year ".ljust(79,'-'))
        rows = zip(
            self.data["Name"].tolist(),
            self.data["Goal / Key Result"].tolist(),
            self.data["Deadline"].tolist(),
        )
        new_names = []
        for val, kind, deadline in rows:
            updated = val
            needs_year = (
                kind == "Goal"
                and isinstance(val, str)
                and not re.match(self.has_year_in_it, val)
            )
            if needs_year:
                _, prefix, rest = val.partition("PDP ")
                # Without a "PDP " prefix there is no place to insert the year.
                if prefix:
                    year = datetime.strptime(deadline, "%d-%m-%Y").year
                    updated = f"PDP {year} " + rest
                    if verbose:
                        self._log_change(val, updated)
            new_names.append(updated)

        # Assigning a list is positional, so this works for any row index.
        self.data["Name"] = new_names
        return self

    def add_missing_delimiter(self, verbose=True):
        """Insert "| " after "PDP" in names that have a year but no pipe.

        Example: ``"PDP 2020 | x"`` becomes ``"PDP | 2020 | x"``. Names that
        are not strings or do not contain "PDP " are left unchanged.

        Args:
            verbose: Print every changed name as ``before -> after``.

        Returns:
            self
        """
        print("--- Add missing delimiter ".ljust(79,'-'))
        new_names = []
        for val in self.data["Name"].tolist():
            updated = val
            if isinstance(val, str):
                has_year = re.match(self.has_year_in_it, val)
                has_pipe = re.match(self.has_pipe_in_it, val)
                if has_year and not has_pipe:
                    _, prefix, rest = val.partition("PDP ")
                    if prefix:
                        updated = "PDP | " + rest
                        if verbose:
                            self._log_change(val, updated)
            new_names.append(updated)

        self.data["Name"] = new_names
        return self

    def extract_url(self):
        """Move URLs from "Description" into a new "Url" column.

        The first ``http...`` token of a description is stored in "Url" and
        every such token is removed from the description. Descriptions
        without a URL are kept unchanged; non-string descriptions (e.g. NaN)
        become empty strings. Displays the first five rows of both columns.

        Returns:
            self
        """
        url_pattern = re.compile(r'http\S+')
        url = []
        description = []

        for d in self.data["Description"]:
            if not isinstance(d, str):
                url.append("")
                description.append("")
                continue
            match = url_pattern.search(d)
            if match:
                url.append(match[0])
                description.append(url_pattern.sub('', d))
            else:
                # Keep the text: a missing URL must not wipe the description.
                url.append("")
                description.append(d)
        self.data["Url"] = url
        self.data["Description"] = description

        print("--- Extract url ".ljust(79,'-'))
        display_html(self.data[["Url","Description"]].head(5))
        return self

    def lower_col(self,colnames):
        """Lower-case the string values of each column in ``colnames``."""
        for col in colnames:
            self.data[col] = self.data[col].str.lower()
        return self

    def rm_punctuations(self,colnames):
        """Remove hyphens from the string values of each column in ``colnames``.

        Non-string values are left untouched.
        """
        for col in colnames:
            self.data[col] = self.data[col].apply(
                lambda x: x.replace("-", "") if isinstance(x, str) else x
            )
        return self

    def strip_cols(self,colnames):
        """Strip leading/trailing whitespace in each column in ``colnames``."""
        for col in colnames:
            self.data[col] = self.data[col].str.strip()
        return self

    def change_col_type(self):
        """Cast the "Deadline" column to ``str``."""
        self.data = self.data.astype({"Deadline": str}, errors="raise")
        return self

    def filter_by_value(self):
        """Split data into Goals and Results dfs.

        ``self.Goals`` receives the rows whose "Goal / Key Result" is "Goal",
        ``self.Results`` those where it is "Key Result"; both get a fresh
        0..n-1 index.
        """
        print('Split Data into Goals and Results')
        kind = self.data["Goal / Key Result"]
        self.Goals = self.data[kind == "Goal"].reset_index(drop=True)
        self.Results = self.data[kind == "Key Result"].reset_index(drop=True)
        return self

    def _split_name_into_parts(self, frame):
        """Split ``frame["Name"]`` on "|" into the stripped part columns.

        At most three splits are made, so additional pipes stay inside the
        last part. Parts that no row provides become all-``None`` columns.
        """
        parts = frame["Name"].str.split("|", n=3, expand=True)
        for position, column in enumerate(self._NAME_PARTS):
            if position in parts.columns:
                frame[column] = parts[position].str.strip()
            else:
                frame[column] = None

    def split_by_pipe(self):
        """Split the "Name" column of ``self.Goals`` by the "|" delimiter.

        Example name: ``PDP 2022 | Q2 | AWS Machine Learning Plan ...``

        Adds the columns "Type", "Goal Year", "Goal Quarter" and "Tmp" to
        ``self.Goals`` (requires ``filter_by_value`` to have run).

        Returns:
            self
        """
        self._split_name_into_parts(self.Goals)
        return self

    @property
    def drop(self):
        """Bound ``drop`` method of ``self.data``."""
        return self.data.drop

    @property
    def rename(self):
        """Bound ``rename`` method of ``self.data``."""
        return self.data.rename

    @property
    def apply(self):
        """Bound ``apply`` method of ``self.data``."""
        return self.data.apply

    @property
    def loc(self):
        """``loc`` indexer of ``self.data``."""
        return self.data.loc

    def split_name_by_pipe(self):
        """Replace "Name" in ``self.data`` by its pipe-separated parts.

        "Name" is split into "Type", "Goal Year", "Goal Quarter" and a
        remainder that becomes the new "Name". For "Key Result" rows the
        first part is used as the new "Name". "Type" is set to ``None``
        wherever it is not exactly "pdp", so names are expected to be
        lower-cased beforehand.

        Returns:
            self
        """
        self._split_name_into_parts(self.data)
        self.data = self.data.drop(columns="Name")

        # Key results: the first part is the name itself.
        is_result = self.data["Goal / Key Result"] == "Key Result"
        self.data.loc[is_result, "Tmp"] = self.data.loc[is_result, "Type"]

        # Anything that is not the literal "pdp" marker is not a type.
        self.data.loc[self.data["Type"] != "pdp", "Type"] = None

        self.data.rename(columns={"Tmp": "Name"}, inplace=True)
        return self

    def preprocess_goals(self):
        """Split the goal names and tidy up the columns of ``self.Goals``.

        Splits "Name" into its parts, drops "Name" and "Parent ID", and
        renames the remaining "Tmp" part to "Name".
        """
        print("Split Name into Type, Goal Year, Goal Quarter and Topic")
        self.split_by_pipe()
        print("Drop Name and Parent ID")
        self.Goals.drop(columns=["Name", "Parent ID"], inplace=True)
        print("Rename Tmp back into Name")
        self.Goals.rename(columns={"Tmp": "Name"}, inplace=True)
        return self

    def combine_text(self):
        """Create "text" as ``Name + ". " + Description``."""
        self.data["text"] = self.data["Name"] + ". " + self.data["Description"]
        return self

    def fill_na(self, columns_to_fill):
        """Replace missing values with 0 in each of ``columns_to_fill``."""
        for col in columns_to_fill:
            # Assign the result back: an in-place fillna on a selected column
            # does not reliably update the frame.
            self.data[col] = self.data[col].fillna(0)
        return self

    def capitalize_each_Word(self,colname):
        """Returns a list of tuples. 1st element is the old string and 2nd element
        the new string where each word is capitalized.
        This list can be used as replacement parameter for replace_values.

        Words that already start with an upper-case letter are kept as they
        are, so "SEO" does not turn into "Seo". Non-string values are skipped
        and only strings that actually change are returned.
        """
        replacements = []
        for old in self.data[colname].unique():
            if not isinstance(old, str):
                continue
            new = " ".join(
                word if word[0].isupper() else word.capitalize()
                for word in old.split()
            )
            if new != old:
                replacements.append((old, new))

        return replacements

    def replace_values(self,colname,replacement: list):
        """Replace literal substrings in ``colname``.

        Args:
            colname: Column to modify.
            replacement: ``(old, new)`` pairs; pairs with ``old == new`` are
                skipped.
        """
        for values in replacement:
            if len(set(values))>1:
                print(f"Replace {values[0]} with {values[1]}")
                # regex=False pins literal matching; the default of ``regex``
                # differs between pandas versions.
                self.data[colname] = self.data[colname].str.replace(
                    values[0], values[1], regex=False
                )
        return self

    def replace_word(self,colname,replacement: list):
        """Replace whole words in ``colname``.

        Args:
            colname: Column to read from and write to.
            replacement: ``(old, new)`` pairs; ``old`` is inserted unescaped
                between ``\\b`` word boundaries of a regular expression.
                Pairs with ``old == new`` are skipped.
        """
        for values in replacement:
            if len(set(values))>1:
                print(f"Replace {values[0]} with {values[1]}")
                self.data[colname] = self.data[colname].str.replace(
                    fr'\b{values[0]}\b', values[1], regex=True
                )
        return self

    def make_months_column(self):
        """Parse "Deadline" into the datetime column "Leapsome Deadline Months".

        Dates are read day-first, matching the ``%d-%m-%Y`` format that
        ``add_missing_year`` uses for the same column.
        """
        self.data["Leapsome Deadline Months"] = pd.to_datetime(
            self.data["Deadline"], dayfirst=True
        )
        return self

    #TODO how to use this function as a property

    def get_dico_quarters(self,num,colname):
        """Write quarter labels such as "2022-Q2" into ``colname``.

        The labels are derived from "Leapsome Deadline Months"; values that
        are not ``pd.Timestamp`` instances yield "unknown".

        Args:
            num: Unused; kept so existing calls keep working.
            colname: Name of the column to create or overwrite.
        """
        def dico_quarters(d):
            # Months 1-3 -> Q1, 4-6 -> Q2, 7-9 -> Q3, 10-12 -> Q4.
            return f"{d.year}-Q{(d.month - 1) // 3 + 1}"

        self.data[colname] = self.data["Leapsome Deadline Months"].apply(
            lambda x: dico_quarters(x) if isinstance(x, pd.Timestamp) else "unknown"
        )
        return self
    

In [None]:
# Name of the raw export and its location relative to the notebook.
fn = "PDP_Goals_data_Export_Leapsome_2022-06-10.xlsx"
filepath = "../data/raw/" + fn

In [None]:
# Run the preprocessing steps in order. Every step returns the same
# TextPreprocessing object, so the calls are chained and `pdp` ends up
# referring to `TP`.
TP = TextPreprocessing(pth = filepath)
pdp = (
    TP.get_data()
    .replace_values('Managed By',[('Laura Wiegmann', 'Steffen Blankenbach')])
    .strip_cols(["Name","Description"])
    .add_missing_year()
    .add_missing_delimiter()
    .lower_col(["Name","Description"])
    .extract_url()
    .rm_punctuations(["Name","Description"])
    .split_name_by_pipe()
    .combine_text()
    #pdp.data = pdp.data.rename(columns={"level": "Level"}, inplace=True)
    .make_months_column()
    .get_dico_quarters(True,"Leapsome Deadline Quarter")
)

In [None]:
# List the columns of the preprocessed table.
pdp.data.columns

## Split and merge Goals and Results


In [None]:
# Create pdp.Goals and pdp.Results from the rows tagged "Goal" / "Key Result".
pdp = pdp.filter_by_value()

### Goals

In [None]:
#pdp = pdp.preprocess_goals()

In [None]:
def categorize_sentence(sentence):
    """Assign a category to a text by case-sensitive keyword lookup.

    The input is converted with ``str()`` and searched for the lower-case
    keywords below; the first rule with a hit decides the category.

    Args:
        sentence: Text to categorize; any object is accepted and stringified.

    Returns:
        str: "Training", "Completion" or "Certification", or "" when no
        keyword occurs in the text.
    """
    text = str(sentence)
    # Order matters: earlier rules take precedence over later ones.
    keyword_rules = (
        ("Training", ("training",)),
        ("Completion", ("completion", "done")),
        ("Certification", ("certifi", "zertifi")),
    )
    for category, keywords in keyword_rules:
        for keyword in keywords:
            if keyword in text:
                return category
    return ""

# Categorize the goal name and the goal description independently.
pdp.Goals["Name_Category"] = pdp.Goals["Name"].apply(categorize_sentence)
pdp.Goals["Description_Category"] = pdp.Goals["Description"].apply(categorize_sentence)


def _combine_categories(name_category, description_category):
    """Merge the name and description categories into one label.

    Equal categories (including two empty ones) are returned once, two
    different non-empty categories are joined as "A & B", and otherwise the
    single non-empty category is returned.
    """
    if name_category == description_category:
        return name_category
    if name_category and description_category:
        return f"{name_category} & {description_category}"
    return name_category or description_category


pdp.Goals["Category"] = [
    _combine_categories(name_category, description_category)
    for name_category, description_category in zip(
        pdp.Goals["Name_Category"], pdp.Goals["Description_Category"]
    )
]



In [None]:
# Preview the first five goal rows.
pdp.Goals.head(5)

In [None]:
# List the columns of the goals table.
pdp.Goals.columns

In [None]:
# Compare the name, the two partial categories and the combined category
# for the first 15 goals.
pdp.Goals[["Name", "Name_Category", "Description_Category", "Category"]].head(15)

In [None]:
# Goals for which neither the name nor the description matched a keyword.
has_no_category = pdp.Goals["Category"] == ""
Unknown_Category = pdp.Goals.loc[
    has_no_category, ["Managed By", "Name", "Description", "Category"]
]
Unknown_Category

###  Results

The results table has many columns with blanks because Leapsome does not track these values.
- Creation date
- Deadline
- Weight
- Type
- Status
- Last update
- Tag 1
- Contributor 1

Other columns have the same data as in Goals:
- Owner
- Owner ID
- Owner Email
- Managed By
- Managed By ID
- Managed By Email

In [None]:
#Results = pdp.Results

#Results["Description"] = Results["Name"] 

# Independent copy of the goals without the columns listed below; columns
# that do not exist are skipped silently (errors="ignore").
goal_columns_to_drop = ["level", "Technologie_condense", "Location"]
Goal_merge = pdp.Goals.copy(deep=True).drop(
    columns=goal_columns_to_drop, errors="ignore"
)

# Independent copy of the key results without the columns listed below.
# NOTE(review): names that do not match a column exactly are ignored, so
# "Last Update" and "Leaning Platform" only take effect if the export spells
# them this way - confirm against the data.
result_columns_to_drop = [
    "Name",
    "Creation date",
    "Deadline",
    "Last Update",
    "Tag 1",
    "Contributor 1",
    "Owner",
    "Owner ID",
    "Owner Email",
    "Managed By",
    "Managed By ID",
    "Managed By Email",
    "Weight",
    "Type",
    "Goal / Key Result",
    "Leaning Platform",
    "Status",
    "Goal Year",
    "Goal Quarter",
]
Result_merge = pdp.Results.copy(deep=True).drop(
    columns=result_columns_to_drop, errors="ignore"
)


In [None]:
# Columns that the goals contribute to the merge.
Goal_merge.columns

In [None]:
# Columns that the key results contribute to the merge.
Result_merge.columns

In [None]:
# Attach to every goal the key results whose "Parent ID" equals the goal's
# "ID" (left join, so goals without key results are kept). Columns present
# in both tables are suffixed; afterwards the goal ID is dropped and the
# key-result ID becomes "ID".
Merged = (
    Goal_merge.merge(
        Result_merge,
        left_on="ID",
        right_on="Parent ID",
        how="left",
        suffixes=("_Goals", "_Results"),
    )
    .drop(columns=["ID_Goals"])
    .rename(columns={"ID_Results": "ID"})
)



In [None]:
# List the columns of the merged table.
Merged.columns

In [None]:
# Column order for the report: a fixed set of leading columns followed by
# all remaining columns in sorted order.
first_cols = [
    'Location',
    'Goal Year',
    'Goal Quarter',
    'Managed By',
    'Owner',
]
# NOTE(review): Index.drop raises KeyError when one of first_cols is not a
# column of Merged ("Location" is removed from Goal_merge above) - confirm
# that every entry exists in the merged table.
sorted_columns = Merged.columns.sort_values().drop(first_cols)

new_order = [*first_cols, *sorted_columns]
new_order


In [None]:
# Rearrange the merged table into the column order computed above.
Merged = Merged.reindex(new_order, axis="columns")
Merged.columns

In [None]:
# Preview the first three rows of the merged table.
Merged.head(3)

In [None]:
# Capitalize the quarter labels (names were lower-cased during preprocessing).
Merged["Goal Quarter"] = Merged["Goal Quarter"].str.capitalize() 

## Save goals and results

In [None]:
# Disabled CSV export: the statements below are wrapped in a string literal,
# so nothing in this cell is executed.
"""
filepath = Path("../data/pyGoals_v2.csv")
filepath.parent.mkdir(parents=True, exist_ok=True)
pdp.Goals.to_csv(filepath)

filepath = Path("../data/pyResults_v2.csv")
filepath.parent.mkdir(parents=True, exist_ok=True)
Results.to_csv(filepath)

#filepath = Path("pyUnknown_Category.csv")
#filepath.parent.mkdir(parents=True, exist_ok=True)
#Unknown_Category.to_csv(filepath)

filepath = Path("../data/pyMerged_v2.csv")
filepath.parent.mkdir(parents=True, exist_ok=True)
Merged.to_csv(filepath) """

In [None]:
# Write each table to its own workbook below ../data/py/.
output_dir = Path("../data/py")
# pd.ExcelWriter does not create missing directories, so make sure the
# target folder exists before writing.
output_dir.mkdir(parents=True, exist_ok=True)

excel_exports = {
    "pyGoals_v2.xlsx": pdp.Goals,
    "pyResults_v2.xlsx": pdp.Results,
    "pyMerged_v2.xlsx": Merged,
    "pyAppended.xlsx": pdp.data,
}
for workbook_name, table in excel_exports.items():
    with pd.ExcelWriter(output_dir / workbook_name) as writer:
        table.to_excel(writer)

Files need to be copied to /Users/redzesas/Library/CloudStorage/OneDrive-diconiumGmbH/Documents/projekte/power bi

In [None]:
# Write the workbooks that the Power BI report reads straight into the
# report folder.
# NOTE(review): unlike the export to ../data/py/, pyMerged_v2.xlsx is not
# written here - confirm that this is intended.
pth = "/Users/redzesas/Library/CloudStorage/OneDrive-diconiumGmbH/Documents/projekte/power bi/pdp_report/"

report_exports = (
    ("pyGoals_v2.xlsx", pdp.Goals),
    ("pyResults_v2.xlsx", pdp.Results),
    ("pyAppended.xlsx", pdp.data),
)
for workbook_name, table in report_exports:
    with pd.ExcelWriter(pth + workbook_name) as writer:
        table.to_excel(writer)

In [None]:
# Show the rows of the preprocessed table whose "Goal Quarter" is "q3".
pdp.data[pdp.data[ 'Goal Quarter']=="q3"]