In [25]:
from __future__ import print_function

In [27]:
import matplotlib.pyplot as plt

In [1]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

In [2]:
# Location of the preprocessed dataset; it is read as Parquet by spark.read.parquet.
file_dir = "processed_data.parquet"
# Alternative HDFS location kept for reference. NOTE(review): this path ends in
# .csv, so it presumably cannot be loaded with read.parquet as-is — confirm.
#file_dir = "hdfs://localhost:8020/data/processed_tweet_price.csv"

In [3]:
# Load the preprocessed dataset into a Spark DataFrame.
# NOTE(review): `spark` is not created in any visible cell — presumably the
# SparkSession is injected by the kernel / pyspark shell; confirm.
df = spark.read.parquet(file_dir)

In [4]:
# Inspect column names and types before choosing model inputs.
df.printSchema()

root
 |-- _c0: long (nullable = true)
 |-- id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- is_retweet: double (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- sentiment_compound: double (nullable = true)
 |-- sentiment_neg: double (nullable = true)
 |-- sentiment_neu: double (nullable = true)
 |-- sentiment_pos: double (nullable = true)
 |-- source: string (nullable = true)
 |-- hour: long (nullable = true)
 |-- day: long (nullable = true)
 |-- week: long (nullable = true)
 |-- month: long (nullable = true)
 |-- year: long (nullable = true)
 |-- price: double (nullable = true)



In [5]:
# Discard the columns that are not used downstream: the "_c0" column,
# the tweet id and the raw tweet text.
df = df.drop("_c0", "id", "text")

In [6]:
# Fit an indexer that maps each distinct `source` string to a numeric index
# stored in `source_index`.
# NOTE(review): the indexer is fitted on all rows (before the train/test split)
# and is not a stage of the pipeline that gets saved later, so the saved model
# expects `source_index` to already exist in its input.
str_indexer = StringIndexer(inputCol="source", outputCol="source_index").fit(df)

In [7]:
# Apply the fitted indexer; the result carries the extra `source_index` column.
df_2 = str_indexer.transform(df)

In [8]:
# Check the schema after the drop and the indexing step.
df_2.printSchema()

root
 |-- favorite_count: long (nullable = true)
 |-- is_retweet: double (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- sentiment_compound: double (nullable = true)
 |-- sentiment_neg: double (nullable = true)
 |-- sentiment_neu: double (nullable = true)
 |-- sentiment_pos: double (nullable = true)
 |-- source: string (nullable = true)
 |-- hour: long (nullable = true)
 |-- day: long (nullable = true)
 |-- week: long (nullable = true)
 |-- month: long (nullable = true)
 |-- year: long (nullable = true)
 |-- price: double (nullable = true)
 |-- source_index: double (nullable = false)



In [9]:
# Split the dataset randomly into 70% for training and 30% for testing.
# The seed is pinned so that the split — and every metric computed from it —
# is the same after a kernel restart instead of changing on each run.
SPLIT_SEED = 42
train, test = df_2.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
print("We have %d training examples and %d test examples." % (train.count(), test.count()))

We have 10034 training examples and 4318 test examples.


In [10]:
# Model inputs: every column except the raw `source` string (its indexed form
# `source_index` is kept) and the `price` label.
featuresCols = [name for name in df_2.columns if name not in ("source", "price")]

In [11]:
# Display the final list of input columns.
featuresCols

['favorite_count',
 'is_retweet',
 'retweet_count',
 'sentiment_compound',
 'sentiment_neg',
 'sentiment_neu',
 'sentiment_pos',
 'hour',
 'day',
 'week',
 'month',
 'year',
 'source_index']

In [12]:
# Pack the input columns into one vector column named "rawFeatures".
# handleInvalid="skip" filters out rows with invalid values instead of raising.
vectorAssembler = VectorAssembler(
    inputCols=featuresCols,
    outputCol="rawFeatures",
    handleInvalid="skip",
)

In [13]:
# Detect categorical features inside "rawFeatures" and index them, writing the
# result to "features". maxCategories=4 caps how many distinct values a feature
# may have and still be treated as categorical.
vectorIndexer = VectorIndexer(
    inputCol="rawFeatures",
    outputCol="features",
    maxCategories=4,
)

In [14]:
# Gradient-boosted tree regressor that learns to predict the "price" column.
gbt = GBTRegressor(labelCol="price")

In [15]:
# Hyper-parameter grid for tuning: two tree depths x two iteration counts.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 5])
    .addGrid(gbt.maxIter, [10, 100])
    .build()
)

In [16]:
# RMSE evaluator wired to the regressor's own label and prediction column names.
evaluator = RegressionEvaluator(
    labelCol=gbt.getLabelCol(),
    predictionCol=gbt.getPredictionCol(),
    metricName="rmse",
)

In [17]:
# Declare the CrossValidator, which runs model tuning for us.
# numFolds is spelled out (3 is the library default, so behaviour is unchanged)
# and the seed is pinned so that the fold assignment — and therefore the
# selected hyper-parameters — is repeatable across runs.
cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)

In [18]:
# End-to-end pipeline: assemble the feature vector, index categorical features,
# then run the cross-validated regressor.
pipeline = Pipeline(
    stages=[
        vectorAssembler,
        vectorIndexer,
        cv,
    ]
)

In [19]:
# Fit all stages on the training split. The CrossValidator stage trains one model
# per grid entry per fold, so this is the expensive cell of the notebook.
pipelineModel = pipeline.fit(train)

In [39]:
# Show the schema of the held-out split (no Spark job is triggered by the repr).
test

DataFrame[favorite_count: bigint, is_retweet: double, retweet_count: bigint, sentiment_compound: double, sentiment_neg: double, sentiment_neu: double, sentiment_pos: double, source: string, hour: bigint, day: bigint, week: bigint, month: bigint, year: bigint, price: double, source_index: double]

In [20]:
# Run the fitted pipeline on the held-out split; adds the rawFeatures, features
# and prediction columns.
predictions = pipelineModel.transform(test)

In [29]:
# Show the schema of the scored DataFrame.
predictions

DataFrame[favorite_count: bigint, is_retweet: double, retweet_count: bigint, sentiment_compound: double, sentiment_neg: double, sentiment_neu: double, sentiment_pos: double, source: string, hour: bigint, day: bigint, week: bigint, month: bigint, year: bigint, price: double, source_index: double, rawFeatures: vector, features: vector, prediction: double]

In [23]:
# Root-mean-squared error between `price` and `prediction` on the test split.
rmse = evaluator.evaluate(predictions)

In [24]:
# Report the test-set error.
print("RMSE on our test set: %g" % rmse)

RMSE on our test set: 15.4841


In [37]:
# Persist the fitted pipeline to disk.
# A plain .save() raises when the target path already exists, which makes the
# notebook fail on every re-run; go through the writer and overwrite instead.
pipelineModel.write().overwrite().save("gbtregressor.model")