In [26]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StringType, ArrayType, IntegerType, FloatType
from pyspark.sql.functions import concat,concat_ws, lit, length,col, expr, count , row_number,collect_list,struct,udf,array,array_union, explode
from pyspark.sql.window import Window
import math
import time
import findspark
import csv
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import min

# PART 1:
## 1. Grouping the similar processes according to Jaccard Similarities
## 2. Creating the new data 

# code to start the Master:
1. Open cmd and admin
2. write "cd %SPARK_HOME%"
3. bin\spark-class2.cmd org.apache.spark.deploy.master.Master
# code to start the worker:
1. Open cmd and admin
2. write "cd %SPARK_HOME%"
3. write "bin\spark-class2.cmd org.apache.spark.deploy.worker.Worker -c 2 -m 6G spark://192.168.1.81:7077"
* in step 3:
* -c -> number of cores
* -m -> amount of RAM for the current worker
* the spark link is from the Master link ( go to the web page of the master and locate the spark link )

# Initializing the Spark Aplications

In [27]:
findspark.init()
spark = SparkSession.builder \
    .appName("part1Grouping") \
    .master("spark://192.168.1.81:7077") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "3") \
    .config("spark.driver.memory", "6g") \
    .config("spark.driver.cores", "2") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.heartbeatInterval", "100s") \
    .config("spark.sql.broadcastTimeout", "3600s") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true") \
    .getOrCreate()

# Converting Input File to CSV Format for Processing

In [29]:
input_file = 'reverse_cases.txt'
output_file = 'reverse_cases.csv'

with open(input_file, 'r') as file:
    lines = file.readlines()
 
processed_lines = []
for line in lines:
    line = line.strip().strip('<>')
    parts = line.split(',')
    if len(parts) != 5:
        print (parts)
        print(f"Skipping malformed line: {line}")
        continue
    try:
        processed_line = {
            'FromServer': parts[0].strip("'"),
            'ToServer': parts[1].strip(),
            'time': int(parts[2].strip()),
            'action': parts[3].strip(),
            'processId': int(parts[4].strip())
        }
        processed_lines.append(processed_line)
    except ValueError as e:
        print(f"Error processing line: {line}. Error: {e}")
        continue

# exporting to CSV
headers = ['FromServer', 'ToServer', 'time', 'action', 'processId']
with open(output_file, 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=headers)
    writer.writeheader()
    for row in processed_lines:
        writer.writerow(row)
 
print(f"Data has been successfully written to {output_file}")

['']
Skipping malformed line: 
['']
Skipping malformed line: 
['']
Skipping malformed line: 
['']
Skipping malformed line: 
['']
Skipping malformed line: 
['']
Skipping malformed line: 
['']
Skipping malformed line: 
Data has been successfully written to reverse_cases.csv


In [30]:
# Load the data into a DataFrame
data_path = "reverse_cases.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)


In [31]:


# Group by processID and collect the sequence of actions
processes_df = df.groupBy("processID").agg(collect_list(struct("FromServer", "ToServer", "time", "action")).alias("actions"))

# Convert actions to string for MinHash LSH
def actions_to_string(actions):
    return "".join([f"{action['FromServer']}{action['ToServer']}" for action in actions if action['action'] != 'Response'])



actions_to_string_udf = udf(actions_to_string, StringType())
processes_df = processes_df.withColumn("actions_str", actions_to_string_udf(col("actions")))


# Calculating the K for our Shingles, based on the median length of our processes

In [32]:

df_with_length = processes_df.withColumn("length", length("actions_str"))
windowSpec = Window.orderBy("length")
df_with_length = df_with_length.withColumn("row_num", row_number().over(windowSpec))
count_df = df_with_length.count()

median_row = math.ceil(count_df / 2.0)

median_length = df_with_length.filter(col("row_num") == median_row).select("length").first()

cur_k = 5
thresholds = [(10000, 9), (5000, 8), (1000, 7), (100, 6)]
for threshold, value in thresholds:
    if median_length[0] > threshold:
        cur_k = value
        break
print("for the median ",median_length," we chose ",cur_k,"-shingles")

for the median  Row(length=44)  we chose  5 -shingles


# Computing the K-Shingles

