In [2]:
from pymongo import MongoClient
import logging
from typing import List, Dict


class MongoDBClient:
    def __init__(self, uri: str, database_name: str, collection_name: str, test_mode: bool):
        try:
            self.client = MongoClient(uri)
            self.test_mode = test_mode
            if test_mode:
                self.db = self.client['test_db']  # Use test database if in test mode
            else:
                self.db = self.client[database_name]  # Use production database
            self.collection = self.db[collection_name]
            logging.info(f"Connected to MongoDB database: {self.db.name}, collection: {self.collection.name}")
        except Exception as e:
            logging.error(f"Error connecting to MongoDB: {e}")
            raise


    def make_index(self, index_fields) -> None:
        """
        Creates an index on the specified fields in the collection.

        Args:
            index_fields (str or list): Field(s) for the index. Can be a single field as a string or multiple fields as a list of tuples.
        """
        try:
            # If a single string is passed, create a single field index
            if isinstance(index_fields, str):
                self.collection.create_index(index_fields, unique=True)
                logging.info(f"Index created on {index_fields}")
            # If a list of tuples is passed, create a compound index
            elif isinstance(index_fields, list) and all(isinstance(field, tuple) for field in index_fields):
                self.collection.create_index(index_fields, unique=True)
                logging.info(f"Compound index created on {index_fields}")
            else:
                logging.error("Invalid index format. Provide a string or list of tuples for compound indexes.")
        except Exception as e:
            logging.error(f"Error creating index: {e}")


    def insert_document(self, doc: dict, col_name: str = None) -> bool:
        """Inserts a document into the collection."""
        collection = self.collection if col_name is None else self.db[col_name]
        try:
            collection.insert_one(doc)
            logging.info(f"Document inserted into {collection.name}")
            return True
        except Exception as e:
            logging.error(f"Error inserting document: {e}")
            return False

    def update_document(self, query: dict, update: dict, upsert: bool = True, col_name: str = None):
        """Updates or inserts a document based on the query."""
        collection = self.collection if col_name is None else self.db[col_name]
        try:
            collection.update_one(query, {'$set': update}, upsert=upsert)
            logging.debug(f"Document updated or inserted in {collection.name}")
        except Exception as e:
            logging.error(f"Error updating document: {e}")

    def insert_documents(self, docs: List[Dict], col_name: str) -> bool:
        """Performs bulk insertion of documents into the specified collection."""
        collection = self.db[col_name]
        if docs:
            try:
                collection.insert_many(docs)
                logging.info(f"Inserted {len(docs)} documents into {collection.name} in bulk.")
                return True
            except Exception as e:
                logging.error(f"Error during bulk insertion into {collection.name}: {e}")
                return False
        else:
            logging.info("No documents to insert.")
            return False

    def update_many_documents(self, query: dict, update: dict, col_name: str):
        """Updates multiple documents in the specified collection."""
        collection = self.db[col_name]
        try:
            result = collection.update_many(query, {'$set': update})
            logging.info(f"Updated {result.modified_count} documents in {collection.name}")
        except Exception as e:
            logging.error(f"Error updating documents in {collection.name}: {e}")

    def query_documents(self, query: dict, projection: dict = None, col_name: str = None):
        """Queries documents from the collection."""
        collection = self.collection if col_name is None else self.db[col_name]
        try:
            results = collection.find(query, projection)
            logging.debug(f"Queried documents from {collection.name}")
            return results
        except Exception as e:
            logging.error(f"Error querying documents: {e}")
            return None

    def change_database_and_collection(self, new_database_name: str = None, new_collection_name: str = None) -> None:
        """Changes the database and/or collection to new specified names."""
        try:
            # If test_mode is False, allow changing the database
            if not self.test_mode and new_database_name:
                self.db = self.client[new_database_name]
                logging.info(f"Database changed to: {self.db.name}")

            # Change collection if new_collection_name is provided
            if new_collection_name:
                self.collection = self.db[new_collection_name]
                logging.info(f"Collection changed to: {self.collection.name}")
        except Exception as e:
            logging.error(f"Error changing database and/or collection: {e}")

    def close_connection(self) -> None:
        """Closes the connection to MongoDB."""
        try:
            self.client.close()
            logging.info("MongoDB connection closed.")
        except Exception as e:
            logging.error(f"Error closing MongoDB connection: {e}")

