# Large-Scale Log Processing

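* <b>pl.DataFrame</b> (eager) vs. <b>pl.LazyFrame</b> (lazy) for complex, multi-step query pipelines
* <b>pl.scan_csv()</b> (lazy scan) vs. <b>pl.read_csv()</b> (eager read)
* inspect query optimization with <b>.explain()</b> (formerly <b>.describe_optimized_plan()</b>), and visualize the plan with <b>.show_graph()</b>
* demonstrate <b>.collect()</b> with and without the <b>GPU</b> engine (<b>.collect(engine="gpu")</b>)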

# Data --> NASA HTTP logs

* Convert to Apache Parquet for faster querying
* <b>Parse timestamps</b>
* <b>Filter efficiently</b>
* <b>Group by IPs, URLs, or status codes</b>

In [1]:
# Download the August and July 1995 NASA HTTP access logs (gzip-compressed)
# over FTP from ita.ee.lbl.gov; -q keeps wget's progress output quiet.
!wget -qqq ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz
!wget -qqq ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz

In [2]:
import polars as pl
import gzip
import time
import re

In [16]:
lines = []

# Read both monthly archives (August first, then July) into one list of
# decoded text lines; each line keeps its trailing newline.
for archive in ("NASA_access_log_Aug95.gz", "NASA_access_log_Jul95.gz"):
    with gzip.open(archive, "rb") as f:
        # Decode as UTF-8, substituting U+FFFD for any invalid byte sequences
        lines.extend(line.decode("utf-8", errors="replace") for line in f)

In [26]:
# Regex pattern for log parsing
log_pattern = re.compile(
    r'^(?P<host>\S+) '               # Hostname or IP
    r'\S+ '                           # Skip 'ident' column
    r'\S+ '                           # Skip 'authuser' column
    r'\[(?P<datetime>[^\]]+)\] '     # Timestamp inside square brackets
    r'"(?P<request>[^"]+)" '         # Request method/path/protocol
    r'(?P<status>\d{3}) '            # HTTP Status code
    r'(?P<bytes>\S+)'                # Bytes transferred (or '-')
)

In [27]:
# Function to parse a log line into a dictionary
def parse_log_line(line: str):
    match = log_pattern.match(line)
    if match:
        data = match.groupdict()
        data["bytes"] = int(data["bytes"]) if data["bytes"].isdigit() else 0
        data["status"] = int(data["status"])
        return data
    else:
        return {"host": None, "datetime": None, "request": None, "status": None, "bytes": None}

In [28]:
# Convert the raw column into structured data
df = (
    pl.DataFrame({"raw_line": lines})
    .with_columns(
        pl.col("raw_line").map_elements(parse_log_line, return_dtype=pl.Struct)  # Apply parsing
    )
    .unnest("raw_line")  # Extract dictionary into separate columns
)

In [29]:
# Display a random sample of 50 parsed rows
df.sample(n=50)

host,datetime,request,status,bytes
str,str,str,i64,i64
"""156.98.204.13""","""06/Jul/1995:13:51:07 -0400""","""GET /shuttle/countdown/ HTTP/1…",200,3985
"""ns.scn.de""","""14/Aug/1995:09:14:09 -0400""","""GET /history/apollo/images/apo…",304,0
"""husehime.students.chiba-u.ac.j…","""04/Jul/1995:22:54:31 -0400""","""GET /images/ksclogo-medium.gif…",200,5866
"""cs1-02.mil.ptd.net""","""25/Aug/1995:23:18:08 -0400""","""GET /shuttle/missions/51-b/mis…",200,6387
"""cmos.engr.siu.edu""","""30/Aug/1995:12:29:32 -0400""","""GET /history/apollo/apollo-13/…",200,12859
…,…,…,…,…
"""cs27port.netvoyage.net""","""09/Jul/1995:23:05:20 -0400""","""GET /images/ksclogo-medium.gif…",200,5866
"""slip136-137.pt.uk.ibm.net""","""18/Aug/1995:03:45:30 -0400""","""GET /images/MOSAIC-logosmall.g…",200,363
"""136.205.124.45""","""17/Jul/1995:12:43:57 -0400""","""GET /history/apollo/apollo-13/…",200,18114
"""131.176.12.122""","""28/Aug/1995:03:24:47 -0400""","""GET /history/apollo/apollo.htm…",200,3260


In [None]:
# -------------------------------------------------------------------------
# 2. Read NASA Log lines into Polars Eager DataFrame
# -------------------------------------------------------------------------
def read_logs_eager(filepath: str) -> pl.DataFrame:
    """Parse an access-log file into an eager Polars DataFrame.

    Args:
        filepath: Path to the log file. Paths ending in ``.gz`` are
            decompressed with gzip; anything else is read as plain text.
            Invalid UTF-8 sequences are replaced with U+FFFD.

    Returns:
        DataFrame with columns host, datetime, request (Utf8) and
        status, bytes (Int64). Lines that ``parse_log_line`` cannot parse
        are skipped. An empty file still yields these (empty) columns.
    """
    # Fixed schema so downstream column references work even with zero rows.
    schema = {
        "host": pl.Utf8,
        "datetime": pl.Utf8,
        "request": pl.Utf8,
        "status": pl.Int64,
        "bytes": pl.Int64,
    }
    opener = gzip.open if str(filepath).endswith(".gz") else open

    with opener(filepath, "rt", encoding="utf-8", errors="replace") as f:
        # parse_log_line signals a non-matching line with an all-None dict,
        # which is truthy, so check a field instead of the dict itself.
        parsed_rows = [
            row for row in map(parse_log_line, f) if row["host"] is not None
        ]

    # Create an eager Polars DataFrame immediately
    return pl.DataFrame(parsed_rows, schema=schema)

In [None]:
# -------------------------------------------------------------------------
# 3. Demonstrate Eager processing
# -------------------------------------------------------------------------
def process_eager(df: pl.DataFrame) -> pl.DataFrame:
    """Rank hosts by bytes served for non-error requests (eager API).

    Keeps only rows with ``status < 400``. This drops 4xx/5xx responses, and
    Polars' filter also drops rows whose status is null. It then groups by
    host, counting requests and summing ``bytes``.

    Args:
        df: Parsed log DataFrame with ``host``, ``status`` and ``bytes``.

    Returns:
        DataFrame with columns host, num_requests, total_bytes, sorted by
        total_bytes descending. Ties are broken by host ascending, so the row
        order is deterministic despite group_by's unordered output.
    """
    return (
        df.filter(pl.col("status") < 400)
        # group_by replaces the removed groupby; pl.len() replaces pl.count()
        .group_by("host")
        .agg(
            pl.len().alias("num_requests"),
            pl.sum("bytes").alias("total_bytes"),
        )
        .sort(["total_bytes", "host"], descending=[True, False])
    )

In [None]:
# -------------------------------------------------------------------------
# 4. Demonstrate Lazy processing
# -------------------------------------------------------------------------
def read_logs_lazy(filepath: str) -> pl.LazyFrame:
    """Parse an access-log file and wrap the result in a Polars LazyFrame.

    Polars has no lazy "scan raw lines with a Python parser" reader, so the
    lines are parsed eagerly here and only the subsequent query is lazy. A
    fully lazy pipeline would first convert the logs to CSV/Parquet/IPC and
    use ``pl.scan_csv`` / ``pl.scan_parquet`` / ``pl.scan_ipc``.

    Args:
        filepath: Path to the log file. Paths ending in ``.gz`` are
            decompressed with gzip; anything else is read as plain text.
            Invalid UTF-8 sequences are replaced with U+FFFD.

    Returns:
        LazyFrame with columns host, datetime, request (Utf8) and
        status, bytes (Int64). Lines that ``parse_log_line`` cannot parse
        are skipped.
    """
    # Fixed schema so downstream column references work even with zero rows.
    schema = {
        "host": pl.Utf8,
        "datetime": pl.Utf8,
        "request": pl.Utf8,
        "status": pl.Int64,
        "bytes": pl.Int64,
    }
    opener = gzip.open if str(filepath).endswith(".gz") else open

    with opener(filepath, "rt", encoding="utf-8", errors="replace") as f:
        # parse_log_line signals a non-matching line with an all-None dict,
        # which is truthy, so check a field instead of the dict itself.
        parsed_rows = [
            row for row in map(parse_log_line, f) if row["host"] is not None
        ]

    # Convert to lazy
    return pl.DataFrame(parsed_rows, schema=schema).lazy()

In [None]:
def process_lazy(lf: pl.LazyFrame) -> pl.DataFrame:
    """Rank hosts by bytes served for non-error requests (lazy API).

    Builds a query plan that keeps only rows with ``status < 400`` (rows with
    a null status are dropped by the filter too). The plan then groups by
    host, counting requests and summing ``bytes``. Nothing executes until
    ``.collect()``, which lets Polars optimize the whole plan first.

    Args:
        lf: LazyFrame with ``host``, ``status`` and ``bytes`` columns.

    Returns:
        Materialized DataFrame with columns host, num_requests, total_bytes,
        sorted by total_bytes descending, with ties broken by host ascending.
    """
    # Build up the query plan
    result_lf = (
        lf.filter(pl.col("status") < 400)
        # group_by replaces the removed groupby; pl.len() replaces pl.count()
        .group_by("host")
        .agg(
            pl.len().alias("num_requests"),
            pl.sum("bytes").alias("total_bytes"),
        )
        .sort(["total_bytes", "host"], descending=[True, False])
    )
    # Collect triggers the actual computation
    return result_lf.collect()

In [None]:
# -------------------------------------------------------------------------
# 5. Main Benchmark
# -------------------------------------------------------------------------
if __name__ == "__main__":
    # Plain-text log to benchmark. The download cell fetches gzip archives
    # (NASA_access_log_Aug95.gz / NASA_access_log_Jul95.gz); decompress one
    # and point this path at the result.
    log_file = "nasa_aug_95.log"  # Adjust to your path

    # NOTE: both pipelines parse every line in Python before Polars sees any
    # data (read_logs_lazy builds an eager DataFrame and calls .lazy()), so
    # parsing dominates both timings. Only the filter/group_by/sort steps
    # differ between eager and lazy execution.
    #
    # perf_counter() is a monotonic, high-resolution clock intended for
    # measuring intervals; time.time() is wall-clock time and can jump.

    # --- Eager ---
    start_eager = time.perf_counter()
    df_eager = read_logs_eager(log_file)
    eager_result = process_eager(df_eager)
    end_eager = time.perf_counter()
    eager_time = end_eager - start_eager

    print(f"[EAGER] Processed rows: {df_eager.shape[0]}")
    print(f"[EAGER] Result rows (hosts): {eager_result.shape[0]}")
    print(f"[EAGER] Time elapsed: {eager_time:.4f} seconds")
    print(eager_result.head(5))

    # --- Lazy ---
    start_lazy = time.perf_counter()
    lf = read_logs_lazy(log_file)
    lazy_result = process_lazy(lf)
    end_lazy = time.perf_counter()
    lazy_time = end_lazy - start_lazy

    # lazy_result is the aggregated output (one row per host), not the input
    # row count, so label it as such.
    print(f"[LAZY] Result rows (hosts): {lazy_result.shape[0]}")
    print(f"[LAZY] Time elapsed: {lazy_time:.4f} seconds")
    print(lazy_result.head(5))