# Loading

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, count, regexp_extract


# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Read Multiple CSV Files") \
    .getOrCreate()


In [0]:
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/anomaly_label.csv")
df2 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/Event_occurrence_matrix.csv")
df3 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/HDFS_log_templates.csv")
df4 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/Event_traces.csv")
hdfs_logs = spark.read.format("csv").option("header", "false").load("dbfs:/FileStore/shared_uploads/nikhilsarma.gudur@sjsu.edu/HDFS.log")

# Additional files uploaded
# dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/HDFS.npz
# dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/README.md

In [0]:


# Show the first few rows of each DataFrame to confirm they have been loaded correctly
print("Anomaly Label DataFrame:")
df1.show(truncate=False)


Anomaly Label DataFrame:
+------------------------+-------+
|BlockId                 |Label  |
+------------------------+-------+
|blk_-1608999687919862906|Normal |
|blk_7503483334202473044 |Normal |
|blk_-3544583377289625738|Anomaly|
|blk_-9073992586687739851|Normal |
|blk_7854771516489510256 |Normal |
|blk_1717858812220360316 |Normal |
|blk_-2519617320378473615|Normal |
|blk_7063315473424667801 |Normal |
|blk_8586544123689943463 |Normal |
|blk_2765344736980045501 |Normal |
|blk_-2900490557492272760|Normal |
|blk_-50273257731426871  |Normal |
|blk_4394112519745907149 |Normal |
|blk_3640100967125688321 |Normal |
|blk_-40115644493265216  |Normal |
|blk_-8531310335568756456|Anomaly|
|blk_-3409923645141256069|Normal |
|blk_3974948352784823938 |Normal |
|blk_5647760196018207394 |Normal |
|blk_-202775138379690649 |Normal |
+------------------------+-------+
only showing top 20 rows



