In [1]:
import pandas as pd
import os
from tqdm.notebook import tqdm
from collections import defaultdict

# Widen DataFrame text output: allow up to 500 characters per line and
# show up to 100 columns before pandas starts truncating.
pd.options.display.width = 500
pd.options.display.max_columns = 100

In [12]:
# Directory containing the source CSV exports
csv_directory = "../data/"

# Directory for the consolidated per-group action files (created if missing)
output_directory = "../data_users/"
os.makedirs(output_directory, exist_ok=True)

# Load the super-user IDs and keep them as a set of ints for membership tests
super_users_df = pd.read_parquet(os.path.join(output_directory, "SuperUserIds.parquet"))
super_user_ids = set(super_users_df["Id"].astype(int))

# Per-file extraction spec:
#   user_id_column     - column holding the acting user's ID
#   action_type_column - optional column whose values are translated via action_type_map
#   action_type        - fixed label for files without an action column
#   date_column        - timestamp column written to the output
# Values of action_type_column missing from action_type_map (e.g. VoteTypeIds other
# than 2/3, PostTypeIds other than 1/2) fall back to the generic "Action" label in
# process_file.
# NOTE(review): in public Stack Exchange dumps Votes.UserId is typically populated only
# for a few vote types (not plain up/down votes) — verify that UpVote/DownVote rows can
# actually match any user before relying on them.
file_mappings = {
    "Votes.csv": {
        "user_id_column": "UserId",
        "action_type_column": "VoteTypeId",
        "action_type_map": {2: "UpVote", 3: "DownVote"},
        "date_column": "CreationDate",
    },
    "Posts.csv": {
        "user_id_column": "OwnerUserId",
        "action_type_column": "PostTypeId",
        "action_type_map": {1: "Question", 2: "Answer"},
        "date_column": "CreationDate",
    },
    "Comments.csv": {
        "user_id_column": "UserId",
        "action_type": "Comment",
        "date_column": "CreationDate",
    },
    "Badges.csv": {
        "user_id_column": "UserId",
        "action_type": "Badge",
        "date_column": "Date",
    },
}

# Columns to load per file, derived from file_mappings so the two cannot drift apart:
# the user-ID column, the optional action-type column and the date column.
columns_mapping = {
    file_name: [
        col
        for col in (
            spec["user_id_column"],
            spec.get("action_type_column"),
            spec.get("date_column", "CreationDate"),
        )
        if col
    ]
    for file_name, spec in file_mappings.items()
}

In [19]:
def initialize_output_files(output_dir):
    """
    Create the two consolidated action files in ``output_dir`` containing only
    the CSV header ``UserId,ActionType,CreationDate``.

    A file that already exists is left untouched, so rows it already holds are kept.
    """
    header_line = "UserId,ActionType,CreationDate\n"
    for target_name in ("super_users_actions.csv", "non_super_users_actions.csv"):
        target_path = os.path.join(output_dir, target_name)
        if os.path.exists(target_path):
            continue  # never clobber existing output
        with open(target_path, 'w', encoding='utf-8') as handle:
            handle.write(header_line)

def process_file(file_name, mapping, columns, user_ids, output_file, chunksize=10**6, csv_dir=None):
    """
    Stream one CSV export in chunks and append the target users' actions to a
    consolidated output file.

    Each appended line has the form ``<user id>,<action type>,<ISO timestamp>``;
    no header is written here (``initialize_output_files`` writes it).

    Parameters
    ----------
    file_name : str
        Name of the CSV file inside ``csv_dir``.
    mapping : dict
        Per-file spec. ``user_id_column`` is required. Optional keys:
        ``date_column`` (default ``"CreationDate"``), ``action_type_column`` with
        ``action_type_map`` (value -> label), and ``action_type`` (fallback label,
        default ``"Action"``) used when there is no action column or its value is
        not a key of ``action_type_map``.
    columns : list of str
        Columns to load (``usecols``); must contain every column named in ``mapping``.
    user_ids : collection of int
        IDs of the users whose actions are kept.
    output_file : str
        Path the matching rows are appended to.
    chunksize : int, default 10**6
        Number of rows read per chunk.
    csv_dir : str, optional
        Directory containing ``file_name``; defaults to the notebook-level
        ``csv_directory``.

    A missing file, or any error raised while opening the reader, is reported and
    the file is skipped. Errors raised while reading chunks propagate.
    """
    source_dir = csv_directory if csv_dir is None else csv_dir
    file_path = os.path.join(source_dir, file_name)

    user_id_column = mapping['user_id_column']
    date_column = mapping.get('date_column', 'CreationDate')
    action_type_column = mapping.get('action_type_column')
    action_type_map = mapping.get('action_type_map', {})
    default_action_type = mapping.get('action_type', 'Action')

    print(f"Processing {file_name}...")

    try:
        csv_iterator = pd.read_csv(
            file_path,
            chunksize=chunksize,  # yields DataFrames of at most `chunksize` rows
            usecols=columns,
            parse_dates=[date_column],
            delimiter='\x17',  # source exports use the 0x17 control character as separator
        )
    except FileNotFoundError:
        print(f"File {file_name} not found in {source_dir}. Skipping.")
        return
    except Exception as e:
        print(f"Error reading {file_name}: {e}")
        return

    print(f"Filtering actions for {len(user_ids)} target users...")
    try:
        for chunk in tqdm(csv_iterator, desc=f"Processing {file_name}", unit="chunk"):
            # Rows without a user or a timestamp cannot be attributed. Once they are
            # gone, the ID column (possibly read as float because of missing values)
            # can be cast to int.
            chunk = (
                chunk.dropna(subset=[user_id_column, date_column])
                .astype({user_id_column: int})
            )
            hits = chunk[chunk[user_id_column].isin(user_ids)]
            if hits.empty:
                continue  # no relevant actions in this chunk

            # Column-wise instead of a per-row iterrows() loop: label each action,
            # format each timestamp, then assemble the CSV lines.
            if action_type_column:
                action_types = hits[action_type_column].map(
                    lambda value: action_type_map.get(value, default_action_type)
                )
            else:
                action_types = pd.Series(default_action_type, index=hits.index)
            timestamps = hits[date_column].map(lambda ts: ts.isoformat())

            lines = (
                hits[user_id_column].astype(str)
                + "," + action_types.astype(str)
                + "," + timestamps
                + "\n"
            )
            with open(output_file, 'a', encoding='utf-8') as f:
                f.writelines(lines.tolist())
    finally:
        # Release the underlying file handle even if a chunk fails to parse.
        csv_iterator.close()

    print(f"Finished processing {file_name}.")

