## 1. Introduction
The purpose of this file is to:
* explore the data,
* check the data and correct any errors found,
* determine the cardinality between relations: one-to-one, one-to-many, or many-to-many (which may require further normalization),
* draw the ERD.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import to_date, col, max as spark_max, min as spark_min
import time
import os

spark = SparkSession.builder.appName("wisrTest").getOrCreate()
print(f"Spark URL: {spark.sparkContext.uiWebUrl}")

24/09/25 04:27:57 WARN Utils: Your hostname, sam-pc resolves to a loopback address: 127.0.1.1; using 192.168.1.8 instead (on interface wlp5s0)
24/09/25 04:27:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/25 04:27:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark URL: http://192.168.1.8:4040


## 2. Read from CSV
**Unzip all CSV files into the `data` folder before running the next cell.**

In [2]:
root = 'data'
dfNames = ('departments', 'dept_emp', 'dept_manager',
           'employees', 'salaries', 'titles')
dfs = dict()
for name in dfNames:
    dfs[name] = spark.read.csv(os.path.join(
        root, f'{name}.csv'), header=True, inferSchema=False)
    print('-' * 50)
    print(f'table name: {name}')
    print(f'row count: {dfs[name].count()}')
    dfs[name].printSchema()
    dfs[name].show(1)

--------------------------------------------------
table name: departments
row count: 9
root
 |-- dept_no: string (nullable = true)
 |-- dept_name: string (nullable = true)

+-------+---------+
|dept_no|dept_name|
+-------+---------+
|   d001|Marketing|
+-------+---------+
only showing top 1 row

--------------------------------------------------
table name: dept_emp
row count: 331603
root
 |-- emp_no: string (nullable = true)
 |-- dept_no: string (nullable = true)

+------+-------+
|emp_no|dept_no|
+------+-------+
| 10001|   d005|
+------+-------+
only showing top 1 row

--------------------------------------------------
table name: dept_manager
row count: 24
root
 |-- dept_no: string (nullable = true)
 |-- emp_no: string (nullable = true)

+-------+------+
|dept_no|emp_no|
+-------+------+
|   d001|110022|
+-------+------+
only showing top 1 row

--------------------------------------------------
table name: employees
row count: 300024
root
 |-- emp_no: string (nullable = true)
 |--

## 3. Convert dtype
Convert every column that should not be a string to its correct data type. The conversion doubles as a data-quality check: if a conversion fails, the corresponding entry becomes null.
### 3.1. numeric primary keys to integer
Create a new column with the converted values and check it for nulls. If there are none, drop the original column and rename the new one to take its place.

In [3]:
for name in ('dept_emp', 'dept_manager', 'employees', 'salaries'):
    # invalid entries will be converted to null
    dfs[name] = dfs[name].withColumn("emp_no_int", col("emp_no").cast("int"))
    null_count = dfs[name].filter(col("emp_no_int").isNull()).count()
    print(f'null count: {null_count}')
    if null_count == 0:
        dfs[name] = dfs[name].drop(
            'emp_no').withColumnRenamed("emp_no_int", "emp_no")

null count: 0
null count: 0
null count: 0
null count: 0


### 3.2. salary to decimal

In [4]:
name = 'salaries'
dfs[name] = dfs[name].withColumn(
    "salary_dec", col("salary").cast(DecimalType(10, 2)))
null_count = dfs[name].filter(col("salary_dec").isNull()).count()
print(f'null count: {null_count}')
if null_count == 0:
    dfs[name] = dfs[name].drop(
        'salary').withColumnRenamed("salary_dec", "salary")

max_val = dfs[name].agg(spark_max("salary")).collect()[0][0]
print(f'max salary: {max_val}')
min_val = dfs[name].agg(spark_min("salary")).collect()[0][0]
print(f'min salary: {min_val}')

null count: 0
max salary: 129492.00
min salary: 40000.00


### 3.3. birth_date and hire_date to date

In [5]:
name = 'employees'
for col_name in ('birth_date', 'hire_date'):
    new_name = col_name + '_date'
    dfs[name] = dfs[name].withColumn(
        new_name, to_date(col(col_name), "M/d/yyyy"))
    null_count = dfs[name].filter(col(new_name).isNull()).count()
    print(f'null count: {null_count}')
    if null_count == 0:
        dfs[name] = dfs[name].drop(
            col_name).withColumnRenamed(new_name, col_name)

null count: 0
null count: 0
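
The null-count check above relies on unparseable strings turning into null. The same idea can be sketched without Spark. This is only an illustration: it assumes Python's `%m/%d/%Y` is a close enough stand-in for the Spark pattern `M/d/yyyy`, and the helper `parse_mdy` and the sample values are hypothetical, not part of the notebook or the data set.

```python
from datetime import date, datetime
from typing import Optional


def parse_mdy(text: Optional[str]) -> Optional[date]:
    """Parse a month/day/year string; return None when it cannot be parsed."""
    if text is None:
        return None
    try:
        return datetime.strptime(text.strip(), "%m/%d/%Y").date()
    except ValueError:
        # Wrong layout or an impossible calendar date.
        return None


# Hypothetical sample values, not taken from the data set.
samples = ["7/25/1953", "12/1/1990", "1990-12-01", "2/30/1990", None]
parsed = [parse_mdy(s) for s in samples]

# Same quality signal as in the cell above: how many entries failed to convert?
null_count = sum(p is None for p in parsed)
print(f"null count: {null_count}")
```

A non-zero count here would correspond to the case where the notebook keeps the original string column instead of replacing it.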


### 3.4. print new schema

In [6]:
for name in dfNames:
    print('-' * 50)
    print(f'table name: {name}')
    dfs[name].printSchema()
    dfs[name].show(1)

--------------------------------------------------
table name: departments
root
 |-- dept_no: string (nullable = true)
 |-- dept_name: string (nullable = true)

+-------+---------+
|dept_no|dept_name|
+-------+---------+
|   d001|Marketing|
+-------+---------+
only showing top 1 row

--------------------------------------------------
table name: dept_emp
root
 |-- dept_no: string (nullable = true)
 |-- emp_no: integer (nullable = true)

+-------+------+
|dept_no|emp_no|
+-------+------+
|   d005| 10001|
+-------+------+
only showing top 1 row

--------------------------------------------------
table name: dept_manager
root
 |-- dept_no: string (nullable = true)
 |-- emp_no: integer (nullable = true)

+-------+------+
|dept_no|emp_no|
+-------+------+
|   d001|110022|
+-------+------+
only showing top 1 row

--------------------------------------------------
table name: employees
root
 |-- emp_title_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: s

## 4. Create Temp View and Do Checks

In [7]:
for name in dfNames:
    df = dfs[name]
    df.createOrReplaceTempView(name)

### 4.1. is primary key unique? select duplicate primary keys

In [8]:
dfsPK = {
    'departments': 'dept_no',
    'dept_emp': 'emp_no, dept_no',
    'dept_manager': 'emp_no, dept_no',
    'employees': 'emp_no',
    'salaries': 'emp_no',
    'titles': 'title_id'
}

for name, pk in dfsPK.items():
    query = f"""
        SELECT {pk}, 
               COUNT(*) AS dup_count 
        FROM {name} 
        GROUP BY {pk} 
        HAVING COUNT(*) > 1;
    """
    print(query)
    df = spark.sql(query).toPandas()
    display(df)
    print('-' * 50)


        SELECT dept_no, 
               COUNT(*) AS dup_count 
        FROM departments 
        GROUP BY dept_no 
        HAVING COUNT(*) > 1;
    


Unnamed: 0,dept_no,dup_count


--------------------------------------------------

        SELECT emp_no, dept_no, 
               COUNT(*) AS dup_count 
        FROM dept_emp 
        GROUP BY emp_no, dept_no 
        HAVING COUNT(*) > 1;
    


Unnamed: 0,emp_no,dept_no,dup_count


--------------------------------------------------

        SELECT emp_no, dept_no, 
               COUNT(*) AS dup_count 
        FROM dept_manager 
        GROUP BY emp_no, dept_no 
        HAVING COUNT(*) > 1;
    


Unnamed: 0,emp_no,dept_no,dup_count


--------------------------------------------------

        SELECT emp_no, 
               COUNT(*) AS dup_count 
        FROM employees 
        GROUP BY emp_no 
        HAVING COUNT(*) > 1;
    


Unnamed: 0,emp_no,dup_count


--------------------------------------------------

        SELECT emp_no, 
               COUNT(*) AS dup_count 
        FROM salaries 
        GROUP BY emp_no 
        HAVING COUNT(*) > 1;
    


Unnamed: 0,emp_no,dup_count


--------------------------------------------------

        SELECT title_id, 
               COUNT(*) AS dup_count 
        FROM titles 
        GROUP BY title_id 
        HAVING COUNT(*) > 1;
    


Unnamed: 0,title_id,dup_count


--------------------------------------------------
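
For readers less familiar with `GROUP BY ... HAVING COUNT(*) > 1`, the duplicate-key check can be sketched in plain Python. The helper `find_duplicate_keys` and the sample rows are hypothetical; the sketch only mirrors the logic of the queries above and is not how the notebook runs them.

```python
from collections import Counter


def find_duplicate_keys(rows, key_columns):
    """Return {key_tuple: count} for every key that occurs more than once."""
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


# Hypothetical rows; the second and third share the same composite key.
rows = [
    {"emp_no": 1, "dept_no": "d001"},
    {"emp_no": 2, "dept_no": "d001"},
    {"emp_no": 2, "dept_no": "d001"},
    {"emp_no": 2, "dept_no": "d002"},
]

# Composite key: only the exact (emp_no, dept_no) repeat is reported.
print(find_duplicate_keys(rows, ["emp_no", "dept_no"]))
# Single-column key: emp_no 2 is reported because it appears three times.
print(find_duplicate_keys(rows, ["emp_no"]))
```

An empty result, as in every output above, means the candidate key is unique in that table.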


### 4.2. is primary key not null?

In [9]:
for name, pk in dfsPK.items():
    # note: joining with AND flags a composite-key row only when every key column is null
    pk_split = [f'{p.strip()} is null' for p in pk.split(',')]
    query = f"""
        SELECT {pk} 
        FROM {name} 
        WHERE {' AND '.join(pk_split)};
    """
    print(query)
    df = spark.sql(query).toPandas()
    display(df)
    print('-' * 50)


        SELECT dept_no 
        FROM departments 
        WHERE dept_no is null;
    


Unnamed: 0,dept_no


--------------------------------------------------

        SELECT emp_no, dept_no 
        FROM dept_emp 
        WHERE emp_no is null AND dept_no is null;
    


Unnamed: 0,emp_no,dept_no


--------------------------------------------------

        SELECT emp_no, dept_no 
        FROM dept_manager 
        WHERE emp_no is null AND dept_no is null;
    


Unnamed: 0,emp_no,dept_no


--------------------------------------------------

        SELECT emp_no 
        FROM employees 
        WHERE emp_no is null;
    


Unnamed: 0,emp_no


--------------------------------------------------

        SELECT emp_no 
        FROM salaries 
        WHERE emp_no is null;
    


Unnamed: 0,emp_no


--------------------------------------------------

        SELECT title_id 
        FROM titles 
        WHERE title_id is null;
    


Unnamed: 0,title_id


--------------------------------------------------
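
The generated queries join the per-column tests with `AND`, so for the composite keys of dept_emp and dept_manager they only match rows where both columns are null. A stricter check would flag a row when any key column is null. The difference can be sketched in plain Python; the helper `null_key_rows` and the sample rows are hypothetical and serve only to contrast the two rules.

```python
def null_key_rows(rows, pk, require_all):
    """Return rows whose key columns are null.

    pk is a comma-separated column list, as in dfsPK.
    require_all=True mimics joining the tests with AND,
    require_all=False mimics joining them with OR.
    """
    columns = [p.strip() for p in pk.split(",")]
    combine = all if require_all else any
    return [row for row in rows if combine(row[c] is None for c in columns)]


# Hypothetical rows: one clean, one partially null, one fully null.
rows = [
    {"emp_no": 10001, "dept_no": "d005"},
    {"emp_no": None, "dept_no": "d005"},
    {"emp_no": None, "dept_no": None},
]

and_hits = null_key_rows(rows, "emp_no, dept_no", require_all=True)
or_hits = null_key_rows(rows, "emp_no, dept_no", require_all=False)
print(f"AND rule flags {len(and_hits)} row(s), OR rule flags {len(or_hits)} row(s)")
```

For the single-column keys the two rules are identical, so the outputs for departments, employees, salaries and titles are unaffected either way.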


### 4.3. is sex only M and F?

In [10]:
spark.sql("""
    SELECT DISTINCT sex 
    FROM employees
""").toPandas()

Unnamed: 0,sex
0,F
1,M


## 5. Determine Cardinality

### 5.1. employees and salaries
This looks like a one-to-one relationship: both tables have the same number of rows, and the queries below return no employee whose salary_count differs from 1.  
A salary_count of 0 would mean that an employee has no salary record;  
a value higher than 1 would mean that an employee has multiple records in the salaries table.

In [11]:
spark.sql("""
    SELECT 
        (SELECT COUNT(*) FROM employees) AS employees_count,
        (SELECT COUNT(*) FROM salaries) AS salaries_count;
""").toPandas()

Unnamed: 0,employees_count,salaries_count
0,300024,300024


In [12]:
spark.sql("""
    SELECT emp_no, 
           COUNT(salary) AS salary_count
    FROM employees
    LEFT JOIN salaries USING (emp_no)
    GROUP BY emp_no
    HAVING salary_count != 1
    ORDER BY salary_count ASC;
""").toPandas()

Unnamed: 0,emp_no,salary_count
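
The `LEFT JOIN` plus `COUNT(salary)` pattern keeps employees without any salary row and reports them with a count of 0. A minimal plain-Python sketch of that logic follows; the helper `not_exactly_one` and the sample keys are hypothetical.

```python
from collections import Counter


def not_exactly_one(parent_keys, child_keys):
    """Return {parent_key: child_count} for parents without exactly one child row."""
    counts = Counter(child_keys)
    # Counter returns 0 for missing keys, like COUNT over an unmatched LEFT JOIN row.
    return {key: counts[key] for key in parent_keys if counts[key] != 1}


# Hypothetical keys: employee 2 has two salary rows, employee 3 has none.
employees = [1, 2, 3]
salary_emp_nos = [1, 2, 2]
violations = not_exactly_one(employees, salary_emp_nos)
print(violations)
```

An empty dictionary corresponds to the empty result above: every employee has exactly one salary row.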


### 5.2. employees and titles
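one-to-many relationship from titles to employees: each employee has one and only one title, and each title can be held by multiple employees (the per-title counts in the last query sum to 300024, the number of employees)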

In [13]:
spark.sql("""
    SELECT 
        (SELECT COUNT(*) FROM employees) AS employees_count,
        (SELECT COUNT(*) FROM titles) AS titles_count;
""").toPandas()

Unnamed: 0,employees_count,titles_count
0,300024,7


In [14]:
spark.sql("""
    SELECT emp_no, 
           COUNT(t.title_id) AS title_count
    FROM employees e
    LEFT JOIN titles t ON e.emp_title_id = t.title_id
    GROUP BY emp_no
    HAVING title_count != 1
    ORDER BY title_count ASC;
""").toPandas()

Unnamed: 0,emp_no,title_count


In [15]:
spark.sql("""
    SELECT t.title_id, 
           COUNT(emp_no) AS emp_count
    FROM titles t
    LEFT JOIN employees e ON t.title_id = e.emp_title_id
    GROUP BY t.title_id
    ORDER BY emp_count ASC;
""").toPandas()

Unnamed: 0,title_id,emp_count
0,m0001,24
1,e0001,5835
2,e0004,15148
3,s0002,26583
4,e0002,47303
5,e0003,97747
6,s0001,107384


### 5.3. employees and dept_manager
an employee can appear 0 or 1 times in dept_manager, i.e. manage 0 or 1 departments

In [16]:
spark.sql("""
    SELECT emp_no, 
           COUNT(dept_no) AS dept_count
    FROM employees
    LEFT JOIN dept_manager USING (emp_no)
    GROUP BY emp_no
    ORDER BY dept_count DESC;
""").toPandas()

Unnamed: 0,emp_no,dept_count
0,111035,1
1,110420,1
2,110765,1
3,110854,1
4,110039,1
...,...,...
300019,235030,0
300020,70335,0
300021,96067,0
300022,296491,0


### 5.4. departments and dept_manager
Similarly, a department can appear in multiple rows in dept_manager (24 rows for 9 departments), i.e. a department can be managed by multiple managers. Thus one-to-many.

### 5.5. employees and dept_emp
an employee can belong to 1 or 2 departments. Thus one-to-many

In [17]:
spark.sql("""
    SELECT emp_no, 
           COUNT(dept_no) AS dept_count
    FROM employees
    LEFT JOIN dept_emp USING (emp_no)
    GROUP BY emp_no
    ORDER BY dept_count DESC;
""").toPandas()

Unnamed: 0,emp_no,dept_count
0,30361,2
1,41946,2
2,249022,2
3,50223,2
4,299003,2
...,...,...
300019,235030,1
300020,70335,1
300021,96067,1
300022,296491,1


In [18]:
# for instance, an employee can belong to both production and customer service departments
spark.sql("""
    SELECT * 
    FROM dept_emp 
    JOIN departments USING (dept_no) 
    WHERE emp_no IN (30361, 41946);
""").toPandas()

Unnamed: 0,dept_no,emp_no,dept_name
0,d005,30361,Development
1,d008,30361,Research
2,d004,41946,Production
3,d009,41946,Customer Service


### 5.6. departments and dept_emp
Similarly, a department can appear in multiple rows in dept_emp (331603 rows for 9 departments), i.e. a department has multiple employees. Thus one-to-many.
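
Taken together, 5.5 and 5.6 suggest that employees and departments are many-to-many, with dept_emp acting as the link table. A small plain-Python sketch of how link rows can be classified is shown below. The function `classify` is hypothetical; the sample pairs are (emp_no, dept_no) rows that appear in the outputs shown earlier in this notebook.

```python
from collections import defaultdict


def classify(pairs):
    """Classify a relationship from (left, right) link rows.

    "one-to-many" means one left key can link to several right keys;
    "many-to-one" is the reverse direction.
    """
    rights_per_left = defaultdict(set)
    lefts_per_right = defaultdict(set)
    for left, right in pairs:
        rights_per_left[left].add(right)
        lefts_per_right[right].add(left)

    left_has_many = any(len(v) > 1 for v in rights_per_left.values())
    right_has_many = any(len(v) > 1 for v in lefts_per_right.values())

    if left_has_many and right_has_many:
        return "many-to-many"
    if left_has_many:
        return "one-to-many"
    if right_has_many:
        return "many-to-one"
    return "one-to-one"


# (emp_no, dept_no) rows taken from the dept_emp outputs above.
dept_emp_sample = [
    (10001, "d005"),
    (30361, "d005"),
    (30361, "d008"),
    (41946, "d004"),
    (41946, "d009"),
]
print(classify(dept_emp_sample))
```

Employee 30361 links to two departments and department d005 links to two employees, so even this small sample is classified as many-to-many.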

## 6. Final ERD
![](static/ERD.svg)