In [1]:
import pyspark
from pyspark import SparkConf

from pyspark.sql import (SparkSession, functions as F, types as T)
# Build (or reuse) the SparkSession used by every cell below.
#
# The builder used to call .master('yarn') and then
# .config('spark.master', 'local[*]'). Both calls write the same
# 'spark.master' option, so the later value replaced the earlier one and the
# session ran in local mode. The master is now set exactly once, keeping that
# effective behaviour and removing the contradiction.
# NOTE(review): the executor / YARN options below only matter when the master
# is a cluster manager. If a cluster run is intended, change the single
# .master(...) call to 'yarn'.
spark = (SparkSession.builder
         .master('local[*]')
         .appName("LBA")
         .config('spark.sql.cbo.enabled', True)
         .config('spark.sql.cbo.joinReorder.enabled', True)
         .config("spark.executor.instances",10)
         .config("spark.executor.cores",5)
         .config("spark.executor.memory", "8g")
         .config('spark.driver.memory', '4g')
         .config('spark.yarn.executor.memoryOverhead', '2g')
         .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.2.3")
         .config("spark.port.maxRetries", 100)
         .config('spark.default.parallelism', 4)
         .config('spark.kryoserializer.buffer.max', '512m')
         .getOrCreate()
         )
from pyspark.ml.feature import (Tokenizer, NGram, RegexTokenizer as smlRegexTokenizer,
                                StopWordsRemover as sml_StopWordsRemover,CountVectorizer, 
                                CountVectorizerModel, IDF,IDFModel, Word2Vec, StringIndexer, 
                                VectorIndexer, IndexToString, QuantileDiscretizer, VectorAssembler
                               )
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.ml.classification import (RandomForestClassifier,
                                       RandomForestClassificationModel)
from pyspark.ml.evaluation import (BinaryClassificationEvaluator,
                                  MulticlassClassificationEvaluator)
from pyspark.ml.tuning import (CrossValidator, CrossValidatorModel,
                               ParamGridBuilder)
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors
from pyspark.ml.stat import Correlation
from pyspark.ml import Pipeline, PipelineModel

from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

import os
import fnmatch
import pandas as pd
import numpy as np
from copy import deepcopy
pd.set_option('display.max_columns', 999)
import logging
from time import time
from datetime import datetime
from itertools import combinations as cb

In [3]:
def readData(filePath, header=True, inferSchema=True):
    """Load a CSV file into a Spark DataFrame and print a short summary.

    Parameters
    ----------
    filePath : str
        Path of the CSV file handed to ``spark.read.csv``.
    header : bool, optional
        Whether the first line of the file holds the column names.
        Defaults to True, which is the previously hard-coded behaviour.
        Pass False for a header-less file; otherwise its first data row is
        consumed as the column names.
    inferSchema : bool, optional
        Whether Spark should infer column types from the data.
        Defaults to True, which is the previously hard-coded behaviour.

    Returns
    -------
    pyspark.sql.DataFrame
        The loaded dataframe.
    """
    # str(bool(...)) yields 'True' / 'False', i.e. the exact option strings
    # the hard-coded version passed.
    reader = (spark.read
              .option('inferSchema', str(bool(inferSchema)))
              .option('header', str(bool(header))))
    df = reader.csv(filePath)
    # df.count() runs a Spark job over the data only to report the row count.
    print('Shape of dataframe is:', df.count(), len(df.columns))
    print('Columns of dataframe are:', df.columns)
    return df

# NOTE(review): the column names printed for this file look like data values
# (a person name, timestamps, amounts), which suggests the CSV has no header
# row and its first record was used as the header -- confirm against the file.
df = readData('word2vec_df1.csv')

('Shape of dataframe is:', 5416067, 51)
('Columns of dataframe are:', ['_c0', 'Louanna O. Heuhsen', '_c2', '2019-01-03 00:00:00', '2018-12-31 00:00:004', '591572499', 'L.HEUHSENFEES2', '2018-12-03 00:00:00', 'Paid', '2019-04-01 17:02:45.513', '30363724', '_c11', '_c12', '140000.0', '0.014', '2018-12-31 00:00:0015', '$35K/week (4 weeks)', '0.017', '1.0', '0.019', '_c20', '0.021', '0.022', '0.023', '0.024', '0.025', '14735498.0', '81621.0', '2019-01-03 17:51:01.277', '_c29', '(Unspecified)', 'A990', 'Summary and Phase Invoice Entry Template Fees', 'Fee33', 'Fee34', 'Fee35', 'Fee36', 'Strategy & Business Development', 'Altria', '_c39', '_c40', '_c41', '_c42', '_c43', '_c44', '_c45', '_c46', '_c47', '_c48', '_c49', 'left_only'])


