In [None]:
import subprocess
from datetime import datetime

import lxml.etree as ET
import polars as pl
import py7zr


def process_xml_in_7z_with_7z_executable(
    archive_path,
    batch_size=1000,
    date_xpath=".//CreationDate",
    start_date=None,
    end_date=None,
    record_tag="row",
):
    """
    Process an XML file within a 7z archive using the 7z command line tool to pipe data.
    This approach avoids extracting to disk.

    Args:
        archive_path (str): Path to the .7z archive
        batch_size (int): Number of elements to process in each batch
        date_xpath (str): XPath to the date element within each record
        start_date (str): Optional start date in format 'YYYY-MM-DD'
        end_date (str): Optional end date in format 'YYYY-MM-DD'
        record_tag (str): XML tag name for records to process

    Returns:
        pl.DataFrame: Polars DataFrame containing the processed data
    """
    # Convert date strings to datetime objects if provided
    start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
    end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None

    # Get the filename inside the archive
    with py7zr.SevenZipFile(archive_path, mode="r") as archive:
        file_list = archive.getnames()
        if not file_list:
            raise ValueError("No files found in archive")
        xml_filename = file_list[0]

    # Use 7z command-line tool to pipe the content without extraction
    cmd = ["7z", "e", "-so", archive_path, xml_filename]
    print(f"Executing: {' '.join(cmd)}")

    # Initialize data lists
    all_data = []
    batch = []
    total_processed = 0
    total_skipped = 0

    # Create a subprocess to stream the data
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Use lxml's iterparse to process the stream
    context = ET.iterparse(
        process.stdout, events=("end",), tag=record_tag, recover=True, huge_tree=True
    )

    try:
        for event, elem in context:
            # Extract date value from attributes (common in StackOverflow XML)
            date_attr = elem.get("CreationDate")

            if date_attr:
                date_str = date_attr
                try:
                    # Handle ISO format dates (common in StackOverflow)
                    if "T" in date_str:
                        record_date = datetime.fromisoformat(
                            date_str.replace("Z", "+00:00")
                        )
                    else:
                        record_date = datetime.strptime(date_str, "%Y-%m-%d")

                    # Apply date filtering
                    date_in_range = True
                    if start_dt and record_date.date() < start_dt.date():
                        date_in_range = False
                    if end_dt and record_date.date() > end_dt.date():
                        date_in_range = False

                    if date_in_range:
                        # Extract all attributes into a dictionary
                        row_data = dict(elem.attrib)
                        batch.append(row_data)
                    else:
                        total_skipped += 1
                except (ValueError, TypeError) as e:
                    print(
                        f"Skipping record with invalid date format: {date_str}, error: {e}"
                    )
                    total_skipped += 1

            # Process in batches
            if len(batch) >= batch_size:
                all_data.extend(batch)
                total_processed += len(batch)
                print(
                    f"Processed {total_processed} records, skipped {total_skipped} records"
                )
                batch = []

            # Clear the element from memory
            elem.clear()
            # Also eliminate now-empty references from the root node to elem
            while elem.getprevious() is not None:
                del elem.getparent()[0]

            # Every 10,000 records, convert current data to DataFrame to free memory
            if total_processed > 0 and total_processed % 10000 == 0:
                if all_data:
                    # Convert to polars dataframe
                    temp_df = pl.from_dicts(all_data)
                    if "df" not in locals():
                        df = temp_df
                    else:
                        # Append to existing dataframe
                        df = pl.concat([df, temp_df])
                    # Clear the all_data list to free memory
                    all_data = []

    finally:
        # Terminate the subprocess if it's still running
        if process.poll() is None:
            process.terminate()

    # Process any remaining items
    if batch:
        all_data.extend(batch)
        total_processed += len(batch)

    print(f"Completed. Total processed: {total_processed}, skipped: {total_skipped}")

    # Convert remaining data to DataFrame
    if all_data:
        temp_df = pl.from_dicts(all_data)
        if "df" in locals():
            df = pl.concat([df, temp_df])
        else:
            df = temp_df

    if "df" in locals() and not df.is_empty():
        print(
            f"Created Polars DataFrame with {len(df)} rows and {len(df.columns)} columns"
        )
        return df
    else:
        print("No data matched the criteria")
        return pl.DataFrame()


