In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Build (or reuse) the Spark session for the silver -> gold job.
# hadoop-aws supplies the S3A filesystem used by the ``s3a://`` paths below.
# NOTE: if a session already exists (e.g. a managed notebook kernel),
# ``getOrCreate`` returns it, and static settings such as
# ``spark.jars.packages`` may not take effect.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .appName("ProcessingSilverToGold")
    .getOrCreate()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
def generate_access_keys():
    """Read the AWS credential pair from the process environment.

    Returns:
        tuple: ``(access_key_id, secret_access_key)`` taken from the
        ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` variables.
        Each item is ``None`` when its variable is not set.
    """
    env = os.environ
    return env.get('AWS_ACCESS_KEY_ID'), env.get('AWS_SECRET_ACCESS_KEY')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Unpack the environment-sourced credential pair (either item may be None).
(aws_access_key_id, aws_secret_key) = generate_access_keys()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Configure Hadoop's S3 connectors with the credentials read from the env.
# Every path in this notebook uses the ``s3a://`` scheme, so the S3A keys are
# the ones that are actually consulted; the legacy ``s3n`` settings are kept
# unchanged for compatibility.
# Hadoop's ``Configuration.set`` refuses ``None`` values, so a missing
# credential is simply left unset -- presumably letting S3A fall back to its
# default credential provider chain (e.g. an instance profile).
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

_s3_credential_settings = {
    "fs.s3n.awsAccessKeyId": aws_access_key_id,
    "fs.s3n.awsSecretAccessKey": aws_secret_key,
    "fs.s3a.access.key": aws_access_key_id,
    "fs.s3a.secret.key": aws_secret_key,
}
for _conf_key, _conf_value in _s3_credential_settings.items():
    if _conf_value is not None:
        hadoop_conf.set(_conf_key, _conf_value)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
def generate_datalake_layer(layer: str) -> str:
    """Map a data-lake layer name to its bucket name.

    Example: ``"gold"`` -> ``"university-datalake-gold"``.
    """
    return "university-datalake-{}".format(layer)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
def generate_path(layer: str, table: str, without_partition=None) -> str:
    """Build the ``s3a://`` URI of a table's parquet data inside a layer bucket.

    Args:
        layer: bucket name of the layer.
        table: table name, used both for the ``table=`` directory and the
            ``.parquet`` name.
        without_partition: when it compares ``== True`` (so ``True`` or ``1``),
            the path points directly at ``table=<table>/``. Otherwise three
            wildcard directory levels (``*/*/*``) are inserted before it, to
            match partitioned data.

    Returns:
        The S3A path string, possibly containing glob wildcards.
    """
    return (
        f"s3a://{layer}/table={table}/{table}.parquet"
        if without_partition == True  # noqa: E712 -- keeps the original equality semantics
        else f"s3a://{layer}/*/*/*/table={table}/{table}.parquet"
    )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Bucket names of the layer this job reads from (silver) and writes to (gold).
