In [4]:
import json
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import (udf, col)

from google.cloud import storage

# Reuse the active Spark session if there is one; otherwise create one named "censo".
spark = (
    SparkSession.builder
    .appName("censo")
    .getOrCreate()
)
# Eager evaluation makes DataFrames render their rows when displayed in the notebook.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [5]:
def add_prefix_in_columns(df, prefix):
    """Return ``df`` with every column renamed to ``<prefix>_<original name>``.

    Args:
        df: DataFrame whose columns are renamed.
        prefix: Text placed in front of each column name, followed by ``_``.

    Returns:
        A DataFrame selecting the same columns, in the same order, under the
        prefixed names.
    """
    prefixed = []
    for name in df.columns:
        prefixed.append(col(name).alias(f"{prefix}_{name}"))
    return df.select(prefixed)

def load_json(name, bucket):
    """Download ``aux/<name>.json`` from a Cloud Storage bucket and parse it.

    Args:
        name: Object name relative to the ``aux/`` prefix, without the
            ``.json`` extension.
        bucket: Name of the Cloud Storage bucket holding the object.

    Returns:
        The parsed JSON content of the object.
    """
    client = storage.Client()
    gcs_bucket = client.get_bucket(bucket)
    payload = gcs_bucket.blob(f'aux/{name}.json').download_as_string()
    return json.loads(payload)


def mapping(df, map_, column, type_return):
    """Replace the values of ``column`` with their entries in ``map_``.

    Args:
        df: DataFrame containing ``column``.
        map_: Dict whose keys are the string form of the raw column values.
        column: Name of the column to translate; it is overwritten in place.
        type_return: Spark data type class (not an instance) of the mapped
            values, e.g. ``StringType``.

    Returns:
        DataFrame with ``column`` replaced by the looked-up values.
    """
    def lookup(key):
        # Keys are stringified before the lookup; values absent from map_ become null.
        return map_.get(str(key))

    translate = udf(lookup, type_return())
    return df.withColumn(column, translate(col(column)))

def string_to_date(df, column, year):
    """Parse a string column into a date column using the format for ``year``.

    Args:
        df: DataFrame containing ``column``.
        column: Name of the column to convert; it is overwritten in place.
        year: Census year as an int or a numeric string. Years after 2014 are
            parsed with ``%d/%m/%Y``; 2014 and earlier with ``%d%b%Y:%H:%M:%S``.

    Returns:
        DataFrame with ``column`` replaced by the parsed dates. Values that are
        not strings (including nulls) become null.

    Raises:
        ValueError, TypeError: if ``year`` cannot be converted to an integer.
    """
    # The notebook carries the year as text ("2020" or a sys.argv entry); comparing
    # a str with an int raises TypeError, so normalise it before choosing the format.
    pattern = '%d/%m/%Y' if int(year) > 2014 else "%d%b%Y:%H:%M:%S"

    def parse(value):
        if not isinstance(value, str):
            return None
        # The column is declared as DateType, so return a plain date rather
        # than a datetime carrying a time-of-day component.
        return datetime.strptime(value, pattern).date()

    parse_udf = udf(parse, DateType())
    return df.withColumn(column, parse_udf(col(column)))

def load_csv(file, bucket, year, region=None):
    """Read one census CSV from the landing zone using its stored schema.

    The schema comes from ``aux/schemas/<file>_schema.json`` in ``bucket``.
    The data comes from ``gs://<bucket>/landing_zone/censo-escolar/<year>/``,
    where ``matricula`` files are split per region (``matricula_<region>.csv``).

    Args:
        file: Dataset name, e.g. ``"escolas"``, ``"turmas"`` or ``"matricula"``.
        bucket: Name of the Cloud Storage bucket.
        year: Census year used in the path.
        region: Region suffix; required when ``file`` is ``"matricula"``.

    Returns:
        A DataFrame over the pipe-delimited CSV with the stored schema applied.

    Raises:
        ValueError: if ``file`` is ``"matricula"`` and ``region`` is None.
    """
    schema_json = load_json(f"schemas/{file}_schema", bucket)
    # Some schema files are double-encoded (a JSON string holding the schema
    # JSON). Decode those once more explicitly instead of relying on a bare
    # ``except`` that would also hide unrelated failures.
    if isinstance(schema_json, str):
        schema_json = json.loads(schema_json)
    schema = StructType.fromJson(schema_json)

    if file == "matricula":
        if region is None:
            # Without this check the path would silently become "matricula_None.csv".
            raise ValueError("region is required when file is 'matricula'")
        path = f"gs://{bucket}/landing_zone/censo-escolar/{year}/matricula_{region}.csv"
    else:
        path = f"gs://{bucket}/landing_zone/censo-escolar/{year}/{file}.csv"

    return (
        spark.read
        .options(header=True, delimiter="|", encoding="utf8")
        .schema(schema)
        .csv(path)
    )

