In [2]:
import os
import sys
import re
import pandas as pd
import numpy as np
from enum import Enum
import zipfile
from typing import Optional, Union, List, Callable, Dict, Tuple
from dataclasses import dataclass, make_dataclass, field
from abc import ABC
from importnb import Notebook

# Parent directory of the current working directory; appended to sys.path
# (once) so that packages located there can be imported.
PKG_ROOT = os.path.dirname(os.path.realpath(os.getcwd()))
if PKG_ROOT not in sys.path:
    sys.path.append(PKG_ROOT)

In [3]:
with Notebook():
    from src.basestructs import *

In [5]:
class RecordBase(BlockBase):
    '''
    Layout of a fixed-width binary record.

    A record is an ordered list of named segments, each with a byte length and
    a datatype tag. The class turns raw bytes into a DataFrame with one column
    per segment (parse), converts the raw byte columns to readable values
    (infer) and back again (inverse_infer), and builds new records from a
    template (make_records).
    '''

    def __init__(self, segList:List, nameMap: Optional[Dict]=None, template:Optional[bytearray]=None):
        '''
        SegList has the format of [(name, length, datatype)]
        datatype includes: 'INTNAME' / 'STRNAME', which will be mapped to codes in nameMap
                           'BYTES', decoded / encoded as latin text
                           int
                           str
                           None or 'NAME', left untouched

        @params nameMap  lookup for the INTNAME / STRNAME datatypes. The code calls
                         nameMap(code).name and getattr(nameMap, name).value, so an
                         Enum class fits that usage
        @params template raw bytes of one default record, used by make_records;
                         when omitted it is self.size b'0' characters
        '''
        # One named group per segment. DOTALL is required because the records are
        # binary: without it '.' does not match the byte 0x0A and every record
        # containing that byte would be skipped or split at the wrong offset.
        self.pattern = re.compile(b''.join([f'(?P<{n}>.{{{s}}})'.encode() for (n, s, t) in segList]), re.DOTALL)
        self.names, self.sizes, self.dtypes = zip(*segList)
        self.sizes = np.array(self.sizes)
        self.sizeMap = dict(zip(self.names, self.sizes))
        # Byte range of every segment inside one record: [start, end)
        ends = self.sizes.cumsum()
        starts = ends - self.sizes
        self.rangeMap = {n: range(s, e) for n, s, e in zip(self.names, starts, ends)}
        self.typeMap = dict(zip(self.names, self.dtypes))
        self.size = self.sizes.sum()
        self.nameMap = nameMap
        if template is None:
            self.template = bytearray(b'0' * self.size)
        else:
            self.template = template


    def parse(self, buffer:bytearray):
        '''
        Split buffer into consecutive records
        @return DataFrame with one row per record and one bytes column per segment
        '''
        return pd.DataFrame(_.groupdict() for _ in self.pattern.finditer(buffer))

    def infer_col(self, col:pd.Series):
        '''
        Convert one raw column to readable values according to the datatype
        registered for col.name. Columns without a registered datatype (or
        tagged 'NAME') are returned unchanged.
        A string tag that is none of the handled ones falls through and returns None.
        '''
        dtype = self.typeMap.get(col.name, None)
        if dtype is None or dtype == 'NAME':
            return col
        if dtype == int:
            return col.apply(self.bytes2Num)
        if dtype == str:
            return col.apply(self.bytes2Str)
        if dtype.upper() == 'INTNAME':
            return col.apply(lambda x: self.nameMap(self.bytes2Num(x)).name)
        if dtype.upper() == 'STRNAME':
            return col.apply(lambda x: self.nameMap(self.bytes2Str(x)).name)
        if dtype.upper() == 'BYTES':
            return col.apply(lambda x: x.decode('latin'))


    def inverse_infer_col(self, col:pd.Series):
        '''
        Inverse of infer_col: encode one column of readable values back to
        bytes sized for the segment named col.name.
        Values carrying a .name attribute (e.g. Enum members) are replaced by
        that name first.
        '''
        col = col.apply(lambda x: x.name if hasattr(x, 'name') else x)
        dtype = self.typeMap.get(col.name, None)
        size = self.sizeMap.get(col.name)
        if dtype is None or dtype == 'NAME':
            return col
        if dtype == int:
            return col.apply(lambda x: self.num2Bytes(x, size))
        if dtype == str:
            # pad with NUL bytes up to the segment length
            return col.apply(lambda x: x.encode().ljust(size, b'\x00'))
        if dtype.upper() == 'INTNAME':
            return col.apply(lambda x: self.num2Bytes(getattr(self.nameMap, x).value, size))
        if dtype.upper() == 'STRNAME':
            return col.apply(lambda x: getattr(self.nameMap, x).value.encode().ljust(size, b'\x00'))
        if dtype.upper() == 'BYTES':
            return col.apply(lambda x: x.encode('latin'))


    def infer(self, df:pd.DataFrame):
        '''
        Apply infer_col to every column of df and glue the results back together
        '''
        # DataFrame.iteritems() was removed in pandas 2.0; items() is the equivalent
        return pd.concat([self.infer_col(col) for name, col in df.items()], axis=1)

    def inverse_infer(self, df:pd.DataFrame):
        '''
        Apply inverse_infer_col to every column of df and glue the results back together
        '''
        return pd.concat([self.inverse_infer_col(col) for name, col in df.items()], axis=1)

    def make_records(self, values:pd.DataFrame, repeatCol:int=-1):
        '''
        Build full records from the template, overwritten with the given values

        @params values is a dataframe with the last column as the repeats of each row
        ColNames has to match the segment names in the pattern
        @params repeatCol position of the repeats column; None means values holds
                no repeats column and every row is used once
        @return DataFrame with one row per generated record
        '''
        if repeatCol is not None:
            # Group rows by their repeat count (x.name is that count), duplicate each
            # group that many times, then drop the repeats column. Rows come out
            # grouped by repeat count.
            df = values.groupby(values.columns[repeatCol]).apply(lambda x: pd.concat([x] * x.name)).reset_index(drop=True).drop(values.columns[repeatCol], axis=1)
        else:
            df = values
        # Start from copies of the template record and overwrite the supplied columns
        temp_df = pd.DataFrame([self.pattern.search(self.template).groupdict()] * df.shape[0])
        temp_df.update(df)
        return temp_df

    def make_recordIndex_from_dict(self, values:Dict, repeats:Optional[Union[int, Tuple, List, np.ndarray]]=1):
        '''
        Turn {segment name: values} into a DataFrame whose columns follow the
        segment order of the pattern, plus a trailing 'REPEATS' column

        @params values  mapping of segment name to the value(s) for that column
        @params repeats one repeat count for all rows, or one per row
        '''
        values = [pd.Series(v, name=k) for k, v in values.items()]
        repeats = np.array(repeats).ravel()
        if repeats.size == 1:
            # a single count is broadcast to every row
            repeats = np.tile(repeats, values[0].size)
        df = pd.DataFrame(values).T
        df = df[[_ for _ in self.names if _ in df.columns]] # make sure the sequence is aligned with pattern segment sequence
        df.insert(df.shape[1], 'REPEATS', repeats)
        return df