In [33]:
# Convert actions string into shingles
def get_shingles(row, k=5):
    concatenated_str = ''.join(row)
    shingles = [concatenated_str[i:i+k] for i in range(len(concatenated_str) - (k - 1))]
    return shingles


get_shingles_udf = udf(lambda x: get_shingles(x,cur_k), ArrayType(StringType()))
processes_df = processes_df.withColumn("shingles", get_shingles_udf(col("actions_str")))


In [34]:


cv = CountVectorizer(inputCol="shingles", outputCol="features",binary=True)
cv_model = cv.fit(processes_df)
vectorized_df = cv_model.transform(processes_df)
start = time.time()
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10)
mh_model = mh.fit(vectorized_df)

hashed_df = mh_model.transform(vectorized_df)
threshold = 0.5
# Find similar candidate process IDs using MinHashLSH
similarity_df = mh_model.approxSimilarityJoin(hashed_df, hashed_df, threshold, distCol="JaccardDistance") \
    .select(col("datasetA.processID").alias("processID_A"),
            col("datasetB.processID").alias("processID_B"),
            col("JaccardDistance"),col("datasetA.features").alias("featuresA"),col("datasetB.features").alias("featuresB"))

# Filter out self-joins and duplicates
similarity_df = similarity_df.filter(col("processID_A") < col("processID_B"))

# Function to calculate Jaccard similarity
def jaccard_similarity(vec1, vec2):
    set1 = set(vec1.indices)
    set2 = set(vec2.indices)
    intersection = set1.intersection(set2)
    union = set1.union(set2)
    if len(union) == 0:
        return 0.0
    return float(len(intersection)) / len(union)

jaccard_similarity_udf = udf(jaccard_similarity, FloatType())

# Calculate Jaccard similarity for each candidate pair
similarity_df = similarity_df.withColumn("JaccardSimilarity", jaccard_similarity_udf(col("featuresA"), col("featuresB")))
# Filter pairs with Jaccard similarity above a threshold (e.g., 90%)
similarity_df_filtered = similarity_df.filter(col("JaccardSimilarity") >= 0.9)

#grouping the similar pair processes:
grouped_df = similarity_df.groupBy("processID_A").agg(collect_list("processID_B").alias("similar_processIDs"))
grouped_df = grouped_df.withColumn("all_processIDs", array_union(array(col("processID_A")), col("similar_processIDs")))
exploded_df = grouped_df.select(explode(col("all_processIDs")).alias("processID"), col("processID_A").alias("group_representative"))
sum = similarity_df_filtered.count()
end = time.time()
print("the time that it takes our model to find the pairs is ",end-start, "with number of pairs = ",sum)

the time that it takes our model to find the pairs is  5.434313058853149 with number of pairs =  0


# Calculating the time for the baseline model
### Comparing all the possible pairs without using minHashLSH

In [35]:
# start = time.time()
# original_eval = vectorized_df.alias("df1").join(vectorized_df.alias("df2")).select(col("df1.processId").alias("processIdA"),col("df1.features").alias("processAFeatures"),
#                                                                   col("df2.processId").alias("processIdB"),col("df2.features").alias("processBFeatures")).orderBy(col("processIdA"))
# original_eval = original_eval.filter(col("processIdA") < col("processIdB"))
# original_eval = original_eval.withColumn("JaccardSimilarity", jaccard_similarity_udf(col("processAFeatures"), col("processBFeatures")))
# original_eval = original_eval.filter(col("JaccardSimilarity") >= 0.9)
# sum_original = original_eval.count()
# end = time.time()
# print("the time that it takes our model to find the pairs is ",end-start, "with number of pairs = ",sum_original)

# Applying Transitivity to Grouped Data

In [36]:
# Merge overlapping groups
def merge_groups(group_list):
    groups = []
    for group in group_list:
        merged = False
        for existing_group in groups:
            if any(item in group for item in existing_group):
                existing_group.update(group)
                merged = True
                break
        if not merged:
            groups.append(set(group))
    return [list(group) for group in groups]

merge_groups_udf = udf(lambda x: merge_groups(x), ArrayType(ArrayType(IntegerType())))

grouped_lists = exploded_df.groupBy("group_representative") \
    .agg(collect_list("processID").alias("group_list")) \
    .agg(collect_list("group_list").alias("group_lists"))

merged_groups = grouped_lists.withColumn("merged_groups", merge_groups_udf(col("group_lists"))) \
    .select(explode(col("merged_groups")).alias("final_group"))

