In [None]:
import re

import pyspark as ps    # for the pyspark suite
import os               # for environ variables in Part 3

%load_ext autoreload
%autoreload 2

spark = ps.sql.SparkSession.builder \
            .appName("df lecture") \
            .getOrCreate()
        
import numpy as np
import pandas as pd

from pyspark.sql.functions import rand

In [287]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.feature import NGram

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


In [77]:
# ### this is for converting a silly pandas format to something that is more like a normal json you may want it later

# panda = items.toPandas()

# pcols = panda.columns

# pcols = cycle(pcols)

# jint = 0
# jlist = []
# jdict = {}

# for i in panda.values.flatten():
#     key = pcols.next()
#     jdict[key] = i
#     if key == 'user_id':
#         jlist.append(jdict)
#         jdict = {}
#         jint += 1
   


# with open('../data/acc_dataset_local.json', 'w') as jfile:
#     json.dump(jlist, jfile)

In [30]:
# yelp = spark.read.json('../data/yelp_academic_reviews.json')

In [86]:
yelp = spark.read.json('../data/acc_dataset_local.json')

In [88]:
yelp.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [216]:
def kill_non_ascii(text):
    lets = []
    ntext = text.lower()
    ntext = re.sub("[^a-z' ]",' ',ntext)
#     for letter in ntext:
#         if ord(letter) < 128 :
#             lets.append(letter.lower())
    return ntext.split()

In [240]:
spark.udf.register('sbin', lambda x: 1 if x > 3 else 0)
spark.udf.register('imbin', lambda x: 1 if x > 2 else 0)
spark.udf.register('mkascii', kill_non_ascii)
spark.udf.register('listjoin', lambda x: ' '.join(x))

In [228]:
yelp.registerTempTable('yelp')

r_bin =  spark.sql('''
            SELECT array(mkascii(text)) as content, int(imbin(useful + funny + cool)) as relevant, int(sbin(stars)) as good
            FROM yelp
        ''')

In [229]:
r_bin.first()

