# This notebook performs the following tasks:

- Retrieves job postings data from AWS DynamoDB and S3 and merges them, removing duplicates and identifying unprocessed job postings.
- Cleans job descriptions, extracts hard skills, and removes duplicated job postings based on company, title, location, and hard skills.
- Categorizes job titles into data scientist, data engineer, data analyst, and software engineer categories, and generates embeddings for hard skills.
- Categorizes job locations into USA, Canada, or remote.
Transforms the data by converting timezones, formatting dates, and saving the transformed DataFrame to an Amazon S3 bucket as a CSV file.

### Key outputs:

- Total job postings, processed job postings, and unprocessed job postings.
- DataFrame with cleaned job descriptions, extracted hard skills, categorized job titles, and categorized locations.


In [1]:
# install sentence transformer and paprmill
!pip install -U sentence-transformers
!pip install papermill


Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com




In [2]:
#install libraries
import json
import boto3
import re
import hashlib
import datetime
import pandas as pd
import numpy as np
import pytz
import re
import boto3
import matplotlib.pyplot as plt
from io import StringIO
from collections import defaultdict
from collections import Counter
from sentence_transformers import SentenceTransformer


## This code performs the following tasks:

- Connects to an AWS DynamoDB database and scans a table named "TransformedGoogleJob" to retrieve all items.
- Converts the retrieved data to a Pandas DataFrame.
- Connects to an AWS S3 bucket named "embeddedgooglejob".
- Lists all files in the S3 bucket and reads all CSV files.
- Concatenates all CSV files into a single DataFrame named "combined_dataframe".
- Drops duplicate rows from the "combined_dataframe" based on the "id" column and creates a new DataFrame named "processed_df".
- Merges the "total_df" and "processed_df" DataFrames on the "id" column using a left join.
- Selects all rows in the merged DataFrame where the "_merge" column has a value of "left_only", indicating that they were not processed before.
- Drops the "_merge" column from the resulting DataFrame and creates a new DataFrame named "unprocessed_df".
- Copies the "unprocessed_df" DataFrame into a new DataFrame named "df".
- The final output of the code is the number of total job postings, the number of processed job postings, and the number of unprocessed job postings. The "df" DataFrame can be used to further process the unprocessed job postings.

In [3]:
# Create a DynamoDB client
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('TransformedGoogleJob')

# Scan the table
response = table.scan()
data = response['Items']

# Iterate and retrieve all items (if there are more than the initial limit)
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])

# Convert the data to a DataFrame
total_df = pd.DataFrame(data)

print('Number of total job_postings are',len(total_df))

bucket_name = "embeddedgooglejob"
s3 = boto3.client('s3')


def list_files_in_bucket(bucket):
    files = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        for obj in page['Contents']:
            files.append(obj['Key'])
    return files

def read_csv_from_s3(bucket, file_key):
    obj = s3.get_object(Bucket=bucket, Key=file_key)
    csv_content = obj['Body'].read().decode('utf-8')
    return pd.read_csv(StringIO(csv_content))

all_files = list_files_in_bucket(bucket_name)

# Read and concatenate all CSV files
all_dataframes = []
for file_key in all_files:
    data = read_csv_from_s3(bucket_name, file_key)
    all_dataframes.append(data)

# Concatenate all CSV files into a single DataFrame and drop duplicate rows
combined_dataframe = pd.concat(all_dataframes, ignore_index=True)
# Drop duplicate rows
processed_df = combined_dataframe.drop_duplicates(subset=['id'],keep='first',ignore_index=True)
print('Number of proceessed job_postings are',len(processed_df))

# Merge the processed and total DataFrames and select unprocessed rows
unprocessed_df = total_df.merge(processed_df[['id']], on='id', how='left', indicator=True).query("_merge == 'left_only'").drop('_merge', axis=1)
print('Number of unprocessed job_postings are',len(unprocessed_df))
# Create a copy of the unprocessed DataFrame
df=unprocessed_df.copy(deep=True)


Number of total job_postings are 3499
Number of proceessed job_postings are 3444
Number of unprocessed job_postings are 55


## This code performs the following tasks:

