In [1]:
import findspark
findspark.init('/home/kakade/spark')
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext
spark = SparkSession.builder.appName("kakade").getOrCreate()

In [21]:
from pyspark.sql.types import *

features_schema = [StructField("count_num", IntegerType(), True), StructField("freq_num", DoubleType(), True),\
                StructField("count_adj",IntegerType(), True), StructField("freq_adj",DoubleType(), True), \
                StructField("count_noun",IntegerType(), True), StructField("freq_noun",DoubleType(), True), \
                StructField("count_conj",IntegerType(), True), StructField("freq_conj", DoubleType(), True), \
                StructField("count_color",IntegerType(), True), StructField("freq_color",DoubleType(), True), \
                StructField("count_brand",IntegerType(), True), StructField("freq_brand",DoubleType(), True), \
                StructField("count_words",IntegerType(), True), StructField("count_chars",IntegerType(), True), \
                StructField("w2v_score",DoubleType(), True), StructField("tf_score",DoubleType(), True), \
                StructField("sp_score",DoubleType(), True)]

data_schema = [StructField("index", IntegerType(), True), StructField("title", StringType(), True),\
                StructField("new_title",StringType(), True), StructField("description",StringType(), True), \
                StructField("new_des",StringType(), True), StructField("category_1",StringType(), True), \
                StructField("concise",IntegerType(), True), StructField("label", IntegerType(), True)]

# country sku_id title category_1 category_2 category_3 short_description price product_type 
features_struc = StructType(fields=features_schema)
data_struc = StructType(fields=data_schema)



In [27]:
from pyspark.sql.functions import *
whole_data = spark.read.csv("/home/kakade/Support_files/With_catgory_index_Processed_title_des.csv", escape='"', schema=data_struc)
final_features = spark.read.csv("/home/kakade/Support_files/final_features.csv", schema=features_struc).withColumn("index", monotonically_increasing_id() + 1)
raw_data=spark.read.csv("../data/Clean_Data_Frame.csv", escape='"', header=True).select("category_1","category_2","category_3").withColumn("index", monotonically_increasing_id() + 1)


In [5]:
used_index=whole_data.filter("category_1 != '''Fashion'''").filter("category_1 != '''Watches Sunglasses Jewellery'''").select("category_1","index","label")

In [67]:
combine_data=raw_data.join(whole_data.select("index","new_title","new_des","label"),"index","inner")

In [68]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer,StopWordsRemover,CountVectorizer,VectorAssembler
tokenizer_tit = RegexTokenizer(inputCol="new_title", outputCol="title_words", pattern="\\W")
tokenizer_cat1 = RegexTokenizer(inputCol="category_1", outputCol="cat1_words", pattern="\\W")
tokenizer_cat2 = RegexTokenizer(inputCol="category_2", outputCol="cat2_words", pattern="\\W")
tokenizer_cat3 = RegexTokenizer(inputCol="category_3", outputCol="cat3_words", pattern="\\W")
tokenizer_des = RegexTokenizer(inputCol="new_des", outputCol="des_words", pattern="\\W")

