### Ideas

- Create classes to represent Data, Metadata, and Dataset? (Dataset being the combination of the two, with functions which link them, e.g. for cross-checking that the type defined in the metadata matches the actual type of the data)
- Metadata can be read from different formats, and then exported to different formats
  - Within a Dataset, the metadata must be in one particular format

- Filter out rows with all nulls

In [None]:
#| default_exp read

In [None]:
#| hide
from nbdev.showdoc import *
from nbdev import nbdev_export

In [None]:
#| export
import pandas as pd
import polars as pl
from polars.testing import assert_frame_equal
import numpy as np
import pyreadstat
import pyspssio
from pathlib import Path
from typing import Optional, Any

from fastcore.utils import *
from fastcore.test import *

from pydantic import BaseModel
from pydantic.dataclasses import dataclass
# from pandera import ...

In [None]:
# Directories for the raw input files and for the processed output files
RAW_DATA = Path("../data/raw")
PROCESSED_DATA = Path("../data/processed")
# Read one raw SPSS file with pyspssio (data plus metadata) ...
df, meta = pyspssio.read_sav(RAW_DATA/"G214_PQ.sav")
# ... and convert the data from pandas to polars
df = pl.from_pandas(df)

In [None]:
#|export
# TODO: convert to Pydantic BaseModel (ensure that variable types are one of "nominal", "scale" or "ordinal")
@dataclass
class Metadata:
    "Metadata describing a single variable (pydantic dataclass)."
    variable_basename: str  # variable name without the dataset prefix, e.g. "PN17"
    label: str  # descriptive label, e.g. "Ever had back pain"
    field_values: dict[int, str]  # coded value -> value label, e.g. {0: "No", 1: "Yes"}
    field_type: str  # e.g. "Numeric"; `_pack_variable_types` maps "Numeric", "String" and "Date"
    field_width: int  # width used when building the format string
    decimals: int  # number of decimal places
    variable_type: str  # measurement level, e.g. "Nominal"

Test that this works as expected.

In [None]:
# Construct an example `Metadata` record; the cell output below shows its repr
Metadata(
    variable_basename = "PN17",
    label = "Ever had back pain",
    field_values = {-99: "Missing", 0: "No", 1: "Yes"},
    field_type = "Numeric",
    field_width = 3,
    decimals =  0,
    variable_type = "Nominal"
)

Metadata(variable_basename='PN17', label='Ever had back pain', field_values={-99: 'Missing', 0: 'No', 1: 'Yes'}, field_type='Numeric', field_width=3, decimals=0, variable_type='Nominal')

In [None]:
#|export
def unpack_variable_types(input_dict: dict[str, str] # variable name -> format string, e.g. "F8.2"
                          ) -> tuple[dict[str, str], dict[str, int]]:
    """
    Split format strings into a field type and a number of decimals per variable.

    Formats starting with "F" map to "Numeric", "DATE" to "Date" and "A" to "String".
    Variables with any other format are left out of both returned dictionaries.
    """
    field_type = {}
    decimals = {}

    for key, value in input_dict.items():
        if value.startswith('F'):
            field_type[key] = 'Numeric'
            # "F8.2" -> "2"; a format without a decimal part (e.g. "F8")
            # gives an empty string, which counts as zero decimals.
            dec = value.partition('.')[2]
            decimals[key] = int(dec) if dec else 0
        elif value.startswith('DATE'):
            field_type[key] = 'Date'
            decimals[key] = 0
        elif value.startswith('A'):
            field_type[key] = 'String'
            decimals[key] = 0

    return field_type, decimals

