# Data Prep - Census Data
This notebook is the second part to the data preparation phase for this project. I will perform a series of manipulations on sociodemographic data from the 2010's Brazilian Census, the latest census conducted in the country. This is a complex process with many stages and will involve both careful manipulations of the raw data as well as handling of geospatial data.

In order to perform these manipulations, we will use both `pyspark` and an extension called `Apache Sedona` (formerly known as `geospark`).

In [None]:
# installing the requirements:
!pip install -r ../configs/dependencies/dataprep_requirements.txt >> ../configs/dependencies/package_installation.txt

In [2]:
# loading the magic commands:
%load_ext lab_black
%load_ext autoreload
%autoreload 2

In [3]:
###### Loading the necessary libraries #########

# PySpark dependencies:s
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
import pyspark.sql.types as T
from pyspark.sql.window import Window

# Sedona dependencies:
from sedona.utils.adapter import Adapter
from sedona.register import SedonaRegistrator
from sedona.utils import KryoSerializer, SedonaKryoRegistrator
from sedona.core.SpatialRDD import SpatialRDD
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.formatMapper import GeoJsonReader

# database utilities:
from sqlalchemy import create_engine
import sqlite3 as db
import pandas as pd
from tqdm import tqdm
import geopandas as gpd
import fiona

# plotting and data visualization:
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import HTML, Image

# other relevant libraries:
import warnings
import unidecode
import inflection
import unicodedata
from datetime import datetime, timedelta
from functools import partial
import json
import re
import os
from glob import glob
import shutil
import itertools
import chardet

# importing the atlas utilities:
from atlasutils import (
    save_to_filesystem,
    save_as_table,
    rotate_xticks,
    get_file_encoding,
    normalize_entities,
    normalize_column_name,
    apply_category_map,
    standardize_variable_names,
    get_null_columns,
    replace_decimal_separator,
    convert_to_geopandas,
    drop_invalid_census_columns,
    clean_census_column_name,
    get_file_crs,
    get_column_values,
)


# setting global parameters for visualizationsss:
warnings.filterwarnings("ignore")
pd.set_option("display.precision", 4)
pd.set_option("display.float_format", lambda x: "%.2f" % x)

# 0. Configuring Spark

In [4]:
# function to encapsulate standard spark configurations:
def init_spark(app_name):

    spark = (
        SparkSession.builder.appName(app_name)
        .config("spark.files.overwrite", "true")
        .config("spark.serializer", KryoSerializer.getName)
        .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
        .config(
            "spark.jars.packages",
            "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.1-incubating,"
            "org.datasyslab:geotools-wrapper:geotools-24.1",
        )
        .config("spark.sql.repl.eagerEval.enabled", True)
        .config("spark.sql.repl.eagerEval.maxNumRows", 5)
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
        .config("spark.sql.parquet.compression.codec", "gzip")
        .config("sedona.global.charset", "utf8")
        .config("sedona.global.index", "true")
        .enableHiveSupport()
        .getOrCreate()
    )

    SedonaRegistrator.registerAll(spark)

    return spark

In [5]:
# init the spark session:
spark = init_spark("SP Atlas - IBGE Census")

In [6]:
# verifying the session status:
spark

# 1. Loading and Inspecting the Data
We will first load the Polygon geospatial data that we will later use to match to the `(lat, long)` paths in the listings datasets.

## 1.1 Sector Polygons
The lowest level of data aggregation in the Census is a Sector. A sector is a specific area in which researchers collect census data. These sectors are contiguous areas and IBGE provides the Shapefiles for them. This will allow us to match the Listings data to the census sector where they can be located.

In [7]:
# loading the raw dataset:
RAW_DATA_DIR = "../data/raw/"

# shapefiles for sectors:
sector_rdd = ShapefileReader.readToGeometryRDD(
    sc=spark, inputPath=RAW_DATA_DIR + "sp_layers/census_sectors/*"
)

df_sector = Adapter.toDf(sector_rdd, spark)

# verifying the sectors:
df_sector

