In [1]:
import sys
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# --- SETUP MOCK ENVIRONMENT FOR LOCAL RUN ---
try:
    # Prefer the real AWS Glue libraries (present on the Glue Docker image or in the cloud).
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    print("‚úÖ Found AWS Glue Libraries (Running on AWS/Docker)")
except ImportError:
    # On an ordinary laptop the Glue libraries are missing, so define stand-ins
    # that expose only the names this notebook uses.
    print("‚ö†Ô∏è AWS Glue Libraries NOT found. Using MOCK objects for Local Laptop.")

    # 1. Mock GlueContext
    class GlueContext:
        """Local stand-in for the Glue context, backed by a local SparkSession."""

        def __init__(self, sc):
            """Keep the given SparkContext and attach a local SparkSession."""
            self.sc = sc
            # A local SparkSession takes the place of the Glue-provided one.
            local_builder = SparkSession.builder.master("local[*]").appName("LocalGlueRun")
            self.spark_session = local_builder.getOrCreate()

    # 2. Mock Job Class
    class Job:
        """Local stand-in for the Glue job wrapper; it only prints what happens."""

        def __init__(self, glue_context):
            """Accept the context for signature compatibility; nothing is stored."""

        def init(self, job_name, args):
            """Announce that the mock job was initialised under ``job_name``."""
            print(f"   -> [Mock] Job Initialized: {job_name}")

        def commit(self):
            """Announce that the mock job was committed."""
            print("   -> [Mock] Job Committed")

    # 3. Mock getResolvedOptions
    def getResolvedOptions(args, options):
        """Return a dict mapping every requested option name to 'mock_value'.

        ``args`` is accepted for signature compatibility and is not inspected.
        """
        return dict.fromkeys(options, 'mock_value')

‚ö†Ô∏è AWS Glue Libraries NOT found. Using MOCK objects for Local Laptop.


In [2]:
import concurrent.futures
import random
import time

import pandas as pd
# ‡πÑ‡∏°‡πà‡∏ï‡πâ‡∏≠‡∏á import awsglue ‡∏ï‡∏£‡∏á‡∏ô‡∏µ‡πâ‡πÅ‡∏•‡πâ‡∏ß ‡πÄ‡∏û‡∏£‡∏≤‡∏∞ Cell ‡∏ö‡∏ô‡∏à‡∏±‡∏î‡∏Å‡∏≤‡∏£‡πÉ‡∏´‡πâ‡πÅ‡∏•‡πâ‡∏ß

# ==========================================
# 1. Init Spark & Glue Wrappers
# ==========================================
# Wire up the Spark/Glue objects used by the rest of this cell. GlueContext and
# Job are either the real awsglue classes or the local mocks, depending on
# which branch of the setup cell ran.
sc = SparkContext.getOrCreate() # getOrCreate avoids an error when this cell is re-run in the notebook
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Mock Arguments: hard-coded here for the local run.
args = {
    'JOB_NAME': 'local_notebook_test',
    'MAX_CONCURRENT_THREADS': '3' # try changing this number to experiment; it is parsed with int() further down
}
job.init(args['JOB_NAME'], args)

# ==========================================
# 2. Configuration & Mock Data
# ==========================================
# Upper bound on the number of tables processed at the same time; it is passed
# as max_workers to the thread pool created for each priority group.
MAX_WORKERS = int(args['MAX_CONCURRENT_THREADS'])
if MAX_WORKERS < 1:
    # Fail before any table is processed. Without this check the invalid value
    # would only surface when the first thread pool is created, possibly after
    # earlier work has already run.
    raise ValueError(
        f"MAX_CONCURRENT_THREADS must be a positive integer, got {MAX_WORKERS}"
    )

# Root URI of the target bucket; the s3a scheme is the one used with local Spark.
# It is not referenced by the code visible in this cell yet.
S3_BUCKET_ROOT = "s3a://mock-bucket"

def get_job_control_list():
    """Return the mocked job-control rows, one dict per target table.

    Every row has the keys ``table_id``, ``target_table``, ``piority`` (the
    spelling the rest of the notebook reads) and ``parallel_run``. A fresh
    list of fresh dicts is built on each call.
    """
    # The same mock data set as before, written as a compact table.
    field_names = ('table_id', 'target_table', 'piority', 'parallel_run')
    mock_rows = (
        (1, 'users', 1, True),
        (2, 'products', 1, True),
        (3, 'orders', 2, False),  # Sequential
        (4, 'logs_a', 2, True),
        (5, 'logs_b', 2, True),
        (6, 'logs_c', 2, True),
        (7, 'history', 3, True),
    )
    return [dict(zip(field_names, values)) for values in mock_rows]

