In [127]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer, StringIndexer, StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import time
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import RegressionEvaluator


In [105]:
spark = SparkSession.builder.appName("Assignment").getOrCreate()
spark

In [106]:
df = spark.read.csv('/content/linkedin_job_postings.csv', header = True)

In [107]:
df.show(5)

+--------------------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+--------+
|            job_link| last_processed_time|got_summary|got_ner|is_being_worked|           job_title|             company|        job_location|first_seen|search_city|search_country|     search_position| job_level|job_type|
+--------------------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+--------+
|https://www.linke...|2024-01-21 07:12:...|          t|      t|              f|Account Executive...|                  BD|       San Diego, CA|2024-01-15|   Coronado| United States|         Color Maker|Mid senior|  Onsite|
|https://www.linke...|2024-01-21 07:39:...|          t|      t|              f|Registered Nurse ...|   Trinity H

In [108]:
df.printSchema()

root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: string (nullable = true)
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)



In [109]:
columns_to_remove = ['job_link', 'last_processed_time', 'job_title', 'company', 'job_location', 'first_seen', 'search_city', 'search_position']

# Remove the specified columns
df = df.drop(*columns_to_remove)

# Display the cleaned DataFrame
print(df)


DataFrame[got_summary: string, got_ner: string, is_being_worked: string, search_country: string, job_level: string, job_type: string]


In [110]:
df.show(5)

+-----------+-------+---------------+--------------+----------+--------+
|got_summary|got_ner|is_being_worked|search_country| job_level|job_type|
+-----------+-------+---------------+--------------+----------+--------+
|          t|      t|              f| United States|Mid senior|  Onsite|
|          t|      t|              f| United States|Mid senior|  Onsite|
|          t|      t|              f| United States|Mid senior|  Onsite|
|          t|      t|              f| United States|Mid senior|  Onsite|
|          f|      f|              f| United States|Mid senior|  Onsite|
+-----------+-------+---------------+--------------+----------+--------+
only showing top 5 rows



In [111]:
for column in df.columns:
    unique_values = df.select(column).distinct().collect()
    unique_values_list = [row[column] for row in unique_values]
    print(f"Unique values for column '{column}': {unique_values_list}")

Unique values for column 'got_summary': ['Oldbury, England, United Kingdom', 'Portsmouth, England, United Kingdom', 'f', 'London, England, United Kingdom', 'Newcastle upon Tyne, England, United Kingdom', 'Sunningdale, England, United Kingdom', 't', 'Thames Ditton, England, United Kingdom', 'Reading, England, United Kingdom', 'Milton Keynes, England, United Kingdom', 'Lincoln, England, United Kingdom', 'Borehamwood, England, United Kingdom', 'Coventry, England, United Kingdom', 'Madison, WI', 'Gateshead, England, United Kingdom', 'Bicester, England, United Kingdom', 'Tunbridge Wells, England, United Kingdom', 'Bromley, England, United Kingdom', 'Torbay, Newfoundland and Labrador, Canada', 'Heathrow, FL', 'Huddersfield, England, United Kingdom', 'Lebanon, IL']
Unique values for column 'got_ner': ['2024-01-13', 'f', '2024-01-12', '2024-01-15', 't', '2024-01-14', '2024-01-16']
Unique values for column 'is_being_worked': ['Worcester', 'Slough', 'f', 'High Wycombe', 'South Hampshire', 't', '

In [112]:
indexed_df = indexed_df.na.drop()

In [119]:
indexed_df = df
for column in df.columns:
    indexer = StringIndexer(inputCol=column, outputCol=column+"_indexed",handleInvalid="skip")
    indexed_df = indexer.fit(indexed_df).transform(indexed_df)


In [120]:
indexed_df.printSchema()
indexed_df.show(5)


root
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- got_summary_indexed: double (nullable = false)
 |-- got_ner_indexed: double (nullable = false)
 |-- is_being_worked_indexed: double (nullable = false)
 |-- search_country_indexed: double (nullable = false)
 |-- job_level_indexed: double (nullable = false)
 |-- job_type_indexed: double (nullable = false)

+-----------+-------+---------------+--------------+----------+--------+-------------------+---------------+-----------------------+----------------------+-----------------+----------------+
|got_summary|got_ner|is_being_worked|search_country| job_level|job_type|got_summary_indexed|got_ner_indexed|is_being_worked_indexed|search_country_indexed|job_level_indexed|job_type_indexed|
+-----------+-------+---------------+---------

In [121]:
dependent_variable = indexed_df['job_type_indexed']

In [122]:
independent_variables = indexed_df[['got_summary_indexed', 'got_ner_indexed', 'is_being_worked_indexed', 'search_country_indexed', 'job_level_indexed']]


In [123]:
assembler = VectorAssembler(inputCols=independent_variables.columns, outputCol="features")
df_assembled = assembler.transform(indexed_df)

In [124]:
df_assembled.printSchema()

root
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)
 |-- got_summary_indexed: double (nullable = false)
 |-- got_ner_indexed: double (nullable = false)
 |-- is_being_worked_indexed: double (nullable = false)
 |-- search_country_indexed: double (nullable = false)
 |-- job_level_indexed: double (nullable = false)
 |-- job_type_indexed: double (nullable = false)
 |-- features: vector (nullable = true)



In [128]:
train_data, test_data = df_assembled.randomSplit([0.8, 0.2], seed=123)

In [129]:
lr = LinearRegression(featuresCol='features', labelCol='job_type_indexed')
lr_model = lr.fit(df_assembled)

In [130]:
predictions = lr_model.transform(test_data)

In [131]:
evaluator = RegressionEvaluator(labelCol="job_type_indexed", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)

In [132]:
print("Mean Squared Error (MSE) on test data = %g" % mse)

Mean Squared Error (MSE) on test data = 0.0177591


In [126]:
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.linalg import Vectors
glr = GeneralizedLinearRegression(family="poisson", link="log", featuresCol="features", labelCol="job_type_indexed")
model = glr.fit(df_assembled)

In [137]:
pred = model.transform(test_data)

In [139]:
evaluator = RegressionEvaluator(labelCol="job_type_indexed", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(pred)

In [140]:
print("Mean Squared Error (MSE) on test data = %g" % mse)

Mean Squared Error (MSE) on test data = 0.0201486