final_groups_df = merged_groups.select(concat_ws("_", col("final_group")).alias("Group"), col("final_group"))

print("the number of groups ", final_groups_df.count())

final_groups_exploded = final_groups_df.withColumn("processID", explode(col("final_group")))

filtered_df = df.join(final_groups_exploded, on="processID", how="inner")

group_representative_df = final_groups_exploded.groupBy("Group").agg(min("processID").alias("representative_processID"))

representative_processes_df = group_representative_df.join(filtered_df, filtered_df["processID"] == group_representative_df.representative_processID, "inner") \
    .select("processID", "FromServer", "ToServer", "time", "action")

the number of groups  0


# Creating the final dataset

In [37]:
processes_to_remove = final_groups_df.selectExpr("explode(final_group) as processID").distinct()

df_without_groups = df.join(processes_to_remove, "processID", "left_anti")

df_without_groups = df_without_groups.select("FromServer", "ToServer", "time", "action","processID")

constant_number = df.agg({"processID": "max"}).first()[0]
new_representative_processes_df = representative_processes_df.withColumn(
    "processID",
    expr(f"processID + {constant_number}"))

new_representative_processes_df = new_representative_processes_df.select("FromServer", "ToServer", "time", "action","processID").orderBy("time")

# Combine original DataFrame and representatives DataFrame
combined_df = df_without_groups.union(new_representative_processes_df)


# creating the txt files:
## The desired files will be in the folder output

In [38]:
def write_to_one_txt(df, local_path_name,wanted_list):
    correct_path = wanted_list + "/part1Output.txt"
    formatted_df = df.withColumn(
    "formatted_line",
    concat(lit("<"), df.FromServer, lit(","),
           df.ToServer, lit(","),
           df.time, lit(","),
           df.action, lit(","),
           df.processID, lit(">"))
)
    open(correct_path, "w")
    formatted_df.select("formatted_line").write.mode("overwrite").text(output_path)
    os.system(f'cat {local_path_name}/*.txt >> {correct_path}')
    os.system(f'rm -r {local_path_name}')
    

In [39]:
output_path = "./part1OUT1"
output_path1 = "./output"
write_to_one_txt(combined_df,output_path,output_path1)

In [40]:
# creating a dataframe only with the processes that were grouped.
df_with_groups = df.join(processes_to_remove, "processID", "semi")
df_with_groups.show()

+---------+----------+--------+----+------+
|processId|FromServer|ToServer|time|action|
+---------+----------+--------+----+------+
+---------+----------+--------+----+------+



In [41]:
exploded_final_groups_df = final_groups_df.select("Group", explode("final_group").alias("processID"))
joined_df = df_with_groups.join(exploded_final_groups_df, "processID")

In [42]:
# Function to write groups to txt file
def write_groups_to_txt(grouped_df, output_path):
    with open(output_path, "w") as file:
        for row in grouped_df.collect():
            group_name = row["Group"]
            process_ids = row["processIDs"]
            formatted_rows = row["formatted_rows"]
            
            # Ensure process_ids are unique and sorted
            process_ids = sorted(set(process_ids))
            
            file.write(f"Group: {{{', '.join(map(str, process_ids))}}}\n")
            
            for process_id in process_ids:
                file.write(f"{process_id}:\n")
                
                # Find all formatted rows for the current process ID
                rows_for_process_id = [row for row in formatted_rows if row.endswith(f",{process_id}>")]
                
                if rows_for_process_id:
                    for formatted_row in rows_for_process_id:
                        file.write(f"{formatted_row}\n")
                else:
                    file.write("<No corresponding formatted rows found>\n")
                    
            file.write("\n")

In [43]:
formatted_df = joined_df.withColumn(
    "formatted_row",
    concat_ws("", lit("<"), col("FromServer"), lit(","), col("ToServer"),
              lit(","), col("time"), lit(","), col("action"), lit(","), col("processID"), lit(">"))
)
grouped_df = formatted_df.groupBy("Group").agg(
    collect_list("processID").alias("processIDs"),
    collect_list("formatted_row").alias("formatted_rows")
)
# Output path
output_path = "./output/part1Observations.txt"

write_groups_to_txt(grouped_df, output_path)

In [44]:
spark.stop()