In [8]:
from pyspark.sql import SparkSession
import pandas as pd

In [9]:
spark = SparkSession.builder.appName('test').getOrCreate()

In [10]:
# Import csv of spam and ham (not spam)
start_data = spark.read.format("csv").option("header", "true").load("./testing/test.csv")
start_data.show()
type(start_data)

+---+------------------+--------------------+
|_c0|           variety|         description|
+---+------------------+--------------------+
|  0|       White Blend|Aromas include tr...|
|  1|    Portuguese Red|This is ripe and ...|
|  2|        Pinot Gris|Tart and snappy t...|
|  3|          Riesling|Pineapple rind le...|
|  4|        Pinot Noir|Much like the reg...|
|  5|Tempranillo-Merlot|Blackberry and ra...|
|  6|          Frappato|Here's a bright i...|
|  7|    Gewürztraminer|This dry and rest...|
|  8|    Gewürztraminer|Savory dried thym...|
|  9|        Pinot Gris|This has great de...|
| 10|Cabernet Sauvignon|Soft supple plum ...|
| 11|    Gewürztraminer|This is a dry win...|
| 12|Cabernet Sauvignon|Slightly reduced ...|
| 13| Nerello Mascalese|This is dominated...|
| 14|        Chardonnay|Building on 150 y...|
| 15|          Riesling|Zesty orange peel...|
| 16|            Malbec|Baked plum molass...|
| 17|            Malbec|Raw black-cherry ...|
| 18| Tempranillo Blend|Desiccated

pyspark.sql.dataframe.DataFrame

In [11]:
# Create a length column to be used as a future feature 
from pyspark.sql.functions import length
data = start_data.withColumn('length', length(start_data['description']))
# data.show(truncate = False)
data.show()


+---+------------------+--------------------+------+
|_c0|           variety|         description|length|
+---+------------------+--------------------+------+
|  0|       White Blend|Aromas include tr...|   166|
|  1|    Portuguese Red|This is ripe and ...|   222|
|  2|        Pinot Gris|Tart and snappy t...|   181|
|  3|          Riesling|Pineapple rind le...|   194|
|  4|        Pinot Noir|Much like the reg...|   241|
|  5|Tempranillo-Merlot|Blackberry and ra...|   252|
|  6|          Frappato|Here's a bright i...|   177|
|  7|    Gewürztraminer|This dry and rest...|   119|
|  8|    Gewürztraminer|Savory dried thym...|   151|
|  9|        Pinot Gris|This has great de...|   157|
| 10|Cabernet Sauvignon|Soft supple plum ...|   237|
| 11|    Gewürztraminer|This is a dry win...|   174|
| 12|Cabernet Sauvignon|Slightly reduced ...|   167|
| 13| Nerello Mascalese|This is dominated...|   228|
| 14|        Chardonnay|Building on 150 y...|   303|
| 15|          Riesling|Zesty orange peel...| 

In [12]:
sparktoPD = data.toPandas()
sparktoPD

Unnamed: 0,_c0,variety,description,length
0,0,White Blend,Aromas include tropical fruit broom brimstone ...,166.0
1,1,Portuguese Red,This is ripe and fruity a wine that is smooth ...,222.0
2,2,Pinot Gris,Tart and snappy the flavors of lime flesh and ...,181.0
3,3,Riesling,Pineapple rind lemon pith and orange blossom s...,194.0
4,4,Pinot Noir,Much like the regular bottling from 2012 this ...,241.0
5,5,Tempranillo-Merlot,Blackberry and raspberry aromas show a typical...,252.0
6,6,Frappato,Here's a bright informal red that opens with a...,177.0
7,7,Gewürztraminer,This dry and restrained wine offers spice in p...,119.0
8,8,Gewürztraminer,Savory dried thyme notes accent sunnier flavor...,151.0
9,9,Pinot Gris,This has great depth of flavor with its fresh ...,157.0


In [13]:
sparktoPD.drop(sparktoPD.columns[[0]], axis=1, inplace=True) # ditch that unnamed row numbers column
sparktoPD.describe(include='all')

Unnamed: 0,variety,description,length
count,129970,129971,129971.0
unique,707,119950,
top,Pinot Noir,Ripe plum game truffle leather and menthol are...,
freq,13272,3,
mean,,,236.469836
std,,,65.203061
min,,,19.0
25%,,,192.0
50%,,,231.0
75%,,,275.0


In [14]:
varieties =  sparktoPD['variety'].value_counts()
varieties

Pinot Noir                    13272
Chardonnay                    11753
Cabernet Sauvignon             9472
Red Blend                      8946
Bordeaux-style Red Blend       6915
Riesling                       5189
Sauvignon Blanc                4967
Syrah                          4142
Rosé                           3564
Merlot                         3102
Nebbiolo                       2804
Zinfandel                      2714
Sangiovese                     2707
Malbec                         2652
Portuguese Red                 2466
White Blend                    2360
Sparkling Blend                2153
Tempranillo                    1810
Rhône-style Red Blend          1471
Pinot Gris                     1455
Champagne Blend                1396
Cabernet Franc                 1353
Grüner Veltliner               1345
Portuguese White               1159
Bordeaux-style White Blend     1066
Pinot Grigio                   1052
Gamay                          1025
Gewürztraminer              

In [15]:
top_wines_df =  sparktoPD.loc[ sparktoPD['variety'].isin(varieties.axes[0][:20])] # first column[0],  rows :20  
top_wines_df['variety'].describe()

count          93914
unique            20
top       Pinot Noir
freq           13272
Name: variety, dtype: object

In [16]:
data 

