In [1]:
import sys
import os
import glob

# ====================================================
# SETUP: AUTO-DETECT SPARK & PROJECT PATH
# ====================================================

# 1. Detect Spark on the VM (Fix for "ModuleNotFoundError")
candidate_paths = [
    os.environ.get("SPARK_HOME"), 
    "/usr/local/spark",
    "/usr/lib/spark",
    "/home/talentum/spark",
    "/opt/spark"
]

SPARK_HOME = None
for path in candidate_paths:
    if path and os.path.exists(path) and os.path.exists(os.path.join(path, "python")):
        SPARK_HOME = path
        break

if SPARK_HOME:
    sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
    py4j_files = glob.glob(os.path.join(SPARK_HOME, "python", "lib", "py4j-*-src.zip"))
    if py4j_files:
        sys.path.insert(0, py4j_files[0])
else:
    print("Warning: Spark folder not found. Relying on default environment.")

# 2. Add Project Root to Path (Fix for "config" import)
PROJECT_ROOT = "/home/talentum/Distributed-log-analyzer"
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

# 3. Import Dependencies
try:
    from pyspark.sql.functions import col, to_timestamp, window, count, concat, lit, when
    from spark_jobs.common.spark_utils import get_spark_session, load_config
    print("SUCCESS: Environment is ready.")
except ImportError as e:
    print(f"FAILURE: {e}")

SUCCESS: Environment is ready.


In [2]:
# --- CONFIGURATION ---
# Change this to "HDFS", "Android", etc. to process other logs
SOURCE_NAME = "Hadoop" 

# 1. Initialize Spark
conf = load_config()
spark = get_spark_session(f"Notebook_Trends_{SOURCE_NAME}")

# 2. Define Paths
silver_path = f"{conf['storage']['refined']}/{SOURCE_NAME}"
gold_path = f"{conf['storage']['curated']}/{SOURCE_NAME}_trends"

print(f"--- Processing Trends for: {SOURCE_NAME} ---")
print(f"Reading from: {silver_path}")

# 3. Read Silver Data
try:
    df = spark.read.parquet(silver_path)
    print(f"Loaded {df.count()} records.")
except Exception as e:
    print(f"Error: Could not read data. Did you run Ingestion first? {e}")
    # Stop execution if data is missing
    raise e

# 4. Standardize Timestamp
# We need to normalize different log formats into one standard "Timestamp" object
print("Standardizing Timestamps...")

if SOURCE_NAME == "Hadoop":
    # Format: 2015-10-18 18:01:47,978
    df_clean = df.withColumn("fixed_timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss,SSS"))

elif SOURCE_NAME == "HDFS":
    # HDFS has separate date/time columns: 081109 203615
    df_clean = df.withColumn("ts_str", concat(col("date"), lit(" "), col("time"))) \
                 .withColumn("fixed_timestamp", to_timestamp(col("ts_str"), "yyMMdd HHmmss"))

elif SOURCE_NAME == "Android":
    # Android has no year: 03-17 16:13:38.811 -> Assume 2023
    df_clean = df.withColumn("fixed_timestamp", to_timestamp(concat(lit("2023-"), col("timestamp")), "yyyy-MM-dd HH:mm:ss.SSS"))

elif SOURCE_NAME in ["Linux", "Mac", "OpenSSH"]:
    # Syslog format: Jun 14 15:16:01 -> Assume 2023
    df_clean = df.withColumn("fixed_timestamp", to_timestamp(concat(lit("2023 "), col("timestamp_str")), "yyyy MMM dd HH:mm:ss"))

else:
    # Fallback for others (Apache, Zookeeper, etc.)
    # Attempt to cast existing timestamp column
    df_clean = df.withColumn("fixed_timestamp", col("timestamp").cast("timestamp"))

# Filter out any rows where timestamp failed to parse
df_clean = df_clean.filter(col("fixed_timestamp").isNotNull())

# 5. AGGREGATION (The Gold Layer Logic)
# Group by 1-hour windows and count errors
trends_df = df_clean.groupBy(window(col("fixed_timestamp"), "1 hour").alias("time_window")) \
    .agg(
        count("*").alias("total_logs"),
        count(when(col("level").isin("ERROR", "FATAL", "CRITICAL"), True)).alias("error_count"),
        count(when(col("level") == "WARN", True)).alias("warn_count")
    ) \
    .select(
        col("time_window.start").alias("window_start"),
        col("total_logs"),
        col("error_count"),
        col("warn_count")
    ) \
    .orderBy("window_start")

# 6. Preview & Save
print("--- Hourly Trends Preview ---")
trends_df.show(10, truncate=False)

print(f"Writing Gold data to {gold_path}...")
trends_df.write.mode("overwrite").parquet(gold_path)
print("Success!")

--- Processing Trends for: Hadoop ---
Reading from: /user/talentum/project_logs/refined/Hadoop
Loaded 2000 records.
Standardizing Timestamps...
--- Hourly Trends Preview ---
+-------------------+----------+-----------+----------+
|window_start       |total_logs|error_count|warn_count|
+-------------------+----------+-----------+----------+
|2015-10-18 17:30:00|2000      |152        |808       |
+-------------------+----------+-----------+----------+

Writing Gold data to /user/talentum/project_logs/curated/Hadoop_trends...
Success!