In [4]:
# Row count of the dataframe as loaded, before any filtering.
df.count()

5416067

In [5]:
# Keep only the rows whose '$35K/week (4 weeks)' value is present; this is the
# text column that cleanDescription processes.
description_present = F.col('$35K/week (4 weeks)').isNotNull()
df = df.where(description_present)

In [6]:
# Row count after dropping the rows with a null '$35K/week (4 weeks)' value.
df.count()

5100260

In [7]:
def cleanDescription(df, inputCol='$35K/week (4 weeks)', stopWords=None):
    """Tokenise, normalise, stem and stop-word-filter one text column.

    The column is run through a Spark NLP pipeline (DocumentAssembler ->
    RegexTokenizer -> Normalizer -> Stemmer -> Finisher) followed by Spark
    ML's StopWordsRemover.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Dataframe containing the text column.
    inputCol : str, optional
        Name of the text column to clean. Defaults to the column name that
        used to be hard-coded here.
    stopWords : list of str, optional
        Words removed from the stemmed tokens. When omitted, the
        notebook-level ``stopwordList`` is used, which must already be
        defined in the session (a NameError is raised otherwise).

    Returns
    -------
    tuple
        ``(clean_description, df)``. ``clean_description`` holds the selected
        input column plus the ``stem_tokens`` and ``clean_reviews`` columns;
        ``df`` is the input dataframe, returned unchanged.
    """
    if stopWords is None:
        # Resolved at call time, exactly as the hard-coded reference was.
        stopWords = stopwordList

    # Only the text column goes through the pipeline.
    data = df[[inputCol]]

    documentAssembler = (DocumentAssembler()
                         .setInputCol(inputCol)
                         .setOutputCol("document"))

    # Raw string: same pattern value, without relying on '\w' being an
    # unrecognised (and therefore preserved) escape sequence.
    tokenizer = (RegexTokenizer()
                 .setInputCols(["document"])
                 .setOutputCol("token")
                 .setPattern(r"\w+"))

    normalizer = (Normalizer()
                  .setInputCols(["token"])
                  .setOutputCol("normalized")
                  .setPattern("[^A-Za-z]"))

    stemmer = (Stemmer()
               .setInputCols(["normalized"])
               .setOutputCol("stem"))

    # Turn the Spark NLP annotations into a plain array-of-strings column.
    finisher = (Finisher()
                .setInputCols(["stem"])
                .setOutputCols(["stem_tokens"])
                .setAnnotationSplitSymbol(', ')
                .setValueSplitSymbol('|')
                .setCleanAnnotations(True)
                .setIncludeKeys(False)
                .setOutputAsArray(True))

    # NOTE(review): stop words are matched against the *stemmed* tokens, so
    # the list presumably needs to hold stemmed forms -- confirm.
    remover = sml_StopWordsRemover(inputCol='stem_tokens',
                                   outputCol="clean_reviews",
                                   stopWords=stopWords)

    nlp_pipeline = Pipeline().setStages([
        documentAssembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        remover,
    ])

    nlp_pl_model = nlp_pipeline.fit(data)
    clean_description = nlp_pl_model.transform(data)
    return clean_description, df

# cleanDescription returns its input dataframe unchanged as the second item,
# so `df` is simply rebound to the same dataframe here.
clean_description, df = cleanDescription(df)

In [8]:
# Row count of the cleaned dataframe returned by cleanDescription.
clean_description.count()

5100260

In [9]:
# Columns present in the cleaned dataframe.
clean_description.columns

['$35K/week (4 weeks)', 'stem_tokens', 'clean_reviews']

In [10]:
# Configure the Word2Vec estimator on the stop-word-filtered token arrays,
# fit it on the cleaned dataframe and preview the learned word vectors.
word2Vec = Word2Vec(
    inputCol="clean_reviews",
    outputCol="W2V",
    minCount=1,
    numPartitions=1,
    stepSize=0.1,
    windowSize=10,
    vectorSize=400,
    maxSentenceLength=20
)
model = word2Vec.fit(clean_description)

