In [1]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Downloading psycopg2_binary-2.9.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.9


In [30]:
import pandas as pd
import psycopg2
import os
# import sys
# sys.path.append('/opt/airflow/jobs/python/')

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from sqlalchemy import create_engine
# from psycopg2 import OperationalError
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload

In [6]:
spark = SparkSession.builder \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.7.0") \
        .master("local") \
        .appName('EmployeeData') \
        .getOrCreate()
print(f'The current spark version is {spark.version}')

The current spark version is 3.5.0


In [7]:
# Data warehouse configuration
pg_username = "DWH_HR_ricky"
pg_password = "DWH_HR_test"

In [8]:
jdbc_url = "jdbc:postgresql://172.18.0.3:5432/DWH_HR"
connection_properties = {
    "user":pg_username,
    "password":pg_password,
    "driver": "org.postgresql.Driver"
}
spark_df_emp = spark.read.jdbc(url=jdbc_url, table="public.fact_tbl_hr", properties=connection_properties)
print("******** PostgreSQL DB connection success! ******** ")

******** PostgreSQL DB connection success! ******** 


In [9]:
spark_df_emp.createOrReplaceTempView("employee")
spark_df_emp.printSchema()

root
 |-- employeeid: integer (nullable = true)
 |-- employeename: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- title: string (nullable = true)
 |-- employeesalary: integer (nullable = true)
 |-- bonusovertime: integer (nullable = true)
 |-- salarydate: string (nullable = true)
 |-- trainingname: string (nullable = true)
 |-- startdate: string (nullable = true)
 |-- enddate: string (nullable = true)
 |-- iscurrent: string (nullable = true)
 |-- trainingreview: string (nullable = true)
 |-- trainingrating: float (nullable = true)
 |-- performancecomment: string (nullable = true)



In [10]:
spark_df_emp.show(5)

+----------+----------------+------+---+----------+--------------------+--------------+-------------+----------+-----------------+----------+----------+---------+--------------+--------------+--------------------+
|employeeid|    employeename|gender|age|department|               title|employeesalary|bonusovertime|salarydate|     trainingname| startdate|   enddate|iscurrent|trainingreview|trainingrating|  performancecomment|
+----------+----------------+------+---+----------+--------------------+--------------+-------------+----------+-----------------+----------+----------+---------+--------------+--------------+--------------------+
|       102|  Ralph Gonzalez|  Male| 35|Operations|     Project Manager|         88926|          332|2024-09-01|     Advanced SQL|2024-06-16|2024-09-06|Completed|       Q2 2023|           2.6|    Good performance|
|       103|       Ian Perez|Female| 27|       R&D|       Data Engineer|         92656|          176|2024-09-01|Leadership Skills|2024-08-01|202

In [11]:
# Staging DB configuration
pg_username = "DWH_HR_ricky"
pg_password = "DWH_HR_test"

In [12]:
jdbc_url = "jdbc:postgresql://172.18.0.3:5432/staging_db"
connection_properties = {
    "user":pg_username,
    "password":pg_password,
    "driver": "org.postgresql.Driver"
}
spark_df_candidate = spark.read.jdbc(url=jdbc_url, table="public.data_recruitment_selection_update", properties=connection_properties)

In [13]:
spark_df_candidate.createOrReplaceTempView("candidate")
spark_df_candidate.printSchema()

root
 |-- candidateid: integer (nullable = true)
 |-- candidatename: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- applicationdate: string (nullable = true)
 |-- status: string (nullable = true)
 |-- interviewdate: string (nullable = true)
 |-- offerstatus: string (nullable = true)



In [14]:
spark_df_candidate.show(5)

