In [0]:
# Setup:

## Import PySpark, obtain a Spark session, and alias the SQL functions module.

# SparkSession is the entry point used below to read files into data frames.
# getOrCreate() reuses the active session when one exists and builds a new one otherwise.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Import the pyspark.sql.functions module under the alias "SQL"; later cells call it as SQL.<function>.
from pyspark.sql import functions as SQL

In [0]:
# Data Cleaning and Preparation:

## Load the two uploaded files into Spark data frames.

# Job/skill data (.csv.gz): read without a header row, so the first line of the file is treated as data.
df = spark.read.csv(
    "/FileStore/tables/skill2vec_50K.csv.gz",
    header=False,
)

# Technology skills reference table: tab-separated text whose first row supplies the column names.
df_text = spark.read.csv(
    "/FileStore/tables/Technology_Skills.txt",
    sep="\t",
    header=True,
)

In [0]:
#Q1)

## Count the distinct job descriptions, i.e. the distinct values found in the first column of df.

from pyspark.sql.functions import countDistinct, col

# The file was read without a header, so the job description column is addressed by position.
DistinctCount = df.agg(
    countDistinct(col(df.columns[0])).alias("Distinct Job Descriptions Count")
)

DistinctCount.show()

+-------------------------------+
|Distinct Job Descriptions Count|
+-------------------------------+
|                          50000|
+-------------------------------+



In [0]:
#Q2)

## Count with map/reduce on the RDD how many rows mention each skill, then display the 10 most frequent skills.

# Flatten every row into its skills: skip the first column and drop missing values.
# Building a set per row collapses repeats, so a skill is counted at most once per row.
skills_rdd = df.rdd.flatMap(
    lambda record: {entry for entry in record[1:] if entry is not None}
)

# Map step: pair every skill with the number 1.
count_rdd = skills_rdd.map(lambda name: (name, 1))

# Reduce step: add up the ones that share the same skill.
freq = count_rdd.reduceByKey(lambda left, right: left + right)

# Back to a data frame with columns "Skill" and "Freq", most frequent skill first.
frequency = freq.toDF(["Skill", "Freq"]).orderBy(SQL.col("Freq").desc())

# Display the 10 most frequent skills.
frequency.show(10)

+--------------------+----+
|               Skill|Freq|
+--------------------+----+
|                Java|1911|
|          Javascript|1770|
|               Sales|1705|
|Business Development|1545|
|    Web Technologies|1313|
|Communication Skills|1305|
|         development|1238|
|           Marketing|1184|
|             Finance|1078|
|                HTML|1067|
+--------------------+----+
only showing top 10 rows



In [0]:
#Q3)

## Count the unique skills in each job description, then show the 5 skill counts that occur most often, using PySpark's map/reduce functions.

from pyspark.sql.functions import desc

# Split every row into a (job description, skills) pair: the first field versus all remaining fields.
rdd = df.rdd.map(lambda record: (record[0], record[1:]))

# Map function: expands one (job description, skills) pair into one record per unique skill.
def map_fun(row):
    """Return a list of (job description, unique skill count, skill) tuples.

    `row` is a (job description, skills) pair. Falsy skills (None, empty
    string) are ignored and repeated skills are kept only once, in the order
    of their first appearance. A row without usable skills yields an empty list.
    """
    job_descriptions, skills = row
    # dict.fromkeys keeps first-seen order while collapsing duplicates.
    unique_skills = list(dict.fromkeys(skill for skill in skills if skill))
    count = len(unique_skills)
    return [(job_descriptions, count, skill) for skill in unique_skills]

# Reduce function: merges two (job description, count, skill) records into one.
# NOTE(review): nothing in the visible cells calls this function.
def reduce_fun(x, y):
    """Keep x's job description, add the two counts, and concatenate the two skill values."""
    merged_count = x[1] + y[1]
    merged_skills = x[2] + y[2]
    return (x[0], merged_count, merged_skills)

