Importing libraries

In [16]:
import os
import boto3
import gzip
import json
import pyspark.sql.functions as F
from dotenv import load_dotenv
from io import BytesIO
from pyspark.sql import SparkSession
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Date, Boolean, Text, BigInteger
from datetime import datetime, timedelta
# Load environment variables
load_dotenv(override=True)

True

In [277]:
## print database connection string
print(os.getenv('POSTGRES_CONNECTION_STRING'))

postgresql://airflow:airflow@localhost:5432/job_ads_db


## Preparation

Connecting to S3 bucket

In [13]:
# Creating Boto3 Session
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_REGION')
aws_bucket_name = os.getenv('AWS_BUCKET_NAME')

session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=aws_region
)
print(session)

prefix = 'DE/monthly/'

# Create an S3 client
s3 = boto3.client('s3')


Session(region_name='eu-central-1')


Creating Spark Session

In [14]:
ROOT_DIR = os.path.abspath(os.pardir)
spark = SparkSession.builder \
    .master("local") \
    .appName("DE-Project") \
    .config("spark.jars", ROOT_DIR+"/postgresql-42.6.0.jar") \
    .config("spark.driver.extraClassPath", ROOT_DIR+"/postgresql-42.6.0.jar") \
    .config("spark.executor.extraClassPath", ROOT_DIR+"/postgresql-42.6.0.jar") \
    .getOrCreate()
# .config("spark.sql.shuffle.partitions", "50") \
# .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f512b5318d0>


Connecting to Postgres

In [18]:
connection_string = os.getenv('POSTGRES_CONNECTION_STRING')
engine = create_engine(connection_string, isolation_level="AUTOCOMMIT")
print(engine)
pgconn = engine.connect()
print(pgconn)