silver, gold = (generate_datalake_layer(layer=name) for name in ("silver", "gold"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
def _read_silver(table: str):
    """Load one silver-layer table (non-partitioned path) as a parquet DataFrame."""
    return spark.read.format('parquet').load(
        generate_path(layer=silver, table=table, without_partition=True)
    )


# One call per silver table; the variable names are used by later cells.
df_alunos = _read_silver("alunos")
df_cursos = _read_silver("cursos")
df_departamentos = _read_silver("departamentos")
df_disciplinas = _read_silver("disciplinas")
df_matriculas = _read_silver("matriculas")
df_matrizes_cursos = _read_silver("matrizes_cursos")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# Course dimension: ``cod_curso`` becomes the ``id_curso`` key.
dim_curso = df_cursos.select(
    col('cod_curso').alias('id_curso'),
    'nome_curso',
    'turno',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# Discipline dimension keyed by ``id_disc``.
# NOTE(review): ``cod_disc`` is selected twice -- once renamed to ``id_disc``
# and once under its own name -- so both columns hold the same value. The other
# dimensions keep only the renamed key; confirm whether consumers of the gold
# ``dim_disciplina`` rely on ``cod_disc`` before dropping it.
dim_disciplina = df_disciplinas.select(
    col('cod_disc').alias('id_disc'),
    'nome_disc',
    'cod_disc',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
# Student dimension: the ``mat_alu`` column becomes the ``id_aluno`` key.
dim_aluno = df_alunos.select(
    col('mat_alu').alias('id_aluno'),
    'nome',
    'cotista',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# Department dimension: (source column, gold column) pairs, in output order.
dim_departamento = df_departamentos.select(*[
    col(source).alias(target)
    for source, target in (
        ('cod_dpto', 'id_departamento'),
        ('nome_dpto', 'nome_departamento'),
    )
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
# Time dimension: one row per distinct semester, plus its year.
# ``semestre`` is cast to string and its first four characters are taken as
# ``ano``. This assumes every semester value starts with a four-digit year
# (TODO confirm against the silver schema).
#  * ``distinct()``: a dimension needs one row per key. Without it, every
#    enrollment row would repeat its semester, and joins on ``semestre``
#    would multiply fact rows.
#  * ``Column.substr`` runs in the JVM and yields null for a null semester.
#    A Python-UDF slice (``value[:4]``) would raise on ``None`` and fail the job.
dim_tempo = (
    df_matriculas
    .select(col('semestre').cast(StringType()))
    .distinct()
    .withColumn('ano', col('semestre').substr(1, 4))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Expose the silver frames to Spark SQL under the names the query refers to.
for _view_name, _view_frame in (
    ("matrizes_cursos", df_matrizes_cursos),
    ("matriculas", df_matriculas),
    ("cursos", df_cursos),
):
    _view_frame.createOrReplaceTempView(_view_name)

# Enrollment history: each enrollment joined to the curriculum entry for its
# discipline and to that curriculum's course (for the department code).
# NOTE(review): enrollments are matched to curricula on ``cod_disc`` only.
# If a discipline appears in several courses' curricula, each enrollment is
# presumably emitted once per such course -- confirm whether the join should
# also constrain on the student's own course.
query = """SELECT c.cod_dpto,
                mc.cod_curso,
                mc.cod_disc,
                mc.periodo,
                m.semestre,
                m.mat_alu,
                m.nota,
                m.faltas,
                m.status
            FROM matrizes_cursos mc 
            INNER JOIN matriculas m
            ON mc.cod_disc = m.cod_disc
            INNER JOIN cursos c
            ON mc.cod_curso = c.cod_curso"""

fact_historico = spark.sql(query)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Fact table: rows of the enrollment-history join, with the source keys
# renamed to the ``id_*`` names used by the dimension tables.
# Built from ``spark.sql(query)`` instead of reassigning ``fact_historico``
# onto itself. The self-referential form fails when this cell is re-run,
# because ``cod_dpto`` and the other source names no longer exist after the
# first rename.
fact_historico = spark.sql(query).select(
    col('cod_dpto').alias('id_departamento'),
    col('cod_curso').alias('id_curso'),
    col('cod_disc').alias('id_disc'),
    col('mat_alu').alias('id_aluno'),
    col('periodo'),
    col('semestre'),
    col('nota'),
    col('faltas'),
    col('status'),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
# Persist the star schema to the gold layer: one parquet output per table at
# ``s3a://<gold>/<name>.parquet``, overwritten on every run.
# ``repartition(1)`` collapses each table to a single partition (one part
# file) while keeping the upstream computation parallel.
# The CSV-only ``header`` option was dropped; the parquet writer does not
# use it.
gold_tables = {
    "dim_curso": dim_curso,
    "dim_disciplina": dim_disciplina,
    "dim_aluno": dim_aluno,
    "dim_departamento": dim_departamento,
    "dim_tempo": dim_tempo,
    "fact_historico": fact_historico,
}
for table_name, table_df in gold_tables.items():
    (
        table_df.repartition(1)
        .write.format('parquet')
        .mode('overwrite')
        .save(f"s3a://{gold}/{table_name}.parquet")
    )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
# Completion marker for whoever runs the notebook end to end.
print("The processing has finished and the data is in the gold layer!")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The processing was finished and the data it's on the gold layer!