# In [None]:
# ==================================================
#                DYNAMODB TABLES
# ==================================================
# Identifier embedded in every table name below.
API_ID = "alek4h7jlreffeoe5tocxgnx2u"
# Every name interpolates API_ID, so each literal needs the f prefix; without
# it the table name would contain the literal text "{API_ID}".
CAREER_SALARY_TABLE      = f"careerSalary-{API_ID}-NONE"
CAREER_EDUCATION_TABLE   = f"careerEducation-{API_ID}-NONE"
CAREER_SKILLS_TABLE      = f"careerSkills-{API_ID}-NONE"
CAREER_DESCRIPTION_TABLE = f"careerDescription-{API_ID}-NONE"
SOC_CODES_TABLE          = f"socCodes-{API_ID}-NONE"
# Region passed to boto3 when the DynamoDB resource is created.
REGION = "us-west-2"
# Number of worker threads started by DynamoUploader.threaded_upload.
NUM_THREADS = 4

import os
import csv
import ast
import time
import threading
from decimal import Decimal
from collections import defaultdict
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
from botocore.exceptions import ClientError

# Module-level DynamoDB service resource for REGION; the main runner builds its
# Table handles from it.
# NOTE(review): boto3's documentation describes resource objects as not
# thread-safe, while Table objects derived from this resource are used from
# DynamoUploader's worker threads — confirm this sharing is acceptable.
dynamodb = boto3.resource("dynamodb", region_name=REGION)

# ==================================================
#              COMMON UTILS / THREADING
# ==================================================

class DynamoUploader:
    """Push pre-batched items into one DynamoDB table from worker threads.

    ``threaded_upload`` starts ``NUM_THREADS`` workers that pull batches off an
    internal queue and write them with ``table.batch_writer``. Items whose
    write raises are collected in ``rejected_rows`` (each copy gains an
    ``'error'`` key) and can be exported with ``save_rejected_rows``.
    """

    def __init__(self, table, batch_size, rejected_file, overwrite_pkeys):
        """Store the upload configuration.

        Args:
            table: Table object providing ``batch_writer(overwrite_by_pkeys=...)``.
            batch_size: Kept on the instance for reference; this class never
                reads it, callers size the batches they enqueue themselves.
            rejected_file: Path of the CSV written by ``save_rejected_rows``.
            overwrite_pkeys: Forwarded as ``overwrite_by_pkeys`` to the batch
                writer.
        """
        self.table = table
        self.batch_size = batch_size
        self.rejected_file = rejected_file
        self.queue = Queue()
        self.rejected_rows = []
        # Guards rejected_rows and the batch counter, both touched by workers.
        self.lock = threading.Lock()
        self.overwrite_pkeys = overwrite_pkeys
        # Shared across workers so every log line carries a unique number.
        self._batch_counter = 0

    def process_batch(self, batch, batch_number):
        """Write one batch through the table's batch writer.

        An exception raised by an individual ``put_item`` call is recorded
        against that item only. An exception escaping the ``with`` block marks
        every item of the batch as rejected.

        Args:
            batch: Iterable of item dicts to write.
            batch_number: Number used only in the log output.
        """
        start = time.time()
        try:
            with self.table.batch_writer(overwrite_by_pkeys=self.overwrite_pkeys) as writer:
                for item in batch:
                    try:
                        writer.put_item(Item=item)
                    except Exception as e:
                        with self.lock:
                            self.rejected_rows.append({**item, 'error': str(e)})
            # NOTE(review): this line is printed even when individual items
            # were rejected above; the count is the batch size, not successes.
            print(f"✅ Batch {batch_number} inserted in {time.time() - start:.2f}s ({len(batch)} records)")
        except Exception as e:
            with self.lock:
                for item in batch:
                    self.rejected_rows.append({**item, 'error': str(e)})
            print(f"❌ Batch {batch_number} failed with error: {str(e)}")

    def worker(self):
        """Consume batches from the queue until a ``None`` sentinel arrives."""
        while True:
            batch = self.queue.get()
            try:
                if batch is None:
                    break
                with self.lock:
                    self._batch_counter += 1
                    batch_number = self._batch_counter
                self.process_batch(batch, batch_number)
            finally:
                # Acknowledge every get(), sentinels included, so the queue's
                # unfinished-task count returns to zero and a later
                # threaded_upload() on this instance does not hang in join().
                self.queue.task_done()

    def threaded_upload(self, data_iter):
        """Feed every batch from ``data_iter`` to ``NUM_THREADS`` workers.

        Blocks until all queued batches are processed and all workers have
        exited. If ``data_iter`` raises, the workers are still shut down
        before the exception propagates.

        Args:
            data_iter: Iterable yielding batches (lists of item dicts).
        """
        threads = [threading.Thread(target=self.worker) for _ in range(NUM_THREADS)]
        for t in threads:
            t.start()
        try:
            for batch in data_iter:
                self.queue.put(batch)
            self.queue.join()
        finally:
            # Always release the workers; otherwise an error while reading the
            # input would leave them blocked on queue.get() forever.
            for _ in threads:
                self.queue.put(None)
            for t in threads:
                t.join()

    def save_rejected_rows(self):
        """Write the collected rejected rows to ``rejected_file`` as CSV.

        Does nothing when no rows were rejected. The header is the union of
        the keys of all rows in first-seen order; cells a row lacks are left
        empty.
        """
        if not self.rejected_rows:
            return
        # Rows can have different keys (optional attributes are dropped per
        # item), so taking the header from the first row alone would make
        # DictWriter raise ValueError on any row with an extra key.
        fieldnames = list(dict.fromkeys(key for row in self.rejected_rows for key in row))
        with open(self.rejected_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.rejected_rows)
        print(f"⚠️ {len(self.rejected_rows)} records rejected. Saved to {self.rejected_file}")

