In [0]:
%run "../source_bronze/source_bronze_utils"

In [0]:
df_employee = spark.read.table("employee_info_abu.dim_employee")
df_employee.show()

+-----------+-------------+----------+-------+------+---+----------+
|employee_id|employee_name|department|country|salary|age| load_date|
+-----------+-------------+----------+-------+------+---+----------+
|          1|        James|      D101|     IN|  9000| 25|2025-04-19|
|          2|       Michel|      D102|     SA|  8000| 26|2025-04-19|
|          3|    James son|      D101|     IN| 10000| 35|2025-04-19|
|          4|       Robert|      D103|     MY| 11000| 34|2025-04-19|
|          5|        Scott|      D104|     MA|  6000| 36|2025-04-19|
|          6|          Gen|      D105|     JA| 21345| 24|2025-04-19|
|          7|         John|      D102|     MY| 87654| 40|2025-04-19|
|          8|        Maria|      D105|     SA| 38144| 38|2025-04-19|
|          9|        Soffy|      D103|     IN| 23456| 29|2025-04-19|
|         10|          Amy|      D103|     CN| 21345| 24|2025-04-19|
+-----------+-------------+----------+-------+------+---+----------+



In [0]:
df_department = spark.read.table("employee_info_abu.dim_depart")
df_department.show()

+-------------+---------------+----------+
|department_id|department_name| load_date|
+-------------+---------------+----------+
|         D101|          Sales|2025-04-19|
|         D102|      Marketing|2025-04-19|
|         D103|        Finance|2025-04-19|
|         D104|        Support|2025-04-19|
|         D105|             HR|2025-04-19|
+-------------+---------------+----------+



In [0]:
from pyspark.sql.functions import col
emp_df = df_employee.alias("emp")
dept_df = df_department.alias("dept")
df_employee_join = emp_df.join(
    dept_df,
    col("emp.department") == col("dept.department_id"),
    "left"
)
df_result = df_employee_join.select(
    col("emp.employee_name").alias("emp_name"),
    col("emp.salary"),
    col("dept.department_name")  
)
df_result.show()
from pyspark.sql.functions import sum

department_salary = df_result.groupBy("department_name") \
    .agg(sum("salary").alias("total_salary")) \
    .orderBy("total_salary", ascending=False)


department_salary.show()














+---------+------+---------------+
| emp_name|salary|department_name|
+---------+------+---------------+
|    James|  9000|          Sales|
|   Michel|  8000|      Marketing|
|James son| 10000|          Sales|
|   Robert| 11000|        Finance|
|    Scott|  6000|        Support|
|      Gen| 21345|             HR|
|     John| 87654|      Marketing|
|    Maria| 38144|             HR|
|    Soffy| 23456|        Finance|
|      Amy| 21345|        Finance|
+---------+------+---------------+

+---------------+------------+
|department_name|total_salary|
+---------------+------------+
|      Marketing|       95654|
|             HR|       59489|
|        Finance|       55801|
|          Sales|       19000|
|        Support|        6000|
+---------------+------------+



In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType,LongType

# Define snake_case schema for the employee table
gold_schema = StructType([
    StructField("department_name", StringType(), True),
    StructField("total_salary", LongType(), True),  # Use LongType
    StructField("at_load_date", DateType(), True)
])

# Create empty DataFrame with schema
df_gold= spark.createDataFrame([], schema=gold_schema)

# Set database, table name, and location
db_name = "ben_gold_employee"
table_name = "fact_employee"
table_path = f"/FileStore/gold/employee/{db_name}/{table_name}"

# Create the database if not exists
spark.sql(f"CREATE schema IF NOT EXISTS {db_name}")

if not spark._jsparkSession.catalog().tableExists(f"{db_name}.{table_name}"):
    df_gold.write.format("delta").mode("overwrite").save(table_path)
    spark.sql(f"""
        CREATE TABLE {db_name}.{table_name}
        USING DELTA
        LOCATION '{table_path}'
    """)

print(f" Empty table `{db_name}.{table_name}` created at {table_path}")



 Empty table `ben_gold_employee.fact_employee` created at /FileStore/gold/employee/ben_gold_employee/fact_employee


In [0]:
department_salary = department_salary.withColumn("at_load_date", current_date())

department_salary.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("ben_gold_employee.fact_employee")

#df = spark.read.format("delta").load(table_path)
#df.show()

In [0]:
department_salary.show()

+---------------+------------+
|department_name|total_salary|
+---------------+------------+
|      Marketing|       95654|
|             HR|       59489|
|        Finance|       55801|
|          Sales|       19000|
|        Support|        6000|
+---------------+------------+



In [0]:
%sql
select * from ben_gold_employee.fact_employee

department_name,total_salary,at_load_date
Marketing,95654,2025-04-19
HR,59489,2025-04-19
Finance,55801,2025-04-19
Sales,19000,2025-04-19
Support,6000,2025-04-19


In [0]:
df_country = spark.read.table("employee_info_abu.dim_country")
from pyspark.sql.functions import current_date
df_modified_count=df_country.withColumn("load_date",current_date())
df_modified_count.show()