def transform_string_columns(df, file, bucket):
    """Translate coded ``TP*`` / ``CO*`` columns through the lookup tables in ``maps``.

    Args:
        df: DataFrame to transform.
        file: Dataset name; accepted for interface compatibility but not used here.
        bucket: Bucket from which ``aux/maps.json`` is loaded.

    Returns:
        DataFrame in which every column starting with ``TP`` or ``CO`` that has
        an entry in the maps file is replaced by its mapped string value.
        Columns without an entry are left untouched.
    """
    lookup_tables = load_json("maps", bucket)

    for name in df.columns:
        if name.startswith(("TP", "CO")) and name in lookup_tables:
            df = mapping(df, lookup_tables[name], name, StringType)

    return df

def transform_boolean_columns(df):
    """Convert every ``IN*`` column from ``0`` / ``1`` to a boolean.

    Args:
        df: DataFrame to transform.

    Returns:
        DataFrame in which each column starting with ``IN`` holds False for
        ``0``, True for ``1`` and null for any other value.
    """
    flag_values = {"0": False, "1": True}

    for name in df.columns:
        if name.startswith("IN"):
            df = mapping(df, flag_values, name, BooleanType)

    return df
        
def transform_integer_columns(df):
    """Cast every ``NU*`` and ``QT*`` column to ``IntegerType``.

    Args:
        df: DataFrame to transform.

    Returns:
        DataFrame with the matching columns cast to integers.
    """
    for name in df.columns:
        if name.startswith(("NU", "QT")):
            df = df.withColumn(name, col(name).cast(IntegerType()))

    return df

def transform_date_columns(df, file, year=None):
    """Parse the school-year start/end columns of the ``escolas`` dataset.

    Args:
        df: DataFrame to transform.
        file: Dataset name; only ``"escolas"`` has date columns to convert.
        year: Census year (int or numeric string) that selects the date format.
            When omitted, the notebook-level ``YEAR`` constant is used, which
            keeps the previous two-argument call style working.

    Returns:
        ``df`` unchanged for any dataset other than ``"escolas"``; otherwise
        ``df`` with ``DT_ANO_LETIVO_INICIO`` and ``DT_ANO_LETIVO_TERMINO``
        converted by ``string_to_date``.
    """
    if file != "escolas":
        return df

    if year is None:
        # Legacy fallback: earlier revisions read a global YEAR constant.
        year = YEAR
    # string_to_date compares the year numerically, so normalise text input here.
    year = int(year)

    for column in ("DT_ANO_LETIVO_INICIO", "DT_ANO_LETIVO_TERMINO"):
        df = string_to_date(df, column, year)

    return df


def drop_columns(df, file):
    """Drop the columns that are not kept for the given dataset.

    Args:
        df: DataFrame to trim.
        file: Dataset name. ``"turmas"``, ``"gestor"`` and ``"matricula"`` lose
            a fixed list of columns; ``"matricula"`` additionally loses two
            class-duration columns; every dataset except ``"turmas"`` loses
            three class-level ``TP_*`` columns.

    Returns:
        DataFrame without the selected columns.
    """
    drops = []

    if file in ["turmas", "gestor", "matricula"]:
        # NOTE(review): presumably these duplicate columns already carried by
        # the escolas dataset -- confirm before changing the list.
        drops.extend([
            'TP_REGULAMENTACAO',
            'CO_UF',
            'IN_MANT_ESCOLA_PRIVADA_ONG',
            'NU_ANO_CENSO',
            'CO_MUNICIPIO',
            'IN_CONVENIADA_PP',
            'IN_ESPECIAL_EXCLUSIVA',
            'TP_CATEGORIA_ESCOLA_PRIVADA',
            'IN_MANT_ESCOLA_PRIVADA_OSCIP',
            'IN_MANT_ESCOLA_PRIV_ONG_OSCIP',
            'IN_MANT_ESCOLA_PRIVADA_S_FINS',
            'IN_MANT_ESCOLA_PRIVADA_SIST_S',
            'CO_DISTRITO',
            'IN_EDUCACAO_INDIGENA',
            'CO_MICRORREGIAO',
            'TP_DEPENDENCIA',
            'IN_EJA',
            'IN_REGULAR',
            'IN_PROFISSIONALIZANTE',
            'TP_LOCALIZACAO_DIFERENCIADA',
            'TP_CONVENIO_PODER_PUBLICO',
            'TP_LOCALIZACAO',
            'CO_REGIAO',
            'CO_MESORREGIAO',
            'IN_MANT_ESCOLA_PRIVADA_EMP',
            'IN_MANT_ESCOLA_PRIVADA_SIND',
        ])

    if file == 'matricula':
        drops.extend(['NU_DIAS_ATIVIDADE',
                      'NU_DURACAO_TURMA'])

    if file != "turmas":
        drops.extend(["TP_MEDIACAO_DIDATICO_PEDAGO",
                      "TP_TIPO_ATENDIMENTO_TURMA",
                      "TP_TIPO_LOCAL_TURMA"])

    return df.drop(*drops)