geometry,ID,CD_GEOCODI,TIPO,CD_GEOCODS,NM_SUBDIST,CD_GEOCODD,NM_DISTRIT,CD_GEOCODM,NM_MUNICIP,NM_MICRO,NM_MESO,CD_GEOCODB,NM_BAIRRO,ID1
POLYGON ((-46.410...,98237,354100005000009,URBANO,35410000500,,354100005,PRAIA GRANDE,3541000,PRAIA GRANDE,SANTOS,METROPOLITANA DE ...,354100005001,Boqueirão,1
POLYGON ((-46.416...,98232,354100005000004,URBANO,35410000500,,354100005,PRAIA GRANDE,3541000,PRAIA GRANDE,SANTOS,METROPOLITANA DE ...,354100005001,Boqueirão,2
POLYGON ((-46.412...,98230,354100005000002,URBANO,35410000500,,354100005,PRAIA GRANDE,3541000,PRAIA GRANDE,SANTOS,METROPOLITANA DE ...,354100005001,Boqueirão,3
POLYGON ((-46.411...,98229,354100005000001,URBANO,35410000500,,354100005,PRAIA GRANDE,3541000,PRAIA GRANDE,SANTOS,METROPOLITANA DE ...,354100005001,Boqueirão,4
POLYGON ((-46.413...,98231,354100005000003,URBANO,35410000500,,354100005,PRAIA GRANDE,3541000,PRAIA GRANDE,SANTOS,METROPOLITANA DE ...,354100005001,Boqueirão,5


In [8]:
# selecting the relevant columns:
df_sector = df_sector.select(
    F.col("ID").alias("id"),
    F.col("geometry"),
    F.col("CD_GEOCODI").alias("sector_code"),
    F.col("CD_GEOCODM").alias("city_code"),
    F.col("CD_GEOCODB").alias("neighborhood_code"),
    F.col("NM_MUNICIP").alias("city"),
    F.col("NM_BAIRRO").alias("neighborhood"),
    F.col("TIPO").alias("sector_type"),
)

# filtering just São Paulo:
df_sector = df_sector.filter(F.col("city") == "SÃO PAULO")

In [9]:
# adding the dataframe to sql context:
df_sector.createOrReplaceTempView("tb_sector")

## 1.2 Area of Ponderation

In [10]:
# reading the area of ponderation files:
ap_rdd = ShapefileReader.readToGeometryRDD(
    sc=spark, inputPath=RAW_DATA_DIR + "sp_layers/census_ponderations/*"
)

df_ap = Adapter.toDf(ap_rdd, spark)

# adding the geometry dataframes to the SQL Context:
df_ap.createOrReplaceTempView("tb_ponderation")

# verifying the sectors:
df_ap

geometry,ID,AREA,COD_AED,COD_AED_S
POLYGON ((317854....,158,10.450821,3550308005243,243
POLYGON ((320682....,159,13.241315,3550308005242,242
POLYGON ((325497....,160,4.180473,3550308005309,309
POLYGON ((326306....,117,11.936847,3550308005189,189
POLYGON ((328654....,1,7.321331,3550308005127,127


In [11]:
Q_AP_CONVERSION = """
SELECT 
    ST_FlipCoordinates(ST_Transform(A.geometry, 'epsg:29193','epsg:4326')) as geometry,
    A.AREA as ponderation_area,
    A.COD_AED as ponderation_area_code
FROM tb_ponderation as A
"""

# converting the CRS to the standard format:
df_ap = spark.sql(Q_AP_CONVERSION)

# adding the geometry dataframes to the SQL Context:
df_ap.createOrReplaceTempView("tb_ponderation")

In [12]:
# verifying the results:
df_ap

geometry,ponderation_area,ponderation_area_code
POLYGON ((-46.782...,10.450821,3550308005243
POLYGON ((-46.755...,13.241315,3550308005242
POLYGON ((-46.708...,4.180473,3550308005309
POLYGON ((-46.700...,11.936847,3550308005189
POLYGON ((-46.677...,7.321331,3550308005127


## 1.3 Neighborhoods

In [13]:
# reading the neighborhoods files:
nb_rdd = ShapefileReader.readToGeometryRDD(
    sc=spark, inputPath=RAW_DATA_DIR + "sp_layers/neighborhoods/*"
)

df_nb = Adapter.toDf(nb_rdd, spark)
df_nb.createOrReplaceTempView("tb_neighborhood")

# verifying the sectors:
df_nb

geometry,Name,descriptio
MULTIPOLYGON (((-...,Alto da Riviera,ALTO DA RIVIERA
POLYGON ((-46.589...,Alto da Mooca,ALTO DA MOOCA
POLYGON ((-46.719...,Alto da Lapa,ALTO DA LAPA
POLYGON ((-46.629...,Vila Agua Funda,VILA AGUA FUNDA
POLYGON ((-46.622...,Agua Fria,AGUA FRIA


## 1.4 Districts

In [14]:
# reading the neighborhoods files:
district_rdd = ShapefileReader.readToGeometryRDD(
    sc=spark, inputPath=RAW_DATA_DIR + "sp_layers/districts/*"
)

df_district = Adapter.toDf(district_rdd, spark)

# verifying the sectors:
df_district

geometry,CLASSID,FEATID,REVISIONNU,NOME_DIST,SIGLA_DIST,COD_DIST,COD_SUB,DATA_CRIAC,USUARIO_ID
POLYGON ((352436....,4.0,8583485.0,1.0,JOSE BONIFACIO,JBO,47,27,20070319,0.0
POLYGON ((320696....,4.0,8583484.0,1.0,JD SAO LUIS,JDS,46,18,20070319,0.0
POLYGON ((349461....,4.0,8583445.0,1.0,ARTUR ALVIM,AAL,5,21,20070319,0.0
POLYGON ((320731....,4.0,8583479.0,1.0,JAGUARA,JAG,40,8,20070319,0.0
POLYGON ((338651....,4.0,8583437.0,1.0,VILA PRUDENTE,VPR,93,29,20070319,0.0


In [15]:
# it looks like the file's geometry is not the standard we want (ESPG:4326), let's find out what is it:
get_file_crs(RAW_DATA_DIR + "sp_layers/districts/")

<Projected CRS: EPSG:29193>
Name: SAD69 / UTM zone 23S
Axis Info [cartesian]:
- E[east]: Easting (metre)
- N[north]: Northing (metre)
Area of Use:
- name: Brazil - between 48°W and 42°W, northern and southern hemispheres, onshore and offshore.
- bounds: (-48.0, -33.5, -42.0, 5.13)
Coordinate Operation:
- name: UTM zone 23S
- method: Transverse Mercator
Datum: South American Datum 1969
- Ellipsoid: GRS 1967 Modified
- Prime Meridian: Greenwich

We will need to convert the dataset to a standard CRS (coordinate reference system), which, in our case, is `espg:4326`.

In [16]:
# dropping the unnecessary columns:
df_district = df_district.drop(
    "CLASSID", "FEATID", "REVISIONNU", "DATA_CRIAC", "USUARIO_ID"
)

df_district.createOrReplaceTempView("tb_district")

# converting the coordinate system in the district file:
Q_DISTRICT_CONVERSION = """
SELECT 
    ST_FlipCoordinates(ST_Transform(A.geometry, 'epsg:29193','epsg:4326')) as geometry,
    A.NOME_DIST as district_name,
    A.SIGLA_DIST as district_abbreviation,
    A.COD_DIST as district_code,
    A.COD_SUB as subdistrict_code
FROM tb_district as A
"""

df_district = spark.sql(Q_DISTRICT_CONVERSION)

## 1.5 Zip Codes

In [17]:
# shapefiles for sectors:
zipcode_rdd = ShapefileReader.readToGeometryRDD(
    sc=spark, inputPath=RAW_DATA_DIR + "sp_layers/logradouros/*"
)

df_zipcode = Adapter.toDf(zipcode_rdd, spark)
df_zipcode.createOrReplaceTempView("tb_zipcode")

# verifying the zipcode shapes::
df_zipcode

geometry,ID,LENGTH,CODL,SEG,ANGULO,ORIENTA_O,COD_ORI,LEFT_ZIP,RIGHT_ZIP,START_LEFT,END_LEFT,START_RIGH,END_RIGHT,CEP_D,CEP_E,TITULO,PREP,NAME,BAIRRO_D,BAIRRO_E
LINESTRING (31949...,1,227.218229,731960,SG731960001,170,LESTE-OESTE,O,5547,5547,1,227,2,226,5547020,5547020,,,ERNESTO PAGLIA,JARDIM ROSA MARIA,JARDIM ROSA MARIA
LINESTRING (31949...,2,100.109237,753718,SG753718003,294,NORTE-SUL,S,5549,5549,111,209,110,208,5549120,5549120,PRFA.,,MARIA OSORIO TEIX...,JARDIM DAS ESMERA...,JARDIM DAS ESMERA...
LINESTRING (31955...,3,76.883336,734519,SG734519005,135,LESTE-OESTE,O,5549,5549,281,355,280,354,5549080,5549080,,,VITORIANO DA SILVA,JARDIM DAS ESMERA...,JARDIM DAS ESMERA...
LINESTRING (31954...,4,50.402368,753718,SG753718002,208,LESTE-OESTE,O,5549,5549,61,109,60,108,5549120,5549120,PRFA.,,MARIA OSORIO TEIX...,JARDIM DAS ESMERA...,JARDIM DAS ESMERA...
LINESTRING (31959...,5,59.831298,753718,SG753718001,205,LESTE-OESTE,O,5549,5549,1,59,2,58,5549120,5549120,PRFA.,,MARIA OSORIO TEIX...,JARDIM DAS ESMERA...,JARDIM DAS ESMERA...


In [18]:
# converting the coordinate system in the zipcode file:
Q_ZIPCODE_CONVERSION = """
WITH zip_left as (
  SELECT 
    DISTINCT A.CEP_E as zipcode,
    ST_FlipCoordinates(ST_Transform(A.geometry, 'epsg:29193','epsg:4326')) as geometry,
    A.NAME as street_name,
    A.LENGTH as street_length
    FROM tb_zipcode as A  
),

zip_right as (
    SELECT
        DISTINCT A.CEP_D as zipcode,
        ST_FlipCoordinates(ST_Transform(A.geometry, 'epsg:29193','epsg:4326')) as geometry,
        A.NAME as street_name,
        A.LENGTH as street_length
        FROM tb_zipcode as A      
)

SELECT * FROM zip_left 
UNION 
SELECT * FROM zip_right
"""

df_zipcode = spark.sql(Q_ZIPCODE_CONVERSION)

# dropping duplicates:
df_zipcode = df_zipcode.drop_duplicates(subset=["zipcode"])

In [19]:
# resulting dataframe becomes:
df_zipcode

zipcode,geometry,street_name,street_length
1012030,LINESTRING (-46.6...,CAFE,21.804877
1233001,LINESTRING (-46.6...,TUPI,16.602098
1248050,LINESTRING (-46.6...,PENAPOLIS,17.203656
1303040,LINESTRING (-46.6...,GRAVATAI,141.574693
1439020,LINESTRING (-46.6...,JAMAICA,94.675586


In [20]:
# number of distinct zip codes:
df_zipcode.count()

44886

In [21]:
# registering to SQL context:
df_zipcode.createOrReplaceTempView("tb_zipcode")

## 1.6 IPVS

In [22]:
# shapefiles for ipvs (will use geopandas, since
gdf_ipvs = gpd.read_file(RAW_DATA_DIR + "sp_layers/ipvs/")

# saving intermediary file:
gdf_ipvs.to_file("../data/raw/sp_ipvs/shapefile/geo_ipvs_2010.shp")

In [23]:
# shapefiles for sectors:
ipvs_rdd = ShapefileReader.readToGeometryRDD(
    sc=spark, inputPath=RAW_DATA_DIR + "sp_ipvs/shapefile/*"
)

df_ipvs = Adapter.toDf(ipvs_rdd, spark)
df_ipvs.createOrReplaceTempView("tb_ipvs")

# verifying the ipvs shapes:
df_ipvs

geometry,ID,AREA,CD_GEODI,TIPO,CD_GEODM,V1,V2,V3,V4,V5,V6,V61,V62,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,V29,V30,V31,V32,V33,V34,V35,V36,V37,V38,V39,V40,V41,V42,V43,V44,V45,V46,V47,V48,V49
POLYGON ((-47.712...,6164,0.33,350820705000003,URBANO,3508207,3508207,Buritizal,0,0.0,695,REG ADM FRANCA,350820705,Buritizal,350820705000003,Área urbanizada d...,N,Vulnerabilidade m...,263,262,0,747,2.85,7.8983,262,2181.2595,765.0469,1.9084,1.5267,13.7405,63.3588,19.4656,15.2672,4.1985,48.8015,1471.9695,93.5114,13.7405,67.4825,55.1905,92.0635,3.1746,1098.4286,-0.4747,-0.5777,,,99.6183,100.0,100.0,100.0,1,8,4044,747.0,3306.000000000000000,4053
POLYGON ((-47.714...,6165,0.06,350820705000004,URBANO,3508207,3508207,Buritizal,0,0.0,695,REG ADM FRANCA,350820705,Buritizal,350820705000004,Área urbanizada d...,N,Vulnerabilidade m...,149,149,0,472,3.17,9.5339,149,1930.3221,609.3602,0.6711,0.0,12.7517,73.8255,12.7517,12.7517,3.3557,41.302,1247.4899,97.3154,17.4497,64.626,44.6842,94.7368,5.2632,833.7105,-0.1363,0.2784,,,100.0,100.0,100.0,100.0,1,8,4044,747.0,3306.000000000000000,4053
POLYGON ((-51.158...,24919,26.26,352890810000003,RURAL,3528908,3528908,Mariápolis,0,0.0,691,REG ADM PRESIDENT...,352890810,Mourão,352890810000003,"Zona rural, exclu...",N,Não classificado,30,30,0,94,3.13,4.2553,30,1264.6667,403.617,0.0,0.0,33.3333,63.3333,3.3333,33.3333,6.6667,49.6,853.6667,100.0,0.0,67.5013,56.0,100.0,0.0,500.0,0.0,0.0,,,0.0,6.6667,20.0,100.0,0,4,3912,779.0,3137.000000000000000,3916
POLYGON ((-49.948...,24920,0.13,352900505000001,URBANO,3529005,3529005,Marília,0,0.0,692,REG ADM MARILIA,352900505,Marília,352900505000001,Área urbanizada d...,N,Vulnerabilidade m...,154,134,0,307,2.29,5.2117,134,3594.1418,1568.7785,0.7463,0.0,4.4776,43.2836,51.4925,4.4776,1.4925,55.403,2184.403,97.0149,12.6866,60.7768,57.1711,96.0526,7.8947,1476.1842,0.5118,-0.7793,,,100.0,100.0,100.0,100.0,2543,104,214098,9724.0,207021.0000000000...,216745
POLYGON ((-49.950...,24921,0.06,352900505000002,URBANO,3529005,3529005,Marília,0,0.0,692,REG ADM MARILIA,352900505,Marília,352900505000002,Área urbanizada d...,N,Baixíssima vulner...,121,113,0,232,2.05,2.1552,113,4936.5133,2404.4224,1.7699,0.0,0.885,24.7788,72.5664,0.885,0.0,58.2389,4103.3982,98.2301,15.0442,83.1234,61.5849,96.2264,13.2075,2937.5472,1.5504,-0.4993,,,100.0,100.0,100.0,100.0,2543,104,214098,9724.0,207021.0000000000...,216745


In [24]:
# reading the codebook for the ipvs dataset:
df_ipvs_codebook = pd.read_excel("../references/documentation/ipvs/ipvs_codebook.xlsx")

In [25]:
# taking a look at the codebook
df_ipvs_codebook.head()

Unnamed: 0,NOME DO ARQUIVO,VARIAVEL,NOME DA VARIAVEL,DESCRICAO DA VARIAVEL,FONTE,TIPO,normalized_variable
0,IPVS 2010 EST SP,ID,Identificados,Identificados,IBGE,NÚMERO,id
1,IPVS 2010 EST SP,AREA,Área em Km2,Área em Km2,IBGE,TEXTO,sector_area_square_kms
2,IPVS 2010 EST SP,CD_GEOCODI,Código do setor censitário do IBGE,Código do setor censitário do IBGE,IBGE,NÚMERO,sector_code
3,IPVS 2010 EST SP,TIPO,Tipo de setor censitário,Tipo de setor censitário,IBGE,TEXTO,sector_type
4,IPVS 2010 EST SP,CD_GEOCODM,Código do município do IBGE,Código do município do IBGE,IBGE,NÚMERO,city_code_census


In [26]:
# converting the column names to normalized variable:
ipvs_col_lookup = dict(
    zip(df_ipvs_codebook.VARIAVEL.values, df_ipvs_codebook.normalized_variable.values)
)

In [27]:
# changing the column names:
for orig_col, new_col in ipvs_col_lookup.items():
    df_ipvs = df_ipvs.withColumnRenamed(orig_col, new_col)

# changing few leftover columns:
df_ipvs = df_ipvs.withColumnRenamed("CD_GEODI", "sector_code").withColumnRenamed(
    "CD_GEODM", "city_code_census"
)

df_ipvs.createOrReplaceTempView("tb_ipvs")

In [28]:
# using a subquery to remove the sector codes that are not in the base census shapefile:
Q_SECTOR_FILTER = """
SELECT
    A.*
FROM tb_ipvs as A
LEFT JOIN tb_sector as B 
ON A.sector_code = B.sector_code
WHERE A.sector_code IN (SELECT DISTINCT sector_code FROM tb_sector)
"""

df_ipvs_clean = spark.sql(Q_SECTOR_FILTER)

In [29]:
# cleaning up is_subnormal_aglomerate column
df_ipvs_clean = df_ipvs_clean.withColumn(
    "is_subnormal_aglomerate",
    F.when(F.col("is_subnormal_aglomerate") == "N", False)
    .when(F.col("is_subnormal_aglomerate") == "S", True)
    .otherwise(False),
)

In [31]:
# setting a ordinal scale based on the fibonacci sequence with -1 as indicators for unclassified sectors
ipvs_groups = sorted(get_column_values(df_ipvs_clean, "ipvs_group"))

groups_code = [
    -1,  # ''
    1,  # 'Baixssima vulnerabilidade'
    -1,  # 'Não classificado'
    8,  # 'Vulnerabilidade alta (Urbanos)'
    2,  # 'Vulnerabilidade baixa'
    13,  # 'Vulnerabilidade muito alta (aglomerados subnormais urbanos)'
    3,  # 'Vulnerabilidade muito baixa'
    5,  # 'Vulnerabilidade média'
]

# generating the lookup:
ipvs_groups_lookup = dict(zip(ipvs_groups, groups_code))

In [32]:
# applying the lookup to the group column:
df_ipvs_clean = df_ipvs_clean.withColumn(
    "ipvs_group", apply_category_map(ipvs_groups_lookup)("ipvs_group")
)

In [33]:
# verifying the results:
df_ipvs_clean

geometry,id,sector_area_square_kms,sector_code,sector_type,city_code_census,city_code_ipvs,city_name,metropolitan_region_code,metropolitan_region_name,administrative_region_code,administrative_region_name,district_code,district_name,census_sector_code,sector_situation,is_subnormal_aglomerate,ipvs_group,households,households_permanent,improvised_households,people_permanent_households,average_residents_permanent_households,prop_0_5_age_population,total_responsible_people,average_household_income,per_capita_income,prop_households_no_income,prop_househols_one_eigth_sm_income,prop_households_one_halft_sm_income,prop_households_two_sm_income,prop_households_two_or_more_income,prop_private_households_one_half_sm_income,prop_private_households_one_quarter_sm_income,average_age_responsible_people,average_income_household_responsible,prop_literate_people,prop_responsible_people_up_to_30_years,prop_income_participation_in_household_responsible,average_age_women_responsible,prop_literate_women,prop_women_up_to_30_years,average_income_women_responsible,factor_1,factor_2,factor_1_rural,factor_2_rural,prop_households_access_water,prop_households_sanitation,prop_households_trash_collection,prop_households_electricity,population_collective_households_city,population_improvised_households_city,population_permanent_households_city,population_rural_areas_city,population_urban_areas_city,population_city
POLYGON ((-46.569...,43405,0.08,355030801000005,URBANO,3550308,3550308,São Paulo,800,0,681,REG METROPOLIT DE...,355030801,Água Rasa,355030801000005,Área urbanizada d...,False,3,240,240,0,754,3.1417,7.1618,240,2563.3958,815.935,3.3333,0.0,9.1667,60.0,27.5,9.1667,0.8333,50.8958,1416.9,96.6667,10.4167,55.2743,51.3832,95.3271,11.215,938.1402,0.0821,-0.5106,,,100.0,100.0,100.0,100.0,36944,6886,11209673,*****************...,*****************...,11253503
POLYGON ((-46.700...,43558,0.03,355030802000028,URBANO,3550308,3550308,São Paulo,800,0,681,REG METROPOLIT DE...,355030802,Alto de Pinheiros,355030802000028,Área urbanizada d...,False,1,177,177,0,394,2.226,3.2995,177,8323.226,3739.1142,3.3898,0.0,1.1299,8.4746,87.0057,1.1299,0.565,52.7684,5017.3333,99.435,10.7345,60.2811,54.5169,98.8764,12.3596,4285.4719,2.2971,0.0103,,,100.0,100.0,100.0,100.0,36944,6886,11209673,*****************...,*****************...,11253503
POLYGON ((-46.675...,44007,0.06,355030806000016,URBANO,3550308,3550308,São Paulo,800,0,681,REG METROPOLIT DE...,355030806,Barra Funda,355030806000016,Área urbanizada d...,False,1,135,135,0,356,2.637,5.0562,135,6132.4444,2325.5056,1.4815,0.0,0.0,16.2963,82.2222,0.0,0.0,51.5704,3564.8296,100.0,17.037,58.1306,53.0133,100.0,18.6667,2499.2,1.6655,0.4119,,,100.0,100.0,100.0,100.0,36944,6886,11209673,*****************...,*****************...,11253503
POLYGON ((-46.599...,44200,0.12,355030808000029,URBANO,3550308,3550308,São Paulo,800,0,681,REG METROPOLIT DE...,355030808,Belém,355030808000029,Área urbanizada d...,False,3,259,259,0,739,2.8533,5.4127,259,3249.4479,1138.8457,5.4054,0.3861,8.4942,37.8378,47.8764,8.8803,1.1583,49.7066,1867.7259,100.0,11.1969,57.4783,51.2308,100.0,10.0,1436.5462,0.5398,-0.4063,,,100.0,100.0,100.0,100.0,36944,6886,11209673,*****************...,*****************...,11253503
POLYGON ((-46.603...,44216,0.12,355030808000045,URBANO,3550308,3550308,São Paulo,800,0,681,REG METROPOLIT DE...,355030808,Belém,355030808000045,Área urbanizada d...,False,3,193,193,0,577,2.9896,5.7192,193,3659.4404,1224.0416,0.0,0.0,2.0725,48.1865,49.7409,2.0725,0.5181,54.513,2075.0,97.4093,4.6632,56.7027,59.1216,95.9459,4.0541,1589.3514,0.356,-1.293,,,100.0,97.9275,100.0,100.0,36944,6886,11209673,*****************...,*****************...,11253503


In [34]:
# converting the data types:
ipvs_types = {
    "sector_area_square_kms": "float",
    "ipvs_group": "integer",
    "households": "long",
    "households_permanent": "long",
    "improvised_households": "long",
    "people_permanent_households": "long",
    "average_residents_permanent_households": "double",
    "prop_0_5_age_population": "double",
    "total_responsible_people": "long",
    "average_household_income": "double",
    "per_capita_income": "double",
    "prop_households_no_income": "double",
    "prop_househols_one_eigth_sm_income": "double",
    "prop_households_one_halft_sm_income": "double",
    "prop_households_two_sm_income": "double",
    "prop_households_two_or_more_income": "double",
    "prop_private_households_one_half_sm_income": "double",
    "prop_private_households_one_quarter_sm_income": "double",
    "average_age_responsible_people": "double",
    "average_income_household_responsible": "double",
    "prop_literate_people": "double",
    "prop_responsible_people_up_to_30_years": "double",
    "prop_income_participation_in_household_responsible": "double",
    "average_age_women_responsible": "double",
    "factor_1": "double",
    "factor_2": "double",
    "factor_1_rural": "double",
    "factor_2_rural": "double",
    "prop_households_access_water": "double",
    "prop_households_sanitation": "double",
    "prop_households_trash_collection": "double",
    "prop_households_electricity": "double",
}

for col, new_type in ipvs_types.items():
    df_ipvs_clean = df_ipvs_clean.withColumn(col, F.col(col).cast(new_type))

In [35]:
df_ipvs_clean.printSchema()

root
 |-- geometry: geometry (nullable = true)
 |-- id: string (nullable = true)
 |-- sector_area_square_kms: float (nullable = true)
 |-- sector_code: string (nullable = true)
 |-- sector_type: string (nullable = true)
 |-- city_code_census: string (nullable = true)
 |-- city_code_ipvs: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- metropolitan_region_code: string (nullable = true)
 |-- metropolitan_region_name: string (nullable = true)
 |-- administrative_region_code: string (nullable = true)
 |-- administrative_region_name: string (nullable = true)
 |-- district_code: string (nullable = true)
 |-- district_name: string (nullable = true)
 |-- census_sector_code: string (nullable = true)
 |-- sector_situation: string (nullable = true)
 |-- is_subnormal_aglomerate: boolean (nullable = false)
 |-- ipvs_group: integer (nullable = true)
 |-- households: long (nullable = true)
 |-- households_permanent: long (nullable = true)
 |-- improvised_households: long (nulla

## 1.7 Demographic Density

In [50]:
# shapefiles for demographic density:
density_rdd = ShapefileReader.readToGeometryRDD(
    sc=spark, inputPath=RAW_DATA_DIR + "sp_layers/demographic_density/*"
)

df_density = Adapter.toDf(density_rdd, spark)
df_density.createOrReplaceTempView("tb_density")

# verifying the density shapes:
df_density

geometry,id,setor_cens,populacao,area_hect,habit_hect,ano_densid,an_censo
POLYGON ((317440....,32355,355030865000149,573,0.100683,5691.134318,2010,2010
POLYGON ((317403....,32356,355030865000148,156,0.226534,688.63773,2010,2010
POLYGON ((345980....,24960,355030864000087,350,5.201065,67.293912,2010,2010
POLYGON ((346080....,24961,355030864000088,306,2.962862,103.278515,2010,2010
POLYGON ((346465....,24964,355030864000058,997,8.876239,112.322347,2010,2010


In [51]:
# converting the coordinate system in the district file:
Q_DEM_DENSITY_CONV = """
SELECT 
    ST_FlipCoordinates(ST_Transform(A.geometry, 'epsg:29193','epsg:4326')) as geometry,
    A.setor_cens as sector_code,
    CAST(A.populacao as double) as population,
    CAST(A.area_hect as double) as area_in_hectares,
    CAST(A.habit_hect as double) as population_density
FROM tb_density as A
"""

df_density = spark.sql(Q_DEM_DENSITY_CONV)
df_density.createOrReplaceTempView("tb_density")

## 1.8 Census Codebook
The codebook is a master file that contains the name of the variables in respect to their original locations and meaning in the raw data. We will use these to give more meaningful names to the many census variables.

In [53]:
# loading the codebook from the census data
DATA_DOC_DIR = "../references/documentation/"

df_codebook = spark.read.json(DATA_DOC_DIR + "ibge/codebook_features_selected.json")

In [54]:
# we will normalize the data in the codebook such that we can use it to match the variables in the other datasets:
new_cols = {
    "Descrição da Variável": "variable_description",
    "Nome da Variável": "variable_name",
    "Tabela": "dataset",
}

for original_name, new_name in new_cols.items():
    df_codebook = df_codebook.withColumnRenamed(original_name, new_name)

In [55]:
# codebook yields:
df_codebook.count()  # there are a total of 4107 variables in all the datasets

4107

In [56]:
# verifying the dataset:
df_codebook

dataset_name_pt,is_selected,normalized_variable,simplified_variable_name,variable_description_en,variable_description_pt,variable_name
Básico,1,sector_code,cod_setor,Sector code,Código do setor,Cod_setor
Básico,1,code_large_region...,cod_grandes_regioes,Code of Large Reg...,Código das Grande...,Cod_Grandes Regiões
Básico,1,name_large_region...,nome_grande_regiao,Name of large reg...,Nome das Grandes ...,Nome_Grande_Regiao
Básico,1,federation_unit_code,cod_uf,Federation Unit Code,Código da Unidade...,Cod_UF
Básico,1,name_federation_unit,nome_da_uf,Name of the Feder...,Nome da Unidade d...,Nome_da_UF


In [57]:
# building udfs from the utilities:
normalize_entities_udf = F.udf(normalize_entities)

df_codebook = df_codebook.withColumn(
    "dataset_normalized", normalize_entities_udf(F.col("dataset_name_pt"))
)

In [58]:
# fixing the order of columns for the codebook:
df_codebook_subset = df_codebook.select(
    "dataset_normalized", "simplified_variable_name", "normalized_variable"
)

# generating a lookup table base on the dataset : original_name relation
map_rdd = df_codebook_subset.rdd.groupBy(lambda row: row["dataset_normalized"]).map(
    lambda row: (row[0], {variable[1]: variable[2] for variable in row[1]})
)

codebook_lookup = dict(map_rdd.collect())

## 1.9 Census Data
The Census data itself is broken up in several different files that represent the different entities of the data. For example:

1. Domicilio{N}: refers to data at the `household`. Things like number of households in a sector, number of residents and such is located here;
2. Pessoa{N}: refers to the `person` data. Here we can find information about the sociodemographic make-up of the sector, such as gender distributions, number of children, race profiles, et cetera;

... and so on.

I will process all the files related to the city of São Paulo and gather them into the appropriate entities. I also went ahead and pre-selected several features from the codebook, such that we won't use all of the available features (`4107` according to the codebook).

# 2. Processing the Census Data

In [223]:
# defining a blacklist of column names to not convert data types:

blacklist = [
    "cod_bairro",
    "cod_distrito",
    "cod_grandes_regioes",
    "cod_meso",
    "cod_micro",
    "cod_municipio",
    "cod_rm",
    "cod_setor",
    "cod_subdistrito",
    "cod_uf",
    "nome_da_meso",
    "nome_da_micro",
    "nome_da_rm",
    "nome_da_uf",
    "nome_do_bairro",
    "nome_do_distrito",
    "nome_do_municipio",
    "nome_do_subdistrito",
    "nome_grande_regiao",
    "situacao_setor",
    "tipo_setor",
    "situacao",
    "setor_precoleta",
]

In [224]:
# list all the files available on the census data directory:
census_files = sorted(glob(RAW_DATA_DIR + "sp_census/universe_results/*.csv"))
BASE_PATH = RAW_DATA_DIR + "sp_census/universe_results/"

# saving the resulting dataframes to a processed stage for further handling:
PROCESSED_CENSUS_DATA_DIR = "../data/processed/sp_census/raw_datasets/"

# instantiating the UDF for replacing decimal separators:
replace_decimal_separator_udf = F.udf(replace_decimal_separator)
census_data = []

In [225]:
for file in tqdm(census_files):

    print(f"Processing file: {file}")

    # fixing names of the original tables
    dataset_name = (
        file.split("/")[-1].replace(".csv", "").replace("SP1", "").replace("_", "")
    )

    # normalizes the text data for the dataset
    dataset_name = normalize_entities(dataset_name)

    # loading the raw csv file
    df_temp = spark.read.csv(
        file,
        header=True,
        sep=";",
        inferSchema=False,
        encoding="ISO-8859-1",
    )

    # dropping duplicated columns (we can't do much about these)
    df_temp = drop_invalid_census_columns(df_temp, codebook_lookup, dataset_name)

    # retrieving the columns:
    original_columns = sorted(df_temp.columns)

    # normalize column names:
    normalized_columns = list(map(clean_census_column_name, original_columns))

    # looking up column names:
    new_columns = list(
        map(lambda col: codebook_lookup[dataset_name][col], normalized_columns)
    )

    # get columns with duplicates:
    cols_map = dict(zip(normalized_columns, new_columns))

    # iterating over the columns to replace with the new ones:
    for i in range(len(original_columns)):

        input_col = original_columns[i]
        check_col = clean_census_column_name(input_col)
        output_col = new_columns[i]

        try:

            if check_col not in blacklist:

                # renames the column:
                df_temp = df_temp.withColumnRenamed(input_col, output_col)

                # in this case, convert the data types to float after replacing decimal separators:
                df_temp = df_temp.withColumn(
                    output_col, replace_decimal_separator_udf(F.col(output_col))
                ).withColumn(output_col, F.col(output_col).cast("double"))

            else:
                df_temp = df_temp.withColumnRenamed(input_col, output_col)

        except Exception as e:
            print(f"Dropping column {col} as it could not be processed: {e}")

            df_temp.drop(input_col)

    # using the helper function to save the file:
    OUTPUT_NAME = f"tb_{dataset_name}"

    save_to_filesystem(
        df_temp, PROCESSED_CENSUS_DATA_DIR, OUTPUT_NAME, OUTPUT_NAME + ".parquet"
    )

  0%|          | 0/26 [00:00<?, ?it/s]

Processing file: ../data/raw/sp_census/universe_results/Basico_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


  4%|▍         | 1/26 [00:00<00:23,  1.09it/s]

Processing file: ../data/raw/sp_census/universe_results/Domicilio01_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns ['V026', 'V120', 'V202', 'V206', 'V025', 'V198', 'V041', 'V203', 'V204', 'V119', 'V126', 'V125', 'V039', 'V208', 'V004', 'V139', 'V035'] for being duplicates


  8%|▊         | 2/26 [00:44<10:24, 26.03s/it]

Processing file: ../data/raw/sp_census/universe_results/Domicilio02_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns ['V002', 'V098', 'V046', 'V054', 'V090', 'V010'] for being duplicates


 12%|█▏        | 3/26 [00:55<07:18, 19.05s/it]

Processing file: ../data/raw/sp_census/universe_results/DomicilioRenda_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 15%|█▌        | 4/26 [00:55<04:18, 11.75s/it]

Processing file: ../data/raw/sp_census/universe_results/Entorno01_SP1.csv
Dropping columns ['Cod_Grandes Regiões', 'Cod_RM', 'Cod_UF', 'Cod_bairro', 'Cod_distrito', 'Cod_meso', 'Cod_micro', 'Cod_municipio', 'Cod_subdistrito', 'Nome_Grande_Regiao', 'Nome_da_RM', 'Nome_da_UF ', 'Nome_da_meso', 'Nome_da_micro', 'Nome_do_bairro', 'Nome_do_distrito', 'Nome_do_municipio', 'Nome_do_subdistrito', 'Setor_Precoleta'] - not present in the codebook
Dropping columns ['V022', 'V007', 'V003', 'V023', 'V041', 'V103', 'V057', 'V102', 'V162', 'V059', 'Situacao_setor', 'V004', 'Cod_setor', 'V014', 'V062', 'V002', 'V021', 'V020', 'V063', 'V016', 'V039', 'V163', 'V006'] for being duplicates


 19%|█▉        | 5/26 [01:20<05:45, 16.46s/it]

Processing file: ../data/raw/sp_census/universe_results/Entorno02_SP1.csv
Dropping columns ['Cod_Grandes Regiões', 'Cod_RM', 'Cod_UF', 'Cod_bairro', 'Cod_distrito', 'Cod_meso', 'Cod_micro', 'Cod_municipio', 'Cod_subdistrito', 'Nome_Grande_Regiao', 'Nome_da_RM', 'Nome_da_UF ', 'Nome_da_meso', 'Nome_da_micro', 'Nome_do_bairro', 'Nome_do_distrito', 'Nome_do_municipio', 'Nome_do_subdistrito', 'Setor_Precoleta'] - not present in the codebook
Dropping columns ['V206', 'V202', 'V203', 'V207', 'Situacao_setor', 'Cod_setor'] for being duplicates


 23%|██▎       | 6/26 [01:59<08:03, 24.16s/it]

Processing file: ../data/raw/sp_census/universe_results/Entorno03_SP1.csv
Dropping columns ['Cod_Grandes Regiões', 'Cod_RM', 'Cod_UF', 'Cod_bairro', 'Cod_distrito', 'Cod_meso', 'Cod_micro', 'Cod_municipio', 'Cod_subdistrito', 'Nome_Grande_Regiao', 'Nome_da_RM', 'Nome_da_UF ', 'Nome_da_meso', 'Nome_da_micro', 'Nome_do_bairro', 'Nome_do_distrito', 'Nome_do_municipio', 'Nome_do_subdistrito', 'Setor_Precoleta'] - not present in the codebook
Dropping columns ['V429', 'V477', 'V479', 'V453', 'V475', 'V457', 'V431', 'Situacao_setor', 'V478', 'V471', 'V480', 'Cod_setor'] for being duplicates


 27%|██▋       | 7/26 [02:28<08:05, 25.56s/it]

Processing file: ../data/raw/sp_census/universe_results/Entorno04_SP1.csv
Dropping columns ['Cod_Grandes Regiões', 'Cod_RM', 'Cod_UF', 'Cod_bairro', 'Cod_distrito', 'Cod_meso', 'Cod_micro', 'Cod_municipio', 'Cod_subdistrito', 'Nome_Grande_Regiao', 'Nome_da_RM', 'Nome_da_UF ', 'Nome_da_meso', 'Nome_da_micro', 'Nome_do_bairro', 'Nome_do_distrito', 'Nome_do_municipio', 'Nome_do_subdistrito', 'Setor_Precoleta'] - not present in the codebook
Dropping columns ['V693', 'V694', 'Cod_setor', 'Situacao_setor'] for being duplicates


 31%|███       | 8/26 [03:08<09:03, 30.21s/it]

Processing file: ../data/raw/sp_census/universe_results/Entorno05_SP1.csv
Dropping columns ['Cod_Grandes Regiões', 'Cod_RM', 'Cod_UF', 'Cod_bairro', 'Cod_distrito', 'Cod_meso', 'Cod_micro', 'Cod_municipio', 'Cod_subdistrito', 'Nome_Grande_Regiao', 'Nome_da_RM', 'Nome_da_UF ', 'Nome_da_meso', 'Nome_da_micro', 'Nome_do_bairro', 'Nome_do_distrito', 'Nome_do_municipio', 'Nome_do_subdistrito', 'Setor_Precoleta'] - not present in the codebook
Dropping columns ['Cod_setor', 'Situacao_setor'] for being duplicates


 35%|███▍      | 9/26 [03:49<09:31, 33.60s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa01_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 38%|███▊      | 10/26 [03:54<06:34, 24.65s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa02_SP1.csv
Dropping columns ['V171', 'V172', 'V173', 'V174', 'V175', 'V176', 'V177', 'V178', 'V179', 'V180', 'V181', 'V182', 'V183', 'V184', 'V185', 'V186', 'V187', 'V188', 'V189', 'V190', 'V191', 'V192', 'V193', 'V194', 'V195', 'V196', 'V197', 'V198', 'V199', 'V200', 'V201', 'V202', 'V203', 'V204', 'V205', 'V206', 'V207', 'V208', 'V209', 'V210', 'V211', 'V212', 'V213', 'V214', 'V215', 'V216', 'V217', 'V218', 'V219', 'V220', 'V221', 'V222', 'V223', 'V224', 'V225', 'V226', 'V227', 'V228', 'V229', 'V230', 'V231', 'V232', 'V233', 'V234', 'V235', 'V236', 'V237', 'V238', 'V239', 'V240', 'V241', 'V242', 'V243', 'V244', 'V245', 'V246', 'V247', 'V248', 'V249', 'V250', 'V251', 'V252', 'V253', 'V254', 'V255'] - not present in the codebook
Dropping columns ['V105', 'V094'] for being duplicates


 42%|████▏     | 11/26 [03:58<04:37, 18.49s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa03_SP1.csv
Dropping columns ['V002'] - not present in the codebook
Dropping columns [] for being duplicates


 46%|████▌     | 12/26 [04:55<07:04, 30.30s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa04_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 50%|█████     | 13/26 [05:12<05:41, 26.25s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa05_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 54%|█████▍    | 14/26 [05:13<03:41, 18.44s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa06_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 58%|█████▊    | 15/26 [05:50<04:25, 24.17s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa07_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns ['V036', 'V202', 'V203', 'V042', 'V041', 'V189', 'V195', 'V194', 'V015', 'V161', 'V049', 'V168', 'V008', 'V050'] for being duplicates


 62%|██████▏   | 16/26 [06:18<04:13, 25.31s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa08_SP1.csv
Dropping columns ['Cod_setor', 'V255'] - not present in the codebook
Dropping columns ['V206', 'V086', 'V064', 'V229', 'V251', 'V096', 'V227', 'V075', 'V228', 'V236', 'V209', 'V055', 'V091', 'V097', 'V218', 'V226', 'V084', 'V052', 'V242', 'V098', 'V054', 'V207', 'V053', 'V057', 'V215', 'V071', 'V083', 'V065', 'V239', 'V100', 'V082', 'V073', 'V216', 'V080', 'V205', 'V089', 'V059', 'V224', 'V245', 'V210', 'V223', 'V070', 'V058', 'V067', 'V244', 'V213', 'V062', 'V253', 'V237', 'V249', 'V060', 'V076', 'V063', 'V211', 'V056', 'V212', 'V250', 'V217', 'V208', 'V092', 'V220'] for being duplicates


 65%|██████▌   | 17/26 [06:54<04:17, 28.56s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa09_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns ['V161', 'V157', 'V172', 'V168'] for being duplicates


 69%|██████▉   | 18/26 [07:44<04:38, 34.86s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa10_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 73%|███████▎  | 19/26 [07:44<02:51, 24.45s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa11_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 77%|███████▋  | 20/26 [07:56<02:04, 20.78s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa12_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 81%|████████  | 21/26 [08:08<01:30, 18.18s/it]

Processing file: ../data/raw/sp_census/universe_results/Pessoa13_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 85%|████████▍ | 22/26 [08:20<01:05, 16.35s/it]

Processing file: ../data/raw/sp_census/universe_results/PessoaRenda_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 88%|████████▊ | 23/26 [08:33<00:45, 15.15s/it]

Processing file: ../data/raw/sp_census/universe_results/Responsavel01_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 92%|█████████▏| 24/26 [08:40<00:25, 12.86s/it]

Processing file: ../data/raw/sp_census/universe_results/Responsavel02_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns [] for being duplicates


 96%|█████████▌| 25/26 [09:20<00:20, 20.80s/it]

Processing file: ../data/raw/sp_census/universe_results/ResponsavelRenda_SP1.csv
Dropping columns [] - not present in the codebook
Dropping columns ['V052', 'V085', 'V019', 'V118', 'V063', 'V129'] for being duplicates


100%|██████████| 26/26 [09:31<00:00, 21.97s/it]


# 3. Summarizing the Polygon Data
In order to simplify the operations with Polygon data at the lowest level (sector), we will add a helper column as an approximation of the geospatial reference of the sector by calculating its centroid. This will allow simpler joins later on at a reasonable level of approximation due to the comparatively small area of the census sectors.

We will also use these to generate the features themselves at different levels. 

In [59]:
# registering dataframes as temporary table views -- will allow us to access them in SQL Context:
df_sector.createOrReplaceTempView("tb_census_sector")

In [60]:
# adding the centroid and area of the polygons:
Q_ADD_GEOM = """
SELECT 
    A.*,
    ST_Centroid(A.geometry) as sector_centroid,
    ST_X(ST_Centroid(A.geometry)) as centroid_longitude,
    ST_Y(ST_Centroid(A.geometry)) as centroid_latitude
FROM tb_census_sector as A
"""

# adding the geometry:
df_sector = spark.sql(Q_ADD_GEOM)

In [61]:
# verifying the resulting schema:
df_sector.printSchema()  # looks good to go

root
 |-- id: string (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- sector_code: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- neighborhood_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- sector_type: string (nullable = true)
 |-- sector_centroid: geometry (nullable = false)
 |-- centroid_longitude: double (nullable = true)
 |-- centroid_latitude: double (nullable = true)



# 4. Selecting the target features

In the codebook file provided, I had already selected several features of interest as a basis for this project. As it is originally intended for use in the Real State industry, the selection reflect that. Anyone interested in other kinds of features is free to create a selection of his or her own and use the following code as reference. The original files that hold the variables of interest are the following:

1. `Basico`
2. `Domicilio01`
3. `Domicilio02`
4. `Pessoa11`
5. `Pessoa12`
6. `Pessoa13`
7. `Responsavel02`

In [62]:
# specifying a directory for the census data:
CENSUS_DATA_DIR = "../data/processed/sp_census/raw_datasets/"

# basico - basic entity:
tb_basico = spark.read.parquet(CENSUS_DATA_DIR + "tb_basico.parquet")

# domicilio - housing entities:
tb_domicilio01 = spark.read.parquet(CENSUS_DATA_DIR + "tb_domicilio01.parquet")
tb_domicilio02 = spark.read.parquet(CENSUS_DATA_DIR + "tb_domicilio02.parquet")

# pessoa - person entities:
tb_pessoa01 = spark.read.parquet(CENSUS_DATA_DIR + "tb_pessoa01.parquet")
tb_pessoa11 = spark.read.parquet(CENSUS_DATA_DIR + "tb_pessoa11.parquet")
tb_pessoa12 = spark.read.parquet(CENSUS_DATA_DIR + "tb_pessoa12.parquet")
tb_pessoa13 = spark.read.parquet(CENSUS_DATA_DIR + "tb_pessoa13.parquet")

# resposavel - household responsible entities:
tb_responsible02 = spark.read.parquet(CENSUS_DATA_DIR + "tb_responsavel02.parquet")

In [63]:
# listing the number of census sectors:
tb_basico.count()  # joins show end up with the same number of rows (distinct sectors)

18363

In [64]:
# joining all the datasets before selecting:
tb_census = tb_basico.join(
    tb_domicilio01, on=["sector_code", "sector_situation_code"], how="left"
)
tb_census = tb_census.join(
    tb_domicilio02, on=["sector_code", "sector_situation_code"], how="left"
)
tb_census = tb_census.join(
    tb_pessoa11, on=["sector_code", "sector_situation_code"], how="left"
)
tb_census = tb_census.join(
    tb_pessoa12, on=["sector_code", "sector_situation_code"], how="left"
)
tb_census = tb_census.join(
    tb_responsible02, on=["sector_code", "sector_situation_code"], how="left"
)

tb_census = tb_census.join(
    tb_pessoa01, on=["sector_code", "sector_situation_code"], how="left"
)

In [65]:
# verifying the data integrity:
assert (
    tb_basico.count() == tb_census.count()
), "There are more rows than expected in the final file"

In [66]:
# selecting the columns that are relevant from the codebook:
col_selection = df_codebook.filter(F.col("is_selected") == 1)

In [67]:
# keeping the columns of interest:
cols_to_keep = get_column_values(col_selection, "normalized_variable")

In [68]:
# dropping the columns that are not part of the selection:
tb_census = tb_census.drop(
    *[col for col in tb_census.columns if col not in cols_to_keep]
)

# 5. Preparing features at the units of interest level

As previously described in the `1.0 Data Understanding - All Datasets.ipynb` notebook, we have several units of interest for the datasets we are working with. These are the following:

1. `sector`: the lowest unit of measurement for the Brazilian Census, which is one of most important geospatially referenced datasets we will be working with;
2. `zipcode`: zip codes in the city of São Paulo can be roughly approximated to an entire street (also called a logradouro);
3. `area_of_ponderation`: areas of ponderation are contiguous groups of census sectors;
4. `neighborhoods`: areas that are often (but not directly) related to the neighborhoods of the city;
5. `districts`: districts are administrative regions defined by law (and thus, won't change much over time), used to allocate resources by the City Hall;

The map below illustrates this relationship:

![Map illustrating the different layers](../references/img/layers_map_example.png)

We will aggregate the features at the levels above and maintain them in their raw state. Note: the zip code data will be processed in a different notebook as it requires a different processing approach.

In [69]:
# adding the geometry dataframes to the SQL Context:
df_ap.createOrReplaceTempView("tb_ponderation")
df_nb.createOrReplaceTempView("tb_neighborhood")
df_district.createOrReplaceTempView("tb_district")

In [70]:
# adding the raw features to the geometry:
df_sector_features = (
    df_sector.join(
        tb_census, (df_sector.sector_code == tb_census.sector_code), how="left"
    )
    .drop(tb_census.neighborhood_code)
    .drop(tb_census.sector_code)
)

In [71]:
# retrieving the number of rows after the join:
df_sector_features.count()

18953

In [72]:
# removing duplicate columns from the ipvs and density dataframes:
df_ipvs_clean = df_ipvs_clean.drop(
    "id",
    "sector_type",
    "city_code_census",
    "city_code_ipvs",
    "city_name",
    "metropolitan_region_code",
    "metropolitan_region_name",
    "administrative_region_code",
    "administrative_region_name",
    "district_name",
    "district_code",
    "sector_situation",
    "population_rural_areas_city",
    "population_urban_areas_city",
    "population_city",
    "population_permanent_households_city",
    "population_improvised_households_city",
    "population_collective_households_city",
    "geometry",
)

df_density = df_density.drop("geometry")

In [73]:
# adding the ipvs and density featurs:
df_sector_features = df_sector_features.join(
    df_ipvs_clean, on=["sector_code"], how="left"
).join(df_density, on=["sector_code"], how="left")

In [74]:
# listing columns with null values:
get_null_columns(df_sector_features)

-RECORD 0------------------------------------------------------------------------------------------------------------------
 sector_code                                                                                                       | 0     
 id                                                                                                                | 0     
 geometry                                                                                                          | 0     
 city_code                                                                                                         | 0     
 neighborhood_code                                                                                                 | 0     
 city                                                                                                              | 0     
 neighborhood                                                                                                      | 0     
 sector_

In [75]:
# dropping null rows from the join (represent sectors that are not present in the area of study):
df_sector_features = df_sector_features.na.drop(
    subset=["code_large_regions_geographical_regions"]
)

# dropping duplicates
df_sector_features = df_sector_features.drop_duplicates(subset=["sector_code"])

In [76]:
# removing the missing values gets us back to the correct number (the ones that match the census exactly)
df_sector_features.count()

18363

In [77]:
# adding the resulting dataframe to the SQL Context:
df_sector_features.createOrReplaceTempView("tb_sector_features")

In [78]:
# adding the sectors back to the main dataframe:
df_sector_final = df_sector_features.join(
    df_sector.drop(
        "centroid_latitude",
        "centroid_longitude",
        "city_code",
        "city",
        "geometry",
        "id",
        "neighborhood_code",
        "neighborhood",
        "sector_centroid",
        "sector_type",
    ),
    on=["sector_code"],
    how="inner",
)

In [79]:
# save the results to the specified directory:
SECTOR_OUTPUT = f"sectors/tb_sectors_census"
PROCESSED_UNITS_OF_INTEREST = "../data/processed/sp_census/units_of_interest/"

save_to_filesystem(
    df_sector_final,
    PROCESSED_UNITS_OF_INTEREST,
    SECTOR_OUTPUT,
    SECTOR_OUTPUT + ".parquet",
)

True

In [81]:
SECTOR_NO_GEO = f"sectors/tb_sectors_census_no_geo"
PROCESSED_UNITS_OF_INTEREST = "../data/processed/sp_census/units_of_interest/"

df_sector_no_geo = df_sector.drop("geometry")

save_to_filesystem(
    df_sector_no_geo,
    PROCESSED_UNITS_OF_INTEREST,
    SECTOR_NO_GEO,
    SECTOR_NO_GEO + ".parquet",
)

True

## 5.1 Area of Ponderation

In [85]:
# adding the centroid and area of the polygons:
Q_AP_MATCH = """
SELECT 
    A.ponderation_area_code,
    B.sector_code
FROM tb_ponderation as A, tb_sector_features as B
WHERE ST_Contains(A.geometry, B.sector_centroid)
"""

# matching the areas of ponderation to their sectors:
df_ap_match = spark.sql(Q_AP_MATCH)

In [86]:
# adding the resulting dataframe to the SQL Context:
df_ap_match.createOrReplaceTempView("tb_ap_match")

In [87]:
# adding the features to the ponderation area back:
Q_AP_RAW_FEATURES = """
SELECT
    A.ponderation_area_code,
    B.*
FROM tb_ap_match as A
LEFT JOIN tb_sector_features as B
ON A.sector_code = B.sector_code
"""

# reading back the dataset:
df_ap_raw_features = spark.sql(Q_AP_RAW_FEATURES)

# dropping the geometry columns and looking at the results:
df_ap_raw_features = df_ap_raw_features.drop("geometry", "sector_centroid")

# dropping duplicates:
df_ap_raw_features = df_ap_raw_features.drop_duplicates()

In [88]:
df_ap_raw_features.printSchema()

root
 |-- ponderation_area_code: string (nullable = true)
 |-- sector_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- neighborhood_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- sector_type: string (nullable = true)
 |-- centroid_longitude: double (nullable = true)
 |-- centroid_latitude: double (nullable = true)
 |-- sector_situation_code: string (nullable = true)
 |-- code_large_regions_geographical_regions: string (nullable = true)
 |-- name_large_regions_geographical_regions: string (nullable = true)
 |-- federation_unit_code: string (nullable = true)
 |-- name_federation_unit: string (nullable = true)
 |-- mesoregion_code: string (nullable = true)
 |-- name_mesoregion: string (nullable = true)
 |-- microregion_code: string (nullable = true)
 |-- name_microregion: string (nullable = true)
 |-- metropolitan_region_code_or_ride: string (nullable = true)


### 5.1.1 Aggregating Raw Area of Ponderation Features

In [107]:
df_ap_agg = df_ap_raw_features.groupby("ponderation_area_code").agg(
    F.sum(F.col("population")).alias("population"),
    F.sum(F.col("alphabetized_people_with_5_or_more_years_age")).alias(
        "alphabetized_population"
    ),
    F.sum(F.when(F.col("ipvs_group") > 5, 1).otherwise(0)).alias(
        "n_regions_high_social_vulnerability"
    ),
    F.sum(F.when(F.col("ipvs_group") == 5, 1).otherwise(0)).alias(
        "n_regions_medium_social_vulnerability"
    ),
    F.sum(F.when(F.col("ipvs_group") < 5, 1).otherwise(0)).alias(
        "n_regions_low_social_vulnerability"
    ),
    F.max(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "max_ipvs_group"
    ),
    F.min(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "min_ipvs_group"
    ),
    F.avg(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "avg_ipvs_group"
    ),
    F.sum(F.col("area_in_hectares")).alias("area_in_hectares"),
    F.avg("population_density").alias("average_population_density"),
    F.avg(F.col("average_age_responsible_people")).alias(
        "average_age_household_leaders"
    ),
    F.avg(F.col("average_age_women_responsible")).alias(
        "average_age_women_household_leaders"
    ),
    F.avg(F.col("average_household_income")).alias("average_household_income"),
    F.avg(F.col("average_income_household_responsible")).alias(
        "average_income_household_leaders"
    ),
    F.avg(F.col("average_income_women_responsible")).alias(
        "average_income_women_household_leaders"
    ),
    F.avg(
        F.col(
            "average_number_dwellers_permanent_private_households_obtained_by_var2_division_by_var1"
        )
    ).alias("average_residents_per_household"),
    F.avg(F.col("average_residents_permanent_households")).alias(
        "average_residents_in_permanent_households"
    ),
    F.avg(F.col("children_only_from_person_responsible_private_households")).alias(
        "average_number_children"
    ),
    F.sum(F.col("dwellers_permanent_private_households_and_acquisition")).alias(
        "residents_in_households_acquisition"
    ),
    F.avg(F.col("factor_1")).alias("average_ipvs_factor_1"),
    F.avg(F.col("factor_2")).alias("average_ipvs_factor_2"),
    F.avg(F.col("factor_1_rural")).alias("average_ipvs_factor_1_rural"),
    F.avg(F.col("factor_2_rural")).alias("average_ipvs_factor_2_rural"),
    F.sum(
        F.col("household_employees_private_households_males")
        + F.col("household_employees_private_households")
    ).alias("number_house_workers"),
    F.sum(F.col("households")).alias("households"),
    F.sum(F.col("households_permanent")).alias("permanent_housholds"),
    F.sum(F.col("improvised_households")).alias("improvised_households"),
    F.sum(F.when(F.col("is_subnormal_aglomerate") == True, 1).otherwise(0)).alias(
        "number_subnormal_aglomerates"
    ),
    F.avg(F.col("per_capita_income")).alias("average_per_capita_income"),
    F.sum(F.col("permanent_private_home_type_households_and_acquisition")).alias(
        "permanent_private_houses_in_acquisition"
    ),
    F.sum(F.col("permanent_private_home_type_households_and_clear")).alias(
        "permanent_private_houses_fully_owned"
    ),
    F.sum(F.col("permanent_private_household_homes_rented")).alias(
        "permanent_private_rented_properties"
    ),
    F.sum(F.col("permanent_private_households_acquisition")).alias(
        "permanent_private_properties_in_acquisition"
    ),
    F.sum(F.col("permanent_private_households_and_disposed")).alias(
        "permanent_private_properties_fully_owned"
    ),
    F.sum(F.col("permanent_private_households_rented")).alias(
        "permanent_private_properties_rented"
    ),
    F.sum(F.col("permanent_private_households_with_10_or_more_residents")).alias(
        "permanent_private_households_with_10_or_more_residents"
    ),
    F.sum(F.col("permanent_private_households_with_1_resident")).alias(
        "permanent_private_households_with_1_resident"
    ),
    F.sum(F.col("permanent_private_households_with_2_residents")).alias(
        "permanent_private_households_with_2_residents"
    ),
    F.sum(F.col("permanent_private_households_with_3_residents")).alias(
        "permanent_private_households_with_3_residents"
    ),
    F.sum(F.col("permanent_private_households_with_4_residents")).alias(
        "permanent_private_households_with_4_residents"
    ),
    F.sum(F.col("permanent_private_households_with_5_residents")).alias(
        "permanent_private_households_with_5_residents"
    ),
    F.sum(F.col("permanent_private_households_with_6_residents")).alias(
        "permanent_private_households_with_6_residents"
    ),
    F.sum(F.col("permanent_private_households_with_7_residents")).alias(
        "permanent_private_households_with_7_residents"
    ),
    F.sum(F.col("permanent_private_households_with_8_residents")).alias(
        "permanent_private_households_with_8_residents"
    ),
    F.sum(F.col("permanent_private_households_with_9_residents")).alias(
        "permanent_private_households_with_9_residents"
    ),
    F.sum(F.col("permanent_private_households_with_electricity")).alias(
        "permanent_private_households_with_electricity"
    ),
    F.sum(
        F.col("permanent_private_households_without_exclusive_use_bathroom_residents")
    ).alias("permanent_private_households_without_exclusive_use_bathroom_residents"),
    F.avg(F.col("permanent_private_home_type_households_and_acquisition")).alias(
        "average_permanent_private_houses_in_acquisition"
    ),
    F.avg(F.col("permanent_private_home_type_households_and_clear")).alias(
        "average_permanent_private_houses_fully_owned"
    ),
    F.avg(F.col("permanent_private_household_homes_rented")).alias(
        "average_permanent_private_rented_properties"
    ),
    F.avg(F.col("permanent_private_households_acquisition")).alias(
        "average_permanent_private_properties_in_acquisition"
    ),
    F.avg(F.col("permanent_private_households_and_disposed")).alias(
        "average_permanent_private_properties_fully_owned"
    ),
    F.avg(F.col("permanent_private_households_rented")).alias(
        "average_permanent_private_properties_rented"
    ),
    F.avg(F.col("permanent_private_households_with_10_or_more_residents")).alias(
        "average_permanent_private_households_with_10_or_more_residents"
    ),
    F.avg(F.col("permanent_private_households_with_1_resident")).alias(
        "average_permanent_private_households_with_1_resident"
    ),
    F.avg(F.col("permanent_private_households_with_2_residents")).alias(
        "average_permanent_private_households_with_2_residents"
    ),
    F.avg(F.col("permanent_private_households_with_3_residents")).alias(
        "average_permanent_private_households_with_3_residents"
    ),
    F.avg(F.col("permanent_private_households_with_4_residents")).alias(
        "average_permanent_private_households_with_4_residents"
    ),
    F.avg(F.col("permanent_private_households_with_5_residents")).alias(
        "average_permanent_private_households_with_5_residents"
    ),
    F.avg(F.col("permanent_private_households_with_6_residents")).alias(
        "average_permanent_private_households_with_6_residents"
    ),
    F.avg(F.col("permanent_private_households_with_7_residents")).alias(
        "average_permanent_private_households_with_7_residents"
    ),
    F.avg(F.col("permanent_private_households_with_8_residents")).alias(
        "average_permanent_private_households_with_8_residents"
    ),
    F.avg(F.col("permanent_private_households_with_9_residents")).alias(
        "average_permanent_private_households_with_9_residents"
    ),
    F.avg(F.col("permanent_private_households_with_electricity")).alias(
        "average_permanent_private_households_with_electricity"
    ),
    F.avg(
        F.col("permanent_private_households_without_exclusive_use_bathroom_residents")
    ).alias(
        "average_permanent_private_households_without_exclusive_use_bathroom_residents"
    ),
    F.avg(F.col("prop_households_electricity")).alias(
        "average_proportion_households_electricity"
    ),
    F.avg(F.col("prop_households_no_income")).alias(
        "average_proportion_households_no_income"
    ),
    F.avg(F.col("prop_households_one_halft_sm_income")).alias(
        "average_proportion_households_one_halft_sm_income"
    ),
    F.avg(F.col("prop_households_sanitation")).alias(
        "average_proportion_households_sanitation"
    ),
    F.avg(F.col("prop_households_trash_collection")).alias(
        "average_proportion_households_trash_collection"
    ),
    F.avg(F.col("prop_households_two_or_more_income")).alias(
        "average_proportion_households_two_or_more_income"
    ),
    F.avg(F.col("prop_households_two_sm_income")).alias(
        "average_proportion_households_two_sm_income"
    ),
    F.avg(F.col("prop_househols_one_eigth_sm_income")).alias(
        "average_proportion_househols_one_eigth_sm_income"
    ),
    F.avg(F.col("prop_income_participation_in_household_responsible")).alias(
        "average_proportion_income_participation_in_household_responsible"
    ),
    F.avg(F.col("prop_literate_people")).alias("average_proportion_literate_people"),
    F.avg(F.col("prop_literate_women")).alias("average_proportion_literate_women"),
    F.avg(F.col("prop_private_households_one_half_sm_income")).alias(
        "average_proportion_private_households_one_half_sm_income"
    ),
    F.avg(F.col("prop_private_households_one_quarter_sm_income")).alias(
        "average_proportion_private_households_one_quarter_sm_income"
    ),
    F.avg(F.col("prop_women_up_to_30_years")).alias(
        "average_proportion_women_up_to_30_years"
    ),
    F.avg(F.col("prop_0_5_age_population")).alias(
        "average_proportion_population_up_to_5_years_age"
    ),
    F.avg(F.col("prop_households_access_water")).alias(
        "average_prop_households_access_water"
    ),
    F.avg(F.col("prop_responsible_people_up_to_30_years")).alias(
        "average_proportion_responsible_people_up_to_30_years"
    ),
    F.avg(
        F.col(
            "value_average_monthly_nominal_income_persons_responsible_for_permanent_private_households_with_and_without_income"
        )
    ).alias("average_monthly_income"),
    F.sum(F.col("total_responsible_people")).alias("population_house_leaders"),
    F.sum(F.col("residents_permanent_permanent_households")).alias(
        "residents_in_permanent_households"
    ),
    F.sum(F.col("residents_permanent_private_households_apartment_type")).alias(
        "residents_in_apartments"
    ),
    F.sum(
        F.col(
            "residents_permanent_private_households_or_population_residing_permanent_private_households"
        )
    ).alias("residents_in_private_households"),
    F.sum(F.col("residents_permanent_private_households_rented")).alias(
        "residents_in_rented_properties"
    ),
    F.sum(F.col("residents_private_households_and_collective_households")).alias(
        "residents_in_collective_households"
    ),
    F.sum(F.col("permanent_private_households_with_3_bathrooms_residents")).alias(
        "permanent_private_households_with_up_to_3_bathrooms"
    ),
)

# adding literacy rate column:
df_ap_agg = df_ap_agg.withColumn(
    "literacy_rate", F.col("population") / F.col("alphabetized_population")
)

In [108]:
# fixing the column names:
new_columns = list(map(lambda col: f"ponderation_area_{col}", df_ap_agg.columns))

for i in range(len(df_ap_agg.columns)):
    df_ap_agg = df_ap_agg.withColumnRenamed(df_ap_agg.columns[i], new_columns[i])

# removing the extraneous columns:
df_ap_agg = df_ap_agg.withColumnRenamed(
    "ponderation_area_ponderation_area_code", "ponderation_area_code"
)

# sorting the dataframe columns:
df_ap_agg = sort_dataframe_columns(df_ap_agg, "ponderation_area_code")

# adding the resulting dataframe to the SQL Context:
df_ap_agg.createOrReplaceTempView("tb_ponderation_raw_features")

In [110]:
# generating the final level of aggregation for the census features:
Q_AP_GEOM = """
SELECT
    A.*,
    B.geometry,
    ST_Centroid(B.geometry) as ponderation_area_centroid
FROM tb_ponderation_raw_features as A
LEFT JOIN tb_ponderation as B 
ON A.ponderation_area_code = B.ponderation_area_code
"""

df_ap_final = spark.sql(Q_AP_GEOM)

### 5.1.2 Exporting Area of Ponderation level Results

In [111]:
# save the results to the specified directory:
AP_OUTPUT = f"areas_of_ponderation/tb_area_ponderation_census"
PROCESSED_UNITS_OF_INTEREST = "../data/processed/sp_census/units_of_interest/"

save_to_filesystem(
    df_ap_final, PROCESSED_UNITS_OF_INTEREST, AP_OUTPUT, AP_OUTPUT + ".parquet"
)

True

In [112]:
# saving without the geometry:
df_ap_no_geo = df_ap_final.drop("geometry", "ponderation_area_centroid")

AP_NO_GEO = f"areas_of_ponderation/tb_area_ponderation_census_no_geo"

save_to_filesystem(
    df_ap_final, PROCESSED_UNITS_OF_INTEREST, AP_NO_GEO, AP_NO_GEO + ".parquet"
)

True

## 5.2 Neighborhoods

In [113]:
# adding the centroid and area of the polygons:
Q_NEIGHBORHOOD_MATCH = """
SELECT 
    A.Name as neighborhood_name,
    B.sector_code
FROM tb_neighborhood as A, tb_sector_features as B
WHERE ST_Contains(A.geometry, B.sector_centroid)
"""

# matching the areas of ponderation to their sectors:
df_nb_match = spark.sql(Q_NEIGHBORHOOD_MATCH)

# adding the resulting dataframe to the SQL Context:
df_nb_match.createOrReplaceTempView("tb_nb_match")

In [114]:
# adding the features to the ponderation area back:
Q_NB_RAW_FEATURES = """
SELECT
    A.neighborhood_name,
    B.*
FROM tb_nb_match as A
LEFT JOIN tb_sector_features as B
ON A.sector_code = B.sector_code
"""

# reading back the dataset:
df_nb_raw_features = spark.sql(Q_NB_RAW_FEATURES)

# dropping the geometry columns and looking at the results:
df_nb_raw_features = df_nb_raw_features.drop("geometry", "sector_centroid")

In [115]:
# verifying the data integrity:
assert (
    df_nb_raw_features.count() == df_nb_match.count()
), "There are more rows than expected in the final file"

### 5.2.1 Aggregating Raw Neighborhood Features

In [118]:
df_nb_agg = df_nb_raw_features.groupby("neighborhood_name").agg(
    F.sum(F.col("population")).alias("population"),
    F.sum(F.col("alphabetized_people_with_5_or_more_years_age")).alias(
        "alphabetized_population"
    ),
    F.sum(F.when(F.col("ipvs_group") > 5, 1).otherwise(0)).alias(
        "n_regions_high_social_vulnerability"
    ),
    F.sum(F.when(F.col("ipvs_group") == 5, 1).otherwise(0)).alias(
        "n_regions_medium_social_vulnerability"
    ),
    F.sum(F.when(F.col("ipvs_group") < 5, 1).otherwise(0)).alias(
        "n_regions_low_social_vulnerability"
    ),
    F.max(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "max_ipvs_group"
    ),
    F.min(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "min_ipvs_group"
    ),
    F.avg(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "avg_ipvs_group"
    ),
    F.sum(F.col("area_in_hectares")).alias("area_in_hectares"),
    F.avg("population_density").alias("average_population_density"),
    F.avg(F.col("average_age_responsible_people")).alias(
        "average_age_household_leaders"
    ),
    F.avg(F.col("average_age_women_responsible")).alias(
        "average_age_women_household_leaders"
    ),
    F.avg(F.col("average_household_income")).alias("average_household_income"),
    F.avg(F.col("average_income_household_responsible")).alias(
        "average_income_household_leaders"
    ),
    F.avg(F.col("average_income_women_responsible")).alias(
        "average_income_women_household_leaders"
    ),
    F.avg(
        F.col(
            "average_number_dwellers_permanent_private_households_obtained_by_var2_division_by_var1"
        )
    ).alias("average_residents_per_household"),
    F.avg(F.col("average_residents_permanent_households")).alias(
        "average_residents_in_permanent_households"
    ),
    F.avg(F.col("children_only_from_person_responsible_private_households")).alias(
        "average_number_children"
    ),
    F.sum(F.col("dwellers_permanent_private_households_and_acquisition")).alias(
        "residents_in_households_acquisition"
    ),
    F.avg(F.col("factor_1")).alias("average_ipvs_factor_1"),
    F.avg(F.col("factor_2")).alias("average_ipvs_factor_2"),
    F.avg(F.col("factor_1_rural")).alias("average_ipvs_factor_1_rural"),
    F.avg(F.col("factor_2_rural")).alias("average_ipvs_factor_2_rural"),
    F.sum(
        F.col("household_employees_private_households_males")
        + F.col("household_employees_private_households")
    ).alias("number_house_workers"),
    F.sum(F.col("households")).alias("households"),
    F.sum(F.col("households_permanent")).alias("permanent_housholds"),
    F.sum(F.col("improvised_households")).alias("improvised_households"),
    F.sum(F.when(F.col("is_subnormal_aglomerate") == True, 1).otherwise(0)).alias(
        "number_subnormal_aglomerates"
    ),
    F.avg(F.col("per_capita_income")).alias("average_per_capita_income"),
    F.sum(F.col("permanent_private_home_type_households_and_acquisition")).alias(
        "permanent_private_houses_in_acquisition"
    ),
    F.sum(F.col("permanent_private_home_type_households_and_clear")).alias(
        "permanent_private_houses_fully_owned"
    ),
    F.sum(F.col("permanent_private_household_homes_rented")).alias(
        "permanent_private_rented_properties"
    ),
    F.sum(F.col("permanent_private_households_acquisition")).alias(
        "permanent_private_properties_in_acquisition"
    ),
    F.sum(F.col("permanent_private_households_and_disposed")).alias(
        "permanent_private_properties_fully_owned"
    ),
    F.sum(F.col("permanent_private_households_rented")).alias(
        "permanent_private_properties_rented"
    ),
    F.sum(F.col("permanent_private_households_with_10_or_more_residents")).alias(
        "permanent_private_households_with_10_or_more_residents"
    ),
    F.sum(F.col("permanent_private_households_with_1_resident")).alias(
        "permanent_private_households_with_1_resident"
    ),
    F.sum(F.col("permanent_private_households_with_2_residents")).alias(
        "permanent_private_households_with_2_residents"
    ),
    F.sum(F.col("permanent_private_households_with_3_residents")).alias(
        "permanent_private_households_with_3_residents"
    ),
    F.sum(F.col("permanent_private_households_with_4_residents")).alias(
        "permanent_private_households_with_4_residents"
    ),
    F.sum(F.col("permanent_private_households_with_5_residents")).alias(
        "permanent_private_households_with_5_residents"
    ),
    F.sum(F.col("permanent_private_households_with_6_residents")).alias(
        "permanent_private_households_with_6_residents"
    ),
    F.sum(F.col("permanent_private_households_with_7_residents")).alias(
        "permanent_private_households_with_7_residents"
    ),
    F.sum(F.col("permanent_private_households_with_8_residents")).alias(
        "permanent_private_households_with_8_residents"
    ),
    F.sum(F.col("permanent_private_households_with_9_residents")).alias(
        "permanent_private_households_with_9_residents"
    ),
    F.sum(F.col("permanent_private_households_with_electricity")).alias(
        "permanent_private_households_with_electricity"
    ),
    F.sum(
        F.col("permanent_private_households_without_exclusive_use_bathroom_residents")
    ).alias("permanent_private_households_without_exclusive_use_bathroom_residents"),
    F.avg(F.col("permanent_private_home_type_households_and_acquisition")).alias(
        "average_permanent_private_houses_in_acquisition"
    ),
    F.avg(F.col("permanent_private_home_type_households_and_clear")).alias(
        "average_permanent_private_houses_fully_owned"
    ),
    F.avg(F.col("permanent_private_household_homes_rented")).alias(
        "average_permanent_private_rented_properties"
    ),
    F.avg(F.col("permanent_private_households_acquisition")).alias(
        "average_permanent_private_properties_in_acquisition"
    ),
    F.avg(F.col("permanent_private_households_and_disposed")).alias(
        "average_permanent_private_properties_fully_owned"
    ),
    F.avg(F.col("permanent_private_households_rented")).alias(
        "average_permanent_private_properties_rented"
    ),
    F.avg(F.col("permanent_private_households_with_10_or_more_residents")).alias(
        "average_permanent_private_households_with_10_or_more_residents"
    ),
    F.avg(F.col("permanent_private_households_with_1_resident")).alias(
        "average_permanent_private_households_with_1_resident"
    ),
    F.avg(F.col("permanent_private_households_with_2_residents")).alias(
        "average_permanent_private_households_with_2_residents"
    ),
    F.avg(F.col("permanent_private_households_with_3_residents")).alias(
        "average_permanent_private_households_with_3_residents"
    ),
    F.avg(F.col("permanent_private_households_with_4_residents")).alias(
        "average_permanent_private_households_with_4_residents"
    ),
    F.avg(F.col("permanent_private_households_with_5_residents")).alias(
        "average_permanent_private_households_with_5_residents"
    ),
    F.avg(F.col("permanent_private_households_with_6_residents")).alias(
        "average_permanent_private_households_with_6_residents"
    ),
    F.avg(F.col("permanent_private_households_with_7_residents")).alias(
        "average_permanent_private_households_with_7_residents"
    ),
    F.avg(F.col("permanent_private_households_with_8_residents")).alias(
        "average_permanent_private_households_with_8_residents"
    ),
    F.avg(F.col("permanent_private_households_with_9_residents")).alias(
        "average_permanent_private_households_with_9_residents"
    ),
    F.avg(F.col("permanent_private_households_with_electricity")).alias(
        "average_permanent_private_households_with_electricity"
    ),
    F.avg(
        F.col("permanent_private_households_without_exclusive_use_bathroom_residents")
    ).alias(
        "average_permanent_private_households_without_exclusive_use_bathroom_residents"
    ),
    F.avg(F.col("prop_households_electricity")).alias(
        "average_proportion_households_electricity"
    ),
    F.avg(F.col("prop_households_no_income")).alias(
        "average_proportion_households_no_income"
    ),
    F.avg(F.col("prop_households_one_halft_sm_income")).alias(
        "average_proportion_households_one_halft_sm_income"
    ),
    F.avg(F.col("prop_households_sanitation")).alias(
        "average_proportion_households_sanitation"
    ),
    F.avg(F.col("prop_households_trash_collection")).alias(
        "average_proportion_households_trash_collection"
    ),
    F.avg(F.col("prop_households_two_or_more_income")).alias(
        "average_proportion_households_two_or_more_income"
    ),
    F.avg(F.col("prop_households_two_sm_income")).alias(
        "average_proportion_households_two_sm_income"
    ),
    F.avg(F.col("prop_househols_one_eigth_sm_income")).alias(
        "average_proportion_househols_one_eigth_sm_income"
    ),
    F.avg(F.col("prop_income_participation_in_household_responsible")).alias(
        "average_proportion_income_participation_in_household_responsible"
    ),
    F.avg(F.col("prop_literate_people")).alias("average_proportion_literate_people"),
    F.avg(F.col("prop_literate_women")).alias("average_proportion_literate_women"),
    F.avg(F.col("prop_private_households_one_half_sm_income")).alias(
        "average_proportion_private_households_one_half_sm_income"
    ),
    F.avg(F.col("prop_private_households_one_quarter_sm_income")).alias(
        "average_proportion_private_households_one_quarter_sm_income"
    ),
    F.avg(F.col("prop_women_up_to_30_years")).alias(
        "average_proportion_women_up_to_30_years"
    ),
    F.avg(F.col("prop_0_5_age_population")).alias(
        "average_proportion_population_up_to_5_years_age"
    ),
    F.avg(F.col("prop_households_access_water")).alias(
        "average_prop_households_access_water"
    ),
    F.avg(F.col("prop_responsible_people_up_to_30_years")).alias(
        "average_proportion_responsible_people_up_to_30_years"
    ),
    F.avg(
        F.col(
            "value_average_monthly_nominal_income_persons_responsible_for_permanent_private_households_with_and_without_income"
        )
    ).alias("average_monthly_income"),
    F.sum(F.col("total_responsible_people")).alias("population_house_leaders"),
    F.sum(F.col("residents_permanent_permanent_households")).alias(
        "residents_in_permanent_households"
    ),
    F.sum(F.col("residents_permanent_private_households_apartment_type")).alias(
        "residents_in_apartments"
    ),
    F.sum(
        F.col(
            "residents_permanent_private_households_or_population_residing_permanent_private_households"
        )
    ).alias("residents_in_private_households"),
    F.sum(F.col("residents_permanent_private_households_rented")).alias(
        "residents_in_rented_properties"
    ),
    F.sum(F.col("residents_private_households_and_collective_households")).alias(
        "residents_in_collective_households"
    ),
    F.sum(F.col("permanent_private_households_with_3_bathrooms_residents")).alias(
        "permanent_private_households_with_up_to_3_bathrooms"
    ),
)

# adding literacy rate column:
df_nb_agg = df_nb_agg.withColumn(
    "literacy_rate", F.col("population") / F.col("alphabetized_population")
)

In [119]:
# fixing the column names:
new_columns = list(map(lambda col: f"neighborhood_{col}", df_nb_agg.columns))

for i in range(len(df_ap_agg.columns)):
    df_nb_agg = df_nb_agg.withColumnRenamed(df_nb_agg.columns[i], new_columns[i])

df_nb_agg = df_nb_agg.withColumnRenamed(
    "neighborhood_neighborhood_name", "neighborhood_name"
)

# sorting the dataframe columns:
df_nb_agg = sort_dataframe_columns(df_nb_agg, "neighborhood_name")

In [120]:
# adding the resulting dataframe to the SQL Context:
df_nb_agg.createOrReplaceTempView("tb_neighborhood_features")

# generating the final level of aggregation for the census features:
Q_NB_GEOM = """
SELECT
    A.*,
    B.geometry,
    ST_Centroid(B.geometry) as neighborhood_centroid
FROM tb_neighborhood_features as A
LEFT JOIN tb_neighborhood as B 
ON A.neighborhood_name = B.Name
"""

df_nb_final = spark.sql(Q_NB_GEOM)

### 5.2.2 Exporting Neighborhood level Results

In [121]:
# save the results to the specified directory:
NB_OUTPUT = f"neighborhoods/tb_neighborhood_census"
PROCESSED_UNITS_OF_INTEREST = "../data/processed/sp_census/units_of_interest/"

save_to_filesystem(
    df_nb_final, PROCESSED_UNITS_OF_INTEREST, NB_OUTPUT, NB_OUTPUT + ".parquet"
)

True

In [122]:
# saving the results the results without geometries:
NB_NO_GEO = f"neighborhoods/tb_neighborhood_census_no_geo"

df_nb_no_geo = df_nb_final.drop("geometry", "neighborhood_centroid")

save_to_filesystem(
    df_nb_no_geo, PROCESSED_UNITS_OF_INTEREST, NB_NO_GEO, NB_NO_GEO + ".parquet"
)

True

## 5.3 Districts

In [123]:
# adding the centroid and area of the polygons:
Q_DS_MATCH = """
SELECT 
    A.district_name,
    B.sector_code
FROM tb_district as A, tb_sector_features as B
WHERE ST_Contains(A.geometry, B.sector_centroid)
"""

# matching the areas of ponderation to their sectors:
df_ds_match = spark.sql(Q_DS_MATCH)

In [124]:
# adding the resulting dataframe to the SQL Context:
df_ds_match.createOrReplaceTempView("tb_ds_match")
df_sector_features = df_sector_features.drop("district_name")
df_sector_features.createOrReplaceTempView("tb_sector_features")

In [125]:
# adding the features to the ponderation area back:
Q_DS_RAW_FEATURES = """
SELECT
    A.district_name,
    B.*
FROM tb_ds_match as A
LEFT JOIN tb_sector_features as B
ON A.sector_code = B.sector_code
"""

# reading back the dataset:
df_ds_raw_features = spark.sql(Q_DS_RAW_FEATURES)

# dropping the geometry columns and looking at the results:
df_ds_raw_features = df_ds_raw_features.drop("geometry", "sector_centroid")

### 5.3.1 Aggregating Raw District Features

In [126]:
df_ds_agg = df_ds_raw_features.groupby("district_name").agg(
    F.sum(F.col("population")).alias("population"),
    F.sum(F.col("alphabetized_people_with_5_or_more_years_age")).alias(
        "alphabetized_population"
    ),
    F.sum(F.when(F.col("ipvs_group") > 5, 1).otherwise(0)).alias(
        "n_regions_high_social_vulnerability"
    ),
    F.sum(F.when(F.col("ipvs_group") == 5, 1).otherwise(0)).alias(
        "n_regions_medium_social_vulnerability"
    ),
    F.sum(F.when(F.col("ipvs_group") < 5, 1).otherwise(0)).alias(
        "n_regions_low_social_vulnerability"
    ),
    F.max(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "max_ipvs_group"
    ),
    F.min(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "min_ipvs_group"
    ),
    F.avg(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "avg_ipvs_group"
    ),
    F.sum(F.col("area_in_hectares")).alias("area_in_hectares"),
    F.avg("population_density").alias("average_population_density"),
    F.avg(F.col("average_age_responsible_people")).alias(
        "average_age_household_leaders"
    ),
    F.avg(F.col("average_age_women_responsible")).alias(
        "average_age_women_household_leaders"
    ),
    F.avg(F.col("average_household_income")).alias("average_household_income"),
    F.avg(F.col("average_income_household_responsible")).alias(
        "average_income_household_leaders"
    ),
    F.avg(F.col("average_income_women_responsible")).alias(
        "average_income_women_household_leaders"
    ),
    F.avg(
        F.col(
            "average_number_dwellers_permanent_private_households_obtained_by_var2_division_by_var1"
        )
    ).alias("average_residents_per_household"),
    F.avg(F.col("average_residents_permanent_households")).alias(
        "average_residents_in_permanent_households"
    ),
    F.avg(F.col("children_only_from_person_responsible_private_households")).alias(
        "average_number_children"
    ),
    F.sum(F.col("dwellers_permanent_private_households_and_acquisition")).alias(
        "residents_in_households_acquisition"
    ),
    F.avg(F.col("factor_1")).alias("average_ipvs_factor_1"),
    F.avg(F.col("factor_2")).alias("average_ipvs_factor_2"),
    F.avg(F.col("factor_1_rural")).alias("average_ipvs_factor_1_rural"),
    F.avg(F.col("factor_2_rural")).alias("average_ipvs_factor_2_rural"),
    F.sum(
        F.col("household_employees_private_households_males")
        + F.col("household_employees_private_households")
    ).alias("number_house_workers"),
    F.sum(F.col("households")).alias("households"),
    F.sum(F.col("households_permanent")).alias("permanent_housholds"),
    F.sum(F.col("improvised_households")).alias("improvised_households"),
    F.sum(F.when(F.col("is_subnormal_aglomerate") == True, 1).otherwise(0)).alias(
        "number_subnormal_aglomerates"
    ),
    F.avg(F.col("per_capita_income")).alias("average_per_capita_income"),
    F.sum(F.col("permanent_private_home_type_households_and_acquisition")).alias(
        "permanent_private_houses_in_acquisition"
    ),
    F.sum(F.col("permanent_private_home_type_households_and_clear")).alias(
        "permanent_private_houses_fully_owned"
    ),
    F.sum(F.col("permanent_private_household_homes_rented")).alias(
        "permanent_private_rented_properties"
    ),
    F.sum(F.col("permanent_private_households_acquisition")).alias(
        "permanent_private_properties_in_acquisition"
    ),
    F.sum(F.col("permanent_private_households_and_disposed")).alias(
        "permanent_private_properties_fully_owned"
    ),
    F.sum(F.col("permanent_private_households_rented")).alias(
        "permanent_private_properties_rented"
    ),
    F.sum(F.col("permanent_private_households_with_10_or_more_residents")).alias(
        "permanent_private_households_with_10_or_more_residents"
    ),
    F.sum(F.col("permanent_private_households_with_1_resident")).alias(
        "permanent_private_households_with_1_resident"
    ),
    F.sum(F.col("permanent_private_households_with_2_residents")).alias(
        "permanent_private_households_with_2_residents"
    ),
    F.sum(F.col("permanent_private_households_with_3_residents")).alias(
        "permanent_private_households_with_3_residents"
    ),
    F.sum(F.col("permanent_private_households_with_4_residents")).alias(
        "permanent_private_households_with_4_residents"
    ),
    F.sum(F.col("permanent_private_households_with_5_residents")).alias(
        "permanent_private_households_with_5_residents"
    ),
    F.sum(F.col("permanent_private_households_with_6_residents")).alias(
        "permanent_private_households_with_6_residents"
    ),
    F.sum(F.col("permanent_private_households_with_7_residents")).alias(
        "permanent_private_households_with_7_residents"
    ),
    F.sum(F.col("permanent_private_households_with_8_residents")).alias(
        "permanent_private_households_with_8_residents"
    ),
    F.sum(F.col("permanent_private_households_with_9_residents")).alias(
        "permanent_private_households_with_9_residents"
    ),
    F.sum(F.col("permanent_private_households_with_electricity")).alias(
        "permanent_private_households_with_electricity"
    ),
    F.sum(
        F.col("permanent_private_households_without_exclusive_use_bathroom_residents")
    ).alias("permanent_private_households_without_exclusive_use_bathroom_residents"),
    F.avg(F.col("permanent_private_home_type_households_and_acquisition")).alias(
        "average_permanent_private_houses_in_acquisition"
    ),
    F.avg(F.col("permanent_private_home_type_households_and_clear")).alias(
        "average_permanent_private_houses_fully_owned"
    ),
    F.avg(F.col("permanent_private_household_homes_rented")).alias(
        "average_permanent_private_rented_properties"
    ),
    F.avg(F.col("permanent_private_households_acquisition")).alias(
        "average_permanent_private_properties_in_acquisition"
    ),
    F.avg(F.col("permanent_private_households_and_disposed")).alias(
        "average_permanent_private_properties_fully_owned"
    ),
    F.avg(F.col("permanent_private_households_rented")).alias(
        "average_permanent_private_properties_rented"
    ),
    F.avg(F.col("permanent_private_households_with_10_or_more_residents")).alias(
        "average_permanent_private_households_with_10_or_more_residents"
    ),
    F.avg(F.col("permanent_private_households_with_1_resident")).alias(
        "average_permanent_private_households_with_1_resident"
    ),
    F.avg(F.col("permanent_private_households_with_2_residents")).alias(
        "average_permanent_private_households_with_2_residents"
    ),
    F.avg(F.col("permanent_private_households_with_3_residents")).alias(
        "average_permanent_private_households_with_3_residents"
    ),
    F.avg(F.col("permanent_private_households_with_4_residents")).alias(
        "average_permanent_private_households_with_4_residents"
    ),
    F.avg(F.col("permanent_private_households_with_5_residents")).alias(
        "average_permanent_private_households_with_5_residents"
    ),
    F.avg(F.col("permanent_private_households_with_6_residents")).alias(
        "average_permanent_private_households_with_6_residents"
    ),
    F.avg(F.col("permanent_private_households_with_7_residents")).alias(
        "average_permanent_private_households_with_7_residents"
    ),
    F.avg(F.col("permanent_private_households_with_8_residents")).alias(
        "average_permanent_private_households_with_8_residents"
    ),
    F.avg(F.col("permanent_private_households_with_9_residents")).alias(
        "average_permanent_private_households_with_9_residents"
    ),
    F.avg(F.col("permanent_private_households_with_electricity")).alias(
        "average_permanent_private_households_with_electricity"
    ),
    F.avg(
        F.col("permanent_private_households_without_exclusive_use_bathroom_residents")
    ).alias(
        "average_permanent_private_households_without_exclusive_use_bathroom_residents"
    ),
    F.avg(F.col("prop_households_electricity")).alias(
        "average_proportion_households_electricity"
    ),
    F.avg(F.col("prop_households_no_income")).alias(
        "average_proportion_households_no_income"
    ),
    F.avg(F.col("prop_households_one_halft_sm_income")).alias(
        "average_proportion_households_one_halft_sm_income"
    ),
    F.avg(F.col("prop_households_sanitation")).alias(
        "average_proportion_households_sanitation"
    ),
    F.avg(F.col("prop_households_trash_collection")).alias(
        "average_proportion_households_trash_collection"
    ),
    F.avg(F.col("prop_households_two_or_more_income")).alias(
        "average_proportion_households_two_or_more_income"
    ),
    F.avg(F.col("prop_households_two_sm_income")).alias(
        "average_proportion_households_two_sm_income"
    ),
    F.avg(F.col("prop_househols_one_eigth_sm_income")).alias(
        "average_proportion_househols_one_eigth_sm_income"
    ),
    F.avg(F.col("prop_income_participation_in_household_responsible")).alias(
        "average_proportion_income_participation_in_household_responsible"
    ),
    F.avg(F.col("prop_literate_people")).alias("average_proportion_literate_people"),
    F.avg(F.col("prop_literate_women")).alias("average_proportion_literate_women"),
    F.avg(F.col("prop_private_households_one_half_sm_income")).alias(
        "average_proportion_private_households_one_half_sm_income"
    ),
    F.avg(F.col("prop_private_households_one_quarter_sm_income")).alias(
        "average_proportion_private_households_one_quarter_sm_income"
    ),
    F.avg(F.col("prop_women_up_to_30_years")).alias(
        "average_proportion_women_up_to_30_years"
    ),
    F.avg(F.col("prop_0_5_age_population")).alias(
        "average_proportion_population_up_to_5_years_age"
    ),
    F.avg(F.col("prop_households_access_water")).alias(
        "average_prop_households_access_water"
    ),
    F.avg(F.col("prop_responsible_people_up_to_30_years")).alias(
        "average_proportion_responsible_people_up_to_30_years"
    ),
    F.avg(
        F.col(
            "value_average_monthly_nominal_income_persons_responsible_for_permanent_private_households_with_and_without_income"
        )
    ).alias("average_monthly_income"),
    F.sum(F.col("total_responsible_people")).alias("population_house_leaders"),
    F.sum(F.col("residents_permanent_permanent_households")).alias(
        "residents_in_permanent_households"
    ),
    F.sum(F.col("residents_permanent_private_households_apartment_type")).alias(
        "residents_in_apartments"
    ),
    F.sum(
        F.col(
            "residents_permanent_private_households_or_population_residing_permanent_private_households"
        )
    ).alias("residents_in_private_households"),
    F.sum(F.col("residents_permanent_private_households_rented")).alias(
        "residents_in_rented_properties"
    ),
    F.sum(F.col("residents_private_households_and_collective_households")).alias(
        "residents_in_collective_households"
    ),
    F.sum(F.col("permanent_private_households_with_3_bathrooms_residents")).alias(
        "permanent_private_households_with_up_to_3_bathrooms"
    ),
)

# adding literacy rate column:
df_ds_agg = df_ds_agg.withColumn(
    "literacy_rate", F.col("population") / F.col("alphabetized_population")
)

In [127]:
# fixing the column names:
new_columns = list(map(lambda col: f"district_{col}", df_ds_agg.columns))

for i in range(len(df_ds_agg.columns)):
    df_ds_agg = df_ds_agg.withColumnRenamed(df_ds_agg.columns[i], new_columns[i])

df_ds_agg = df_ds_agg.withColumnRenamed("district_district_name", "district")

# sorting the dataframe columns:
df_ds_agg = sort_dataframe_columns(df_ds_agg, "district")

In [128]:
# adding the resulting dataframe to the SQL Context:
df_ds_agg.createOrReplaceTempView("tb_district_features")

# generating the final level of aggregation for the census features:
Q_DS_GEOM = """
SELECT
    A.*,
    B.geometry,
    ST_Centroid(B.geometry) as district_centroid
FROM tb_district_features as A
LEFT JOIN tb_district as B 
ON A.district = B.district_name
"""

df_ds_final = spark.sql(Q_DS_GEOM)

### 5.3.2 Exporting District level Results

In [129]:
# save the results to the specified directory:
DS_OUTPUT = f"districts/tb_district_census"
PROCESSED_UNITS_OF_INTEREST = "../data/processed/sp_census/units_of_interest/"

save_to_filesystem(
    df_ds_final, PROCESSED_UNITS_OF_INTEREST, DS_OUTPUT, DS_OUTPUT + ".parquet"
)

True

In [130]:
# saving the results the results without geometries:
DS_NO_GEO = f"districts/tb_district_census_no_geo"

df_ds_no_geo = df_ds_final.drop("geometry", "district_centroid")

save_to_filesystem(
    df_ds_no_geo, PROCESSED_UNITS_OF_INTEREST, DS_NO_GEO, DS_NO_GEO + ".parquet"
)

True

## 5.4 Zip Code

The deal with the Zip codes is that, different from a polygon, it does not represent an enclosed structure. With that, with that, the approach we were developing before that included matching the centroids of the census sectors to the polygon in which they are located. The base shapes are `linestrings` instead. `linestrings` can intersect census sectors, for example. With that, we will have an `intersection` join instead of `containment` spatial join.

Why can't we just calculate the middle point from the line representing the sector instead and use it as a reference? That could certainly work, but it does not capture much of the variation in longer streets (and thus some zipcodes might have less accurate representations). Performing the crossing join is a way to capture more of the variation along the same street.

> **Note**: this can be quite an expensive operation, since we end up with a many-to-many relationship established. One street can cross multiple sectors and sectors can be crossed by multiple streets (zip codes). It should be taken into consideration. 

In [131]:
# calculating the
Q_ZIPCODE_MATCH = """
SELECT 
    A.zipcode,
    B.sector_code
FROM tb_zipcode as A, tb_sector_features as B
WHERE ST_Intersects(B.geometry, A.geometry)
"""

# matching the areas of ponderation to their sectors:
df_zipcode_match = spark.sql(Q_ZIPCODE_MATCH)

# adding the resulting dataframe to the SQL Context:
df_zipcode_match.createOrReplaceTempView("tb_zipcode_match")

In [132]:
# verifying the results:
df_zipcode_match

zipcode,sector_code
4886050,355030855000057
4891110,355030855000042
4891160,355030855000041
4888080,355030855000056
4890490,355030855000031


In [133]:
# counting the number of matches
df_zipcode_match.count()

69557

In [134]:
# number of distinct original zipcodes
df_zipcode.count()

44886

In [372]:
# verifying the number of distinct zipcodes:
df_zipcode_match.select(
    F.countDistinct(F.col("zipcode")).alias("n_distinct_zipcodes")
)  # we lost some zip codes (about 114), that is good enough

n_distinct_zipcodes
44772


In [135]:
# adding the features to the ponderation area back:
Q_ZIPCODE_FEATURES = """
SELECT
    A.zipcode,
    B.*
FROM tb_zipcode_match as A
LEFT JOIN tb_sector_features as B
ON A.sector_code = B.sector_code
"""

# reading back the dataset:
df_zipcode_raw_features = spark.sql(Q_ZIPCODE_FEATURES)

# dropping the geometry columns and looking at the results:
df_zipcode_raw_features = df_zipcode_raw_features.drop("geometry", "sector_centroid")

### 5.4.1 Aggregating the Zip Code Features

In [136]:
df_zip_agg = df_zipcode_raw_features.groupby("zipcode").agg(
    F.sum(F.col("population")).alias("population"),
    F.sum(F.col("alphabetized_people_with_5_or_more_years_age")).alias(
        "alphabetized_population"
    ),
    F.sum(F.when(F.col("ipvs_group") > 5, 1).otherwise(0)).alias(
        "n_regions_high_social_vulnerability"
    ),
    F.sum(F.when(F.col("ipvs_group") == 5, 1).otherwise(0)).alias(
        "n_regions_medium_social_vulnerability"
    ),
    F.sum(F.when(F.col("ipvs_group") < 5, 1).otherwise(0)).alias(
        "n_regions_low_social_vulnerability"
    ),
    F.max(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "max_ipvs_group"
    ),
    F.min(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "min_ipvs_group"
    ),
    F.avg(F.when(F.col("ipvs_group") != -1, F.col("ipvs_group"))).alias(
        "avg_ipvs_group"
    ),
    F.sum(F.col("area_in_hectares")).alias("area_in_hectares"),
    F.avg("population_density").alias("average_population_density"),
    F.avg(F.col("average_age_responsible_people")).alias(
        "average_age_household_leaders"
    ),
    F.avg(F.col("average_age_women_responsible")).alias(
        "average_age_women_household_leaders"
    ),
    F.avg(F.col("average_household_income")).alias("average_household_income"),
    F.avg(F.col("average_income_household_responsible")).alias(
        "average_income_household_leaders"
    ),
    F.avg(F.col("average_income_women_responsible")).alias(
        "average_income_women_household_leaders"
    ),
    F.avg(
        F.col(
            "average_number_dwellers_permanent_private_households_obtained_by_var2_division_by_var1"
        )
    ).alias("average_residents_per_household"),
    F.avg(F.col("average_residents_permanent_households")).alias(
        "average_residents_in_permanent_households"
    ),
    F.avg(F.col("children_only_from_person_responsible_private_households")).alias(
        "average_number_children"
    ),
    F.sum(F.col("dwellers_permanent_private_households_and_acquisition")).alias(
        "residents_in_households_acquisition"
    ),
    F.avg(F.col("factor_1")).alias("average_ipvs_factor_1"),
    F.avg(F.col("factor_2")).alias("average_ipvs_factor_2"),
    F.avg(F.col("factor_1_rural")).alias("average_ipvs_factor_1_rural"),
    F.avg(F.col("factor_2_rural")).alias("average_ipvs_factor_2_rural"),
    F.sum(
        F.col("household_employees_private_households_males")
        + F.col("household_employees_private_households")
    ).alias("number_house_workers"),
    F.sum(F.col("households")).alias("households"),
    F.sum(F.col("households_permanent")).alias("permanent_housholds"),
    F.sum(F.col("improvised_households")).alias("improvised_households"),
    F.sum(F.when(F.col("is_subnormal_aglomerate") == True, 1).otherwise(0)).alias(
        "number_subnormal_aglomerates"
    ),
    F.avg(F.col("per_capita_income")).alias("average_per_capita_income"),
    F.sum(F.col("permanent_private_home_type_households_and_acquisition")).alias(
        "permanent_private_houses_in_acquisition"
    ),
    F.sum(F.col("permanent_private_home_type_households_and_clear")).alias(
        "permanent_private_houses_fully_owned"
    ),
    F.sum(F.col("permanent_private_household_homes_rented")).alias(
        "permanent_private_rented_properties"
    ),
    F.sum(F.col("permanent_private_households_acquisition")).alias(
        "permanent_private_properties_in_acquisition"
    ),
    F.sum(F.col("permanent_private_households_and_disposed")).alias(
        "permanent_private_properties_fully_owned"
    ),
    F.sum(F.col("permanent_private_households_rented")).alias(
        "permanent_private_properties_rented"
    ),
    F.sum(F.col("permanent_private_households_with_10_or_more_residents")).alias(
        "permanent_private_households_with_10_or_more_residents"
    ),
    F.sum(F.col("permanent_private_households_with_1_resident")).alias(
        "permanent_private_households_with_1_resident"
    ),
    F.sum(F.col("permanent_private_households_with_2_residents")).alias(
        "permanent_private_households_with_2_residents"
    ),
    F.sum(F.col("permanent_private_households_with_3_residents")).alias(
        "permanent_private_households_with_3_residents"
    ),
    F.sum(F.col("permanent_private_households_with_4_residents")).alias(
        "permanent_private_households_with_4_residents"
    ),
    F.sum(F.col("permanent_private_households_with_5_residents")).alias(
        "permanent_private_households_with_5_residents"
    ),
    F.sum(F.col("permanent_private_households_with_6_residents")).alias(
        "permanent_private_households_with_6_residents"
    ),
    F.sum(F.col("permanent_private_households_with_7_residents")).alias(
        "permanent_private_households_with_7_residents"
    ),
    F.sum(F.col("permanent_private_households_with_8_residents")).alias(
        "permanent_private_households_with_8_residents"
    ),
    F.sum(F.col("permanent_private_households_with_9_residents")).alias(
        "permanent_private_households_with_9_residents"
    ),
    F.sum(F.col("permanent_private_households_with_electricity")).alias(
        "permanent_private_households_with_electricity"
    ),
    F.sum(
        F.col("permanent_private_households_without_exclusive_use_bathroom_residents")
    ).alias("permanent_private_households_without_exclusive_use_bathroom_residents"),
    F.avg(F.col("permanent_private_home_type_households_and_acquisition")).alias(
        "average_permanent_private_houses_in_acquisition"
    ),
    F.avg(F.col("permanent_private_home_type_households_and_clear")).alias(
        "average_permanent_private_houses_fully_owned"
    ),
    F.avg(F.col("permanent_private_household_homes_rented")).alias(
        "average_permanent_private_rented_properties"
    ),
    F.avg(F.col("permanent_private_households_acquisition")).alias(
        "average_permanent_private_properties_in_acquisition"
    ),
    F.avg(F.col("permanent_private_households_and_disposed")).alias(
        "average_permanent_private_properties_fully_owned"
    ),
    F.avg(F.col("permanent_private_households_rented")).alias(
        "average_permanent_private_properties_rented"
    ),
    F.avg(F.col("permanent_private_households_with_10_or_more_residents")).alias(
        "average_permanent_private_households_with_10_or_more_residents"
    ),
    F.avg(F.col("permanent_private_households_with_1_resident")).alias(
        "average_permanent_private_households_with_1_resident"
    ),
    F.avg(F.col("permanent_private_households_with_2_residents")).alias(
        "average_permanent_private_households_with_2_residents"
    ),
    F.avg(F.col("permanent_private_households_with_3_residents")).alias(
        "average_permanent_private_households_with_3_residents"
    ),
    F.avg(F.col("permanent_private_households_with_4_residents")).alias(
        "average_permanent_private_households_with_4_residents"
    ),
    F.avg(F.col("permanent_private_households_with_5_residents")).alias(
        "average_permanent_private_households_with_5_residents"
    ),
    F.avg(F.col("permanent_private_households_with_6_residents")).alias(
        "average_permanent_private_households_with_6_residents"
    ),
    F.avg(F.col("permanent_private_households_with_7_residents")).alias(
        "average_permanent_private_households_with_7_residents"
    ),
    F.avg(F.col("permanent_private_households_with_8_residents")).alias(
        "average_permanent_private_households_with_8_residents"
    ),
    F.avg(F.col("permanent_private_households_with_9_residents")).alias(
        "average_permanent_private_households_with_9_residents"
    ),
    F.avg(F.col("permanent_private_households_with_electricity")).alias(
        "average_permanent_private_households_with_electricity"
    ),
    F.avg(
        F.col("permanent_private_households_without_exclusive_use_bathroom_residents")
    ).alias(
        "average_permanent_private_households_without_exclusive_use_bathroom_residents"
    ),
    F.avg(F.col("prop_households_electricity")).alias(
        "average_proportion_households_electricity"
    ),
    F.avg(F.col("prop_households_no_income")).alias(
        "average_proportion_households_no_income"
    ),
    F.avg(F.col("prop_households_one_halft_sm_income")).alias(
        "average_proportion_households_one_halft_sm_income"
    ),
    F.avg(F.col("prop_households_sanitation")).alias(
        "average_proportion_households_sanitation"
    ),
    F.avg(F.col("prop_households_trash_collection")).alias(
        "average_proportion_households_trash_collection"
    ),
    F.avg(F.col("prop_households_two_or_more_income")).alias(
        "average_proportion_households_two_or_more_income"
    ),
    F.avg(F.col("prop_households_two_sm_income")).alias(
        "average_proportion_households_two_sm_income"
    ),
    F.avg(F.col("prop_househols_one_eigth_sm_income")).alias(
        "average_proportion_househols_one_eigth_sm_income"
    ),
    F.avg(F.col("prop_income_participation_in_household_responsible")).alias(
        "average_proportion_income_participation_in_household_responsible"
    ),
    F.avg(F.col("prop_literate_people")).alias("average_proportion_literate_people"),
    F.avg(F.col("prop_literate_women")).alias("average_proportion_literate_women"),
    F.avg(F.col("prop_private_households_one_half_sm_income")).alias(
        "average_proportion_private_households_one_half_sm_income"
    ),
    F.avg(F.col("prop_private_households_one_quarter_sm_income")).alias(
        "average_proportion_private_households_one_quarter_sm_income"
    ),
    F.avg(F.col("prop_women_up_to_30_years")).alias(
        "average_proportion_women_up_to_30_years"
    ),
    F.avg(F.col("prop_0_5_age_population")).alias(
        "average_proportion_population_up_to_5_years_age"
    ),
    F.avg(F.col("prop_households_access_water")).alias(
        "average_prop_households_access_water"
    ),
    F.avg(F.col("prop_responsible_people_up_to_30_years")).alias(
        "average_proportion_responsible_people_up_to_30_years"
    ),
    F.avg(
        F.col(
            "value_average_monthly_nominal_income_persons_responsible_for_permanent_private_households_with_and_without_income"
        )
    ).alias("average_monthly_income"),
    F.sum(F.col("total_responsible_people")).alias("population_house_leaders"),
    F.sum(F.col("residents_permanent_permanent_households")).alias(
        "residents_in_permanent_households"
    ),
    F.sum(F.col("residents_permanent_private_households_apartment_type")).alias(
        "residents_in_apartments"
    ),
    F.sum(
        F.col(
            "residents_permanent_private_households_or_population_residing_permanent_private_households"
        )
    ).alias("residents_in_private_households"),
    F.sum(F.col("residents_permanent_private_households_rented")).alias(
        "residents_in_rented_properties"
    ),
    F.sum(F.col("residents_private_households_and_collective_households")).alias(
        "residents_in_collective_households"
    ),
    F.sum(F.col("permanent_private_households_with_3_bathrooms_residents")).alias(
        "permanent_private_households_with_up_to_3_bathrooms"
    ),
)

# adding literacy rate column:
df_zip_agg = df_zip_agg.withColumn(
    "literacy_rate", F.col("population") / F.col("alphabetized_population")
)

In [137]:
# fixing the column names:
new_columns = list(map(lambda col: f"zipcode_{col}", df_zip_agg.columns))

for i in range(len(df_zip_agg.columns)):
    df_zip_agg = df_zip_agg.withColumnRenamed(df_zip_agg.columns[i], new_columns[i])

df_zip_agg = df_zip_agg.withColumnRenamed("zipcode_zipcode", "zipcode")

# sorting the dataframe columns:
df_zip_agg = sort_dataframe_columns(df_zip_agg, "zipcode")

In [138]:
# adding the resulting dataframe to the SQL Context:
df_zip_agg.createOrReplaceTempView("tb_zipcode_features")

# generating the final level of aggregation for the census features:
Q_ZIPCODE_GEOM = """
SELECT
    A.*,
    B.geometry,
    ST_Centroid(B.geometry) as district_centroid
FROM tb_zipcode_features as A
LEFT JOIN tb_zipcode as B 
ON A.zipcode = B.zipcode
"""

df_zipcode_final = spark.sql(Q_ZIPCODE_GEOM)

## 5.4.2 Exporting Zipcode level Results

In [139]:
# save the results to the specified directory:
ZIP_OUTPUT = f"zipcodes/tb_zipcode_census"
PROCESSED_UNITS_OF_INTEREST = "../data/processed/sp_census/units_of_interest/"

save_to_filesystem(
    df_zipcode_final, PROCESSED_UNITS_OF_INTEREST, ZIP_OUTPUT, ZIP_OUTPUT + ".parquet"
)

True

In [140]:
# save the results to the specified directory:
ZIP_NO_GEO = f"zipcodes/tb_zipcode_census_no_geo"
PROCESSED_UNITS_OF_INTEREST = "../data/processed/sp_census/units_of_interest/"

df_zip_no_geo = df_zipcode_final.drop("geometry")

save_to_filesystem(
    df_zip_no_geo, PROCESSED_UNITS_OF_INTEREST, ZIP_NO_GEO, ZIP_NO_GEO + ".parquet"
)

True