<a href="https://colab.research.google.com/github/simran-dk/777-Term-Project-Team9/blob/main/Prediction_using_dt_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=fb59b76d13590b3520f3f3961e94b5d22fdb9b1693aadb3acb160e919cf857ff
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
# Mount data from drive
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [62]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, expr, when, udf, explode_outer, collect_list, reverse
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, ArrayType, DoubleType
from datetime import datetime

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Process JSON Files") \
    .getOrCreate()

# Read all JSON files from a directory
json_directory_path = "/content/drive/My Drive/raw_data"
df = spark.read.option("multiline", "true").json(json_directory_path)

def clean_major(degree_list):
    if not degree_list:
        return None
    major = degree_list.pop(-1)
    if any(char.isnumeric() for char in major):
        return None
    return major

def convert_to_std_date(date_str):
    if date_str is None:
        return None

    try:
        date_obj = datetime.strptime(date_str, "%b %Y")
    except ValueError:
        try:
            # Handle the case where only the year is provided
            date_obj = datetime.strptime(date_str, "%Y")
        except ValueError:
            return None

    standard_date = date_obj.strftime("%Y-%m-%d")
    return standard_date


def convert_to_number(s):
    if s is None or s != s:  # Check for None or NaN (NaN != NaN)
        return 0
    try:
        s = s.strip().replace(',', '')
        if 'K' in s:
            s = s.replace('K', '')
            return float(float(s) * 1000)
        elif '+' in s:
            s = s.replace('+', '')
            return float(s)
    except ValueError:
        return 0
    return float(s)


def clean_degree_level(degree: str) -> str:
    if not degree:
        return None
    degree = degree.strip().lower()
    if any([d in degree for d in ["master of science", "master", "ms", "m.s", "m.a.", "ma"]]):
        return "master"
    elif any([d in degree for d in ['doctor', 'phd']]):
        return "PhD"
    elif any([d in degree for d in ['bs', 'b.s', 'bachelor', 'b.a', 'ba']]):
        return "bachelor"
    elif any([d in degree for d in ["associate's", "associate"]]):
      return "associate"
    return degree

def clean_job_level(position) -> str:
    if not position:
        return None
    job_title = position.job_title.strip().lower()
    if "intern" in job_title:
        return 'intern'
    elif any([t in job_title for t in ["student"]]):
        return "student"
    elif job_title == 'ta' or any([t in job_title for t in ["teacher's assistant", "course assistant",
                                                            'teacher assistant', "teaching assistant",
                                                            'teachers assistant', "learning assistant"]]):
        return "TA"
    elif any([t in job_title for t in ["research assistant"]]):
        return "research assistant"
    elif any([t in job_title for t in ["jr.", "junior", 'entry', 'associate']]):
        return "entry level"
    elif any([t in job_title for t in ["senior", 'sr.']]):
        return "senior"
    elif any([t in job_title for t in ["director", 'vp', 'vice president']]):
        return "director/vp"
    elif any([t in job_title for t in ["tutor"]]):
        return "tutor"
    elif any([t in job_title for t in ["consultant", "contractor"]]):
        return "consultant"
    elif any([t in job_title for t in ["co-founder", 'founder', 'ceo', 'president']]):
        return "ceo/founder/president"
    return f"not_classified"

def get_position_levels_list(experience_list):
    if not experience_list:
        return None
    positions = []
    for i, experience in enumerate(experience_list):
        if not experience:
            continue
        for position in experience.positions:
            job_level = clean_job_level(position)
            positions.append(job_level)
    return positions

clean_major_udf = udf(clean_major,StringType())
clean_degree_level_udf = udf(clean_degree_level, StringType())
convert_to_std_date_udf = udf(convert_to_std_date, StringType())
convert_to_number_udf = udf(convert_to_number, DoubleType())
get_position_levels_list_udf = udf(get_position_levels_list, ArrayType(StringType()))

# Explode education degree array to separate rows for each degree
df_degree_info = df.withColumn("degree_info", explode_outer(df.education.degree))

# Extract majors and degree levels
df_majors = df_degree_info \
    .withColumn("degree_name", col("degree_info").getItem(0)[0]) \
    .withColumn("major", col("degree_info").getItem(0)[1]) \
    .groupBy("id") \
    .agg(collect_list("major").alias("majors"), collect_list("degree_name").alias("degree_levels"))

# Explode experiences array to separate rows for each job position
df_positions = df.withColumn("experience_info", explode_outer(col("experiences"))) \
         .withColumn("positions", explode_outer(col("experience_info.positions"))) \
         .withColumn("job_title", get_position_levels_list_udf(col("positions.job_title"))) \
         .withColumn("company_name", col("experience_info.company_name")) \
         .groupBy("id") \
         .agg(collect_list("job_title").alias("positions"), collect_list("company_name").alias("companies"))

# Extract activity followers and connections
df_activities = df.withColumn("followers", convert_to_number_udf(col("activity.followers"))) \
                   .withColumn("connections", convert_to_number_udf(col("activity.connections"))) \
                   .select("id", "followers", "connections")

df_position = df.withColumn("position_levels", get_position_levels_list_udf(df.experiences))\
                .select("id", "position_levels")

# Join the dataframes to get the final dataframe
df_f = df_majors.join(df_positions, on="id", how="inner").join(df_position, on="id", how="inner").join(df_activities, on="id", how="inner").select("id", "majors", "degree_levels", "position_levels", "companies", "followers", "connections")