def chunked_process_xml_in_7z(
    archive_path,
    batch_size=1000,
    date_xpath=".//CreationDate",
    start_date=None,
    end_date=None,
    record_tag="row",
):
    """
    Process an XML file within a 7z archive in chunks, writing intermediate results to parquet.
    This approach uses very little memory.
    """
    # Convert date strings to datetime objects if provided
    start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
    end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None

    # Get the filename inside the archive
    with py7zr.SevenZipFile(archive_path, mode="r") as archive:
        file_list = archive.getnames()
        if not file_list:
            raise ValueError("No files found in archive")
        xml_filename = file_list[0]

    # Create a temporary directory for chunk files
    chunk_files = []
    # with tempfile.TemporaryDirectory() as temp_dir:
    temp_dir = "data/temp/"
    # Use 7z command-line tool to pipe the content without extraction
    cmd = ["7z", "e", "-so", archive_path, xml_filename]
    print(f"Executing: {' '.join(cmd)}")

    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Process XML in chunks with low memory footprint
    context = ET.iterparse(
        process.stdout,
        events=("end",),
        tag=record_tag,
        recover=True,
        huge_tree=True,
    )

    all_data = []
    total_processed = 0
    total_skipped = 0
    chunk_count = 0

    # Keep track of all column names across all records
    all_columns = set()

    # Sample each record to determine the schema
    # We'll use this later to ensure consistent types
    schema_samples = {}

    try:
        for event, elem in context:
            date_attr = elem.get("CreationDate")

            if date_attr:
                date_str = date_attr
                try:
                    # Parse date
                    if "T" in date_str:
                        record_date = datetime.fromisoformat(
                            date_str.replace("Z", "+00:00")
                        )
                    else:
                        record_date = datetime.strptime(date_str, "%Y-%m-%d")

                    # Apply date filtering
                    date_in_range = True
                    if start_dt and record_date.date() < start_dt.date():
                        date_in_range = False
                    if end_dt and record_date.date() > end_dt.date():
                        date_in_range = False

                    if date_in_range:
                        row_data = dict(elem.attrib)

                        # Collect schema samples (first non-null value for each column)
                        for key, value in row_data.items():
                            if key not in schema_samples and value is not None:
                                schema_samples[key] = value

                        # Update the set of all columns
                        all_columns.update(row_data.keys())
                        all_data.append(row_data)
                        total_processed += 1
                    else:
                        total_skipped += 1
                except (ValueError, TypeError) as e:
                    print(
                        f"Skipping record with invalid date format: {date_str}, error: {e}"
                    )
                    total_skipped += 1

            # Clear memory
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]

            # Write chunk to disk when it reaches batch size
            if len(all_data) >= batch_size:
                # Make sure all dictionaries have the same keys
                for row in all_data:
                    for col in all_columns:
                        if col not in row:
                            row[col] = None  # Add missing columns with None values
                        # Convert all potential numeric fields to strings for consistency
                        elif row[col] is not None:
                            row[col] = str(row[col])

                chunk_df = pl.from_dicts(all_data, infer_schema_length=1000000)
                chunk_file = f"{temp_dir}/chunk_{chunk_count}.parquet"

                # Convert all columns to string type for consistency
                # This ensures we don't have type mismatches between chunks
                for col in chunk_df.columns:
                    if chunk_df[col].dtype != pl.String:
                        chunk_df = chunk_df.with_columns(pl.col(col).cast(pl.String))

                chunk_df.write_parquet(chunk_file)
                chunk_files.append(chunk_file)

                print(
                    f"Processed {total_processed} records, skipped {total_skipped}. Saved chunk {chunk_count}"
                )
                chunk_count += 1
                all_data = []  # Free memory

        # Process remaining data
        if all_data:
            # Make sure all dictionaries have the same keys
            for row in all_data:
                for col in all_columns:
                    if col not in row:
                        row[col] = None  # Add missing columns with None values
                    # Convert all potential numeric fields to strings for consistency
                    elif row[col] is not None:
                        row[col] = str(row[col])

            chunk_df = pl.from_dicts(all_data, infer_schema_length=400000)
            chunk_file = f"{temp_dir}/chunk_{chunk_count}.parquet"

            # Convert all columns to string type for consistency
            for col in chunk_df.columns:
                if chunk_df[col].dtype != pl.String:
                    chunk_df = chunk_df.with_columns(pl.col(col).cast(pl.String))

            chunk_df.write_parquet(chunk_file)
            chunk_files.append(chunk_file)

        print(
            f"Completed. Total processed: {total_processed}, skipped: {total_skipped}"
        )

        # Combine all chunks
        if chunk_files:
            # Read each parquet file with explicit schema control
            dfs = []
            schema = None

            # First pass: determine a unified schema across all files
            for file in chunk_files:
                try:
                    temp_schema = pl.read_parquet(file, n_rows=1).schema
                    if schema is None:
                        schema = temp_schema
                    else:
                        # Update schema to include all columns
                        for col_name, dtype in temp_schema.items():
                            if col_name not in schema:
                                schema[col_name] = dtype
                except Exception as e:
                    print(f"Error reading schema from {file}: {e}")

            # Second pass: read files with the unified schema
            for file in chunk_files:
                try:
                    # Use the unified schema for all chunks
                    temp_df = pl.read_parquet(file)

                    # Ensure all columns from the schema exist
                    for col_name in schema.keys():
                        if col_name not in temp_df.columns:
                            # Add missing columns with null values
                            temp_df = temp_df.with_columns(pl.lit(None).alias(col_name))

                    # Cast all columns to string to ensure type consistency
                    for col in temp_df.columns:
                        temp_df = temp_df.with_columns(pl.col(col).cast(pl.String))

                    dfs.append(temp_df)
                except Exception as e:
                    print(f"Error reading {file}: {e}")

            # Combine all DataFrames with vertical concatenation
            if dfs:
                df = pl.concat(dfs)
                print(
                    f"Created final DataFrame with {len(df)} rows and {len(df.columns)} columns"
                )
            else:
                print("Failed to read any chunks")
                df = pl.DataFrame()

            # Save the final result
            output_file = "stackoverflow_filtered_data.parquet"
            df.write_parquet(output_file)
            print(f"Data saved to {output_file}")

            return df
        else:
            print("No data matched the criteria")
            return pl.DataFrame()

    finally:
        # Terminate the subprocess if still running
        if process.poll() is None:
            process.terminate()