Engine(postgresql://airflow:***@localhost:5432/job_ads_db)
<sqlalchemy.engine.base.Connection object at 0x7f511f1e1db0>


In [278]:
# Create a table if it doesn't exist
metadata = MetaData()
# table = Table('tk_2023_07', metadata)

if pgconn.dialect.has_table(pgconn, 'tk_2023_07'):
  print('Table already exists.')
else:
    print('Creating table...')
    table = Table(os.getenv("POSTGRES_TABLE"), metadata,
                Column('id', Integer, primary_key=True),
                Column('job_id', String(32)),
                Column('posting_count', Integer),
                Column('source_website_count', Integer),
                Column('date', Date),
                Column('sequence_number', BigInteger),
                Column('expiration_date', Date),
                Column('expired', Boolean),
                Column('duration', Integer),
                Column('source_url', String(255)),
                Column('source_website', String(255)),
                Column('source_type', String(2)),
                # Column('duplicate', Boolean),
                # Column('first_posting', Boolean),
                Column('posting_id', String(32)),
                Column('duplicate_on_jobsite', Boolean),
                Column('via_intermediary', Boolean),
                Column('language', String(3)),
                Column('job_title', String(255)),
                Column('profession', String(4)),
                Column('profession_group', String(4)),
                Column('profession_class', String(4)),
                Column('profession_isco_code', String(10)),
                Column('location', String(5)),
                Column('location_name', String(255)),
                Column('location_coordinates', String(30)),
                Column('location_remote_possible', Boolean),
                Column('region', String(2)),
                Column('education_level', String(2)),
                Column('advertiser_name', String(255)),
                Column('advertiser_type', String(2)),
                Column('advertiser_street', String(255)),
                Column('advertiser_postal_code', String(15)),
                Column('advertiser_location', String(255)),
                Column('advertiser_phone', String(255)),
                Column('available_contact_fields', String(100)),
                # Column('organization', Integer),
                Column('organization_name', String(255)),
                Column('organization_industry', String(2)),
                Column('organization_activity', String(10)),
                Column('organization_size', String(2)),
                Column('organization_address', String(255)),
                Column('organization_street_number', String(100)),
                Column('organization_postal_code', String(5)),
                Column('organization_location', String(5)),
                Column('organization_location_name', String(255)),
                Column('organization_region', String(2)),
                Column('contract_type', String(2)),
                Column('working_hours_type', String(1)),
                Column('hours_per_week_from', Integer),
                Column('hours_per_week_to', Integer),
                Column('employment_type', String(1)),
                Column('full_text', Text),
                Column('job_description', Text),
                Column('candidate_description', Text),
                Column('conditions_description', Text),
                # Column('professional_skill_terms', Text),
                Column('soft_skills', Text),
                Column('professional_skills', Text),
                Column('advertiser_house_number', String(15)),
                Column('advertiser_email', String(255)),
                Column('advertiser_website', String(255)),
                Column('advertiser_contact_person', String(255)),
                Column('advertiser_reference_number', String(255)),
                Column('application_description', Text),
                Column('organization_website', String(100)),
                Column('employer_description', Text),
                Column('language_skills', Text),
                Column('it_skills', Text),
                Column('organization_linkedin_id', String(255)),
                Column('organization_national_id', String(25)),
                Column('experience_years_from', Integer),
                Column('salary', Integer),
                Column('salary_from', Integer),
                Column('salary_to', Integer),
                Column('experience_years_to', Integer),
                Column('advertiser_spend', Integer),
                Column('apply_url', String(255)),
                Column('experience_level', String(17)),
                Column('location_postal_code', String(7)),
                Column('profession_kldb_code', String(5)),
                Column('profession_onet_2019_code', String(10)),
                Column('salary_from_rate', String(10)),
                Column('salary_time_scale', String(1)),
                Column('salary_to_rate', String(10))
                )

metadata.create_all(engine)
  
                                     

Table already exists.


In [279]:
# sellect these columns  'advertiser_spend', 'apply_url', 'experience_level', 'location_postal_code',
# 'profession_kldb_code', 'profession_onet_2019_code', 'salary_from_rate',
# 'salary_time_scale', 'salary_to_rate' wwhere no coolumn is null
df.select(  'advertiser_spend', 'apply_url', 'experience_level', 'location_postal_code','profession_kldb_code', 'profession_onet_2019_code', 'salary_from_rate', 'salary_time_scale', 'salary_to_rate').where(F.col('advertiser_spend').isNotNull() & F.col('apply_url').isNotNull() & F.col('experience_level').isNotNull() & F.col('location_postal_code').isNotNull() & F.col('profession_kldb_code').isNotNull() & F.col('profession_onet_2019_code').isNotNull() & F.col('salary_from_rate').isNotNull() & F.col('salary_time_scale').isNotNull() & F.col('salary_to_rate').isNotNull()).show(truncate=False)

                                                                                

+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------+--------------------+-------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------+----------------+-----------------+--------------+
|advertiser_spend|apply_url                                                                                                                                                                                                                                                                                  |experience_level              |location_postal_code|profession_kldb_code                       

In [281]:
# Ffind max llength off advertiser_spend column
df.select(F.max(F.length('advertiser_spend'))).show()

[Stage 1183:>                                                       (0 + 1) / 1]

+-----------------------------+
|max(length(advertiser_spend))|
+-----------------------------+
|                            5|
+-----------------------------+


                                                                                

In [169]:
# Show organization_national_id column where length is greater than 25
df.select('source_url').where(F.length('source_url') > 255).show(truncate=False)
# print the count of organization_national_id column where length is greater than 25
print(df.select('organization_national_id').where(F.length('organization_national_id') > 25).count())

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|source_url                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-------------------------------------------------------------



5


                                                                                

## Extract

Outputting the list of files in the bucket

In [160]:
# Get the list of objects in the S3 bucket
response = s3.list_objects_v2(Bucket=aws_bucket_name, Prefix=prefix, Delimiter='/')
print(response)


{'ResponseMetadata': {'RequestId': 'H059ACCSCESZWVCV', 'HostId': 'cugJXxl6lGJCAq/CZTVuU+Z+EnbgZgqTxaVdHugw6kOem0YcbxrtNOsL4VXf3hVC6g+bgTydXGw=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'cugJXxl6lGJCAq/CZTVuU+Z+EnbgZgqTxaVdHugw6kOem0YcbxrtNOsL4VXf3hVC6g+bgTydXGw=', 'x-amz-request-id': 'H059ACCSCESZWVCV', 'date': 'Tue, 22 Aug 2023 11:49:21 GMT', 'x-amz-bucket-region': 'eu-central-1', 'content-type': 'application/xml', 'transfer-encoding': 'chunked', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'IsTruncated': False, 'Name': 'jobfeed-data-feeds', 'Prefix': 'DE/monthly/', 'Delimiter': '/', 'MaxKeys': 1000, 'CommonPrefixes': [{'Prefix': 'DE/monthly/2020-06/'}, {'Prefix': 'DE/monthly/2020-07/'}, {'Prefix': 'DE/monthly/2020-08/'}, {'Prefix': 'DE/monthly/2020-09/'}, {'Prefix': 'DE/monthly/2020-10/'}, {'Prefix': 'DE/monthly/2020-11/'}, {'Prefix': 'DE/monthly/2020-12/'}, {'Prefix': 'DE/monthly/2021-01/'}, {'Prefix': 'DE/monthly/2021-02/'}, {'Prefix': 'DE/monthly/2021-03/'}, {'Prefix':

Downloading the files from the bucket

In [161]:
#Number of Months to download
months = 6
# Number of files per month to download
files_per_month = 1
# current project directory parent path
ROOT_DIR = os.path.abspath(os.pardir)

# Get the list of subfolders in the S3 bucket
subfolders = [obj['Prefix'] for obj in response['CommonPrefixes']]
# Get the last N subfolders - N = months of data to download
subfolders = subfolders[-months:]


filesToLoadInDF = []
# Download files from each subfolder
for subfolder in subfolders:
    # Get the list of files in the subfolder
    response = s3.list_objects_v2(Bucket=aws_bucket_name, Prefix=subfolder)
    # Get the file paths
    files = [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.jsonl.gz')]
    # Only get the first N files
    files = files[:files_per_month]

    # filesToLoadInDF = [filesToLoadInDF.append(f) for f in files]

    # Create the folder in your local machine
    folder = ROOT_DIR + "/data/raw/" + aws_bucket_name + "/" + subfolder
    if not os.path.exists(folder):
        os.makedirs(folder)
    # Download and extract each file
    for file in files:
        filename = file.rsplit("/", 1)[1]
        print('Downloading file {}...'.format(filename))
        print(subfolder + filename)
        print(folder + filename)

        # Check if the file already exists
        localExtractedFilePath = os.path.join(folder + filename[:-3])
        if not os.path.exists(localExtractedFilePath):
            # Download and Save the file
            s3.download_file(Filename=folder + filename, Bucket=aws_bucket_name, Key=subfolder + filename)

            locaFilePath = os.path.join(folder + filename)
            print(localExtractedFilePath)
            filesToLoadInDF.append(localExtractedFilePath)
            # Extract the data from the gzipped file
            with gzip.open(locaFilePath, 'rb') as gz_file, open(localExtractedFilePath, 'wb') as extract_file:
                extract_file.write(gz_file.read())

            # Delete the gzipped file
            os.remove(locaFilePath)
        else:
            filesToLoadInDF.append(localExtractedFilePath)
            print('File already exists. Skipping...')

Downloading file jobs.0.jsonl.gz...
DE/monthly/2023-02/jobs.0.jsonl.gz
/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-02/jobs.0.jsonl.gz
/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-02/jobs.0.jsonl
Downloading file jobs.0.jsonl.gz...
DE/monthly/2023-03/jobs.0.jsonl.gz
/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-03/jobs.0.jsonl.gz
/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-03/jobs.0.jsonl
Downloading file jobs.0.jsonl.gz...
DE/monthly/2023-04/jobs.0.jsonl.gz
/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-04/jobs.0.jsonl.gz
/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-04/jobs.0.jsonl
Downloading file jobs.0.jsonl.gz...
DE/monthly/2023-05/jobs.0.jsonl.gz
/home/maher/Githu

Reading the files into a Spark Dataframe

In [163]:
print(filesToLoadInDF)

['/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-02/jobs.0.jsonl', '/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-03/jobs.0.jsonl', '/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-04/jobs.0.jsonl', '/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-05/jobs.0.jsonl', '/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-06/jobs.0.jsonl', '/home/maher/GithubProjects/Bigdata-Processing-pipeline/data/raw/jobfeed-data-feeds/DE/monthly/2023-07/jobs.0.jsonl']


In [164]:
df = spark.read.json(filesToLoadInDF)  # Use the extracted file paths here
df.show(truncate=False)

                                                                                

+-------------------------+----------------------------+-----------------------+-------------------+-------------------------------------+------------------------------+----------------------+---------------------------------+----------------+----------------------+----------------------------------+---------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [165]:
df.printSchema()

root
 |-- advertiser_contact_person: string (nullable = true)
 |-- advertiser_email: string (nullable = true)
 |-- advertiser_house_number: string (nullable = true)
 |-- advertiser_location: string (nullable = true)
 |-- advertiser_name: string (nullable = true)
 |-- advertiser_phone: string (nullable = true)
 |-- advertiser_postal_code: string (nullable = true)
 |-- advertiser_reference_number: string (nullable = true)
 |-- advertiser_spend: long (nullable = true)
 |-- advertiser_street: string (nullable = true)
 |-- advertiser_type: struct (nullable = true)
 |    |-- label: string (nullable = true)
 |    |-- value: long (nullable = true)
 |-- advertiser_website: string (nullable = true)
 |-- application_description: string (nullable = true)
 |-- apply_url: string (nullable = true)
 |-- available_contact_fields: string (nullable = true)
 |-- candidate_description: string (nullable = true)
 |-- conditions_description: string (nullable = true)
 |-- contract_type: struct (nullable = true

In [166]:
df.count()

                                                                                

600000

Extract Last 6 months Data from Postgres into Spark Dataframe

In [26]:
six_months_ago = datetime.now() - timedelta(days=70)
six_months_ago = six_months_ago.strftime("%Y-%m-%d")
# where_clause = "(SELECT * FROM tk_2023_07 WHERE date >= '" + six_months_ago + "') as tk"
where_clause = "(SELECT * FROM tk_2023_07 WHERE date >= '" + six_months_ago + "') as tk"
print(where_clause)
postgres_data = spark.read.format("jdbc") \
    .option("url", os.getenv('POSTGRES_CONNECTION_JDBC_STRING')) \
    .option("dbtable", where_clause) \
    .option("user", os.getenv('POSTGRES_USER')) \
    .option("password", os.getenv('POSTGRES_PASSWORD')) \
    .option("driver", "org.postgresql.Driver") \
    .load()

postgres_data.show(truncate=False)

(SELECT * FROM tk_2023_07 WHERE date >= '2023-06-13') as tk
+---+------+-------------+--------------------+----+---------------+---------------+-------+--------+----------+--------------+-----------+----------+--------------------+----------------+--------+---------+----------+----------------+----------------+--------------------+--------+-------------+--------------------+------------------------+------+---------------+---------------+---------------+-----------------+----------------------+-------------------+----------------+------------------------+-----------------+---------------------+---------------------+-----------------+--------------------+--------------------------+------------------------+---------------------+--------------------------+-------------------+-------------+------------------+-------------------+-----------------+---------------+---------+---------------+---------------------+----------------------+-----------+-------------------+-----------------------+----

In [27]:
# Sort the dataframes by date in descending order
df = df.orderBy('date').show(truncate=False)
postgres_data.orderBy('date').show(truncate=False)

                                                                                

+-------------------------------+--------------------------------------------+-----------------------+-------------------+---------------------------------------------+------------------------------+----------------------+---------------------------+----------------+------------------------+----------------------------------+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [41]:
postgres_data_count = postgres_data.count()
postgres_data_count

0

Compare the two dataframes and find the differences

In [42]:
# Find difference between the two dataframes when postgres_data count is greater than 0
if postgres_data_count > 0:
    changes_df = df.join(postgres_data, df.posting_id == postgres_data.posting_id, how='left_anti')
    changes_df.show(truncate=False)
    changes_df.count()
else:
    changes_df = df
    changes_df.show(truncate=False)
    changes_df.count()

+-------------------------+-----------------------------------+-----------------------+------------------------+--------------------------------------------------+----------------+----------------------+---------------------------+----------------+----------------------+----------------------------------+---------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [43]:
changes_df.count()

                                                                                

200000

## Transform

Change Column Types

In [130]:
# df.select(df.soft_skills.label.alias("labels")).show(truncate=False)

df.select(F.slice(df.soft_skills.label, 1, 5).alias("labels")).show(truncate=False)


+------------------------------------------------------------------------------------------------------+
|labels                                                                                                |
+------------------------------------------------------------------------------------------------------+
|[Zuverlässigkeit, Teamarbeit]                                                                         |
|[Freundlichkeit, Teamarbeit]                                                                          |
|[Eigenmotivation, Empathisch]                                                                         |
|null                                                                                                  |
|[Professionelle Verantwortung, Teamarbeit, Eigenmotivation]                                           |
|[Anpassungsfähigkeit, Kreativität, Überzeugungskraft, Stressbewältigung, Professionelle Verantwortung]|
|[Kommunikation]                                       

In [258]:
import pyspark.sql.types as T

  # Change the data types of the columns.
df_types_fixed= df.withColumn("job_id", df.job_id.cast(T.StringType())) \
    .withColumn("posting_count", df.posting_count.cast(T.LongType())) \
    .withColumn("source_website_count", df.source_website_count.cast(T.LongType())) \
    .withColumn("date", df.date.cast(T.DateType())) \
    .withColumn("expiration_date", df.expiration_date.cast(T.DateType())) \
    .withColumn("duration", df.duration.cast(T.LongType())) \
    .withColumn("salary", df.salary.cast(T.LongType())) \
    .withColumn("salary_from", df.salary_from.cast(T.LongType())) \
    .withColumn("salary_to", df.salary_to.cast(T.LongType())) \
    .withColumn("experience_years_from", df.experience_years_from.cast(T.LongType())) \
    .withColumn("experience_years_to", df.experience_years_to.cast(T.LongType())) \
    .withColumn("hours_per_week_from", df.hours_per_week_from.cast(T.LongType())) \
    .withColumn("hours_per_week_to", df.hours_per_week_to.cast(T.LongType())) \
    .withColumn('working_hours_type', df.working_hours_type.value.cast(T.IntegerType())) \
    .withColumn('advertiser_type', df.advertiser_type.value.cast(T.StringType())) \
    .withColumn('contract_type', df.contract_type.value.cast(T.StringType())) \
    .withColumn('education_level', df.education_level.value.cast(T.StringType())) \
    .withColumn('employment_type', df.employment_type.value.cast(T.StringType())) \
    .withColumn('experience_level', df.experience_level.value.cast(T.StringType())) \
    .withColumn("it_skills", F.concat_ws(",", df.it_skills.value)) \
    .withColumn("language_skills", F.concat_ws(",", df.language_skills.value)) \
    .withColumn('organization_activity', df.organization_activity.value.cast(T.StringType())) \
    .withColumn('organization_industry', df.organization_industry.value.cast(T.StringType())) \
    .withColumn('organization_region', df.organization_region.value.cast(T.StringType())) \
    .withColumn('organization_size', df.organization_size.value.cast(T.StringType())) \
    .withColumn('profession', df.profession.value.cast(T.StringType())) \
    .withColumn('profession_class', df.profession_class.value.cast(T.StringType())) \
    .withColumn('profession_group', df.profession_group.value.cast(T.StringType())) \
    .withColumn('profession_isco_code', df.profession_isco_code.value.cast(T.StringType())) \
    .withColumn('profession_kldb_code', df.profession_kldb_code.value.cast(T.StringType())) \
    .withColumn('profession_onet_2019_code', df.profession_onet_2019_code.value.cast(T.StringType())) \
    .withColumn("professional_skills", F.concat_ws(",", df.professional_skills.value)) \
    .withColumn('region', df.region.value.cast(T.StringType())) \
    .withColumn('organization_national_id', F.substring(df.organization_national_id, 1, 25)) \
    .withColumn("soft_skills", F.concat_ws(",", df.soft_skills.value)) \
    .withColumn('source_type', df.source_type.value.cast(T.StringType())) \
    .withColumn('advertiser_email', F.concat_ws(",", F.slice(F.split(df.advertiser_email, ','), 1, 5))) \
    .withColumn('advertiser_website', F.concat_ws(",", F.slice(F.split(df.advertiser_website, ','), 1, 5))) \
    .withColumn('advertiser_contact_person', F.substring(df.advertiser_contact_person, 1, 255)) \
    .withColumn('advertiser_reference_number', F.substring(df.advertiser_reference_number, 1, 255)) \
    .withColumn('organization_website', F.substring(df.organization_website, 1, 100)) \
    .withColumn('organization_linkedin_id', F.substring(df.organization_linkedin_id, 1, 255)) \
    .withColumn('apply_url', F.substring(df.apply_url, 1, 255)) \
    .withColumn('source_url', F.substring(df.source_url, 1, 255)) \
    .withColumn('source_website', F.substring(df.source_website, 1, 255)) \
    .withColumn('advertiser_name', F.substring(df.advertiser_name, 1, 255)) \
    .withColumn('advertiser_phone', F.concat_ws(",", F.slice(F.split(df.advertiser_phone, ','), 1, 10))) \
    
df_types_fixed.printSchema()

root
 |-- advertiser_contact_person: string (nullable = true)
 |-- advertiser_email: string (nullable = false)
 |-- advertiser_house_number: string (nullable = true)
 |-- advertiser_location: string (nullable = true)
 |-- advertiser_name: string (nullable = true)
 |-- advertiser_phone: string (nullable = false)
 |-- advertiser_postal_code: string (nullable = true)
 |-- advertiser_reference_number: string (nullable = true)
 |-- advertiser_spend: long (nullable = true)
 |-- advertiser_street: string (nullable = true)
 |-- advertiser_type: string (nullable = true)
 |-- advertiser_website: string (nullable = false)
 |-- application_description: string (nullable = true)
 |-- apply_url: string (nullable = true)
 |-- available_contact_fields: string (nullable = true)
 |-- candidate_description: string (nullable = true)
 |-- conditions_description: string (nullable = true)
 |-- contract_type: string (nullable = true)
 |-- date: date (nullable = true)
 |-- duplicate_on_jobsite: boolean (nullabl

In [256]:
df_types_fixed.show(truncate=False) 

+-------------------------+----------------------------+-----------------------+-------------------+-------------------------------------+------------------------------+----------------------+---------------------------------+----------------+----------------------+---------------+---------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------

In [242]:
df_types_fixed.select('advertiser_phone', F.concat_ws(",", F.slice(F.split(df.advertiser_phone, ','), 1, 10)).alias("websites")).where(F.length('advertiser_phone') > 255).show(truncate=False)

[Stage 616:>                                                        (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|advertiser_phone                                                                                                                                                                                                                                                       

                                                                                

In [244]:
df_types_fixed.select(F.max(F.length('advertiser_name'))).show(truncate=False)



+----------------------------+
|max(length(advertiser_name))|
+----------------------------+
|742                         |
+----------------------------+


                                                                                

In [245]:
df_types_fixed.select('advertiser_name', F.substring(df.advertiser_name, 1, 255).alias("arn")).where(F.length('advertiser_name') > 255).show(truncate=False)



+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [210]:
# find value DFG Graduiertenkolleg 2720 Digital Platform Ecosystems (DPE, DFG Graduiertenkolleg 2720 Digital Platform Ecosystems
df_types_fixed.select('advertiser_reference_number').where(F.col('advertiser_reference_number').contains('DFG Graduiertenkolleg 2720 Digital Platform Ecosystems')).show(truncate=False)



+-------------------------------------------------------------------------------------------------------------------+
|advertiser_reference_number                                                                                        |
+-------------------------------------------------------------------------------------------------------------------+
|DFG Graduiertenkolleg 2720 Digital Platform Ecosystems (DPE, DFG Graduiertenkolleg 2720 Digital Platform Ecosystems|
+-------------------------------------------------------------------------------------------------------------------+


                                                                                

In [148]:
# Show organization_national_id column where length is greater than 25
df_types_fixed.select('organization_national_id').where(F.length('organization_national_id') > 25).show(truncate=False)
# print the count of organization_national_id column where length is greater than 25
print(df_types_fixed.select('organization_national_id').where(F.length('organization_national_id') > 25).count())

                                                                                

+------------------------+
|organization_national_id|
+------------------------+
+------------------------+




0


                                                                                

Check for duplicates

In [257]:
df_types_fixed.count()

                                                                                

600000

In [251]:
# Drop duplicates
# df = df.dropDuplicates()
# Drop duplicates in posting_id column and keep the latest one
df = df.dropDuplicates(['posting_id'])

In [252]:
df.count()

                                                                                

600000

Check for null values

In [260]:
def my_transpose(df, columns=None):

    # Get the column names
    if columns is None:
        columns = ["Labels", "Count"]
    labels = df.columns

    # Get the values
    counts = list(df.take(1)[0].asDict().values())

    # Combine the values and column names into a new DataFrame
    data = [[a, b] for a, b in zip(labels, counts)]

    # Create a new Spark DataFrame
    res = spark.createDataFrame(data, columns)

    # Return the new DataFrame
    return res


In [274]:
# Find count for null in each column
from pyspark.sql.functions import isnan, when, count, col
# df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
df_nullCount = df.select([count(when(col(c).isNull(), c)).cast('integer').alias(c) for c in df.columns])
# print each column with null count
# df_nullCount.show(vertical=True)
df_nullCount_transpose = my_transpose(df_nullCount, columns=["Labels", "Null_Counts"])
df_nullCount_transpose.orderBy('Null_Counts', ascending=False).show()



+--------------------+-----------+
|              Labels|Null_Counts|
+--------------------+-----------+
| experience_years_to|     592751|
|experience_years_...|     566191|
|organization_nati...|     555390|
|    salary_from_rate|     551421|
|      salary_to_rate|     551336|
|   salary_time_scale|     550418|
| hours_per_week_from|     505316|
|   hours_per_week_to|     505316|
|         salary_from|     490905|
|              salary|     490469|
|           salary_to|     490469|
|    experience_level|     478283|
|organization_link...|     469321|
|    advertiser_spend|     448374|
|     language_skills|     418529|
|           it_skills|     406166|
|advertiser_refere...|     384190|
|location_postal_code|     356164|
|advertiser_contac...|     307643|
|employer_description|     307553|
+--------------------+-----------+


                                                                                

In [275]:
# df_nullCount.show(vertical=True)
# find null count percentage for each column
rows_count = df.count()
df_null_percentages = df_nullCount.select([((col(c) / rows_count) * 100).alias(c) for c in df_nullCount.columns])
df_null_percentages.show(vertical=True)




-RECORD 0-----------------------------------------
 advertiser_contact_person   | 51.27383333333333  
 advertiser_email            | 24.9245            
 advertiser_house_number     | 50.7285            
 advertiser_location         | 13.184833333333334 
 advertiser_name             | 2.0111666666666665 
 advertiser_phone            | 15.381666666666666 
 advertiser_postal_code      | 14.132166666666668 
 advertiser_reference_number | 64.03166666666667  
 advertiser_spend            | 74.729             
 advertiser_street           | 14.213833333333334 
 advertiser_type             | 0.0                
 advertiser_website          | 24.701333333333334 
 application_description     | 45.13183333333333  
 apply_url                   | 21.391166666666667 
 available_contact_fields    | 0.0                
 candidate_description       | 31.542333333333332 
 conditions_description      | 42.793             
 contract_type               | 0.0                
 date                        | 

                                                                                

In [276]:
# Find the min and max percentage of null values and also the column name
min = 0
max = 0
min_col = ""
max_col = ""
for c in df_null_percentages.columns:
    col_val  =  df_null_percentages.select(c).collect()[0][0]
    if col_val> max:
        max = col_val
        max_col = c
    if col_val < min:
        min = col_val
        min_col = c


print("Min Value is {} | {}".format(min, min_col))
print("Max Value is {} | {}".format(max, max_col))


['advertiser_contact_person', 'advertiser_email', 'advertiser_house_number', 'advertiser_location', 'advertiser_name', 'advertiser_phone', 'advertiser_postal_code', 'advertiser_reference_number', 'advertiser_spend', 'advertiser_street', 'advertiser_type', 'advertiser_website', 'application_description', 'apply_url', 'available_contact_fields', 'candidate_description', 'conditions_description', 'contract_type', 'date', 'duplicate_on_jobsite', 'duration', 'education_level', 'employer_description', 'employment_type', 'experience_level', 'experience_years_from', 'experience_years_to', 'expiration_date', 'expired', 'full_text', 'hours_per_week_from', 'hours_per_week_to', 'it_skills', 'job_description', 'job_id', 'job_title', 'language', 'language_skills', 'location', 'location_coordinates', 'location_name', 'location_postal_code', 'location_remote_possible', 'organization_activity', 'organization_address', 'organization_industry', 'organization_linkedin_id', 'organization_location', 'organi

                                                                                

51.27383333333333


                                                                                

24.9245


                                                                                

50.7285


                                                                                

13.184833333333334


                                                                                

2.0111666666666665


                                                                                

15.381666666666666


                                                                                

14.132166666666668


                                                                                

64.03166666666667


                                                                                

74.729


                                                                                

14.213833333333334


                                                                                

0.0


                                                                                

24.701333333333334


                                                                                

45.13183333333333


                                                                                

21.391166666666667


                                                                                

0.0


                                                                                

31.542333333333332


                                                                                

42.793


                                                                                

0.0


                                                                                

0.0


                                                                                

0.0


                                                                                

37.04533333333334


                                                                                

0.0


                                                                                

51.25883333333333


                                                                                

0.0


                                                                                

79.71383333333333


                                                                                

94.36516666666667


                                                                                

98.79183333333333


                                                                                

37.04533333333334


                                                                                

0.0


                                                                                

0.0


                                                                                

84.21933333333334


                                                                                

84.21933333333334


                                                                                

67.69433333333333


                                                                                

33.528999999999996


                                                                                

0.0


                                                                                

0.0


                                                                                

0.0


                                                                                

69.75483333333334


                                                                                

12.0245


                                                                                

10.097333333333333


                                                                                

10.703166666666666


                                                                                

59.36066666666666


                                                                                

0.0


                                                                                

16.743


                                                                                

20.5735


                                                                                

0.0


                                                                                

78.22016666666667


                                                                                

25.197999999999997


                                                                                

25.197999999999997


                                                                                

3.224833333333333


                                                                                

92.565


                                                                                

20.5735


                                                                                

25.197999999999997


                                                                                

0.0


                                                                                

20.5735


                                                                                

29.142666666666667


                                                                                

0.0


                                                                                

0.0


                                                                                

0.0


                                                                                

0.0


                                                                                

0.0


                                                                                

0.0


                                                                                

7.254333333333333


                                                                                

6.844333333333333


                                                                                

10.146666666666667


                                                                                

10.113999999999999


                                                                                

81.74483333333333


                                                                                

81.8175


                                                                                

91.90350000000001


                                                                                

91.73633333333333


                                                                                

81.74483333333333


                                                                                

91.88933333333334


                                                                                

0.0


                                                                                

28.332


                                                                                

0.0


                                                                                

0.9708333333333332


                                                                                

0.0


                                                                                

0.0


                                                                                

0.0


[Stage 1174:>                                                       (0 + 1) / 1]

0.0


                                                                                

## Load

Loading the data into Postgres

In [264]:
# Replacing empty string with None
df_types_fixed = df_types_fixed.withColumn("it_skills", F.when(df_types_fixed.it_skills == "", None).otherwise(df_types_fixed.it_skills)) \
    .withColumn("language_skills", F.when(df_types_fixed.language_skills == "", None).otherwise(df_types_fixed.language_skills)) \
    .withColumn("professional_skills", F.when(df_types_fixed.professional_skills == "", None).otherwise(df_types_fixed.professional_skills)) \
    .withColumn("soft_skills", F.when(df_types_fixed.soft_skills == "", None).otherwise(df_types_fixed.soft_skills)) \

df_types_fixed.show()



+-------------------------+--------------------+-----------------------+--------------------+--------------------+--------------------+----------------------+---------------------------+----------------+-----------------+---------------+--------------------+-----------------------+--------------------+------------------------+---------------------+----------------------+-------------+----------+--------------------+--------+---------------+--------------------+---------------+----------------+---------------------+-------------------+---------------+-------+--------------------+-------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------+--------------------+-------------+--------------------+------------------------+---------------------+--------------------+---------------------+------------------------+---------------------+--------------------------+--------------------+-------------

                                                                                

In [265]:
print(engine.url)

postgresql://airflow:***@localhost:5432/job_ads_db


In [266]:
print(len(df_types_fixed.columns))
df_types_fixed.columns

80


['advertiser_contact_person',
 'advertiser_email',
 'advertiser_house_number',
 'advertiser_location',
 'advertiser_name',
 'advertiser_phone',
 'advertiser_postal_code',
 'advertiser_reference_number',
 'advertiser_spend',
 'advertiser_street',
 'advertiser_type',
 'advertiser_website',
 'application_description',
 'apply_url',
 'available_contact_fields',
 'candidate_description',
 'conditions_description',
 'contract_type',
 'date',
 'duplicate_on_jobsite',
 'duration',
 'education_level',
 'employer_description',
 'employment_type',
 'experience_level',
 'experience_years_from',
 'experience_years_to',
 'expiration_date',
 'expired',
 'full_text',
 'hours_per_week_from',
 'hours_per_week_to',
 'it_skills',
 'job_description',
 'job_id',
 'job_title',
 'language',
 'language_skills',
 'location',
 'location_coordinates',
 'location_name',
 'location_postal_code',
 'location_remote_possible',
 'organization_activity',
 'organization_address',
 'organization_industry',
 'organization_

In [117]:
# df_final = df_types_fixed.drop('advertiser_spend', 'apply_url','experience_level','location_postal_code','profession_kldb_code','profession_onet_2019_code', 'salary_from_rate', 'salary_time_scale','salary_to_rate')

In [267]:
df_final = df_types_fixed
print(len(df_final.columns))
df_final.columns

80


['advertiser_contact_person',
 'advertiser_email',
 'advertiser_house_number',
 'advertiser_location',
 'advertiser_name',
 'advertiser_phone',
 'advertiser_postal_code',
 'advertiser_reference_number',
 'advertiser_spend',
 'advertiser_street',
 'advertiser_type',
 'advertiser_website',
 'application_description',
 'apply_url',
 'available_contact_fields',
 'candidate_description',
 'conditions_description',
 'contract_type',
 'date',
 'duplicate_on_jobsite',
 'duration',
 'education_level',
 'employer_description',
 'employment_type',
 'experience_level',
 'experience_years_from',
 'experience_years_to',
 'expiration_date',
 'expired',
 'full_text',
 'hours_per_week_from',
 'hours_per_week_to',
 'it_skills',
 'job_description',
 'job_id',
 'job_title',
 'language',
 'language_skills',
 'location',
 'location_coordinates',
 'location_name',
 'location_postal_code',
 'location_remote_possible',
 'organization_activity',
 'organization_address',
 'organization_industry',
 'organization_

In [268]:
# find max character length for each column
df_final.select([F.max(F.length(c)).alias(c) for c in df_final.columns]).show(vertical=True)



-RECORD 0-----------------------------
 advertiser_contact_person   | 121    
 advertiser_email            | 222    
 advertiser_house_number     | 9      
 advertiser_location         | 58     
 advertiser_name             | 255    
 advertiser_phone            | 219    
 advertiser_postal_code      | 7      
 advertiser_reference_number | 255    
 advertiser_spend            | 5      
 advertiser_street           | 115    
 advertiser_type             | 1      
 advertiser_website          | 178    
 application_description     | 36273  
 apply_url                   | 255    
 available_contact_fields    | 75     
 candidate_description       | 25121  
 conditions_description      | 29778  
 contract_type               | 2      
 date                        | 10     
 duplicate_on_jobsite        | 5      
 duration                    | 2      
 education_level             | 1      
 employer_description        | 95352  
 employment_type             | 1      
 experience_level        

                                                                                

In [269]:
# from pyspark.sql import DataFrameWriter
# Write the data to a table in Postgres
df_final.write.format("jdbc") \
    .option("url", os.getenv('POSTGRES_CONNECTION_JDBC_STRING')) \
    .option("dbtable", os.getenv('POSTGRES_TABLE')) \
    .option("user", os.getenv('POSTGRES_USER')) \
    .option("password", os.getenv('POSTGRES_PASSWORD')) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

                                                                                