Row(content=[u'[saturday, night, late, i, was, getting, warm, when, i, checked, the, thermostat, to, see, if, the, central, ac, was, on, and, yes, it, was, but, it, was, blowing, warm, air, oh, no, so, now, my, air, conditioning, decided, the, day, it, was, degrees, to, stop, working, i, called, sunday, afternoon, and, spoke, with, mark, i, told, him, about, the, issue, with, my, ac, and, he, said, the, earliest, he, could, get, here, was, sometime, monday, i, was, fine, with, that, even, tough, it, was, degrees, in, the, house, my, wife, and, i, were, ok, but, a, bit, worried, about, our, dogs, and, how, they, would, take, the, heat, amy, marks, wife, called, this, morning, to, confirm, that, i, would, be, home, mark, came, around, asked, a, few, questions, and, went, right, to, work, after, diagnosing, the, problem, he, came, back, and, told, us, what, was, wrong, what, needed, to, be, done, and, the, cost, to, have, it, repaired, i, agreed, to, the, repair, about, hour, later, he, w

In [230]:
mincount =  r_bin.filter('relevant > 0').count()

In [231]:
dataset_neg = r_bin.filter('relevant = 0').orderBy(rand()).limit(mincount)
dataset_pos = r_bin.filter('relevant = 1').orderBy(rand()).limit(mincount)

df_relevance = dataset_pos.union(dataset_neg)

In [232]:
df_relevance = df_relevance.drop('good')

In [233]:
df_relevance.printSchema()

root
 |-- content: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- relevant: integer (nullable = true)



In [238]:
remover = StopWordsRemover(inputCol="content", outputCol="filtered")
df_rel_stopped = remover.transform(df_relevance)

In [241]:
df_rel_stopped.printSchema()

root
 |-- content: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- relevant: integer (nullable = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [295]:
ngram = NGram(n=2, inputCol="filtered", outputCol="ngrams")

ngramDataFrame = ngram.transform(df_rel_stopped)

In [298]:
ngramDataFrame.printSchema()

root
 |-- content: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- relevant: integer (nullable = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngrams: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [302]:
df_rel_stopped.registerTempTable('df_rel_stopped')

stop_strings = spark.sql('''
            SELECT listjoin(filtered) as filtered, content, relevant
            FROM df_rel_stopped
            ''')

### add ngrams later if needed
# ngramDataFrame.registerTempTable('ngrammed_stopped_rel')

# stop_strings = spark.sql('''
#             SELECT listjoin(filtered) as filtered, ngrams, content, relevant
#             FROM ngrammed_stopped_rel
#             ''')

In [303]:
tokenizer = Tokenizer(inputCol="filtered", outputCol="words")
wordsData = tokenizer.transform(stop_strings)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2500)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# rescaledData.select("label", "features").show()

In [308]:
rescaledData.printSchema()

rescaledData.select('features').first()

root
 |-- filtered: string (nullable = true)
 |-- content: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- relevant: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)



Row(features=SparseVector(2500, {61: 3.2888, 117: 1.9087, 132: 1.286, 224: 1.8984, 232: 2.193, 297: 0.4224, 405: 2.6723, 614: 1.1202, 666: 1.6664, 696: 0.8122, 813: 4.201, 855: 1.2046, 906: 1.1156, 1103: 0.9572, 1142: 4.3087, 1171: 3.3924, 1183: 3.662, 1184: 0.2636, 1207: 2.9454, 1220: 1.1831, 1309: 2.765, 1388: 2.9338, 1468: 0.5055, 1658: 4.9574, 1663: 4.6917, 1685: 0.3915, 1768: 1.6893, 1848: 0.8515, 1863: 2.3563, 1873: 2.0015, 1946: 0.6801, 2092: 1.977, 2096: 2.6723, 2139: 3.6502, 2179: 1.2547, 2200: 0.3792, 2299: 2.713, 2304: 2.258, 2432: 9.5186, 2459: 4.0675, 2473: 2.8406}))

In [78]:
def autostem(cell):
    return cell.asDict()['text']

In [82]:
spark.udf.register('pstem', autostem )

all_corpus = spark.sql('''
                    SELECT count(text)
                    FROM yelp
                    '''
                    )

all_corpus.first()

Row(count(text)=4153150)

# TESTING

In [310]:
!cd ..

In [375]:
from src.sparktools import SparkNLPClassifier

In [378]:
model = SparkNLPClassifier('../data/acc_dataset_local.json')

In [384]:
model.train_vectorize('useful + funny + cool')

Exception AttributeError: "'GBTClassifier' object has no attribute '_java_obj'" in <object repr() failed> ignored


NameError: global name 'label' is not defined

In [400]:
test = model.train_test_split()

In [401]:
test.printSchema()

root
 |-- filtered: string (nullable = true)
 |-- content: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- label: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)



In [402]:
model.train_boosted_forest()

TypeError: __init__() got an unexpected keyword argument 'probabilityCol'

In [388]:
prediction = model.predict(test)

In [389]:
prediction.printSchema()

root
 |-- filtered: string (nullable = true)
 |-- content: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- label: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- indexedFeatures: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [391]:
prediction.select('label', 'prediction').show(50)

+-----+----------+
|label|prediction|
+-----+----------+
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       0.0|
|    1|       0.0|
|    1|       1.0|
|    1|       0.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       0.0|
|    1|       0.0|
|    1|       0.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       0.0|
|    1|       0.0|
|    1|       0.0|
|    1|       0.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       0.0|
|    1|       0.0|
|    1|       1.0|
|    1|       0.0|
|    1|       1.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       0.0|
|    1|     

In [339]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
### TRAINING RANDOM FOREST

# Load and parse the data file, converting it to a DataFrame.
# data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# this is done

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
# featureIndexer =\
#     VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
# (trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only

In [352]:
### TRAIN NAIVE BAYES

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(model.train_popular)

# select example rows to display.
predictions = model.transform(mpos)
predictions.show()

+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|            filtered|             content|label|               words|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|['d, just, like, ...|[['d, just, like,...|    1|[['d,, just,, lik...|(25,[0,1,2,3,4,5,...|(25,[0,1,2,3,4,5,...|[-258.27869605749...|[0.49853738144182...|       1.0|
|[a, fan, for, sur...|[[a, fan, for, su...|    1|[[a,, fan,, for,,...|(25,[0,2,3,4,5,6,...|(25,[0,2,3,4,5,6,...|[-38.758179370807...|[0.49716334288939...|       1.0|
|[a, friend, of, a...|[[a, friend, of, ...|    1|[[a,, friend,, of...|(25,[0,1,2,3,4,5,...|(25,[0,1,2,3,4,5,...|[-94.157227336214...|[0.48563119313127...|       1.0|
|[a,

In [355]:
predictions.printSchema()

root
 |-- filtered: string (nullable = true)
 |-- content: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- label: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [394]:
prediction.select('prediction', 'label').show(30).sortby('probability')

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       0.0|    1|
|       0.0|    1|
|       1.0|    1|
|       0.0|    1|
|       0.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       0.0|    1|
|       0.0|    1|
|       0.0|    1|
|       0.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       0.0|    1|
|       0.0|    1|
|       0.0|    1|
|       0.0|    1|
|       0.0|    1|
|       1.0|    1|
|       1.0|    1|
+----------+-----+
only showing top 30 rows



AttributeError: 'NoneType' object has no attribute 'sortby'

In [398]:
prediction.registerTempTable('pred')
spark.udf.register('acc', lambda x,y: 1 if x==y else 0)

In [399]:
accs = spark.sql('''
        SELECT acc(prediction, label) as accur
        FROM pred
        ORDER BY probability DESC
''')

ERROR:root:An unexpected error occurred while tokenizing input
The following traceback may be corrupted or invalid
The error message is: ('EOF in multi-line string', (1, 0))



AnalysisException: u"cannot resolve '`probability`' given input columns: [accur]; line 4 pos 17;\n'Sort ['probability DESC NULLS LAST], true\n+- Project [acc(prediction#4006, label#3539) AS accur#4143]\n   +- SubqueryAlias pred\n      +- Project [filtered#3588, content#3538, label#3539, words#3593, rawFeatures#3599, features#3635, indexedFeatures#3997, UDF(features#3635) AS prediction#4006]\n         +- Project [filtered#3588, content#3538, label#3539, words#3593, rawFeatures#3599, features#3635, UDF(features#3635) AS indexedFeatures#3997]\n            +- Sample 0.8, 1.0, false, 202007017856859376\n               +- Sort [filtered#3588 ASC NULLS FIRST, content#3538 ASC NULLS FIRST, label#3539 ASC NULLS FIRST, words#3593 ASC NULLS FIRST, rawFeatures#3599 ASC NULLS FIRST, features#3635 ASC NULLS FIRST], false\n                  +- Project [filtered#3588, content#3538, label#3539, words#3593, rawFeatures#3599, UDF(rawFeatures#3599) AS features#3635]\n                     +- Project [filtered#3588, content#3538, label#3539, words#3593, UDF(words#3593) AS rawFeatures#3599]\n                        +- Project [filtered#3588, content#3538, label#3539, UDF(filtered#3588) AS words#3593]\n                           +- Project [listjoin(filtered#3582) AS filtered#3588, content#3538, label#3539]\n                              +- SubqueryAlias df_lab_stopped\n                                 +- Project [content#3538, label#3539, UDF(content#3538) AS filtered#3582]\n                                    +- Union\n                                       :- GlobalLimit 3858\n                                       :  +- LocalLimit 3858\n                                       :     +- Project [content#3538, label#3539]\n                                       :        +- Sort [_nondeterministic#3576 ASC NULLS FIRST], true\n                                       :           +- Project [content#3538, label#3539, rand(-3305983844601206200) AS _nondeterministic#3576]\n                                       :              +- Filter (label#3539 = 1)\n                                       :                 +- Project [array(words_only(text#3522)) AS content#3538, cast(cast(imbin(((useful#3524L + funny#3519L) + cool#3517L)) as decimal(20,0)) as int) AS label#3539]\n                                       :                    +- SubqueryAlias df\n                                       :                       +- Relation[business_id#3516,cool#3517L,date#3518,funny#3519L,review_id#3520,stars#3521L,text#3522,type#3523,useful#3524L,user_id#3525] json\n                                       +- GlobalLimit 3858\n                                          +- LocalLimit 3858\n                                             +- Project [content#3538, label#3539]\n                                                +- Sort [_nondeterministic#3572 ASC NULLS FIRST], true\n                                                   +- Project [content#3538, label#3539, rand(3810529930739689851) AS _nondeterministic#3572]\n                                                      +- Filter (label#3539 = 0)\n                                                         +- Project [array(words_only(text#3522)) AS content#3538, cast(cast(imbin(((useful#3524L + funny#3519L) + cool#3517L)) as decimal(20,0)) as int) AS label#3539]\n                                                            +- SubqueryAlias df\n                                                               +- Relation[business_id#3516,cool#3517L,date#3518,funny#3519L,review_id#3520,stars#3521L,text#3522,type#3523,useful#3524L,user_id#3525] json\n"