DataFrame[_c0: string, variety: string, description: string, length: int]

In [17]:
#convert pd to spark
spark_df = spark.createDataFrame(top_wines_df)
spark_df

DataFrame[variety: string, description: string, length: double]

In [18]:
len(sparktoPD['variety'].unique())

708

# Feature Transformations

In [19]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# Create all the features to the data set
wine_variety_to_num = StringIndexer(inputCol= 'variety',outputCol='label')
tokenizer = Tokenizer(inputCol="description", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
# hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token', numFeatures=1000)
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token', numFeatures=10000)
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [20]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
# See https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
# This just creates a new, single vector of features that is the concatenation
# of tf-idf data and the length of the wine varietal
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [21]:
# Create a and run a data processing Pipeline
# See https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[wine_variety_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [22]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(spark_df)
# Running the transform method here _actually_ runs the pipeline on our 
# DataFrame and returns the transformed data
cleaned = cleaner.transform(spark_df)

In [23]:
# Show label of wine variety and resulting features
cleaned.select(['label', 'features']).show(truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                               

In [24]:
# Break data down into a training set and a testing set
(training, testing) = cleaned.randomSplit([0.7, 0.3])

In [25]:
training.count()

65706

In [26]:
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel

# Create a Naive Bayes model and fit training data
nb = NaiveBayes(smoothing=1.0, modelType='multinomial')
wine_predictor = nb.fit(training)
 

In [114]:
from pyspark.ml.classification import NaiveBayesModel

In [27]:
# Tranform the model with the testing data
test_results = wine_predictor.transform(testing)
test_results.show(5)

+--------------------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|             variety|         description|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Bordeaux-style Re...|86-88 Barrel samp...| 194.0|  4.0|[86-88, barrel, s...|[86-88, barrel, s...|(10000,[409,697,1...|(10000,[409,697,1...|(10001,[409,697,1...|[-1018.3026185241...|[3.51174243884331...|       4.0|
|Bordeaux-style Re...|87-89 Barrel samp...| 163.0|  4.0|[87-89, barrel, s...|[87-89, barrel, s...|(10000,[161,173,1...|(10000,[161,173,1...|

In [119]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting wine variety was: %s" % (acc))

Accuracy of model at predicting wine variety was: 0.5942089730643539


# Saving a Trained Model
We can save our trained models using the HDF5 binary format with the extension .h5

In [42]:
!open NLP_NaiveBayes_trained.h5/

In [52]:
!rm -r NLP_NaiveBayes_trained.h5/

In [55]:
!open NLP_NaiveBayes_trained/

In [95]:
# Save the model
wine_predictor.save("my_model")

In [99]:
!ls my_model/

[34mdata[m[m     [34mmetadata[m[m


In [116]:
myModel = NaiveBayesModel.load("my_model")

In [118]:
more_test_results = myModel.transform(testing)
more_test_results.show(5)

+--------------------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|             variety|         description|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Bordeaux-style Re...|86-88 Barrel samp...| 194.0|  4.0|[86-88, barrel, s...|[86-88, barrel, s...|(10000,[409,697,1...|(10000,[409,697,1...|(10001,[409,697,1...|[-1018.3026185241...|[3.51174243884331...|       4.0|
|Bordeaux-style Re...|87-89 Barrel samp...| 163.0|  4.0|[87-89, barrel, s...|[87-89, barrel, s...|(10000,[161,173,1...|(10000,[161,173,1...|

In [39]:
import pickle

In [40]:
loaded_model = pickle.load(open("NLP_NaiveBayes_trained.h5", 'rb'))

IsADirectoryError: [Errno 21] Is a directory: 'NLP_NaiveBayes_trained.h5'

In [28]:
type(nb)

pyspark.ml.classification.NaiveBayes

In [44]:
vars(spark)

{'_conf': <pyspark.sql.conf.RuntimeConfig at 0x10f6eec88>,
 '_jsc': JavaObject id=o11,
 '_jsparkSession': JavaObject id=o24,
 '_jvm': <py4j.java_gateway.JVMView at 0x10a84f2e8>,
 '_jwrapped': JavaObject id=o25,
 '_sc': <SparkContext master=local[*] appName=PythonNaiveBayesExample>,
 '_wrapped': <pyspark.sql.context.SQLContext at 0x10f6eee80>}

In [46]:
sc = spark._sc

In [None]:
# spark.stop()

In [48]:
# Save and load model
nb.save(sc, "myNaiveBayesModel")
sameModel = NaiveBayesModel.load(sc, "myNaiveBayesModel")

TypeError: save() takes 2 positional arguments but 3 were given

In [49]:
nb.save

<bound method MLWritable.save of NaiveBayes_4cf49269be983d4d3ce1>

In [50]:
nb

NaiveBayes_4cf49269be983d4d3ce1

In [62]:
sameModel = NaiveBayes.load(sc,"NLP_NaiveBayes_trained")

Py4JJavaError: An error occurred while calling z:org.apache.spark.mllib.classification.NaiveBayesModel.load.
: org.json4s.package$MappingException: Did not find value which can be converted into java.lang.String
	at org.json4s.Extraction$.convert(Extraction.scala:603)
	at org.json4s.Extraction$.extract(Extraction.scala:350)
	at org.json4s.Extraction$.extract(Extraction.scala:42)
	at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
	at org.apache.spark.mllib.util.Loader$.loadMetadata(modelSaveLoad.scala:131)
	at org.apache.spark.mllib.classification.NaiveBayesModel$.load(NaiveBayes.scala:271)
	at org.apache.spark.mllib.classification.NaiveBayesModel.load(NaiveBayes.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
