# **Day 2: Advanced Shell & Stream Processing (Interactive Lab)**

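**Objective:** Move from the Kernel (Day 1) to the Shell (Day 2). We will demonstrate why the **Unix Philosophy** (small tools, pipes, streams) is the foundation of modern Data Engineering. The timing experiments simulate each shell pattern in Python so they run inside the notebook.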

### **Core Concepts:**
1. **The Physics of Pipes:** RAM Buffers vs. Disk I/O.
2. **Tool Mechanics:** Why `grep` is usually faster than `awk` for plain pattern matching.
3. **Parallelism:** Saturating CPUs with `xargs` and Multiprocessing.
4. **Process Substitution:** Making streams look like files.

---

## **🧪 Level 1: The Physics of Pipes (RAM vs Disk)**

**Theory Recap:** 
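* **Disk-Based:** `cmd1 > file; cmd2 < file`. `cmd1` must finish writing the intermediate file before `cmd2` starts reading it (Slow, High I/O).
* **Stream-Based:** `cmd1 | cmd2`. Data passes through an in-memory kernel pipe buffer (64 KiB by default on Linux), and both commands run concurrently. (Fast, no intermediate file on disk.)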
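
The size of that kernel buffer can be observed directly. The following is a minimal sketch, assuming a Unix-like system: it fills a pipe that nobody reads and counts how many bytes the kernel accepts before the write would block. The helper name `measure_pipe_capacity` is made up for this illustration, and the reported value depends on the platform and its configuration.

```python
import os


def measure_pipe_capacity(chunk_size: int = 1024) -> int:
    """Fill an unread pipe and return how many bytes the kernel buffered."""
    read_fd, write_fd = os.pipe()
    # Non-blocking mode makes a full pipe raise an error instead of hanging.
    os.set_blocking(write_fd, False)
    chunk = b"x" * chunk_size
    total = 0
    try:
        while True:
            total += os.write(write_fd, chunk)
    except BlockingIOError:
        pass  # The buffer is full: this is the pipe's capacity.
    finally:
        os.close(read_fd)
        os.close(write_fd)
    return total


capacity = measure_pipe_capacity()
print(f"Pipe capacity: {capacity} bytes ({capacity / 1024:.0f} KiB)")
```

When the buffer is full, a normal (blocking) writer is simply paused until the reader catches up. That back-pressure is what lets a pipeline process far more data than fits in RAM.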

**Experiment:** We will simulate a "Pipe Race". We will process 500,000 records using intermediate files (Junior approach) vs. Pipes (Senior approach).

In [1]:
import os
import time

# 1. Setup Data
INPUT_FILE = "large_input.txt"
TEMP_FILE = "temp_intermediate.txt"
DATA_SIZE_LINES = 500_000

if not os.path.exists(INPUT_FILE):
    print(f"Generating {DATA_SIZE_LINES} lines of data...")
    with open(INPUT_FILE, "w") as f:
        for i in range(DATA_SIZE_LINES):
            # Write: "Row <ID> <ID * 2>"
            f.write(f"Row {i} {i*2}\n")
    print("✅ Input file ready.")
else:
    print("✅ Using existing input file.")

✅ Using existing input file.


In [2]:
print("--- [1] Disk-Based Approach (Intermediate File) ---")
start = time.time()

# Step 1: Filter (grep equivalent) -> Write to Disk
with open(INPUT_FILE, 'r') as fin, open(TEMP_FILE, 'w') as fout:
    for line in fin:
        if 'Row' in line:
            fout.write(line)

# Step 2: Process (awk equivalent) -> Read from Disk
count = 0
with open(TEMP_FILE, 'r') as fin:
    for line in fin:
        parts = line.split()
        if len(parts) > 2:
            count += 1

disk_time = time.time() - start
print(f"Time: {disk_time:.4f}s")

# Cleanup temp file
if os.path.exists(TEMP_FILE): os.remove(TEMP_FILE)

--- [1] Disk-Based Approach (Intermediate File) ---
Time: 1.1709s


In [3]:
print("--- [2] Stream-Based Approach (Pipe Logic) ---")
start = time.time()

# Single Pass: Read once, process in memory, never write intermediate
count = 0
with open(INPUT_FILE, 'r') as fin:
    for line in fin:
        # Logic 1: Filter
        if 'Row' in line:
            # Logic 2: Process
            parts = line.split()
            if len(parts) > 2:
                count += 1

pipe_time = time.time() - start
print(f"Time: {pipe_time:.4f}s")

print(f"\n🚀 Speedup: {disk_time / pipe_time:.2f}x faster using Streams")

--- [2] Stream-Based Approach (Pipe Logic) ---
Time: 0.4879s

🚀 Speedup: 2.40x faster using Streams


### **More Examples: The Power of Pipes**

**Example 1: Streaming Compression (The Log Saver)**
Imagine you have a 100GB log file. You want to compress it. 
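* **Bad:** `cat log.txt > temp.txt; gzip temp.txt` (Writes a second 100GB copy to disk, then reads it all back just to compress it!)
* **Good:** `cat log.txt | gzip > log.gz` (Compresses in memory buffers. Writes only compressed bytes to disk. `gzip -c log.txt > log.gz` does the same without the extra `cat` process.)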
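
The same streaming idea can be sketched in Python with the standard `gzip` and `shutil` modules. This is an illustration, not a replacement for the shell command: the helper name `gzip_stream`, the 64 KiB chunk size, and the file names are assumptions chosen for the demo.

```python
import gzip
import os
import shutil
import tempfile


def gzip_stream(src_path: str, dst_path: str, chunk_size: int = 64 * 1024) -> None:
    """Compress src_path into dst_path one chunk at a time."""
    with open(src_path, "rb") as src, gzip.open(dst_path, "wb") as dst:
        # copyfileobj reads and writes chunk_size bytes per step, so memory
        # use stays flat no matter how large the input file is.
        shutil.copyfileobj(src, dst, chunk_size)


# Small demo on a throwaway file.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "log.txt")
    dst = os.path.join(tmp, "log.gz")
    with open(src, "w") as f:
        f.writelines(f"line {i}\n" for i in range(10_000))
    gzip_stream(src, dst)
    print(os.path.getsize(src), "->", os.path.getsize(dst), "bytes")
```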

**Example 2: Network Streaming (Video)**
You want to watch a movie being downloaded.
* **Bad:** `wget movie.mp4`. Wait 10 mins. `vlc movie.mp4`.
* **Good:** `wget -O - movie.mp4 | vlc -` (Watch while downloading. Data flows from Network -> RAM -> Player.)

### **📝 Analysis:**
Why was the stream faster? 
1. **No Write Overhead:** We never spent time writing `temp_intermediate.txt`.
2. **No Read Overhead:** We never had to read that temp file back into memory.
3. **CPU Cache:** Data likely stayed in the CPU L1/L2 cache between the filter logic and the counting logic.

Note that both cells run inside a single Python process, so they model the data flow of a pipeline rather than measure a real kernel pipe.
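
A real pipe between two processes can be sketched with `subprocess`. In this minimal example, two child Python interpreters stand in for `cmd1` and `cmd2`. The `PRODUCER` and `CONSUMER` snippets and the `run_pipeline` helper are made up for illustration, and the row format mirrors the lab's input file.

```python
import subprocess
import sys

# Stand-in for cmd1: emits rows in the same "Row <ID> <ID * 2>" format as the lab data.
PRODUCER = "for i in range(1000): print(f'Row {i} {i * 2}')"

# Stand-in for cmd2: counts lines that have more than two fields.
CONSUMER = (
    "import sys\n"
    "print(sum(1 for line in sys.stdin if len(line.split()) > 2))"
)


def run_pipeline() -> int:
    """Connect producer stdout to consumer stdin through a kernel pipe."""
    producer = subprocess.Popen(
        [sys.executable, "-c", PRODUCER], stdout=subprocess.PIPE
    )
    consumer = subprocess.Popen(
        [sys.executable, "-c", CONSUMER],
        stdin=producer.stdout,
        stdout=subprocess.PIPE,
        text=True,
    )
    # Close our copy of the read end so the producer sees a broken pipe
    # if the consumer exits early.
    producer.stdout.close()
    output, _ = consumer.communicate()
    producer.wait()
    return int(output)


print(run_pipeline())  # 1000
```

Both children run at the same time, and the rows never touch the disk between them.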

---

## **🔧 Level 2: Tool Mechanics (grep vs awk)**

**Theory Recap:** 
* **`grep`:** A "Bouncer". It checks simple patterns using highly optimized C code (GNU grep uses a Boyer-Moore variant for fixed strings). It treats text as raw bytes.
* **`awk`:** An "Accountant". It is built around splitting *every single line* into fields (`$1`, `$2`...) before running your logic. This parsing is expensive, although some implementations defer the split until a field is actually used.

**Experiment:** We will search for a specific string (`9999`) in a 1,000,000 line file using both methods.

In [4]:
TOOL_FILE = "tool_race.txt"
LINES = 1_000_000

if not os.path.exists(TOOL_FILE):
    print("Generating tool race data...")
    with open(TOOL_FILE, "w") as f:
        for i in range(LINES):
            f.write(f"Log Entry {i} [INFO] System status normal with value {i}\n")
    print("✅ Data generated.")

In [5]:
print("--- Simulation: grep (Scanning) ---")
start = time.time()
count = 0
with open(TOOL_FILE, 'r') as f:
    for line in f:
        # Grep Logic: Just check existence. No splitting.
        if '9999' in line:
            count += 1
grep_time = time.time() - start
print(f"Time: {grep_time:.4f}s")

--- Simulation: grep (Scanning) ---
Time: 0.6505s


In [6]:
print("--- Simulation: awk (Parsing) ---")
start = time.time()
count = 0
with open(TOOL_FILE, 'r') as f:
    for line in f:
        # Awk Logic: Pay the tax of splitting fields FIRST
        fields = line.split()
        # Then check the specific column (fields[2] is the entry ID, like awk's $3)
        if len(fields) > 2 and '9999' in fields[2]:
            count += 1
awk_time = time.time() - start
print(f"Time: {awk_time:.4f}s")

print(f"\n>>> Result: grep logic was {awk_time / grep_time:.1f}x faster.")

--- Simulation: awk (Parsing) ---
Time: 1.3430s

>>> Result: grep logic was 2.1x faster.


### **More Examples: Selecting the Right Tool**

**Example 1: Finding an IP in Access Logs**
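* **`grep -F '192.168.1.1' access.log`**: Fastest. Just scans bytes. `-F` treats the pattern as a literal string, so the dots are not regex wildcards. Good for "Is this IP here?", though it also matches longer addresses such as `192.168.1.10`.
* **`awk '$1 == "192.168.1.1"' access.log`**: Slower. Splits every line by spaces first. Use only if you need to check specific columns (e.g., "IP is in column 1 AND status is 404").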
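
The difference between a regex match, a literal substring match, and an exact field match can be sketched in Python. The three sample log lines are invented for this illustration. The three checks loosely correspond to `grep` with an unescaped pattern, `grep -F`, and the `awk` field comparison.

```python
import re

# Invented sample lines in an access-log-like format.
lines = [
    "192.168.1.1 - GET /index.html 200",
    "192.168.101.5 - GET /login 404",
    "192.168.1.10 - GET /about 200",
]

ip = "192.168.1.1"

# Unescaped regex: every "." matches ANY character.
regex_hits = [l for l in lines if re.search(ip, l)]

# Literal substring: dots are dots, but longer addresses still match.
literal_hits = [l for l in lines if ip in l]

# Exact field comparison: only the first column equal to the IP.
exact_hits = [l for l in lines if l.split()[0] == ip]

print(len(regex_hits), len(literal_hits), len(exact_hits))  # 3 2 1
```

Only the field comparison is exact, which is the capability the extra parsing cost buys.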

**Example 2: CSV Filtering**
Imagine a CSV: `ID,Name,Age,Salary`.
* **Task:** Find all rows containing "Smith".
    * Use `grep "Smith"`. It ignores commas and just looks for the text.
* **Task:** Find all rows where Salary > 50000.
    * Use `awk -F, 'NR > 1 && $4 > 50000'`. `grep` cannot do math. `NR > 1` skips the header row, which would otherwise be compared as a string. You pay the performance cost for the math capability.
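
The two CSV tasks map onto two different kinds of Python code, which makes the trade-off concrete. This is a sketch on invented sample rows; the helper names `rows_containing` and `rows_with_salary_above` are made up for illustration.

```python
import csv
import io

# Invented sample data with the same header as the example above.
CSV_TEXT = """ID,Name,Age,Salary
1,Smith,41,48000
2,Jones,35,72000
3,Smithers,29,55000
"""


def rows_containing(text: str, needle: str) -> list:
    """grep-style: plain substring scan, no knowledge of columns."""
    return [line for line in text.splitlines() if needle in line]


def rows_with_salary_above(text: str, threshold: float) -> list:
    """awk-style: parse each row into fields, then compare numerically."""
    reader = csv.DictReader(io.StringIO(text))  # consumes the header row
    return [row for row in reader if float(row["Salary"]) > threshold]


print(rows_containing(CSV_TEXT, "Smith"))
print([row["Name"] for row in rows_with_salary_above(CSV_TEXT, 50000)])
```

The substring scan also returns the `Smithers` row, because it has no notion of field boundaries. The field-aware version has to parse every row before it can compare a number.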

---

## **🚀 Level 3: Parallelism (xargs & Multiprocessing)**

**Theory Recap:** Most standard tools (`grep`, `gzip`, a plain Python script) are single-threaded, so each run uses 1 CPU core. Modern servers can have 64+ cores. 

**Goal:** Saturate the CPU by splitting work. In the Shell, we use `xargs -P`. In Python, `multiprocessing`.

**Experiment:** We will run a CPU-heavy calculation (Sum of Squares) sequentially vs. parallel.

In [None]:
import multiprocessing

def heavy_task(n):
    # Simulate pure CPU work
    count = 0
    for i in range(2_000_000):
        count += i * i
    return count

TASKS = list(range(8)) # 8 Heavy Tasks

print(f"--- Processing {len(TASKS)} CPU-heavy tasks ---")

# 1. Sequential
start = time.time()
for t in TASKS:
    heavy_task(t)
seq_time = time.time() - start
print(f"Sequential Time: {seq_time:.4f}s")

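# 2. Parallel (4 worker processes)
# Note: on platforms that spawn workers (Windows, macOS), heavy_task may need
# to live in an importable module; defined only in a notebook cell, the pool can hang.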
start = time.time()
with multiprocessing.Pool(processes=4) as pool:
    pool.map(heavy_task, TASKS)
par_time = time.time() - start
print(f"Parallel (4 Workers) Time: {par_time:.4f}s")

print(f"\n>>> Speedup: {seq_time / par_time:.2f}x")

--- Processing 8 CPU-heavy tasks ---
Sequential Time: 1.6900s


### **More Examples: Parallelizing Work**

**Example 1: Mass Image Resizing (CPU Bound)**
You have 10,000 high-res images to resize.
* **Sequential:** `for img in *.jpg; do convert "$img" ...; done`. (Uses 1 Core. Takes hours.)
* **Parallel:** `printf '%s\0' *.jpg | xargs -0 -P 8 -n 1 convert ...` (Uses 8 Cores. Takes minutes. NUL-delimited names survive spaces in filenames, unlike parsing `ls` output.)
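
A rough Python analogue of `xargs -P` is a thread pool whose threads each launch one external process and wait for it. In this sketch a child Python interpreter that upper-cases its argument stands in for `convert`. The helpers `run_one` and `run_parallel` and the file names are hypothetical.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the real command: a child process that upper-cases its argument.
CHILD_CODE = "import sys; print(sys.argv[1].upper())"


def run_one(arg: str) -> str:
    """Run one external process for one input, like `xargs -n 1`."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_CODE, arg],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


def run_parallel(args, max_workers: int = 8) -> list:
    """Keep up to max_workers child processes running, like `xargs -P`."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # The threads only wait on children; the CPU work happens in the
        # child processes, so it is spread across cores.
        return list(pool.map(run_one, args))


print(run_parallel(["a.jpg", "b.jpg", "c.jpg"]))  # ['A.JPG', 'B.JPG', 'C.JPG']
```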

**Example 2: Checking Website Status (I/O Bound)**
You have a list of 1,000 URLs to check.
* **Sequential:** Request one, wait for the reply, request the next. Most time is spent waiting on the network.
* **Parallel:** `cat urls.txt | xargs -P 50 -n 1 curl -I`. Launch 50 requests at once. The CPU is mostly idle anyway, so you can run far more processes than you have cores.
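
The effect of overlapping waits can be sketched without touching the network. Here `time.sleep` simulates the latency of each request; the `fake_check` function, the 50 ms delay, and the example URLs are assumptions for illustration only.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_check(url: str, latency: float = 0.05):
    """Pretend to request a URL: sleep to simulate the network round trip."""
    time.sleep(latency)
    return url, 200


urls = [f"https://example.com/page/{i}" for i in range(20)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(fake_check, urls))
elapsed = time.perf_counter() - start

# Run one after another, 20 waits of 50 ms would take about 1 second.
print(f"Checked {len(results)} URLs in {elapsed:.2f}s")
```

Because the workers spend nearly all their time waiting, the total time is close to one request's latency rather than the sum of all of them.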

---

## **🛠️ Level 4: Process Substitution (Pipes as Files)**

**Theory Recap:** Sometimes a tool *needs* a file argument (like `diff file1 file2`), but your data is in a stream. You don't want to create temp files.

**The Trick:** `diff <(cmd1) <(cmd2)`.
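The shell connects each command to a pipe and hands `diff` a path to it (e.g., `/dev/fd/63`). The path acts like a file but is actually a RAM buffer. On systems without `/dev/fd`, Bash falls back to a temporary **Named Pipe** (FIFO).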

**Note:** This is a Shell feature. We will demonstrate it using `%%bash`.

In [7]:
%%bash
# Only runs on Linux/Mac/WSL. Windows CMD/Powershell does not support this syntax directly.
if [ "$(uname)" == "Darwin" ] || [ "$(uname)" == "Linux" ]; then
    echo "--- Process Substitution Demo ---"
    
    # We want to compare the output of two commands without saving files.
    # Command 1: echo "Hello"
    # Command 2: echo "World"
    
    # diff expects files. We give it pipes.
    diff <(echo "Hello") <(echo "World")
    
    printf '\n✅ Diff executed without intermediate files.\n'
else
    echo "Skipping Process Substitution (Not supported on Windows Native)"
fi

--- Process Substitution Demo ---
1c1
< Hello
---
> World

✅ Diff executed without intermediate files.


### **More Examples: Process Substitution**

**Example 1: Verifying Remote Files**
Compare a local config file with a remote one without downloading.
`diff config.xml <(ssh user@server 'cat /etc/config.xml')`
* The local `diff` tool reads your local file AND the stream coming from SSH as if it were a second file.
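
Inside Python, the closest analogue is to diff two in-memory sequences with `difflib`, again without temp files. This is a sketch under assumptions: the two config snippets are invented, and `diff_streams` is a hypothetical helper. Unlike a pipe, `difflib` needs both inputs fully in memory.

```python
import difflib


def diff_streams(a_lines, b_lines) -> list:
    """Return a unified diff of two line iterables without temp files."""
    return list(
        difflib.unified_diff(
            list(a_lines),
            list(b_lines),
            fromfile="local",
            tofile="remote",
            lineterm="",
        )
    )


# Invented stand-ins for the local file and the remote command output.
local = ["<port>8080</port>", "<debug>false</debug>"]
remote = ["<port>8080</port>", "<debug>true</debug>"]

for line in diff_streams(local, remote):
    print(line)
```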

**Example 2: Merging Sorted Streams**
You have two commands producing sorted data. You want to merge them.
`sort -m <(generate_data_a | sort) <(generate_data_b | sort)`
* `sort -m` (merge) expects files. We feed it two live streams. It merges them on the fly.
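
Python's `heapq.merge` does the same job as `sort -m` for iterators: it lazily merges inputs that are already sorted. In this sketch the generators `generate_data_a` and `generate_data_b` are hypothetical stand-ins named after the placeholder commands above.

```python
import heapq


def generate_data_a():
    """Hypothetical sorted stream A."""
    yield from [1, 4, 7, 10]


def generate_data_b():
    """Hypothetical sorted stream B."""
    yield from [2, 3, 8, 11]


# heapq.merge pulls one item at a time from each input, so neither stream
# has to be fully loaded into memory before merging starts.
merged = list(heapq.merge(generate_data_a(), generate_data_b()))
print(merged)  # [1, 2, 3, 4, 7, 8, 10, 11]
```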

## **🧹 Cleanup**
Removing temporary files created during the lab.

In [8]:
import os
for f in [INPUT_FILE, TEMP_FILE, TOOL_FILE]:
    if os.path.exists(f):
        os.remove(f)
print("Cleanup complete.")

Cleanup complete.