+------------+------------+----------+
|country_code|country_name| load_date|
+------------+------------+----------+
|          CN|       China|2025-04-19|
|          IN|       India|2025-04-19|
|          SA|South Africa|2025-04-19|
|          JA|       Japan|2025-04-19|
|          MY|    Malaysia|2025-04-19|
|          MA|     Morocco|2025-04-19|
+------------+------------+----------+



In [0]:
from pyspark.sql.functions import col,count,avg
count_df=df_country.alias("count")
emp1_df = df_employee.alias("emp1")
dept1_df = df_department.alias("dept1")

df_join=emp1_df.join(dept1_df,col("emp1.department") == col("dept1.department_id"),
                     "left") \
               .join(count_df,col("emp1.country")==col("count.country_code"),"left")
df_join.show()
df_cleaned = df_join.filter(col("employee_id").isNotNull())

df1_result = df_cleaned.select(
    col("emp1.employee_name").alias("emp_name"),
    col("emp1.salary"),
    col("dept1.department_name"),
    col("count.country_name")

    
) 

df1_result.show()              
department = df1_result.groupBy("department_name","country_name") \
    .agg(count("*").alias("employee_count")) \
    .orderBy("department_name","country_name")


department.show()


+-----------+-------------+----------+-------+------+---+----------+-------------+---------------+----------+------------+------------+---------+
|employee_id|employee_name|department|country|salary|age| load_date|department_id|department_name| load_date|country_code|country_name|load_date|
+-----------+-------------+----------+-------+------+---+----------+-------------+---------------+----------+------------+------------+---------+
|          1|        James|      D101|     IN|  9000| 25|2025-04-19|         D101|          Sales|2025-04-19|          IN|       India|     null|
|          2|       Michel|      D102|     SA|  8000| 26|2025-04-19|         D102|      Marketing|2025-04-19|          SA|South Africa|     null|
|          3|    James son|      D101|     IN| 10000| 35|2025-04-19|         D101|          Sales|2025-04-19|          IN|       India|     null|
|          4|       Robert|      D103|     MY| 11000| 34|2025-04-19|         D103|        Finance|2025-04-19|          MY|  

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType,LongType

# Define snake_case schema for the employee table
gold1_schema = StructType([
    StructField("department_name", StringType(), True),
    StructField("country_name", StringType(), True),  
    StructField("employee_count", IntegerType(), True),
    StructField("at_load_date", DateType(), True)
])

# Create empty DataFrame with schema
df_gold1= spark.createDataFrame([], schema=gold1_schema)
db_name = "ben_gold_employee"
table_name = "fact1_employee"
table_path = f"/FileStore/gold/employee/{db_name}/{table_name}"

# Create the database if not exists
spark.sql(f"CREATE schema IF NOT EXISTS {db_name}")

# Create the table only if it doesn't exist
if not spark._jsparkSession.catalog().tableExists(f"{db_name}.{table_name}"):
    df_gold1.write.format("delta").mode("overwrite").save(table_path)
    spark.sql(f"""
        CREATE TABLE {db_name}.{table_name}
        USING DELTA
        LOCATION '{table_path}'
    """)

print(f" Empty table `{db_name}.{table_name}` created at {table_path}")





 Empty table `ben_gold_employee.fact1_employee` created at /FileStore/gold/employee/ben_gold_employee/fact1_employee


In [0]:
%sql
desc extended ben_gold_employee.fact1_employee

col_name,data_type,comment
department_name,string,
country_name,string,
employee_count,int,
at_load_date,date,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,ben_gold_employee,
Table,fact1_employee,
Created Time,Sat Apr 19 17:21:50 UTC 2025,


In [0]:
#department = department.withColumn("at_load_date", current_date())
from pyspark.sql.functions import col
from pyspark.sql.types import LongType, StringType, DateType
department=department.select(col("employee_count").cast(IntegerType()))


department.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("ben_gold_employee.fact1_employee")



In [0]:
department.show()

+--------------+
|employee_count|
+--------------+
|             1|
|             1|
|             1|
|             1|
|             1|
|             1|
|             1|
|             2|
|             1|
+--------------+



In [0]:
%sql
select * from ben_gold_employee.fact1_employee

department_name,country_name,employee_count,at_load_date
,,1,
,,1,
,,1,
,,1,
,,1,
,,1,
,,1,
,,2,
,,1,


In [0]:
from pyspark.sql.functions import col,count
count1_df=df_country.alias("count1")
emp2_df = df_employee.alias("emp2")
dept2_df = df_department.alias("dept2")
df_dept_count=emp2_df.join(dept2_df,col("emp2.department") == col("dept2.department_id"),"left") \
                     .join(count1_df,col("emp2.country") == col("count1.country_code"),"left")
#df_dept_count.show() 
df_res=  df_dept_count.select(col("emp2.employee_name").alias("emp2_name"),col("country_name").alias("country"),col("dept2.department_name")) 
df_res.show()  
df_group= df_res.groupBy("department_name") \
              .agg(count("*").alias("country_name"))  