- A list of job skills is defined, containing various skills related to data engineering, data science, and data analytics.
- A function named "clean_job_description" is defined, which takes a job description text as input and performs various cleaning operations, such as removing stop words, removing punctuation (except for plus, hash, and backslash), preserving C++ and C# syntax, and keeping numbers and letters.
- Another function named "extract_hardskills" is defined, which takes a job description and the list of job skills as input and returns a list of matching hard skills found in the description. This function also calls the "clean_job_description" function to clean the job description before extracting the hard skills.
- The "hard_skills" column is added to the "df" DataFrame by applying the "extract_hardskills" function to the "description" column.
- Duplicated rows in the DataFrame are identified based on the "company", "title", and "location" columns.
- Duplicated rows are grouped by "company", "title", and "location".
- The indices of the duplicated rows are printed, organized by "company", "title", and "location".
- A new column named "hard_skill_tuple" is added to the "df" DataFrame by applying the "tuple" function to the "hard_skills" column.
- Duplicate rows in the DataFrame are dropped based on the "company", "title", "location", and "hard_skill_tuple" columns, keeping the first occurrence of each duplicate row.
- Duplicated rows are again identified based on the "company", "title", and "location" columns.
- Duplicated rows are again grouped by "company", "title", and "location".
- The indices of the duplicated rows are again printed, organized by "company", "title", and "location".
- Overall, this code is designed to clean job descriptions and extract hard skills from them, and then identify and remove duplicated job postings based on various criteria, such as company, title, location, and hard skills.

In [4]:
de_skills=['airflow', 'alteryx', 'athena', 'aurora', 'aws', 'azure', 'boto3', 'cassandra', 'cloud computing', 'cloudwatch', 'cockroachdb', 'data factory', 'data lake', 'data lake analytics', 'data management', 'data mapping', 'data migration', 'data mining', 'data munging', 'data pipeline design', 'data pipelines', 'data warehousing', 'database design', 'databricks', 'django', 'docker', 'dynamodb', 'ec2', 'elasticsearch', 'emr', 'etl', 'etl design', 'etl frameworks', 'gcp', 'glue', 'google bigquery', 'google bigtable', 'google cloud compute engine', 'google cloud dataflow', 'google cloud functions', 'google cloud kubernetes engine', 'google cloud pub/sub', 'google cloud spanner', 'google cloud storage', 'google storage', 'hadoop', 'hbase', 'hive', 'impala', 'informatica', 'kinesis', 'knime', 'kubernetes', 'lambda', 'mongodb', 'mpi', 'mysql', 'olap', 'oltp', 'oracle', 'pandas', 'pentaho', 'pig', 'pl/sql', 'postgresql', 'power bi', 'pyspark', 'rds', 'redis', 'redshift', 'relational databases', 's3', 'sas', 'sas enterprise guide', 'scalable systems', 'schema design', 'snowflake', 'solr', 'spark', 'sql', 'sql anywhere', 'sql server', 'sql*plus', 'sql/psm', 'sqlite', 'ssis','scala', 'stata', 'sybase', 'talend', 'teradata', 'unix', 'yarn','distributed systems', 'mariadb', 'performance tuning']

da_skills=['agile', 'bayesian networks', 'classification', 'clustering', 'collaborative filtering', 'cognos', 'dimensional modeling', 'dimensionality reduction', 'excel', 'google analytics', 'hypothesis testing', 'looker', 'mathematics', 'matplotlib', 'power bi', 'predictive modeling', 'probability', 'r', 'random forest', 'regression analysis', 'scrum', 'spectral clustering', 'spotfire', 'sql', 'statistical analysis', 'tableau', 'text mining', 'tidyverse', 'time series analysis', 'unsupervised learning', 'web scraping', 'weka','kibana', 'splunk']

ds_skills=['bayesian networks', 'classification', 'clustering', 'collaborative filtering', 'convolutional neural networks', 'cuda', 'deep learning', 'dimensionality reduction', 'ensemble modeling', 'feature engineering', 'h2o', 'hypothesis testing', 'keras', 'machine learning','matlab', 'machine vision', 'mapreduce', 'mathematics', 'matplotlib', 'natural language processing', 'neural networks', 'nlp', 'nltk', 'numpy', 'opencv', 'pandas', 'predictive modeling', 'probability', 'random forest', 'regression analysis', 'recommender systems', 'scikit-learn', 'singular value decomposition','statistical analysis', 'support vector machines', 'tensorflow', 'text mining', 'theano', 'time series analysis', 'torch', 'unsupervised learning', 'Large Language Models', 'natural language processing', 'nlp', 'nltk', 'pytorch','logistic regression']