In [0]:
df1.select("BlockId").count()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-4330738738621508>:1[0m
[0;32m----> 1[0m [43mdf1[49m[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mBlockId[39m[38;5;124m"[39m)[38;5;241m.[39mcount()

[0;31mNameError[0m: name 'df1' is not defined

In [0]:
unique_blockid_count_df1 = df1.select("BlockId").distinct().count()
print(unique_blockid_count_df1)

575061


In [0]:
df2.show()


+--------------------+-------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|             BlockId|  Label|Type| E1| E2| E3| E4| E5| E6| E7| E8| E9|E10|E11|E12|E13|E14|E15|E16|E17|E18|E19|E20|E21|E22|E23|E24|E25|E26|E27|E28|E29|
+--------------------+-------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|blk_-160899968791...|Success|null|  0|  0|203|  0| 10|  7|  0|  0|  3|  0|  3|  0|  0|  0|  0|  4|  0|  4|  0|  0| 10|  1| 10|  0|  4| 10|  0|  0|  0|
|blk_7503483334202...|Success|null|  0|  2|  1|  0|  3|  0|  0|  0|  3|  0|  3|  0|  0|  0|  0|  0|  0|  0|  0|  0|  3|  1|  3|  0|  0|  3|  0|  0|  0|
|blk_-354458337728...|   Fail|  21|  0|  0|203|  0|  3|  0|  0|  0|  3|  0|  3|  0|  0|  0|  0|  0|  0|  0|  0|  1|  3|  1|  3|  0|  0|  3|  0|  0|  0|
|blk_-907399258668...|Success|null|  0|  3|  0|  0|  3|  0|  0|  0|  3|  0|  3|  0|  0| 

In [0]:
df2.select("BlockId").count()

Out[11]: 575061

In [0]:
df2.select("BlockId").first()["BlockId"]


Out[12]: 'blk_-1608999687919862906'

In [0]:
df2.collect()[5]

Out[13]: Row(BlockId='blk_1717858812220360316', Label='Success', Type=None, E1='0', E2='3', E3='1', E4='15', E5='3', E6='0', E7='0', E8='0', E9='3', E10='0', E11='3', E12='0', E13='0', E14='0', E15='0', E16='0', E17='0', E18='0', E19='0', E20='0', E21='3', E22='1', E23='3', E24='0', E25='0', E26='3', E27='0', E28='0', E29='0')

In [0]:
df1 = df1.withColumnRenamed("Label", "Log_Type")
df1.columns

Out[14]: ['BlockId', 'Log_Type']

In [0]:

merged = df1.join(df2, df1.BlockId == df2.BlockId, "inner")
merged.show()


+--------------------+--------+--------------------+-------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|             BlockId|Log_Type|             BlockId|  Label|Type| E1| E2| E3| E4| E5| E6| E7| E8| E9|E10|E11|E12|E13|E14|E15|E16|E17|E18|E19|E20|E21|E22|E23|E24|E25|E26|E27|E28|E29|
+--------------------+--------+--------------------+-------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|blk_-100004553717...|  Normal|blk_-100004553717...|Success|null|  0|  0|  0|  0|  3|  0|  0|  0|  3|  0|  3|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  3|  0|  0|  0|
|blk_-100021874837...|  Normal|blk_-100021874837...|Success|null|  0|  0|  0|  0|  3|  0|  0|  0|  3|  0|  3|  0|  0|  0|  0|  0|  0|  0|  0|  0|  3|  1|  3|  0|  0|  3|  0|  0|  0|
|blk_-100028489120...|  Normal|blk_-100028489120...|Success|null|  0|  1|  2|  1|  3|  0| 

In [0]:
merged.dtypes

Out[18]: [('BlockId', 'string'),
 ('Log_Type', 'string'),
 ('BlockId', 'string'),
 ('Label', 'string'),
 ('Type', 'string'),
 ('E1', 'string'),
 ('E2', 'string'),
 ('E3', 'string'),
 ('E4', 'string'),
 ('E5', 'string'),
 ('E6', 'string'),
 ('E7', 'string'),
 ('E8', 'string'),
 ('E9', 'string'),
 ('E10', 'string'),
 ('E11', 'string'),
 ('E12', 'string'),
 ('E13', 'string'),
 ('E14', 'string'),
 ('E15', 'string'),
 ('E16', 'string'),
 ('E17', 'string'),
 ('E18', 'string'),
 ('E19', 'string'),
 ('E20', 'string'),
 ('E21', 'string'),
 ('E22', 'string'),
 ('E23', 'string'),
 ('E24', 'string'),
 ('E25', 'string'),
 ('E26', 'string'),
 ('E27', 'string'),
 ('E28', 'string'),
 ('E29', 'string')]

In [0]:
# convert the event types datatype to int
from pyspark.sql.functions import col

columns_to_convert = [
    'E1', 'E2', 'E3', 'E4', 'E5', 'E6', 'E7', 'E8', 'E9', 'E10',
    'E11', 'E12', 'E13', 'E14', 'E15', 'E16', 'E17', 'E18', 'E19', 'E20',
    'E21', 'E22', 'E23', 'E24', 'E25', 'E26', 'E27', 'E28', 'E29'
]



for column in columns_to_convert:
    merged = merged.withColumn(column, col(column).cast('int'))

# Show the resulting DataFrame
merged.printSchema()
merged.show()

root
 |-- BlockId: string (nullable = true)
 |-- Log_Type: string (nullable = true)
 |-- BlockId: string (nullable = true)
 |-- Label: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- E1: integer (nullable = true)
 |-- E2: integer (nullable = true)
 |-- E3: integer (nullable = true)
 |-- E4: integer (nullable = true)
 |-- E5: integer (nullable = true)
 |-- E6: integer (nullable = true)
 |-- E7: integer (nullable = true)
 |-- E8: integer (nullable = true)
 |-- E9: integer (nullable = true)
 |-- E10: integer (nullable = true)
 |-- E11: integer (nullable = true)
 |-- E12: integer (nullable = true)
 |-- E13: integer (nullable = true)
 |-- E14: integer (nullable = true)
 |-- E15: integer (nullable = true)
 |-- E16: integer (nullable = true)
 |-- E17: integer (nullable = true)
 |-- E18: integer (nullable = true)
 |-- E19: integer (nullable = true)
 |-- E20: integer (nullable = true)
 |-- E21: integer (nullable = true)
 |-- E22: integer (nullable = true)
 |-- E23: integer (nu

In [0]:
merged.columns

Out[26]: ['BlockId',
 'Label',
 'BlockId',
 'Label',
 'Type',
 'E1',
 'E2',
 'E3',
 'E4',
 'E5',
 'E6',
 'E7',
 'E8',
 'E9',
 'E10',
 'E11',
 'E12',
 'E13',
 'E14',
 'E15',
 'E16',
 'E17',
 'E18',
 'E19',
 'E20',
 'E21',
 'E22',
 'E23',
 'E24',
 'E25',
 'E26',
 'E27',
 'E28',
 'E29']

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# numeric_cols = [col for col, dtype in merged.dtypes if dtype in ['int', 'double']]

# assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
# merged_vector = assembler.transform(merged).select("features")

# # Calculate the correlation matrix
# correlation_matrix = Correlation.corr(merged_vector, "features").head()[0]

# # Convert to dense matrix and display
# correlation_matrix = correlation_matrix.toArray()
# print(correlation_matrix)

numeric_cols = [col for col, dtype in merged.dtypes if dtype in ['int', 'double']]

# Check if numeric_cols is empty
if not numeric_cols:
    raise ValueError("No numeric columns found in the DataFrame.")

print(f"Numeric columns found: {numeric_cols}")

# Assemble the numeric columns into a feature vector
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
merged_vector = assembler.transform(merged).select("features")

# Calculate the correlation matrix
correlation_matrix = Correlation.corr(merged_vector, "features").head()[0]

# Convert to dense matrix and display
correlation_matrix = correlation_matrix.toArray()
print(correlation_matrix)


Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3378, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<command-4330738738621498>", line 20, in <module>
    raise ValueError("No numeric columns found in the DataFrame.")
ValueError: No numeric columns found in the DataFrame.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 1997, in showtraceback
    stb = self.InteractiveTB.structured_traceback(
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/ultratb.py", line 1112, in structured_traceback
    return FormattedTB.structured_traceback(
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/ultratb.py", line 1006, in structured_traceback
    return VerboseTB.structured_traceback(
  File "/databricks/pyth



In [0]:
anomaly_fail_df = merged.filter((merged.Log_Type == 'Anomaly')).distinct().count()
anomaly_fail_df

Out[18]: 16838

In [0]:
unique_blockid_count_df2 = df1.select("BlockId").distinct().count()
print(unique_blockid_count_df2)

In [0]:
df4.columns

Out[14]: ['BlockId', 'Label', 'Type', 'Features', 'TimeInterval', 'Latency']

In [0]:
print("HDFS Log Templates DataFrame:")
df3.show(truncate= False)


HDFS Log Templates DataFrame:
+-------+--------------------------------------------------------------------------------+
|EventId|EventTemplate                                                                   |
+-------+--------------------------------------------------------------------------------+
|E1     |[*]Adding an already existing block[*]                                          |
|E2     |[*]Verification succeeded for[*]                                                |
|E3     |[*]Served block[*]to[*]                                                         |
|E4     |[*]Got exception while serving[*]to[*]                                          |
|E5     |[*]Receiving block[*]src:[*]dest:[*]                                            |
|E6     |[*]Received block[*]src:[*]dest:[*]of size[*]                                   |
|E7     |[*]writeBlock[*]received exception[*]                                           |
|E8     |[*]PacketResponder[*]for block[*]Interrupted[*]    

In [0]:
print("Event Traces DataFrame:")
df4.show()

Event Traces DataFrame:
+--------------------+-------+----+--------------------+--------------------+-------+
|             BlockId|  Label|Type|            Features|        TimeInterval|Latency|
+--------------------+-------+----+--------------------+--------------------+-------+
|blk_-160899968791...|Success|null|[E5,E22,E5,E5,E11...|[0.0, 1.0, 0.0, 0...|   3802|
|blk_7503483334202...|Success|null|[E5,E5,E22,E5,E11...|[0.0, 0.0, 1.0, 0...|   3802|
|blk_-354458337728...|   Fail|  21|[E5,E22,E5,E5,E11...|[0.0, 1.0, 0.0, 0...|   3797|
|blk_-907399258668...|Success|null|[E5,E22,E5,E5,E11...|[0.0, 1.0, 0.0, 0...|  50448|
|blk_7854771516489...|Success|null|[E5,E5,E22,E5,E11...|[0.0, 0.0, 1.0, 4...|  50583|
|blk_1717858812220...|Success|null|[E5,E5,E22,E5,E11...|[0.0, 0.0, 11.0, ...|  50458|
|blk_-251961732037...|Success|null|[E5,E22,E5,E5,E11...|[0.0, 1.0, 9.0, 4...|  50523|
|blk_7063315473424...|Success|null|[E5,E5,E5,E22,E11...|[1.0, 0.0, 0.0, 5...|  50818|
|blk_8586544123689...|Success|

In [0]:
df4.filter(col("blockid") == 'blk_-1608999687919862906').show( truncate=False)


+------------------------+-------+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------

In [0]:
unique_blockid_count_df4 = df1.select("BlockId").distinct().count()
print(unique_blockid_count_df4)

In [0]:
hdfs_logs.show()

+--------------------+
|                 _c0|
+--------------------+
|081109 203518 143...|
|081109 203518 35 ...|
|081109 203519 143...|
|081109 203519 145...|
|081109 203519 145...|
|081109 203519 145...|
|081109 203519 145...|
|081109 203519 145...|
|081109 203519 147...|
|081109 203519 147...|
|081109 203519 29 ...|
|081109 203519 30 ...|
|081109 203519 31 ...|
|081109 203520 142...|
|081109 203520 145...|
|081109 203520 26 ...|
|081109 203521 143...|
|081109 203521 143...|
|081109 203521 144...|
|081109 203521 145...|
+--------------------+
only showing top 20 rows



In [0]:
logs = hdfs_logs.rdd.map(lambda row: row[0]).take(10)
print(logs)

['081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010', '081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906', '081109 203519 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.10.6:40524 dest: /10.250.10.6:50010', '081109 203519 145 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.14.224:42420 dest: /10.250.14.224:50010', '081109 203519 145 INFO dfs.DataNode$PacketResponder: PacketResponder 1 for block blk_-1608999687919862906 terminating', '081109 203519 145 INFO dfs.DataNode$PacketResponder: PacketResponder 2 for block blk_-1608999687919862906 terminating', '081109 203519 145 INFO dfs.DataNode$PacketResponder: Received block blk_-1608999687919862906 of size 91178 from /10.250.10.6', '081109 203519 145 INFO dfs.DataNode$

# Extracting block id from logs 

In [0]:
regex_pattern = r'(blk_-?\d+)'

data_with_blockid = hdfs_logs.withColumn("blockid", regexp_extract(hdfs_logs['_c0'], regex_pattern, 1))
    
data_with_blockid.show(40, truncate=False)



+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|_c0                                                                                                                                                                |blockid                 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010                     |blk_-1608999687919862906|
|081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906         |blk_-1608999687919862906|
|081109 203519 143 INFO dfs.DataNode$DataXcei

In [0]:
from pyspark.sql.functions import col, collect_list, udf, array_contains
from pyspark.sql.types import StringType, ArrayType

In [0]:
demo_raw = data_with_blockid.filter((col("blockid") == 'blk_-1608999687919862906') | (col("blockid") == 'blk_-3544583377289625738'))
demo_raw.show(truncate = False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|_c0                                                                                                                                                             |blockid                 |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010                  |blk_-1608999687919862906|
|081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906      |blk_-1608999687919862906|
|081109 203519 143 INFO dfs.DataNode$DataXceiver: Receiving 

In [0]:
output_path = "dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/demo_raw.csv"  # Replace with your desired output path
data_with_blockid.write.csv(output_path, header=True, mode="overwrite")

In [0]:
demo = spark.read.format("csv").option("header", "true").load(output_path)
demo.show()

+--------------------+--------------------+
|                 _c0|             blockid|
+--------------------+--------------------+
|081109 203518 143...|blk_-160899968791...|
|081109 203518 35 ...|blk_-160899968791...|
|081109 203519 143...|blk_-160899968791...|
|081109 203519 145...|blk_-160899968791...|
|081109 203519 145...|blk_-160899968791...|
|081109 203519 145...|blk_-160899968791...|
|081109 203519 145...|blk_-160899968791...|
|081109 203519 145...|blk_-160899968791...|
|081109 203519 147...|blk_-160899968791...|
|081109 203519 147...|blk_-160899968791...|
|081109 203519 29 ...|blk_-160899968791...|
|081109 203519 30 ...|blk_-160899968791...|
|081109 203519 31 ...|blk_-160899968791...|
|081109 203521 143...|blk_-160899968791...|
|081109 203521 143...|blk_-160899968791...|
|081109 203521 145...|blk_-354458337728...|
|081109 203521 147...|blk_-160899968791...|
|081109 203521 147...|blk_-160899968791...|
|081109 203521 147...|blk_-160899968791...|
|081109 203521 19 ...|blk_-16089

In [0]:
df3 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/HDFS_log_templates.csv")

In [0]:
def finding_events(logs_df):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, collect_list, udf, flatten
    from pyspark.sql.types import StringType, ArrayType
    import re

    # Initialize Spark session
    spark = SparkSession.builder.appName("BlockID Events").getOrCreate()

    # events_df = spark.createDataFrame(event_data, event_columns)
    events_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/HDFS_log_templates.csv")

    # Broadcast the event templates
    event_templates = {row.EventId: row.EventTemplate for row in events_df.collect()}
    event_templates_broadcast = spark.sparkContext.broadcast(event_templates)

    # Define UDF to extract events
    def extract_events(log_message):
        matched_events = []
        templates = event_templates_broadcast.value
        for event_id, template in templates.items():
            pattern = template.replace("[*]", ".*")
            if re.search(pattern, log_message):
                matched_events.append(event_id)
        return matched_events

    extract_events_udf = udf(extract_events, ArrayType(StringType()))

    # Apply the UDF to extract events from log messages
    logs_with_events_df = logs_df.withColumn("events", extract_events_udf(col("_c0")))

    # Show intermediate DataFrame
    logs_with_events_df.show(truncate=False)

    # Group by blockid and aggregate the events
    aggregated_df = logs_with_events_df.groupBy("blockid").agg(collect_list("events").alias("event_lists"))

    # Flatten the nested lists
    flattened_df = aggregated_df.withColumn("event_list", flatten(col("event_lists"))).drop("event_lists")

    # Show final DataFrame
    flattened_df.show(truncate=False)
    return flattened_df
# # Stop the Spark session
# spark.stop()


In [0]:
demo = finding_events(demo_raw)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+------+
|_c0                                                                                                                                                             |blockid                 |events|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+------+
|081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010                  |blk_-1608999687919862906|[E5]  |
|081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906      |blk_-1608999687919862906|[]    |
|081109 203519 143 INFO d

In [0]:
demo.show()

+--------------------+--------------------+
|             blockid|          event_list|
+--------------------+--------------------+
|blk_-160899968791...|[E5, E5, E5, E11,...|
|blk_-354458337728...|[E5, E5, E5, E11,...|
+--------------------+--------------------+



In [0]:
def  convert_to_matrix (df):
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, collect_list, flatten, udf, coalesce, lit
    from pyspark.sql.types import StringType, IntegerType, MapType

    # Initialize Spark session
    spark = SparkSession.builder.appName("BlockID Events Transformation").getOrCreate()
    # Define UDF to count events
    def count_events(event_list):
        event_counts = {}
        for event in event_list:
            if event in event_counts:
                event_counts[event] += 1
            else:
                event_counts[event] = 1
        return event_counts

    count_events_udf = udf(count_events, MapType(StringType(), IntegerType()))

    # Apply the UDF to add a new column with event counts
    df_with_counts = df.withColumn("event_counts", count_events_udf(col("event_list")))
    df_with_counts.show(truncate=False)

    # Get a list of all possible events (you can extract this from the events DataFrame)
    all_events = ["E1", "E2", "E3", "E4", "E5", "E6", "E7", "E8", "E9", "E10", "E11", "E12", "E13", "E14", "E15", "E16", "E17", "E18", "E19", "E20", "E21", "E22", "E23", "E24", "E25", "E26", "E27", "E28", "E29"]

    # Create columns for each event with count values
    for event in all_events:
        df_with_counts = df_with_counts.withColumn(event, coalesce(col("event_counts").getItem(event), lit(0)))

    # Drop the event_counts column
    final_df = df_with_counts.drop("event_counts")

    # Show the final DataFrame
    final_df.show(truncate=False)
    return final_df

In [0]:
def combine_events()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, udf, flatten
from pyspark.sql.types import StringType, ArrayType
import re

# Initialize Spark session
spark = SparkSession.builder.appName("BlockID Events").getOrCreate()
event_columns = ["EventId", "EventTemplate"]

# logs_df = spark.createDataFrame(log_data, log_columns)
logs_df = data_with_blockid
# events_df = spark.createDataFrame(event_data, event_columns)
events_df = df3

# Show DataFrames
logs_df.show(truncate=False)
events_df.show(truncate=False)

# Broadcast the event templates
event_templates = {row.EventId: row.EventTemplate for row in events_df.collect()}
event_templates_broadcast = spark.sparkContext.broadcast(event_templates)

# Define UDF to extract events
def extract_events(log_message):
    matched_events = []
    templates = event_templates_broadcast.value
    for event_id, template in templates.items():
        pattern = template.replace("[*]", ".*")
        if re.search(pattern, log_message):
            matched_events.append(event_id)
    return matched_events

extract_events_udf = udf(extract_events, ArrayType(StringType()))

# Apply the UDF to extract events from log messages
logs_with_events_df = logs_df.withColumn("events", extract_events_udf(col("_c0")))

# Show intermediate DataFrame
logs_with_events_df.show(truncate=False)

# Group by blockid and aggregate the events
aggregated_df = logs_with_events_df.groupBy("blockid").agg(collect_list("events").alias("event_lists"))

# Flatten the nested lists
flattened_df = aggregated_df.withColumn("event_list", flatten(col("event_lists"))).drop("event_lists")

# Show final DataFrame
flattened_df.show(truncate=False)

# # Stop the Spark session
# spark.stop()


In [0]:
# extract_events_udf = udf(df3, ArrayType(StringType()))
# logs_with_events_df = extract_events_udf.withColumn("events", extract_events_udf(col("log_message")))
aggregated_df = data_with_blockid.groupBy("blockid").agg(collect_list("_c0").alias("event_lists"))

# Exctracting Events for each block id 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, udf, flatten
from pyspark.sql.types import StringType, ArrayType
import re

# Initialize Spark session
spark = SparkSession.builder.appName("BlockID Events").getOrCreate()
event_columns = ["EventId", "EventTemplate"]

# logs_df = spark.createDataFrame(log_data, log_columns)
logs_df = data_with_blockid
# events_df = spark.createDataFrame(event_data, event_columns)
events_df = df3

# Show DataFrames
logs_df.show(truncate=False)
events_df.show(truncate=False)

# Broadcast the event templates
event_templates = {row.EventId: row.EventTemplate for row in events_df.collect()}
event_templates_broadcast = spark.sparkContext.broadcast(event_templates)

# Define UDF to extract events
def extract_events(log_message):
    matched_events = []
    templates = event_templates_broadcast.value
    for event_id, template in templates.items():
        pattern = template.replace("[*]", ".*")
        if re.search(pattern, log_message):
            matched_events.append(event_id)
    return matched_events

extract_events_udf = udf(extract_events, ArrayType(StringType()))

# Apply the UDF to extract events from log messages
logs_with_events_df = logs_df.withColumn("events", extract_events_udf(col("_c0")))

# Show intermediate DataFrame
logs_with_events_df.show(truncate=False)

# Group by blockid and aggregate the events
aggregated_df = logs_with_events_df.groupBy("blockid").agg(collect_list("events").alias("event_lists"))

# Flatten the nested lists
flattened_df = aggregated_df.withColumn("event_list", flatten(col("event_lists"))).drop("event_lists")

# Show final DataFrame
flattened_df.show(truncate=False)

# # Stop the Spark session
# spark.stop()


+----------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|_c0                                                                                                                                                             |blockid                 |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010                  |blk_-1608999687919862906|
|081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906      |blk_-1608999687919862906|
|081109 203519 143 INFO dfs.DataNode$DataXceiver: Receiving 

In [0]:
# Save the DataFrame as a CSV file
output_path = "dbfs:/FileStore/shared_uploads/tanmay.k.singh@sjsu.edu/block_event.csv"
flattened_df.write.csv(output_path, header=True, mode="overwrite")

# Converting it into matrix for model 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, flatten, udf, coalesce, lit
from pyspark.sql.types import StringType, IntegerType, MapType

# Initialize Spark session
spark = SparkSession.builder.appName("BlockID Events Transformation").getOrCreate()

df = flattened_df
df.show(truncate=False)

# Define UDF to count events
def count_events(event_list):
    event_counts = {}
    for event in event_list:
        if event in event_counts:
            event_counts[event] += 1
        else:
            event_counts[event] = 1
    return event_counts

count_events_udf = udf(count_events, MapType(StringType(), IntegerType()))

# Apply the UDF to add a new column with event counts
df_with_counts = df.withColumn("event_counts", count_events_udf(col("event_list")))


# Get a list of all possible events (you can extract this from the events DataFrame)
all_events = ["E1", "E2", "E3", "E4", "E5", "E6", "E7", "E8", "E9", "E10", "E11", "E12", "E13", "E14", "E15", "E16", "E17", "E18", "E19", "E20", "E21", "E22", "E23", "E24", "E25", "E26", "E27", "E28", "E29"]

# Create columns for each event with count values
for event in all_events:
    df_with_counts = df_with_counts.withColumn(event, coalesce(col("event_counts").getItem(event), lit(0)))

# Drop the event_counts column
final_df = df_with_counts.drop("event_counts")

# Show the final DataFrame
final_df.show(truncate=False)

# Save the final DataFrame as a CSV file
# output_path = "/mnt/data/blockid_events_transformed"
# final_df.write.csv(output_path, header=True, mode="overwrite")

# # Stop the Spark session
# spark.stop()


+------------------------+------------------------------------------------------------------------------------------------------+
|blockid                 |event_list                                                                                            |
+------------------------+------------------------------------------------------------------------------------------------------+
|blk_-100004553717737248 |[E5, E5, E5, E11, E9, E11, E9, E11, E9]                                                               |
|blk_-1000218748372878626|[E5, E5, E5, E11, E9, E11, E9, E11, E9, E21, E21, E21]                                                |
|blk_-1000284891202066880|[E5, E5, E5, E11, E9, E11, E9, E11, E9, E2, E3, E3, E4, E21, E21, E21]                                |
|blk_-1000296688806487470|[E5, E5, E5, E11, E9, E11, E9, E11, E9, E21, E21, E21]                                                |
|blk_-1000583943604118788|[E5, E5, E5, E11, E9, E11, E9, E11, E9]                         

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-4054470229354866>:25[0m
[1;32m     23[0m [38;5;66;03m# Apply the UDF to add a new column with event counts[39;00m
[1;32m     24[0m df_with_counts [38;5;241m=[39m df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mevent_counts[39m[38;5;124m"[39m, count_events_udf(col([38;5;124m"[39m[38;5;124mevent_list[39m[38;5;124m"[39m)))
[0;32m---> 25[0m df_with_counts[38;5;241m.[39mshow(truncate[38;5;241m=[39m[38;5;28;01mFalse[39;00m)
[1;32m     27[0m [38;5;66;03m# Get a list of all possible events (you can extract this from the events DataFrame)[39;00m
[1;32m     28[0m all_events [38;5;241m=[39m [[38;5;124m"[39m[38;5;124mE1[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mE2[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mE3[39m[38;5;124m"[39m, [38;5;124m

In [0]:
print(data_with_blockid.count())


11175629


In [0]:
from pyspark.sql.functions import col
data_with_blockid.filter(col("blockid") == 'blk_-1608999687919862906').show( truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|_c0                                                                                                                                                                |blockid                 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------+
|081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010                     |blk_-1608999687919862906|
|081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906         |blk_-1608999687919862906|
|081109 203519 143 INFO dfs.DataNode$DataXcei

In [0]:
print(data_with_blockid.groupby("blockid").count().show())

+--------------------+-----+
|             blockid|count|
+--------------------+-----+
|blk_2329219899967...|   38|
|blk_7573381576932...|   38|
|blk_-514328661767...|   38|
|blk_-315541563754...|   25|
|blk_-626551751220...|   25|
|blk_-407011522237...|   25|
|blk_-189452694820...|   25|
|blk_-243754728424...|   27|
|blk_-562137042848...|   25|
|blk_8243751147898...|   25|
|blk_-398207552508...|   25|
|blk_-299506639704...|   24|
|blk_2176961593541...|   25|
|blk_4229018881350...|   25|
|blk_3363721847495...|   25|
|blk_7661137288132...|   25|
|blk_-772145702470...|   25|
|blk_8720307910510...|   25|
|blk_8245176888463...|   25|
|blk_-144962563959...|   25|
+--------------------+-----+
only showing top 20 rows

None


In [0]:
data_with_blockid.select("blockid").distinct().count()

Out[25]: 575061

In [0]:
hdfs_logs.columns

In [0]:
spark.stop()

In [0]:
df4.show()

+--------------------+-------+----+--------------------+--------------------+-------+
|             BlockId|  Label|Type|            Features|        TimeInterval|Latency|
+--------------------+-------+----+--------------------+--------------------+-------+
|blk_-160899968791...|Success|null|[E5,E22,E5,E5,E11...|[0.0, 1.0, 0.0, 0...|   3802|
|blk_7503483334202...|Success|null|[E5,E5,E22,E5,E11...|[0.0, 0.0, 1.0, 0...|   3802|
|blk_-354458337728...|   Fail|  21|[E5,E22,E5,E5,E11...|[0.0, 1.0, 0.0, 0...|   3797|
|blk_-907399258668...|Success|null|[E5,E22,E5,E5,E11...|[0.0, 1.0, 0.0, 0...|  50448|
|blk_7854771516489...|Success|null|[E5,E5,E22,E5,E11...|[0.0, 0.0, 1.0, 4...|  50583|
|blk_1717858812220...|Success|null|[E5,E5,E22,E5,E11...|[0.0, 0.0, 11.0, ...|  50458|
|blk_-251961732037...|Success|null|[E5,E22,E5,E5,E11...|[0.0, 1.0, 9.0, 4...|  50523|
|blk_7063315473424...|Success|null|[E5,E5,E5,E22,E11...|[1.0, 0.0, 0.0, 5...|  50818|
|blk_8586544123689...|Success|null|[E5,E5,E5,E22,E11..

In [0]:

df4 = df4.withColumn("Features", split(col("Features"), ","))

exploded_df = df4.select("BlockId", "Label", "Type", explode(
    "Features").alias("Event"), "TimeInterval", "Latency")
exploded_df = exploded_df.withColumn(
    "Event", regexp_extract(col("Event"), r"E\d+", 0))

event_counts_df = exploded_df.groupBy(
    "BlockId", "Label", "Type", "Event", "TimeInterval", "Latency").agg(count("Event").alias("Count"))

pivoted_df = event_counts_df.groupBy("BlockId", "Label", "Type", "TimeInterval", "Latency").pivot(
    "Event").agg({"Count": "first"}).na.fill(0)

pivoted_df.show()

In [0]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.linalg import DenseVector
import numpy as np
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, FloatType, StructType, StructField, IntegerType
import random

# Assemble feature columns into a single vector
assembler = VectorAssembler(inputCols=[col for col in df.columns if col != 'Label'], outputCol="features")
df_assembled = assembler.transform(df)

# Normalize the features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

# Split the data into majority and minority classes
majority_df = df_scaled.filter(df_scaled.Label == 1)
minority_df = df_scaled.filter(df_scaled.Label == 0)

# Function to generate synthetic samples
def generate_samples(minority_class, n_samples):
    synthetic_samples = []
    minority_data = minority_class.collect()
    for _ in range(n_samples):
        sample1, sample2 = random.sample(minority_data, 2)
        weight = random.random()
        synthetic_sample = DenseVector([weight * x + (1 - weight) * y for x, y in zip(sample1['scaledFeatures'], sample2['scaledFeatures'])])
        synthetic_samples.append((0, synthetic_sample))
    return synthetic_samples

# Number of synthetic samples needed
n_majority = majority_df.count()
n_minority = minority_df.count()
n_synthetic_samples = n_majority - n_minority

# Generate synthetic samples
synthetic_samples = generate_samples(minority_df, n_synthetic_samples)

# Create a DataFrame for synthetic samples
schema = StructType([
    StructField("Label", IntegerType(), False),
    StructField("scaledFeatures", ArrayType(FloatType()), False)
])
synthetic_df = spark.createDataFrame(synthetic_samples, schema)

# Combine the original majority class with the minority class and synthetic samples
balanced_df = majority_df.union(minority_df).union(synthetic_df)

# Convert scaledFeatures back to individual columns
def vector_to_columns(scaled_features):
    return scaled_features.tolist()

vector_to_columns_udf = udf(vector_to_columns, ArrayType(FloatType()))

# Add individual columns to the DataFrame
balanced_df = balanced_df.withColumn("features_list", vector_to_columns_udf("scaledFeatures"))

# Extract feature column names
feature_columns = [col for col in df.columns if col != 'Label']
for i, col_name in enumerate(feature_columns):
    balanced_df = balanced_df.withColumn(col_name, col("features_list")[i])

# Drop unnecessary columns
final_df = balanced_df.drop("features").drop("scaledFeatures").drop("features_list")

# Separate target and features
final_df = final_df.select(["Label"] + feature_columns)

# Show the final DataFrame
final_df.show()

# Now `final_df` contains the balanced dataset with target variable 'Label' and features


In [0]:
spark.stop()