+-----------+----------------+------+---+----------------+---------------+-----------+-------------+-----------+
|candidateid|   candidatename|gender|age|           title|applicationdate|     status|interviewdate|offerstatus|
+-----------+----------------+------+---+----------------+---------------+-----------+-------------+-----------+
|          1|    Kevin Wright|Female| 35| DevOps Engineer|     2024-09-02|   Rejected|             |           |
|          2|  Ralph Gonzalez|  Male| 35| Project Manager|     2023-11-20|   Rejected|   2024-05-11|      Hired|
|          3|       Ian Perez|Female| 27|   Data Engineer|     2024-08-23|   Rejected|             |      Hired|
|          4|  Gregory Romero|  Male| 59|      HR Manager|     2024-05-31|Interviewed|             |      Hired|
|          5|Dawn Johnson DDS|  Male| 55|Security Analyst|     2024-02-17|   Rejected|   2024-01-18|      Hired|
+-----------+----------------+------+---+----------------+---------------+-----------+----------

In [15]:
# Convert Spark DataFrame to Pandas DataFrame
pandas_df_emp = spark_df_emp.toPandas()
# displaying the DataFrame
display(pandas_df_emp)

Unnamed: 0,employeeid,employeename,gender,age,department,title,employeesalary,bonusovertime,salarydate,trainingname,startdate,enddate,iscurrent,trainingreview,trainingrating,performancecomment
0,102,Ralph Gonzalez,Male,35,Operations,Project Manager,88926,332,2024-09-01,Advanced SQL,2024-06-16,2024-09-06,Completed,Q2 2023,2.6,Good performance
1,103,Ian Perez,Female,27,R&D,Data Engineer,92656,176,2024-09-01,Leadership Skills,2024-08-01,2024-09-05,Ongoing,Q3 2023,1.5,Good performance
2,104,Gregory Romero,Male,59,Marketing,HR Manager,60965,4268,2024-09-01,Leadership Skills,2024-04-03,2024-09-11,Completed,Q4 2023,4.2,Good performance
3,105,Dawn Johnson DDS,Male,55,Legal,Security Analyst,110724,2853,2024-08-30,Leadership Skills,2024-04-25,2024-09-01,Ongoing,Q1 2023,4.7,Needs improvement
4,107,Sarah Perry DDS,Female,42,Legal,Network Administrator,66286,755,2024-09-03,Leadership Skills,2024-05-14,2024-08-22,Completed,Q3 2023,4.7,Excellent performance
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
551,1194,David Mcbride,Male,44,Finance,HR Manager,80800,3435,2024-08-30,Cloud Computing,2024-03-18,2024-09-12,Completed,Q2 2023,1.5,Needs improvement
552,1195,Michelle Edwards,Female,23,Operations,Business Analyst,72341,923,2024-09-10,Leadership Skills,2024-08-26,2024-08-22,Completed,Q3 2023,2.5,Very good performance
553,1197,Loretta Chang,Male,31,Sales,Technical Support,92534,3918,2024-08-19,Advanced SQL,2024-07-31,2024-09-02,Completed,Q1 2023,4.9,Very good performance
554,1198,Samantha Serrano,Female,38,Customer Service,Software Engineer,61338,666,2024-08-26,Cloud Computing,2024-04-17,2024-08-21,Ongoing,Q2 2023,2.4,Needs improvement


In [47]:
# Convert Spark DataFrame to Pandas DataFrame
pandas_df_candidate= spark_df_candidate.toPandas()
# displaying the DataFrame
display(pandas_df_candidate)

Unnamed: 0,candidateid,candidatename,gender,age,title,applicationdate,status,interviewdate,offerstatus
0,1,Kevin Wright,Female,35,DevOps Engineer,2024-09-02,Rejected,,
1,2,Ralph Gonzalez,Male,35,Project Manager,2023-11-20,Rejected,2024-05-11,Hired
2,3,Ian Perez,Female,27,Data Engineer,2024-08-23,Rejected,,Hired
3,4,Gregory Romero,Male,59,HR Manager,2024-05-31,Interviewed,,Hired
4,5,Dawn Johnson DDS,Male,55,Security Analyst,2024-02-17,Rejected,2024-01-18,Hired
...,...,...,...,...,...,...,...,...,...
1095,1096,Christopher Jackson,Male,22,Web Developer,2023-09-27,Interviewed,2024-07-15,
1096,1097,Loretta Chang,Male,31,Technical Support,2024-04-26,Rejected,,Hired
1097,1098,Samantha Serrano,Female,38,Software Engineer,2023-11-04,Rejected,,Hired
1098,1099,Maurice Pearson,Male,36,Security Analyst,2023-12-07,Interviewed,2024-01-13,Hired


