In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import CountVectorizerModel
from pyspark.sql.types import ArrayType, FloatType, StringType, DoubleType
import heapq
import operator

In [8]:
spark = SparkSession.builder.appName("tfidf").config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.2.0").getOrCreate()
df =  spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://user:indeedjob@ec2-52-55-205-188.compute-1.amazonaws.com/appDatabase.jobs").load()
#(train_set, val_set, test_set) = df.randomSplit([0.98, 0.01, 0.01], seed = 2000)
data = df.select('_id','job_title','job_description_cleaned')
#hashtf = HashingTF(numFeatures=2**16, inputCol="job_description_cleaned", outputCol='tf')
countVectors = CountVectorizer(inputCol="job_description_cleaned", outputCol="tf", vocabSize=2**16, minDF=5)
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
#label_stringIdx = StringIndexer(inputCol = "job_title", outputCol = "label")
pipeline = Pipeline(stages=[countVectors,idf])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show()

+----------------+--------------------+-----------------------+--------------------+--------------------+
|             _id|           job_title|job_description_cleaned|                  tf|            features|
+----------------+--------------------+-----------------------+--------------------+--------------------+
|000004eaddcb4ae7|Research Account ...|   [current, employe...|(65536,[0,1,5,7,1...|(65536,[0,1,5,7,1...|
|000016cd690914cd|Recruitment Coord...|   [temporaryrespons...|(65536,[1,4,9,12,...|(65536,[1,4,9,12,...|
|0000c4848b2b952d|Electrical Contro...|   [yearsr, electric...|(65536,[0,1,2,3,4...|(65536,[0,1,2,3,4...|
|0000ef8321b7ed98|Product Developme...|   [like, join, fort...|(65536,[0,1,2,3,4...|(65536,[0,1,2,3,4...|
|00010db38efb0901| Systems Engineer II|   [overview, new, i...|(65536,[0,1,2,4,8...|(65536,[0,1,2,4,8...|
|00015de142c7152f|Marketing Data an...|   [role, looking, m...|(65536,[0,1,2,3,4...|(65536,[0,1,2,3,4...|
|00019ff96d26f26a| Procurement Analyst|   [yea

In [9]:
stages = pipelineFit.stages
vectorizers = [s for s in stages if isinstance(s, CountVectorizerModel)]
vectorizers[0].vocabulary

['experience',
 'work',
 'will',
 'data',
 'team',
 'skills',
 'business',
 'management',
 'development',
 'ability',
 'systems',
 'required',
 'support',
 'software',
 'years',
 'technical',
 'design',
 'knowledge',
 'requirements',
 'including',
 'engineering',
 'information',
 'new',
 'product',
 'services',
 'quality',
 'working',
 'solutions',
 'position',
 'job',
 'project',
 'must',
 'environment',
 'related',
 'degree',
 'technology',
 'system',
 'customer',
 'strong',
 'provide',
 'security',
 'company',
 'analysis',
 'process',
 'responsibilities',
 'service',
 'develop',
 'andor',
 'customers',
 'us',
 'preferred',
 'qualifications',
 'communication',
 'projects',
 'ensure',
 'status',
 'computer',
 'opportunity',
 'products',
 'may',
 'tools',
 'within',
 'teams',
 'performance',
 'program',
 'applications',
 'time',
 'test',
 'application',
 'responsible',
 'employment',
 'testing',
 'training',
 'maintain',
 'education',
 'duties',
 'understanding',
 'processes',
 'scienc

In [10]:
def example(arr,vocab):
    ind = sorted(range(len(arr)), key=lambda i: arr[i])[-100:]
    ans = []
    for i in ind:
        ans.append(vocab[i])
    return ans

def make_topic_word(vocab):
     return udf(lambda arr: example(arr, vocab))

In [11]:
#udfSum = udf(lambda x: example(x), ArrayType(FloatType())) 
vector_udf = udf(lambda vector: vector.toArray().tolist(),ArrayType(DoubleType()))
df = dataset.withColumn("position_array", vector_udf(dataset.features)) 
df.show(5)
df.printSchema()

+----------------+--------------------+-----------------------+--------------------+--------------------+--------------------+
|             _id|           job_title|job_description_cleaned|                  tf|            features|      position_array|
+----------------+--------------------+-----------------------+--------------------+--------------------+--------------------+
|000004eaddcb4ae7|Research Account ...|   [current, employe...|(65536,[0,1,5,7,1...|(65536,[0,1,5,7,1...|[0.28210629980809...|
|000016cd690914cd|Recruitment Coord...|   [temporaryrespons...|(65536,[1,4,9,12,...|(65536,[1,4,9,12,...|[0.0, 0.369906709...|
|0000c4848b2b952d|Electrical Contro...|   [yearsr, electric...|(65536,[0,1,2,3,4...|(65536,[0,1,2,3,4...|[0.14105314990404...|
|0000ef8321b7ed98|Product Developme...|   [like, join, fort...|(65536,[0,1,2,3,4...|(65536,[0,1,2,3,4...|[0.14105314990404...|
|00010db38efb0901| Systems Engineer II|   [overview, new, i...|(65536,[0,1,2,4,8...|(65536,[0,1,2,4,8...|[0.564

In [12]:
words_udf = udf(lambda arr: example(arr,vectorizers[0].vocabulary),ArrayType(StringType()))
df2 = df.withColumn("imp_words",make_topic_word(vectorizers[0].vocabulary)(df.position_array))
df2.show(5)

+----------------+--------------------+-----------------------+--------------------+--------------------+--------------------+--------------------+
|             _id|           job_title|job_description_cleaned|                  tf|            features|      position_array|           imp_words|
+----------------+--------------------+-----------------------+--------------------+--------------------+--------------------+--------------------+
|000004eaddcb4ae7|Research Account ...|   [current, employe...|(65536,[0,1,5,7,1...|(65536,[0,1,5,7,1...|[0.28210629980809...|[report, performi...|
|000016cd690914cd|Recruitment Coord...|   [temporaryrespons...|(65536,[1,4,9,12,...|(65536,[1,4,9,12,...|[0.0, 0.369906709...|[type, full, qual...|
|0000c4848b2b952d|Electrical Contro...|   [yearsr, electric...|(65536,[0,1,2,3,4...|(65536,[0,1,2,3,4...|[0.14105314990404...|[degree, requirem...|
|0000ef8321b7ed98|Product Developme...|   [like, join, fort...|(65536,[0,1,2,3,4...|(65536,[0,1,2,3,4...|[0.1410

In [13]:
df2.printSchema()

root
 |-- _id: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- job_description_cleaned: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- position_array: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- imp_words: string (nullable = true)



In [14]:
df2.select('_id','imp_words').write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("uri","mongodb://user:indeedjob@ec2-52-55-205-188.compute-1.amazonaws.com/appDatabase.jobs2").save()

In [None]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
#print("Training Dataset Count: " + str(trainingData.count()))
#print("Test Dataset Count: " + str(testData.count()))

In [None]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
#predictions.filter(predictions['prediction'] == 0) \
    #.select("job_description_cleaned","job_title","probability","label","prediction") \
    #.orderBy("probability", ascending=False) \
    #.show(n = 10, truncate = 30)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

In [None]:
k = [2,10,5,15,6]
sorted(range(len(k)), key=lambda i: k[i])[-3:]