In [3]:
from IPython.display import display, Math, Markdown

# Legend for the symbols used in the record-layout equations rendered below.
legend_text = r"""
---
**Authors:** Essadi <---> (Jane & Yassine) 
---

---
- \( H \) : Header (2 bytes)
- \( S \) : Status (upper 4 bits of \( B_0 \))
- \( L \) : Length (remaining 12 bits)
- \( D \) : Data (record content)
- \( P \) : Padding (if \( L \) is odd)
- \( HS \) : Header Size (2)
- \( T \) : Total Record Size, header + data (HS + L)
- \( A \) : Alignment (2)
- \( PS \) : Padding Size
"""

display(Markdown(legend_text))


# Decoding steps for a 2-byte header. The status is an integer (B_0 >> 4), hence the floor.
display(Math(r"H = [B_0, B_1]"))                                # Read 2-byte header
display(Math(r"S = \left\lfloor B_0 / 16 \right\rfloor"))       # Extract status
display(Math(r"L = (B_0 \bmod 16) \times 256 + B_1"))           # Extract length
display(Math(r"D = \text{f.read}(L)"))                          # Read record data
display(Math(r"|D| = L"))                                       # Check completeness
display(Math(r"P = L \bmod 2"))                                 # Handle padding
display(Markdown(r"""---"""))


# Equations for total record size (header + data) and padding size
display(Math(r"\text{T} = HS + L"))
display(Math(r"\text{PS} = (A - (T \bmod A)) \bmod A"))


---
**Authors:** Essadi <---> (Jane & Yassine) 
---

---
- \( H \) : Header (2 bytes)
- \( S \) : Status (first 4 bits of \( B_0 \))
- \( L \) : Length (remaining 12 bits)
- \( D \) : Data (record content)
- \( P \) : Padding (if \( L \) is odd)
- \( HS \) : Header Size (2)
- \( T \) : Total Header Size (HS + L)
- \( A \) : Alignment (2)
- \( PS \) : Padding Size


<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

---

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<table border="1">
    <tr>
        <th>Step</th>
        <th>Explanation</th>
    </tr>
    <tr>
        <td>Read 2-byte header</td>
        <td>Read two bytes from the binary file, storing them as \( B_0 \) and \( B_1 \).</td>
    </tr>
    <tr>
        <td>Extract status</td>
        <td>Take the upper (most significant) 4 bits of \( B_0 \) by shifting it right by 4 bits: \( S = B_0 \gg 4 \).</td>
    </tr>
    <tr>
        <td>Extract length</td>
        <td>Use the lower (least significant) 4 bits of \( B_0 \) as the high-order bits and \( B_1 \) as the low-order byte of a 12-bit length value: \( L = (B_0 \bmod 16) \times 256 + B_1 \).</td>
    </tr>
    <tr>
        <td>Read record data</td>
        <td>Read \( L \) bytes from the file as the record data.</td>
    </tr>
    <tr>
        <td>Check completeness</td>
        <td>Ensure that the number of bytes read matches the expected length \( L \); otherwise, raise an error.</td>
    </tr>
    <tr>
        <td>Handle padding</td>
        <td>If the length \( L \) is odd, read 1 additional byte as padding.</td>
    </tr>
</table>


### 1️⃣ **Initial Position**  
When the file is opened with `open(filename, 'rb')`, the file pointer is at the **beginning (byte 0)**.

### 2️⃣ **Reading the Header (2 bytes)**  
- The first iteration reads **2 bytes** (`header = f.read(2)`).  
- The file pointer moves forward by **2 bytes**.

### 3️⃣ **Extracting `status` and `length`**  
- The **status** is extracted from the upper (most significant) 4 bits of `byte0` (`byte0 >> 4`).  
- The **length** (L) of the record is determined from the remaining 12 bits.

### 4️⃣ **Reading the Record Data (`L` bytes)**  
- The function reads **L bytes** (`data = f.read(length)`).  
- The file pointer moves forward by **L bytes**.

### 5️⃣ **Handling Padding** (if `L` is odd)  
- If `L` is **odd**, one extra byte (padding) is read (`f.read(1)`).  
- This ensures alignment for the next record.

### 🔄 **Next Iteration (New Position)**  
- The next iteration of the `while` loop starts from **where the last read ended** because the file pointer was never reset.  
- The pointer moves forward **by (2 + L + padding) bytes** per record.

### ✅ **Key Concept**  
The file pointer **does not reset** after reading because `f.read(N)` **advances the pointer** automatically. Each iteration picks up where the previous one left off. 

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Dev Box Centered Image</title>
    <style>
        * {
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }
        .dev-box {
            display: flex;
            justify-content: center;
            align-items: center;
            border: 2px solid #00ff00;
            box-shadow: 0 0 10px #00ff00;
        }
    </style>
</head>
<body>
    <div class="dev-box">
        <img src="./assets/bitwise-right-shift-operator.png" height="300" alt="Bitwise right-shift operator diagram" />
    </div>
</body>
</html>


<hr style='color:orange' width=280/>
<hr style='color:orange' width=380/>
<hr style='color:orange' width=480/>

#### Max RDD size for Spark: $2^{31} - 1$ (Java `Integer.MAX_VALUE`)

<hr style='color:skyblue'/>
<hr style='color:skyblue'/>
<hr style='color:skyblue'/>

In [1]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, BinaryType, FloatType,StringType
import schema.ENR_TFONC as sc
from array import array
import enum
import mmap
import gc

### ======================================================================
### 1. Spark Session Configuration
### ======================================================================

In [2]:
# Shared Spark session for the whole notebook; the .config() calls are applied in order
# before the session is created (or the already-running one is returned).
spark = (
    SparkSession.builder
    .appName("LargeScaleBinaryProcessor")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.executor.cores", "4")
    .config("spark.default.parallelism", "100")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

### ======================================================================
# If running in a Jupyter Notebook, you can import clear_output:
### ======================================================================

# Fallback first: outside IPython/Jupyter `clear_output` stays None and callers must check it.
clear_output = None
try:
    from IPython.display import clear_output
except ImportError:
    # IPython is not installed; keep the None fallback set above.
    pass

### ======================================================================
### 2. Distributed Binary File Processing
### ======================================================================

In [3]:
class RecordType(enum.Enum):
    """Record status codes, compared against the upper 4 bits of a record header."""
    SYSTEM_RECORD = 1
    DELETED_RECORD = 2
    SYSTEM_RESERVED = 3
    NORMAL_USER_DATA = 4
    POINTER_RECORD = 6
    USER_DATA_RECORD = 7
    RESERVED_FOR_FUTURE_USE = 9

class RecordHeader():
    """Context manager that memory-maps a binary file of header-prefixed records.

    Each record is laid out as ``header | data | padding``:

    * header  -- ``header_size`` bytes (2 or 4). The upper 4 bits of the first byte
      hold the status; the remaining 12 bits (2-byte header) or 28 bits (4-byte
      header) hold the data length ``L`` in bytes, big-endian.
    * data    -- ``L`` bytes of record content.
    * padding -- enough bytes to make ``header_size + L`` a multiple of ``alignment``.

    Usage::

        with RecordHeader("file.bin") as reader:
            for length, data in reader.read_records():
                ...
    """

    def __init__(self, filename, record_length: "list | None" = None, header_size: int = 2, alignment: int = 2):
        """
        Args:
            filename: path of the binary file to read.
            record_length: optional list of record lengths (currently only stored; the
                length filter in ``read_records`` is commented out). Defaults to a new
                empty list per instance -- never a shared mutable default.
            header_size: header size in bytes, 2 or 4.
            alignment: records (header + data) are padded to a multiple of this value.

        Raises:
            ValueError: for an unsupported ``header_size`` or a non-positive ``alignment``.
        """
        if header_size not in {2, 4}:
            raise ValueError("Only 2-byte or 4-byte headers are supported")
        if alignment < 1:
            # A zero alignment would make the padding computation divide by zero.
            raise ValueError("alignment must be a positive integer")

        self.filename = filename
        self.header_size = header_size
        self.record_length = record_length if record_length is not None else []
        # Statuses whose records are yielded by read_records(); DELETED_RECORD is excluded.
        self.allowed_statuses = [
            RecordType.NORMAL_USER_DATA.value,
            RecordType.SYSTEM_RESERVED.value,
            RecordType.USER_DATA_RECORD.value,
            RecordType.RESERVED_FOR_FUTURE_USE.value,
            RecordType.SYSTEM_RECORD.value,
            RecordType.POINTER_RECORD.value,
        ]
        self.alignment = alignment
        self.file = None        # set in __enter__
        self.mmap_obj = None    # set in __enter__; stays None for an empty file

    def __enter__(self):
        """Open the file and memory-map it read-only.

        An empty file cannot be memory-mapped, so in that case ``mmap_obj`` stays
        ``None`` and ``read_records`` yields nothing.
        """
        import os  # local import: only needed here to check the file size

        self.file = open(self.filename, 'rb')
        try:
            if os.fstat(self.file.fileno()).st_size > 0:
                self.mmap_obj = mmap.mmap(self.file.fileno(), length=0, access=mmap.ACCESS_READ)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so close the file here.
            self.file.close()
            self.file = None
            raise
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the memory map and the file. Exceptions are never suppressed."""
        try:
            if self.mmap_obj is not None:
                self.mmap_obj.close()
        finally:
            self.mmap_obj = None
            if self.file is not None:
                self.file.close()
            self.file = None

    def _parse_header(self, header_bytes: bytes) -> tuple[int, int]:
        """Split a raw header into ``(status, length)``.

        The status is the upper 4 bits of the first byte; the length is the
        remaining 12 bits (2-byte header) or 28 bits (4-byte header), big-endian.
        """
        if self.header_size == 2:
            byte0, byte1 = header_bytes
            length = ((byte0 & 0x0F) << 8) | byte1
        elif self.header_size == 4:
            byte0, byte1, byte2, byte3 = header_bytes
            length = ((byte0 & 0x0F) << 24) | (byte1 << 16) | (byte2 << 8) | byte3
        else:
            raise ValueError("Only 2-byte or 4-byte headers are supported")
        status = (byte0 >> 4) & 0x0F
        return status, length

    def _parse_padding(self, length: int) -> int:
        """Return the number of padding bytes that follow a record with ``length`` data bytes.

        Padding makes ``header_size + length`` a multiple of ``alignment``. Example with a
        2-byte header, alignment 2 and length 107: ``(2 - ((2 + 107) % 2)) % 2 == 1``.
        """
        total_header_data = self.header_size + length
        return (self.alignment - (total_header_data % self.alignment)) % self.alignment

    def read_records(self):
        """Yield ``(length, data)`` for each record whose status is in ``allowed_statuses``.

        Every call scans from the start of the file using a local offset (not the shared
        mmap cursor), so repeated or independent iterations do not interfere. Iteration
        stops at end of file or at a trailing fragment shorter than one header. The
        generator can be closed early (e.g. ``break``) without raising.

        Raises:
            RuntimeError: if called outside the ``with RecordHeader(...)`` block.
            ValueError: if a record's data, or its padding, runs past the end of the file.
        """
        if self.file is None:
            raise RuntimeError("read_records() must be called inside a 'with RecordHeader(...)' block")
        mm = self.mmap_obj
        if mm is None:  # empty file: nothing to read
            return

        size = len(mm)
        header_size = self.header_size
        pos = 0
        while pos + header_size <= size:
            status, length = self._parse_header(mm[pos:pos + header_size])

            data_start = pos + header_size
            data_end = data_start + length
            if data_end > size:
                raise ValueError(f"Incomplete data at position {pos}")

            # if status in self.allowed_statuses and length in self.record_length:
            if status in self.allowed_statuses:
                yield length, mm[data_start:data_end]  # slicing an mmap returns bytes

            next_pos = data_end + self._parse_padding(length=length)
            if next_pos > size:
                raise ValueError(f"Missing padding after record at position {pos}")
            pos = next_pos

<hr style='color:orange'>
<hr style='color:orange'>
<hr style='color:orange'>
<hr style='color:orange'>

To optimize writing **~1M+ records per batch** (the code below uses `batch_size = 1_900_000`), we need to balance **memory efficiency**, **parallelism**, and **disk I/O performance**. Here is the optimized approach we follow:

### **Optimized Batch Writing Strategy**
1. **Batch Processing (1M records per batch)**
2. **Parallelizing Writes (`numSlices=7`)**
3. **Explicit Caching & Repartitioning (`coalesce(...)` to reduce small files; the code below uses `coalesce(7)`)**
4. **Tuning `parquet.block.size` for better disk I/O performance**

---

### **Why This is Optimized**
✅ **Memory Efficiency**: Processing in **1M record batches** prevents memory overload.  
✅ **Parallelism**: `numSlices=7` ensures efficient **CPU core usage**.  
✅ **Parquet Compression (`snappy`)**: Fast compression with minimal CPU overhead.  
✅ **Partition Optimization (`coalesce(7)` in the code below)**: Prevents small-file issues.  
✅ **Spark Configuration Optimizations**:
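   - **`spark.sql.files.maxPartitionBytes = 128MB`**: Optimizes partition sizes for large files (not set in the session configured above).
   - **`spark.sql.shuffle.partitions = 7`**: Reduces shuffle overhead (note: the session configured above currently sets it to 200).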

---

### **Want Even Faster Writes?**
🔹 **Increase `numSlices` to match your CPU cores.**  
🔹 **Try `coalesce(2)` instead of `repartition(4)` if writing fewer large files is preferable.**  
🔹 **Use a larger block size for Parquet (`parquet.block.size=256MB`).**

<hr style='color:orange'>
<hr style='color:orange'>
<hr style='color:orange'>
<hr style='color:orange'>

# Processing Records in Batches

This script reads records from a file and processes them in batches to optimize performance and memory usage.

## Steps:

1. **Initialize the Record Reader**  
   - The `RecordHeader` context manager is used to open and read the file.
   - The `filename` is set to `path_input`, and the records are read with a `header_size` of 2 and `alignment` of 2.

2. **Iterate Through Records**  
   - The `read_records()` method returns `length` and `data` for each record.
   - These records are stored in a list named `records`.

3. **Batch Processing**  
   - If the number of records in `records` reaches or exceeds `batch_size`, the following steps are executed:
     - Convert the list of records into an RDD with `numSlices=7` for parallel processing.
     - Create a Spark DataFrame from the RDD using the defined `schema`.
     - Reduce the number of output files by coalescing the DataFrame into 7 partitions.
     - Write the DataFrame to a Parquet file, partitioning by the `"rdw"` column in **append mode**.

4. **Memory Management**  
   - The `records` list is cleared to free up memory.
   - The DataFrame and RDD are explicitly **unpersisted**.
   - Spark's catalog cache is cleared.
   - The variables `rdd` and `df` are deleted to further manage memory.

## Summary:

- **Batch processing** ensures efficient writing to Parquet.
- **Memory cleanup** prevents excessive memory usage.
- **Parallelization** is handled using Spark's RDD and DataFrame APIs.

### ==================================================================================================================
### 3. Create DataFrames in batches from the list of (length, record) pairs, since the data is too large to process at once
### ==================================================================================================================

In [None]:
# -------------------------------------------
# User Input: Define File Paths
# -------------------------------------------

path_input = "../data/filename"
path_output = "../output/filename.parquet"


# -------------------------------------------
# Define Schema for Spark DataFrame
# -------------------------------------------

# One row per record: its length ("rdw") and its raw bytes ("value").
schema = StructType([
    StructField("rdw", IntegerType()),
    StructField("value", BinaryType())
])

# --------------------------------------------------------------
# Batch size based on driver memory constraints:
# 1_900_000 records * ~836 bytes/record (assumed record size) ~= 1.6 GB
# of raw payload, plus Python object overhead per (length, bytes) tuple.
# --------------------------------------------------------------
batch_size = 1_900_000
num_slices = 7  # partitions (and write tasks) per batch


def write_batch(batch):
    """Append one batch of ``(rdw, value)`` tuples to ``path_output`` as Parquet, partitioned by ``rdw``.

    The RDD/DataFrame are local to this function, so nothing lingers between batches.
    """
    batch_rdd = spark.sparkContext.parallelize(batch, numSlices=num_slices)
    # coalesce() caps the number of output partitions (and hence of small files).
    batch_df = spark.createDataFrame(batch_rdd, schema=schema).coalesce(num_slices)
    batch_df.write.partitionBy("rdw").mode("append").parquet(path_output)


# -------------------------------------------
# Read Records and Write Full Batches
# -------------------------------------------
records = []
with RecordHeader(filename=path_input, header_size=2, alignment=2) as record_header:
    for length, data in record_header.read_records():
        records.append((length, data))

        # Write to Parquet when batch size is reached
        if len(records) >= batch_size:
            write_batch(records)
            records.clear()  # free driver memory before collecting the next batch

print("Full batches written; writing remaining records (if any).")

# -------------------------------------------
# Write Remaining Records (if any)
# -------------------------------------------
# The last chunk is usually smaller than batch_size, so it is flushed here.
if records:
    write_batch(records)
    records.clear()


# -------------------------------------------
# Cleanup and Garbage Collection
# -------------------------------------------
# Nothing is persisted, so there is nothing to unpersist, and there are no module-level
# rdd/df names to delete (which previously raised NameError when the record count was an
# exact multiple of batch_size). The Spark session is intentionally left running because
# the following cells still use it.
spark.catalog.clearCache()
gc.collect()

# Clear Jupyter Notebook Output (only when IPython's clear_output could be imported)
if clear_output is not None:
    clear_output(wait=True)

print("Batch writing complete.")

Batch writing complete.


### ======================================================================
# 4. Processing Final Output
### ======================================================================

In [5]:
def _packed_to_int(p):
    """Decode IBM packed-decimal (COMP-3) bytes into a signed Python int.

    Every byte holds two BCD digits (high nibble first) except the last byte, whose
    high nibble is the final digit and whose low nibble is the sign (0xD = negative,
    anything else = positive). Returns None for None or empty input (e.g. a Spark null).

    Digits are accumulated in an int, so values with more than ~15 digits stay exact
    (a float accumulator silently loses precision there).
    """
    if p is None:
        return None
    digits = array('B', p)
    if not digits:
        return None
    value = 0
    for byte in digits[:-1]:
        value = value * 100 + (byte >> 4) * 10 + (byte & 0x0F)
    last = digits[-1]
    value = value * 10 + (last >> 4)
    return -value if (last & 0x0F) == 0x0D else value


def unpack_number(p):
    """Decode packed-decimal bytes ``p`` into an int (None for null/empty input)."""
    return _packed_to_int(p)


def comp_3_number(p, num):
    """Decode packed-decimal bytes ``p`` with ``num`` implied decimal places into a float.

    Example: bytes 0x12 0x3C with ``num=2`` give 1.23. Returns None for null/empty input.
    """
    value = _packed_to_int(p)
    if value is None:
        return None
    # Exact integer first, then a single correctly rounded division.
    return float(value / 10**num)

# Spark UDFs wrapping the packed-decimal decoders above.
# 64-bit return types: IntegerType (32-bit) cannot hold packed numbers with more than
# 9 digits, and FloatType (32-bit single precision) rounds decimal amounts to ~7 digits.
unpack_number_udf = F.udf(unpack_number, "bigint")
comp_3_number_udf = F.udf(comp_3_number, "double")

# --------------------------------------------
# Generate Column Selection using Expressions
# --------------------------------------------

def generate_column_selection(ENR_REGLT_ACC):
    """Build one Spark column expression per field of a fixed-width record layout.

    Args:
        ENR_REGLT_ACC: iterable of ``(start, end, length, name, is_packed, is_comp_3)``
            tuples. ``start`` is the 1-based byte offset of the field inside the binary
            ``value`` column and ``length`` its size in bytes; ``end`` is not used here.
            A non-zero ``is_comp_3`` is passed to ``comp_3_number`` as the number of
            implied decimal places; otherwise a truthy ``is_packed`` decodes the field
            with ``unpack_number``; any other field is decoded as Latin-1 text.

    Returns:
        list of Column expressions, each aliased to ``name`` with ``-`` replaced by ``_``.
    """
    columns = []
    for start, _end, length, name, is_packed, is_comp_3 in ENR_REGLT_ACC:
        # Slice the raw bytes directly. A Latin-1 decode/encode round-trip is an identity
        # on bytes, and decoding the whole record once per field was wasted work.
        field = F.substring(F.col("value"), start, length)
        alias = name.replace('-', '_')
        if is_comp_3 != 0:
            columns.append(comp_3_number_udf(field, F.lit(is_comp_3)).alias(alias))
        elif is_packed:
            columns.append(unpack_number_udf(field).alias(alias))
        else:
            columns.append(F.decode(field, 'latin1').alias(alias))
    return columns

# `df` from the batch-writing cell does not survive that cell (it is deleted during cleanup),
# so reload everything written to `path_output`. getOrCreate() returns the running session,
# or starts a new one if an earlier cell stopped it.
spark = SparkSession.builder.getOrCreate()
written_df = spark.read.parquet(path_output)

column_selection = generate_column_selection(sc.ENR_TFONC)
extracted_df = written_df.select(*column_selection)
extracted_df.show(truncate=False)