In [1]:
pip install pyspark 

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.1.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("ETL").getOrCreate()

In [2]:
departments_df = spark.read.csv("D://Dataengineer//archive//departments.csv" , header = True , inferSchema = True) 
dept_emp_df = spark.read.csv("D://Dataengineer//archive//dept_emp.csv" , header = True , inferSchema = True)
dept_manager_df = spark.read.csv("D://Dataengineer//archive//dept_manager.csv" , header = True , inferSchema = True)
employees_df = spark.read.csv("D://Dataengineer//archive//employees.csv" , header = True , inferSchema = True)
salaries_df = spark.read.csv("D://Dataengineer//archive//salaries.csv" , header = True , inferSchema = True)

departments_df.show()
print("------------------------------------------------------------------------------------")
dept_emp_df.show()
print("------------------------------------------------------------------------------------")
dept_manager_df.show()
print("------------------------------------------------------------------------------------")
employees_df.show()
print("------------------------------------------------------------------------------------")
salaries_df.show()

+-------+------------------+
|dept_no|         dept_name|
+-------+------------------+
|   d009|  Customer Service|
|   d005|       Development|
|   d002|           Finance|
|   d003|   Human Resources|
|   d001|         Marketing|
|   d004|        Production|
|   d006|Quality Management|
|   d008|          Research|
|   d007|             Sales|
+-------+------------------+

------------------------------------------------------------------------------------
+------+-------+----------+----------+
|emp_no|dept_no| from_date|   to_date|
+------+-------+----------+----------+
| 10001|   d005|1986-06-26|9999-01-01|
| 10002|   d007|1996-08-03|9999-01-01|
| 10003|   d004|1995-12-03|9999-01-01|
| 10004|   d004|1986-12-01|9999-01-01|
| 10005|   d003|1989-09-12|9999-01-01|
| 10006|   d005|1990-08-05|9999-01-01|
| 10007|   d008|1989-02-10|9999-01-01|
| 10008|   d005|1998-03-11|2000-07-31|
| 10009|   d006|1985-02-18|9999-01-01|
| 10010|   d004|1996-11-24|2000-06-26|
| 10010|   d006|2000-06-26|999

### Concanting first_name and last_name to full _name column and Dropping Columns First_name and last_name  

In [3]:
from pyspark.sql.functions import lit , concat
employees_df = employees_df.withColumn("full_name" , concat(employees_df.first_name , lit(" ") , employees_df.last_name))
drop_columns = ["first_name" , "last_name"]
employees_df = employees_df.drop(*drop_columns)
employees_df.show()

+------+----------+------+----------+--------------------+
|emp_no|birth_date|gender| hire_date|           full_name|
+------+----------+------+----------+--------------------+
| 10001|1953-09-02|     M|1986-06-26|      Georgi Facello|
| 10002|1964-06-02|     F|1985-11-21|      Bezalel Simmel|
| 10003|1959-12-03|     M|1986-08-28|       Parto Bamford|
| 10004|1954-05-01|     M|1986-12-01|   Chirstian Koblick|
| 10005|1955-01-21|     M|1989-09-12|    Kyoichi Maliniak|
| 10006|1953-04-20|     F|1989-06-02|      Anneke Preusig|
| 10007|1957-05-23|     F|1989-02-10|   Tzvetan Zielinski|
| 10008|1958-02-19|     M|1994-09-15|     Saniya Kalloufi|
| 10009|1952-04-19|     F|1985-02-18|         Sumant Peac|
| 10010|1963-06-01|     F|1989-08-24|  Duangkaew Piveteau|
| 10011|1953-11-07|     F|1990-01-22|          Mary Sluis|
| 10012|1960-10-04|     M|1992-12-18|  Patricio Bridgland|
| 10013|1963-06-07|     M|1985-10-20|    Eberhardt Terkki|
| 10014|1956-02-12|     M|1987-03-11|         Berni Geni

In [4]:
departments_df.printSchema()
employees_df.printSchema()
dept_emp_df.printSchema()
dept_manager_df.printSchema()
salaries_df.printSchema()