# Expand every job into (job description, count, skill) records with map_fun, keep only the
# (job description, count) part, and collapse the repeats so that each pair appears once.
# distinct() is sufficient here: only the pairs are used downstream, so there is no need to
# aggregate the skill text of a job.
# Jobs for which map_fun returns no records (no usable skills) do not appear in the result,
# so a skill count of 0 is never reported.
skill_count_rdd = (
    rdd.flatMap(map_fun)
    .map(lambda record: (record[0], record[1]))
    .distinct()
)

# Convert the pairs to a data frame with columns "Job Description" and "Skill Count", largest skill count first.
skill_count_df = skill_count_rdd.toDF(["Job Description", "Skill Count"]).orderBy("Skill Count", ascending = False)

# For every skill count, count how many job descriptions have exactly that many unique skills.
skill_count_occurrence = skill_count_df.groupBy("Skill Count").count()

# Order by that number of job descriptions, highest first, and show the 5 most common skill counts.
skill_count_occurrence = skill_count_occurrence.orderBy(desc("count"))
skill_count_occurrence.show(5)

+-----------+-----+
|Skill Count|count|
+-----------+-----+
|         10|10477|
|          5| 3432|
|          6| 3405|
|          1| 3386|
|          7| 3345|
+-----------+-----+
only showing top 5 rows



In [0]:
#Q4)

## Lower-case every skill, then use PySpark's map/reduce functions to find the frequency of each skill and show the 10 most frequent ones.

from pyspark.sql.functions import lower

# Lower-case every column of df while keeping the original column names.
# The loop variable is a column *name* (string); it is deliberately not called `col`
# so that it cannot be confused with pyspark.sql.functions.col imported in an earlier cell.
lowercase_df = df.select([lower(name).alias(name) for name in df.columns])

# Flatten the data frame into an RDD of skills: for each row, skip the first column and drop missing values.
# NOTE(review): unlike Q2, the skills of a row are kept in a list (not a set), so a skill that is
# repeated within one row is counted every time it appears - confirm that this is intended.
skills_rdd_lowercase = lowercase_df.rdd.flatMap(lambda row: [skill for skill in row[1:] if skill is not None])

# Map step: pair each skill with the number 1.
skills_rdd_lowercase_map = skills_rdd_lowercase.map(lambda skill: (skill, 1))

# Reduce step: add up the ones per skill to get the total frequency of each skill.
skills_rdd_lowercase_reduce = skills_rdd_lowercase_map.reduceByKey(lambda x, y: x + y)

# Convert the result to a data frame with columns "Skill" and "Freq", sorted by descending frequency.
lowercase_frequency = skills_rdd_lowercase_reduce.toDF(["Skill", "Freq"]).orderBy("Freq", ascending = False)

# Show the top 10 results.
lowercase_frequency.show(10)

+--------------------+----+
|               Skill|Freq|
+--------------------+----+
|                java|2759|
|          javascript|2738|
|               sales|2680|
|business development|2108|
|           marketing|1809|
|                 sql|1564|
|              jquery|1547|
|                html|1540|
|communication skills|1539|
|                 bpo|1530|
+--------------------+----+
only showing top 10 rows



In [0]:
#Q5)

## The following code uses PySpark's SQL functions to remove null and duplicate values from each row, lowercase both data frames, join them on the skill, and report the skill count before and after the join. (The question is divided into 3 parts to ease interpretation.)


## Part A: Packing each row into an array and measuring it with and without missing values.


# All column names of df. The first column (the job description) is included here;
# Part B drops the first array element again.
skills = df.columns

# One array column per row holding the values of every column, missing values included.
skills_array = df.select(SQL.array(*skills).alias("skills"))

# Size of that full array, i.e. one slot per column whether or not it holds a value.
skills_array_count = skills_array.withColumn(
    "skills_freq",
    SQL.size(SQL.col("skills")),
)

