In [18]:
import json
import re
from datetime import datetime, timezone
import os

In [19]:
class EventMapper:
    """Maps raw event records to high-level action dicts using a JSON-style mapping.

    ``action_mapping`` is expected to contain:

    * ``'actions'`` -- action name -> spec dict. A spec may have an ``'event'``
      dict (a ``'type'`` plus extra match conditions keyed by dotted field
      paths) and an ``'attributes'`` dict (an ``'include_common_fields'`` flag
      and a ``'details'`` field mapping). An ``'UnknownAction'`` entry is
      required as the fallback.
    * ``'common_fields'`` -- field mapping merged into every action whose spec
      sets ``include_common_fields``.
    """

    def __init__(self, action_mapping):
        self.action_mapping = action_mapping

    def deserialize_payload(self, event_record):
        """Decode ``event_record['payload']`` from JSON when it is still text.

        Mutates and returns ``event_record``. A payload that is already decoded
        (e.g. by an earlier call) is left untouched, so the method can safely be
        applied more than once to the same record.
        Raises KeyError if the record has no ``'payload'`` key.
        """
        payload = event_record['payload']
        if isinstance(payload, (str, bytes, bytearray)):
            event_record['payload'] = json.loads(payload)
        return event_record

    def convert_date_to_iso(self, event_record):
        """Convert ``event_record['created_at']`` from a Unix timestamp in
        milliseconds to an ISO 8601 UTC string (``YYYY-MM-DDTHH:MM:SSZ``).

        Mutates and returns ``event_record``. Non-numeric values (e.g. a string
        produced by an earlier call) are left as-is.
        Raises KeyError if the record has no ``'created_at'`` key.
        """
        created_at = event_record['created_at']
        if isinstance(created_at, (int, float)):
            # /1000: the stored value is in milliseconds; sub-second precision
            # is dropped by the format string.
            event_record['created_at'] = datetime.fromtimestamp(
                created_at / 1000, tz=timezone.utc
            ).strftime('%Y-%m-%dT%H:%M:%SZ')
        return event_record

    def match_condition(self, event_value, mapping_value):
        """Return True if ``event_value`` satisfies the condition ``mapping_value``.

        * dict condition: recurse per key. Keys absent from the event dict are
          skipped (treated as satisfied); a non-dict event value never matches.
        * list condition: element-wise match over the overlapping prefix
          (``zip``); an empty, missing or non-list event value never matches.
        * string wrapped in ``^...$``: regex match; non-string event values
          never match.
        * anything else: plain equality.
        """
        if isinstance(mapping_value, dict):
            if not isinstance(event_value, dict):
                return False
            return all(
                self.match_condition(event_value.get(key), expected)
                for key, expected in mapping_value.items()
                if key in event_value
            )

        if isinstance(mapping_value, list):
            if not event_value or not isinstance(event_value, (list, tuple)):
                return False
            return all(self.match_condition(ev, mv) for ev, mv in zip(event_value, mapping_value))

        if isinstance(mapping_value, str) and mapping_value.startswith('^') and mapping_value.endswith('$'):
            return isinstance(event_value, str) and re.match(mapping_value, event_value) is not None

        return event_value == mapping_value

    def map_event_to_action(self, event_record):
        """Map one event record to an action dict.

        Decodes the payload and converts ``created_at`` (both in place on
        ``event_record``). It then returns the attributes of the first action, in
        ``action_mapping['actions']`` order, that meets both conditions:

        * its ``event['type']`` equals the record's ``type``;
        * its remaining ``event`` entries (dotted field path -> condition) all
          match.

        Falls back to the ``'UnknownAction'`` entry when nothing matches.
        Raises KeyError if the mapping has no ``'UnknownAction'`` entry.
        """
        event_record = self.deserialize_payload(event_record)
        event_record = self.convert_date_to_iso(event_record)
        event_type = event_record.get('type')
        actions = self.action_mapping['actions']

        for action_name, action_details in actions.items():
            event_spec = action_details.get('event')
            # Entries without an event type (e.g. a pure fallback such as
            # UnknownAction) cannot be matched; skip them instead of raising.
            if not event_spec or 'type' not in event_spec:
                continue
            if event_type != event_spec['type']:
                continue
            if all(
                self.match_condition(self.extract_field(event_record, field_path), expected)
                for field_path, expected in event_spec.items()
                if field_path != 'type'
            ):
                return self.extract_attributes(event_record, action_details, action_name)

        return self.extract_attributes(event_record, actions['UnknownAction'], 'UnknownAction')

    def extract_attributes(self, event_record, action_details, action_name):
        """Build the output dict for ``action_name`` from ``event_record``.

        The result has an ``'action'`` key, the common fields (when the spec's
        ``attributes.include_common_fields`` is truthy) and a ``'details'`` dict
        built from ``attributes.details``. Common-field keys are merged after
        ``'action'``, so a common field named ``'action'`` would overwrite it.
        """
        mapped_action = {'action': action_name}
        attributes = action_details.get('attributes') or {}

        if attributes.get('include_common_fields'):
            mapped_action.update(self.extract_fields(event_record, self.action_mapping['common_fields']))

        mapped_action['details'] = self.extract_fields(event_record, attributes.get('details', {}))

        return mapped_action

    def extract_fields(self, event_record, field_mapping):
        """Build a dict shaped like ``field_mapping`` with values read from the event.

        Dict values recurse, list values go through ``extract_list``, and any
        other value is treated as a dotted path for ``extract_field``.
        """
        extracted_data = {}

        for field_key, mapping_value in field_mapping.items():
            if isinstance(mapping_value, dict):
                extracted_data[field_key] = self.extract_fields(event_record, mapping_value)
            elif isinstance(mapping_value, list):
                extracted_data[field_key] = self.extract_list(event_record, mapping_value)
            else:
                extracted_data[field_key] = self.extract_field(event_record, mapping_value)

        return extracted_data

    def extract_list(self, event_record, list_mapping):
        """Extract a list of sub-records from the event.

        ``list_mapping`` is a list whose first element maps output keys to dotted
        paths, e.g. ``[{'name': 'payload.labels.name'}]``. The first path minus
        its last segment locates the source list. Each output key then reads the
        last segment of its own path from every item.

        Returns [] when the mapping is empty or the prefix does not resolve to a
        list. Non-dict items yield None for every key.

        NOTE(review): only the final path segment is read from each item, so
        nested item paths (e.g. ``payload.commits.author.name``) are not
        resolved below the item's top level.
        """
        if not list_mapping or not list_mapping[0]:
            return []

        item_template = list_mapping[0]
        first_path = next(iter(item_template.values()))
        base_list = self.extract_field(event_record, first_path.split('.')[:-1])
        if not isinstance(base_list, list):
            return []

        leaf_keys = {key: path.split('.')[-1] for key, path in item_template.items()}
        return [
            {key: (item.get(leaf) if isinstance(item, dict) else None) for key, leaf in leaf_keys.items()}
            for item in base_list
        ]

    def extract_field(self, event_record, field_path):
        """Follow a dotted path (or a list of keys) into nested dicts.

        Returns None if a key is missing or the path runs into a value that is
        neither a dict nor a list. If a list is reached before the path is
        exhausted, that list is returned as-is (``extract_list`` relies on this).
        """
        keys = field_path.split('.') if isinstance(field_path, str) else field_path
        value = event_record
        for key in keys:
            if isinstance(value, list):
                return value
            if not isinstance(value, dict):
                # Missing key (None) or a scalar in the middle of the path.
                return None
            value = value.get(key)

        return value

