# Step 1: Explore and Collect the Dataset (Week 1)

Download and understand the job market dataset and prepare it for further processing.

Tasks:

Download Dataset:

- Download the dataset from Kaggle.
- Explore the CSV file to understand its structure: job title, company, location, job description, posting date, etc.
Understand the Data:

Examine key features such as:

- Job Title: What roles are being posted?
- Company: Which companies are posting the most jobs?
- Location: Where are most jobs located?
- Job Function: What are the most common job functions (e.g., IT, HR, Marketing)?
- Employment Type: What is the mix of full-time, part-time, contract, and other employment types?
Skills Learned:

- Exploratory Data Analysis (EDA): Learn how to inspect and understand a dataset before transforming or loading it.

Technologies:

- Pandas (for data exploration in Python).

In [38]:
import pandas as pd 

# Read the postings CSV in fixed-size pieces, then stitch the pieces back
# together into one DataFrame with a fresh 0..n-1 index.
chunk_size = 10000

chunks = list(pd.read_csv('data/postings.csv', chunksize=chunk_size))

df = pd.concat(chunks, ignore_index=True)

# Scratch explorations tried against the loaded frame (kept for reference):
#   df.head(), df.head(1), df.head(5), df.info(), df.describe()
#   df[['views','job_id']].head(10)
#   print(df.iloc[0]); print(df.loc[0])
#   df[(df['min_salary'] > 35) & (df['pay_period'] == 'HOURLY')]
#   df.groupby('pay_period').sum()

# Per-column null counts, followed by the largest job_id; as the last
# expression, the latter is the value the notebook cell displays.
df.isnull().sum()
df['job_id'].max()

3906267224

In [40]:
import pandas as pd
from sqlalchemy import create_engine,  Column, Integer, String, DateTime, Float, Boolean, BigInteger
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import  declarative_base
import os
from dotenv import load_dotenv


# Declarative base that the ORM model classes in this cell inherit from; its
# metadata is what create_table() passes to create_all().
Base = declarative_base()

# ORM model mapped to the `postings` table
class Posting(Base):
    """SQLAlchemy declarative model for the ``postings`` table.

    Each attribute below declares one table column. The DateTime columns
    match what read_csv_chunks_and_process() produces: it converts the CSV's
    epoch-millisecond values with ``pd.to_datetime(..., unit='ms')`` before
    inserting.
    """
    __tablename__ = 'postings'  # Table name in the database

    id = Column(Integer, primary_key=True)  # Primary key
    # BigInteger: the largest job_id seen while exploring the CSV
    # (3906267224) is above the signed 32-bit maximum.
    job_id = Column(BigInteger, nullable=False, unique=True)
    company_name = Column(String)
    title = Column(String)
    description = Column(String)
    max_salary = Column(Float)
    pay_period = Column(String)
    location = Column(String)
    # NOTE(review): identifier-like value stored as Float -- presumably
    # because pandas parses integer columns with missing values as float64;
    # confirm this is intended.
    company_id = Column(Float)
    views = Column(Float)
    med_salary = Column(Float)
    min_salary = Column(Float)
    formatted_work_type = Column(String)
    applies = Column(Float)
    original_listed_time = Column(DateTime)  # converted from epoch ms by the loader
    # NOTE(review): declared as String, but read_csv_chunks_and_process()
    # casts this column to float before inserting -- confirm which type is
    # wanted.
    remote_allowed = Column(String, nullable=True)
    job_posting_url = Column(String)
    application_url = Column(String)
    application_type = Column(String)
    expiry = Column(DateTime)  # converted from epoch ms by the loader
    closed_time = Column(DateTime)  # converted from epoch ms by the loader
    formatted_experience_level = Column(String)
    skills_desc = Column(String)
    listed_time = Column(DateTime)  # converted from epoch ms by the loader
    posting_domain = Column(String)
    sponsored = Column(Boolean)  # the loader casts this column with astype(bool)
    work_type = Column(String)
    currency = Column(String)
    compensation_type = Column(String)
    normalized_salary = Column(Float)
    # NOTE(review): zip_code and fips are stored as Float, which cannot
    # represent leading zeros -- confirm whether that matters downstream.
    zip_code = Column(Float)
    fips = Column(Float)
    

def create_table(engine):
    """Create the tables declared on ``Base`` using the given engine.

    Args:
        engine: SQLAlchemy engine handed to ``Base.metadata.create_all``.
    """
    metadata = Base.metadata
    metadata.create_all(engine)
    print("Table created successfully or already exists.")

def refresh_env_variables():
    """Reload variables from the .env file, replacing stale values.

    ``load_dotenv`` leaves already-set variables untouched by default, so a
    value edited in .env would not be picked up in a long-lived session such
    as a notebook kernel. ``override=True`` makes the file's values win.

    Variables that the .env file does not define are left as they are. The
    previous load / pop / load sequence deleted DB_USERNAME, DB_PASSWORD,
    DB_NAME, DB_HOST and DB_PORT from the process environment whenever the
    file did not define them, silently discarding values supplied by the
    shell.
    """
    load_dotenv(override=True)

    print("Environment variables refreshed.")
    