# Array without the missing values: everything in "skills" except null.
# array_except also returns each remaining value only once, so repeats inside a row are dropped here.
null_only_array = SQL.array(SQL.lit(None))
skills_array_count_nonNA = skills_array_count.withColumn(
    "skills_nonNA",
    SQL.array_except(SQL.col("skills"), null_only_array),
)

# Number of values left in each row after removing nulls and repeats.
skills_array_count_nonNA_frequency = skills_array_count_nonNA.withColumn(
    "skills_freq_nonNA",
    SQL.size("skills_nonNA"),
)


## Part B: Dropping the leading job-description element from each row and finding the initial skill count after lower-casing.


# ArrayType and StringType are imported from pyspark.sql.types, the module that defines them,
# instead of being reached through the pyspark.sql.functions alias.
from pyspark.sql.types import ArrayType, StringType

# UDF that removes the first element of an array and returns the rest as an array of strings.
remove = SQL.udf(lambda x: x[1:], ArrayType(StringType()))

# Remove the first element of each skills_nonNA array. Part A builds the array from all columns
# of df, so the first element is the value of the first column (the job description).
# NOTE(review): this assumes the first column is never null; if it were, Part A would already have
# removed it and the first real skill would be dropped here instead - confirm.
frequency = skills_array_count_nonNA_frequency.withColumn('skills_nonNA', remove(SQL.col('skills_nonNA')))

# Overwrite "skills_freq" with the number of skills left in the skills_nonNA array of each row.
frequency_df = frequency.withColumn("skills_freq", SQL.size("skills_nonNA"))

# Replace the "skills" column with one row per element of skills_nonNA.
frequency_df_exploded = frequency_df.withColumn("skills", SQL.explode(SQL.col( "skills_nonNA" )))

# Keep only the exploded "skills" column.
frequency_count_df = frequency_df_exploded.select(SQL.col("skills"))

# Lower-case the skills into a single column named "skills_lowered".
skills_count_lowered = frequency_count_df.select(SQL.lower(frequency_count_df.skills).alias("skills_lowered"))

# Number of (row, skill) entries before the join.
count_skills = skills_count_lowered.count()


## Part C: Lower-casing the technology skills table and joining it with the skills to find the count after the join.


# Keep every column of df_text and add "skills_lowered", the lower-cased "Example" column.
df_text_lowered = df_text.select(
    "*",
    SQL.lower(df_text["Example"]).alias("skills_lowered"),
)

# Inner join on the lower-cased skill: only skills that also occur in df_text survive, and a skill
# is repeated once for every df_text row it matches (the row count can therefore grow).
join_condition = skills_count_lowered["skills_lowered"] == df_text_lowered["skills_lowered"]
joint_df = skills_count_lowered.join(df_text_lowered, join_condition, how="inner")

# Number of rows after the join.
joint_sum_of_skill_counts = joint_df.count()

# Report the count of skills before and after the join.
print("The count of skills before the joint is", count_skills, ", and after the join it becomes", joint_sum_of_skill_counts, ".")

The count of skills before the joint is 463803 , and after the join it becomes 1101498 .


In [0]:
#Q6)

## Count the joined rows per commodity title and show the 10 largest groups, using PySpark SQL's select, count and sort functions.
## NOTE(review): count() tallies every joined row, so repeated skills are counted each time - this is not a distinct skill count; confirm which one is wanted.
joint_df_commodity = joint_df.select("Commodity Title")
(
    joint_df_commodity
    .groupBy("Commodity Title")
    .count()
    .orderBy(SQL.col("count").desc())
    .show(10, truncate=False)
)


+-------------------------------------------------+------+
|Commodity Title                                  |count |
+-------------------------------------------------+------+
|Object or component oriented development software|324521|
|Web platform development software                |298754|
|Operating system software                        |190926|
|Development environment software                 |53013 |
|Data base management system software             |44132 |
|Analytical or scientific software                |33552 |
|Web page creation and editing software           |31682 |
|Data base user interface and query software      |29436 |
|Spreadsheet software                             |18568 |
|File versioning software                         |13846 |
+-------------------------------------------------+------+
only showing top 10 rows