se_skills=['app development', 'agile', 'android app development', 'angularjs', 'ariflow', 'aws', 'azure', 'clojure', 'cloud computing', 'cloudwatch', 'cuda', 'd3js', 'dask', 'devops', 'django', 'docker', 'electron', 'erlang', 'expressjs', 'fastapi', 'fiori/ui5', 'flask', 'git', 'go', 'gradle', 'graphql', 'grunt', 'gulp', 'haskell', 'hibernate', 'html/css', 'java', 'javafx', 'javascript', 'jenkins', 'jquery', 'junit', 'jupyter notebook', 'kotlin', 'kubernetes', 'lambda', 'lamp stack', 'laravel', 'linux', 'microservices', 'mockito', 'nodejs', 'nosql', 'object oriented abap', 'object-oriented programming', 'objective-c', 'opencv', 'opengl', 'openmp', 'perl', 'php', 'posix threads', 'python', 'qt', 'r', 'rabbitmq', 'rdbms', 'reactjs', 'rest apis', 'ruby', 'ruby on rails', 'rust', 'selenium', 'shell scripting', 'spring', 'spring framework', 'sql', 'sql server', 'swift', 'symfony', 't-sql', 'tableau', 'transact-sql', 't-sql', 'typescript', 'unity', 'unix', 'vuejs', 'web scraping', 'web services', 'webpack', 'xml','distributed systems', 'maven', 'performance tuning', 'pygame', 'pyqt', 'scala', 'tkinter', 'stl']

job_skills=list(set(de_skills+da_skills+ds_skills+se_skills))
print("Number of hard skills for Data Engineer is {}, Data Analyst is {}, Data Scientist is {}, Software Engineer is {}".format(len(de_skills),len(da_skills),len(ds_skills),len(se_skills)))
print("Number of total hard skills are {}".format(len(job_skills)))




Number of hard skills for Data Engineer is 96, Data Analyst is 34, Data Scientist is 47, Software Engineer is 95
Number of total hard skills are 229


In [5]:
# Hard Skills for DE, DS, DA, DE



def clean_job_description(text):
    """
    This function takes a job description text and performs the following cleaning operations:
    - Remove punctuation
    - Remove stop words
    - Preserve C++ and C# syntax
    - Keep numbers and letters
    :param text: A string of job description text
    :return: A string of cleaned job description text
    """
    # Define the stop words to remove
    stop_words = ["a", "an", "the", "and", "or", "of", "to", "in", "for", "on", "with", "at", "by", "from", "up", "as",
                  "it", "its", "their", "they", "them", "he", "him", "his", "she", "her", "hers", "you", "your", "yours",
                  "we", "us", "our", "ours", "me", "my", "mine", "i", "am", "is", "are", "was", "were", "be", "been",
                  "being", "have", "has", "had", "having", "do", "does", "did", "doing", "will", "would", "shall",
                  "should", "may", "might", "must", "can", "could", "to", "that", "this", "there", "these", "those"]
    
    # Remove punctuation except for plus, hash, backslah 
    text = re.sub(r'[^\w\s+#\/]', '', text)
    text = text.replace('\n', ' ')


    # Remove stop words
    text = ' '.join(word for word in text.split() if word.lower() not in stop_words)
    text=text.lower()
    return text

def extract_hardskills(description, job_skills):
    description = clean_job_description(description)

    # Loop through the list of job postings to extract c# and c++
    pattern = r'\bc\s*\+\+|\bc#|\bc\b'

    # Create a new list of job skills that are in the words list
    matching_skills = list(set([skill for skill in job_skills if re.search(rf"\b{re.escape(skill)}\b", description)]+re.findall(pattern, description)))

    return matching_skills

#Extract Hard skills from description and add it to dataframe
df['hard_skills'] = df['description'].apply(extract_hardskills, args=(job_skills,))

# Find duplicated rows based on 'company', 'title', and 'location' columns
duplicated_rows = df[df.duplicated(subset=['company', 'title', 'location'], keep=False)]

# Group duplicated rows by 'company', 'title', and 'location'
grouped_duplicated_rows = duplicated_rows.groupby(['company', 'title', 'location'])

# Print the indices of the duplicated rows, organized by 'company', 'title', and 'location'
for (company, title, location), group in grouped_duplicated_rows:
    print(f"Company: {company}, Title: {title}, Location: {location}")
    print(f"Indices: {list(group.index)}")
    print()

df['hard_skill_tuple'] = df['hard_skills'].apply(tuple)
df.drop_duplicates(subset=['company', 'title', 'location', 'hard_skill_tuple'], keep='first', inplace=True)
df.drop('hard_skill_tuple', axis=1, inplace=True)