In [6]:
class TableBlock(ValueBlock):
    '''
    Create ValueBlock that using a table of values to fill
    refTable includes Kit, Race, Class, Gender, Alignment and RacialEnemy defined in CRETABLE.csv file

    The stored number is exposed as the name of the matching refTable member,
    and can be set from a member name, a member, or a raw number.
    '''

    def __init__(self
                ,pbuffer: bytearray
                ,name: str
                ,valueLoc: int
                ,size: int
                ,signed: bool
                ,order: str
                ,refTableName: str
                ,optimize: Optional[int]
                ,refTable: Enum):
        '''
        All arguments except refTableName and refTable are forwarded to the
        parent constructor. refTableName is accepted but not used here.
        '''
        super().__init__(pbuffer, name, valueLoc, size, signed, order, optimize)
        self.refTable = refTable


    def inc_value(self, inc_value:int=1):
        '''
        Does nothing.
        '''
        pass

    @property
    def value(self):
        '''
        Name of the refTable member matching the parent's value, or None when
        the parent's value is None
        '''
        v = super().value
        if v is not None:
            return self.refTable(v).name
        return None

    def set_value(self, value:Union[int, Enum, str]):
        '''
        @params value a member name (matched after upper-casing; an unknown name
                      is stored as None), an Enum member (its .value is stored),
                      or a raw value passed through unchanged
        '''
        if isinstance(value, str):
            member = self.refTable.__members__.get(value.upper())
            value = None if member is None else member.value
        elif isinstance(value, Enum):
            # The signature advertises Enum input; unwrap it so the parent
            # receives the underlying value and not the member object.
            value = value.value
        super().set_value(value)