In [3]:
import logging
from typing import Dict, List
from pymongo import ASCENDING


class QualifiedService:
    def __init__(self, mdb_client: MongoDBClient):
        """
        Initializes the QualifiedService with a MongoDBClient instance.

        Args:
            mdb_client (MongoDBClient): An instance of MongoDBClient.
        """
        self.mdb_client = mdb_client

    def get_place_of_work_count_grouped_by_role_and_state(
            self, country: str, state: str, role: str
    ) -> List[Dict]:
        """
        Queries the 'qualified' collection to get the count of 'place_of_work' grouped
        for a specific country, state, and role.

        Args:
            country (str): The country to filter by.
            state (str): The state to filter by.
            role (str): The role to filter by.

        Returns:
            List[Dict]: A list of dictionaries containing the count of 'place_of_work' occurrences.
        """
        try:
            # Use the change_database_and_collection method to switch to the 'qualified' collection
            self.mdb_client.change_database_and_collection(new_collection_name="qualified")
            # Construct the match query
            match_query = {
                "country": country,
                "state": state,
                "role": role
            }

            # Perform aggregation to group and count only by 'place_of_work'
            pipeline = [
                {"$match": match_query},
                {
                    "$group": {
                        "_id": "$place_of_work",
                        "count": {"$sum": 1}
                    }
                },
                {"$sort": {"count": ASCENDING}}  # Sort by count in ascending order
            ]

            # Use the aggregate method from the MongoDB client
            results = self.mdb_client.collection.aggregate(pipeline)
            grouped_data = [doc for doc in results]

            logging.info(f"Queried and grouped data for country: {country}, state: {state}, role: {role}")
            return grouped_data
        except Exception as e:
            logging.error(f"Error querying and grouping data: {e}")
            return []

    def get_bigram_data_by_country_state_role(
            self, country: str, state: str, role: str
    ) -> Dict[str, List[Dict]]:
        """
        Queries the 'bigram' collection to get tools, libraries, skills, and languages
        for a specific country, state, and role.

        Args:
            country (str): The country to filter by.
            state (str): The state to filter by.
            role (str): The role to filter by.

        Returns:
            Dict[str, List[Dict]]: A dictionary containing tools, libraries, skills, and languages data.
        """
        try:
            # Use the change_database_and_collection method to switch to the 'bigram' collection
            self.mdb_client.change_database_and_collection(new_collection_name="bigrams")

            # Construct the query
            query = {
                "country": country,
                "state": state,
                "role": role
            }

            # Specify the projection to include only the desired fields
            projection = {
                "tools": 1,
                "libraries": 1,
                "skills": 1,
                "languages": 1,
                "_id": 0  # Exclude the _id field if not needed
            }

            # Query the 'bigram' collection with the projection
            result = self.mdb_client.query_documents(query, projection)
            document = next(result, None)  # Get the first document if available

            if document:
                tools = document.get("tools", [])
                libraries = document.get("libraries", [])
                skills = document.get("skills", [])
                languages = document.get("languages", [])

                logging.info(f"Fetched bigram data for country: {country}, state: {state}, role: {role}")
                return {
                    "tools": tools,
                    "libraries": libraries,
                    "skills": skills,
                    "languages": languages
                }
            else:
                logging.info(f"No bigram data found for country: {country}, state: {state}, role: {role}")
                return {"tools": [], "libraries": [], "skills": [], "languages": []}
        except Exception as e:
            logging.error(f"Error querying bigram data: {e}")
            return {"tools": [], "libraries": [], "skills": [], "languages": []}

    def get_education_data_by_country_state_role(
            self, country: str, state: str, role: str
    ) -> List[Dict]:
        """
        Queries the 'bigram' collection to get the education data
        for a specific country, state, and role.

        Args:
            country (str): The country to filter by.
            state (str): The state to filter by.
            role (str): The role to filter by.

        Returns:
            List[Dict]: A list containing education data. Returns an empty list if no data is found.
        """
        try:
            # Use the change_database_and_collection method to switch to the 'bigram' collection
            self.mdb_client.change_database_and_collection(new_collection_name="bigrams")

            # Construct the query
            query = {
                "country": country,
                "state": state,
                "role": role
            }

            # Specify the projection to include only the 'education' field
            projection = {
                "education": 1,
                "_id": 0  # Exclude the _id field if not needed
            }

            # Query the 'bigram' collection with the projection
            result = self.mdb_client.query_documents(query, projection)
            document = next(result, None)  # Get the first document if available

            if document:
                education = document.get("education", [])
                logging.info(f"Fetched education data for country: {country}, state: {state}, role: {role}")
                return education
            else:
                logging.info(f"No education data found for country: {country}, state: {state}, role: {role}")
                return []
        except Exception as e:
            logging.error(f"Error querying education data: {e}")
            return []

    def get_freq_grouped_by_state(self, country: str) -> List[Dict]:
        """
        Queries the 'qualified' collection to count the number of records grouped by state
        for a specific country, excluding the state "ALL".

        Args:
            country (str): The country to filter by.

        Returns:
            List[Dict]: A list of dictionaries containing state and count of records, excluding the state "ALL".
        """
        try:
            # Use the change_database_and_collection method to switch to the 'qualified' collection
            self.mdb_client.change_database_and_collection(new_collection_name="qualified")

            # Construct the match query to filter by country and exclude the state "All"
            match_query = {
                "country": country,
                "state": {"$ne": "All"}  # Exclude the state "All"
            }

            # Perform aggregation to group by state and count the number of records
            pipeline = [
                {"$match": match_query},
                {
                    "$group": {
                        "_id": "$state",
                        "count": {"$sum": 1}
                    }
                },
                {"$sort": {"count": -1}}  # Sort by count in descending order
            ]

            # Use the aggregate method from the MongoDB client
            results = self.mdb_client.collection.aggregate(pipeline)
            grouped_data = [{"state": doc["_id"], "count": doc["count"]} for doc in results]

            logging.info(f"Fetched record count grouped by state for country: {country}, excluding state: ALL")
            return grouped_data
        except Exception as e:
            logging.error(f"Error querying record count grouped by state: {e}")
            return []




