In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType,DoubleType,FloatType
from pyspark.sql.functions import udf
from pyspark.sql.functions import regexp_replace
import locale
import sys
import os
import re

In [2]:
sc

In [3]:
spark

# Leyendo lista de Colegios y Estudiantes

In [4]:
base = '../Colegios'
base2 = '../SB'
colegios = os.listdir('../Colegios')
estudiantes = os.listdir('../SB')

In [5]:
estudiantes.sort()
colegios.sort()
estuDic = {}
coleDic = {}
for estu in estudiantes:
    estuDic[re.findall('\d+', estu)[1]] = estu

for colegio in colegios:
    coleDic[re.findall('\d+', colegio)[1]] = colegio

In [6]:
# Funciòn para unir cualquier nùmero de dataframes con la misma estructura 
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

In [7]:
print("Archivos de estudiantes: ", estuDic['20091'],estuDic['20092'])
print("Archivos de colegios: ",coleDic['2009'])

Archivos de estudiantes:  SB11_20091.txt SB11_20092.txt
Archivos de colegios:  SB11-CLASIFI-PLANTELES-2009.txt


In [8]:
columnasPuntos = [
 'cole_cod_dane_institucion',
 'cole_nombre_sede',
 'punt_lenguaje',
 'punt_matematicas',
 'punt_c_sociales',
 'punt_filosofia',
 'punt_biologia',
 'punt_quimica',
 'punt_fisica',
 'punt_ingles',
 'punt_profundizacion',
 'punt_interdisciplinar']

In [9]:
# SE leen los dos archivos y se crean dos dataframes
df1 = spark.read.load(base2 + '/' + estuDic['20091'],"com.databricks.spark.csv",header='true',inferSchema='true',sep='¬').select(columnasPuntos)
df2 = spark.read.load(base2 + '/' + estuDic['20092'],"com.databricks.spark.csv",header='true',inferSchema='true',sep='¬').select(columnasPuntos)

In [10]:
# SE unen los dos dataframes
columnasPuntosc = [
 'punt_lenguaje',
 'punt_matematicas',
 'punt_c_sociales',
 'punt_filosofia',
 'punt_biologia',
 'punt_quimica',
 'punt_fisica',
 'punt_ingles',
 'punt_profundizacion',
 'punt_interdisciplinar']
df = unionAll(df1,df2)
for cp in columnasPuntosc:
    df = df.withColumn(cp, regexp_replace(cp, ',', '.'))
    df = df.withColumn(cp, df[cp].cast("float"))
    
df1 = df2 = None

In [11]:
print("Numero de filas ", df.count())

Numero de filas  524811


In [12]:
print(df.printSchema())

root
 |-- cole_cod_dane_institucion: string (nullable = true)
 |-- cole_nombre_sede: string (nullable = true)
 |-- punt_lenguaje: float (nullable = true)
 |-- punt_matematicas: float (nullable = true)
 |-- punt_c_sociales: float (nullable = true)
 |-- punt_filosofia: float (nullable = true)
 |-- punt_biologia: float (nullable = true)
 |-- punt_quimica: float (nullable = true)
 |-- punt_fisica: float (nullable = true)
 |-- punt_ingles: float (nullable = true)
 |-- punt_profundizacion: float (nullable = true)
 |-- punt_interdisciplinar: float (nullable = true)

None


In [14]:
# Contando el numero de estudiantes por instituciòn y mostrandola de forma descendente
dsCountEst = df.groupBy('cole_cod_dane_institucion').count().orderBy('count',ascending=
False)

In [15]:
dsCountEst = dsCountEst.withColumnRenamed("count","num_est")
dsCountEst.show(10,False)

+-------------------------+-------+
|cole_cod_dane_institucion|num_est|
+-------------------------+-------+
|null                     |5378   |
|105001000108             |1062   |
|311001105391             |826    |
|325754005072             |802    |
|305686001228             |763    |
|105001013340             |739    |
|168001003591             |722    |
|354001000027             |701    |
|111001024732             |684    |
|111001019411             |648    |
+-------------------------+-------+
only showing top 10 rows



In [16]:
#Se crea una columna con el total de los puntajes   
df= df.withColumn("Suma",df['punt_lenguaje']+df['punt_matematicas']+df['punt_c_sociales']+
                     df['punt_filosofia']+df['punt_biologia']+df['punt_quimica']+df['punt_fisica']+
                     df['punt_ingles']+df['punt_profundizacion']+df['punt_interdisciplinar'])

## Resultados finales por instituciòn

In [17]:
# Còdigo para obtener la media de los resultados de las evaluaciones de todos los estudiantes por instituciòn 
# y posiciòn en orden descendente.
from pyspark.sql.functions import *
from pyspark.sql.window import Window

df = df.groupBy('cole_cod_dane_institucion','cole_nombre_sede').mean()
df = df.withColumn("rank", dense_rank().over(Window.orderBy(desc("avg(Suma)"))))

In [18]:
dfjoin = df.join(dsCountEst,"cole_cod_dane_institucion")
dfjoin = dfjoin.withColumn("Media",round(dfjoin['avg(Suma)'],2))
dfjoin = dfjoin.orderBy('rank',ascending= True)

In [19]:
dfjoin.select('cole_cod_dane_institucion','cole_nombre_sede','Media','rank','num_est').show(10)