def transform(file, bucket, year, region=None):
    """Load one census dataset and apply every cleaning step to it.

    Steps, in order: read the CSV, drop unneeded columns, map coded string
    columns, convert flag columns to booleans, cast numeric columns to
    integers, and parse date columns.

    Args:
        file: Dataset name, e.g. ``"escolas"``, ``"turmas"`` or ``"matricula"``.
        bucket: Name of the Cloud Storage bucket holding data and auxiliary files.
        year: Census year (int or numeric string).
        region: Region suffix, used only for ``"matricula"``.

    Returns:
        The cleaned DataFrame.
    """
    df = load_csv(file, bucket, year, region)
    df = drop_columns(df, file)
    df = transform_string_columns(df, file, bucket)
    df = transform_boolean_columns(df)
    df = transform_integer_columns(df)
    # Pass the dataset name (previously the bucket was passed by mistake, so
    # the "escolas" date columns were never parsed) together with the year.
    df = transform_date_columns(df, file, year)
    return df

In [6]:
# Full list of regions available in the landing zone:
#regions =  ["co", "nordeste", "norte", "sudeste", "sul"]
regions =  ["norte"]
if __name__ == "__main__":
    # NOTE(review): bucket and year are read from argv positions 4 and 5 --
    # presumably where the job launcher places them; confirm against the submit command.
    if sys.argv[4:]:
        bucket, year = sys.argv[4:]
    else:
        bucket = "rjr-portal-da-transparencia"
        year = "2020"
    escolas = transform("escolas", bucket, year)
    escolas = add_prefix_in_columns(escolas, "E")
    turmas =  transform("turmas", bucket, year)
    turmas = add_prefix_in_columns(turmas, "T")

    # Accumulate every configured region; previously each iteration overwrote
    # `matriculas`, so only the last region survived when several were listed.
    matriculas = None
    for region in regions:
        matriculas_regiao = transform("matricula", bucket, year, region)
        matriculas_regiao = add_prefix_in_columns(matriculas_regiao, "M")
        if matriculas is None:
            matriculas = matriculas_regiao
        else:
            matriculas = matriculas.unionByName(matriculas_regiao)

In [7]:
# Inner-join escolas with turmas where E_CO_ENTIDADE equals T_CO_ENTIDADE.
escolas_turmas = escolas.join(
    turmas,
    on=escolas["E_CO_ENTIDADE"] == turmas["T_CO_ENTIDADE"],
)

In [8]:
# Inner-join matriculas with the escolas/turmas frame where M_ID_TURMA equals T_ID_TURMA.
matriculas_turmas_escolas = matriculas.join(
    escolas_turmas,
    on=matriculas["M_ID_TURMA"] == escolas_turmas["T_ID_TURMA"],
)

In [9]:
# Columns (all taken from the E_-prefixed escolas frame) used to partition the output.
partitions = [
    "E_NU_ANO_CENSO",
    "E_CO_REGIAO",
    "E_CO_UF",
    "E_CO_MESORREGIAO",
    "E_CO_MICRORREGIAO",
    "E_CO_MUNICIPIO",
]

In [None]:
# Write the joined frame to the processing zone as snappy-compressed parquet,
# partitioned by the columns listed in `partitions`.
(
    matriculas_turmas_escolas
    .write
    .partitionBy(partitions)
    .parquet(f"gs://{bucket}/processing_zone/censo-escolar", compression="snappy")
)

21/07/27 01:37:36 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/07/27 01:39:38 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1560.5 KiB
                                                                                

In [86]:
# if FILE == "matricula":
#     file =  f"{FILE}_{REGION}"
# else:
#     file = FILE
    
# df.write.parquet(f"gs://{BUCKET}/temp/censo-escolar/{YEAR}/{FILE}.parquet")  

                                                                                

In [87]:
# a = spark \
#         .read \
#         .parquet(f"gs://{BUCKET}/temp/censo-escolar/{YEAR}/{FILE}.parquet")  