# ==================================================
#         HELPERS FOR EACH DATA TYPE
# ==================================================
def to_decimal(val):
    """Convert a raw value to a ``Decimal`` rounded to two decimal places.

    Args:
        val: Value to convert, typically a CSV cell string.

    Returns:
        The rounded ``Decimal``, or ``None`` when ``val`` is ``None``, ``''``,
        ``'null'``, not parseable as a number, or not finite (NaN/infinity).
    """
    try:
        if val is None or val in ('', 'null'):
            return None
        number = Decimal(str(round(float(val), 2)))
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are treated as "no value"; a bare except
        # would also hide KeyboardInterrupt and SystemExit.
        return None
    # float() accepts "nan"/"inf"; treat those as missing rather than
    # returning a non-finite Decimal.
    return number if number.is_finite() else None

def batch_iterable(iterable, batch_size):
    """Yield consecutive lists of ``batch_size`` items taken from ``iterable``.

    The last list may be shorter; nothing is yielded for an empty input.
    """
    chunk = []
    for element in iterable:
        chunk.append(element)
        if len(chunk) != batch_size:
            continue
        # Chunk is full: hand it out and start collecting a new one.
        yield chunk
        chunk = []
    # Whatever is left over forms the final, partial chunk.
    if chunk:
        yield chunk

# -------- CAREER SALARY --------
def salary_data_iter(directory, batch_size):
    """Yield batches of salary items from ``*_dynamodb_ready.csv`` files.

    Files are processed in sorted name order. Each row becomes a dict with
    the string keys ``occ_code`` and ``salary_key`` plus whichever of
    ``a_median``, ``m_median``, ``m_pct10`` and ``m_pct90`` convert to a
    ``Decimal`` via ``to_decimal``. Rows lacking one of the two key columns
    are skipped and counted; a summary is printed per file. Batches never
    span two files.

    Args:
        directory: Directory scanned (non-recursively) for input files.
        batch_size: Number of items per yielded list; the last list of each
            file may be shorter.
    """
    # os.listdir returns names in arbitrary order; sort for reproducible runs.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith("_dynamodb_ready.csv"):
            continue
        filepath = os.path.join(directory, filename)
        skipped = 0
        batch = []
        with open(filepath, mode='r', encoding='utf-8') as f:
            for row in csv.DictReader(f):
                try:
                    item = {
                        'occ_code': str(row['occ_code']),
                        'salary_key': str(row['salary_key']),
                    }
                except KeyError:
                    # A key column is missing: the row cannot be stored.
                    skipped += 1
                    continue
                # Optional numeric attributes: keep only the convertible ones.
                for field in ('a_median', 'm_median', 'm_pct10', 'm_pct90'):
                    value = to_decimal(row.get(field))
                    if value is not None:
                        item[field] = value
                batch.append(item)
                if len(batch) == batch_size:
                    yield batch
                    batch = []
        if batch:
            yield batch
        if skipped:
            print(f"⚠️ Skipped {skipped} row(s) without occ_code/salary_key in {filepath}")

