In [None]:
# Load the STAR dataset from DBFS into a Spark DataFrame and show it.
file_location = "/FileStore/tables/STAR.csv"
file_type = "csv"

# CSV reader settings (a non-CSV source would ignore them):
#   inferSchema -> let Spark guess each column's type from the data
#   header      -> the first row holds the column names
#   sep         -> field delimiter
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}
reader = spark.read.format(file_type).options(**csv_options)
df = reader.load(file_location)

display(df)


In [None]:
from pyspark.sql.types import DoubleType

# Force the per-grade score and teacher-experience columns to double.
# Suffixes 'k', '1', '2', '3' presumably denote kindergarten and grades 1-3 (TODO confirm).
# NOTE(review): the explicit cast suggests inferSchema read these as strings (e.g. because
# of non-numeric missing-value tokens in the CSV); such values become null after the cast.
numeric_cols = [
    measure + grade
    for measure in ("read", "math", "experience")
    for grade in ("k", "1", "2", "3")
]
for col_name in numeric_cols:
    # withColumn on an existing name replaces that column in place (same position).
    df = df.withColumn(col_name, df[col_name].cast(DoubleType()))


In [None]:
# Persist the typed DataFrame as a Delta table, then reload it from storage so the
# following cells work from the materialized copy.
# NOTE(review): the directory is named "parquet" but holds a Delta table; the path is
# kept unchanged for compatibility with anything else that reads it.
DELTA_PATH = '/FileStore/parquet/star'

# Recursively delete any previous output first; the writer's default save mode
# ("error"/"errorifexists") would otherwise refuse to write into an existing path.
dbutils.fs.rm(DELTA_PATH, True)
df.write.save(DELTA_PATH, format='delta')

# Read back with the same format that was written instead of relying on the session's
# default data source (spark.sql.sources.default), which is not delta everywhere.
df = spark.read.load(DELTA_PATH, format='delta')
display(df)


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Categorical columns to encode: three student-level columns, followed by eight
# attributes that repeat once per grade suffix ('k', '1', '2', '3').
student_cols = ['gender', 'ethnicity', 'birth']
per_grade_bases = ['star', 'lunch', 'school', 'degree', 'ladder', 'tethnicity', 'system', 'schoolid']
grade_suffixes = ['k', '1', '2', '3']
colk = student_cols + [base + grade for base in per_grade_bases for grade in grade_suffixes]

# Output names: "<col>I" receives the StringIndexer index, "<col>E" the one-hot vector.
colkI = [name + 'I' for name in colk]
colkE = [name + 'E' for name in colk]

indexer = StringIndexer(inputCols=colk, outputCols=colkI)
encoder = OneHotEncoder(inputCols=colkI, outputCols=colkE)
pipeline = Pipeline(stages=[indexer, encoder])

# NOTE(review): the encoders are fit on the full dataset, before any train/test split.
encoding_model = pipeline.fit(df)
df = encoding_model.transform(df)


In [None]:
# Inspect the DataFrame after encoding: it now also carries the "<col>I" index
# columns and "<col>E" one-hot columns produced by the StringIndexer/OneHotEncoder pipeline.
display(df)


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Input features for the kindergarten reading model: StringIndexer outputs ("<col>I")
# of the student columns and of the kindergarten-year ("k") columns.
# NOTE(review): these are integer category indices, so LinearRegression treats each
# category code as a numeric magnitude; the one-hot "<col>E" columns built in the
# encoding cell may be the intended features — confirm.
colinputk = ['genderI','ethnicityI','birthI','starkI','lunchkI','schoolkI','degreekI','ladderkI','tethnicitykI']

# Fixed seed so the train/test partition (and therefore the metrics) is reproducible.
SPLIT_SEED = 42

# Split the rows into 75% training / 25% test.
train, test = df.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Build a three-stage pipeline.

# Stage 1. Imputer: write the target "readk" into "readk_imputado", replacing missing
#          values with the mean learned on the training split (strategy="mean").
im = Imputer(inputCol="readk", outputCol="readk_imputado", strategy="mean")

# Stage 2. VectorAssembler: combine all input columns into one vector column "myfeatures".
assembler = VectorAssembler(inputCols=colinputk, outputCol="myfeatures")

# Stage 3. LinearRegression: fit the label "readk_imputado" on the features "myfeatures".
lr = LinearRegression(featuresCol="myfeatures", labelCol="readk_imputado")

# Fit the pipeline on the training split only, then score the held-out test split.
# Results go into new names rather than overwriting `df`: re-running this cell on a `df`
# that already contains "readk_imputado"/"myfeatures"/"prediction" can fail because
# those output columns would already exist.
model = Pipeline(stages=[im, assembler, lr]).fit(train)
predictions = model.transform(test)

# Finally, compute the root-mean-square error and R^2 on the test predictions.
# NOTE(review): test labels are imputed with the training mean, so rows whose "readk"
# was missing are scored against that mean rather than an observed value.
evaluator = RegressionEvaluator(labelCol="readk_imputado", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE = %g" % rmse)
evaluator = RegressionEvaluator(labelCol="readk_imputado", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2 = %g" % r2)