In [8]:
# Use the chunked approach which is most memory and disk efficient
df = chunked_process_xml_in_7z(
    "data/stackoverflow.com-Posts.7z",
    batch_size=100000,
    date_xpath=".//CreationDate",
    start_date="2024-01-01",
    end_date="2024-12-31",
    record_tag="row",
)

# Display the first few rows
if not df.is_empty():
    print("\nDataFrame Preview:")
    print(df.head())

Executing: 7z e -so data/stackoverflow.com-Posts.7z Posts.xml
Processed 100000 records, skipped 59433515. Saved chunk 0
Processed 200000 records, skipped 59433515. Saved chunk 1
Processed 300000 records, skipped 59433515. Saved chunk 2
Completed. Total processed: 385533, skipped: 59433515


ShapeError: unable to vstack, column names don't match: "AcceptedAnswerId" and "ParentId"

In [12]:
df0 = pl.read_parquet("data/temp/chunk_0.parquet")
df1 = pl.read_parquet("data/temp/chunk_1.parquet")

In [13]:
df0.shape

(100000, 22)

In [14]:
df1.shape

(100000, 22)

In [15]:
df2 = pl.read_parquet("data/temp/chunk_2.parquet")
df3 = pl.read_parquet("data/temp/chunk_3.parquet")

In [16]:
df2.shape

(100000, 22)

In [17]:
df3.shape

(85533, 22)

In [None]:
import polars as pl


def process_xml_in_7z_with_7z_executable(
    archive_path,
    batch_size=1000,
    date_xpath=".//CreationDate",
    start_date=None,
    end_date=None,
    record_tag="row",
):
    """
    Process an XML file within a 7z archive using the 7z command line tool to pipe data.
    This approach avoids extracting to disk.

    Args:
        archive_path (str): Path to the .7z archive
        batch_size (int): Number of elements to process in each batch
        date_xpath (str): XPath to the date element within each record
        start_date (str): Optional start date in format 'YYYY-MM-DD'
        end_date (str): Optional end date in format 'YYYY-MM-DD'
        record_tag (str): XML tag name for records to process

    Returns:
        pl.DataFrame: Polars DataFrame containing the processed data
    """
    # Convert date strings to datetime objects if provided
    start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
    end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None

    # Get the filename inside the archive
    with py7zr.SevenZipFile(archive_path, mode="r") as archive:
        file_list = archive.getnames()
        if not file_list:
            raise ValueError("No files found in archive")
        xml_filename = file_list[0]

    # Use 7z command-line tool to pipe the content without extraction
    cmd = ["7z", "e", "-so", archive_path, xml_filename]
    print(f"Executing: {' '.join(cmd)}")

    # Initialize data lists
    all_data = []
    batch = []
    total_processed = 0
    total_skipped = 0

    # Create a subprocess to stream the data
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Use lxml's iterparse to process the stream
    context = ET.iterparse(
        process.stdout, events=("end",), tag=record_tag, recover=True, huge_tree=True
    )

    try:
        for event, elem in context:
            # Extract date value from attributes (common in StackOverflow XML)
            date_attr = elem.get("CreationDate")

            if date_attr:
                date_str = date_attr
                try:
                    # Handle ISO format dates (common in StackOverflow)
                    if "T" in date_str:
                        record_date = datetime.fromisoformat(
                            date_str.replace("Z", "+00:00")
                        )
                    else:
                        record_date = datetime.strptime(date_str, "%Y-%m-%d")

                    # Apply date filtering
                    date_in_range = True
                    if start_dt and record_date.date() < start_dt.date():
                        date_in_range = False
                    if end_dt and record_date.date() > end_dt.date():
                        date_in_range = False

                    if date_in_range:
                        # Extract all attributes into a dictionary
                        row_data = dict(elem.attrib)
                        batch.append(row_data)
                    else:
                        total_skipped += 1
                except (ValueError, TypeError) as e:
                    print(
                        f"Skipping record with invalid date format: {date_str}, error: {e}"
                    )
                    total_skipped += 1

            # Process in batches
            if len(batch) >= batch_size:
                all_data.extend(batch)
                total_processed += len(batch)
                print(
                    f"Processed {total_processed} records, skipped {total_skipped} records"
                )
                batch = []

            # Clear the element from memory
            elem.clear()
            # Also eliminate now-empty references from the root node to elem
            while elem.getprevious() is not None:
                del elem.getparent()[0]

            # Every 10,000 records, convert current data to DataFrame to free memory
            if total_processed > 0 and total_processed % 10000 == 0:
                if all_data:
                    # Convert to polars dataframe
                    temp_df = pl.from_dicts(all_data, infer_schema_length=100000)
                    if "df" not in locals():
                        df = temp_df
                    else:
                        # Append to existing dataframe
                        df = pl.concat([df, temp_df])
                    # Clear the all_data list to free memory
                    all_data = []

    finally:
        # Terminate the subprocess if it's still running
        if process.poll() is None:
            process.terminate()

    # Process any remaining items
    if batch:
        all_data.extend(batch)
        total_processed += len(batch)

    print(f"Completed. Total processed: {total_processed}, skipped: {total_skipped}")

    # Convert remaining data to DataFrame
    if all_data:
        temp_df = pl.from_dicts(all_data, infer_schema_length=100000)
        if "df" in locals():
            df = pl.concat([df, temp_df])
        else:
            df = temp_df

    if "df" in locals() and not df.is_empty():
        print(
            f"Created Polars DataFrame with {len(df)} rows and {len(df.columns)} columns"
        )
        return df
    else:
        print("No data matched the criteria")
        return pl.DataFrame()


