# Frequent Item Sets



# Local run - OOM

In [1]:
import numpy as np
import pandas as pd
pd.set_option('max_columns', None)
import ast

In [2]:
from pyspark.sql import SparkSession

app_name = "EMSI-Tech-jobs"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

In [3]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
from pyspark.sql import types
import pyspark.sql.functions as F

In [6]:
SkillCounts = spark.read.parquet("data3/skill_counts.parquet")

In [7]:
SkillCounts = SkillCounts.withColumn("skill_id",F.monotonically_increasing_id())

In [None]:
SkillCounts = SkillCounts.withColumnRenamed('count','skill_count')

In [13]:
SkillCounts = SkillCounts.withColumnRenamed('skill','skill_label')

In [14]:
SkillCounts = SkillCounts['skill_id','skill_label','skill_count']

In [17]:
SkillCounts.write.format("parquet").save("data3/skills.parquet")

In [4]:
SkillCounts = spark.read.parquet("data3/skills.parquet")

In [5]:
SkillCounts.show()

+--------+--------------------+-----------+
|skill_id|         skill_label|skill_count|
+--------+--------------------+-----------+
|       0|      VMware VSphere|      25229|
|       1|Application Devel...|     225364|
|       2|         Geolocation|         57|
|       3| Continuous Delivery|      76923|
|       4|          Whitespace|        141|
|       5|Emotional Intelli...|       3273|
|       6|Capability Maturi...|       3293|
|       7|OpenEdge Advanced...|        532|
|       8|  Accounting Systems|       3410|
|       9|Program Design La...|       6832|
|      10|             Staging|      12349|
|      11| Space Manufacturing|          7|
|      12|             CANopen|        341|
|      13|    Looker Analytics|       1867|
|      14|Excel Pivot Table...|       7688|
|      15|Operational Planning|       1476|
|      16|          Dropwizard|       1827|
|      17|       Blade Servers|       3735|
|      18|            Mock Ups|       5511|
|      19|  Hyperion Smartview| 

In [None]:
SkillCounts.coalesce(1).write.csv('data3/skills.csv')

In [6]:
SkillsDict = SkillCounts.toPandas().set_index('skill_id').T.to_dict('list')

In [9]:
l = [17179869327, 60129543906, 8589936773, 60129543668]
for i in l:
    print(SkillsDict[i])

['C++ (Programming Language)', 329335]
['Software Development', 657362]
['Software Engineering', 685535]
['Java (Programming Language)', 871036]


In [29]:
#220538	[1003, 51539609299, 34359738519]
for i in [1003, 51539609299, 34359738519]:
    print(SkillsDict[i])

['HyperText Markup Language (HTML)', 339988]
['Cascading Style Sheets (CSS)', 424214]
['JavaScript (Programming Language)', 691748]


In [30]:
#142547	[17179869785, 17179870992, 25769804381]
for i in [17179869785, 17179870992, 25769804381]:
    print(SkillsDict[i])

['.NET Framework', 380282]
['C# (Programming Language)', 430314]
['SQL (Programming Language)', 783989]


In [31]:
#137793	[51539609299, 34359738519, 25769804381]
for i in [51539609299, 34359738519, 25769804381]:
    print(SkillsDict[i])

['Cascading Style Sheets (CSS)', 424214]
['JavaScript (Programming Language)', 691748]
['SQL (Programming Language)', 783989]


In [32]:
#136291	[51539609299, 34359738519, 60129543668]
for i in [51539609299, 34359738519, 60129543668]:
    print(SkillsDict[i])

['Cascading Style Sheets (CSS)', 424214]
['JavaScript (Programming Language)', 691748]
['Java (Programming Language)', 871036]


In [33]:
#135419	[1317, 51539609299, 34359738519]
for i in [1317, 51539609299, 34359738519]:
    print(SkillsDict[i])

['Angular (Web Framework)', 305540]
['Cascading Style Sheets (CSS)', 424214]
['JavaScript (Programming Language)', 691748]


In [34]:
#69010	[60129543906, 8589934749, 51539608223]
for i in [60129543906, 8589934749, 51539608223]:
    print(SkillsDict[i])

['Software Development', 657362]
['Agile Software Development', 758639]
['Management', 951370]


# Read baskets

