In [None]:
!pip install streamlit -q

In [None]:
!npm install -g localtunnel

[K[?25h/tools/node/bin/lt -> /tools/node/lib/node_modules/localtunnel/bin/lt.js
+ localtunnel@2.0.2
updated 1 package in 1.792s


In [None]:
!wget -q -O - ipv4.icanhazip.com

34.106.136.242


In [None]:
%%writefile app.py
import streamlit as st
import pandas as pd
import torch
from transformers import BertTokenizer, BertModel
import re
import numpy as np
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize


# Fetch the NLTK resources used by preprocess_text. Newer NLTK releases make
# word_tokenize load its models from 'punkt_tab' rather than 'punkt', so both
# are requested; an id the installed NLTK does not know is expected to just
# make download() return False instead of raising.
for _nltk_resource in ('stopwords', 'punkt', 'punkt_tab'):
    nltk.download(_nltk_resource, quiet=True)

# English stop words, removed from every text before it is embedded.
stop_words = set(stopwords.words('english'))

# Loading data
# Job profiles; the 'Constructs and Weights (Step 3)' column is parsed later.
ba_analytics_df = pd.read_excel('data sets BA analytics.xlsx')
# Test catalogue (Test Type, Subcategory, Description, Skills Assessed, Time).
data = pd.read_excel('subtests.xlsx')  # Replace with your skill sets file path
# Blank cells are read as NaN; normalise them to '' so string operations work.
data['Skills Assessed'] = data['Skills Assessed'].fillna('').astype(str)

# Build a list of (skill, weight) pairs from a "skill - NN%" listing
def extract_skills_and_weights(text):
    """Parse a newline-separated "skill - NN%" listing into (skill, weight) pairs.

    Each line is searched for a dash followed by a number and a percent sign.
    The text before that dash (stripped) is the skill, and the number divided
    by 100 is its weight. Lines without such a pattern are ignored.

    Args:
        text: The raw cell text. Anything that is not a string (for example
            the NaN pandas produces for a blank spreadsheet cell, or None)
            yields an empty list.

    Returns:
        A list of ``(skill, weight)`` tuples in line order.
    """
    if not isinstance(text, str):
        return []

    skills_weights_list = []
    for line in text.split('\n'):
        match = re.search(r'-\s*([\d.]+)%', line)
        if not match:
            continue
        try:
            weight = float(match.group(1)) / 100.0
        except ValueError:
            # The pattern accepts digit/dot runs such as "1.2.3" that are not
            # valid numbers; skip those lines instead of aborting the parse.
            continue
        skill = line[:match.start()].strip()
        skills_weights_list.append((skill, weight))
    return skills_weights_list

def preprocess_text(text):
    """Normalise text for embedding.

    The text is lower-cased and tokenized with ``word_tokenize``; tokens that
    are not purely alphanumeric or that appear in ``stop_words`` are dropped,
    and the remaining tokens are joined with single spaces.
    """
    kept_tokens = []
    for token in word_tokenize(text.lower()):
        if not token.isalnum():
            continue
        if token in stop_words:
            continue
        kept_tokens.append(token)
    return ' '.join(kept_tokens)

def get_embeddings(text_list, batch_size=32):
    """Embed texts with the module-level ``tokenizer`` and ``model``.

    For every text the hidden state at sequence position 0 of the model's
    last layer is returned (the [CLS] position for the BERT tokenizer this
    file loads).

    Args:
        text_list: A sequence of strings (a single string is treated as a
            one-element list).
        batch_size: Number of texts sent through the model per forward pass.
            Processing in batches keeps memory bounded, because padding is
            applied per batch rather than across the whole corpus.

    Returns:
        A tensor with one row per input text. An empty input yields a tensor
        with zero rows and ``model.config.hidden_size`` columns.
    """
    texts = [text_list] if isinstance(text_list, str) else list(text_list)
    if not texts:
        # The tokenizer cannot encode an empty batch.
        return torch.empty((0, model.config.hidden_size))

    step = max(1, int(batch_size))
    batches = []
    with torch.no_grad():
        for start in range(0, len(texts), step):
            inputs = tokenizer(
                texts[start:start + step],
                return_tensors='pt',
                padding=True,
                truncation=True,
            )
            outputs = model(**inputs)
            batches.append(outputs.last_hidden_state[:, 0, :])
    return torch.cat(batches, dim=0)

def match_skills_with_tests(skill_set, skill_offset, test_names, skills_embeddings, tests_embeddings):
    """Rank every test against every skill by cosine similarity.

    Args:
        skill_set: Sequence of ``(skill, weight)`` pairs; only its length and
            order are used here.
        skill_offset: Index in ``skills_embeddings`` of the embedding that
            belongs to the first entry of ``skill_set``.
        test_names: Test labels, aligned with ``tests_embeddings``.
        skills_embeddings: Indexable collection of skill embedding vectors.
        tests_embeddings: Iterable of test embedding vectors.

    Returns:
        One list per skill, each holding ``(test_name, similarity)`` tuples
        sorted by similarity in descending order.
    """
    # Convert each test vector once. as_tensor reuses an existing tensor
    # instead of copying it, which avoids the UserWarning torch.tensor()
    # emits when it is handed a tensor.
    test_vectors = [torch.as_tensor(embedding).unsqueeze(0) for embedding in tests_embeddings]

    matches = []
    for i in range(len(skill_set)):
        skill_vector = torch.as_tensor(skills_embeddings[skill_offset + i]).unsqueeze(0)
        similarities = [
            (
                test_names[j],
                torch.nn.functional.cosine_similarity(skill_vector, test_vector, dim=1).item(),
            )
            for j, test_vector in enumerate(test_vectors)
        ]
        similarities.sort(key=lambda pair: pair[1], reverse=True)
        matches.append(similarities)
    return matches

def select_suitable_tests(skill_set, matches, data, min_time=40, max_time=60):
    """Pick the best-scoring tests whose total duration fits a time window.

    Every test's score is the sum, over all skills, of its similarity to the
    skill multiplied by the skill's weight (non-positive similarities count
    as 0). Tests are then taken in descending score order while the running
    total stays within ``max_time``, stopping once ``min_time`` is reached.
    If the whole ranking is exhausted below ``min_time``, the tests that were
    skipped are appended in score order, ignoring ``max_time``, until the
    total reaches ``max_time``.

    Args:
        skill_set: Sequence of ``(skill, weight)`` pairs.
        matches: One list per skill of ``(test_name, similarity)`` tuples.
        data: DataFrame with 'Test Type', 'Subcategory' and 'Time' columns;
            a test is labelled ``"<Test Type> - <Subcategory>"``.
        min_time: Total duration at which selection stops.
        max_time: Total duration the greedy pass will not exceed.

    Returns:
        ``(selected_tests, total_time, sorted_tests)`` where the first and
        last are lists of ``(test_name, score)`` tuples and ``sorted_tests``
        is the full ranking.
    """
    # Aggregate weighted scores. Each skill's list is sorted first so that
    # the insertion order (and therefore tie-breaking in the final stable
    # sort) is the weighted-score order.
    final_tests = {}
    for i, skill in enumerate(skill_set):
        skill_weight = skill[1]
        weighted_scores = sorted(
            ((name, similarity * skill_weight if similarity > 0 else 0) for name, similarity in matches[i]),
            key=lambda pair: pair[1],
            reverse=True,
        )
        for test, score in weighted_scores:
            if test in final_tests:
                final_tests[test] += score
            else:
                final_tests[test] = score
    sorted_tests = sorted(final_tests.items(), key=lambda pair: pair[1], reverse=True)

    # Build the label -> duration lookup once instead of re-deriving the
    # label column for every candidate. The first row wins on duplicates.
    labels = data.apply(lambda row: f"{row['Test Type']} - {row['Subcategory']}", axis=1)
    raw_times = {}
    for label, raw_time in zip(labels, data['Time'].values):
        raw_times.setdefault(label, raw_time)

    def duration_of(test):
        # Unknown tests and durations that are not whole numbers count as 0.
        try:
            return int(raw_times[test])
        except (KeyError, TypeError, ValueError, OverflowError):
            return 0

    # Time constraints
    selected_tests = []
    total_time = 0
    for test, score in sorted_tests:
        test_time = duration_of(test)
        if total_time + test_time <= max_time:
            selected_tests.append((test, score))
            total_time += test_time
        if total_time >= min_time:
            break

    if total_time < min_time:
        # Only tests the greedy pass skipped are candidates here. They are
        # not necessarily the tail of the ranking, so filter by name rather
        # than slicing by the number of tests already selected.
        already_selected = {test for test, _ in selected_tests}
        for test, score in sorted_tests:
            if test in already_selected:
                continue
            selected_tests.append((test, score))
            total_time += duration_of(test)
            if total_time >= max_time:
                break

    return selected_tests, total_time, sorted_tests

def find_similar_tests(selected_tests, tests_embeddings, threshold=0.9, names=None):
    """Find, for each selected test, the other selected tests it resembles.

    Args:
        selected_tests: Sequence of ``(test_name, score)`` tuples.
        tests_embeddings: Indexable collection of test embedding vectors,
            aligned with ``names``.
        threshold: Two tests count as similar when the cosine similarity of
            their embeddings is strictly greater than this value.
        names: Test labels aligned with ``tests_embeddings``. Defaults to the
            module-level ``test_names`` list.

    Returns:
        A dict mapping each selected test name to a list of
        ``(other_test_name, similarity)`` tuples.

    Raises:
        ValueError: If a selected test name does not occur in ``names``.
    """
    if names is None:
        names = test_names

    # as_tensor reuses an existing tensor rather than copying it, avoiding
    # the UserWarning torch.tensor() emits when it is handed a tensor.
    vectors = [
        torch.as_tensor(tests_embeddings[names.index(test)]).unsqueeze(0)
        for test, _ in selected_tests
    ]

    similar_tests = {}
    for i, (test1, _) in enumerate(selected_tests):
        similar_tests[test1] = []
        for j, (test2, _) in enumerate(selected_tests):
            if i == j:
                continue
            similarity = torch.nn.functional.cosine_similarity(vectors[i], vectors[j], dim=1).item()
            if similarity > threshold:
                similar_tests[test1].append((test2, similarity))
    return similar_tests

def remove_lowest_score_test(selected_tests, similar_tests, data):
    """Choose which selected test to drop and report its duration.

    The first entry of ``similar_tests`` that has at least one similar test
    decides the outcome: of that test and its most similar counterpart, the
    one with the lower score is chosen (the counterpart on a tie). When no
    test has a similar counterpart, the lowest-scoring selected test is
    chosen.

    Args:
        selected_tests: Sequence of ``(test_name, score)`` tuples.
        similar_tests: Dict mapping a test name to a list of
            ``(other_test_name, similarity)`` tuples.
        data: DataFrame with 'Test Type', 'Subcategory' and 'Time' columns;
            a test is labelled ``"<Test Type> - <Subcategory>"``.

    Returns:
        ``(test_name, duration)``. The duration is 0 when the 'Time' value is
        missing or not a whole number, matching how select_suitable_tests
        treats such values. ``(None, 0)`` is returned when ``selected_tests``
        is empty.
    """
    if not selected_tests:
        # Nothing to remove; min() below would raise on an empty sequence.
        return None, 0

    test_to_remove = None
    for test1, similars in similar_tests.items():
        if not similars:
            continue
        closest_name = max(similars, key=lambda pair: pair[1])[0]
        test1_score = next(score for test, score in selected_tests if test == test1)
        closest_score = next(score for test, score in selected_tests if test == closest_name)
        test_to_remove = test1 if test1_score < closest_score else closest_name
        break

    if not test_to_remove:
        test_to_remove = min(selected_tests, key=lambda pair: pair[1])[0]

    labels = data.apply(lambda row: f"{row['Test Type']} - {row['Subcategory']}", axis=1)
    matching_times = data[labels == test_to_remove]['Time'].values
    try:
        test_time = int(matching_times[0])
    except (IndexError, TypeError, ValueError, OverflowError):
        test_time = 0
    return test_to_remove, test_time


# Parse each job profile's "Constructs and Weights" cell into preprocessed
# (skill, weight) pairs. Blank cells are read by pandas as NaN, so they are
# normalised to '' before parsing.
skill_sets = [
    [(preprocess_text(skill), weight) for skill, weight in extract_skills_and_weights(cell)]
    for cell in ba_analytics_df['Constructs and Weights (Step 3)'].fillna('').astype(str)
]

# Text that represents a test: its description plus the skills it assesses.
data['Combined'] = (
    data['Description'].fillna('').astype(str) + " " + data['Skills Assessed']
).apply(preprocess_text)


# Streamlit re-executes this script on every widget interaction. Caching the
# model and the corpus embeddings keeps a rerun from reloading BERT and
# re-embedding every text.
@st.cache_resource
def _load_bert():
    """Load the BERT tokenizer and model once per server process."""
    return (
        BertTokenizer.from_pretrained('bert-base-uncased'),
        BertModel.from_pretrained('bert-base-uncased'),
    )


tokenizer, model = _load_bert()

skills_text = [skill for skill_set in skill_sets for skill, weight in skill_set]
tests_text = data['Combined'].tolist()


@st.cache_resource
def _embed_corpus(texts):
    """Embed a tuple of texts; the result is cached per distinct tuple."""
    return get_embeddings(list(texts))


skills_embeddings = _embed_corpus(tuple(skills_text))
tests_embeddings = _embed_corpus(tuple(tests_text))

# Labels in catalogue row order, i.e. aligned with tests_embeddings.
test_names = data.apply(lambda row: f"{row['Test Type']} - {row['Subcategory']}", axis=1).tolist()


st.title("Skill to Test Matching App")


def _rerun_app():
    """Restart the script run with whichever rerun API this Streamlit provides."""
    # st.rerun replaced st.experimental_rerun in newer Streamlit releases;
    # the old name is only looked up when the new one is absent.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def _parse_skill_entries(raw_text):
    """Split "Skill1:1, Skill2:0.5" into parsed pairs and rejected entries.

    Returns ``(parsed, rejected)``: ``parsed`` holds
    ``(preprocessed_skill, weight)`` tuples, ``rejected`` the stripped entries
    that lack a colon or whose weight is not a number.
    """
    parsed, rejected = [], []
    for entry in raw_text.split(","):
        if not entry.strip():
            continue
        parts = entry.split(":")
        if len(parts) < 2:
            rejected.append(entry.strip())
            continue
        try:
            weight = float(parts[1].strip())
        except ValueError:
            rejected.append(entry.strip())
            continue
        parsed.append((preprocess_text(parts[0].strip()), weight))
    return parsed, rejected


def _test_duration(test_name):
    """Return the 'Time' of the first catalogue row labelled ``test_name``.

    Unknown labels and values that are not whole numbers give 0, matching how
    select_suitable_tests treats unparseable durations.
    """
    try:
        # test_names is built from `data` row by row, so positions line up.
        return int(data['Time'].iloc[test_names.index(test_name)])
    except (ValueError, TypeError, OverflowError):
        return 0


def _show_recommendation(heading, duration_label):
    """Write the current test selection and its total duration to the page."""
    st.write(heading)
    for test, score in st.session_state.matched_tests:
        st.write(f"Test: {test} - Score: {score:.2f}")
    st.write(f"{duration_label}: {st.session_state.total_time} minutes")


skills_input = st.text_area("Enter your skill set (comma-separated, with weights like 'Skill1:1, Skill2:0.5'):")
skills, rejected_entries = _parse_skill_entries(skills_input)
if rejected_entries:
    st.warning("Ignoring entries that are not in 'Skill:weight' form: " + ", ".join(rejected_entries))

# Session state survives reruns; initialise each key only once.
for _state_key, _state_default in (
    ('step', 1),
    ('detailed_matches', []),
    ('matched_tests', []),
    ('total_time', 0),
    ('all_tests', []),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

if st.session_state.step == 1:
    if st.button("Find Tests"):
        if not skills:
            st.warning("Please enter at least one skill in 'Skill:weight' form.")
        else:
            # Embed the skills the user typed. The precomputed
            # skills_embeddings belong to the spreadsheet's skills, so
            # indexing them by position would score unrelated text.
            user_embeddings = get_embeddings([skill for skill, _ in skills])
            detailed_matches = match_skills_with_tests(skills, 0, test_names, user_embeddings, tests_embeddings)
            matched_tests, total_time, all_tests = select_suitable_tests(skills, detailed_matches, data)

            st.session_state.detailed_matches = detailed_matches
            st.session_state.matched_tests = matched_tests
            st.session_state.total_time = total_time
            st.session_state.all_tests = all_tests
            st.session_state.step = 2

if st.session_state.step >= 2:
    _show_recommendation("Recommended Tests:", "Total Test Duration")

    if st.session_state.step == 2:
        change_num_tests = st.text_input("Do you want to change the number of tests? (yes/no):")
        answer = change_num_tests.strip().lower()

        if answer == 'yes':
            st.session_state.step = 3
            _rerun_app()
        elif answer == 'no':
            st.write("DONE")
            test_names_to_save = [test[0] for test in st.session_state.matched_tests]
            pd.DataFrame(test_names_to_save, columns=["Test Names"]).to_csv("selected_tests.csv", index=False)
            st.session_state.step = 1
            _rerun_app()
        elif answer:
            # Rerunning here would replay the same unrecognised answer and
            # loop forever, so just prompt again.
            st.warning("Please answer 'yes' or 'no'.")

    elif st.session_state.step == 3:
        adjust_choice = st.text_input("Do you want to increase or decrease the number of tests? (increase/decrease):")
        choice = adjust_choice.strip().lower()

        if choice:
            if choice == 'increase':
                current_tests = st.session_state.matched_tests
                candidates = [test for test in st.session_state.all_tests if test not in current_tests]
                if candidates:
                    # all_tests is ranked, so the first unused entry is the
                    # best-scoring test not yet recommended.
                    additional_test = candidates[0]
                    st.session_state.matched_tests.append(additional_test)
                    st.session_state.total_time += _test_duration(additional_test[0])
                    _show_recommendation("Updated Recommended Tests:", "Updated Total Test Duration")

            elif choice == 'decrease':
                current_tests = st.session_state.matched_tests
                # With nothing selected there is nothing to remove.
                if current_tests:
                    similar_tests = find_similar_tests(current_tests, tests_embeddings)
                    test_to_remove, test_time = remove_lowest_score_test(current_tests, similar_tests, data)
                    st.session_state.matched_tests = [test for test in current_tests if test[0] != test_to_remove]
                    st.session_state.total_time -= test_time
                    _show_recommendation("Updated Recommended Tests:", "Updated Total Test Duration")

            # Any answer returns to the yes/no prompt, which shows the
            # (possibly updated) selection again.
            st.session_state.step = 2
            _rerun_app()


In [None]:
!streamlit run app.py & npx localtunnel --port 8501


Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8501[0m
[34m  External URL: [0m[1mhttp://34.106.218.249:8501[0m
[0m
[K[?25hnpx: installed 22 in 3.186s
your url is: https://long-towns-tickle.loca.lt
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
2024-06-27 20:18:40.711 Uncaught app exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/streamlit/runtime/scriptrunner/script_runner.py", line 589, in _run_script
    exec(code, module.__dict__)
  File "/content/app.py", line 158, in <module>
    skills_embeddings = get_embeddings(skill_texts)
  File "/