def chunked_process_xml_in_7z(
    archive_path,
    batch_size=1000,
    start_date=None,
    end_date=None,
    record_tag="row",
):
    """
    Process an XML file within a 7z archive in chunks, writing intermediate results to parquet.
    This approach uses very little memory and ensures consistent column order across chunks.
    """
    # Convert date strings to datetime objects if provided
    start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
    end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None

    # Get the filename inside the archive
    with py7zr.SevenZipFile(archive_path, mode="r") as archive:
        file_list = archive.getnames()
        if not file_list:
            raise ValueError("No files found in archive")
        xml_filename = file_list[0]

    # Create a temporary directory for chunk files
    chunk_files = []
    temp_dir = "data/temp/"

    # Use 7z command-line tool to pipe the content without extraction
    cmd = ["7z", "e", "-so", archive_path, xml_filename]
    print(f"Executing: {' '.join(cmd)}")

    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Process XML in chunks with low memory footprint
    context = ET.iterparse(
        process.stdout,
        events=("end",),
        tag=record_tag,
        recover=True,
        huge_tree=True,
    )

    all_data = []
    total_processed = 0
    total_skipped = 0
    chunk_count = 0

    # Track all columns across all records and maintain a consistent order
    all_columns = set()

    try:
        # First pass - scan the beginning to determine most likely columns
        # This helps establish a baseline schema
        preview_count = 0
        preview_limit = min(
            1000, batch_size
        )  # Preview first 1000 records or batch_size
        preview_data = []

        for event, elem in context:
            if preview_count >= preview_limit:
                break

            date_attr = elem.get("CreationDate")
            if date_attr:
                try:
                    # Parse date for filtering
                    if "T" in date_attr:
                        record_date = datetime.fromisoformat(
                            date_attr.replace("Z", "+00:00")
                        )
                    else:
                        record_date = datetime.strptime(date_attr, "%Y-%m-%d")

                    # Apply date filtering
                    date_in_range = True
                    if start_dt and record_date.date() < start_dt.date():
                        date_in_range = False
                    if end_dt and record_date.date() > end_dt.date():
                        date_in_range = False

                    if date_in_range:
                        row_data = dict(elem.attrib)
                        preview_data.append(row_data)
                        # Update the set of all columns
                        all_columns.update(row_data.keys())
                        preview_count += 1
                except (ValueError, TypeError):
                    pass

            # Clear memory
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]

        # Convert to ordered list to maintain consistent column order
        ordered_columns = sorted(list(all_columns))
        print(f"Established initial schema with {len(ordered_columns)} columns")

        # Reset the process to start from the beginning
        process.terminate()
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        context = ET.iterparse(
            process.stdout,
            events=("end",),
            tag=record_tag,
            recover=True,
            huge_tree=True,
        )

        # Now process the full file with established schema
        for event, elem in context:
            date_attr = elem.get("CreationDate")

            if date_attr:
                date_str = date_attr
                try:
                    # Parse date
                    if "T" in date_str:
                        record_date = datetime.fromisoformat(
                            date_str.replace("Z", "+00:00")
                        )
                    else:
                        record_date = datetime.strptime(date_str, "%Y-%m-%d")

                    # Apply date filtering
                    date_in_range = True
                    if start_dt and record_date.date() < start_dt.date():
                        date_in_range = False
                    if end_dt and record_date.date() > end_dt.date():
                        date_in_range = False

                    if date_in_range:
                        # Get all attributes and ensure string values for consistency
                        row_data = {
                            k: str(v) if v is not None else None
                            for k, v in elem.attrib.items()
                        }

                        # Add this row's columns to our master list if new ones appear
                        new_columns = set(row_data.keys()) - set(ordered_columns)
                        if new_columns:
                            ordered_columns.extend(sorted(new_columns))

                        all_data.append(row_data)
                        total_processed += 1
                    else:
                        total_skipped += 1
                except (ValueError, TypeError) as e:
                    print(
                        f"Skipping record with invalid date format: {date_str}, error: {e}"
                    )
                    total_skipped += 1

            # Clear memory
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]

            # Write chunk to disk when it reaches batch size
            if len(all_data) >= batch_size:
                # Create DataFrame with normalized structure
                chunk_df = pl.from_dicts(all_data, infer_schema_length=100000)

                # Add any missing columns from our master list
                missing_columns = [
                    col for col in ordered_columns if col not in chunk_df.columns
                ]
                for col in missing_columns:
                    chunk_df = chunk_df.with_columns(
                        pl.lit(None).cast(pl.String).alias(col)
                    )

                # Ensure all columns are strings for consistency
                chunk_df = chunk_df.select(
                    [pl.col(col).cast(pl.String) for col in chunk_df.columns]
                )

                # Reorder columns to match our standard order
                available_columns = [
                    col for col in ordered_columns if col in chunk_df.columns
                ]
                chunk_df = chunk_df.select(available_columns)

                # Write to parquet
                chunk_file = f"{temp_dir}/chunk_{chunk_count}.parquet"
                chunk_df.write_parquet(chunk_file)
                chunk_files.append(chunk_file)

                print(
                    f"Processed {total_processed} records, skipped {total_skipped}. Saved chunk {chunk_count}"
                )
                chunk_count += 1
                all_data = []  # Free memory

        # Process remaining data
        if all_data:
            # Create DataFrame with normalized structure
            chunk_df = pl.from_dicts(all_data, infer_schema_length=100000)

            # Add any missing columns from our master list
            missing_columns = [
                col for col in ordered_columns if col not in chunk_df.columns
            ]
            for col in missing_columns:
                chunk_df = chunk_df.with_columns(
                    pl.lit(None).cast(pl.String).alias(col)
                )

            # Ensure all columns are strings for consistency
            chunk_df = chunk_df.select(
                [pl.col(col).cast(pl.String) for col in chunk_df.columns]
            )

            # Reorder columns to match our standard order
            available_columns = [
                col for col in ordered_columns if col in chunk_df.columns
            ]
            chunk_df = chunk_df.select(available_columns)

            # Write to parquet
            chunk_file = f"{temp_dir}/chunk_{chunk_count}.parquet"
            chunk_df.write_parquet(chunk_file)
            chunk_files.append(chunk_file)

        print(
            f"Completed. Total processed: {total_processed}, skipped: {total_skipped}"
        )

        # Combine all chunks
        if chunk_files:
            # Read and combine all parquet files
            dfs = []

            for file in chunk_files:
                try:
                    # Read the parquet file
                    temp_df = pl.read_parquet(file)
                    dfs.append(temp_df)
                except Exception as e:
                    print(f"Error reading {file}: {e}")

            # Combine all DataFrames with vertical concatenation
            if dfs:
                # All DataFrames should have the same column order now
                df = pl.concat(dfs)

                # Final safety check - ensure all columns have same type
                df = df.select([pl.col(col).cast(pl.String) for col in df.columns])

                print(
                    f"Created final DataFrame with {len(df)} rows and {len(df.columns)} columns"
                )
            else:
                print("Failed to read any chunks")
                df = pl.DataFrame()

            # Save the final result
            output_file = "stackoverflow_filtered_data.parquet"
            df.write_parquet(output_file)
            print(f"Data saved to {output_file}")

            return df
        else:
            print("No data matched the criteria")
            return pl.DataFrame()

    finally:
        # Terminate the subprocess if still running
        if process.poll() is None:
            process.terminate()

