In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, BooleanType, LongType, DoubleType
import json

In [2]:
# Spark session used by `_get_file` to read the Parquet tables.
# `getOrCreate()` hands back an already-active session when there is one.
spark = (
    SparkSession.builder
    .appName("Lendo arquivos Parquet")
    .getOrCreate()
)

In [3]:
def _get_schema_data(name: str) -> str:
    """Load a table's schema file and flatten it to a ``name:type`` string.

    Reads ``../schemas/{name}.json`` (relative to the current working
    directory), takes every entry of its top-level ``"fields"`` list and
    joins them as ``"<name>:<type>"`` pairs separated by commas.

    Args:
        name: Base name of the schema file, without the ``.json`` suffix.

    Returns:
        A comma-separated string such as ``"col_a:long,col_b:string"``;
        an empty string when ``"fields"`` is an empty list.

    Raises:
        FileNotFoundError: If the schema file does not exist.
        KeyError: If the document has no ``"fields"`` key, or a field has
            no ``"name"`` / ``"type"`` key.
    """
    # JSON is UTF-8 by specification; decode it explicitly so accented
    # column names do not depend on the platform's default encoding.
    with open(f"../schemas/{name}.json", encoding="utf-8") as schema_file:
        schema = json.load(schema_file)
    return ",".join(f"{field['name']}:{field['type']}" for field in schema["fields"])

In [4]:
def _build_schema(schema_arg: str):
    """Build a Spark ``StructType`` from a comma-separated ``name:type`` string.

    Args:
        schema_arg: Field list such as ``"col_a:long,col_b:string"``.
            Blank entries (an empty string, a doubled or trailing comma)
            are skipped instead of raising.

    Returns:
        A ``StructType`` with one nullable field per non-blank entry, in
        input order. An empty ``schema_arg`` yields an empty ``StructType``.

    Raises:
        KeyError: If a type name is not a key of the mapping below.
        IndexError: If a non-blank entry contains no ``":"`` separator.
    """
    d_types = {
        "string": StringType(),
        "long": LongType(),
        "double": DoubleType(),
        # "boolean" is the name Spark itself uses for BooleanType;
        # "bool" is kept so existing schema files keep working.
        "boolean": BooleanType(),
        "bool": BooleanType(),
    }

    schema = StructType()

    for entry in schema_arg.split(","):
        if not entry:
            # "".split(",") yields [""]; nothing to add for a blank entry.
            continue
        parts = entry.split(":")
        # Third argument marks the field as nullable.
        schema.add(parts[0], d_types[parts[1]], True)

    return schema

In [5]:
def _get_file(table: str):
    """Read a table's Parquet data with its declared schema and print it.

    The schema string from ``_get_schema_data(table)`` is converted by
    ``_build_schema`` and applied to the reader; the data is loaded from
    ``../data/silver/{table}``.

    Args:
        table: Name shared by the schema file and the Parquet directory.

    Returns:
        Whatever ``DataFrame.show()`` returns; the rows are printed as a
        side effect rather than handed back to the caller.
    """
    reader = spark.read.schema(_build_schema(_get_schema_data(table)))
    dataframe = reader.parquet(f"../data/silver/{table}")
    return dataframe.show()

In [12]:
# Print the rows of the "Municipios" table as read by `_get_file`.
_get_file("Municipios")

+-------------+--------------------+
|cod_municipio|       des_municipio|
+-------------+--------------------+
|            1|       GUAJARA-MIRIM|
|            2|ALTO ALEGRE DOS P...|
|            3|         PORTO VELHO|
|            4|             BURITIS|
|            5|           JI-PARANA|
|            6|         CHUPINGUAIA|
|            7|           ARIQUEMES|
|            8|             CUJUBIM|
|            9|              CACOAL|
|           10|          NOVA UNIAO|
|           11|       PIMENTA BUENO|
|           12|             PARECIS|
|           13|             VILHENA|
|           14|PIMENTEIRAS DO OESTE|
|           15|                JARU|
|           16|PRIMAVERA DE ROND...|
|           17| OURO PRETO DO OESTE|
|           18|  SAO FELIPE D'OESTE|
|           19|   PRESIDENTE MEDICI|
|           20|SAO FRANCISCO DO ...|
+-------------+--------------------+
only showing top 20 rows

