In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m21.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=a628143883286a53b173ff59dc8ba74f74e1e4e5c9ded8f8d959704a854c464a
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
from pyspark.sql import SparkSession

# Build the session exactly once, with every option supplied up front.
# Calling a bare `SparkSession.builder.getOrCreate()` first would start the
# underlying SparkContext with defaults, and the configured builder below would
# then just return that existing session — master, app name and UI port are
# fixed when the context starts, so they would not be applied.
spark = (
    SparkSession.builder
    .master("local")
    .appName("wine")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

# Keep a handle on the SparkContext and display it as the cell's result.
sc = spark.sparkContext
sc

In [None]:
# Read the wine CSV: the first row supplies the column names and Spark infers
# each column's type from the data.
wine = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/drive/MyDrive/Colab Notebooks/wines_SPA.csv")
)
# Print the inferred schema so the column types can be checked by eye.
wine.printSchema()

root
 |-- winery: string (nullable = true)
 |-- wine: string (nullable = true)
 |-- year: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- num_reviews: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- price: double (nullable = true)
 |-- type: string (nullable = true)
 |-- body: string (nullable = true)
 |-- acidity: string (nullable = true)



In [None]:
# Preview the first rows of the raw data (show()'s defaults, spelled out).
wine.show(n=20, truncate=True)

+-------------------+--------------------+----+------+-----------+-------+----------------+----------------+--------------------+----+-------+
|             winery|                wine|year|rating|num_reviews|country|          region|           price|                type|body|acidity|
+-------------------+--------------------+----+------+-----------+-------+----------------+----------------+--------------------+----+-------+
|      Teso La Monja|               Tinto|2013|   4.9|         58| Espana|            Toro|           995.0|            Toro Red|   5|      3|
|             Artadi|       Vina El Pison|2018|   4.9|         31| Espana|  Vino de Espana|           313.5|         Tempranillo|   4|      2|
|       Vega Sicilia|               Unico|2009|   4.8|       1793| Espana|Ribera del Duero|          324.95|Ribera Del Duero Red|   5|      3|
|       Vega Sicilia|               Unico|1999|   4.8|       1705| Espana|Ribera del Duero|          692.96|Ribera Del Duero Red|   5|      3|

In [None]:
# Discard every row that contains a null in any column.
wine = wine.dropna(how="any")

In [None]:
import six
# Print the correlation between `rating` and every numeric column.
# Column types are read from the schema (`wine.dtypes`) instead of inspecting the
# first row's value: this launches no Spark job per column, does not raise
# IndexError when the DataFrame is empty, and skips non-numeric, non-string
# columns (e.g. booleans or dates) that `corr` cannot handle.
_NUMERIC_DTYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}
for column_name, dtype in wine.dtypes:
    # Decimal dtypes carry precision/scale in their name, e.g. "decimal(10,2)".
    if dtype in _NUMERIC_DTYPES or dtype.startswith("decimal"):
        print( "Correlation to rating for ", column_name, wine.stat.corr('rating', column_name))

Correlation to rating for  rating 1.0
Correlation to rating for  num_reviews 0.007113627120754117
Correlation to rating for  price 0.5443991729328242


In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [None]:
# Combine the two numeric predictors into a single vector column, "features".
featureassembler = VectorAssembler(
    inputCols=["num_reviews", "price"],
    outputCol="features",
)

In [None]:
# Append the assembled "features" column to the cleaned data ...
output = featureassembler.transform(wine)
# ... and preview just that column.
output.select(["features"]).show(n=20)

+--------------------+
|            features|
+--------------------+
|        [58.0,995.0]|
|        [31.0,313.5]|
|     [1793.0,324.95]|
|     [1705.0,692.96]|
|     [1309.0,778.06]|
|      [1209.0,490.0]|
|      [1201.0,349.0]|
|      [926.0,810.89]|
|       [643.0,345.0]|
|       [630.0,315.0]|
|      [591.0,514.85]|
|      [454.0,105.15]|
|       [438.0,430.0]|
|       [417.0,925.0]|
|       [398.0,350.0]|
|      [372.0,166.18]|
|      [295.0,1620.0]|
|      [250.0,431.36]|
|[217.0,195.627379...|
|      [211.0,824.43]|
+--------------------+
only showing top 20 rows



In [None]:
# Keep only what the regression needs: the feature vector and the label.
finalised_data = output.select(["features", "rating"])

In [None]:
# Split roughly 70/30 into train and test sets. The seed is fixed so that the
# split — and every coefficient and metric derived from it — is the same on
# each Restart & Run All instead of changing from run to run.
train_data, test_data = finalised_data.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Fit a linear regression of `rating` on the assembled features, with an
# elastic-net penalty (regParam / elasticNetParam) and at most 15 iterations.
# NOTE(review): the recorded output for this cell shows coefficients [0.0,0.0],
# i.e. a constant predictor — the penalty looks too strong for the scale of
# `rating`; confirm whether regParam=0.2 is intended.
lr = LinearRegression(
    featuresCol="features",
    labelCol="rating",
    maxIter=15,
    regParam=0.2,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_data)
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")

Coefficients: [0.0,0.0]
Intercept: 4.256094289000018


In [None]:
# Goodness of fit of `lr_model` on the data it was trained on.
trainingSummary = lr_model.summary
print(f"RMSE on train data : {trainingSummary.rootMeanSquaredError:f}")
print(f"r2 on train data : {trainingSummary.r2:f}")

RMSE on train data : 0.120456
r2 on train data : -0.000000


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out rows with `lr_model` and preview a few predictions.
lr_predictions = lr_model.transform(test_data)
lr_predictions.select("features", "rating", "prediction").show(5)

# R² of those predictions against the true ratings.
lr_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="r2",
)
print(f"R Squared (R2) on test data = {lr_evaluator.evaluate(lr_predictions):g}")