In [3]:
# Use the chunked approach which is most memory and disk efficient
df = chunked_process_xml_in_7z(
    "data/stackoverflow.com-Posts.7z",
    batch_size=100000,
    start_date="2021-01-01",
    end_date="2024-12-31",
    record_tag="row",
)

# Display the first few rows
if not df.is_empty():
    print("\nDataFrame Preview:")
    print(df.head())

Executing: 7z e -so data/stackoverflow.com-Posts.7z Posts.xml


KeyboardInterrupt: 

In [None]:
import os

import polars as pl

In [2]:
chunks = []
# List files in directory
files = os.listdir("data/temp/")
for file in files:
    if file.endswith(".parquet"):
        chunk = pl.read_parquet(f"data/temp/{file}")
        chunks.append(chunk)

In [5]:
df = pl.concat(chunks, how="diagonal")
df.shape

(4000000, 21)

In [6]:
df.head()

Id,PostTypeId,CreationDate,Score,ViewCount,Body,OwnerUserId,LastActivityDate,Title,Tags,AnswerCount,CommentCount,ContentLicense,AcceptedAnswerId,LastEditorUserId,LastEditDate,ClosedDate,OwnerDisplayName,FavoriteCount,LastEditorDisplayName,CommunityOwnedDate
str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str
"""66489236""","""1""","""2021-03-05T08:35:32.080""","""2""","""261""","""<p>package.json file</p> <pre>…","""15252063""","""2021-03-05T16:00:44.680""","""is it possible to do sql injec…","""|sql|node.js|postgresql|web|gr…","""1""","""0""","""CC BY-SA 4.0""",,,,,,,,
"""66489239""","""1""","""2021-03-05T08:35:53.420""","""0""","""1188""","""<p>I tried to follow as many a…","""1013139""","""2021-03-10T02:57:17.007""","""iOS applink not working on the…","""|ios|deep-linking|applinks|ios…","""1""","""0""","""CC BY-SA 4.0""","""66557704""","""1013139""","""2021-03-08T23:28:52.147""",,,,,
"""66509681""","""1""","""2021-03-06T19:12:58.427""","""0""","""380""","""<p>I'm trying to figure out ho…","""12913047""","""2021-03-06T19:25:49.643""","""How to create utility color cl…","""|css|""","""1""","""2""","""CC BY-SA 4.0""","""66509790""","""12913047""","""2021-03-06T19:16:18.500""","""2021-03-06T19:27:27.250""",,,,
"""66509682""","""1""","""2021-03-06T19:12:59.847""","""1""","""100""","""<p>I'm trying to run an evalua…","""9443671""","""2021-03-06T19:12:59.847""","""Running into troubles with dat…","""|directory|path|bert-language-…","""0""","""1""","""CC BY-SA 4.0""",,,,,,,,
"""66459802""","""1""","""2021-03-03T15:12:43.877""","""0""","""11""","""<p>Hello I have a dataframe su…","""12559770""","""2021-03-03T15:12:43.877""","""Add new column with the colnam…","""|r|dplyr|""","""0""","""4""","""CC BY-SA 4.0""",,,,"""2021-03-03T15:14:14.910""",,,,


In [7]:
df = df.with_columns(
    pl.col("CreationDate").cast(pl.Datetime),
    pl.col("LastActivityDate").cast(pl.Datetime),
)

In [8]:
df.write_parquet("stackoverflow_filtered_data1.parquet")

In [2]:
df = pl.read_parquet("stackoverflow_filtered_data.parquet")
df.shape

(2498557, 22)

In [18]:
df_plot = (
    df.with_columns(pl.col("CreationDate").dt.strftime("%Y-%m").alias("YM"))
    # .filter(pl.col("Tags").str.contains("java"))
    .group_by("YM")
    .agg(pl.len().alias("Count"))
    .sort("YM")
)
df_plot.plot.line(x="YM", y="Count")

In [26]:
df.with_columns(
    pl.col("CreationDate").dt.strftime("%Y-%m").alias("month"),
    pl.col("CreationDate").dt.strftime("%d").alias("day"),
).filter(pl.col("month") == "2024-03")["day"].value_counts()

day,count
str,u32
"""12""",884
"""14""",1723
"""15""",282
"""10""",1087
"""07""",2778
…,…
"""16""",72
"""05""",2351
"""13""",926
"""03""",1196


In [None]:
from pathlib import Path

import polars as pl


def process_xml_in_7z(
    archive_path,
    batch_size=1000,
    start_date=None,
    end_date=None,
    record_tag="row",
    chunk_to_disk=False,
    temp_dir="data/temp/",
):
    """
    Process an XML file within a 7z archive efficiently, filtering for non-empty titles.

    Args:
        archive_path (str): Path to the .7z archive
        batch_size (int): Number of elements to process in each batch
        start_date (str): Optional start date in format 'YYYY-MM-DD'
        end_date (str): Optional end date in format 'YYYY-MM-DD'
        record_tag (str): XML tag name for records to process
        chunk_to_disk (bool): Whether to write intermediate chunks to disk (for very large files)
        temp_dir (str): Directory to store temporary chunk files if chunking is enabled

    Returns:
        pl.DataFrame: Polars DataFrame containing the processed data
    """
    # Convert date strings to datetime objects if provided
    start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
    end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None

    # Get the filename inside the archive
    with py7zr.SevenZipFile(archive_path, mode="r") as archive:
        file_list = archive.getnames()
        if not file_list:
            raise ValueError("No files found in archive")
        xml_filename = "Posts.xml"  # file_list[0]

    # Use 7z command-line tool to pipe the content without extraction
    cmd = ["7z", "e", "-so", archive_path, xml_filename]
    print(f"Executing: {' '.join(cmd)}")

    # Initialize tracking variables
    all_data = []
    total_processed = 0
    total_skipped = 0
    chunk_files = []
    chunk_count = 0
    ordered_columns = set()

    # Create temp directory if chunking is enabled
    if chunk_to_disk:
        Path(temp_dir).mkdir(parents=True, exist_ok=True)

    # Start the extraction process
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Process XML with iterparse
    context = ET.iterparse(
        process.stdout, events=("end",), tag=record_tag, recover=True, huge_tree=True
    )

    try:
        for _, elem in context:
            # Check for title first (our primary filter)
            title = elem.get("Title")
            if not title or title.strip() == "":
                total_skipped += 1
                elem.clear()
                continue

            # Extract date if we also need date filtering
            date_attr = elem.get("CreationDate")

            # Apply date filtering if needed
            if start_dt or end_dt:
                if date_attr:
                    try:
                        # Parse date for filtering
                        if "T" in date_attr:
                            record_date = datetime.fromisoformat(
                                date_attr.replace("Z", "+00:00")
                            )
                        else:
                            record_date = datetime.strptime(date_attr, "%Y-%m-%d")

                        # Skip if out of date range
                        if (start_dt and record_date.date() < start_dt.date()) or (
                            end_dt and record_date.date() > end_dt.date()
                        ):
                            total_skipped += 1
                            elem.clear()
                            continue
                    except (ValueError, TypeError) as e:
                        print(f"Warning: Invalid date format '{date_attr}', error: {e}")
                        # Continue processing the record anyway since title is the primary filter

            # If we reach here, the record should be included
            # Extract all attributes as strings for consistency
            row_data = {
                k: str(v) if v is not None else None for k, v in elem.attrib.items()
            }
            all_data.append(row_data)

            # Track all columns for consistent schema
            ordered_columns.update(row_data.keys())
            total_processed += 1

            # Clear the element from memory
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]

            # Process in batches
            if len(all_data) >= batch_size:
                if chunk_to_disk:
                    # Create and save dataframe chunk
                    chunk_df = pl.from_dicts(all_data, infer_schema_length=100000)
                    chunk_file = f"{temp_dir}/chunk_{chunk_count}.parquet"
                    chunk_df.write_parquet(chunk_file)
                    chunk_files.append(chunk_file)
                    chunk_count += 1
                    all_data = []  # Free memory
                else:
                    # Keep accumulating in memory if not chunking to disk
                    if total_processed % 10000 == 0:
                        print(
                            f"Processed {total_processed} records, skipped {total_skipped}"
                        )

                if total_processed % 10000 == 0:
                    print(
                        f"Processed {total_processed} records, skipped {total_skipped}"
                    )

    finally:
        # Terminate the subprocess if it's still running
        if process.poll() is None:
            
            process.terminate()

    # Final results processing
    print(f"Completed. Total processed: {total_processed}, skipped: {total_skipped}")

    if chunk_to_disk and chunk_files:
        # Combine all chunks from disk
        dfs = [pl.read_parquet(file) for file in chunk_files]
        if dfs:
            # Combine all DataFrames
            df = pl.concat(dfs, how="diagonal_relaxed")
            # Ensure all columns have same type
            df = df.select([pl.col(col).cast(pl.String) for col in df.columns])
            return df
        else:
            return pl.DataFrame()
    else:
        # Process in-memory data
        if all_data:
            df = pl.from_dicts(all_data, infer_schema_length=100000)
            return df
        else:
            return pl.DataFrame()


def process_stackoverflow_data(
    archive_path,
    output_file="stackoverflow_filtered_data.parquet",
    start_date=None,
    end_date=None,
    batch_size=5000,
    large_file=False,
):
    """
    High-level function to process StackOverflow XML data with non-empty titles.

    Args:
        archive_path (str): Path to the 7z archive containing XML data
        output_file (str): Path to save the output parquet file
        start_date (str): Optional start date filter in 'YYYY-MM-DD' format
        end_date (str): Optional end date filter in 'YYYY-MM-DD' format
        batch_size (int): Batch size for processing
        large_file (bool): If True, use disk-based chunking for very large files

    Returns:
        pl.DataFrame: Processed data with non-empty titles
    """
    print(f"Processing {archive_path} for records with non-empty titles")

    # Process the file
    df = process_xml_in_7z(
        archive_path=archive_path,
        batch_size=batch_size,
        start_date=start_date,
        end_date=end_date,
        chunk_to_disk=large_file,
    )

    if not df.is_empty():
        print(f"Found {len(df)} records with non-empty titles")
        print(f"Columns: {df.columns}")

        # Save to parquet
        df.write_parquet(output_file)
        print(f"Data saved to {output_file}")
    else:
        print("No matching records found")

    return df

In [5]:
df_law = process_stackoverflow_data(
    "data/law.stackexchange.com.7z",
    output_file="data/law/law.parquet",
    batch_size=100000,
    start_date="2021-01-01",
    end_date="2024-12-31",
    large_file=True,
)

Processing data/law.stackexchange.com.7z for records with non-empty titles
Executing: 7z e -so data/law.stackexchange.com.7z Posts.xml
Completed. Total processed: 11325, skipped: 62019
Found 11325 records with non-empty titles
Columns: ['Id', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId', 'LastEditorUserId', 'LastEditDate', 'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'ClosedDate', 'ContentLicense', 'LastEditorDisplayName', 'AcceptedAnswerId', 'OwnerDisplayName', 'FavoriteCount']
Data saved to data/law/law.parquet


In [7]:
df_plot = (
    df_law.with_columns(
        pl.col("CreationDate").cast(pl.Datetime),
        pl.col("LastActivityDate").cast(pl.Datetime),
    )
    .with_columns(pl.col("CreationDate").dt.strftime("%Y-%m").alias("YM"))
    # .filter(pl.col("Tags").str.contains("java"))
    .group_by("YM")
    .agg(pl.len().alias("Count"))
    .sort("YM")
)
df_plot.plot.line(x="YM", y="Count")

In [9]:
df_ac = process_stackoverflow_data(
    "data/academia.stackexchange.com.7z",
    output_file="data/academia/academia.parquet",
    batch_size=100000,
    start_date="2021-01-01",
    end_date="2024-12-31",
    large_file=True,
)

Processing data/academia.stackexchange.com.7z for records with non-empty titles
Executing: 7z e -so data/academia.stackexchange.com.7z Posts.xml
Completed. Total processed: 9992, skipped: 138814
Found 9992 records with non-empty titles
Columns: ['Id', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId', 'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'ContentLicense', 'AcceptedAnswerId', 'LastEditorUserId', 'LastEditDate', 'ClosedDate', 'FavoriteCount', 'OwnerDisplayName', 'LastEditorDisplayName', 'CommunityOwnedDate']
Data saved to data/academia/academia.parquet


In [11]:
df_plot = (
    df_ac.with_columns(
        pl.col("CreationDate").cast(pl.Datetime),
        pl.col("LastActivityDate").cast(pl.Datetime),
    )
    .with_columns(pl.col("CreationDate").dt.strftime("%Y-%m").alias("YM"))
    # .filter(pl.col("Tags").str.contains("java"))
    .group_by("YM")
    .agg(pl.len().alias("Count"))
    .sort("YM")
)
df_plot.plot.line(x="YM", y="Count")

In [None]:
df_ph = process_stackoverflow_data(
    "data/physics.stackexchange.com.7z",
    output_file="data/physics/physics.parquet",
    batch_size=100000,
    start_date="2021-01-01",
    end_date="2024-12-31",
    large_file=True,
)

Processing data/physics.stackexchange.com.7z for records with non-empty titles
Executing: 7z e -so data/physics.stackexchange.com.7z Posts.xml
Completed. Total processed: 62307, skipped: 514994
Found 62307 records with non-empty titles
Columns: ['Id', 'PostTypeId', 'AcceptedAnswerId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId', 'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'ContentLicense', 'LastEditorUserId', 'LastEditDate', 'ClosedDate', 'CommunityOwnedDate', 'OwnerDisplayName', 'FavoriteCount', 'LastEditorDisplayName']
Data saved to data/physics/academia.parquet


In [None]:
df_plot = (
    df_ph.with_columns(
        pl.col("CreationDate").cast(pl.Datetime),
        pl.col("LastActivityDate").cast(pl.Datetime),
    )
    .with_columns(pl.col("CreationDate").dt.strftime("%Y-%m").alias("YM"))
    # .filter(pl.col("Tags").str.contains("java"))
    .group_by("YM")
    .agg(pl.len().alias("Count"))
    .sort("YM")
)
df_plot.plot.line(x="YM", y="Count")

In [18]:
df_ma = process_stackoverflow_data(
    "data/math.stackexchange.com.7z",
    output_file="data/math/math.parquet",
    batch_size=100000,
    start_date="2021-01-01",
    end_date="2024-12-31",
    large_file=True,
)

Processing data/math.stackexchange.com.7z for records with non-empty titles
Executing: 7z e -so data/math.stackexchange.com.7z Posts.xml
Processed 100000 records, skipped 3253522
Processed 200000 records, skipped 3354510
Processed 300000 records, skipped 3450219
Completed. Total processed: 321580, skipped: 3470855
Found 300000 records with non-empty titles
Columns: ['Id', 'PostTypeId', 'AcceptedAnswerId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId', 'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'ContentLicense', 'LastEditorUserId', 'LastEditDate', 'ClosedDate', 'OwnerDisplayName', 'LastEditorDisplayName', 'FavoriteCount', 'CommunityOwnedDate']
Data saved to data/math/math.parquet


In [19]:
df_plot = (
    df_ma.with_columns(
        pl.col("CreationDate").cast(pl.Datetime),
        pl.col("LastActivityDate").cast(pl.Datetime),
    )
    .with_columns(pl.col("CreationDate").dt.strftime("%Y-%m").alias("YM"))
    # .filter(pl.col("Tags").str.contains("java"))
    .group_by("YM")
    .agg(pl.len().alias("Count"))
    .sort("YM")
)
df_plot.plot.line(x="YM", y="Count")

In [5]:
df = process_stackoverflow_data(
    "data/stackoverflow.com-Posts.7z",
    batch_size=100000,
    start_date="2021-01-01",
    end_date="2024-12-31",
    large_file=True,
)

Processing data/stackoverflow.com-Posts.7z for records with non-empty titles
Executing: 7z e -so data/stackoverflow.com-Posts.7z Posts.xml
Processed 100000 records, skipped 50941048
Processed 200000 records, skipped 51071575
Processed 300000 records, skipped 51199892
Processed 400000 records, skipped 51327192
Processed 500000 records, skipped 51455032
Processed 600000 records, skipped 51581951
Processed 700000 records, skipped 51711507
Processed 800000 records, skipped 51841194
Processed 900000 records, skipped 51969489
Processed 1000000 records, skipped 52100047
Processed 1100000 records, skipped 52232351
Processed 1200000 records, skipped 52362887
Processed 1300000 records, skipped 52492746
Processed 1400000 records, skipped 52619123
Processed 1500000 records, skipped 52746017
Processed 1600000 records, skipped 52876885
Processed 1700000 records, skipped 53005148
Processed 1800000 records, skipped 53130551
Processed 1900000 records, skipped 53254833
Processed 2000000 records, skipped

: 