In [6]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from matplotlib.pyplot import *
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
from pyspark.ml.regression  import LinearRegression

path="./dataset/bike/hour.csv"
test = spark.read.format("CSV").option("header","true").load(path)
test = test.drop("instant","dteday","casual","registered")
colums_cat = ['season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit']
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='stringindexed_' + c) for c in colums_cat]
onehotencoder_stages = [OneHotEncoder(inputCol='stringindexed_' + c, outputCol='onehotencoded_' + c) for c in colums_cat]
all_stages = stringindexer_stages + onehotencoder_stages
reg_pipline = Pipeline(stages=all_stages)
reg_pip_model = reg_pipline.fit(test)
reg_test = reg_pip_model.transform(test)
reg_feature_columns = ['onehotencoded_' + c for c in colums_cat] + ['temp', 'atemp', 'hum','windspeed']
reg_labels_columns = ['cnt']
reg_test = reg_test.withColumn("temp",reg_test.temp.cast(DoubleType()))
reg_test = reg_test.withColumn("atemp",reg_test.atemp.cast(DoubleType()))
reg_test = reg_test.withColumn("hum",reg_test.hum.cast(DoubleType()))
reg_test = reg_test.withColumn("windspeed",reg_test.windspeed.cast(DoubleType()))
reg_test = reg_test.withColumn("cnt",reg_test.cnt.cast(DoubleType()))
reg_test = reg_test.select(reg_feature_columns + reg_labels_columns)
temp_columns = reg_test.columns[0:12];
vectorassembler = VectorAssembler(inputCols=temp_columns, outputCol='features')
reg_test =vectorassembler.transform(reg_test)
reg_test = reg_test.select(['features'] + reg_labels_columns)
linReg = LinearRegression(labelCol="cnt", featuresCol="features",maxIter=1000, regParam=0.3, solver="sgd")
linReg_model = linReg.fit(reg_test)
reg_train_summary = linReg_model.summary
reg_train_summary.residuals.show()
print("meanAbsoluteError %2.4f",reg_train_summary.meanAbsoluteError)
print("meanSquaredError %2.4f",reg_train_summary.meanSquaredError)
print("rootMeanSquaredError %2.4f",reg_train_summary.rootMeanSquaredError)
#pylab inline
#targets_count = reg_test.select('cnt').count();
#targets = reg_test.select('cnt').take(10)
#col = ['b' for x in range(10)]
#hist(targets, bins=40, color=col, normed=True)
#fig = matplotlib.pyplot.gcf()
#fig.set_size_inches(16, 10)

+-------------------+
|          residuals|
+-------------------+
|  86.15264166176489|
| 130.90262431531178|
| 131.87347645926954|
| 108.90273592781455|
| 106.14318415977431|
| 106.09883267192475|
| 40.132207438047345|
| -84.26511873837907|
|-237.24335913359255|
|-100.03683886688424|
| -28.98353646869765|
|-18.983003541644592|
| -52.26821970150493|
| -39.33952316107769|
|-11.889556641492788|
| -8.369444259110281|
| -78.82579151690388|
| -263.1258736080613|
| -200.2852960602999|
| -89.95247700274817|
+-------------------+
only showing top 20 rows

('meanAbsoluteError %2.4f', 75.2524435763989)
('meanSquaredError %2.4f', 10320.728049367128)
('rootMeanSquaredError %2.4f', 101.59098409488476)


In [34]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from matplotlib.pyplot import *
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

path="./dataset/bike/hour.csv"
test = spark.read.format("CSV").option("header","true").load(path)
test = test.drop("instant","dteday","casual","registered")
colums_cat = ['season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit']
for cul in colums_cat:
    test = test.withColumn(cul, test[cul].cast(DoubleType()))
test = test.withColumn('cnt', test['cnt'].cast(DoubleType()))
assembler = VectorAssembler(inputCols=colums_cat,outputCol="features")
dt_test = assembler.transform(test)
dt_test = dt_test.select(['features','cnt']);
dt = DecisionTreeRegressor(labelCol='cnt',maxDepth=20, maxBins=100)
dt_model = dt.fit(dt_test)
dt_predict = dt_model.transform(dt_test)
#dt_predict = dt_predict.withColumn("prediction", dt_predict.prediction)
evaluator = RegressionEvaluator(
    labelCol="cnt", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(dt_predict)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 31.2489


In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+



In [7]:
from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()

Chose 351 categorical features: 645, 69, 365, 138, 101, 479, 333, 249, 0, 555, 666, 88, 170, 115, 276, 308, 5, 449, 120, 247, 614, 677, 202, 10, 56, 533, 142, 500, 340, 670, 174, 42, 417, 24, 37, 25, 257, 389, 52, 14, 504, 110, 587, 619, 196, 559, 638, 20, 421, 46, 93, 284, 228, 448, 57, 78, 29, 475, 164, 591, 646, 253, 106, 121, 84, 480, 147, 280, 61, 221, 396, 89, 133, 116, 1, 507, 312, 74, 307, 452, 6, 248, 60, 117, 678, 529, 85, 201, 220, 366, 534, 102, 334, 28, 38, 561, 392, 70, 424, 192, 21, 137, 165, 33, 92, 229, 252, 197, 361, 65, 97, 665, 583, 285, 224, 650, 615, 9, 53, 169, 593, 141, 610, 420, 109, 256, 225, 339, 77, 193, 669, 476, 642, 637, 590, 679, 96, 393, 647, 173, 13, 41, 503, 134, 73, 105, 2, 508, 311, 558, 674, 530, 586, 618, 166, 32, 34, 148, 45, 161, 279, 64, 689, 17, 149, 584, 562, 176, 423, 191, 22, 44, 59, 118, 281, 27, 641, 71, 391, 12, 445, 54, 313, 611, 144, 49, 335, 86, 672, 172, 113, 681, 219, 419, 81, 230, 362, 451, 76, 7, 39, 649, 98, 616, 477, 367, 535, 1

In [83]:
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),(0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"])
df.show()

+-----+------+---------+
|label|weight| features|
+-----+------+---------+
|  1.0|   2.0|    [1.0]|
|  0.0|   2.0|(1,[],[])|
+-----+------+---------+