combine_data = tokenizer_tit.transform(combine_data)
combine_data = tokenizer_cat1.transform(combine_data)
combine_data = tokenizer_cat2.transform(combine_data)
combine_data = tokenizer_cat3.transform(combine_data)
combine_data = tokenizer_des.transform(combine_data)
combine_data.select("title_words","cat1_words","cat2_words","cat3_words","des_words").show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|         title_words|          cat1_words|          cat2_words|          cat3_words|           des_words|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[adana, galleri, ...|           [fashion]|             [women]|      [muslim, wear]|[materi, non, she...|
|[cuba, heartbreak...|    [health, beauty]|        [bath, body]|  [hand, foot, care]|[formul, with, oi...|
|[andoer, 150cm, c...|[tv, audio, video...|             [audio]|[live, sound, stage]|[150cm, mini, mic...|
|[anmyna, complain...|    [health, beauty]|        [hair, care]|[shampoos, condit...|[anmyna, complain...|
|[argit, argiltubo...|    [health, beauty]|      [men, s, care]|[body, and, skin,...|[100, authent, rr...|
|[asus, tp300lj, d...|[computers, laptops]|           [laptops]|[traditional, lap...|[genuin, windows,...|
|[ng, 40c, ring, s...|           [cam

In [57]:
# from pyspark.sql.functions import udf,regexp_replace
# def same_word_func(col1,col2,col3,col4):
#     total_set=set(col2).union(set(col3)).union(set(col4))
#     return len(set(col1).intersection(total_set))

# same_word = udf(same_word_func, IntegerType())
# test = combine_data.withColumn("cat_same",same_word("title_words","cat1_words","cat2_words","cat3_words"))

def des_same_func(col1,col2):
    return len(set(col1).intersection(set(col2)))
des_same = udf(des_same_func, IntegerType())
test = combine_data.withColumn("des_same",des_same("title_words","des_words"))

In [71]:
# combine_data=combine_data.select("index","title_words")
# combine_data.show()
combine_data.repartition(1).write.mode("overwrite").json("test")

In [74]:
test.filter("des_same>0").filter("label==1").count()

18474

clarity==1 25833
clarity==0 505
19587

In [6]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["count_num", "freq_num",'count_adj','freq_adj','count_noun','freq_noun','count_conj','freq_conj',\
               'count_color','freq_color','count_brand','freq_brand','count_words','count_chars','w2v_score','tf_score','sp_score'],
    outputCol="features")
vector_data = assembler.transform(other_data).select("index","features","label")


In [25]:
assembler_1 = VectorAssembler(
    inputCols=["freq_num",'freq_adj','freq_noun', 'freq_conj','freq_color','freq_brand',\
               'count_words','count_chars','w2v_score','tf_score','sp_score'],
    outputCol="features")
vector_data = assembler_1.transform(other_data).select("index","features","label")

In [19]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors


pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
pca_model = pca.fit(vector_data)

result = pca_model.transform(vector_data).select("pcaFeatures","label")
result.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|pcaFeatures                                                                                                                                                                                                |label|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[-55.11052481527143,2.1133371440652633,-1.489093776250173,0.6365763920451244,0.20582165889843246,0.15099281169192152,-0.01846540042068144,-0.11997140090553583,0.11107477348145467,-0.7673217914368877]    |1    |
|[-43.50550147481198,-1.285913484380312,-0.599789202383693,0.5560292583918586,0.1233575467353137,0.16844326957013328,0.08210374077258595,-0.138520201331

In [11]:
vector_data=vector_data.cache()

In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

(trainingData, testData) = result.randomSplit([0.8, 0.2])

lr = LogisticRegression(maxIter=30,featuresCol="pcaFeatures")

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid_lr = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .build()


# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
lr_tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid_lr,
                           evaluator=BinaryClassificationEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
lr_model = lr_tvs.fit(trainingData)

lr_prediction = lr_model.transform(testData)
evaluator = BinaryClassificationEvaluator()
accuracy_lr = evaluator.evaluate(lr_prediction)
print("LR accuracy = %g " % accuracy_lr)

LR accuracy = 0.589834 


In [26]:
lr_prediction = lr_model.transform(trainingData)

In [28]:
lr_prediction.filter("label==0").filter("prediction==0").count()

0

5183/ 109

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only

LR accuracy = 0.455111 


In [27]:
from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(maxIter=10)

paramGrid_lsvc = ParamGridBuilder()\
    .addGrid(lsvc.regParam, [0.1,0.01]).build()

lsvc_tvs = TrainValidationSplit(estimator=lsvc,estimatorParamMaps=paramGrid_lsvc,evaluator=evaluator,trainRatio=0.8)    
# Run Crossvalidation, and choose the best set of parameters.
lsvc_model = lsvc_tvs.fit(trainingData)
lsvc_prediction = lsvc_model.transform(testData)
accuracy_lsvc = evaluator.evaluate(lsvc_prediction)
print("LR accuracy = %g " % accuracy_lsvc)

LR accuracy = 0.520147 


In [11]:
data = spark.read.format("libsvm").load("/home/kakade/spark/data/mllib/sample_libsvm_data.txt")

In [13]:
data.show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator


#Train_test_split
(trainingData, testData) = vector_data.randomSplit([0.8, 0.2])

#cross_validation_train setup
evaluator = BinaryClassificationEvaluator()
#LR model
lr = LogisticRegression(maxIter=10)

paramGrid_lr = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .build()

cv_lr = CrossValidator(estimator=lr,estimatorParamMaps=paramGrid_lr,evaluator=evaluator,numFolds=3)


#Decision Tree model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

paramGrid_dt = ParamGridBuilder()\
    .addGrid(dt.maxDepth, [2, 3, 5]).build()
    
cv_dt = CrossValidator(estimator=dt,estimatorParamMaps=paramGrid_dt,evaluator=evaluator,numFolds=3)    
# Run Crossvalidation, and choose the best set of parameters.
lr_model = cv_lr.fit(trainingData)
dt_model = cv_dt.fit(trainingData)

# Make predictions.

lr_prediction = lr_model.transform(testData)
dt_prediction = dt_model.transform(testData)

accuracy_lr = evaluator.evaluate(lr_prediction)
accuracy_dt = evaluator.evaluate(dt_prediction)
print("LR accuracy = %g " % accuracy_lr)
print("DT accuracy = %g " % accuracy_dt)