In [4]:
mongo_uri = "mongodb+srv://nkalpam:nkalpam123@linkedinjobs.92q9w.mongodb.net/?retryWrites=true&w=majority&appName=LinkedinJobs"
database_name ="linkedindb_prod"
collection_raw = "clean"
test_mode = False

In [None]:
mongo_client = MongoDBClient(
    uri=mongo_uri,
    database_name=database_name,
    collection_name=collection_raw,
    test_mode=test_mode
)


: 

In [9]:
qs = QualifiedService(mongo_client)

In [10]:
powData = qs.get_place_of_work_count_grouped_by_role_and_state(country="United States",state= "All",role= "Data Analyst")

In [11]:
usaFreq = qs.get_freq_grouped_by_state(country="United States")

In [17]:
bigramsData = qs.get_bigram_data_by_country_state_role(country="United States",state= "All",role= "Data Analyst")

In [18]:
bigramsData

{'tools': [{'bigram': ['microsoft', 'office'], 'score': 8},
  {'bigram': ['publisher', 'tableau'], 'score': 2},
  {'bigram': ['power', 'bi'], 'score': 210},
  {'bigram': ['bi', 'tableau'], 'score': 8},
  {'bigram': ['powerbi', 'tableau'], 'score': 6},
  {'bigram': ['salesforce', 'marketing'], 'score': 2},
  {'bigram': ['g', 'tableau'], 'score': 2},
  {'bigram': ['tableau', 'power'], 'score': 21},
  {'bigram': ['like', 'tableau'], 'score': 4},
  {'bigram': ['analytics', 'adobe'], 'score': 4},
  {'bigram': ['adobe', 'google'], 'score': 3},
  {'bigram': ['quicksight', 'tableau'], 'score': 2},
  {'bigram': ['server', 'postgresql'], 'score': 3},
  {'bigram': ['postgresql', 'oracle'], 'score': 3},
  {'bigram': ['adobe', 'analytics'], 'score': 9},
  {'bigram': ['focus', 'mysql'], 'score': 4},
  {'bigram': ['mysql', 'database'], 'score': 2},
  {'bigram': ['using', 'tableau'], 'score': 5},
  {'bigram': ['tableau', 'prep'], 'score': 2},
  {'bigram': ['tableau', 'spotfire'], 'score': 2},
  {'bigr

In [13]:
educationData  = qs.get_education_data_by_country_state_role(country="United States",state= "All",role= "Data Analyst")

In [16]:
educationData

[{'bigram': ['bachelor', 'degree'], 'score': 6},
 {'bigram': ['field', 'phd'], 'score': 2},
 {'bigram': ['degree', 'business'], 'score': 16},
 {'bigram': ['degree', 'management'], 'score': 4},
 {'bigram': ['similar', 'degree'], 'score': 4},
 {'bigram': ['intermediate', 'degree'], 'score': 2},
 {'bigram': ['degree', 'independence'], 'score': 2},
 {'bigram': ['master', 'degree'], 'score': 11},
 {'bigram': ['degree', 'preferably'], 'score': 2},
 {'bigram': ['degree', 'program'], 'score': 3},
 {'bigram': ['degree', 'relevant'], 'score': 2},
 {'bigram': ['degree', 'data'], 'score': 2},
 {'bigram': ['degree', 'environmental'], 'score': 2},
 {'bigram': ['degree', 'equivalent'], 'score': 2},
 {'bigram': ['degree', 'computer'], 'score': 4},
 {'bigram': ['cdd', 'edd'], 'score': 2},
 {'bigram': ['degree', 'year'], 'score': 2},
 {'bigram': ['advanced', 'degree'], 'score': 2},
 {'bigram': ['degree', 'plus'], 'score': 2}]

In [17]:
def process_place_of_work_data(country: str, state: str, role: str) -> List[Dict]:
    """
    Processes the place of work data and prepares it in the desired percentage format,
    considering only the counts for 'Hybrid', 'On-Site', and 'Remote'.
    If any of the categories are missing or have zero counts, 1 is added to each category
    to balance the data and calculate percentages.

    Args:
        country (str): The country to filter by.
        state (str): The state to filter by.
        role (str): The role to filter by.

    Returns:
        List[Dict]: A list of dictionaries with the 'id', 'label', and 'value' (percentage) of each place of work.
    """
    # Call the QualifiedService method to get raw data
    raw_data = powData

    # Convert the raw data to a dictionary for easier access
    place_of_work_counts = {entry["_id"]: entry["count"] for entry in raw_data}

    # Define the desired place of work labels
    desired_labels = ["Hybrid", "On-Site", "Remote"]

    # Initialize counts, adding 1 to each category to balance the data
    balanced_counts = {label: place_of_work_counts.get(label, 0) + 1 for label in desired_labels}

    # Calculate the total count after adding 1 to each category
    total_count = sum(balanced_counts.values())

    # Process the data to form percentages and match the desired format
    processed_data = []
    for label in desired_labels:
        count = balanced_counts[label]
        percentage = (count / total_count * 100) if total_count > 0 else 0
        processed_data.append({
            "id": label,
            "label": label,
            "value": round(percentage, 2)  # Round to 2 decimal places
        })

    logging.info(f"Processed place of work data for country: {country}, state: {state}, role: {role}")
    return processed_data


In [18]:
process_place_of_work_data(country="United States",state= "All",role= "Data Analyst")

[{'id': 'Hybrid', 'label': 'Hybrid', 'value': 30.65},
 {'id': 'On-Site', 'label': 'On-Site', 'value': 0.5},
 {'id': 'Remote', 'label': 'Remote', 'value': 68.84}]

In [20]:
def process_state_frequency_data(country: str) -> List[Dict]:
    """
    Processes the state frequency data and prepares it in the desired percentage format.

    Args:
        country (str): The country to filter by.

    Returns:
        List[Dict]: A list of dictionaries with the 'state' and 'percentage' for each state.
    """
    # Call the QualifiedService method to get raw state frequency data
    raw_state_data = usaFreq

    # Calculate the total count of all state records
    total_count = sum(entry["count"] for entry in raw_state_data)

    # Process the data to form percentages and match the desired format
    processed_data = []
    for entry in raw_state_data:
        state = entry["state"]
        count = entry["count"]
        percentage = (count / total_count * 100) if total_count > 0 else 0
        processed_data.append({
            "state": state,
            "percentage": round(percentage, 2)  # Round to 2 decimal places
        })

    # Sort the data by state for consistency
    processed_data.sort(key=lambda x: x["state"])

    logging.info(f"Processed state frequency data for country: {country}")
    return processed_data


In [23]:
process_state_frequency_data(country="United States")

[{'state': 'AL', 'percentage': 1.11},
 {'state': 'AR', 'percentage': 0.6},
 {'state': 'AZ', 'percentage': 2.14},
 {'state': 'CA', 'percentage': 10.1},
 {'state': 'CO', 'percentage': 1.93},
 {'state': 'CT', 'percentage': 1.03},
 {'state': 'DC', 'percentage': 0.77},
 {'state': 'DE', 'percentage': 0.51},
 {'state': 'FL', 'percentage': 3.21},
 {'state': 'GA', 'percentage': 5.09},
 {'state': 'IA', 'percentage': 0.64},
 {'state': 'ID', 'percentage': 0.34},
 {'state': 'IL', 'percentage': 4.75},
 {'state': 'IN', 'percentage': 1.54},
 {'state': 'KS', 'percentage': 0.56},
 {'state': 'KY', 'percentage': 0.13},
 {'state': 'LA', 'percentage': 0.47},
 {'state': 'MA', 'percentage': 2.48},
 {'state': 'MD', 'percentage': 2.78},
 {'state': 'ME', 'percentage': 0.09},
 {'state': 'MI', 'percentage': 1.46},
 {'state': 'MN', 'percentage': 1.67},
 {'state': 'MO', 'percentage': 1.97},
 {'state': 'MS', 'percentage': 0.09},
 {'state': 'MT', 'percentage': 0.04},
 {'state': 'NC', 'percentage': 2.87},
 {'state': 'N

In [24]:
def process_state_frequency_data(country: str) -> List[Dict]:
    """
    Processes the state frequency data and prepares it in the desired percentage format.
    Merges the count for 'DC' into 'WA' (Washington).

    Args:
        country (str): The country to filter by.

    Returns:
        List[Dict]: A list of dictionaries with the 'state' and 'percentage' for each state.
    """
    # Call the QualifiedService method to get raw state frequency data
    raw_state_data = usaFreq

    # Merge the count for 'DC' into 'WA'
    state_counts = {}
    for entry in raw_state_data:
        state = entry["state"]
        count = entry["count"]
        
        # Merge 'DC' into 'WA'
        if state == "DC":
            state = "WA"
        
        if state in state_counts:
            state_counts[state] += count
        else:
            state_counts[state] = count

    # Calculate the total count of all state records
    total_count = sum(state_counts.values())

    # Process the data to form percentages and match the desired format
    processed_data = []
    for state, count in state_counts.items():
        percentage = (count / total_count * 100) if total_count > 0 else 0
        processed_data.append({
            "state": state,
            "percentage": round(percentage, 2)  # Round to 2 decimal places
        })

    # Sort the data by state for consistency
    processed_data.sort(key=lambda x: x["state"])

    logging.info(f"Processed state frequency data for country: {country}")
    return processed_data


In [25]:
process_state_frequency_data(country="United States")

[{'state': 'AL', 'percentage': 1.11},
 {'state': 'AR', 'percentage': 0.6},
 {'state': 'AZ', 'percentage': 2.14},
 {'state': 'CA', 'percentage': 10.1},
 {'state': 'CO', 'percentage': 1.93},
 {'state': 'CT', 'percentage': 1.03},
 {'state': 'DE', 'percentage': 0.51},
 {'state': 'FL', 'percentage': 3.21},
 {'state': 'GA', 'percentage': 5.09},
 {'state': 'IA', 'percentage': 0.64},
 {'state': 'ID', 'percentage': 0.34},
 {'state': 'IL', 'percentage': 4.75},
 {'state': 'IN', 'percentage': 1.54},
 {'state': 'KS', 'percentage': 0.56},
 {'state': 'KY', 'percentage': 0.13},
 {'state': 'LA', 'percentage': 0.47},
 {'state': 'MA', 'percentage': 2.48},
 {'state': 'MD', 'percentage': 2.78},
 {'state': 'ME', 'percentage': 0.09},
 {'state': 'MI', 'percentage': 1.46},
 {'state': 'MN', 'percentage': 1.67},
 {'state': 'MO', 'percentage': 1.97},
 {'state': 'MS', 'percentage': 0.09},
 {'state': 'MT', 'percentage': 0.04},
 {'state': 'NC', 'percentage': 2.87},
 {'state': 'ND', 'percentage': 0.09},
 {'state': 'N

In [33]:
def process_education_data(country: str, state: str, role: str) -> List[Dict]:
    """
    Processes the education data to prepare it in a list of dictionaries format.
    Groups the 1-grams from bigrams (ignoring the word 'degree') and sums the scores,
    then converts the scores to percentages and returns the top 4.

    Args:
        country (str): The country to filter by.
        state (str): The state to filter by.
        role (str): The role to filter by.

    Returns:
        List[Dict]: A list of dictionaries containing 'id', 'label', and 'value' for the top 4 1-grams.
    """
    # Call the QualifiedService method to get raw education data
    raw_education_data = qs.get_education_data_by_country_state_role(country,state,role)

    # Dictionary to accumulate scores for each 1-gram
    education_scores = {}

    # Process the raw data to extract and accumulate scores
    for item in raw_education_data:
        bigram = item.get("bigram", [])
        score = item.get("score", 0)

        for word in bigram:
            if word.lower() != "degree":  # Ignore the word 'degree'
                if word.lower() in education_scores:
                    education_scores[word.lower()] += score
                else:
                    education_scores[word.lower()] = score

    # Calculate the total score for all 1-grams
    total_score = sum(education_scores.values())

    # Convert accumulated scores into percentages and format as required
    mockEducationData = [
        {
            "id": word.capitalize(),
            "label": word.capitalize(),  # Capitalize for label formatting
            "value": round((score / total_score * 100), 2) if total_score > 0 else 0  # Calculate percentage
        }
        for word, score in education_scores.items()
    ]

    # Sort the data by percentage in descending order and select the top 4
    mockEducationData.sort(key=lambda x: x["value"], reverse=True)
    top_4_data = mockEducationData[:4]  # Get only the top 4 entries

    logging.info(f"Processed education data for country: {country}, state: {state}, role: {role}")
    return top_4_data


In [34]:
process_education_data("United States", "All", "Data Analyst")

[{'id': 'Business', 'label': 'Business', 'value': 21.05},
 {'id': 'Master', 'label': 'Master', 'value': 14.47},
 {'id': 'Bachelor', 'label': 'Bachelor', 'value': 7.89},
 {'id': 'Management', 'label': 'Management', 'value': 5.26}]

In [49]:
from typing import Dict, List



def process_tools_data(country: str, state: str, role: str) -> List[Dict]:
    """
    Processes the tools data by extracting 1-grams from bigrams, accumulating scores,
    and calculating percentages. Special handling for 'power' + 'bi' and Microsoft-related combinations.

    Args:
        country (str): The country to filter by.
        state (str): The state to filter by.
        role (str): The role to filter by.

    Returns:
        List[Dict]: A list of tools with their percentages.
    """
    raw_tools_data = qs.get_bigram_data_by_country_state_role(country, state, role).get('tools', [])
    tools_scores = {}

    for item in raw_tools_data:
        bigram = item['bigram']
        score = item['score']

        # Check for special case: "power" + "bi" -> "powerbi"
        if "power" in bigram and "bi" in bigram:
            tools_scores["powerbi"] = tools_scores.get("powerbi", 0) + score
        else:
            for word in bigram:
                if word.lower() == "microsoft":
                    # Combine 'Microsoft' with the other word in the bigram
                    combined_tool = f"microsoft {bigram[1] if bigram[0].lower() == 'microsoft' else bigram[0]}"
                    tools_scores[combined_tool] = tools_scores.get(combined_tool, 0) + score
                else:
                    tools_scores[word.lower()] = tools_scores.get(word.lower(), 0) + score

    # Calculate total score and convert to percentages
    total_score = sum(tools_scores.values())
    tools_data = [
        {"tool": tool, "percentage": round((score / total_score * 100), 2) if total_score > 0 else 0}
        for tool, score in tools_scores.items()
    ]

    # Sort and return the top 5
    tools_data.sort(key=lambda x: x["percentage"], reverse=True)
    return tools_data[:5]

def process_skills_data(country: str, state: str, role: str) -> List[Dict]:
    """
    Processes the skills data by extracting bigrams, accumulating scores, and calculating percentages.

    Args:
        country (str): The country to filter by.
        state (str): The state to filter by.
        role (str): The role to filter by.

    Returns:
        List[Dict]: A list of skills with their percentages.
    """
    raw_skills_data = qs.get_bigram_data_by_country_state_role(country, state, role).get('skills', [])
    skills_scores = {}

    for item in raw_skills_data:
        bigram = " ".join(item['bigram'])  # Join bigram with a space
        score = item['score']
        skills_scores[bigram] = skills_scores.get(bigram, 0) + score

    # Calculate total score and convert to percentages
    total_score = sum(skills_scores.values())
    skills_data = [
        {"skill": skill, "percentage": round((score / total_score * 100), 2) if total_score > 0 else 0}
        for skill, score in skills_scores.items()
    ]

    # Sort and return the top 5
    skills_data.sort(key=lambda x: x["percentage"], reverse=True)
    return skills_data[:5]

def process_languages_data(country: str, state: str, role: str) -> List[Dict]:
    """
    Processes the languages data by extracting 1-grams, accumulating scores, and calculating percentages.
    Ignores specific 1-grams as specified.

    Args:
        country (str): The country to filter by.
        state (str): The state to filter by.
        role (str): The role to filter by.

    Returns:
        List[Dict]: A list of languages with their percentages.
    """
    raw_languages_data = qs.get_bigram_data_by_country_state_role(country, state, role).get('languages', [])
    languages_scores = {}

    # Define 1-grams to ignore
    ignored_1grams = {"data"}  # Add any other 1-grams you want to ignore

    for item in raw_languages_data:
        for word in item['bigram']:
            if word.lower() not in ignored_1grams:  # Check if the word is not in the ignored list
                score = item['score']
                languages_scores[word.lower()] = languages_scores.get(word.lower(), 0) + score

    # Calculate total score and convert to percentages
    total_score = sum(languages_scores.values())
    languages_data = [
        {"language": language, "percentage": round((score / total_score * 100), 2) if total_score > 0 else 0}
        for language, score in languages_scores.items()
    ]

    # Sort and return the top 5
    languages_data.sort(key=lambda x: x["percentage"], reverse=True)
    return languages_data[:5]

def process_libraries_data(country: str, state: str, role: str) -> List[Dict]:
    """
    Processes the libraries data by extracting 1-grams, accumulating scores, and calculating percentages.
    Special handling for "spring" + "boot" -> "spring boot".

    Args:
        country (str): The country to filter by.
        state (str): The state to filter by.
        role (str): The role to filter by.

    Returns:
        List[Dict]: A list of libraries with their percentages.
    """
    raw_libraries_data = qs.get_bigram_data_by_country_state_role(country, state, role).get('libraries', [])
    libraries_scores = {}

    for item in raw_libraries_data:
        bigram = item['bigram']
        score = item['score']

        # Check for special case: "spring" + "boot" -> "spring boot"
        if "spring" in bigram and "boot" in bigram:
            libraries_scores["spring boot"] = libraries_scores.get("spring boot", 0) + score
        else:
            for word in bigram:
                libraries_scores[word.lower()] = libraries_scores.get(word.lower(), 0) + score

    # Calculate total score and convert to percentages
    total_score = sum(libraries_scores.values())
    libraries_data = [
        {"library": library, "percentage": round((score / total_score * 100), 2) if total_score > 0 else 0}
        for library, score in libraries_scores.items()
    ]

    # Sort and return the top 5
    libraries_data.sort(key=lambda x: x["percentage"], reverse=True)
    return libraries_data[:5]

def process_bigram_data(country: str, state: str, role: str) -> Dict[str, List[Dict]]:
    """
    Calls all four processing methods and formats the data into the desired structure.

    Args:
        country (str): The country to filter by.
        state (str): The state to filter by.
        role (str): The role to filter by.

    Returns:
        Dict[str, List[Dict]]: A dictionary containing processed data for skills, tools, libraries, and languages.
    """
    # Call each method to get the processed data
    skills_data = process_skills_data(country, state, role)
    tools_data = process_tools_data(country, state, role)
    libraries_data = process_libraries_data(country, state, role)
    languages_data = process_languages_data(country, state, role)

    # Format the data into the desired structure
    mockData = {
        "skills": [{"skill": item["skill"], "percentage": item["percentage"]} for item in skills_data],
        "tools": [{"tool": item["tool"], "percentage": item["percentage"]} for item in tools_data],
        "libraries": [{"library": item["library"], "percentage": item["percentage"]} for item in libraries_data],
        "languages": [{"language": item["language"], "percentage": item["percentage"]} for item in languages_data]
    }

    logging.info(f"Processed bigram data for country: {country}, state: {state}, role: {role}")
    return mockData

In [52]:
process_bigram_data("United States", "All", "Data Analyst")

{'skills': [{'skill': 'data analysis', 'percentage': 28.84},
  {'skill': 'data visualization', 'percentage': 22.76},
  {'skill': 'machine learning', 'percentage': 13.26},
  {'skill': 'data modeling', 'percentage': 3.76},
  {'skill': 'statistical analysis', 'percentage': 3.31}],
 'tools': [{'tool': 'powerbi', 'percentage': 39.58},
  {'tool': 'tableau', 'percentage': 17.14},
  {'tool': 'looker', 'percentage': 5.3},
  {'tool': 'power', 'percentage': 3.71},
  {'tool': 'adobe', 'percentage': 2.83}],
 'libraries': [],
 'languages': [{'language': 'python', 'percentage': 27.8},
  {'language': 'sql', 'percentage': 24.15},
  {'language': 'andor', 'percentage': 5.85},
  {'language': 'r', 'percentage': 4.27},
  {'language': 'server', 'percentage': 4.15}]}

In [36]:
process_tools_data("United States", "All", "Data Analyst")

[{'tool': 'powerbi', 'percentage': 39.58},
 {'tool': 'tableau', 'percentage': 17.14},
 {'tool': 'looker', 'percentage': 5.3},
 {'tool': 'power', 'percentage': 3.71},
 {'tool': 'adobe', 'percentage': 2.83}]

In [47]:
process_languages_data("United States", "All", "Data Analyst")

[{'language': 'python', 'percentage': 27.8},
 {'language': 'sql', 'percentage': 24.15},
 {'language': 'andor', 'percentage': 5.85},
 {'language': 'r', 'percentage': 4.27},
 {'language': 'server', 'percentage': 4.15}]

In [41]:
process_skills_data("United States", "All", "Data Analyst")

[{'skill': 'data analysis', 'percentage': 28.84},
 {'skill': 'data visualization', 'percentage': 22.76},
 {'skill': 'machine learning', 'percentage': 13.26},
 {'skill': 'data modeling', 'percentage': 3.76},
 {'skill': 'statistical analysis', 'percentage': 3.31}]

In [48]:
process_libraries_data("United States", "NY", "Data Analyst")

[]

In [21]:
mongo_client.close_connection()