# Task 2 - Create a parquet file

#### Create a pyspark dataframe reading by the prostgres database products_project

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://s01:5432/products_project") \
    .option("driver","org.postgresql.Driver") \
    .option("dbtable", "(select * from shoes ) as shoes") \
    .option("user", "prallis_ds") \
    .option("password", "13131966") \
    .load()

df.printSchema()
df.show(10)

root
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- upc_id: string (nullable = true)
 |-- descr: string (nullable = true)
 |-- vendor_catalog_url: string (nullable = true)
 |-- buy_url: string (nullable = true)
 |-- manufacturer_name: string (nullable = true)
 |-- sale_price: decimal(38,18) (nullable = true)
 |-- retail_price: decimal(38,18) (nullable = true)
 |-- manufacturer_part_no: string (nullable = true)
 |-- country: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- category_id: integer (nullable = true)

+----------+--------------------+------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+-------+---------+-------------+-------------+-----------+
|product_id|                name|upc_id|               descr|  vendor_catalog_url|        

#### write the DataFrame in HDFS in parquet format.

In [None]:
df.write.parquet("hdfs:///products_project/shoes.parquet")

# Task 3 - ML

#### Cross Validation

code from Course 7 Machine Learning Pipelines

In [14]:
import numpy as np

from pyspark.ml.tuning import CrossValidator, CrossValidatorModel
from pyspark.sql.functions import rand


class CrossValidatorVerbose(CrossValidator):

    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()

        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        for i in range(nFolds):
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                metric = eva.evaluate(model.transform(validation, paramMap))
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics))

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
import subprocess

In [8]:

# subprocess.call(["hdfs", "dfs", "-rm", "-r", outPath])
shoes_parquet = spark.read.parquet("hdfs:///products_project/shoes.parquet")

In [9]:
data = shoes_parquet.select('descr','category_code')
data.show(5)

+--------------------+-------------+
|               descr|category_code|
+--------------------+-------------+
|Black sheep skin ...|        boots|
|Black leather '14...|        boots|
|La Canadienne Wom...|        boots|
|Zip closure<br>Ro...|        boots|
|Round toeline<br>...|        boots|
+--------------------+-------------+
only showing top 5 rows



### Random Forest

https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier

https://spark.apache.org/docs/latest/ml-features#tf-idf

In [12]:
from pyspark.ml.classification import RandomForestClassifier

#### TF-IDF

In [35]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="category_code", outputCol="indexedLabel").fit(data)

tokenizer = Tokenizer(inputCol="descr", outputCol="words")
# wordsData = tokenizer.transform(data)

# Automatically identify categorical features, and index them.
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
# featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")

labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol='features', numTrees=10)

pipeline = Pipeline(stages=[labelIndexer, tokenizer, hashingTF, idf,  rf, labelConverter])

(train, test) = data.randomSplit([0.7, 0.3])
train.cache()

paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 1000, 10000]) \
    .addGrid(rf.numTrees, [10, 20]) \
    .build()

crossval = CrossValidatorVerbose(estimator=pipeline,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"),
                      numFolds=5) 

rf_model = crossval.fit(train)


Comparing models on fold 1
params: {'numTrees': 10, 'numFeatures': 10}	accuracy: 0.468217	avg: 0.468217
params: {'numTrees': 20, 'numFeatures': 10}	accuracy: 0.473920	avg: 0.473920
params: {'numTrees': 10, 'numFeatures': 1000}	accuracy: 0.599737	avg: 0.599737
params: {'numTrees': 20, 'numFeatures': 1000}	accuracy: 0.605830	avg: 0.605830
params: {'numTrees': 10, 'numFeatures': 10000}	accuracy: 0.503802	avg: 0.503802
params: {'numTrees': 20, 'numFeatures': 10000}	accuracy: 0.476748	avg: 0.476748
Comparing models on fold 2
params: {'numTrees': 10, 'numFeatures': 10}	accuracy: 0.469645	avg: 0.468931
params: {'numTrees': 20, 'numFeatures': 10}	accuracy: 0.474323	avg: 0.474121
params: {'numTrees': 10, 'numFeatures': 1000}	accuracy: 0.603927	avg: 0.601832
params: {'numTrees': 20, 'numFeatures': 1000}	accuracy: 0.615523	avg: 0.610677
params: {'numTrees': 10, 'numFeatures': 10000}	accuracy: 0.503070	avg: 0.503436
params: {'numTrees': 20, 'numFeatures': 10000}	accuracy: 0.468378	avg: 0.472563
Co

