### **Summary of the Code**

This script processes Zeek log files, extracts their header metadata and data rows, and converts the connection records into subject-predicate-object triples for use in a knowledge graph. It saves the parsed data as Parquet, the metadata as JSON, and the triples as CSV for further processing.

---

### **Key Steps in the Code**

1. **Import Libraries**:
   - Uses `os`, `gzip`, `pandas`, and `json` for file handling, decompression, data processing, and JSON serialization (`re` and `numpy` are also imported but not used).

2. **Parse Zeek Logs (`parse_zeek_log`)**:
   - **Reads the log file**: Opens and decompresses the `.gz` Zeek log file.
   - **Extracts metadata**: Parses `#key<TAB>value` header lines into a metadata dictionary; keys are stored without the leading `#` (e.g., `fields`, `types`).
   - **Extracts headers**: Identifies the column names for the data rows using the `#fields` line.
   - **Extracts data**: Parses data rows (lines that don't start with `#`) into a pandas DataFrame.
   - **Cleans and processes data**:
     - Replaces placeholders (`(empty)`, `-`) with `None`.
     - Converts column data types based on metadata (e.g., timestamps, intervals, counts).

3. **Save Data (`save_to_parquet_and_json`)**:
   - **Saves the DataFrame**: Writes the parsed data to a Parquet file for efficient storage.
   - **Saves metadata**: Exports metadata to a JSON file.

4. **Generate Subject-Predicate-Object (SPO) Triples**:
   - Extracts key columns (`id.orig_h`, `id.resp_h`, `proto`, `service`) to form triples:
     - `id.orig_h` as `s` (subject).
     - `proto:service` as `p` (predicate).
     - `id.resp_h` as `o` (object).
   - Renames the columns to `s`, `p`, `o` and keeps only those three.

5. **Save Triples and Metadata**:
   - Saves the SPO triples as a CSV file (without headers).
   - Saves metadata as a JSON file.

6. **Prepare for Knowledge Graph Creation**:
   - Uses Jupyter shell escapes (`!`) to run the `kg` CLI, which adds the triples and metadata to a knowledge graph and starts the kgsearch app.
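Steps 2 and 4 can be tried end to end without a real log file. The sketch below is standard-library only and runs on a small hypothetical sample in Zeek's tab-separated format (the header lines and values are made up, using documentation IP ranges). `parse_lines` and `to_triple` are illustrative helpers, not the notebook's functions, and timestamps are kept as epoch seconds rather than datetimes. It highlights two details of the approach: header keys lose their `#` (so the types line is stored under `types`), and a connection without a `service` gets an empty predicate, which is how such rows appear in the notebook's output.

```python
# Stdlib sketch: parse a hypothetical Zeek conn sample, then build (s, p, o) triples.
SAMPLE = (
    "#separator \\x09\n"
    "#path\tconn\n"
    "#fields\tts\tid.orig_h\tid.resp_h\tproto\tservice\tduration\n"
    "#types\ttime\taddr\taddr\tenum\tstring\tinterval\n"
    "1734318000.5\t192.0.2.10\t198.51.100.7\tudp\tdns\t0.25\n"
    "1734318001.0\t192.0.2.10\t203.0.113.9\ttcp\t-\t-\n"
)

FLOAT_TYPES = {"time", "interval", "double"}  # 'time' kept as epoch seconds here
INT_TYPES = {"count", "int", "port"}
PLACEHOLDERS = {"(empty)", "-"}  # empty set / unset field


def parse_lines(lines):
    """Return (metadata, rows); metadata keys drop the leading '#'."""
    metadata = {}
    for line in lines:
        if line.startswith("#"):
            parts = line[1:].split("\t", 1)
            if len(parts) == 2:  # skips '#separator \x09', which has no tab
                metadata[parts[0].strip()] = parts[1].strip()

    headers = metadata["fields"].split("\t")
    types = metadata.get("types", "").split("\t")

    rows = []
    for line in lines:
        if line.startswith("#") or not line.strip():
            continue
        values = line.rstrip("\r\n").split("\t")
        row = {h: (None if v in PLACEHOLDERS else v) for h, v in zip(headers, values)}
        for col, dtype in zip(headers, types):
            if row.get(col) is None:
                continue
            if dtype in FLOAT_TYPES:
                row[col] = float(row[col])
            elif dtype in INT_TYPES:
                row[col] = int(row[col])
        rows.append(row)
    return metadata, rows


def to_triple(row):
    """Subject = originator, predicate = 'proto:service' ('' if either is missing), object = responder."""
    if row["proto"] is None or row["service"] is None:
        p = ""
    else:
        p = f"{row['proto']}:{row['service']}"
    return (row["id.orig_h"], p, row["id.resp_h"])


metadata, rows = parse_lines(SAMPLE.splitlines(keepends=True))
triples = [to_triple(r) for r in rows]
print(sorted(metadata))  # ['fields', 'path', 'types']
print(triples)
# [('192.0.2.10', 'udp:dns', '198.51.100.7'), ('192.0.2.10', '', '203.0.113.9')]
```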

---

### **Outputs**

1. **Parquet File**:
   - The processed Zeek log data is saved as a `.parquet` file for compact storage.
2. **JSON Metadata**:
   - The extracted header metadata is saved to `./_landing/conn_log_metadata.json` and again to `metadata.json` for the `kg` CLI.
3. **SPO Triples**:
   - The triples (subject-predicate-object) are saved as `data.csv` for use in a graph database.
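The `data.csv` layout is one `s,p,o` line per triple with no header row. The sketch below reproduces that serialization with the standard `csv` and `json` modules, using hypothetical triples and metadata in place of `df_spo` and the parsed header. pandas' `to_csv(header=False, index=False)` should produce equivalent rows for these values, although its line terminator defaults to the platform's `os.linesep`.

```python
import csv
import io
import json

# Hypothetical triples and metadata, standing in for df_spo and the parsed Zeek header
triples = [
    ("192.0.2.10", "udp:dns", "198.51.100.7"),
    ("192.0.2.20", "", "255.255.255.255"),
]
metadata = {"path": "conn", "fields": "ts\tuid\tid.orig_h"}

# Headerless CSV: one 's,p,o' row per triple; an empty predicate becomes an empty field
buf = io.StringIO()
csv.writer(buf, lineterminator="\n").writerows(triples)
csv_text = buf.getvalue()
print(csv_text, end="")
# 192.0.2.10,udp:dns,198.51.100.7
# 192.0.2.20,,255.255.255.255

# Metadata as indented JSON; tab separators inside values are escaped as \t
json_text = json.dumps(metadata, indent=2)
print(json_text)
```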

---

### **Jupyter Commands for Graph Operations**
Install `kgsearch`, which provides the `kg` command:

```sh
pip install git+https://github.com/wasit7/kgsearch.git
```

The notebook runs the `kg` CLI through Jupyter shell escapes (the `!` prefix) to load the processed triples into a knowledge graph:
1. **Add Data**:
   ```bash
   !kg add -f data.csv
   ```
   Adds the SPO triples from `data.csv` to the graph database.

2. **Add Metadata**:
   ```bash
   !kg meta -f metadata.json
   ```
   Adds the metadata from `metadata.json` to describe the dataset.

3. **Start Knowledge Graph**:
   ```bash
   !kg start
   ```
   Starts the kgsearch web app for searching the graph (a Flask server; in the run shown below it listens on http://127.0.0.1:5000).

---

### **Enhancements for Usability**
1. **Parameterize File Paths**:
   Allow dynamic input for file paths to process multiple logs.
2. **Error Handling**:
   Add error handling for missing files or parsing errors.
3. **Additional Metadata**:
   Enrich metadata with derived statistics (e.g., row counts, unique IPs).
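The three enhancements can be combined in a small batch driver. The sketch below is standard-library only and assumes gzipped Zeek TSV logs in a directory like the one in the example path. `summarize_zeek_log` and `summarize_logs` are hypothetical helpers (not part of the notebook), the default glob pattern `conn.*.log.gz` is an assumption, and the statistics computed are the row count and unique IP count suggested above.

```python
import gzip
from pathlib import Path


def summarize_zeek_log(path):
    """Hypothetical helper: parse one gzipped Zeek TSV log and return simple statistics."""
    with gzip.open(path, "rt") as f:
        lines = f.readlines()

    fields_line = next((line for line in lines if line.startswith("#fields")), None)
    if fields_line is None:
        raise ValueError(f"{path}: no #fields header")
    headers = fields_line.rstrip("\r\n").split("\t")[1:]

    rows = [
        dict(zip(headers, line.rstrip("\r\n").split("\t")))
        for line in lines
        if line.strip() and not line.startswith("#")
    ]
    # Distinct originator/responder addresses, ignoring Zeek's '-' placeholder
    ips = {row.get(col) for row in rows for col in ("id.orig_h", "id.resp_h")}
    ips -= {None, "-"}
    return {"row_count": len(rows), "unique_ips": len(ips)}


def summarize_logs(log_dir, pattern="conn.*.log.gz"):
    """Hypothetical driver: summarize every matching log, collecting failures instead of stopping."""
    stats, errors = {}, {}
    for path in sorted(Path(log_dir).glob(pattern)):
        try:
            stats[path.name] = summarize_zeek_log(path)
        except (OSError, EOFError, ValueError) as exc:
            # OSError covers unreadable or non-gzip files (gzip.BadGzipFile),
            # EOFError truncated archives, ValueError a missing '#fields' header
            errors[path.name] = str(exc)
    return stats, errors
```

For instance, `summarize_logs("../zeek/logs/2024-12-16")` would cover the directory from the example path and return per-file statistics alongside any files that could not be read; the statistics could then be merged into the metadata dictionary before it is written to JSON.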

---

This script is ready to parse Zeek logs, prepare graph-ready data, and integrate it into a graph database.

In [1]:
import os
import gzip
import pandas as pd
import re
import json
import numpy as np


def parse_zeek_log(file_path):
    """
    Parses a gzipped Zeek TSV log into a pandas DataFrame along with its header metadata.
    """
    with gzip.open(file_path, 'rt') as f:
        lines = f.readlines()

    # Extract metadata from '#key<TAB>value' header lines.
    # Keys are stored without the leading '#', e.g. 'fields', 'types', 'path'.
    metadata = {}
    for line in lines:
        if line.startswith("#"):
            parts = line[1:].split("\t", 1)
            if len(parts) == 2:
                metadata[parts[0].strip()] = parts[1].strip()

    # Extract headers (the column names after '#fields')
    headers_line = next(line for line in lines if line.startswith("#fields"))
    headers = [header.strip() for header in headers_line.split("\t")[1:]]

    # Extract data rows: strip only the line ending so tab-separated columns stay
    # aligned, and skip blank lines
    data_lines = [
        line.rstrip("\r\n").split("\t")
        for line in lines
        if line.strip() and not line.startswith("#")
    ]

    # Create DataFrame
    df = pd.DataFrame(data_lines, columns=headers)

    # Replace Zeek placeholders: '(empty)' for empty sets, '-' for unset fields
    df.replace({'(empty)': None, '-': None}, inplace=True)

    # Convert types from the '#types' line (stored under the key 'types', not '#types')
    if "types" in metadata:
        types = metadata["types"].split("\t")
        for col, dtype in zip(headers, types):
            if dtype == "time":
                # Timestamps are epoch seconds stored as text: parse to numbers first
                df[col] = pd.to_datetime(pd.to_numeric(df[col], errors="coerce"), unit="s")
            elif dtype in {"interval", "count", "int", "double", "port"}:
                df[col] = pd.to_numeric(df[col], errors="coerce")
            elif dtype == "bool":
                df[col] = df[col] == "T"

    return df, metadata

def save_to_parquet_and_json(df, metadata, output_dir, base_name):
    """
    Saves the DataFrame to Parquet and metadata to JSON.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Save DataFrame as Parquet
    parquet_file = os.path.join(output_dir, f"{base_name}.parquet")
    df.to_parquet(parquet_file, index=False)
    print(f"Data saved to {parquet_file}")

    # Save metadata as JSON
    metadata_file = os.path.join(output_dir, f"{base_name}_metadata.json")
    with open(metadata_file, "w") as f:
        json.dump(metadata, f, indent=4)
    print(f"Metadata saved to {metadata_file}")

# Example usage
file_path = "../zeek/logs/2024-12-16/conn.10:00:00-11:00:00.log.gz"
output_dir = "./_landing"

# Parse the log file
df, metadata = parse_zeek_log(file_path)
# Save results to Parquet and JSON
save_to_parquet_and_json(df, metadata, output_dir, "conn_log")


Data saved to ./_landing/conn_log.parquet
Metadata saved to ./_landing/conn_log_metadata.json


In [2]:
df_spo = df[['id.orig_h', 'id.resp_h', 'proto', 'service']].copy()
# Predicate = 'proto:service'; rows with no service get an empty predicate
df_spo['edge'] = (df_spo['proto'] + ':' + df_spo['service']).fillna('')
df_spo = df_spo.rename(columns={'id.orig_h': 's', 'edge': 'p', 'id.resp_h': 'o'})
df_spo = df_spo[['s', 'p', 'o']]
df_spo

|      | s | p | o |
|------|---|---|---|
| 0 | 192.168.127.114 | udp:dns | 192.168.127.134 |
| 1 | 192.168.127.114 | udp:dns | 192.168.127.134 |
| 2 | 192.168.127.114 | udp:dns | 192.168.127.134 |
| 3 | 192.168.127.114 | udp:quic | 172.217.25.202 |
| 4 | 192.168.127.114 | udp:dns | 192.168.127.134 |
| ... | ... | ... | ... |
| 2791 | fe80::10c1:6dde:581e:e273 | udp:dns | ff02::fb |
| 2792 | 192.168.28.229 | udp:quic | 216.58.199.227 |
| 2793 | 192.168.28.229 | udp:quic | 216.58.200.3 |
| 2794 | 192.168.28.11 | | 255.255.255.255 |


In [3]:
csv_file = 'data.csv'
metadata_file = 'metadata.json'

# Write triples as a headerless s,p,o CSV for `kg add`
df_spo.to_csv(csv_file, header=False, index=False)

# Write the Zeek header metadata for `kg meta`
with open(metadata_file, 'w', encoding='utf-8') as f:
    json.dump(metadata, f, indent=2)

In [None]:
!kg add -f data.csv
!kg meta -f metadata.json
!kg start

🎉 Starting the app.
 * Serving Flask app 'kgsearch.app.app' (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
127.0.0.1 - - [17/Dec/2024 14:32:30] "GET /search/1/1/1/192.168.127.114 HTTP/1.1" 200 -
127.0.0.1 - - [17/Dec/2024 14:32:33] "GET /search/1/1/1/192.168.127.114; HTTP/1.1" 200 -
127.0.0.1 - - [17/Dec/2024 14:32:35] "GET /search/1/1/1/192.168.127.114; HTTP/1.1" 200 -