# -------- CAREER EDUCATION --------
def education_data_iter(input_file, batch_size):
    """Return batches of per-occupation education records from ``input_file``.

    The whole CSV is read up front. Rows are grouped by their ``SOC`` column;
    each recognised ``ESTIMATECODE`` stores that row's ``ESTIMATE`` value
    (unchanged, as read) under the mapped field name. Rows with any other
    code are ignored. The grouped records are handed to ``batch_iterable``.
    """
    field_by_code = {
        'Less_than_hs': 'less_than_highschool',
        'hs_or_eq': 'high_school_or_equivalent',
        'Associate_degree': 'associate_degree',
        'Bachelor_degree': 'bachelor_degree',
        'Master_degree': 'master_degree',
        'Doctorate_degree': 'doctorate_degree',
        'No_requirement': 'no_requirement',
        'Professional_degree': 'professional_degree',
    }
    records = {}
    with open(input_file, newline='', encoding='utf-8') as handle:
        for row in csv.DictReader(handle):
            soc = row['SOC']
            estimate_code = row['ESTIMATECODE']
            if estimate_code not in field_by_code:
                continue
            estimate = row['ESTIMATE']
            # The first row seen for an occupation creates its record, with
            # occ_code as the first key.
            record = records.setdefault(soc, {'occ_code': soc})
            record[field_by_code[estimate_code]] = estimate
    return batch_iterable(records.values(), batch_size)

