In [None]:
from spark_utils import quick_start

# Create the Spark session that every later cell uses via the project helper.
# The name is presumably used as the Spark application name — verify in spark_utils.
APP_NAME = "TestConnection"
spark = quick_start(APP_NAME)

In [None]:
# S3 smoke test: register a database rooted in the bucket, then round-trip a tiny
# Parquet dataset through s3a:// and display what comes back.
spark.sql("CREATE DATABASE IF NOT EXISTS test_db LOCATION 's3a://delta-lake/test_db/'")

parquet_path = "s3a://delta-lake/test_parquet"

# Plain Parquet write (no Delta involved) — isolates S3 access from Delta setup.
data = [(1, "Alice", 25), (2, "Bob", 30)]
df = spark.createDataFrame(data, schema=["id", "name", "age"])
df.write.parquet(parquet_path, mode="overwrite")

# Read the same location back to confirm the write landed.
test_df = spark.read.format("parquet").load(parquet_path)
test_df.show()

In [None]:
# Load the existing bronze Delta table straight from its storage path (no catalog
# lookup) and preview its first rows.
df = spark.read.load("s3a://delta-lake/bronze/github/keyword_extractions", format="delta")
df.show()

In [None]:
# Cluster smoke test: print SparkContext facts, time a small distributed job,
# and list what the PySpark StatusTracker exposes.
import time

sc = spark.sparkContext

print("=== SPARK CLUSTER INFO ===")
print(f"Application ID: {sc.applicationId}")
print(f"Application Name: {sc.appName}")
print(f"Spark Version: {sc.version}")
print(f"Default parallelism: {sc.defaultParallelism}")
print(f"Master: {sc.master}")

# Test actual parallel execution
print("\n=== TESTING PARALLEL EXECUTION ===")

NUM_VALUES = 1000  # size of the synthetic workload; also used in the report below
NUM_SLICES = 8     # number of partitions requested from parallelize()

# perf_counter() is monotonic and high-resolution, so the elapsed time cannot be
# skewed by wall-clock adjustments the way time.time() can.
start_time = time.perf_counter()
# Create work that will be distributed across executors
test_data = sc.parallelize(range(NUM_VALUES), NUM_SLICES)
result = test_data.map(lambda x: x * x).filter(lambda x: x % 2 == 0).count()
end_time = time.perf_counter()

print(f"Processed {NUM_VALUES} numbers across {test_data.getNumPartitions()} partitions")
print(f"Result: {result} even squares")
print(f"Processing time: {end_time - start_time:.2f} seconds")

# Check what we can access
print("\n=== AVAILABLE STATUS TRACKER METHODS ===")
status_tracker = sc.statusTracker()
available_methods = [method for method in dir(status_tracker) if not method.startswith('_')]
print(f"Available methods: {available_methods}")

# Hard-coded UI address as in the original; adjust if the UI port mapping differs.
print("\n🌐 For detailed executor info, open: http://localhost:4041")
print("   Go to 'Executors' tab to see:")
print("   - Number of executors")
print("   - Cores per executor")
print("   - Memory usage")
print("   - Task distribution")

In [None]:
# Report the installed Jupyter component versions, one per line.
import jupyter_core
import jupyterlab

print("\n".join(
    f"{label}: {module.__version__}"
    for label, module in (("Jupyter Core", jupyter_core), ("JupyterLab", jupyterlab))
))

In [None]:
# Report live job/stage activity via the StatusTracker, then time a full row count
# over the bronze Delta table.
import time

# Re-acquired here so this cell does not depend on an earlier cell having run.
sc = spark.sparkContext
status_tracker = sc.statusTracker()

print("=== CURRENT ACTIVITY ===")
try:
    # PySpark's StatusTracker spells this method getActiveJobsIds (sic).
    active_jobs = status_tracker.getActiveJobsIds()
    active_stages = status_tracker.getActiveStageIds()
    print(f"Active jobs: {len(active_jobs)}")
    print(f"Active stages: {len(active_stages)}")
except Exception as e:
    # Best-effort diagnostic: report the failure rather than abort the cell.
    print(f"Status check: {e}")

# Check your Delta Lake data processing
print("\n=== DELTA LAKE PROCESSING TEST ===")
df = spark.read.format("delta").load("s3a://delta-lake/bronze/github/keyword_extractions")

# perf_counter() is the monotonic interval timer; time.time() is wall-clock.
start_time = time.perf_counter()
count = df.count()
end_time = time.perf_counter()

print(f"Delta table rows: {count}")
print(f"Read time: {end_time - start_time:.2f} seconds")
print(f"Partitions used: {df.rdd.getNumPartitions()}")

In [None]:
# Summarize cluster resources for this session, then time a sampled collect from
# the bronze Delta table after repartitioning to a fixed partition count.
import time

sc = spark.sparkContext  # bound here so the cell is self-contained

DELTA_PATH = "s3a://delta-lake/bronze/github/keyword_extractions"
NUM_PARTITIONS = 4      # target partition count for the repartition experiment
SAMPLE_FRACTION = 0.1   # approximate fraction of rows sampled
SAMPLE_SEED = 42        # fixed seed so the sample is reproducible across runs

# Load the table before summarizing, so the partition count reported below comes
# from this cell's DataFrame rather than whatever `df` an earlier cell left behind.
df = spark.read.format("delta").load(DELTA_PATH)

print("=== CLUSTER RESOURCE SUMMARY ===")
# Report the master the session is actually connected to instead of a hard-coded URL.
print(f"Master: {sc.master}")
# NOTE: defaultParallelism is used as a proxy for total cores; it differs if
# spark.default.parallelism is set explicitly.
print(f"Available cores: {sc.defaultParallelism}")
print(f"Partition strategy: {df.rdd.getNumPartitions()} partitions")

# Test with different partition counts
print("\n=== PARTITION OPTIMIZATION TEST ===")

df_repartitioned = df.repartition(NUM_PARTITIONS)
print(f"Repartitioned to: {df_repartitioned.rdd.getNumPartitions()} partitions")

start_time = time.perf_counter()
# collect() pulls the sampled rows onto the driver; keep SAMPLE_FRACTION small
# for large tables.
sample_data = df_repartitioned.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED).collect()
end_time = time.perf_counter()

print(f"Sample processing time: {end_time - start_time:.2f} seconds")
print(f"Sample size: {len(sample_data)} rows")