# ==========================================
# 3. Core Logic
# ==========================================
def process_table(row, min_seconds=1.0, max_seconds=3.0):
    """Simulate processing one target table and return a completion message.

    The real Spark read/write is still a placeholder (see the commented lines
    below); for now the function only sleeps for a random duration.

    Args:
        row: Job-control mapping; must provide 'target_table' and 'piority'.
        min_seconds: Lower bound of the simulated processing time, in seconds.
        max_seconds: Upper bound of the simulated processing time, in seconds.
            The defaults reproduce the original fixed 1.0-3.0 s range; pass
            smaller values to shorten a local run.

    Returns:
        A string naming the table and the simulated duration (two decimals).

    Raises:
        KeyError: If ``row`` lacks 'target_table' or 'piority'.
    """
    table_name = row['target_table']
    piority = row['piority']

    # Random duration so that overlapping parallel runs are visible in the output.
    process_time = random.uniform(min_seconds, max_seconds)

    print(f"   ‚ñ∂ [Start] {table_name} (P:{piority})")
    time.sleep(process_time)  # Simulate Data Processing

    # --- Replace this part when connecting to the real S3 ---
    # df = spark.createDataFrame(...)
    # df.write.parquet(...)

    return f"   ‚úÖ [Done] {table_name} ({process_time:.2f}s)"

# ==========================================
# 4. Orchestration Execution
# ==========================================
print(f"--- Starting Job with {MAX_WORKERS} Max Threads ---\n")

job_list = get_job_control_list()
# Distinct priority values in ascending order; groups run one after another.
# The comprehension variable is `row`, not `job`, so it cannot be confused with
# the module-level `job` object committed at the end.
all_priorities = sorted({row['piority'] for row in job_list})

# Target tables whose parallel task raised, collected for the final summary.
failed_tables = []

for prio in all_priorities:
    print(f"\nüì¢ PRIORITY GROUP: {prio}")
    print("-" * 30)

    current_batch = [row for row in job_list if row['piority'] == prio]
    # The explicit comparison with True is kept on purpose: a missing or
    # non-True flag sends the row to the sequential list.
    parallel_jobs = [row for row in current_batch if row.get('parallel_run') == True]
    sequential_jobs = [row for row in current_batch if row.get('parallel_run') != True]

    # 4.1 Run Sequential (an exception here propagates and stops the run)
    if sequential_jobs:
        print(f"üëâ Running Sequential ({len(sequential_jobs)} jobs)...")
        for job_row in sequential_jobs:
            print(process_table(job_row))

    # 4.2 Run Parallel (a failing task is reported and the others continue)
    if parallel_jobs:
        print(f"üëâ Running Parallel ({len(parallel_jobs)} jobs)...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_table = {executor.submit(process_table, row): row for row in parallel_jobs}

            for future in concurrent.futures.as_completed(future_to_table):
                try:
                    print(future.result())
                except Exception as exc:
                    # Look the row up so the message says which table failed.
                    failed_name = str(future_to_table[future].get('target_table'))
                    failed_tables.append(failed_name)
                    print(f"‚ùå Error [{failed_name}]: {exc}")

if failed_tables:
    # Do not claim success when some parallel tasks raised.
    print(f"\nFinished with {len(failed_tables)} failed job(s): {', '.join(failed_tables)}")
else:
    print("\nüéâ All Jobs Completed.")
# NOTE(review): the commit is still issued when some tables failed, as before.
# Confirm whether a real Glue run should skip it in that case.
job.commit()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/23 13:50:56 WARN Utils: Your hostname, Santis-Mac-Studio.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.38 instead (on interface en0)
26/01/23 13:50:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/23 13:50:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


   -> [Mock] Job Initialized: local_notebook_test
--- Starting Job with 3 Max Threads ---


üì¢ PRIORITY GROUP: 1
------------------------------
üëâ Running Parallel (2 jobs)...
   ‚ñ∂ [Start] users (P:1)
   ‚ñ∂ [Start] products (P:1)
   ‚úÖ [Done] users (1.68s)
   ‚úÖ [Done] products (1.76s)

üì¢ PRIORITY GROUP: 2
------------------------------
üëâ Running Sequential (1 jobs)...
   ‚ñ∂ [Start] orders (P:2)
   ‚úÖ [Done] orders (2.13s)
üëâ Running Parallel (3 jobs)...
   ‚ñ∂ [Start] logs_a (P:2)
   ‚ñ∂ [Start] logs_b (P:2)
   ‚ñ∂ [Start] logs_c (P:2)
   ‚úÖ [Done] logs_a (1.67s)
   ‚úÖ [Done] logs_b (2.62s)
   ‚úÖ [Done] logs_c (2.83s)

üì¢ PRIORITY GROUP: 3
------------------------------
üëâ Running Parallel (1 jobs)...
   ‚ñ∂ [Start] history (P:3)
   ‚úÖ [Done] history (2.58s)

üéâ All Jobs Completed.
   -> [Mock] Job Committed