In [10]:
prediction = rf_model.transform(test)

In [11]:
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")


In [12]:
accuracy = evaluator.evaluate(prediction)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.414736 


### Logistic Regression

In [14]:
from pyspark.ml.classification import LogisticRegression, OneVsRest

In [15]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="category_code", outputCol="label").fit(data)

tokenizer = Tokenizer(inputCol="descr", outputCol="words")
wordsData = tokenizer.transform(data)

# Automatically identify categorical features, and index them.
# hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")

(train, test) = data.randomSplit([0.7, 0.3])
train.cache()

DataFrame[descr: string, category_code: string]

In [16]:
# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

In [17]:
# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

In [18]:
pipeline = Pipeline(stages=[labelIndexer, tokenizer, hashingTF,idf , ovr])


In [19]:
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 1000, 10000]) \
    .build()
crossval = CrossValidatorVerbose(estimator=pipeline,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy"),
                      numFolds=5) 

In [20]:
# train the multiclass model.
ovrModel = crossval.fit(train)

Comparing models on fold 1
params: {'numFeatures': 10}	accuracy: 0.463998	avg: 0.463998
params: {'numFeatures': 1000}	accuracy: 0.897535	avg: 0.897535
params: {'numFeatures': 10000}	accuracy: 0.946156	avg: 0.946156
Comparing models on fold 2
params: {'numFeatures': 10}	accuracy: 0.457295	avg: 0.460646
params: {'numFeatures': 1000}	accuracy: 0.894005	avg: 0.895770
params: {'numFeatures': 10000}	accuracy: 0.945710	avg: 0.945933
Comparing models on fold 3
params: {'numFeatures': 10}	accuracy: 0.462781	avg: 0.461358
params: {'numFeatures': 1000}	accuracy: 0.896687	avg: 0.896076
params: {'numFeatures': 10000}	accuracy: 0.948515	avg: 0.946793
Comparing models on fold 4
params: {'numFeatures': 10}	accuracy: 0.463759	avg: 0.461958
params: {'numFeatures': 1000}	accuracy: 0.898209	avg: 0.896609
params: {'numFeatures': 10000}	accuracy: 0.947994	avg: 0.947094
Comparing models on fold 5
params: {'numFeatures': 10}	accuracy: 0.461410	avg: 0.461849
params: {'numFeatures': 1000}	accuracy: 0.896718	avg

In [21]:
# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0511128


In [22]:
lr_model = ovrModel.bestModel

In [23]:
outPath = "hdfs:///products_project/model"

In [28]:
#Save model
lr_model.save(outPath)

AttributeError: 'function' object has no attribute 'overwrite'

### Load Model

https://stackoverflow.com/questions/39776617/spark-streaming-how-to-load-a-pipeline-on-a-stream|

In [3]:
from pyspark.ml import PipelineModel
outPath = "hdfs:///products_project/model"
load_lr_model = PipelineModel.load(outPath)


In [30]:
predictions = load_lr_model.transform(test)

In [32]:
predictions.show(10)