+------------+------+-----------------+
|    features|rating|       prediction|
+------------+------+-----------------+
|[25.0,46.95]|   4.3|4.256094289000018|
|[25.0,48.81]|   4.3|4.256094289000018|
| [25.0,68.9]|   4.3|4.256094289000018|
|[25.0,130.0]|   4.6|4.256094289000018|
|[25.0,181.5]|   4.6|4.256094289000018|
+------------+------+-----------------+
only showing top 5 rows

R Squared (R2) on test data = -0.00132909


In [None]:
# Optimiser diagnostics for `lr_model`: iteration count, objective value per
# iteration, and the per-row training residuals.
print("numIterations: {:d}".format(trainingSummary.totalIterations))
print("objectiveHistory: {}".format(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 0
objectiveHistory: [0.5]
+--------------------+
|           residuals|
+--------------------+
|0.043905710999982084|
| 0.14390571099998262|
|0.043905710999982084|
|0.043905710999982084|
|0.043905710999982084|
|0.043905710999982084|
|0.043905710999982084|
|0.043905710999982084|
| 0.24390571099998226|
| 0.14390571099998262|
|0.043905710999982084|
| 0.14390571099998262|
|  0.3439057109999819|
| 0.14390571099998262|
|  0.3439057109999819|
|0.043905710999982084|
|0.043905710999982084|
|0.043905710999982084|
|0.043905710999982084|
| 0.14390571099998262|
+--------------------+
only showing top 20 rows



In [None]:
# Second model for comparison: same features and label, but with no
# regularisation arguments passed. `regressor` ends up holding the fitted model.
regressor = LinearRegression(featuresCol="features", labelCol="rating").fit(train_data)

In [None]:
# Evaluate the second model on the held-out rows and preview its predictions.
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show(n=20)

+-------------+------+------------------+
|     features|rating|        prediction|
+-------------+------+------------------+
| [25.0,46.95]|   4.3| 4.247303559025819|
| [25.0,48.81]|   4.3|4.2481005558228535|
|  [25.0,68.9]|   4.3|  4.25670897821663|
| [25.0,130.0]|   4.6| 4.282889894506313|
| [25.0,181.5]|   4.6| 4.304957278940334|
|  [26.0,37.9]|   4.3| 4.243431572884294|
| [26.0,39.89]|   4.3| 4.244284273758541|
|  [26.0,51.5]|   4.5| 4.249259076346482|
|  [26.0,68.0]|   4.3| 4.256329209223401|
|  [26.0,69.0]|   4.5| 4.256757702125033|
| [26.0,97.07]|   4.3| 4.268785497873829|
|[26.0,1098.9]|   4.8| 4.698062541515289|
|   [27.0,7.9]|   4.4|  4.23058266045359|
|  [27.0,11.5]|   4.4|4.2321252348994625|
| [27.0,13.45]|   4.5| 4.232960796057644|
| [27.0,26.45]|   4.4| 4.238531203778853|
| [27.0,31.75]|   4.3|   4.2408022161575|
|  [27.0,40.2]|   4.4| 4.244422981176286|
|[27.0,2750.0]|   4.6| 5.405553046017256|
| [28.0,15.84]|   4.4| 4.233990768710783|
+-------------+------+------------

In [None]:
# Evaluate `lr_model` on the held-out split and report its error metrics.
# The summary is bound to a single name, `test_result`; previously it was
# assigned to the misspelled `test_resultt` while two of the prints read
# `test_result`, which raises NameError on a fresh kernel.
test_result = lr_model.evaluate(test_data)
print("Mean Absolute Error (MAE) on test data = %g" %
      test_result.meanAbsoluteError)
print("Mean Squared Error (MSE) on test data = %g" %
      test_result.meanSquaredError)
print("Root Mean Squared Error (RMSE) on test data = %g" %
      test_result.rootMeanSquaredError)

Mean Absolute Error (MAE) on test data = 0.0812456
Mean Squared Error (MSE) on test data = 0.0125213
Root Mean Squared Error (RMSE) on test data = 0.111899
