In [1]:
import os
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField,StringType, FloatType
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import create_map, struct, col, udf, desc

In [2]:
import pyspark.sql.functions as F

In [3]:
builder = SparkSession.builder.appName("QUALIS SPARK")
builder.config(
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
builder.config("spark.speculation", "false")
builder.config("spark.sql.parquet.compression.codec", "gzip")
builder.config("spark.debug.maxToStringFields", "100")
builder.config("spark.driver.memory", "1g")
builder.config("spark.driver.cores", "1")
builder.config("spark.executor-memory", "20g")
builder.config("spark.executor.cores", "4")


builder.master("local[*]")

spark = builder.getOrCreate()
spark

In [4]:
RAW = './'
MODELED = '../modeled/'

In [6]:
file = spark.read.parquet(RAW+"producao_qualis_computacao")

In [9]:
filtered_file = file\
    .groupBy("SG_ENTIDADE_ENSINO", "AN_BASE_PRODUCAO", "QUALIS")\
    .count().withColumnRenamed('count', 'QTD_QUALIS')\
    .orderBy("SG_ENTIDADE_ENSINO")

filtered_file.toPandas().head()

Unnamed: 0,SG_ENTIDADE_ENSINO,AN_BASE_PRODUCAO,QUALIS,QTD_QUALIS
0,CEFET/RJ,2016,B3,2
1,CEFET/RJ,2016,A2,6
2,CEFET/RJ,2016,B5,1
3,CEFET/RJ,2016,B1,1
4,CEFET/RJ,2016,C,3


In [10]:
#NPGeral = NPA(1) x 1,0 + NPA(2) x 0,85 + NPB(1) x 0,70 + NPB(2) x 0,50 + NPB(3) x 0,20 + NPB(4) x 0,10 + NPB(5) x 0,05
def calculate_score(qualis):
    score = 0.0
    print(qualis)
    if qualis == "A1":
        score = 1.0
    elif qualis == "A2":
        score = 0.85
    elif qualis == "B1":
        score = 0.7
    elif qualis == "B2":
        score = 0.5
    elif qualis == "B3":
        score = 0.2
    elif qualis == "B4":
        score = 0.1
    elif qualis == "A2":
        score = 0.05
    return score

In [11]:
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfCalculateScore = udf(calculate_score, StringType())
file_with_partial_score = filtered_file\
    .withColumn("SCORE_CONSTANT", udfCalculateScore(filtered_file['QUALIS']))

file_with_partial_score.toPandas().head()

Unnamed: 0,SG_ENTIDADE_ENSINO,AN_BASE_PRODUCAO,QUALIS,QTD_QUALIS,SCORE_CONSTANT
0,CEFET/RJ,2016,B3,2,0.2
1,CEFET/RJ,2016,A2,6,0.85
2,CEFET/RJ,2016,B5,1,0.0
3,CEFET/RJ,2016,B1,1,0.7
4,CEFET/RJ,2016,C,3,0.0


In [13]:
file_final_score = file_with_partial_score\
    .withColumn("SCORE", file_with_partial_score['QTD_QUALIS'] * file_with_partial_score['SCORE_CONSTANT'])

file_final_score.toPandas().head()

Unnamed: 0,SG_ENTIDADE_ENSINO,AN_BASE_PRODUCAO,QUALIS,QTD_QUALIS,SCORE_CONSTANT,SCORE
0,CEFET/RJ,2016,B3,2,0.2,0.4
1,CEFET/RJ,2016,A2,6,0.85,5.1
2,CEFET/RJ,2016,B5,1,0.0,0.0
3,CEFET/RJ,2016,B1,1,0.7,0.7
4,CEFET/RJ,2016,C,3,0.0,0.0


In [14]:
score_entidade_ano = file_final_score\
    .groupBy("SG_ENTIDADE_ENSINO", "AN_BASE_PRODUCAO")\
    .agg(F.sum("SCORE").alias("TOTAL_SCORE"))\
    .orderBy("AN_BASE_PRODUCAO", desc("TOTAL_SCORE"))

In [16]:
score_entidade_ano.write.mode("overwrite").parquet(MODELED+"score_entidade_ano")