def reformat_metadata(m: pyreadstat.metadata_container, # metadata from pyreadstat
                     ) -> dict[str, dict[str, Any]]:
    """
    Reformat metadata into a more readable and consistent format.

    The result is keyed by metadata parameter ("Label", "Field Type", ...);
    each value is taken from the corresponding attribute of `m`, except for
    "Field Type" and "Decimals", which are derived from `m.original_variable_types`.
    """
    field_type, decimals = unpack_variable_types(m.original_variable_types)
    return {
        "Label": m.column_names_to_labels,
        "Field Type": field_type,
        "Field Width": m.variable_display_width,
        "Decimals": decimals,
        "Variable Type": m.variable_measure,
        "Field Values": m.variable_value_labels,
    }

TODO: add Data and Metadata classes, and nest them in the Dataset class

In [None]:
#| export
def read_sav(data_dir: str|Path, # directory containing the file
             file: str, # name of the SPSS file
             cols: Optional[list[str]] = None # columns to read, passed to pyreadstat as `usecols`
             ) -> tuple[pl.LazyFrame, pyreadstat.metadata_container]:
    "Read an SPSS file with `pyreadstat`; return the data as a lazy frame together with its metadata."
    # Join with pathlib so `data_dir` may be a str or a Path, with or without a trailing separator
    df, meta = pyreadstat.read_sav(str(Path(data_dir)/file), usecols=cols)
    return pl.from_pandas(df).lazy(), meta

In [None]:
#|export
@dataclass
class Dataset:
    "An SPSS file on disk, with an optional variable prefix and an optional selection of columns."
    file: str  # file name, e.g. "G214_PQ.sav"
    data_dir: str | Path  # directory containing `file`
    prefix: Optional[str] = None  # prefix of the dataset's variable names, e.g. "G214_PQ_"; not used by `load_data`
    variables: Optional[list[str]] = None  # columns to read; passed to `read_sav` as `cols`

    def load_data(self) -> tuple[pl.LazyFrame, dict[str, dict[str, Any]]]:
        "Output data and metadata."
        df, meta = read_sav(self.data_dir, self.file, self.variables)
        return df, reformat_metadata(meta)

    # Sketch (not enabled): strip `prefix` from the column names.
    # def strip_prefix(self, df: pl.LazyFrame) -> pl.DataFrame:
    #     stripped_columns = {col: col.replace(self.prefix, "") for col in self.variables}
    #     df = df.rename(stripped_columns)
    #     return df

In [None]:
# Load a whole file (no column selection): lazy data plus reformatted metadata
df, meta = Dataset("G214_PQ.sav", RAW_DATA).load_data()

In [None]:
# Datasets to combine. Each entry gives the file, its directory, the variable prefix
# and the columns to read: the shared "ID" column plus selected prefixed variables.
datasets = [
    Dataset("G214_PQ.sav", RAW_DATA, "G214_PQ_", ["ID", "G214_PQ_PN17", "G214_PQ_PN25", "G214_PQ_PN34", "G214_PQ_PN35", "G214_PQ_PN36"]),
    Dataset("G214_SQ.sav", RAW_DATA, "G214_SQ_", ["ID", "G214_SQ_PN17", "G214_SQ_PN25", "G214_SQ_PN34", "G214_SQ_PN35", "G214_SQ_PN36"]),
    Dataset("G217_PQ.sav", RAW_DATA, "G217_PQ_", ["ID", "G217_PQ_PN17", "G217_PQ_PN25", "G217_PQ_PN34", "G217_PQ_PN35", "G217_PQ_PN36", "G217_PQ_PN38", "G217_PQ_PN9"]),
    Dataset("G217_SQ.sav", RAW_DATA, "G217_SQ_", ["ID", "G217_SQ_PN17", "G217_SQ_PN25", "G217_SQ_PN34", "G217_SQ_PN35", "G217_SQ_PN36", "G217_SQ_PN38", "G217_SQ_PN9"])
]

