## 01b - Data preparation: Edgelists & Analytical sample

- **Project:** _Families, households, networks: Rethinking the relational structure of families through large-scale network data_ <br>
- **Authors:** Nicolás Soler (ORCID 0009-0001-4239-9396), Tom Emery, Agnieszka Kanas <br>
- **Last updated:** January 2026 <br>
- **Full research article published in journal:** _Demography_ (2026)

In [None]:
import yaml
import polars as pl

In [None]:
# Load the project configuration from the YAML file next to this notebook.
# `config` is indexed below as config["data"][...] to obtain input/output paths.
path_config = 'config.yml'
with open(path_config, 'r') as config_file:
    config = yaml.safe_load(config_file)

In [None]:
# Read spine file
#
# Column name -> dtype for the spine columns used in this notebook. The same
# mapping is used both to override the CSV schema and to choose which columns
# are kept.
dtypes_spine = {
    "RINPERSOON": pl.String,
    "gender_female": pl.Int64,
    "birth_date": pl.Date,
    "death_date": pl.Date,
    "age": pl.Int64,
    "SOORTOBJECTNUMMER": pl.String,
    "RINOBJECTNUMMER": pl.String,
    "gemeente": pl.String,
    "wijk": pl.String,
    "buurt": pl.String,
}

spine = (
    pl
    .scan_csv(
        config["data"]["spine"],
        separator=",",
        encoding="utf8",
        schema_overrides=dtypes_spine,
    )
    # Keep only the columns declared above, in that order
    .select(list(dtypes_spine))
    .collect()
)

# 1 - Prepare family edgelists

## 1.1 - Tidy FAMILIENETWERKTAB

In [None]:
# Read FAMILIENETWERKTAB

# Dtypes of the edge columns (ego id, alter id, relationship code).
# Also reused further down when reading the household network table.
dtypes_network = {
    "RINPERSOON": pl.String,
    "RINPERSOONRELATIE": pl.String,
    "RELATIE": pl.Int64,
}

famnet = (
    pl
    .scan_csv(
        config["data"]["net_family"],
        separator=";",
        schema_overrides=dtypes_network,
    )
    .select(["RINPERSOON", "RINPERSOONRELATIE", "RELATIE"])
    # Keep an edge only if the ego is present in the spine ...
    .join(spine.lazy(), how="semi", on="RINPERSOON")
    # ... and the alter is present in the spine as well
    .join(
        spine.lazy(),
        how="semi",
        left_on="RINPERSOONRELATIE",
        right_on="RINPERSOON",
    )
    # Remove self-edges (ego == alter)
    .filter(pl.col("RINPERSOON") != pl.col("RINPERSOONRELATIE"))
    .collect()
)

In [None]:
# Age check (grand)parent-(grand)child edges in FAMILIENETWERKTAB
#
# For the four intergenerational relationship codes (301, 303, 304, 305) an
# edge is kept only when ego and alter are more than 10 years apart in the
# direction tested below. All other relationship codes are kept unchanged.
# For every checked code two numbers are printed: the number of edges before
# the age filter and the number of edges that pass it.
# NOTE(review): edges whose age difference is null (missing ego or alter age)
# do not satisfy the filters and are therefore dropped - confirm this is
# intended.

# Calculate age difference between ego and alter (age_ego - age_alter)
famnet = (
    famnet
    # Join ego age
    .join(spine[["RINPERSOON","age"]], how="left", on="RINPERSOON")
    .rename({"age":"age_ego"})
    # Join alter age
    .join(spine[["RINPERSOON","age"]], how="left", left_on="RINPERSOONRELATIE", right_on="RINPERSOON")
    .rename({"age":"age_alter"})
    # Calculate difference
    .with_columns(age_diff = pl.col("age_ego") - pl.col("age_alter"))
)

# Check age in parent-child edges (301): ego must be >10 years younger than alter
print(len(famnet.filter(pl.col("RELATIE")==301)))
edges_pc = (
    famnet
    .filter(pl.col("RELATIE")==301)
    .filter(pl.col("age_diff")<-10)
)
print(len(edges_pc))

# Check age in child-parent edges (304): ego must be >10 years older than alter
print(len(famnet.filter(pl.col("RELATIE")==304)))
edges_cp = (
    famnet
    .filter(pl.col("RELATIE")==304)
    .filter(pl.col("age_diff")>10)
)
print(len(edges_cp))

