### Clean the Data Using AI

**Description:**

In this section, we use a language model to clean and preprocess the job market data. Roles, skills, and programming languages are mapped onto short standardized lists, and a file-based cache stores every classification so that repeated terms do not trigger additional API calls.

**Steps:**

1. **Setup and Configuration:**
   - Initialize the language model and caching system
   - Define standardized categories for skills and programming languages
   - Handle NaN, None, and non-string inputs before they reach the model

2. **Caching Mechanism:**
   - Use a file-based cache to store previously processed items
   - Avoid redundant API calls for repeated terms
   - Persist cache between runs for continuous optimization

3. **Test with Random Sample:**
   - Select a random subset of 5 entries for initial testing
   - Validate the cleaning process on diverse data points
   - Review results before processing the full dataset

4. **Process the Data:**
   - Clean skills and languages using the optimized classifier
   - Handle missing values and invalid entries gracefully
   - Pause briefly after every batch of rows (50 by default) to stay under API rate limits

5. **Data Transformation:**
   - Split combined skill entries into distinct categories
   - Standardize terminology for consistency
   - Remove duplicates within each category

6. **Quality Assurance:**
   - Maintain original data while adding cleaned columns
   - Track and report processing status
   - Handle edge cases and errors without interrupting the process

7. **Save and Export:**
   - Add cleaned data as new columns in the DataFrame
   - Export the enhanced dataset to a new CSV file
   - Preserve both original and cleaned data for reference
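
The caching and transformation steps above can be sketched without any API access. This is a minimal illustration, not the notebook's implementation: `clean_entry`, `load_cache`, `save_cache`, and `fake_classify` are hypothetical names, and `fake_classify` is a lookup table standing in for the language model. It shows a combined entry being split on commas, each term being looked up in the cache before the classifier is called, and duplicates being dropped in first-seen order.

```python
import json
import os
import tempfile

def load_cache(path):
    """Return the cached term -> [category, label] mapping, or {} if absent or unreadable."""
    try:
        with open(path, "r") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}

def save_cache(cache, path):
    """Write the whole cache to disk as JSON."""
    with open(path, "w") as f:
        json.dump(cache, f)

def clean_entry(raw, classify, cache):
    """Split a combined entry and sort its terms into skills and languages."""
    skills, languages = [], []
    for term in raw.split(","):
        key = term.strip().lower()
        if not key:
            continue
        if key not in cache:
            # Only terms missing from the cache reach the classifier
            cache[key] = list(classify(key))
        category, label = cache[key]
        if label == "na":
            continue
        if category == "skill":
            skills.append(label)
        elif category == "language":
            languages.append(label)
    # dict.fromkeys removes duplicates while keeping first-seen order
    return list(dict.fromkeys(skills)), list(dict.fromkeys(languages))

# Hypothetical stand-in for the language model; records every call it receives
calls = []
def fake_classify(term):
    calls.append(term)
    table = {
        "python coding": ("language", "python"),
        "statistical analysis": ("skill", "statistics"),
        "stats": ("skill", "statistics"),
    }
    return table.get(term, ("unknown", "na"))

cache_path = os.path.join(tempfile.mkdtemp(), "demo_cache.json")
cache = load_cache(cache_path)
skills_out, languages_out = clean_entry(
    "Statistical analysis, Python coding, stats, Tableau", fake_classify, cache
)
save_cache(cache, cache_path)

# A second pass reloads the cache from disk and needs no new classifier calls
calls_before = len(calls)
reloaded = load_cache(cache_path)
skills_again, languages_again = clean_entry("python coding, STATS", fake_classify, reloaded)
print(skills_out, languages_out, len(calls) - calls_before)
```

Keying the cache on the lowercased, stripped term means that `"STATS"` and `"stats"` share one entry, which is the same normalization the notebook's cache applies.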

This approach pairs language-model classification with two practical safeguards: a persistent cache that avoids repeated API calls for terms already seen, and a fixed pause between batches to stay under API rate limits. Inputs that are missing or fail to classify fall back to 'na' rather than interrupting the run.
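
The pause-per-batch and error-fallback behavior can be illustrated in isolation. The sketch below is an assumption-laden stand-in rather than the notebook's code: `classify_all` and `flaky_classify` are hypothetical names, and the `sleep` parameter is added only so the pauses can be observed without actually waiting.

```python
import time

def classify_all(items, classify, batch_size=50, delay=1.0, sleep=time.sleep):
    """Classify items in order, pausing after each full batch.

    Any exception raised by `classify` is reported and recorded as 'na',
    so a single bad row does not stop the run.
    """
    results = []
    for i, item in enumerate(items):
        if i > 0 and i % batch_size == 0:
            sleep(delay)  # pause between batches to respect rate limits
        try:
            results.append(classify(item))
        except Exception as exc:
            print(f"Error processing {item!r}: {exc}")
            results.append("na")
    return results

# Hypothetical classifier that fails on one input
def flaky_classify(text):
    if text == "???":
        raise ValueError("unparseable input")
    return text.lower()

# Record the requested pauses instead of sleeping
pauses = []
labels = classify_all(
    ["Analyst", "???", "Manager", "Director", "MLE"],
    flaky_classify,
    batch_size=2,
    delay=0.5,
    sleep=pauses.append,
)
print(labels, pauses)
```

Passing `pauses.append` as `sleep` keeps the example fast; in a real run the default `time.sleep` would be used.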

In [1]:
import os
import pandas as pd
import numpy as np
from dotenv import load_dotenv
from collections import defaultdict
import json
import time

load_dotenv(".env")

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    temperature=0, model="gpt-4-0125-preview", 
    api_key=os.getenv("OPENAI_API_KEY"))

# Define lists for classification
roles = ['data scientist', 'data engineer', 'analyst', 'mle', 'manager', 'director', 'na']
skills = [
    'statistics', 'machine_learning', 'data_analysis', 'data_mining',
    'nlp', 'computer_vision', 'deep_learning', 'big_data', 'na'
]
languages = [
    'python', 'r', 'matlab', 'java', 'c++', 'sas', 'na'
]

class SimpleCache:
    def __init__(self, cache_file='classification_cache.json'):
        self.cache_file = cache_file
        self.cache = self.load_cache()
        
    def load_cache(self):
        try:
            with open(self.cache_file, 'r') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return {'roles': {}, 'skills_languages': {}}
    
    def save_cache(self):
        with open(self.cache_file, 'w') as f:
            json.dump(self.cache, f)
    
    def get_role(self, text):
        # Handle NaN, None, and non-string types
        if pd.isna(text) or text is None:
            return 'na'
        text_str = str(text).lower().strip()
        return self.cache['roles'].get(text_str)
    
    def get_skill_language(self, text):
        # Handle NaN, None, and non-string types
        if pd.isna(text) or text is None:
            return ('unknown', 'na')
        text_str = str(text).lower().strip()
        return self.cache['skills_languages'].get(text_str)
    
    def set_role(self, text, value):
        # Handle NaN, None, and non-string types
        if pd.isna(text) or text is None:
            return
        text_str = str(text).lower().strip()
        self.cache['roles'][text_str] = value
        self.save_cache()
    
    def set_skill_language(self, text, value):
        # Handle NaN, None, and non-string types
        if pd.isna(text) or text is None:
            return
        text_str = str(text).lower().strip()
        self.cache['skills_languages'][text_str] = value
        self.save_cache()

# Initialize cache
cache = SimpleCache()

def simplify_role(text):
    """Simplify role text using cache first"""
    # Handle NaN values
    if pd.isna(text) or text is None:
        return 'na'
    
    # Check cache first
    cached_result = cache.get_role(text)
    if cached_result:
        return cached_result
    
    # If not in cache, use API
    try:
        schema = {
            "title": "role_selector",
            "description": (
                "Match job roles to standard titles, being tolerant of typos and variations:\n"
                "Examples:\n"
                "'senior data scientist' → 'data scientist'\n"
                "'ml engineer' → 'mle'\n"
                "'data analytics manager' → 'manager'\n"
                "'chief data officer' → 'director'\n"
                f"Available roles: {roles}\n"
                "Return 'na' only if the role is completely unclear"
            ),
            "type": "object",
            "properties": {
                "selected_role": {
                    "type": "string",
                    "enum": roles,
                    "description": "Most similar role from the list, or 'na' if unclear"
                }
            },
            "required": ["selected_role"]
        }
        llm_struc = llm.with_structured_output(schema)
        result = llm_struc.invoke(str(text))
        simplified = result["selected_role"]
        
        # Cache the result
        cache.set_role(text, simplified)
        return simplified
    except Exception as e:
        print(f"Error processing role {text}: {str(e)}")
        return 'na'

def simplify_text_v2(text):
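    """Classify one term as a skill or language and map it to a standard label.

    Checks the cache first; returns a (category, standardized) tuple.
    """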
    # Handle NaN values
    if pd.isna(text) or text is None:
        return ("unknown", "na")
    
    # Check cache first
    cached_result = cache.get_skill_language(text)
    if cached_result:
        return tuple(cached_result) if isinstance(cached_result, list) else cached_result
    
    # If not in cache, use API
    try:
        schema = {
            "title": "skill_language_classifier",
            "description": (
                "Classify and standardize the input text:\n"
                "1. First determine if it's a skill or programming language\n"
                "2. Then match to the standard list\n"
                f"Skills: {skills}\n"
                f"Languages: {languages}\n"
                "Examples:\n"
                "'statistical analysis' → ('skill', 'statistics')\n"
                "'python coding' → ('language', 'python')\n"
                "'machine learning exp' → ('skill', 'machine_learning')"
            ),
            "type": "object",
            "properties": {
                "category": {
                    "type": "string",
                    "enum": ["skill", "language", "unknown"],
                    "description": "The category that best matches the input"
                },
                "standardized": {
                    "type": "string",
                    "enum": skills + languages,
                    "description": "The standardized term from the appropriate list"
                }
            },
            "required": ["category", "standardized"]
        }
        llm_struc = llm.with_structured_output(schema)
        result = llm_struc.invoke(str(text))
        output = (result["category"], result["standardized"])
        
        # Cache the result
        cache.set_skill_language(text, list(output))
        return output
    except Exception as e:
        print(f"Error processing {text}: {str(e)}")
        return ("unknown", "na")

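def process_data(df, batch_size=50, rate_limit_delay=1):
    """Clean the 'roles' and 'skills' columns of df.

    Pauses rate_limit_delay seconds after every batch_size rows.
    Returns three lists aligned with df's rows: roles, skills, languages.
    """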
    clean_roles = []
    clean_skills = []
    clean_languages = []
    
    # Process roles
    print("\nProcessing Roles:")
    for i, role_text in enumerate(df['roles']):
        if i > 0 and i % batch_size == 0:
            print(f"Processed {i} roles...")
            time.sleep(rate_limit_delay)
        
        simplified_role = simplify_role(role_text)
        clean_roles.append(simplified_role)
    
    # Process skills
    print("\nProcessing Skills and Languages:")
    for i, skills_text in enumerate(df['skills']):
        if i > 0 and i % batch_size == 0:
            print(f"Processed {i} skill sets...")
            time.sleep(rate_limit_delay)
        
        if pd.isna(skills_text):
            clean_skills.append('')
            clean_languages.append('')
            continue
            
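        # Split on commas and strip whitespace so "a,b" and "a, b" both work
        skill_items = [s.strip() for s in str(skills_text).split(',') if s.strip()]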
        skills_list = []
        languages_list = []
        
        for skill in skill_items:
            category, simplified = simplify_text_v2(skill)
            if simplified != 'na':
                if category == 'skill':
                    skills_list.append(simplified)
                elif category == 'language':
                    languages_list.append(simplified)
        
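        # dict.fromkeys removes duplicates while keeping first-seen order,
        # so the output is identical on every run (set order is not guaranteed)
        clean_skills.append(', '.join(dict.fromkeys(skills_list)))
        clean_languages.append(', '.join(dict.fromkeys(languages_list)))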
    
    return clean_roles, clean_skills, clean_languages

In [2]:
# Load the data
df = pd.read_csv('DataScience_jobs.csv')

# Test with random subset
print("Testing with a random subset...")
df_test = df.sample(n=5, random_state=42)  # Random 5 rows, set random_state for reproducibility
clean_roles, clean_skills, clean_languages = process_data(df_test)

# Show test results
df_test['clean_role'] = clean_roles
df_test['clean_skills'] = clean_skills
df_test['clean_languages'] = clean_languages
print("\nTest Results:")
print(df_test[['roles', 'clean_role', 'skills', 'clean_skills', 'clean_languages']])

Testing with a random subset...

Processing Roles:

Processing Skills and Languages:

Test Results:
                             roles      clean_role  \
628  Data Scientist with GCP Cloud  data scientist   
631                 Data Scientist  data scientist   
741                 Data Scientist  data scientist   
514  Data Scientist (Supply Chain)  data scientist   
365                 Data Scientist  data scientist   

                                                skills  \
628  Data Science, GCP, Natural Language Processing...   
631                                     Research, Data   
741  Statistical modeling, tableau, Coding, Analyti...   
514  Supply chain management, data science, Neural ...   
365  Computer science, data science, Data modeling,...   

                                          clean_skills clean_languages  
628  nlp, computer_vision, machine_learning, deep_l...                  
631                                                                     
741        machine_learning, data_analysis, statistics    

In [None]:
# Process the full dataset
print("Processing full dataset...")
clean_roles, clean_skills, clean_languages = process_data(df)

# Add cleaned data to DataFrame
df['clean_role'] = clean_roles
df['clean_skills'] = clean_skills
df['clean_languages'] = clean_languages

# Save the cleaned data
output_file = 'DataScience_jobs_cleaned_all.csv'
df.to_csv(output_file, index=False)
print(f"\nFull dataset processed and saved to {output_file}")

# Show a sample of the final results
print("\nSample of final cleaned data:")
print(df[['roles', 'clean_role', 'skills', 'clean_skills', 'clean_languages']].head())