+-------------------------+--------------------+------+----+-------+
|cole_cod_dane_institucion|    cole_nombre_sede| Media|rank|num_est|
+-------------------------+--------------------+------+----+-------+
|             368276000826|COL LA QUINTA DEL...|598.64|   1|     34|
|             311848000278|GIMN VERMONT     ...|587.86|   2|     65|
|             311769000165|COL SAN JORGE DE ...|584.11|   3|     71|
|             376001013441|COLEGIO BILINGUE ...|575.11|   4|      9|
|             311848002424|COL SAN CARLOS   ...|572.29|   5|    103|
|             311001087288|     GIMN LA MONTAÑA|568.73|   6|     37|
|             311848002351|COL SANTA FRANCIS...|568.45|   7|     52|
|             311769003342|COL LOS NOGALES  ...|566.43|   8|     36|
|             311001019568|LIC FRANCES LOUIS...|565.09|   9|     80|
|             311001089221|INST ALBERTO MERA...|563.35|  10|     19|
+-------------------------+--------------------+------+----+-------+
only showing top 10 rows



## Colegios

In [18]:
# se crea un dataframe con una columna que lo señala el año 2006.
from pyspark.sql.functions import lit
df = df.withColumn('año',lit(2009))
df.show(5,False)

+-------------------------+------------------+---------------------+--------------------+-------------------+------------------+------------------+------------------+-----------------+------------------------+--------------------------+-----------------+----+----+
|cole_cod_dane_institucion|avg(punt_lenguaje)|avg(punt_matematicas)|avg(punt_c_sociales)|avg(punt_filosofia)|avg(punt_biologia)|avg(punt_quimica) |avg(punt_fisica)  |avg(punt_ingles) |avg(punt_profundizacion)|avg(punt_interdisciplinar)|avg(Suma)        |rank|año |
+-------------------------+------------------+---------------------+--------------------+-------------------+------------------+------------------+------------------+-----------------+------------------------+--------------------------+-----------------+----+----+
|368276000826             |62.61735287834616 |84.55176499310662    |58.171176573809454  |59.140587862800146 |62.60470659592573 |61.67117702259737 |62.11235315659467 |84.07588207020478|6.905588164049036    

In [19]:
# SE crea Dataframe de colegios en este año desaparecio la columna COLE_CODMPIO_COLEGIO
dfc = spark.read.load(base + '/' + coleDic['2009'],"com.databricks.spark.csv",header='true',inferSchema='true',sep='\t')
dfc = dfc.withColumn('COLE_CODIGO_COLEGIO', dfc['COLE_CODIGO_COLEGIO'].cast("string"))

In [20]:
print(dfc.printSchema())

root
 |-- COLE_CODIGO_COLEGIO: string (nullable = true)
 |-- COLE_INST_NOMBRE: string (nullable = true)
 |-- COLE_COD_CIUDAD: integer (nullable = true)
 |-- COLE_MPIO_COLEGIO: string (nullable = true)
 |-- COLE_DEPTO_COLEGIO: string (nullable = true)
 |-- COLE_INST_JORNADA: string (nullable = true)
 |-- COLE_CALENDARIO_COLEGIO: string (nullable = true)
 |-- COLE_GENEROPOBLACION: string (nullable = true)
 |-- COLE_NATURALEZA: string (nullable = true)
 |-- COLE_CIENCIAS_SOCIALES: integer (nullable = true)
 |-- COLE_QUIMICA: integer (nullable = true)
 |-- COLE_FISICA: integer (nullable = true)
 |-- COLE_BIOLOGIA: integer (nullable = true)
 |-- COLE_FILOSOFIA: integer (nullable = true)
 |-- COLE_MATEMATICAS: integer (nullable = true)
 |-- COLE_LENGUAJE: integer (nullable = true)
 |-- COLE_INGLES: integer (nullable = true)
 |-- COLE_CATEGORIA: string (nullable = true)
 |-- COLE_ESTUDIANTES_PRESENTES: integer (nullable = true)

None


In [21]:
colsColegio = ['COLE_CODIGO_COLEGIO',
 'COLE_CIENCIAS_SOCIALES',
 'COLE_QUIMICA',
 'COLE_FISICA',
 'COLE_BIOLOGIA',
 'COLE_FILOSOFIA',
 'COLE_MATEMATICAS',
 'COLE_LENGUAJE',
 'COLE_INGLES']

In [22]:
dfc = dfc.select(colsColegio)

In [23]:
dfc.show(5,False)

+-------------------+----------------------+------------+-----------+-------------+--------------+----------------+-------------+-----------+
|COLE_CODIGO_COLEGIO|COLE_CIENCIAS_SOCIALES|COLE_QUIMICA|COLE_FISICA|COLE_BIOLOGIA|COLE_FILOSOFIA|COLE_MATEMATICAS|COLE_LENGUAJE|COLE_INGLES|
+-------------------+----------------------+------------+-----------+-------------+--------------+----------------+-------------+-----------+
|18                 |8                     |8           |7          |8            |7             |7               |7            |9          |
|59                 |8                     |7           |7          |7            |8             |7               |7            |7          |
|75                 |7                     |7           |7          |7            |8             |7               |7            |7          |
|83                 |9                     |8           |7          |8            |9             |8               |8            |9          |
|91   

In [24]:
print("El numero de colegios es: ", dfc.select('COLE_CODIGO_COLEGIO').count())

El numero de colegios es:  10375


## Anàlisis comparativo entre estudiantes y colegios

In [25]:
dfjoin = df.join(dfc, df.cole_cod_dane_institucion == dfc.COLE_CODIGO_COLEGIO)

In [26]:
print("Nùmero de colegios coincidentes entre el archivo de estudiantes y colegios:", dfjoin.count())

Nùmero de colegios coincidentes entre el archivo de estudiantes y colegios: 0


La informaciòn de colegios no coincide con la informaciòn de estudiantes, no existe cole_cod_dane_institucion en el archivo de colegios.