model.getVectors().show()

+-------------+--------------------+
|         word|              vector|
+-------------+--------------------+
|        frane|[0.07755979895591...|
|ratingschedul|[0.04851161688566...|
|        shuvo|[-0.0091098425909...|
|       hilyer|[0.02221369184553...|
|      rahmani|[-0.0260612219572...|
|        epoca|[0.08312328159809...|
| positionstat|[-0.0698190852999...|
|chargingparti|[0.03715729713439...|
|        nejim|[0.00212544575333...|
|         stum|[-0.0501778051257...|
|         ihss|[-0.0148747991770...|
|        cerio|[0.11717541515827...|
|     zasypkin|[-0.0603234581649...|
|    attorneya|[-0.1003433242440...|
|     mattsson|[-0.1063998863101...|
|   hirschhorn|[0.05476534366607...|
|       dimsis|[0.01609681546688...|
|     clarissa|[0.26560556888580...|
|    bourgouin|[-0.0607305541634...|
|    dellacqua|[0.12180265039205...|
+-------------+--------------------+
only showing top 20 rows



In [21]:
# DataFrame of the fitted model's word vectors (the 'word' and 'vector'
# columns shown by the preview of model.getVectors()).
w2v_dict = model.getVectors()

In [22]:
# Store the word/vector table as parquet, replacing any previous export.
w2v_dict.write.mode('overwrite').parquet('w2v_dict.parquet')

In [25]:
# Persist `model`, the object returned by word2Vec.fit, with overwrite enabled
# on the writer.
# NOTE(review): the path name suggests otherwise, but what is saved here is
# the fitted model, not the unfitted Word2Vec estimator.
model.write().overwrite().save('filepath_to_just_word2vec_not_its_model')

In [40]:
# Persist the Word2Vec estimator (its configuration, not the fitted vectors).
# write().overwrite() replaces plain save(), which raises when the target path
# already exists and therefore broke this cell on every re-run.
word2Vec.write().overwrite().save('word2vecoriginal')

In [41]:
# Reload the Word2Vec estimator stored under 'word2vecoriginal'.
loaded = Word2Vec.load('word2vecoriginal')

In [42]:
# Check that the reloaded estimator reports the configured vector size.
loaded.getVectorSize()

400

In [32]:
# Persist the fitted Word2Vec model.
# write().overwrite() replaces plain save(), which raises when the target path
# already exists and therefore broke this cell on every re-run.
model.write().overwrite().save('modeloriginal')

In [37]:
# Word2VecModel is not explicitly imported in the notebook's first cell, so it
# is imported here, before its first use; this keeps the cell working when
# the notebook is executed top to bottom.
from pyspark.ml.feature import Word2VecModel

# Reload the fitted model stored under 'modeloriginal'.
loadedmodel = Word2VecModel.load('modeloriginal')

In [36]:
from pyspark.ml.feature import Word2VecModel

In [39]:
# Preview the reloaded model's vectors; the rows shown match the preview
# printed for the original fitted model.
loadedmodel.getVectors().show()

+-------------+--------------------+
|         word|              vector|
+-------------+--------------------+
|        frane|[0.07755979895591...|
|ratingschedul|[0.04851161688566...|
|        shuvo|[-0.0091098425909...|
|       hilyer|[0.02221369184553...|
|      rahmani|[-0.0260612219572...|
|        epoca|[0.08312328159809...|
| positionstat|[-0.0698190852999...|
|chargingparti|[0.03715729713439...|
|        nejim|[0.00212544575333...|
|         stum|[-0.0501778051257...|
|         ihss|[-0.0148747991770...|
|        cerio|[0.11717541515827...|
|     zasypkin|[-0.0603234581649...|
|    attorneya|[-0.1003433242440...|
|     mattsson|[-0.1063998863101...|
|   hirschhorn|[0.05476534366607...|
|       dimsis|[0.01609681546688...|
|     clarissa|[0.26560556888580...|
|    bourgouin|[-0.0607305541634...|
|    dellacqua|[0.12180265039205...|
+-------------+--------------------+
only showing top 20 rows

