In [1]:
import os

from pyspark.sql import SparkSession

# MinIO (S3-compatible) connection settings. Each value can be overridden through
# an environment variable so real credentials do not have to be stored in the
# notebook; the fallbacks are the values this notebook originally hardcoded.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://dbms-minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minio_user")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minio_password")

# Initialize Spark session with the Delta Lake and hadoop-aws (S3A) packages
spark = (
    SparkSession.builder
    .appName("InsertToPostgreSQL")
    .master("spark://dbms-spark-master:7077")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4")
    # Register Delta Lake's SQL extension and catalog implementation
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Point the S3A filesystem at the MinIO endpoint
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)


In [2]:
# Load the lakehouse Delta table and materialize it as a pandas DataFrame
minio_bucket = "cleaned-bucket"
delta_table = "unified_cv"
table_location = f"s3a://{minio_bucket}/unified/{delta_table}"

# toPandas() collects the whole table onto the driver as a pandas DataFrame
unified_df = (
    spark.read
    .format("delta")
    .load(table_location)
    .toPandas()
)

In [3]:
# Show the converted pandas DataFrame as this cell's output
unified_df

Unnamed: 0,Category,Skill,Company,Project,Profession,Summary,Educations,Experiences,Skills,Awards,certifications,languages,references
0,IT Technician,,,,Information Technology Technician I,Versatile Systems Administrator possessing sup...,"[{'start_year': None, 'is_current': False, 'en...","[{'start_date': {'year': 2007, 'month': 8}, 'i...",[],[],"[{'year': None, 'issuing_organization': 'CompT...",[],[]
1,IT Technician,,,,Information Technology Manager,Possesses an extensive background in Informati...,"[{'start_year': None, 'is_current': False, 'en...","[{'start_date': {'year': 2013, 'month': 4}, 'i...","['Word', 'Excel', 'Access', 'Outlook', 'PowerP...",[],"[{'year': None, 'issuing_organization': '', 'd...",[],[]
2,System Engineer,,,,RF Systems Engineer,Multidisciplinary background: RF hardware desi...,"[{'start_year': 2011, 'is_current': False, 'en...","[{'start_date': {'year': 2014, 'month': 5}, 'i...","['Microsoft office', 'Office', 'Matlab', 'Exce...",[],[],[],[]
3,HR,,,,HR Personnel Assistant,I am a U.S. citizen who is authorized to work ...,"[{'start_year': 1998, 'is_current': False, 'en...","[{'start_date': {'year': 2013, 'month': 3}, 'i...","['Microsoft Word', 'MS Excel', 'MS Outlook', '...",[],[],[],[]
4,Designer,,,,Floral Designer,Personable Customer Service Associate dedicate...,"[{'start_year': None, 'is_current': False, 'en...","[{'start_date': {'year': 2013, 'month': 6}, 'i...","['Inventory control', 'Employee scheduling', '...",[],[],[],[]
...,...,...,...,...,...,...,...,...,...,...,...,...,...
987,Testing,"[excel, office, word]",TRANS POWER SOLUTIONS,[],,,,,,,,,
988,Testing,[xp],"Minilec India Pvt Ltd , Pirangoot.",[],,,,,,,,,
989,Testing,[],M/S Silverline Electricals Pvt. Ltd,[],,,,,,,,,
990,Testing,"[matlab, pcb, design]",RB Electronics,[],,,,,,,,,


In [6]:
# PUT SOME TRANSFORMATION HERE IF NEEDED
# For example: count of skill occurrences
from collections import Counter

import pandas as pd

# Tally case-insensitive skill occurrences over every row's skill list.
# Counter keeps first-seen insertion order, so the row order of skill_df
# matches a plain dict-based tally.
skill_col = unified_df['Skill']
skill_counter = Counter()
for skill_list in skill_col:
    # Rows without a skill list hold None and contribute nothing.
    # NOTE(review): assumes missing lists are None (not NaN) — confirm for other sources.
    if skill_list is None:
        continue
    # Skip null entries inside a list instead of crashing on `.lower()`.
    skill_counter.update(skill.lower() for skill in skill_list if skill is not None)

# Expose a plain dict (not a Counter) so missing-key lookups still raise KeyError.
skill_dict = dict(skill_counter)
skill_df = pd.DataFrame(list(skill_dict.items()), columns=["Skill", "Count"])
skill_df

Unnamed: 0,Skill,Count
0,javascript,90
1,jquery,56
2,python,116
3,statsmodels,4
4,aws,29
...,...,...
247,vision,8
248,cpp,10
249,webdriver,10
250,xp,20


In [None]:
! pip install psycopg2-binary
! pip install sqlalchemy

In [8]:
# Import to database
# NOTE: by default this function will OVERWRITE the table if it already exists
def import_to_db(df, table_name, if_exists="replace"):
    """Write a pandas DataFrame to a table in the PostgreSQL database.

    The database user and password are read from the ``POSTGRES_USER`` and
    ``POSTGRES_PASSWORD`` environment variables (empty string when unset), so
    credentials do not need to be written into the notebook.

    Args:
        df: pandas DataFrame to write.
        table_name: Name of the destination table.
        if_exists: Passed through to ``DataFrame.to_sql``. Defaults to
            ``"replace"``, which overwrites an existing table.

    Raises:
        Any exception raised by SQLAlchemy / the driver while connecting or
        writing is propagated; the engine is disposed in either case.
    """
    import os
    from urllib.parse import quote_plus

    from sqlalchemy import create_engine

    host = "dbms-dbms-241.e.aivencloud.com"
    port = "13375"
    database = "defaultdb"
    # URL-escape the credentials so characters such as '@', ':' or '/' in a
    # password cannot corrupt the connection URL.
    user = quote_plus(os.environ.get("POSTGRES_USER", ""))
    password = quote_plus(os.environ.get("POSTGRES_PASSWORD", ""))
    engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}")

    try:
        df.to_sql(table_name, engine, if_exists=if_exists, index=False)
    finally:
        # Release pooled connections even when the write fails.
        engine.dispose()


In [9]:
# Write the unified CV DataFrame to the "unified_cv" table
import_to_db(df=unified_df, table_name="unified_cv")

In [10]:
# Write the per-skill occurrence counts to the "skill_count" table
import_to_db(df=skill_df, table_name="skill_count")