In [None]:
#|export
def read_and_filter_data(datasets: list[Dataset] # datasets to load
                         ) -> tuple[list[pl.LazyFrame], list[dict[str, dict[str, Any]]]]:
    """
    Take a list of `Dataset`s and return two lists, in the same order as the input:
    the dataframe of each dataset and the metadata of each dataset,
    filtered for the given columns.
    """
    # Load each dataset exactly once, then split the (data, metadata) pairs.
    loaded = [ds.load_data() for ds in datasets]
    dataframes = [df for df, _ in loaded]
    metadata = [meta for _, meta in loaded]
    return dataframes, metadata

def combine_dataframes(dataframes: list[pl.LazyFrame]
                       ) -> pl.LazyFrame:
    "Join every dataframe in the list onto the first one, in list order, matching rows on `ID`."
    merged, remaining = dataframes[0], dataframes[1:]
    for frame in remaining:
        # Full join with coalescing, as requested by `how="full", coalesce=True`
        merged = merged.join(frame, on="ID", how="full", coalesce=True)
    return merged

In [None]:
dataframes, metadata = read_and_filter_data(datasets)
df = combine_dataframes(dataframes)

In [None]:
#| export
from collections import defaultdict

In [None]:
#| export
def merge_dictionaries(dicts: list[dict[str, dict[str, Any]]] # nested dictionaries to merge
                       ) -> dict[str, dict[str, Any]]:
    """
    Merge a series of nested dictionaries.

    For each outer key, the inner entries of all inputs are combined. When the same
    inner key occurs more than once, the first value seen is kept, unless the later
    value is a dictionary, in which case it is merged into the existing one with
    `dict.update`. The inputs are never modified.
    """
    merged: dict[str, dict[str, Any]] = {}
    for d in dicts:
        for key, nested_dict in d.items():
            # Create the outer key even when the nested dict is empty, so that
            # every parameter present in the inputs is present in the result.
            target = merged.setdefault(key, {})
            for nested_key, value in nested_dict.items():
                if nested_key not in target:
                    # Copy dict values: they may be updated below, and that must
                    # not leak back into the caller's dictionaries.
                    target[nested_key] = dict(value) if isinstance(value, dict) else value
                elif isinstance(value, dict):
                    target[nested_key].update(value)
    return merged

In [None]:
# Merge the per-dataset metadata into a single nested dictionary
meta_merged = merge_dictionaries(metadata)

## Writing Files

In [None]:
#| export
def convert_metadata_list_to_dict(metadata: list[Metadata], # list of Metadata objects
                                  p: str # Prefix for dataset
                                  ) -> dict[str, dict[str, Any]]:
    """
    Take a list of Metadata objects and convert them into the SPSS format
    with parameters as parents and variables as children, in nested dictionaries.

    Each variable is keyed by `p` followed by its `variable_basename`.
    """
    # Build the prefixed names once; this also lets `metadata` be any iterable.
    named = [(p + m.variable_basename, m) for m in metadata]
    return {
        "Label": {name: m.label for name, m in named},
        "Field Values": {name: m.field_values for name, m in named},
        "Field Type": {name: m.field_type for name, m in named},
        "Field Width": {name: m.field_width for name, m in named},
        "Decimals": {name: m.decimals for name, m in named},
        "Variable Type": {name: m.variable_type for name, m in named},
    }

In [None]:
#|export
def pack_variable_types(m: dict[str, dict[str, Any]] # metadata in nested dictionary format
                        ) -> dict[str, str]:
    """
    Convert metadata parameters related to variable format
    into an appropriate schema for pyreadstat.

    Reads the "Field Type", "Field Width" and "Decimals" entries of `m` and returns
    one format string per variable, e.g. "F8.2", "A8" or "DATE8".
    Raises `ValueError` if the three entries do not describe the same variables.
    """
    field_type = m["Field Type"]
    field_width = m["Field Width"]
    decimals = m["Decimals"]

    # Verify that variables are identical across each metadata parameter.
    # Raised explicitly rather than asserted so the check is not stripped by `python -O`.
    if not (field_type.keys() == field_width.keys() == decimals.keys()):
        raise ValueError("Not all variables match.")

    return _pack_variable_types(field_type, field_width, decimals)