# -------- CAREER SKILLS --------
def skills_data_iter(input_file, batch_size):
    """Yield batches of ``{'occ_code', 'skills'}`` items read from a CSV file.

    ``occ_code`` is the ``SOC_CODE`` column as a string and ``skills`` is the
    ``TYPICAL_SKILLS`` column parsed with ``ast.literal_eval``. Rows that lack
    a column or whose skills cell cannot be parsed are skipped; once the file
    has been read, a summary with their row numbers is printed.

    Args:
        input_file: Path of the CSV file.
        batch_size: Number of items per yielded list; the last may be shorter.
    """
    skipped_rows = []
    first_error = None
    with open(input_file, mode='r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        batch = []
        # Numbering starts at 2 because the header occupies row 1.
        for row_num, row in enumerate(reader, 2):
            try:
                occ_code = str(row['SOC_CODE'])
                skills = ast.literal_eval(row['TYPICAL_SKILLS'])
            except Exception as exc:
                # Best effort: one bad row must not abort the upload, but it
                # is reported below instead of vanishing silently.
                skipped_rows.append(row_num)
                if first_error is None:
                    first_error = repr(exc)
                continue
            batch.append({'occ_code': occ_code, 'skills': skills})
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch
    if skipped_rows:
        print(
            f"⚠️ Skipped {len(skipped_rows)} unparsable row(s) in {input_file} "
            f"(CSV rows: {skipped_rows[:10]}; first error: {first_error})"
        )

# -------- CAREER DESCRIPTION --------
def description_data_iter(input_file, batch_size):
    """Yield batches of ``{'occ_code', 'description'}`` items from a CSV file.

    ``occ_code`` comes from the ``Code`` column and ``description`` from the
    ``Description`` column, both converted with ``str``. Rows lacking either
    column are skipped; once the file has been read, a summary with their row
    numbers is printed.

    Args:
        input_file: Path of the CSV file.
        batch_size: Number of items per yielded list; the last may be shorter.
    """
    skipped_rows = []
    first_error = None
    with open(input_file, mode='r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        batch = []
        # Numbering starts at 2 because the header occupies row 1.
        for row_num, row in enumerate(reader, 2):
            try:
                item = {'occ_code': str(row['Code']), 'description': str(row['Description'])}
            except KeyError as exc:
                # Report the missing column below instead of dropping silently.
                skipped_rows.append(row_num)
                if first_error is None:
                    first_error = repr(exc)
                continue
            batch.append(item)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch
    if skipped_rows:
        print(
            f"⚠️ Skipped {len(skipped_rows)} malformed row(s) in {input_file} "
            f"(CSV rows: {skipped_rows[:10]}; first error: {first_error})"
        )

# -------- SOC CODES --------
def soc_codes_data_iter(input_file, batch_size):
    """Yield batches of ``{'occ_code', 'occ_title'}`` items from a CSV file.

    Values come from the ``OCC_CODE`` and ``OCC_TITLE`` columns, converted
    with ``str``. Rows lacking either column are skipped; once the file has
    been read, a summary with their row numbers is printed.

    Args:
        input_file: Path of the CSV file.
        batch_size: Number of items per yielded list; the last may be shorter.
    """
    skipped_rows = []
    first_error = None
    with open(input_file, mode='r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        batch = []
        # Numbering starts at 2 because the header occupies row 1.
        for row_num, row in enumerate(reader, 2):
            try:
                item = {'occ_code': str(row['OCC_CODE']), 'occ_title': str(row['OCC_TITLE'])}
            except KeyError as exc:
                # Report the missing column below instead of dropping silently.
                skipped_rows.append(row_num)
                if first_error is None:
                    first_error = repr(exc)
                continue
            batch.append(item)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch
    if skipped_rows:
        print(
            f"⚠️ Skipped {len(skipped_rows)} malformed row(s) in {input_file} "
            f"(CSV rows: {skipped_rows[:10]}; first error: {first_error})"
        )

# ==================================================
#                  MAIN RUNNER
# ==================================================
if __name__ == "__main__":
    # Every section runs the same steps for one table: announce it, build an
    # uploader, stream the batches through the worker threads, then dump any
    # rejected rows to that table's CSV file.

    # ---- Career salary ----
    print("Uploading Career Salary...")
    # NOTE(review): salary items carry both 'occ_code' and 'salary_key', but
    # only 'occ_code' is passed as the overwrite key — confirm this matches
    # the table's key schema.
    salary_uploader = DynamoUploader(
        table=dynamodb.Table(CAREER_SALARY_TABLE),
        batch_size=1000,
        rejected_file="rejected_dynamodb_rows.csv",
        overwrite_pkeys=['occ_code'],
    )
    salary_batches = salary_data_iter("Datasets/dynamodb_ready_by_year", 1000)
    salary_uploader.threaded_upload(salary_batches)
    salary_uploader.save_rejected_rows()

    # ---- Career education ----
    print("Uploading Career Education...")
    edu_uploader = DynamoUploader(
        table=dynamodb.Table(CAREER_EDUCATION_TABLE),
        batch_size=100,
        rejected_file="rejected_career_education.csv",
        overwrite_pkeys=['occ_code'],
    )
    edu_batches = education_data_iter("Datasets/education_data.csv", 100)
    edu_uploader.threaded_upload(edu_batches)
    edu_uploader.save_rejected_rows()

    # ---- Career skills ----
    print("Uploading Career Skills...")
    skills_uploader = DynamoUploader(
        table=dynamodb.Table(CAREER_SKILLS_TABLE),
        batch_size=100,
        rejected_file="rejected_career_skills.csv",
        overwrite_pkeys=['occ_code'],
    )
    skills_batches = skills_data_iter("Datasets/skills_data.csv", 100)
    skills_uploader.threaded_upload(skills_batches)
    skills_uploader.save_rejected_rows()

    # ---- Career description ----
    print("Uploading Career Description...")
    desc_uploader = DynamoUploader(
        table=dynamodb.Table(CAREER_DESCRIPTION_TABLE),
        batch_size=100,
        rejected_file="rejected_career_description.csv",
        overwrite_pkeys=['occ_code'],
    )
    desc_batches = description_data_iter("Datasets/description.csv", 100)
    desc_uploader.threaded_upload(desc_batches)
    desc_uploader.save_rejected_rows()

    # ---- SOC codes ----
    print("Uploading SOC Codes...")
    soc_uploader = DynamoUploader(
        table=dynamodb.Table(SOC_CODES_TABLE),
        batch_size=100,
        rejected_file="rejected_soc_codes.csv",
        overwrite_pkeys=['occ_code'],
    )
    soc_batches = soc_codes_data_iter("Datasets/unique_occ_codes.csv", 100)
    soc_uploader.threaded_upload(soc_batches)
    soc_uploader.save_rejected_rows()

    print("🚀 ALL TABLES UPLOADED. YOU'RE DONE.")