root
 |-- dept_no: string (nullable = true)
 |-- dept_name: string (nullable = true)

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- full_name: string (nullable = true)

root
 |-- emp_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- from_date: date (nullable = true)
 |-- to_date: date (nullable = true)

root
 |-- emp_no: integer (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- from_date: date (nullable = true)
 |-- to_date: date (nullable = true)

root
 |-- emp_no: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- from_date: date (nullable = true)
 |-- to_date: date (nullable = true)



In [5]:
duplicate_rows_count = dept_emp_df.count() - dept_emp_df.distinct().count()
print(duplicate_rows_count)

print(dept_manager_df.count() - dept_manager_df.distinct().count())
print(employees_df.count() - employees_df.distinct().count())

0
0
0


## Finding Duplicates in Salary Table
##### Are there multiple salary records for the same employee and time period in the salaries table?

In [6]:
from pyspark.sql.functions import col , max , min , when , lag , lead 
from pyspark.sql import Window
window = Window.partitionBy("emp_no").orderBy("from_date")
salaries_df = salaries_df.withColumn("prev_to_date" , lag("to_date" , 1).over(window)) \
            .withColumn("next_from_date" , lead("from_date" , 1).over(window)).withColumn("prev_salary", lag("salary", 1).over(window))

duplicates = salaries_df.filter(
    (col('from_date') <= col("next_from_date")) &
    (col("prev_to_date") <= col("to_date")) &
    (col("salary") != col("prev_salary"))
)
print(salaries_df.count())
print(duplicates.count())

2844047
2251965


In [7]:
#Creating Function to check Nulls 
def get_null_count(cl , df):
    from pyspark.sql.functions import col
    null_counts = df.filter(col(cl).isNull()).count()
    return null_counts

### Checking nulls in Salary DF

In [8]:
print(get_null_count(cl = "emp_no" , df= salaries_df))
print(get_null_count(cl = "from_date" , df= salaries_df))
print(get_null_count(cl = "to_date" , df= salaries_df))

0
0
0


### Checking nulls in dept_emp , dept_manager

In [9]:
print(get_null_count(cl = "dept_no" , df= dept_emp_df))
print(get_null_count(cl = "dept_no" , df= dept_manager_df))


0
0


## Transformation 

#### Calculate the Employee tenure

In [29]:
from pyspark.sql.functions import current_date , months_between , datediff , round
employees_df = employees_df.withColumn("tenure" , round(months_between(current_date() , col('hire_date'))/12 , 2))
employees_df.show()


+------+----------+------+----------+--------------------+------+
|emp_no|birth_date|gender| hire_date|           full_name|tenure|
+------+----------+------+----------+--------------------+------+
| 10001|1953-09-02|     M|1986-06-26|      Georgi Facello| 38.19|
| 10002|1964-06-02|     F|1985-11-21|      Bezalel Simmel| 38.78|
| 10003|1959-12-03|     M|1986-08-28|       Parto Bamford| 38.02|
| 10004|1954-05-01|     M|1986-12-01|   Chirstian Koblick| 37.76|
| 10005|1955-01-21|     M|1989-09-12|    Kyoichi Maliniak| 34.98|
| 10006|1953-04-20|     F|1989-06-02|      Anneke Preusig| 35.25|
| 10007|1957-05-23|     F|1989-02-10|   Tzvetan Zielinski| 35.56|
| 10008|1958-02-19|     M|1994-09-15|     Saniya Kalloufi| 29.97|
| 10009|1952-04-19|     F|1985-02-18|         Sumant Peac| 39.54|
| 10010|1963-06-01|     F|1989-08-24|  Duangkaew Piveteau| 35.03|
| 10011|1953-11-07|     F|1990-01-22|          Mary Sluis| 34.62|
| 10012|1960-10-04|     M|1992-12-18|  Patricio Bridgland| 31.71|
| 10013|19

### Calculate longest server employee

In [66]:
from pyspark.sql.functions import max
employees_df.sort(employees_df.tenure.desc()).select('full_name' , 'tenure').head(1)

[Row(full_name='Margareta Markovitch', tenure=39.67)]

In [61]:
help(max)

Help on function max in module pyspark.sql.functions:

max(col: 'ColumnOrName') -> pyspark.sql.column.Column
    Aggregate function: returns the maximum value of the expression in a group.
    
    .. versionadded:: 1.3.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        target column to compute on.
    
    Returns
    -------
    :class:`~pyspark.sql.Column`
        column for computed results.
    
    Examples
    --------
    >>> df = spark.range(10)
    >>> df.select(max(col("id"))).show()
    +-------+
    |max(id)|
    +-------+
    |      9|
    +-------+



## Calculate increase Salary Percentage

In [36]:
from pyspark.sql.functions import abs , when ,lit
win = Window.partitionBy("emp_no").orderBy("from_date")
salaries_df = salaries_df.withColumn("prev_salary" ,lag("salary" , 1).over(win))
salaries_df = salaries_df.withColumn("salary_increase" , \
                                     when(col("prev_salary").isNotNull() ,
                                          round(abs((col("prev_salary")- col("salary")) / col("prev_salary")) * 100 , 2)).otherwise(lit(0)))
salaries_df.show()

+------+------+----------+----------+------------+--------------+-----------+---------------+
|emp_no|salary| from_date|   to_date|prev_to_date|next_from_date|prev_salary|salary_increase|
+------+------+----------+----------+------------+--------------+-----------+---------------+
| 10010| 72488|1996-11-24|1997-11-24|        NULL|    1997-11-24|       NULL|            0.0|
| 10010| 74347|1997-11-24|1998-11-24|  1997-11-24|    1998-11-24|      72488|           2.56|
| 10010| 75405|1998-11-24|1999-11-24|  1998-11-24|    1999-11-24|      74347|           1.42|
| 10010| 78194|1999-11-24|2000-11-23|  1999-11-24|    2000-11-23|      75405|            3.7|
| 10010| 79580|2000-11-23|2001-11-23|  2000-11-23|    2001-11-23|      78194|           1.77|
| 10010| 80324|2001-11-23|9999-01-01|  2001-11-23|          NULL|      79580|           0.93|
| 10011| 42365|1990-01-22|1991-01-22|        NULL|    1991-01-22|       NULL|            0.0|
| 10011| 44200|1991-01-22|1992-01-22|  1991-01-22|    1992-0

### Calculate Average salary deparment wise

In [55]:
from pyspark.sql.functions import avg
Avg_salary = salaries_df.join(dept_emp_df.select("emp_no" , "dept_no") , "emp_no").join(departments_df.select("dept_no" , "dept_name") , "dept_no")
Avg_salary.groupBy('dept_name').agg(avg('salary').alias('avg_salary')).show()



+------------------+------------------+
|         dept_name|        avg_salary|
+------------------+------------------+
|             Sales| 80667.60575533769|
|Quality Management| 57251.27191341599|
|           Finance| 70489.36489699609|
|        Production|59605.482461651445|
|          Research|  59665.1817012686|
|  Customer Service| 58770.36647976248|
|         Marketing| 71913.20000419153|
|       Development| 59478.90116243182|
|   Human Resources| 55574.87936969553|
+------------------+------------------+



In [18]:
help(months_between)

Help on function months_between in module pyspark.sql.functions:

months_between(date1: 'ColumnOrName', date2: 'ColumnOrName', roundOff: bool = True) -> pyspark.sql.column.Column
    Returns number of months between dates date1 and date2.
    If date1 is later than date2, then the result is positive.
    A whole number is returned if both inputs have the same day of month or both are the last day
    of their respective months. Otherwise, the difference is calculated assuming 31 days per month.
    The result is rounded off to 8 digits unless `roundOff` is set to `False`.
    
    .. versionadded:: 1.5.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    date1 : :class:`~pyspark.sql.Column` or str
        first date column.
    date2 : :class:`~pyspark.sql.Column` or str
        second date column.
    roundOff : bool, optional
        whether to round (to 8 digits) the final value or not (default: True).
    
    Returns
    -----

## 