# Check age in grandparent-grandchild edges (303): ego must be >10 years younger than alter
print(len(famnet.filter(pl.col("RELATIE")==303)))
edges_gpgc = (
    famnet
    .filter(pl.col("RELATIE")==303)
    .filter(pl.col("age_diff")<-10)
)
print(len(edges_gpgc))

# Check age in grandchild-grandparent edges (305): ego must be >10 years older than alter
print(len(famnet.filter(pl.col("RELATIE")==305)))
edges_gcgp = (
    famnet
    .filter(pl.col("RELATIE")==305)
    .filter(pl.col("age_diff")>10)
)
# Report the edges that were just filtered (edges_gcgp), not the
# grandparent-grandchild set from the previous check.
print(len(edges_gcgp))

# Subset all other edges and concatenate clean edges
fam_full = (
    famnet
    .filter(~pl.col("RELATIE").is_in([301,303,304,305]))
)
fam_full = pl.concat([fam_full, edges_pc, edges_cp, edges_gpgc, edges_gcgp])
# Drop the helper age columns again; only the edge definition is kept
fam_full = fam_full.select(["RINPERSOON","RINPERSOONRELATIE","RELATIE"])

## 1.2 - Create new edgelists

In [None]:
# Create great-grandparent edges (new relationship code 400)
#
# Built by chaining two existing edge types: the ego's 303 edges give the
# intermediate person ("GRANDPARENT"), and that person's own 301 edges give
# the alter of the new edge. The result is then age-checked the same way as
# the other upward edges (ego more than 10 years younger than alter).
edges_ggp = (
    fam_full
    # Ego -> GRANDPARENT
    .filter(pl.col("RELATIE") == 303)
    .drop("RELATIE")
    .rename({"RINPERSOONRELATIE": "GRANDPARENT"})
    # GRANDPARENT -> alter reached through the grandparent's 301 edges
    .join(
        fam_full
        .filter(pl.col("RELATIE") == 301)
        .rename({"RINPERSOON": "GRANDPARENT"}),
        how="left",
        on="GRANDPARENT",
    )
    .drop("GRANDPARENT")
    # Label every chained edge with the new code, stored as Int64
    .with_columns(RELATIE=pl.lit(400, dtype=pl.Int64))
    # Rows without a match in the left join have a null alter; remove them
    .drop_nulls()
    # Age check: attach ego age ...
    .join(spine[["RINPERSOON", "age"]], how="left", on="RINPERSOON")
    .rename({"age": "age_ego"})
    # ... and alter age
    .join(
        spine[["RINPERSOON", "age"]],
        how="left",
        left_on="RINPERSOONRELATIE",
        right_on="RINPERSOON",
    )
    .rename({"age": "age_alter"})
    .with_columns(age_diff=pl.col("age_ego") - pl.col("age_alter"))
    .filter(pl.col("age_diff") < -10)
    # Back to the plain three-column edge layout
    .select(["RINPERSOON", "RINPERSOONRELATIE", "RELATIE"])
)

# Append the new edges to the full family edgelist
fam_full = pl.concat([fam_full, edges_ggp])

In [None]:
# Flatten multiplex edges according to hierarchy

# Priority list of relationship codes (as strings): when an ego-alter pair
# has several edges, the code that appears earliest in this list is kept.
relatie_order = [
    "304", "301", "312", "313", "306", "307", "308", "305", "303", "400", "317", "318",
    "319", "314", "315", "316", "309", "310", "311", "320", "321", "322", "302",
]

# Flatten edges
fam_uniplex = (
    fam_full
    # Turn RELATIE into an Enum whose category order is the priority list,
    # so that sorting on it follows the hierarchy instead of numeric order
    .with_columns(
        pl.col("RELATIE").cast(pl.String).cast(pl.Enum(relatie_order))
    )
    # Highest-priority edge first within each ego-alter pair
    .sort(["RINPERSOON", "RINPERSOONRELATIE", "RELATIE"])
    # Keep only that first edge per ego-alter pair
    .unique(subset=["RINPERSOON", "RINPERSOONRELATIE"], keep="first")
    # NOTE: RELATIE is intentionally left as an Enum here; it is not cast
    # back to an integer column.
)