# Find duplicated rows based on 'company', 'title', and 'location' columns
duplicated_rows = df[df.duplicated(subset=['company', 'title', 'location'], keep=False)]

# Group duplicated rows by 'company', 'title', and 'location'
grouped_duplicated_rows = duplicated_rows.groupby(['company', 'title', 'location'])

# Print the indices of the duplicated rows, organized by 'company', 'title', and 'location'
for (company, title, location), group in grouped_duplicated_rows:
    print(f"Company: {company}, Title: {title}, Location: {location}")
    print(f"Indices: {list(group.index)}")
    print()

Company: BenchSci, Title: Engineering Manager, Data, Location: Toronto, ON, Canada
Indices: [5, 1893]



## This code defines lists of job titles for different job categories, such as data scientist, data engineer, data analyst, and software engineer.

It then defines a function categorize_job_title that takes a job title as input and returns the job category it belongs to based on the lists of job titles defined earlier.

The code then applies the categorize_job_title function to the 'title' column of a Pandas DataFrame called df to create a new column 'job_category', which categorizes each job title in the DataFrame into one of the job categories.

The code then uses the SentenceTransformer library to convert the list of hard skills for each job into a single string and then into an embedding. The embedding is added as a new column 'embedding' in the DataFrame.

Finally, the code defines a function categorize_location that takes a location string and returns whether the job is in the USA, Canada, or is remote. The function is applied to the 'location' column of the DataFrame to create a new column 'location_category'.

In [6]:
data_scientist_titles = ["Data Science",
    "Data Scientist","DataScientist",
    "Machine Learning Engineer",
    "AI Engineer",
    "AI Researcher",
    "Machine Learning Researcher",
    "Computer Vision Engineer",
    "Natural Language Processing Engineer",
    "Deep Learning Engineer",
    "Data Science Specialist",
    "Statistical Modeler",
    "Applied Scientist",
    "Research Scientist",
    "Quantitative Analyst",
    "AI Scientist",
    "Algorithm Developer",
    "Machine Learning Scientist",
    "AI Developer",
    "Data Science Engineer",
    "Machine Learning Specialist",
    "Data Science Analyst",
    "Artificial Intelligence Engineer",
    "Deep Learning Specialist",
    "Predictive Modeler",
    "Quantitative Researcher",
    "AI Consultant",
    "Reinforcement Learning Engineer",
    "Data Science Consultant",
    "Machine Learning Analyst",
    "AI Analyst",
    "Data Science Researcher",
    "ML Engineer","Machine Learning","ML Ops Engineer",
    "ML Researcher",
    "Data Mining Engineer",
    "Data Science Developer",
    "AI Specialist",
    "Algorithm Engineer",
    "Deep Learning Researcher",
    "Machine Intelligence Engineer",
    "Data Analysis Scientist",
    "Advanced Analytics Engineer"
]
data_engineer_titles = ["Cloud Engineer","Engineer, Data","ETL Developer","Data Ops Lead","Data Schema Specialist",
    "Data Engineer","Data Developer",
    "Big Data Engineer",
    "Data Integration Engineer",
    "Data Pipeline Engineer",
    "Data Platform Engineer",
    "ETL Engineer",
    "Big Data Developer",
    "Data Infrastructure Engineer",
    "Data Warehouse Engineer",
    "Streaming Data Engineer",
    "Data Engineering Manager",
    "Database Engineer",
    "Cloud Data Engineer",
    "Azure Data Engineer",
    "AWS Data Engineer",
    "GCP Data Engineer",
    "Hadoop Engineer",
    "Data Architect",
    "Big Data Architect",
    "Data Engineering Consultant",
    "Data Operations Engineer",
    "Data Systems Engineer",
    "Data Analytics Engineer",
    "Real-time Data Engineer",
    "Spark Engineer",
    "NoSQL Engineer",
    "Data Lake Engineer",
    "Data Management Engineer",
    "Data Storage Engineer",
    "Distributed Data Engineer",
    "Data Processing Engineer",
    "Big Data Solutions Engineer",
    "Data Strategy Engineer",
    "Data Science Platform Engineer",
    "Data Engineering Analyst",
    "Data Modeler",
    "Data Integration Architect",
    "Data Engineering Specialist",
    "Data Migration Engineer",
    "DevOps Engineer",
    "Data Engineering Lead"
]

