# ESCO Occupation Classifier

Explore our dataset with SQL

In [0]:
%sql
-- First look at the raw table: return every column of default.esco_en_dataset_csv.
select * from default.esco_en_dataset_csv


title,idesco_level_4,esco_level_4
B93-C04 Softwareentwickler C++ und C#/.NET (m/w),2512,Software developers
Gezocht: Oracle Developer #Freelance #PandS #Jobs #Vacatures (Req:9096–Loc:Bxl),2512,Software developers
Senior (GXP Process Excellence) Engineer,2512,Software developers
Software-Entwickler (m/w/d) Buildsystem / Integration,2512,Software developers
Business Intelligence Developer,2512,Software developers
Microsoft Dynamics NAV Functional Consultant,2512,Software developers
Senior Ecotoxicological Expert,2512,Software developers
Senior Physical Layer Software Engineer - 729984,2512,Software developers
Junior of Senior Inkoper,2512,Software developers
Software Engineer .Net (SE01),2512,Software developers


Now check the distribution of ESCO OCCUPATION in the dataset

In [0]:
%sql
-- Count the rows per esco_level_4 label to see how the classes are distributed.
select esco_level_4, count(*) from default.esco_en_dataset_csv group by esco_level_4

esco_level_4,count(1)
Advertising and marketing professionals,5000
Software developers,5000
"Mathematicians, actuaries and statisticians",5000
Industrial and production engineers,5000


Read the data from the database (lowercase the title, replace digits with spaces, alias the target columns)

In [0]:
# Working DataFrame: the lower-cased title with every digit replaced by a space
# (title_cleaned), the untouched title, and the ESCO id / label under the
# aliases target / target_label.
title_query = "select regexp_replace(lower(title), '[0-9]', ' ') as title_cleaned,title,idesco_level_4 as target, esco_level_4 as target_label from default.esco_en_dataset_csv"
dataset = spark.sql(title_query)
display(dataset)

title_cleaned,title,target,target_label
b -c softwareentwickler c++ und c#/.net (m/w),B93-C04 Softwareentwickler C++ und C#/.NET (m/w),2512,Software developers
gezocht: oracle developer #freelance #pands #jobs #vacatures (req: –loc:bxl),Gezocht: Oracle Developer #Freelance #PandS #Jobs #Vacatures (Req:9096–Loc:Bxl),2512,Software developers
senior (gxp process excellence) engineer,Senior (GXP Process Excellence) Engineer,2512,Software developers
software-entwickler (m/w/d) buildsystem / integration,Software-Entwickler (m/w/d) Buildsystem / Integration,2512,Software developers
business intelligence developer,Business Intelligence Developer,2512,Software developers
microsoft dynamics nav functional consultant,Microsoft Dynamics NAV Functional Consultant,2512,Software developers
senior ecotoxicological expert,Senior Ecotoxicological Expert,2512,Software developers
senior physical layer software engineer -,Senior Physical Layer Software Engineer - 729984,2512,Software developers
junior of senior inkoper,Junior of Senior Inkoper,2512,Software developers
software engineer .net (se ),Software Engineer .Net (SE01),2512,Software developers


### Pre-processing (clean text and reduce features)

In [0]:
from pyspark.ml.feature import RegexTokenizer
from pyspark.sql.functions import col, size, udf
from pyspark.sql.types import IntegerType

