In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession


config = {
    "spark.kubernetes.namespace": "spark",
    "spark.kubernetes.container.image": "itayb/spark:3.1.1-hadoop-3.2.0-aws",
    "spark.executor.instances": "2",
    "spark.executor.memory": "4g",
    "spark.executor.cores": "1",
    "spark.driver.blockManager.port": "7777",
    "spark.driver.port": "2222",
    "spark.driver.host": "jupyter.spark.svc.cluster.local",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.hadoop.fs.s3a.endpoint": "localstack.kube-system.svc.cluster.local:4566",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.com.amazonaws.services.s3.enableV4": "true",
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
}

def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("k8s://https://kubernetes.default.svc.cluster.local")
    for key, value in config.items():
        conf.set(key, value)    
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()

In [2]:
spark = get_spark_session("spark-workers", swan_spark_conf)

In [3]:
course_df = spark.read.csv('s3a://bds-assignment/data/coursera_data.csv',header=True)
course_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- course_title: string (nullable = true)
 |-- course_organization: string (nullable = true)
 |-- course_Certificate_type: string (nullable = true)
 |-- course_rating: string (nullable = true)
 |-- course_difficulty: string (nullable = true)
 |-- course_students_enrolled: string (nullable = true)



In [4]:
jobs_df = spark.read.csv('s3a://bds-assignment/data/data_job_post.csv',header=True)
jobs_df.printSchema()

root
 |-- jobpost: string (nullable = true)
 |-- date: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- AnnouncementCode: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Eligibility: string (nullable = true)
 |-- Audience: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- JobDescription: string (nullable = true)
 |-- JobRequirment: string (nullable = true)
 |-- RequiredQual: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- ApplicationP: string (nullable = true)
 |-- OpeningDate: string (nullable = true)
 |-- Deadline: string (nullable = true)
 |-- Notes: string (nullable = true)
 |-- AboutC: string (nullable = true)
 |-- Attach: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- IT: string (nullable = true)



In [186]:
course_df_subset = course_df.select("course_title")
jobs_df_subset = jobs_df.select("jobpost")

In [187]:
jobs_df_subset = jobs_df_subset.na.drop()
course_df_subset = course_df_subset.na.drop()

In [188]:
course_df_subset = course_df_subset.toDF(*["text"])
jobs_df_subset = jobs_df_subset.toDF(*["text"])

In [189]:
from pyspark.sql import functions as F
course_df_subset = course_df_subset.select('text').withColumn("doc_id", F.expr("uuid()"))
jobs_df_subset = jobs_df_subset.select('text').withColumn("doc_id", F.expr("uuid()"))

In [190]:
jobs_df_subset.show(2)

+--------------------+--------------------+
|                text|              doc_id|
+--------------------+--------------------+
|AMERIA Investment...|1cbf5725-d6de-414...|
|JOB TITLE:  Chief...|f1f47a8f-4d51-4bd...|
+--------------------+--------------------+
only showing top 2 rows



In [191]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, HashingTF

In [192]:
columns = ['text'] 


preProcStages = []

for col in columns:
    regexTokenizer = RegexTokenizer(gaps=False, pattern='\w+', inputCol=col, outputCol=col+'Token')
    stopWordsRemover = StopWordsRemover(inputCol=col+'Token', outputCol=col+'SWRemoved')
    countVectorizer = CountVectorizer(inputCol=col+'SWRemoved', outputCol=col+'TF')
    idf = IDF(inputCol=col+'TF', outputCol=col+'IDF') 
    preProcStages += [regexTokenizer, stopWordsRemover, countVectorizer, idf]

In [193]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=preProcStages)

In [194]:
## combine two dataframes
dataCombined = course_df_subset.union(jobs_df_subset)

In [195]:
dataCombined = dataCombined.repartition(1)

In [196]:
dataCombined.cache()

DataFrame[text: string, doc_id: string]

In [197]:
model = pipeline.fit(dataCombined)

In [198]:

dataCombined = model.transform(dataCombined)