def setup_database_connection():
    """Build a PostgreSQL engine from DB_* environment variables and test it.

    Reads DB_USERNAME, DB_PASSWORD and DB_NAME (required) plus DB_HOST
    (default 'localhost') and DB_PORT (default '5432').

    Returns:
        The SQLAlchemy engine if a test connection succeeds, otherwise None.

    Raises:
        ValueError: if the username, password or database name is missing
            or empty.
    """
    from urllib.parse import quote

    # Retrieve PostgreSQL connection parameters from environment variables
    username = os.getenv('DB_USERNAME')
    password = os.getenv('DB_PASSWORD')
    database = os.getenv('DB_NAME')
    host = os.getenv('DB_HOST', 'localhost')  # Default to 'localhost' if not set
    port = os.getenv('DB_PORT', '5432')        # Default to '5432' if not set

    # Check if required environment variables are set
    if not username or not password or not database:
        raise ValueError("Database connection parameters are not set in the environment variables.")

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URL delimiters.
    safe_username = quote(username, safe='')
    safe_password = quote(password, safe='')
    connection_string = f'postgresql://{safe_username}:{safe_password}@{host}:{port}/{database}'

    # Show where we are connecting, but never echo the password into the
    # console or notebook output.
    print(f'postgresql://{username}:***@{host}:{port}/{database}')

    # Create a SQLAlchemy engine
    engine = create_engine(connection_string)

    # Check the database connection
    try:
        with engine.connect():
            print("Database connection successful!")
    except SQLAlchemyError as e:
        print(f"Database connection failed: {e}")
        engine.dispose()  # release any pooled resources of the unusable engine
        return None  # Return None if the connection fails

    return engine  # Return the engine if the connection is successful

def read_data_from_table(engine, table_name):
    """Read all rows of a table into a DataFrame, print it and return it.

    Args:
        engine: SQLAlchemy connectable to read from.
        table_name: Name of the table to load.

    Returns:
        The loaded DataFrame, or None if the read failed (the failure is
        printed rather than raised).
    """
    try:
        # Read data from the specified table into a pandas DataFrame
        df = pd.read_sql_table(table_name, con=engine)
    except (SQLAlchemyError, ValueError) as e:
        # pandas raises ValueError when the table does not exist; report it
        # the same way as a database error.
        print(f"Failed to read data from table '{table_name}': {e}")
        return None

    print(f"Data retrieved successfully from table '{table_name}':")
    print(df)
    return df
        
def insert_data_to_postgres_table(engine, table_name, if_exists):
    """Placeholder that is not implemented yet; does nothing.

    All three arguments are accepted and ignored.
    """
    return None
        
def read_csv_chunks_and_process(engine, table_name, append_or_replace, *,
                                csv_path='data/postings.csv', chunk_size=10000):
    """Stream a postings CSV into a database table, one chunk at a time.

    Each chunk has its four timestamp columns converted from epoch
    milliseconds to datetimes, ``sponsored`` cast to bool and
    ``remote_allowed`` cast to float, and is then written with
    ``DataFrame.to_sql``.

    Args:
        engine: Connectable accepted by ``DataFrame.to_sql``.
        table_name: Destination table.
        append_or_replace: ``if_exists`` mode ('append', 'replace' or 'fail')
            applied to the load as a whole. Note that 'replace' makes pandas
            drop and recreate the table.
        csv_path: CSV file to read.
        chunk_size: Number of rows per chunk.
    """
    timestamp_columns = ('original_listed_time', 'expiry', 'closed_time', 'listed_time')

    # The caller's mode applies to the first write only. Re-using 'replace'
    # for every chunk would drop the table each time and keep just the last
    # chunk; re-using 'fail' would abort on the second chunk.
    write_mode = append_or_replace

    with pd.read_csv(csv_path, chunksize=chunk_size) as reader:
        for chunk in reader:
            for column in timestamp_columns:
                chunk[column] = pd.to_datetime(chunk[column], unit='ms')

            # NOTE(review): astype(bool) turns missing values into True --
            # confirm the column never contains NaN.
            chunk['sponsored'] = chunk['sponsored'].astype(bool)
            chunk['remote_allowed'] = chunk['remote_allowed'].astype(float)

            chunk.to_sql(table_name, engine, if_exists=write_mode, index=False)
            write_mode = 'append'
        
        
        
def main():
    """Connect to the database, then create, load and read back `postings`.

    Does nothing further when setup_database_connection() returns no engine.
    """
    engine = setup_database_connection()
    if not engine:
        return

    create_table(engine)
    read_csv_chunks_and_process(engine, 'postings', 'append')
    read_data_from_table(engine, 'postings')

# Entry point: reload the .env-backed settings first so that main() sees
# current DB_* values when setup_database_connection() reads them via
# os.getenv.
if __name__ == "__main__":
    refresh_env_variables()
    main()


Environment variables refreshed.
postgresql://postgres:root@localhost:5432/data_engineering_job_postings
Database connection successful!
Table created successfully or already exists.
Data retrieved successfully from table 'postings':
            id      job_id                     company_name  \
0            1      921716            Corcoran Sawyer Smith   
1            2     1829192                             None   
2            3    10998357           The National Exemplar    
3            4    23221523           Abrams Fensterman, LLP   
4            5    35982263                             None   
...        ...         ...                              ...   
123844  123845  3906267117                     Lozano Smith   
123845  123846  3906267126                        Pinterest   
123846  123847  3906267131                     EPS Learning   
123847  123848  3906267195  Trelleborg Applied Technologies   
123848  123849  3906267224                        Solugenix   

         

In [None]:
import pandas as pd 