In [44]:
# Save Pandas DataFrame to CSV
csv_filename_1 = "employee_data.csv"
pandas_df_emp.to_csv(csv_filename_1, index=False)

In [48]:
csv_filename_2 = "candidate_data.csv"
pandas_df_candidate.to_csv(csv_filename_1, index=False)

In [40]:
# Google Sheets API setup
SCOPES = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive.file']
# Get the current working directory
current_dir = os.getcwd()
print(current_dir)
# Load credentials from the same directory as the script
creds = Credentials.from_authorized_user_file(os.path.join(current_dir, 'client_secret.json'))
# Authorize the Google Sheets API
client = gspread.authorize(creds)
service = build('sheets', 'v4', credentials=creds)
drive_service = build('drive', 'v3', credentials=creds)

/home/jovyan


ValueError: Authorized user info was not in the expected format, missing fields client_secret, client_id, refresh_token.

In [41]:
# Create a new Google Sheet
sheet_1 = service.spreadsheets().create(body={
    'properties': {'title': 'Employee Data'}
}).execute()
sheet_id_1 = sheet_1['spreadsheetId']

NameError: name 'service' is not defined

In [None]:
# Create a new Google Sheet
sheet_2 = service.spreadsheets().create(body={
    'properties': {'title': 'Candidate Data'}
}).execute()
sheet_id_2 = sheet_2['spreadsheetId']

In [42]:
# Upload CSV to Google Drive
file_metadata_1 = {'name': csv_filename_1, 'parents': [sheet_id_1]}
media_1 = MediaFileUpload(csv_filename_1, resumable=True)
file = drive_service.files().create(body=file_metadata_1, media_body=media, fields='id').execute()

NameError: name 'csv_filename' is not defined

In [None]:
# Upload CSV to Google Drive
file_metadata_2 = {'name': csv_filename_2, 'parents': [sheet_id_2]}
media_2 = MediaFileUpload(csv_filename_2, resumable=True)
file = drive_service.files().create(body=file_metadata_2, media_body=media_2, fields='id').execute()

In [46]:
# Import CSV data to the Google Sheet
request_body_1 = {
    'requests': [{
        'pasteData': {
            'coordinate': {
                'sheetId': 0,
                'rowIndex': 0,
                'columnIndex': 0
            },
            'data': open(csv_filename_1, 'r').read(),
            'type': 'PASTE_NORMAL',
            'delimiter': ','
        }
    }]
}
service.spreadsheets().batchUpdate(spreadsheetId=sheet_id_1, body=request_body_1).execute()

NameError: name 'service' is not defined

In [None]:
# Import CSV data to the Google Sheet
request_body_2 = {
    'requests': [{
        'pasteData': {
            'coordinate': {
                'sheetId': 0,
                'rowIndex': 0,
                'columnIndex': 0
            },
            'data': open(csv_filename_2, 'r').read(),
            'type': 'PASTE_NORMAL',
            'delimiter': ','
        }
    }]
}
service.spreadsheets().batchUpdate(spreadsheetId=sheet_id_2, body=request_body_2).execute()

In [None]:
print(f"Data uploaded to Google Sheet: https://docs.google.com/spreadsheets/d/{sheet_id_1}")

In [None]:
print(f"Data uploaded to Google Sheet: https://docs.google.com/spreadsheets/d/{sheet_id_2}")