In [None]:
# Create flat edgelist with only p-c and partner edges
#
# Only the ego-alter pairs are kept (no relationship code), with duplicate
# pairs collapsed to a single row.
fam_core = (
    fam_full
    .filter(pl.col("RELATIE").is_in([301, 304, 312, 313]))
    .select(["RINPERSOON", "RINPERSOONRELATIE"])
    .unique(keep="any")
)

In [None]:
# Store files: write the three family edgelists as comma-separated CSVs
# to the output paths configured under config["data"].

# Full (multiplex) family edgelist
fam_full.write_csv(
    config["data"]["edges_fam_full"],
    separator=",",
    line_terminator="\n",
)
# One edge per ego-alter pair
fam_uniplex.write_csv(
    config["data"]["edges_fam_uniplex"],
    separator=",",
    line_terminator="\n",
)
# Parent-child and partner pairs only
fam_core.write_csv(
    config["data"]["edges_fam_core"],
    separator=",",
    line_terminator="\n",
)

In [None]:
# Subset edges to young children (aged 0-4) to identify mothers later.
# The youngest child per ego is selected further down; here all edges to
# children aged <=4 are kept, together with the child's age.
fam_child = (
    fam_full
    # Subset child edges
    .filter(pl.col("RELATIE") == 304)
    # Attach the age of the alter (the child)
    .join(
        spine.select("RINPERSOON", "age"),
        how="left",
        left_on="RINPERSOONRELATIE",
        right_on="RINPERSOON",
    )
    # Keep only edges to children aged <=4 (age is an integer column)
    .filter(pl.col("age") <= 4)
)

In [None]:
# Free memory: the family edgelists have been written to disk and are not
# used below (fam_child is kept because it is still needed).
del famnet, fam_full, fam_uniplex, fam_core
del edges_pc, edges_cp, edges_gpgc, edges_gcgp, edges_ggp

# 2 - Tidy household edgelist

In [None]:
# Read HUISGENOTENNETWERKTAB and reduce it to a clean ego-alter edgelist
housemates_full = (
    pl
    .scan_csv(
        config["data"]["net_household"],
        separator=";",
        schema_overrides=dtypes_network,
    )
    .select(["RINPERSOON", "RINPERSOONRELATIE", "RELATIE"])
    # Keep only relationship code 401
    .filter(pl.col("RELATIE") == 401)
    # Both ego and alter must be present in the spine
    .join(spine.lazy(), how="semi", on="RINPERSOON")
    .join(
        spine.lazy(),
        how="semi",
        left_on="RINPERSOONRELATIE",
        right_on="RINPERSOON",
    )
    # No self-edges
    .filter(pl.col("RINPERSOON") != pl.col("RINPERSOONRELATIE"))
    # RELATIE is constant after the filter above, so it is dropped
    .drop("RELATIE")
    .collect()
)

In [None]:
# Store the household edgelist as a comma-separated CSV
housemates_full.write_csv(
    config["data"]["edges_house_full"],
    separator=",",
    line_terminator="\n",
)

# 3 - Household ids and size

## 3.1 - Create household ids

In [None]:
# Collect, for every ego, the list of all their housemates.
# Result: one row per RINPERSOON with a list column "housemates"; egos
# appear in order of first occurrence in the edgelist.
housemate_list = (
    housemates_full
    .lazy()
    .group_by("RINPERSOON", maintain_order=True)
    .agg(housemates=pl.col("RINPERSOONRELATIE"))
    .collect()
)

In [None]:
# Identify household of each individual
#
# A household is represented by the sorted list of an ego plus all of their
# housemates. Egos that produce the same member list share one household, and
# each distinct member list receives a consecutive id starting at 1.
# Result: one row per (RINPERSOON, id_hhd).
hhd_ids = (
    housemate_list
    .lazy()
    # Add the ego to the list of housemates to get all household members
    # and sort the lists so that later we can keep only one row per household
    .with_columns(members=pl.col("housemates").list.concat(pl.col("RINPERSOON")).list.sort())
    .select("members")
    # From list to string to check unique households
    .with_columns(members=pl.col("members").list.join(";"))
    # Keep only one instance of each unique household.
    # maintain_order=True keeps households in order of first appearance, so
    # that the ids assigned on the next step are the same on every run for
    # the same input (without it the group order is not guaranteed).
    .group_by("members", maintain_order=True)
    .first()
    # Add household id
    .with_row_index(name="id_hhd", offset=1)
    # Rework str into list
    .with_columns(members=pl.col("members").str.split(by = ";"))
    # Explode to rows
    .explode("members")
    # Rename
    .rename({"members":"RINPERSOON"})
    .select(["RINPERSOON","id_hhd"])
    .sort("RINPERSOON")
    .collect()
)

