# Employee Data

- employee_id
- first_name
- last_name
- phone_num
    - country_code
    - area_code
    - exchange_code
    - subscriber_number
- email_id
- address
    - street_name
    - unit_number
    - city
    - county
    - state
    - zip_code
    - extended_zip_code
        - sector
        - segment

In [121]:
# Import required modules/libraries
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType)
from pyspark.sql import SparkSession, Row
from datetime import date, datetime
from pyspark.sql.functions import col, struct
import random

In [98]:
# Create a spark connection
spark = SparkSession.builder.appName("Employee Analysis").getOrCreate()

In [118]:
#Employee Schema

extended_zip_code_schema = StructType([
    StructField("sector", IntegerType()),  # Example: 54
    StructField("segment", IntegerType()) # Example: 01
])

address_schema = StructType([
    StructField("street_name", StringType(), False),
    StructField("unit_number", StringType()),
    StructField("city", StringType(), False),
    StructField("county", StringType()),
    StructField("state", StringType(), False),
    StructField("zip_code", IntegerType(), False),
    StructField("extended_zip_code", extended_zip_code_schema) # Nesting the previous structure
])

phone_num_schema = StructType([                                 # e.g., 1 (555)-123-4567
    StructField("country_code", IntegerType(), False),          # e.g., "1"
    StructField("area_code", IntegerType(), False),             # e.g., "555"
    StructField("exchange_code", IntegerType(), False),         # e.g., "123"
    StructField("subscriber_number", IntegerType(), False)      # e.g., "4567"
])

employee_schema = StructType([
    StructField("employee_id", IntegerType(), False),
    StructField("dept_id", IntegerType(), False),    
    StructField("first_name", StringType(), False),
    StructField("middle_name", StringType()),
    StructField("last_name", StringType(), False),
    StructField("phone_num", phone_num_schema, False),         # Nested Phone Structure
    StructField("email_id", StringType(), False),
    StructField("address", address_schema, False)              # Nested Address Structure
])

In [116]:
# Department Schema

dept_schema = StructType([
    StructField("dept_id", IntegerType(), False),
    StructField("dept_name", StringType(), False),
    StructField("dept_head_emp_id", IntegerType(), False),
    StructField("loc_address", address_schema, False)
])

In [119]:
# Generate synthetic data

FIRST_NAMES = ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona", "George", "Hannah", "Ivy", "Jack"]
LAST_NAMES = ["Smith", "Jones", "Williams", "Brown", "Davis", "Miller", "Wilson", "Moore", "Taylor", "Anderson"]
CITIES = ["New York", "Chicago", "Boston", "Seattle", "Austin", "Denver"]
STATES = ["NY", "IL", "MA", "WA", "TX", "CO"]
DEPARTMENTS = [
    (10, "Engineering"),
    (20, "Sales"),
    (30, "HR"),
    (40, "Finance"),
    (50, "Marketing"),
]

In [120]:
# Create a list of department IDs that we will assign to employees
DEPT_IDS = [id for id, name in DEPARTMENTS]

In [122]:
def generate_address(city, state):
    """Generates a nested address Row consistent with address_schema."""
    zip_code_base = random.randint(10000, 99999)
    return Row(
        street_name=f"{random.randint(100, 999)} {random.choice(['Oak', 'Pine', 'Main'])} St",
        unit_number=random.choice([None, str(random.randint(1, 200))]),
        city=city,
        county=f"{city} County",
        state=state,
        zip_code=zip_code_base,
        extended_zip_code=Row(
            sector=random.randint(10, 99),
            segment=random.randint(10, 99)
        )
    )

In [131]:
dept_data = []
# Ensure each department gets a location
for dept_id, dept_name in DEPARTMENTS:
    city = random.choice(CITIES)
    state = STATES[CITIES.index(city)]
    
    # Generate a unique employee ID for the department head (ensuring it's not the same as a future employee ID)
    dept_head_emp_id = 1000 + dept_id
    
    dept_data.append(Row(
        dept_id=dept_id,
        dept_name=dept_name,
        dept_head_emp_id=dept_head_emp_id,
        loc_address=generate_address(city, state)
    ))

In [124]:
print(dept_data)

