In [1]:
births = spark.table("births_2018")
display(births)
births.columns

NameError: name 'spark' is not defined

In [2]:
# wybranie kolumn gdzie bmi > 0
data = (
  births
    .where(births['bmi'] > 0)
)
print("Liczba wierszy: %d" % (data.count()))


In [3]:
# usunięcie kolumn z pustymi wartościami
births_data = data.dropna(how='any')
print("Liczba usuniętych pusty wierszy: ", data.count() - births_data.count())
print("Liczba pozostałych wierszy:", births_data.count())

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

string_columns = ['ip_gon', 'mm_aicu', 'sex', 'ld_indl', 'mtran', 'rf_cesar', 'rf_cesarn']
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(births_data) for column in string_columns]

vector_assembler = VectorAssembler(inputCols = list(set(births_data.columns) - set(string_columns) - set(['bmi'])), outputCol = 'features')\


pipeline = Pipeline(stages=indexers+[vector_assembler])
births_data = pipeline.fit(births_data).transform(births_data).select(['features', 'bmi'])

In [5]:
# podstawowe statystyki dla danych
births_data.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
bmi,3134569,28.759007827286588,12.483797151774885,13.0,99.9


In [6]:
splits = births_data.randomSplit([0.7, 0.3])
trainingData = splits[0]
testData = splits[1]

In [7]:
trainingData.describe().show()

In [8]:
#budowa modelu regresji liniowej 
from pyspark.ml.regression import LinearRegression
linear = LinearRegression(featuresCol = "features", labelCol='bmi', maxIter=10)
linear_model = linear.fit(trainingData)
print("Wspolczynniki: " + str(linear_model.coefficients))
print("Wyraz wolny: " + str(linear_model.intercept))

trainingSummary = linear_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [9]:
from pyspark.ml.evaluation import RegressionEvaluator
#Testowanie na danych testowych
linear_predictions = linear_model.transform(testData)
linear_predictions.select("prediction","bmi","features").show(5)
linear_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="bmi",metricName="r2")
print("R Squared (R2) dla danych testowych = %g" % linear_evaluator.evaluate(linear_predictions))
test_result = linear_model.evaluate(testData)
print("Root Mean Squared Error (RMSE) dla danych testowych = %g" % test_result.rootMeanSquaredError)

In [10]:
# Drzewa decyzyjne
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer

# Wykrycie zmiennych kategorycznyc, jezeli jest 5 lub mniej różnych wartości dla kolumny to traktujemy
# ją jako zmienną kategoryczną
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=5).fit(births_data)
decisionTreeReg = DecisionTreeRegressor(featuresCol ='indexedFeatures', labelCol = 'bmi')
pipeline = Pipeline(stages=[featureIndexer, decisionTreeReg])

decisionTreeReg_model = pipeline.fit(trainingData)

In [11]:
#Testowanie na danych testowych
dt_predictions = decisionTreeReg_model.transform(testData)
print(dt_predictions)
rmse = RegressionEvaluator(labelCol="bmi", predictionCol="prediction", metricName="rmse").evaluate(dt_predictions)
r2 = RegressionEvaluator(predictionCol="prediction", labelCol="bmi", metricName="r2").evaluate(dt_predictions)
dt_predictions.select("prediction", "bmi", "features").show(5)
print("Root Mean Squared Error (RMSE) na danych testowych = %g" % rmse)
print("R Squared (R2) dla danych testowych = %g" % r2)
# summary only
print(decisionTreeReg_model)