data_analyst_titles = ["Data Analytics","Data Management Analyst","Analytic","Analyst",
    "Data Analysis",
    "Data Analyst",
    "Business Data Analyst",
    "Data Analytics Specialist",
    "Marketing Data Analyst",
    "Financial Data Analyst",
    "Operations Data Analyst",
    "Quantitative Data Analyst",
    "Healthcare Data Analyst",
    "Data Reporting Analyst",
    "Sales Data Analyst",
    "Customer Data Analyst",
    "Data Quality Analyst",
    "Data Visualization Analyst",
    "HR Data Analyst",
    "Supply Chain Data Analyst",
    "Risk Data Analyst",
    "Data Insights Analyst",
    "Data Analyst Intern",
    "Data Governance Analyst",
    "Data Analyst Consultant",
    "SQL Data Analyst",
    "Data Analyst Manager",
    "Data Research Analyst",
    "Data Analyst Lead",
    "Junior Data Analyst",
    "Senior Data Analyst",
    "Data Analyst Associate",
    "Data Analyst Coordinator",
    "Database Analyst",
    "Data Audit Analyst",
    "Data Intelligence Analyst",
    "Fraud Data Analyst",
    "Data Integration Analyst",
    "Data Systems Analyst",
    "Data Analyst I",
    "Data Analyst II",
    "Entry-Level Data Analyst",
    "Strategic Data Analyst"
]

software_engineer_titles = ["Software","Frontend","Backend","Python Engineer","Back End Developer",".Net Developer",
    "Software Engineer","Senior Fullstack Developer",".NET/Angular Engineer",
    "Software Developer","MAC Engineer with Xcode/Swift","Android Engineer","Back End Engineer",
    "Web Developer",
    "Full Stack Developer",
    "Backend Developer",
    "Frontend Developer",
    "Mobile Developer",
    "iOS Developer",
    "Android Developer",
    "Embedded Software Engineer",
    "Software Engineer Intern",
    "Junior Software Engineer",
    "Senior Software Engineer",
    "Software Architect",
    "Application Developer",
    "Java Developer","Java Engineer",
    "Python Developer",
    "C++ Developer",
    "C# Developer",
    "JavaScript Developer",
    "Ruby Developer",
    "PHP Developer",
    "Golang Developer",
    "Scala Developer",
    "Swift Developer",
    "Kotlin Developer",
    "Rust Developer",
    "Software Development Engineer",
    "Software Engineering Manager",
    "Software Test Engineer",
    "Game Developer",
    "UI Developer",
    "UX Developer",
    "Frontend Engineer",
    "Backend Engineer",
    "Full Stack Engineer",
    "Cloud Developer",
    "Systems Software Engineer",
    "Software Development Analyst"
]


job_categories = {
        'data engineer': data_engineer_titles,
        'data scientist': data_scientist_titles,
        'data analyst': data_analyst_titles,
        'software engineer': software_engineer_titles
    }


# Function to categorize the job titles
def categorize_job_title(title):
    for category, titles in job_categories.items():
        if any(t.lower() in title.lower() for t in titles):
            return category
    print(f"Uncategorized title: {title}")
    return None

# Add a new column 'job_category' to the DataFrame
df['job_category'] = df['title'].apply(categorize_job_title)

# Filter the DataFrame to include only the desired job categories
df = df[df['job_category'].isin(job_categories.keys())]

#initialize Bert Model
# Most powerful
model = SentenceTransformer('paraphrase-mpnet-base-v2')
# Lightweight
#model = SentenceTransformer('paraphrase-distilroberta-base-v1')
# Convert skills list to a single string to use sentence_transformer
def skills_to_string(skills_list, separator=', '):
    return separator.join(skills_list)


for index, job in df.iterrows():
    df.loc[index, 'STRskills'] = skills_to_string(job['hard_skills'])
    

def encode_skills(skills):
    return model.encode(skills, convert_to_numpy=True).tolist()

df['embedding'] = df['STRskills'].apply(lambda x: encode_skills(x))

#categorize into USA vs Canada vs Remote
def categorize_location(location):
    if 'Canada' in location or ', Canada' in location or 'Canadá' in location or 'CA' in location:
        return 'Canada'
    elif 'Anywhere' in location or 'Qualquer lugar' in location:
        return 'Remote'
    else:
        return 'USA'

df['location_category'] = df['location'].apply(categorize_location)