In [20]:
def load_json_file(file_path):
    """Load and return the parsed JSON document stored at ``file_path``.

    The file is decoded as UTF-8 (the encoding JSON text is required to use)
    rather than the platform's locale-dependent default, so non-ASCII content
    is read correctly on every OS.

    Raises OSError if the file cannot be opened and json.JSONDecodeError if it
    is not valid JSON.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

In [28]:
def process_event_files(input_folder, output_folder, combined_output_file, mapping_file):
    """Map every ``gh_events*.json`` file in ``input_folder`` to actions and save them.

    Input files are processed in filename order, which is chronological for the
    ``gh_events_YYYYMM.json`` naming scheme. For each input file, the mapped
    actions are written as JSON Lines to
    ``<output_folder>/gh_actions_<suffix>.jsonl``, where ``<suffix>`` is the part
    of the filename after ``gh_events_`` (e.g. ``202301``). Every action is also
    written, in the same order, to ``combined_output_file`` (overwritten).

    Parameters
    ----------
    input_folder : str
        Directory holding the ``gh_events*.json`` inputs. Each is presumably a
        JSON array of event records (it is iterated record by record).
    output_folder : str
        Directory for the per-file ``gh_actions_*.jsonl`` outputs; created if
        missing.
    combined_output_file : str
        Path of the single JSON Lines file receiving all actions; its parent
        directory is created if missing.
    mapping_file : str
        JSON file with the event -> action mapping passed to ``EventMapper``.
    """
    prefix, ext = 'gh_events', '.json'

    action_mapping = load_json_file(mapping_file)
    event_mapper = EventMapper(action_mapping)

    # Lexicographic order == chronological order for zero-padded YYYYMM suffixes.
    event_files = sorted(
        name for name in os.listdir(input_folder)
        if name.startswith(prefix) and name.endswith(ext)
    )

    # exist_ok avoids the check-then-create race. The combined file may live
    # outside output_folder, so make sure its directory exists too.
    os.makedirs(output_folder, exist_ok=True)
    combined_dir = os.path.dirname(combined_output_file)
    if combined_dir:
        os.makedirs(combined_dir, exist_ok=True)

    with open(combined_output_file, 'w', encoding='utf-8') as combined_file:
        for event_file in event_files:
            # gh_events_202301.json -> 202301. Taking everything between the
            # prefix and the extension (instead of split('_')[2]) keeps names such
            # as gh_events_2023_01.json distinct, so they do not overwrite each
            # other, and avoids IndexError on gh_events.json.
            year_month = event_file[len(prefix):-len(ext)].lstrip('_')

            input_file_path = os.path.join(input_folder, event_file)
            event_records = load_json_file(input_file_path)

            mapped_actions = [event_mapper.map_event_to_action(record) for record in event_records]

            output_file_path = os.path.join(output_folder, f"gh_actions_{year_month}.jsonl")
            with open(output_file_path, 'w', encoding='utf-8') as monthly_file:
                for action in mapped_actions:
                    # Serialize once; JSON Lines = one JSON document per line.
                    line = json.dumps(action) + '\n'
                    monthly_file.write(line)
                    combined_file.write(line)

            print(f"Processed {event_file} -> {output_file_path}")

    print(f"All processed actions saved to {combined_output_file} in JSON Lines format.")

In [31]:
# Run the full event -> action conversion. Paths are relative to the kernel's
# working directory (normally the folder containing this notebook).
mapping_file = '../data/mapping/event-action-mapping.json'              # event -> action rules
input_folder = '../data/datasets/pre-processed-v2'                      # gh_events_*.json inputs
output_folder = '../data/datasets/actions/monthly'                      # one gh_actions_*.jsonl per input
combined_output_file = '../data/datasets/actions/gh_all_actions.jsonl'  # every action, in input order

process_event_files(
    input_folder=input_folder,
    output_folder=output_folder,
    combined_output_file=combined_output_file,
    mapping_file=mapping_file,
)

Processed gh_events_202301.json -> ../data/datasets/actions/monthly/gh_actions_202301.jsonl
Processed gh_events_202302.json -> ../data/datasets/actions/monthly/gh_actions_202302.jsonl
Processed gh_events_202303.json -> ../data/datasets/actions/monthly/gh_actions_202303.jsonl
Processed gh_events_202304.json -> ../data/datasets/actions/monthly/gh_actions_202304.jsonl
Processed gh_events_202305.json -> ../data/datasets/actions/monthly/gh_actions_202305.jsonl
Processed gh_events_202306.json -> ../data/datasets/actions/monthly/gh_actions_202306.jsonl
Processed gh_events_202307.json -> ../data/datasets/actions/monthly/gh_actions_202307.jsonl
Processed gh_events_202308.json -> ../data/datasets/actions/monthly/gh_actions_202308.jsonl
Processed gh_events_202309.json -> ../data/datasets/actions/monthly/gh_actions_202309.jsonl
Processed gh_events_202310.json -> ../data/datasets/actions/monthly/gh_actions_202310.jsonl
Processed gh_events_202311.json -> ../data/datasets/actions/monthly/gh_actions_2