In [199]:
dataCombined.show(2)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|              doc_id|           textToken|       textSWRemoved|              textTF|             textIDF|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Financing and Inv...|c93c686b-8fb1-492...|[financing, and, ...|[financing, inves...|(787,[589,599,692...|(787,[589,599,692...|
|Epidemiology: The...|e00b050c-1dbf-43e...|[epidemiology, th...|[epidemiology, ba...|(787,[40,43,113,7...|(787,[40,43,113,7...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [200]:
lookupTable = spark.sparkContext.broadcast(dataCombined.rdd.map(lambda x: (x['doc_id'], 
                                                           {'text':x['text'], 
                                                            'textIDF':x['textIDF']})).collectAsMap())

In [122]:
# COSINE similarity

In [231]:
import math

def cosine_similarity(X, Y):
    denom = X.norm(2) * Y.norm(2)
    if denom == 0.0:
        return float(-1.0)
    else:
        return float(X.dot(Y) / float(denom))

In [232]:
def similarities(id_course, id_job, lookupTable):
    X, Y = lookupTable.value[id_course], lookupTable.value[id_job]
  
    sim = cosine_similarity(X['textIDF'], Y['textIDF'])
    return sim

In [233]:
from pyspark.sql import Row

pairId = jobs_df_subset.select('doc_id').rdd.flatMap(list).cartesian(jobs_df_subset.select('doc_id').rdd.flatMap(list))


In [234]:
pairId.take(2)

[('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce'),
 ('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  'f1f47a8f-4d51-4bd2-b4cf-5278e30f3ad7')]

In [237]:
pairProdDF = pairId.map(lambda x : (x[0],x[1],  similarities(x[0], x[1], lookupTable)))

In [238]:
pairProdDF.take(5)

[('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  1.0),
 ('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  'f1f47a8f-4d51-4bd2-b4cf-5278e30f3ad7',
  0.0),
 ('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  '8c34a43e-306a-4c18-848c-4bce89fbf94f',
  0.0),
 ('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  'b649b6f8-9b46-4162-a55a-0e70d74267e5',
  0.7371461382089741),
 ('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  'e7e14178-ba4c-4600-b77a-860aa7f4d648',
  0.11476631991613744)]

In [239]:
jobs_df_subset.limit(1).select("doc_id").collect()

[Row(doc_id='1cbf5725-d6de-414d-9c9d-438c10bdd6ce')]

In [240]:
resultDF = pairProdDF.filter(lambda x : x[1] == "1cbf5725-d6de-414d-9c9d-438c10bdd6ce")

In [242]:
resultDF.take(10)

[('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  1.0),
 ('f1f47a8f-4d51-4bd2-b4cf-5278e30f3ad7',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.0),
 ('8c34a43e-306a-4c18-848c-4bce89fbf94f',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.0),
 ('b649b6f8-9b46-4162-a55a-0e70d74267e5',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.7371461382089741),
 ('e7e14178-ba4c-4600-b77a-860aa7f4d648',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.11476631991613744),
 ('42936415-eeb5-474e-aa68-35c061e03b61',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.0),
 ('4d5cb423-bb20-4bf4-a5fc-0b9aa5948c3c',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.0),
 ('e0d1c970-1bef-4d96-a1bd-a40dc98a40cf',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.0),
 ('60e60a75-e7a6-40a4-aafd-4cbe9842ce36',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.4047057683806313),
 ('3c8b32c1-9b08-416a-be62-d098db67d7cb',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.0)]

In [245]:
resultDF.cache()

PythonRDD[550] at RDD at PythonRDD.scala:53

In [246]:
resultDF.sortBy(lambda x : x[2],ascending = False).take(4)

[('1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  1.0),
 ('b649b6f8-9b46-4162-a55a-0e70d74267e5',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.7371461382089741),
 ('60e60a75-e7a6-40a4-aafd-4cbe9842ce36',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.4047057683806313),
 ('e1a83059-ad89-4f05-a260-1cff029a6be7',
  '1cbf5725-d6de-414d-9c9d-438c10bdd6ce',
  0.4047057683806313)]