df_final=df_f.withColumn("previous_positions", reverse(expr("filter(position_levels, x -> x != position_levels[0])"))) \
                   .withColumn("current_position", expr("position_levels[0]")) \
                   .select("id", "majors", "degree_levels", "previous_positions", "current_position", "companies", "followers", "connections")\

# Show the final dataframe
df_final.show(truncate=False)
# Stop SparkSession
#.withColumn("Start date of Job", convert_to_std_date_udf(df.experiences.positions.start[0]))\
#.drop(*["activity", "last_generated_at", "education","experiences","skills"])

+------------------------------------+------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----------+
|id                                  |majors                                                                              |degree_levels                                                               

In [66]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Position Prediction") \
    .getOrCreate()

# Assuming your Spark DataFrame is called 'df_final'
# Explode the 'majors' array column
exploded_df = df_final.select(explode("majors").alias("majors"), "degree_levels", "previous_positions", "current_position", "followers", "connections") \
    .select("majors", explode("degree_levels").alias("degree_levels"), "previous_positions", "current_position", "followers", "connections") \
    .select("majors", "degree_levels", explode("previous_positions").alias("previous_positions"), "current_position", "followers", "connections")

# Apply the imputer model to the training
# Update categorical columns to include the exploded 'major' column
categorical_cols = ['majors', 'degree_levels', 'previous_positions','followers','connections']

# Apply StringIndexer to each categorical column
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(exploded_df) for col in categorical_cols]
indexed_df = exploded_df
for indexer in indexers:
    indexed_df = indexer.transform(indexed_df)

# Index the label column 'current_position'
label_indexer = StringIndexer(inputCol="current_position", outputCol="label").fit(indexed_df)
indexed_df = label_indexer.transform(indexed_df)

feature_cols=['majors_index', 'degree_levels_index', 'previous_positions_index','followers','connections']

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
selected_df = assembler.transform(indexed_df).select("features", "label")

# Split data into training and testing sets
train_df, test_df = selected_df.randomSplit([0.8, 0.2], seed=42)

# Check if train_df has any rows
if train_df.count() > 0:
    # Initialize and train the decision tree classifier with increased maxBins
    dt_classifier = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxBins=2000)
    dt_model = dt_classifier.fit(train_df)
else:
    print("The training data is empty. Unable to train the model.")

In [103]:
# Make predictions on the test data
predictions = dt_model.transform(test_df)

# Evaluate the predictions
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Convert indexed labels back to original labels for actual labels
label_converter_actual = IndexToString(inputCol="label", outputCol="actual_position", labels=label_indexer.labels)
predictions_with_actual = label_converter_actual.transform(predictions)

# Convert predicted labels back to original labels
label_converter_predicted = IndexToString(inputCol="prediction", outputCol="predicted_position", labels=label_indexer.labels)
predictions_with_labels = label_converter_predicted.transform(predictions_with_actual)

# Show actual and predicted labels along with accuracy
print("Accuracy:", accuracy)

Accuracy: 0.6858974358974359


In [102]:
from pyspark.sql import Row

# Create a test DataFrame with the provided input
test_input = [
    Row(majors=["computer science", "applied data analytics"], degree_levels=["B.Tech", "MS"], previous_positions=["TA","Intern","Junior Developer"], followers=280, connections=500)
]

test_input_df = spark.createDataFrame(test_input)
exploded_df = test_input_df.select(explode("majors").alias("majors"), "degree_levels", "previous_positions", "followers", "connections") \
    .select("majors", explode("degree_levels").alias("degree_levels"), "previous_positions", "followers", "connections") \
    .select("majors", "degree_levels", explode("previous_positions").alias("previous_positions"), "followers", "connections")

# Apply the imputer model to the training
# Update categorical columns to include the exploded 'major' column
categorical_cols = ['majors', 'degree_levels', 'previous_positions','followers','connections']

# Apply StringIndexer to each categorical column
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(exploded_df) for col in categorical_cols]
indexed_df = exploded_df
for indexer in indexers:
    indexed_df = indexer.transform(indexed_df)


feature_cols=['majors_index', 'degree_levels_index', 'previous_positions_index','followers','connections']

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
selected_df = assembler.transform(indexed_df).select("features")

predictions = dt_model.transform(selected_df)

from pyspark.ml.feature import IndexToString

# Reverse index the predicted labels to get the actual current_position names
label_converter = IndexToString(inputCol="prediction", outputCol="predicted_position", labels=label_indexer.labels)
predictions_with_names = label_converter.transform(predictions)

# Show the prediction with current_position names
from pyspark.sql.functions import size

# Initialize a dictionary to store the maximum number of elements in each array column
max_elements_per_column = {}

# Iterate over the columns
for column in test_input_df.columns:
    if isinstance(test_input_df.schema[column].dataType, ArrayType):
        # Find the maximum number of elements in the current array column
        max_elements = test_input_df.selectExpr(f"max(size({column})) as max_elements").collect()[0]["max_elements"]
        max_elements_per_column[column] = max_elements

# Print the maximum number of elements in each array column
max_index=0
for column, max_elements in max_elements_per_column.items():
  if max_index<=max_elements:
    max_index=max_elements

# Convert the predicted_position column to a list
predicted_positions_list = predictions_with_names.select("predicted_position").rdd.flatMap(lambda x: x).collect()

# Print the predicted position at the specified index
if max_index - 1 < len(predicted_positions_list):
    predicted_position_at_index = predicted_positions_list[max_index - 1]
    print(f"The predicted position is: {predicted_position_at_index}")
else:
    print(f"There is no predicted position.")


The predicted position is: senior