+--------------------+-------------+-----+--------------------+--------------------+--------------------+----------+
|               descr|category_code|label|               words|         rawFeatures|            features|prediction|
+--------------------+-------------+-----+--------------------+--------------------+--------------------+----------+
|"A blast" is what...|      sandals|  1.0|["a, blast", is, ...|(10000,[167,628,7...|(10000,[167,628,7...|       1.0|
|"A “multi-talente...|      sandals|  1.0|["a, “multi-talen...|(10000,[638,848,9...|(10000,[638,848,9...|       1.0|
|"Can a casual sli...|      sandals|  1.0|["can, a, casual,...|(10000,[100,304,5...|(10000,[100,304,5...|       1.0|
|"Casual" just sla...|mens-sneakers|  3.0|["casual", just, ...|(10000,[161,307,4...|(10000,[161,307,4...|       3.0|
|"Modern and sexy,...|      sandals|  1.0|["modern, and, se...|(10000,[36,158,28...|(10000,[36,158,28...|       1.0|
|"Mom they make me...|  girls-shoes|  2.0|["mom, they, make...|(

## Task 5 Spark streaming

In [33]:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
from __future__ import print_function

import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


ssc = StreamingContext(sc, 1)
topicPartion = TopicAndPartition('offers',0)
topic = 'offers'
fromOffset = {topicPartion: 0}

twitterKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],{"bootstrap.servers": 'localhost:9092'}, fromOffsets=fromOffset)

tweets = twitterKafkaStream. \
        map(lambda (key, value): json.loads(value)). \
        map(lambda json_object: (json_object["text"]))

tweets.saveAsTextFiles('/tweets/')
tweets.pprint(10)


ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2020-07-04 12:32:36
-------------------------------------------
here you can find sneakers offers #shoesoffers
the best shoes offers for ladies #shoesoffers
boots offers in summer why not

-------------------------------------------
Time: 2020-07-04 12:32:37
-------------------------------------------

-------------------------------------------
Time: 2020-07-04 12:32:38
-------------------------------------------

-------------------------------------------
Time: 2020-07-04 12:32:39
-------------------------------------------

-------------------------------------------
Time: 2020-07-04 12:32:40
-------------------------------------------

-------------------------------------------
Time: 2020-07-04 12:32:41
-------------------------------------------

-------------------------------------------
Time: 2020-07-04 12:32:42
-------------------------------------------

-------------------------------------------
Time: 2020-07-04 12:32:43
-

KeyboardInterrupt: 

In [1]:
new_tweets = spark.read.csv("hdfs:///tweets/-1593865956000/part-00000")

In [2]:
new_tweets.show(10)

+--------------------+
|                 _c0|
+--------------------+
|here you can find...|
|the best shoes of...|
|boots offers in s...|
+--------------------+



In [4]:
oldColumns = new_tweets.schema.names
newColumns = ["descr"]

df = reduce(lambda new_tweets, idx: new_tweets.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), new_tweets)
df.printSchema()
df.show()

root
 |-- descr: string (nullable = true)

+--------------------+
|               descr|
+--------------------+
|here you can find...|
|the best shoes of...|
|boots offers in s...|
+--------------------+



In [None]:
from pyspark.ml import PipelineModel
outPath = "hdfs:///products_project/model"
load_lr_model = PipelineModel.load(outPath)

In [36]:
predictions = load_lr_model.transform(df)

In [37]:
predictions.show(3)

+--------------------+--------------------+--------------------+--------------------+----------+
|               descr|               words|         rawFeatures|            features|prediction|
+--------------------+--------------------+--------------------+--------------------+----------+
|here you can find...|[here, you, can, ...|(10000,[1135,1425...|(10000,[1135,1425...|       3.0|
|the best shoes of...|[the, best, shoes...|(10000,[219,763,1...|(10000,[219,763,1...|       1.0|
|boots offers in s...|[boots, offers, i...|(10000,[1445,3965...|(10000,[1445,3965...|       0.0|
+--------------------+--------------------+--------------------+--------------------+----------+



In [18]:
labelIndexer = StringIndexer(inputCol="category_code", outputCol="label").fit(data)


In [25]:
label_df = labelIndexer.transform(data)

In [34]:
label_df.filter(label_df['label'] == 3 ).show(1)

+--------------------+-------------+-----+
|               descr|category_code|label|
+--------------------+-------------+-----+
|Orange 'Flyknit' ...|mens-sneakers|  3.0|
+--------------------+-------------+-----+
only showing top 1 row



In [38]:
predictions.write.parquet("hdfs:///tweets/predictions/predictions.parquet")

In [53]:
import pyspark.sql.functions as F
def myConcat(*cols):
    concat_columns = []
    for c in cols[:-1]:
        concat_columns.append(F.coalesce(c, F.lit("*")))
        concat_columns.append(F.lit(" "))  
    concat_columns.append(F.coalesce(cols[-1], F.lit("*")))
    return F.concat(*concat_columns)

df_text = predictions.withColumn("combined", myConcat(*df.columns)).select("combined")

df_text.show()

df_text.coalesce(1).write.format("text").option("header", "false").mode("append").save("hdfs:///tweets/predictions/predictions.txt")

+--------------------+
|            combined|
+--------------------+
|here you can find...|
|the best shoes of...|
|boots offers in s...|
+--------------------+