In [10]:
baskets = pd.read_csv("data3/results/freq-skill-baskets-s0.01-c0.5.csv")

In [12]:
baskets.head()

Unnamed: 0,Frquency,Items
0,114006,[1954]
1,64698,"[1954, 51539608223]"
2,329335,[17179869327]
3,46260,"[17179869327, 17179869785]"
4,64470,"[17179869327, 51539608496]"


In [40]:
def getLabels(alist):
    labels = []
    for l in ast.literal_eval(alist):
        labels.append(SkillsDict[l][0])
    return labels

In [41]:
baskets['Items'].apply(getLabels)

0                                     [Change Management]
1                         [Change Management, Management]
2                            [C++ (Programming Language)]
3            [C++ (Programming Language), .NET Framework]
4           [C++ (Programming Language), Problem Solving]
                              ...                        
3141    [Verbal Communication Skills, Software Develop...
3142    [Verbal Communication Skills, SQL (Programming...
3143            [Verbal Communication Skills, Leadership]
3144            [Verbal Communication Skills, Management]
3145                                  [Security Controls]
Name: Items, Length: 3146, dtype: object

In [42]:
baskets['labels'] = baskets['Items'].apply(getLabels)

In [44]:
baskets.head()

Unnamed: 0,Frquency,Items,labels
0,114006,[1954],[Change Management]
1,64698,"[1954, 51539608223]","[Change Management, Management]"
2,329335,[17179869327],[C++ (Programming Language)]
3,46260,"[17179869327, 17179869785]","[C++ (Programming Language), .NET Framework]"
4,64470,"[17179869327, 51539608496]","[C++ (Programming Language), Problem Solving]"


In [45]:
baskets.to_csv("data3/baskets_decoded.csv")

In [53]:
baskets4 = baskets[baskets['labels'].map(len) > 3]

In [54]:
baskets4.to_csv("data3/baskets_gt4_decoded.csv")

# Prepare the data
Encode the skills in the skillsLists.parquet dataset as integers using the SkillsDict. We can think of each row in this dataset as a "basket" in terms of market basket analysis

In [5]:
SkillBasketsDF = spark.read.parquet("data3/skillsLists.parquet")

In [9]:
SkillBasketsDF.show()

+--------------------+
|         skills_name|
+--------------------+
|[Information Tech...|
|[Research, Manage...|
|[Management, Comm...|
|[Device Managemen...|
|[Bootstrap (Front...|
|[Strategic Planni...|
|[Performance Anal...|
|[Software Version...|
|[OSI Models, Cisc...|
|[Inventory Manage...|
|[User Interface, ...|
|[Signals Intellig...|
|[Strategic Planni...|
|[Software Design,...|
|[Agile Software D...|
|[Strategic Planni...|
|[Intrusion Preven...|
|[Research, Cyber ...|
|[Firmware, Develo...|
|     [Test Planning]|
+--------------------+
only showing top 20 rows



In [62]:
sample=sc.parallelize(SkillBasketsDF.rdd.take(2))

In [72]:
sample.collect()

[Row(skills_name=['Information Technology Consulting', 'Scrum (Software Development)', 'Software Systems', 'Capability Maturity Model Integration', 'Management', 'Emotional Intelligence', 'Budgeting', 'Leadership Development', 'Service Desk', 'Systems Analysis', 'Agile Software Development', '.NET Framework', 'Infrastructure', 'Auditing', 'Coordinating', 'Resource Management', 'Interpersonal Skills', 'Oral Communication', 'Performance Metric', 'Development Testing', 'Certified Scrum Master', 'User Requirements Documents', 'Systems Development Life Cycle', 'Model View Controller', 'Leadership', 'Program Management', 'Project Management Professional Certification', 'Strategic Thinking', 'Business Requirements', 'Stakeholder Management', 'Project Management', 'Operations', 'Web Services', 'Written Communication', 'Software Product Management', 'Software Engineering', 'Simple Object Access Protocol (SOAP)', 'SQL (Programming Language)', 'Information Technology Infrastructure Library', 'Mic

In [6]:
SkillCountsDict = SkillCounts.toPandas().set_index('skill_label').T.to_dict('list')

In [11]:
# {skill: [skill_id, skill_count]}
SkillCountsDict

{'VMware VSphere': [0, 25229],
 'Application Development': [1, 225364],
 'Geolocation': [2, 57],
 'Continuous Delivery': [3, 76923],
 'Whitespace': [4, 141],
 'Emotional Intelligence': [5, 3273],
 'Capability Maturity Model': [6, 3293],
 'OpenEdge Advanced Business Language': [7, 532],
 'Accounting Systems': [8, 3410],
 'Program Design Languages': [9, 6832],
 'Staging': [10, 12349],
 'Space Manufacturing': [11, 7],
 'CANopen': [12, 341],
 'Looker Analytics': [13, 1867],
 'Excel Pivot Tables And Charts': [14, 7688],
 'Operational Planning': [15, 1476],
 'Dropwizard': [16, 1827],
 'Blade Servers': [17, 3735],
 'Mock Ups': [18, 5511],
 'Hyperion Smartview': [19, 86],
 'Java 8': [20, 12957],
 'Warehouse Management': [21, 3049],
 'IP Routing': [22, 7143],
 'Brokerage': [23, 4891],
 'Internal Communications': [24, 3985],
 'Account Planning': [25, 1306],
 'Microsoft 365': [26, 987],
 'Runtime Systems': [27, 1026],
 'Strategic Fit': [28, 256],
 'Sqoop': [29, 9310],
 'Licensed Professional Coun

In [7]:
def encodeSkills(row):
    skill_list = row[0]
    encoded_list=[]
    for skill in skill_list:
         encoded_list.append(SkillCountsDict[skill][0])
            
    return (0,(encoded_list))

In [8]:
encodedSkillsBaskets=SkillBasketsDF.rdd.map(encodeSkills).cache()

In [9]:
basketsDF = encodedSkillsBaskets.toDF(['n','skills']).drop('n').withColumn("id",F.monotonically_increasing_id())

In [10]:
basketsDF=basketsDF['id','skills']

In [11]:
basketsDF.write.format("parquet").save("data3/encoded_skills_baskets.parquet")

In [16]:
sample=basketsDF.sample(fraction=0.1, seed=3)

In [17]:
sample.show()

+---+--------------------+
| id|              skills|
+---+--------------------+
|  2|[51539608223, 858...|
|  8|[60129542533, 171...|
| 15|[25769803778, 179...|
| 16|[51539608778, 343...|
| 36|[51539609298, 257...|
| 44|[42949674080, 171...|
| 59|[1792, 1948, 8589...|
| 61|[34359740073, 515...|
| 65|[51539609298, 343...|
| 84|[60129543915, 601...|
| 95|[17179869327, 171...|
| 97|[51539608223, 515...|
|105|[60129542674, 363...|
|107|[17179869936, 343...|
|111|[25769804962, 515...|
|115|[51539609042, 429...|
|120|       [34359738799]|
|122|[1318, 6012954227...|
|140|[25769804961, 682...|
|151|[17179869785, 858...|
+---+--------------------+
only showing top 20 rows



# FPGrowth
- https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth
- http://dx.doi.org/10.1145/1454008.1454027
- http://dx.doi.org/10.1145/335191.335372


In [18]:
from pyspark.ml.fpm import FPGrowth

df = spark.createDataFrame([
    (0, [1, 2, 5]),
    (1, [1, 2, 3, 5]),
    (2, [1, 2])
], ["id", "skills"])

df = basketsDF

fpGrowth = FPGrowth(itemsCol="skills", minSupport=0.05, minConfidence=0.1)
model = fpGrowth.fit(df)

# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(df).show()

Py4JJavaError: An error occurred while calling o145.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 12.0 failed 1 times, most recent failure: Lost task 4.0 in stage 12.0 (TID 58, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.util.IdentityHashMap.resize(IdentityHashMap.java:472)
	at java.util.IdentityHashMap.put(IdentityHashMap.java:441)
	at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:174)
	at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:225)
	at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:224)
	at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
	at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
	at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
	at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.IdentityHashMap.resize(IdentityHashMap.java:472)
	at java.util.IdentityHashMap.put(IdentityHashMap.java:441)
	at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:174)
	at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:225)
	at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:224)
	at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
	at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
	at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
	at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [35]:
# model.freqItemsets.count()

In [None]:
s3://emsiskills/skills-baskets
ssh -i ~/set.pem -ND 8157 hadoop@ec2-52-213-35-134.eu-west-1.compute.amazonaws.com