In [None]:
# Output file for the Super Users' consolidated actions
super_output_path = os.path.join(output_directory, "super_users_actions.csv")

# Start from a clean file so re-running this cell does not append duplicate rows.
# The removal must happen *before* the header is written: removing afterwards would
# delete the header again and leave a header-less CSV.
if os.path.exists(super_output_path):
    os.remove(super_output_path)

# Writes the header line into any consolidated output file that does not exist yet
initialize_output_files(output_directory)

# Process every source file for Super Users
for file_name, mapping in file_mappings.items():
    # Without an explicit column list, load exactly the columns the mapping uses
    fallback_columns = [
        col
        for col in (
            mapping['user_id_column'],
            mapping.get('action_type_column'),
            mapping.get('date_column', 'CreationDate'),
        )
        if col
    ]
    columns = columns_mapping.get(file_name, fallback_columns)
    process_file(
        file_name=file_name,
        mapping=mapping,
        columns=columns,
        user_ids=super_user_ids,
        output_file=super_output_path,
    )

{'user_id_column': 'UserId', 'action_type_column': 'VoteTypeId', 'action_type_map': {2: 'UpVote', 3: 'DownVote'}, 'date_column': 'CreationDate'}
Processing Votes.csv...
Filtering actions for 22076 target users...


Processing Votes.csv: 0chunk [00:00, ?chunk/s]

Finished processing Votes.csv.
{'user_id_column': 'OwnerUserId', 'action_type_column': 'PostTypeId', 'action_type_map': {1: 'Question', 2: 'Answer'}, 'date_column': 'CreationDate'}
Processing Posts.csv...
Filtering actions for 22076 target users...


Processing Posts.csv: 0chunk [00:00, ?chunk/s]

Finished processing Posts.csv.
{'user_id_column': 'UserId', 'action_type': 'Comment', 'date_column': 'CreationDate'}
Processing Comments.csv...
Filtering actions for 22076 target users...


Processing Comments.csv: 0chunk [00:00, ?chunk/s]

Finished processing Comments.csv.
{'user_id_column': 'UserId', 'action_type': 'Badge', 'date_column': 'Date'}
Processing Badges.csv...
Filtering actions for 22076 target users...


Processing Badges.csv: 0chunk [00:00, ?chunk/s]

Finished processing Badges.csv.


In [None]:
# Load the sampled non-super-user IDs as a set of ints
non_super_sample = pd.read_parquet("../data_users/NonSuperUserIdsSample.parquet")
non_super_user_ids = set(non_super_sample["Id"].astype(int))

# Output file for the Non-Super Users' consolidated actions
non_super_output_path = os.path.join(output_directory, "non_super_users_actions.csv")

# Start from a clean file so re-running this cell does not append duplicate rows,
# then (re)write the header — removing the file without rewriting it would leave
# a header-less CSV.
if os.path.exists(non_super_output_path):
    os.remove(non_super_output_path)

# Writes the header line into any consolidated output file that does not exist yet
initialize_output_files(output_directory)

# Process every source file for Non-Super Users
for file_name, mapping in file_mappings.items():
    # Without an explicit column list, load exactly the columns the mapping uses
    fallback_columns = [
        col
        for col in (
            mapping['user_id_column'],
            mapping.get('action_type_column'),
            mapping.get('date_column', 'CreationDate'),
        )
        if col
    ]
    columns = columns_mapping.get(file_name, fallback_columns)
    process_file(
        file_name=file_name,
        mapping=mapping,
        columns=columns,
        user_ids=non_super_user_ids,
        output_file=non_super_output_path,
    )

{'user_id_column': 'UserId', 'action_type_column': 'VoteTypeId', 'action_type_map': {2: 'UpVote', 3: 'DownVote'}, 'date_column': 'CreationDate'}
Processing Votes.csv...
Filtering actions for 220533 target users...


Processing Votes.csv: 0chunk [00:00, ?chunk/s]