def _pack_variable_types(field_type: dict[str, str],
                         field_width: dict[str, int],
                         decimals: dict[str, int]
                         ) -> dict[str, str]:
    """Private function to perform the logic for `pack_variable_types`."""
    # Field type -> format prefix; an unknown field type raises KeyError.
    prefixes = {
        "Numeric": "F",
        "String": "A",
        "Date": "DATE"
    }

    packed = {}
    for name, kind in field_type.items():
        spec = f"{prefixes[kind]}{field_width[name]}"
        n_decimals = decimals[name]
        # Only numeric formats with at least one decimal get a ".<decimals>" suffix
        if kind == "Numeric" and n_decimals > 0:
            spec += f".{n_decimals}"
        packed[name] = spec

    return packed

In [None]:
# Example inputs for `pack_variable_types`: one numeric, one date and one string variable.
# Field type per variable
ft = {
    'G214_PQ_PN17': 'Numeric',
    'G214_PQ_DNWN': 'Date',
    'G214_PQ_HOC1': 'String'
}

# Field width per variable
fw = {
    'G214_PQ_PN17': 3,
    'G214_PQ_DNWN': 8,
    'G214_PQ_HOC1': 8
}

# Decimals per variable (all zero, so no ".<decimals>" suffix is expected)
d = {
    'G214_PQ_PN17': 0,
    'G214_PQ_DNWN': 0,
    'G214_PQ_HOC1': 0
}

# Metadata in the nested format read by `pack_variable_types`
m = {
    "Field Type": ft,
    "Field Width": fw,
    "Decimals": d
}

# Expected format string per variable: type prefix followed by the width
expected_output = {
    'G214_PQ_PN17': "F3",
    'G214_PQ_DNWN': "DATE8",
    'G214_PQ_HOC1': "A8"
}

test_eq(pack_variable_types(m), expected_output)

In [None]:
#| export
# TODO: could be actual Data and Metadata class/types
def write_sav(dst_path: str|Path, # path to save output file
              df: pl.LazyFrame, # raw data
              metadata: dict[str, dict[str, Any]] # corresponding metadata
              ) -> None:
    """
    Save dataset to SPSS using `pyreadstat` library.

    `metadata` must contain the "Label", "Field Values", "Field Width",
    "Variable Type", "Field Type" and "Decimals" entries.
    """
    # Materialise the lazy frame and convert it to pandas before writing
    pandas_df = df.collect().to_pandas()

    # Lowercase the measure names, so that e.g. "Nominal" is written as "nominal";
    # non-string values are passed through untouched.
    variable_measure = {
        name: measure.lower() if isinstance(measure, str) else measure
        for name, measure in metadata["Variable Type"].items()
    }

    pyreadstat.write_sav(
        pandas_df,
        dst_path,
        column_labels=metadata["Label"],
        variable_value_labels=metadata["Field Values"],
        variable_display_width=metadata["Field Width"],
        variable_measure=variable_measure,
        variable_format=pack_variable_types(metadata)
    )

Verify that the custom SPSS writing function makes no unintended changes.

In [None]:
# Read dataset
G214_PQ = Dataset("G214_PQ.sav", RAW_DATA)
df1, meta1 = G214_PQ.load_data()

# Write the dataset unchanged to a new file
write_sav(PROCESSED_DATA/"G214_PQ.sav", df1, meta1)

# Read that newly saved file
output = Dataset("G214_PQ.sav", PROCESSED_DATA)
df2, meta2 = output.load_data()

# Compare for both data and metadata to verify no unintended changes have been introduced
# (`load_data` returns lazy frames, so df1 and df2 are compared as LazyFrames)
assert_frame_equal(df1, df2)
test_eq(meta1, meta2)

In [None]:
#| hide
# Run nbdev's export step for this notebook
nbdev_export()