[Row(dept_id=10, dept_name='Engineering', dept_head_emp_id=9010, loc_address=Row(street_name='666 Pine St', unit_number='197', city='New York', county='New York County', state='NY', zip_code=20930, extended_zip_code=Row(sector=72, segment=12))), Row(dept_id=20, dept_name='Sales', dept_head_emp_id=9020, loc_address=Row(street_name='403 Main St', unit_number='42', city='Boston', county='Boston County', state='MA', zip_code=58471, extended_zip_code=Row(sector=93, segment=30))), Row(dept_id=30, dept_name='HR', dept_head_emp_id=9030, loc_address=Row(street_name='592 Main St', unit_number='165', city='New York', county='New York County', state='NY', zip_code=49000, extended_zip_code=Row(sector=83, segment=10))), Row(dept_id=40, dept_name='Finance', dept_head_emp_id=9040, loc_address=Row(street_name='644 Main St', unit_number=None, city='Chicago', county='Chicago County', state='IL', zip_code=13146, extended_zip_code=Row(sector=36, segment=33))), Row(dept_id=50, dept_name='Marketing', dept_he

In [125]:
# Create a test data
employee_data = []
for i in range(1, 101): # 100 employees
    emp_id = 1000 + i
    dept_id = random.choice(DEPT_IDS) # Ensure dept_id is valid
    
    first = random.choice(FIRST_NAMES)
    last = random.choice(LAST_NAMES)
    
    city = random.choice(CITIES)
    state = STATES[CITIES.index(city)]
    
    employee_data.append(Row(
        employee_id=emp_id,
        dept_id=dept_id,
        first_name=first,
        middle_name=random.choice([None, "Xavier", "Yancy"]), # Allow for null middle_name
        last_name=last,
        
        # Nested Phone Structure (Non-Nullable fields generated)
        phone_num=Row(
            country_code=1,
            area_code=random.randint(200, 999),
            exchange_code=random.randint(100, 999),
            subscriber_number=random.randint(1000, 9999)
        ),
        
        email_id=f"{first.lower()}.{last.lower()}@company.com",
        
        # Nested Address Structure (Non-Nullable fields generated)
        address=generate_address(city, state)
    ))

print(employee_data)

[Row(employee_id=1001, dept_id=30, first_name='Hannah', middle_name=None, last_name='Taylor', phone_num=Row(country_code=1, area_code=600, exchange_code=372, subscriber_number=2883), email_id='hannah.taylor@company.com', address=Row(street_name='530 Pine St', unit_number=None, city='Boston', county='Boston County', state='MA', zip_code=30932, extended_zip_code=Row(sector=94, segment=54))), Row(employee_id=1002, dept_id=10, first_name='Alice', middle_name='Xavier', last_name='Moore', phone_num=Row(country_code=1, area_code=979, exchange_code=942, subscriber_number=8967), email_id='alice.moore@company.com', address=Row(street_name='126 Oak St', unit_number=None, city='New York', county='New York County', state='NY', zip_code=37130, extended_zip_code=Row(sector=20, segment=13))), Row(employee_id=1003, dept_id=50, first_name='Charlie', middle_name=None, last_name='Brown', phone_num=Row(country_code=1, area_code=640, exchange_code=291, subscriber_number=4687), email_id='charlie.brown@compan

In [127]:
df_employee = spark.createDataFrame(employee_data, schema=employee_schema)

df_employee.show()

+-----------+-------+----------+-----------+---------+-------------------+--------------------+--------------------+
|employee_id|dept_id|first_name|middle_name|last_name|          phone_num|            email_id|             address|
+-----------+-------+----------+-----------+---------+-------------------+--------------------+--------------------+
|       1001|     30|    Hannah|       NULL|   Taylor|{1, 600, 372, 2883}|hannah.taylor@com...|{530 Pine St, NUL...|
|       1002|     10|     Alice|     Xavier|    Moore|{1, 979, 942, 8967}|alice.moore@compa...|{126 Oak St, NULL...|
|       1003|     50|   Charlie|       NULL|    Brown|{1, 640, 291, 4687}|charlie.brown@com...|{299 Oak St, NULL...|
|       1004|     30|     Ethan|     Xavier|    Davis|{1, 899, 213, 4400}|ethan.davis@compa...|{744 Oak St, 105,...|
|       1005|     20|       Bob|       NULL|    Moore|{1, 817, 117, 4435}|bob.moore@company...|{760 Pine St, 116...|
|       1006|     40|       Bob|      Yancy|    Moore|{1, 474, 5

In [132]:
df_department = spark.createDataFrame(dept_data, schema=dept_schema)

df_department.show()

+-------+-----------+----------------+--------------------+
|dept_id|  dept_name|dept_head_emp_id|         loc_address|
+-------+-----------+----------------+--------------------+
|     10|Engineering|            1010|{833 Oak St, NULL...|
|     20|      Sales|            1020|{857 Main St, 94,...|
|     30|         HR|            1030|{467 Main St, NUL...|
|     40|    Finance|            1040|{627 Pine St, 96,...|
|     50|  Marketing|            1050|{495 Oak St, 110,...|
+-------+-----------+----------------+--------------------+



In [133]:
df_employee.filter(col("employee_id") == 1010).show()

+-----------+-------+----------+-----------+---------+-------------------+--------------------+--------------------+
|employee_id|dept_id|first_name|middle_name|last_name|          phone_num|            email_id|             address|
+-----------+-------+----------+-----------+---------+-------------------+--------------------+--------------------+
|       1010|     40|      Jack|     Xavier|   Taylor|{1, 971, 878, 3574}|jack.taylor@compa...|{986 Main St, 93,...|
+-----------+-------+----------+-----------+---------+-------------------+--------------------+--------------------+



In [135]:
df_employee.alias("e").join(df_department.alias("d"), on=(col("e.employee_id") == col("d.dept_head_emp_id")), how="inner").select(col("e.employee_id")).show()

+-----------+
|employee_id|
+-----------+
|       1010|
|       1020|
|       1030|
|       1040|
|       1050|
+-----------+



26/01/03 13:58:13 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 920639 ms exceeds timeout 120000 ms
26/01/03 13:58:13 WARN SparkContext: Killing executors is not supported by current scheduler.
26/01/03 13:58:14 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:359)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [62]:
spark.stop()