Uncategorized title: Engineering Manager, Data
Uncategorized title: Data Center Electrical Engineer, Google Data Centers
Uncategorized title: ERP Programmer (MS Dynamics)
Uncategorized title: Axiom Developer
Uncategorized title: Insight Global
Uncategorized title: Unreal Programmer
Uncategorized title: Insight Global
Uncategorized title: Engineering Manager
Uncategorized title: Army Information Network Engineer
Uncategorized title: Assc Dir-Solutions Specialist
Uncategorized title: ER Data Center Process Principal (RNTBD)
Uncategorized title: Principal Data Consultant
Uncategorized title: Data Champion
Uncategorized title: Product Manager, Data Strategy
Uncategorized title: Platform Support Engineer
Uncategorized title: Product Owner - MLOps
Uncategorized title: Staff Engineer - Canada
Uncategorized title: Senior Service Engineer
Uncategorized title: Data Solution Partner (Part Time)
Uncategorized title: Angular Developer
Uncategorized title: Deep Learning Compiler Engineer II, AWS Neu

## Data Transformation Script Overview
This code is a data transformation script that applies several functions to a DataFrame object, converts the timezones, formats the dates, and then saves the transformed DataFrame into an Amazon S3 bucket as a CSV file.

## Function Description
The convert_posted_at_to_local_time() function takes a timestamp in the format "2 days ago" or "1 hour ago" and converts it to the local time. It uses the datetime module to get the current time in UTC and convert it to the local time. It then parses the input timestamp, calculates the difference between the current time and the input timestamp and returns the local posted date in the format "YYYY-MM-DD HH:MM:SS".

## Timezone Conversion
The script then defines the Montreal timezone and converts the current UTC time to Montreal local time using the pytz module. It formats the local time as a string and saves it in the variable montreal_time_str.

## DataFrame Transformation
The next block applies the convert_posted_at_to_local_time() function to the 'metadata' column of the DataFrame and creates a new column 'posted_date' that contains the converted local posted date. It also creates a new column 'scraped_date' that contains the current Montreal time as a string.

The script then prints the minimum and maximum posted date to verify that the conversion is working properly.

## CSV File Saving
The next block constructs a filename for the CSV file to be saved in the S3 bucket. It uses the current Montreal time to generate a filename in the format "processed_YYYY-MM-DD HH:MM.csv".

The last block saves the transformed DataFrame to the S3 bucket using the boto3 module. It converts the DataFrame to a CSV format, creates a CSV buffer and writes it to the S3 bucket using the put_object() function.

Finally, the script prints the total number of processed and saved job postings.

In [7]:
def convert_posted_at_to_local_time(posted_time):
    if not posted_time:
        return ''

    now = datetime.datetime.now(datetime.timezone.utc)
    local_tz = datetime.timezone(datetime.timedelta(hours=-4), 'EDT')
    local_now = now.astimezone(local_tz)

    match = re.match(r'(\d+)\s*(\w+)\s*ago', posted_time)
    if match:
        value, unit = int(match.group(1)), match.group(2)
        if unit.startswith('hour'):
            local_posted_date = local_now - datetime.timedelta(hours=value)
        elif unit.startswith('day'):
            local_posted_date = local_now - datetime.timedelta(days=value)
        elif unit.startswith('minute'):
            local_posted_date = local_now - datetime.timedelta(minutes=value)
        else:
            local_posted_date = local_now
    else:
        local_posted_date = local_now

    return local_posted_date.strftime('%Y-%m-%d %H:%M:%S')

# Get the current time in UTC
utc_now = datetime.datetime.now(pytz.utc)

# Define the Montreal timezone
montreal_tz = pytz.timezone('America/Montreal')

# Convert the current time to Montreal local time
montreal_now = utc_now.astimezone(montreal_tz)

# Format the datetime object as a string
montreal_time_str = montreal_now.strftime('%Y-%m-%d %H:%M')


# Apply the function and create the new columns
df['posted_date'] = df['metadata'].apply(lambda x: convert_posted_at_to_local_time(x.get('postedAt', '')))
df['scraped_date'] = montreal_time_str

print(min(df['posted_date']))
print(max(df['posted_date']))


# Construct the filename
file_name = f"processed_{montreal_time_str}.csv"

bucket_name = "embeddedgooglejob"


# Save the DataFrame as a CSV file in the S3 bucket
s3 = boto3.client('s3')
csv_buffer = StringIO()
df.to_csv(csv_buffer, index=False)
s3.put_object(Bucket=bucket_name, Key=file_name, Body=csv_buffer.getvalue())

print('Total number of new processed and saved job_postings after transformation are,', len(df))


2023-04-01 22:14:09
Total number of new processed and saved job_postings after transformation are, 13
