In [0]:
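# Delete the Delta table if it already exists (uncomment to reset; delta_table_path is defined in the EXTRACT cell, so run that cell first)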
# dbutils.fs.rm(delta_table_path,recurse=True)

Out[69]: True

In [0]:
# EXTRACT
# Import the required libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from delta import DeltaTable

# Create (or reuse) the SparkSession; getOrCreate returns the active session if one exists.
spark = SparkSession.builder.getOrCreate()

# File location and type
file_location = "/FileStore/tables/Salary_Data-8.csv"
file_type = "csv"

# CSV options. inferSchema is kept "false" because an explicit schema is supplied
# via .schema(...) below, which takes precedence over inference anyway.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# Explicit schema for the salary data. Every column is nullable because the raw
# CSV contains missing values (see the null-count cell further down).
schema = StructType([
    StructField("Age", IntegerType(), nullable=True),
    StructField("Gender", StringType(), nullable=True),
    StructField("Education_Level", StringType(), nullable=True),
    StructField("Job_Title", StringType(), nullable=True),
    StructField("Years_of_Experience", DoubleType(), nullable=True),
    StructField("Salary", DoubleType(), nullable=True)
])

# Read the CSV file into a DataFrame with the defined schema.
df = (
    spark.read.format(file_type)
    .schema(schema)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# Delta table path used for both the raw and the transformed data.
delta_table_path = "/delta/salary_data"

# Write the raw DataFrame to Delta, replacing the table's schema as well as its data.
# overwriteSchema is required: a later cell overwrites this same path with an extra
# "Above_Average_Salary" column, and a plain overwrite keeps the existing table schema.
# Without it, re-running the notebook would read back a stale, all-null column below.
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(delta_table_path)

# Read the data back from Delta Lake so downstream cells work off the persisted table.
df = spark.read.format("delta").load(delta_table_path)

df.describe().show()

+-------+-----------------+------+---------------+---------------+-------------------+------------------+
|summary|              Age|Gender|Education_Level|      Job_Title|Years_of_Experience|            Salary|
+-------+-----------------+------+---------------+---------------+-------------------+------------------+
|  count|             6702|  6702|           6701|           6702|               6701|              6699|
|   mean|33.62085944494181|  null|           null|           null|  8.094687360095508|115326.96477086132|
| stddev|7.614632626251299|  null|           null|           null|  6.059003056634108| 52786.18391068295|
|    min|               21|Female|     Bachelor's|Account Manager|                0.0|             350.0|
|    max|               62| Other|            phD|  Web Developer|               34.0|          250000.0|
+-------+-----------------+------+---------------+---------------+-------------------+------------------+



In [0]:
# Inspect the schema of the DataFrame read back from the Delta table in the EXTRACT cell.
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Education_Level: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Years_of_Experience: double (nullable = true)
 |-- Salary: double (nullable = true)



In [0]:
# Count missing values of each column
from pyspark.sql.functions import mean, last, col, count, when

# when(isNull, name) yields a non-null literal only for null cells, so count()
# over it equals the number of nulls in that column.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df.columns
]
df.select(*null_count_exprs).show()

+---+------+---------------+---------+-------------------+------+
|Age|Gender|Education_Level|Job_Title|Years_of_Experience|Salary|
+---+------+---------------+---------+-------------------+------+
|  2|     2|              3|        2|                  3|     5|
+---+------+---------------+---------+-------------------+------+



In [0]:
# TRANSFORM

# Perform data processing on the DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, last, col, count, when, avg


# Reuse the active SparkSession (getOrCreate returns the existing one).
spark = SparkSession.builder.getOrCreate()

# Handling missing values
# Get means of columns 'Age', 'Years_of_Experience', 'Salary'
mean_values = df.select(
    mean('Age').alias('Age_mean'),
    mean('Salary').alias('Salary_mean'),
    mean('Years_of_Experience').alias('Years_of_Experience_mean')
).first()

age_mean = mean_values['Age_mean']
years_exp_mean = mean_values['Years_of_Experience_mean']
salary_mean = mean_values['Salary_mean']

print(years_exp_mean, salary_mean, age_mean)

# Replace null values by the mean in columns 'Age', 'Years_of_Experience', 'Salary'.
# - fillna casts each fill value to the column type, so the float mean is truncated
#   for the integer 'Age' column.
# - When a dict is passed, Spark ignores `subset`; the dict keys select the columns.
# - A mean is None only if the column is entirely null; Spark's fillna does not
#   accept None fill values, so such entries are dropped.
numeric_fill = {
    'Age': age_mean,
    'Years_of_Experience': years_exp_mean,
    'Salary': salary_mean,
}
numeric_fill = {name: value for name, value in numeric_fill.items() if value is not None}
if numeric_fill:
    df = df.fillna(numeric_fill)

# Get the last non-null value of columns 'Gender', 'Education_Level', 'Job_Title'.
# ignorenulls=True is required: with the default (False), last() returns None whenever
# the final row seen has a null in that column, and fillna cannot fill with None.
# NOTE(review): without an explicit ordering, which row counts as "last" depends on
# partition order and may differ between runs; consider the mode (most frequent
# value) if deterministic categorical imputation is needed.
categorical_cols = ['Gender', 'Education_Level', 'Job_Title']
last_row = df.agg(*[last(col_name, ignorenulls=True).alias(col_name) for col_name in categorical_cols])

# Replace null values by the last non-null value of each column (skipping columns
# that are entirely null, which would otherwise yield an unusable None fill value).
categorical_fill = {name: value for name, value in last_row.first().asDict().items() if value is not None}
if categorical_fill:
    df = df.fillna(categorical_fill)

# show result: remaining null count per column (expected to be all zeros)
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

8.094687360095508 115326.96477086132 33.62085944494181
+---+------+---------------+---------+-------------------+------+
|Age|Gender|Education_Level|Job_Title|Years_of_Experience|Salary|
+---+------+---------------+---------+-------------------+------+
|  0|     0|              0|        0|                  0|     0|
+---+------+---------------+---------+-------------------+------+



In [0]:
# Mean salary over the (already imputed) DataFrame; a global agg() is equivalent
# to select() with an aggregate expression.
average_salary = df.agg(avg(col("Salary"))).first()[0]

# Flag rows earning strictly more than the mean salary: 1 if above, otherwise 0.
above_average_flag = when(col("Salary") > average_salary, 1).otherwise(0)
df = df.withColumn("Above_Average_Salary", above_average_flag)

In [0]:
# Preview the transformed rows (Spark's defaults spelled out: first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+---+------+---------------+--------------------+-------------------+--------+--------------------+
|Age|Gender|Education_Level|           Job_Title|Years_of_Experience|  Salary|Above_Average_Salary|
+---+------+---------------+--------------------+-------------------+--------+--------------------+
| 32|  Male|     Bachelor's|   Software Engineer|                5.0| 90000.0|                   0|
| 28|Female|       Master's|        Data Analyst|                3.0| 65000.0|                   0|
| 45|  Male|            PhD|      Senior Manager|               15.0|150000.0|                   1|
| 36|Female|     Bachelor's|     Sales Associate|                7.0| 60000.0|                   0|
| 52|  Male|       Master's|            Director|               20.0|200000.0|                   1|
| 29|  Male|     Bachelor's|   Marketing Analyst|                2.0| 55000.0|                   0|
| 42|Female|       Master's|     Product Manager|               12.0|120000.0|                   1|


In [0]:
# Persist the transformed DataFrame back to the same Delta path. overwriteSchema lets
# the added "Above_Average_Salary" column replace the table's previous schema.
(
    df.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .save(delta_table_path)
)


In [0]:
# Inspect the schema after imputation and after adding the Above_Average_Salary column.
df.printSchema()


root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = false)
 |-- Education_Level: string (nullable = false)
 |-- Job_Title: string (nullable = false)
 |-- Years_of_Experience: double (nullable = false)
 |-- Salary: double (nullable = false)
 |-- Above_Average_Salary: integer (nullable = false)



In [0]:
### LOAD
import datetime
import os

# Load data to PostgreSQL over JDBC.
# Credentials come from the environment rather than being hardcoded in the notebook
# (on Databricks, dbutils.secrets.get(...) is another option).
# NOTE(review): the password previously committed in this cell must be rotated.
driver = "org.postgresql.Driver"
url = os.environ.get(
    "PG_JDBC_URL",
    "jdbc:postgresql://dpg-cgcb7364dad7accgg5bg-a.frankfurt-postgres.render.com/kietitmo_db",
)
table = "practika"  # base table name; a date suffix is appended below
user = os.environ.get("PG_USER", "kietitmo_db_user")
password = os.environ.get("PG_PASSWORD")
if not password:
    raise RuntimeError("PG_PASSWORD environment variable is not set; refusing to connect without credentials.")

# Get the current date
current_date = datetime.datetime.now().strftime("%Y%m%d")

# Construct the table name with the current date, e.g. practika_20240131
table_name = f"{table}_{current_date}"

# Write the DataFrame to the dynamically named table.
# NOTE(review): mode("append") means re-running this cell on the same day inserts
# duplicate rows into that day's table; use "overwrite" if each run should replace it.
df.write.format("jdbc") \
  .option("driver", driver) \
  .option("url", url) \
  .option("dbtable", table_name) \
  .mode("append") \
  .option("user", user) \
  .option("password", password) \
  .save()