In [None]:
# Add single person households
#
# Persons in the spine that did not receive a household id above get their
# own id, numbered consecutively after the highest id assigned so far.
next_hhd_id = hhd_ids["id_hhd"].max() + 1

hhd_ids_single = (
    spine
    .select("RINPERSOON")
    # Keep only persons without a household id yet
    .join(hhd_ids, how="anti", on="RINPERSOON")
    .sort("RINPERSOON")
    # One new household id per remaining person
    .with_row_index(name="id_hhd", offset=next_hhd_id)
    .select(["RINPERSOON", "id_hhd"])
)

# Stack multi-person and single-person households into one lookup table
hhd_ids = pl.concat([hhd_ids, hhd_ids_single]).sort("RINPERSOON")

In [None]:
# Attach the household id to every person in the spine
spine = spine.join(hhd_ids, on="RINPERSOON", how="left")

In [None]:
# Checks on the household id table
n_rows = len(hhd_ids)
n_households = len(hhd_ids.group_by("id_hhd").first())

# Number of (person, household) rows
print(n_rows)
# Number of distinct households
print(n_households)
# Average number of rows per household
print(n_rows / n_households)
# Whether the most frequent RINPERSOON occurs fewer than 2 times,
# i.e. nobody is assigned to more than one household
print(hhd_ids.group_by("RINPERSOON").len().max().select("len") < 2)

## 3.2 - Household size

In [None]:
# Household size = number of rows per household id in the id table
hhd_size = hhd_ids.group_by("id_hhd").agg(hhd_size=pl.len())

# Attach the size of each person's household to the spine
spine = spine.join(hhd_size, on="id_hhd", how="left")

# 4 - Identify egos

## 4.1 - Women aged 18-40 years old

In [None]:
# Flag women aged 18-40 (both bounds inclusive) with is_ego = 1; everyone
# else, including rows where the condition cannot be evaluated, gets 0.
is_woman_18_40 = (pl.col("gender_female") == 1) & pl.col("age").is_between(18, 40)

spine = spine.with_columns(
    pl.when(is_woman_18_40).then(1).otherwise(0).alias("is_ego")
)

## 4.2 - With a child aged <=4 years old

In [None]:
# Identify youngest cohabiting child aged 0-4 of women aged 18-40
#
# Result: one row per ego woman who lives with at least one of her children
# aged 0-4, holding the id of her youngest such child (id_child) and the flag
# is_ego_child = 1.
ego_women = spine.filter(pl.col("is_ego")==1).select("RINPERSOON")
fam_child_egos = (
    fam_child
    # Subset child edges of ego women
    .join(ego_women, how="inner", on="RINPERSOON")
    # Subset only cohabiting children
    .join(housemates_full, how="inner", on=["RINPERSOON","RINPERSOONRELATIE"])
    # Keep youngest child. The child id is used as a final sort key so that
    # the choice between children of the same age (e.g. twins) is
    # deterministic instead of depending on an arbitrary row order.
    .sort(["RINPERSOON","age","RINPERSOONRELATIE"], descending=False)
    .unique(subset=["RINPERSOON"], keep="first")
    .drop(["age","RELATIE"])
    # Create flag variable
    .with_columns(is_ego_child = 1)
    # Rename child id
    .rename({"RINPERSOONRELATIE":"id_child"})
)

In [None]:
# Flag women with child aged <=4 years-old
spine = (
    spine
    .join(fam_child_egos, how="left", on="RINPERSOON")
    # Ensure that the flag only applies to women aged 18-40
    .with_columns(pl.col("is_ego_child").fill_null(strategy="zero"))
)

In [None]:
# Store spine
# NOTE: this writes to the same configured path the spine was read from at
# the top of the notebook, so the input file is replaced by the version that
# includes the columns added here.
spine.write_csv(
    config["data"]["spine"],
    separator=",",
    line_terminator="\n",
)