# Split the cleaned title into tokens; the pattern matches the gaps between
# tokens, so every non-word character acts as a separator.
regexTokenizer = RegexTokenizer(inputCol="title_cleaned", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

# Number of tokens per title. Spark's built-in `size` replaces the former
# Python UDF (`len(words)`): same integer result, but evaluated inside the JVM
# without shipping every row to a Python worker. The `countTokens` name is kept
# so existing calls of the form countTokens(col(...)) keep working.
countTokens = size

tokenized = regexTokenizer.transform(dataset)
tokenized.select("title_cleaned", "words").withColumn("tokens", countTokens(col("words"))).show(truncate=False)


In [0]:
from pyspark.sql.functions import explode, desc
# Frequency of every raw token across all titles, most frequent first.
tokens = (
  tokenized
  .select(explode(col("words")).alias("word"))
  .groupBy(col("word"))
  .count()
  .orderBy(desc("count"))
)
tokens.show()

We have to filter out short tokens, stopwords, ...

Start by removing short tokens (len < 3) with a user-defined function

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
def filter_by_len(words, min_len=3):
  """Keep only the tokens that are at least ``min_len`` characters long.

  Args:
    words: list of token strings, or None (e.g. a null array cell).
    min_len: minimum number of characters a token needs in order to be
      kept. Defaults to 3, the threshold this notebook has always used.

  Returns:
    A new list with the surviving tokens in their original order. An empty
    list is returned when ``words`` is None or empty, so a null input no
    longer raises ``TypeError``.
  """
  if words is None:
    return []
  return [word for word in words if len(word) >= min_len]

# Expose the Python filter to Spark as a UDF returning array<string>.
filter_by_len_udf = udf(filter_by_len, ArrayType(StringType()))

# Keep "words" untouched and put the length-filtered tokens in a new
# "filtered" column.
words_column = col("words")
filtered = tokenized.withColumn("filtered", filter_by_len_udf(words_column))
filtered.show()

In [0]:
# Print the column names and types of `filtered`, which now carries the
# "filtered" array column added by the previous cell.
filtered.printSchema()

In [0]:
# Token frequencies again, this time after the short tokens were dropped.
tokens = (
  filtered
  .select(explode(col("filtered")).alias("word"))
  .groupBy(col("word"))
  .count()
  .orderBy(desc("count"))
)
tokens.show()

Now we remove the stopwords...

In [0]:
from pyspark.ml.feature import StopWordsRemover


# Drop stop words from the length-filtered tokens; the result goes to "cleaned".
# NOTE(review): no stopWords list is passed, so Spark's default list applies
# (presumably English only). The sample rows shown earlier contain German
# and Dutch titles, so confirm whether those languages need their own lists.
remover = StopWordsRemover(inputCol="filtered", outputCol="cleaned")
cleaned = remover.transform(filtered)
cleaned.show()

In [0]:
# Token frequencies once more, now with the stop words removed as well.
tokens = (
  cleaned
  .select(explode(col("cleaned")).alias("word"))
  .groupBy(col("word"))
  .count()
  .orderBy(desc("count"))
)
tokens.show()

N-grams creation....

In [0]:
from pyspark.ml.feature import NGram
# One NGram transformer per n-gram length (2, 3 and 4); each of them reads the
# cleaned token array and writes its own output column.
ngrams2 = NGram(n=2, inputCol="cleaned", outputCol="ngrams_2")
ngrams3 = NGram(n=3, inputCol="cleaned", outputCol="ngrams_3")
ngrams4 = NGram(n=4, inputCol="cleaned", outputCol="ngrams_4")

# Apply the transformers one after another so the final frame carries the
# ngrams_2, ngrams_3 and ngrams_4 columns side by side.
ngrams = cleaned
for ngram_stage in (ngrams2, ngrams3, ngrams4):
  ngrams = ngram_stage.transform(ngrams)

ngrams.show()

In [0]:

# union of the results

def union_ngrams(c1,c2,c3,c4):
  """Concatenate four token lists into a single list.

  Args:
    c1, c2, c3, c4: lists of strings (in this notebook: the cleaned tokens
      and the 2-, 3- and 4-gram arrays). Any of them may be None.

  Returns:
    A new list holding the items of c1, c2, c3 and c4 in that order. A None
    argument contributes nothing instead of raising ``TypeError`` as the
    plain ``+`` concatenation did. The inputs are not modified.
  """
  merged = []
  for part in (c1, c2, c3, c4):
    if part is not None:
      merged.extend(part)
  return merged

# Expose the list concatenation to Spark as a UDF returning array<string>.
union_ngrams_udf = udf(union_ngrams, ArrayType(StringType()))

# Skip rows without cleaned tokens, then merge the unigrams and the 2/3/4-grams
# into a single "ngrams" column.
ngram_columns = [col(name) for name in ("cleaned", "ngrams_2", "ngrams_3", "ngrams_4")]
ngrams_final = (
  ngrams
  .filter("cleaned is not Null")
  .withColumn("ngrams", union_ngrams_udf(*ngram_columns))
)
ngrams_final.show()



Train a Word2Vec model...

In [0]:
from pyspark.ml.feature import Word2Vec

# Learn 300-dimensional embeddings over the merged n-gram tokens; minCount=10
# asks Word2Vec to ignore tokens seen fewer than 10 times.
# The explicit seed pins the random initialisation so that re-running the
# notebook does not depend on a session-specific default seed. It is the same
# value the train/test split uses.
word2Vec = Word2Vec(vectorSize=300, minCount=10, seed=12345, inputCol="ngrams", outputCol="features")
model = word2Vec.fit(ngrams_final)

Try our model

In [0]:
# Sanity-check the embeddings: list the 10 nearest neighbours of a single word
# and of a two-word n-gram token.
for probe in ("java", "senior developer"):
  model.findSynonyms(probe, 10).show(truncate=False)

Now apply our model to the data...

In [0]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# Turn every title's n-gram list into a vector with the fitted Word2Vec model.
ngrams_featured = model.transform(ngrams_final)
# Rescale each feature with MinMaxScaler into the scaler's [min, max] range.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# NOTE(review): the scaler is fitted on all rows here, before any train/test
# split is made further down, so test rows influence the scaling statistics.
# Confirm whether that is acceptable for the reported metric.
scalerModel = scaler.fit(ngrams_featured)
scaledData = scalerModel.transform(ngrams_featured)
# Prints the scaler's configured min/max parameters, not statistics of the data.
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()


Convert target label to index

In [0]:
from pyspark.ml.feature import IndexToString, StringIndexer

# Map the ESCO id in "target" to a numeric class index stored in "label".
indexer = StringIndexer(inputCol="target", outputCol="label")
indexer_model = indexer.fit(scaledData)
indexed = indexer_model.transform(scaledData)

# Reverse mapping from a predicted index back to the original target value,
# using the labels learned by the indexer. It is only defined here; none of
# the cells shown in this notebook applies it.
converter = IndexToString(inputCol="prediction", outputCol="prediction_category", labels=indexer_model.labels)


Prepare train and test

In [0]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Hold out 20% of the rows for the final evaluation (seeded, so repeatable).
train, test = indexed.randomSplit([0.8, 0.2], seed=12345)

# Multinomial Naive Bayes needs non-negative features, hence the min-max
# scaled column is used rather than the raw Word2Vec vectors.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol="label", featuresCol="scaledFeatures")
# Candidate smoothing values tried during model selection.
paramGrid = ParamGridBuilder()\
    .addGrid(nb.smoothing, [0.1, 0.5, 1.0]) \
    .build()

# Model selection on an inner 80/20 train/validation split of `train`.
# The evaluator is left at its defaults. The seed makes the inner split, and
# therefore the chosen smoothing value, repeatable across runs; it reuses the
# value of the outer split above.
tvs = TrainValidationSplit(estimator=nb,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           trainRatio=0.8,
                           seed=12345)

In [0]:
# Fit the train/validation search and score the held-out rows.
# NOTE(review): this rebinds `model`, which until now referred to the fitted
# Word2Vec model. Re-running an earlier cell that calls model.findSynonyms or
# uses model.transform for featurisation after this one will pick up the
# wrong object; consider a distinct name.
model = tvs.fit(train)
scored_test = model.transform(test)
predictions = scored_test.select("title", "label", "prediction")
predictions.show(truncate=False)

Compute the weighted precision on the test set

In [0]:
# Score the held-out predictions with the weightedPrecision metric.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
precision = evaluator.evaluate(predictions)
print("Test set weighted precision = " + str(precision))