In [7]:
class SegBlock(BlockBase):
    '''
    Base class of subStructures as defined in GAMSEGS.csv and CRESEGS.csv
    Used for dummy subStructures that will not be modified

    Keeps a copy of its own slice of the parent buffer together with three
    4-byte ValueBlocks located at the offset / count / size positions.
    '''

    def __init__(self,
                 savRef: object,
                 parentRef: object,
                 pbuffer: bytearray,
                 name: str,
                 countLoc: int,   # count pointer to buffer
                 offsetLoc: int,  # offset pointer to buffer
                 sizeLoc: int,
                 sizeValue: Optional[int] = None,
                 countValue: Optional[int] = None,
                 offsetValue: Optional[int] = None):  # only the header segment has no offset value
        self.savRef = savRef
        self.parentRef = parentRef
        self.resourceDir = savRef.resourceDir
        self.name = name
        self.pbuffer = pbuffer

        # Normalise the three header values with to_int
        self.sizeValue, self.countValue, self.offsetValue = (
            self.to_int(raw) for raw in (sizeValue, countValue, offsetValue)
        )

        # 4-byte fields inside the parent buffer
        self.offsetBlock = ValueBlock(self.pbuffer, 'OFFSET', self.to_int(offsetLoc), 4)
        self.countBlock = ValueBlock(self.pbuffer, 'COUNT', self.to_int(countLoc), 4)
        self.sizeBlock = ValueBlock(self.pbuffer, 'SIZE', self.to_int(sizeLoc), 4)

        # This segment's bytes, sliced out of the parent buffer
        start = self.offsetValue
        self.buffer = self.pbuffer[start:start + self.sizeValue]
        self.previous = None


    def pack(self):
        '''
        Refresh sizeValue from the current buffer and, when a previous segment
        is linked, place this segment right after it.
        @return the segment buffer
        '''
        self.sizeValue = len(self.buffer)
        prev = self.previous
        if prev is not None:
            self.offsetValue = prev.offsetValue + prev.sizeValue
        return self.buffer
    

In [8]:
class RecordsBlock(SegBlock):
    '''
    Parse subStructures in CRE files that are composed of a list of records,
    such as spells, items and effects

    The record layout is read from self.Pattern, which this class does not
    define itself (presumably a RecordBase supplied by subclasses - verify).
    Records are held as raw bytes in self.df, one row per record.
    '''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # the record count is read from the count field
        self.countValue = self.countBlock.value
        self.df = self.Pattern.parse(self.buffer)

    @property
    def display(self):
        '''
        Records converted to readable values by Pattern.infer
        '''
        return self.Pattern.infer(self.df)

    def __repr__(self):
        # Renders the readable table through display() as a side effect
        display(self.display)
        return f'{self.__class__} {self.name}'

    def add_from_df(self, values:pd.DataFrame, repeatCol=-1, deduplicate_by:Optional[List]=None):
        '''
        Append the records built from values (see Pattern.make_records)

        @params deduplicate_by column names; existing records that share the new
                records' values in these columns are removed before appending.
                None or empty disables deduplication
        '''
        df = self.Pattern.make_records(values, repeatCol=repeatCol)
        if deduplicate_by:
            self.remove({k:v for k, v in df.drop_duplicates().items() if k in deduplicate_by})
        self.df = pd.concat([self.df, df]).reset_index(drop=True)
        self.post_op()


    def add(self, values:Dict, repeats:Union[int, Tuple, List, np.ndarray]=1, deduplicate_by:Optional[List]=None):
        '''
        If duplicated records are not allowed - such as in the case of knownspells, use the deduplicate_by parameter to find existing records having same values in these columns
        remove those record before adding new ones
        '''
        values = self.Pattern.make_recordIndex_from_dict(values, repeats)
        self.add_from_df(values, deduplicate_by=deduplicate_by)


    def remove(self, values:Dict, remove_n:Optional[Union[int, Tuple, List, np.ndarray]]=None):
        '''
        Remove n records from every group of records that matches the values;
        if set to None, remove all of them.
        The first (count - n) records of a matching group are kept, so the
        removed ones are the trailing n. Records that match nothing are kept.
        '''
        index_df = self.Pattern.make_recordIndex_from_dict(values, remove_n)  # sequences of cols are ensured to be aligned with Pattern segments in the called function
        key_cols = index_df.columns[:-1].tolist()  # everything but the trailing 'REPEATS' column
        group = self.df.groupby(key_cols)
        drop_counts = index_df.set_index(key_cols).to_dict()['REPEATS']

        def keep(records):
            # records.name is the group key; groups absent from the map lose nothing
            n = drop_counts.get(records.name, 0)
            if n is None:
                return records.head(0)
            return records.head(max(0, records.shape[0] - n))

        self.df = group.apply(keep).reset_index(drop=True)
        self.post_op()


    def assign(self, values:pd.DataFrame, repeatCol=-1):
        '''
        Replace all records with the ones built from values, encoded back to bytes
        '''
        self.df = self.Pattern.inverse_infer(self.Pattern.make_records(values, repeatCol=repeatCol))
        self.post_op()


    def post_op(self):
        '''
        Defines the actions after record df is modified, such as reset_index
        Here: rebuild the segment buffer by joining every cell of df row by row
        '''
        self.buffer = bytearray(b''.join(self.df.values.ravel()))


    def pack(self):
        '''
        Update countValue to the number of records, then pack as the parent does
        '''
        self.countValue = self.df.shape[0]
        return super().pack()