##### SparkSession

In [1]:
import os
import logging
from pyspark.sql import SparkSession

#os.environ["PYSPARK_PYTHON"] = '/opt/.venv/bin/python'
#os.environ["SPARK_HOME"] = '/opt/spark'

def init_spark():
    """
    Initializes SparkSession and returns SparkSession and SparkContext objects
    
    Returns:
        SparkSession, SparkContext: objects of SparkSession and SparkContext respectively
    """
    logging.info("Initializing Spark session")
    spark = SparkSession.builder\
                        .appName("query")\
                        .master("spark://spark-master:7077")\
                        .config("spark.jars", "../spark-apps/packages/postgresql-42.2.22.jar")\
                        .config("spark.executor.memory", "512m")\
                        .getOrCreate()
    logging.info("Spark session initialized successfully")
    return spark

spark = init_spark()

23/02/13 20:44:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/13 20:44:48 WARN SparkContext: Please ensure that the number of slots available on your executors is limited by the number of cores to task cpus and not another custom resource. If cores is not the limiting resource then dynamic allocation will not work properly!


In [7]:
POSTGRES_DB = os.getenv('POSTGRES_DB')
POSTGRES_USER = os.getenv('POSTGRES_USER')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD')
POSTGRES_ENDPOINT = "postgres_database:5432"

departments = spark.read \
                   .format("jdbc") \
                   .option("url", f"jdbc:postgresql://{POSTGRES_ENDPOINT}/{POSTGRES_DB}") \
                   .option("dbtable", "dbo.departments") \
                   .option("user", POSTGRES_USER) \
                   .option("password", POSTGRES_PASSWORD) \
                   .option("driver", "org.postgresql.Driver") \
                   .load()

hired_employees = spark.read \
                   .format("jdbc") \
                   .option("url", f"jdbc:postgresql://{POSTGRES_ENDPOINT}/{POSTGRES_DB}") \
                   .option("dbtable", "dbo.hired_employees") \
                   .option("user", POSTGRES_USER) \
                   .option("password", POSTGRES_PASSWORD) \
                   .option("driver", "org.postgresql.Driver") \
                   .load()

jobs = spark.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{POSTGRES_ENDPOINT}/{POSTGRES_DB}") \
               .option("dbtable", "dbo.jobs") \
               .option("user", POSTGRES_USER) \
               .option("password", POSTGRES_PASSWORD) \
               .option("driver", "org.postgresql.Driver") \
               .load()

- Number of employees hired for each job and department in 2021 divided by quarter. The table must be ordered alphabetically by department and job

In [41]:
from pyspark.sql.types import DateType
from pyspark.sql import functions as f

df_1 = hired_employees.join(departments, 
                            hired_employees.department_id == departments.id)\
                      .join(jobs,
                            hired_employees.job_id == jobs.id)\
                      .drop('id')

#df_1 = df_1.withColumn("datetime", f.to_date("datetime", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
#df_1 = df_1.where(f.year(df_1.datetime) == 2021)

DataFrame[name: string, datetime: string, department_id: bigint, job_id: bigint, department: string, job: string, fecha: date]

In [46]:
# parseamos la columna datetime
df_1 = df_1.withColumn("fecha", df_1["datetime"].cast(DateType()))

# agrupamos los registros por "department", "job" y "quarter" y count()
df = df_1.groupBy("department", "job", f.quarter("fecha").alias("quarter")) \
  .agg(f.count("name").alias("num_empleados"))

# ordenamos por "department" y "job"
df = df.orderBy("department", "job")

df.groupBy("department", "job").pivot("quarter").agg(f.sum("num_empleados")).na.fill(0).show()



+--------------------+--------------------+----+---+---+---+---+
|          department|                 job|null|  1|  2|  3|  4|
+--------------------+--------------------+----+---+---+---+---+
|           Marketing|               Nurse|   0|  0|  1|  0|  0|
|Business Development|        Accountant I|   0|  0|  0|  1|  0|
|Research and Deve...|        Geologist IV|   0|  0|  0|  1|  0|
|            Services|   Assistant Manager|   0|  1|  0|  0|  0|
|         Engineering|     Project Manager|   0|  0|  2|  1|  0|
|           Marketing|Sales Representative|   0|  0|  2|  0|  1|
|          Accounting|      Programmer III|   0|  0|  0|  1|  0|
|            Services|Occupational Ther...|   0|  0|  1|  1|  0|
|               Sales|     Web Developer I|   0|  1|  0|  0|  0|
|         Engineering|Software Test Eng...|   0|  0|  1|  0|  1|
|Business Development|     Legal Assistant|   0|  1|  0|  1|  1|
|            Training|             Actuary|   0|  0|  0|  1|  0|
|Research and Deve...|   

                                                                                

- Number of employees hired for each job and department in 2021 divided by quarter. The table must be ordered alphabetically by department and job

In [2]:
# CODE