df_group.show()                       

+---------+------------+---------------+
|emp2_name|     country|department_name|
+---------+------------+---------------+
|    James|       India|          Sales|
|   Michel|South Africa|      Marketing|
|James son|       India|          Sales|
|   Robert|    Malaysia|        Finance|
|    Scott|     Morocco|        Support|
|      Gen|       Japan|             HR|
|     John|    Malaysia|      Marketing|
|    Maria|South Africa|             HR|
|    Soffy|       India|        Finance|
|      Amy|       China|        Finance|
+---------+------------+---------------+

+---------------+------------+
|department_name|country_name|
+---------------+------------+
|          Sales|           2|
|             HR|           2|
|        Finance|           3|
|      Marketing|           2|
|        Support|           1|
+---------------+------------+



In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType,LongType

# Define snake_case schema for the employee table
gold2_schema = StructType([
    StructField("emp2_name", StringType(), True),
    StructField("country", StringType(), True),  
    StructField("department_name", StringType(), True),
    StructField("at_load_date", DateType(), True)
])

# Create empty DataFrame with schema
df_gold2= spark.createDataFrame([], schema=gold1_schema)
db_name = "ben_gold_employee"
table_name = "fact2_employee"
table_path = f"/FileStore/gold/employee/{db_name}/{table_name}"

# Create the database if not exists
spark.sql(f"CREATE schema IF NOT EXISTS {db_name}")

# Create the table only if it doesn't exist
if not spark._jsparkSession.catalog().tableExists(f"{db_name}.{table_name}"):
    df_gold2.write.format("delta").mode("overwrite").save(table_path)
    spark.sql(f"""
        CREATE TABLE {db_name}.{table_name}
        USING DELTA
        LOCATION '{table_path}'
    """)

print(f" Empty table `{db_name}.{table_name}` created at {table_path}")

 Empty table `ben_gold_employee.fact2_employee` created at /FileStore/gold/employee/ben_gold_employee/fact2_employee


In [0]:
df_res = df_res.withColumn("at_load_date", current_date())
df_res.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("ben_gold_employee.fact2_employee")

In [0]:
%sql
select * from ben_gold_employee.fact2_employee

department_name,country_name,employee_count,at_load_date,emp2_name,country
Sales,,,2025-04-19,James,India
Marketing,,,2025-04-19,Michel,South Africa
Sales,,,2025-04-19,James son,India
Finance,,,2025-04-19,Robert,Malaysia
Support,,,2025-04-19,Scott,Morocco
HR,,,2025-04-19,Gen,Japan
Marketing,,,2025-04-19,John,Malaysia
HR,,,2025-04-19,Maria,South Africa
Finance,,,2025-04-19,Soffy,India
Finance,,,2025-04-19,Amy,China


In [0]:
from pyspark.sql.functions import avg,col    

df4_result = df_join.select(
    col("emp1.employee_name").alias("employ_name"),col("emp1.age").alias("emp_age"),
    col("dept1.department_name"))
#from pyspark.sql.functions import avg    

df_age=df4_result.groupBy("department_name") \
           .agg(avg("emp_age").alias("avg_age"))\
           .orderBy("department_name","avg_age") 
df_age.show()            

+---------------+-------+
|department_name|avg_age|
+---------------+-------+
|        Finance|   29.0|
|             HR|   31.0|
|      Marketing|   33.0|
|          Sales|   30.0|
|        Support|   36.0|
+---------------+-------+



In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType,LongType

# Define snake_case schema for the employee table
gold2_schema = StructType([  
    StructField("department_name", StringType(), True),
    StructField("avg_age",IntegerType(),True),
    StructField("at_load_date", DateType(), True)
])

# Create empty DataFrame with schema
df_gold3= spark.createDataFrame([], schema=gold1_schema)
db_name = "ben_gold_employee"
table_name = "fact3_employee"
table_path = f"/FileStore/gold/employee/{db_name}/{table_name}"

# Create the database if not exists
spark.sql(f"CREATE schema IF NOT EXISTS {db_name}")

# Create the table only if it doesn't exist
if not spark._jsparkSession.catalog().tableExists(f"{db_name}.{table_name}"):
    df_gold3.write.format("delta").mode("overwrite").save(table_path)
    spark.sql(f"""
        CREATE TABLE {db_name}.{table_name}
        USING DELTA
        LOCATION '{table_path}'
    """)

print(f" Empty table `{db_name}.{table_name}` created at {table_path}")

Empty table `ben_gold_employee.fact3_employee` created at /FileStore/gold/employee/ben_gold_employee/fact3_employee


In [0]:
df_age = df_age.withColumn("at_load_date", current_date())

df_age.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("ben_gold_employee.fact3_employee")


In [0]:
%sql
select * from ben_gold_employee.fact3_employee

department_name,country_name,employee_count,at_load_date,avg_age
Finance,,,2025-04-19,29.0
HR,,,2025-04-19,31.0
Marketing,,,2025-04-19,33.0
Sales,,,2025-04-19,30.0
Support,,,2025-04-19,36.0
