In [1]:
import os
import re
import shutil
import argparse
import numpy as np
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.sql import Window
from pyspark.ml.feature import  Tokenizer, CountVectorizer, StopWordsRemover
from pyspark.ml.feature import NGram, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import row_number, monotonically_increasing_id

In [2]:
def read_line(row):
    """Parse one tab-separated input line into ``(rurl, activity, label)``.

    The activity field is reduced to ASCII letters and underscores, the
    underscores are turned into spaces, and the result is trimmed with runs of
    spaces collapsed to one. The URL and label fields are passed through as-is.

    Returns ``None`` when ``row`` is ``None``. Raises ``IndexError`` when the
    line has fewer than three tab-separated fields.
    """
    if row is None:
        return None
    fields = row.split('\t')
    rurl, raw_activity, label = fields[0], fields[1], fields[2]
    # Everything except letters and '_' (digits, punctuation, even spaces) is
    # dropped first; '_' is therefore the only word separator that survives.
    letters_only = re.sub('[^A-Za-z_]', '', raw_activity)
    words = letters_only.replace('_', ' ').strip()
    return rurl, re.sub(' +', ' ', words), label

In [3]:
def read_data(sc, data_path):
    """Load the text file at ``data_path`` into a Spark DataFrame.

    Every line is parsed with ``read_line``; the resulting frame has three
    nullable string columns: ``rurl``, ``activity`` and ``label``.

    Returns ``None`` when either argument is ``None``.
    """
    if sc is None or data_path is None:
        return None
    column_names = ("rurl", "activity", "label")
    schema = StructType([StructField(name, StringType(), True) for name in column_names])
    parsed_lines = sc.textFile(data_path).map(read_line)
    return SQLContext(sc).createDataFrame(parsed_lines, schema)

In [4]:
def transform_data(documents):
    """Turn the ``activity`` text into bigram count vectors.

    Stages, applied in order: tokenize ``activity`` into ``tokens``; remove
    stop words into ``filtered_tokens``; build 2-grams into ``bi-grams``; fit a
    ``CountVectorizer`` on the bigrams and write its output to ``features``.
    Finally the ``label`` column is cast to double.

    Returns ``(documents, vocabulary)`` - the transformed frame and the fitted
    CountVectorizer vocabulary - or a bare ``None`` when ``documents`` is
    ``None``.
    """
    if documents is None:
        return None
    stateless_stages = (
        Tokenizer(inputCol="activity", outputCol="tokens"),
        StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens"),
        NGram(n=2, inputCol="filtered_tokens", outputCol="bi-grams"),
    )
    featurized = documents
    for stage in stateless_stages:
        featurized = stage.transform(featurized)
    # The vectorizer is the only stage that has to be fitted on the data.
    vectorizer_model = CountVectorizer(inputCol="bi-grams", outputCol="features").fit(featurized)
    featurized = vectorizer_model.transform(featurized)
    featurized = featurized.withColumn("label", featurized["label"].cast("double"))
    return featurized, vectorizer_model.vocabulary
    

In [5]:
def run_model(documents):
    """Fit a cross-validated logistic regression and return the best model.

    A 5-fold ``CrossValidator`` searches a grid over ``regParam`` and
    ``elasticNetParam`` and scores candidates with a default-configured
    ``BinaryClassificationEvaluator``. The estimator reads the ``features``
    and ``label`` columns and is capped at 200 iterations.

    Returns the best fitted model, or ``None`` when ``documents`` is ``None``.
    """
    if documents is None:
        return None
    estimator = LogisticRegression(labelCol="label", featuresCol="features", maxIter=200)
    reg_param_values = [0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1]
    elastic_net_values = [0.2, 0.4, 0.6, 0.8, 1.0]
    grid_builder = ParamGridBuilder()
    grid_builder.addGrid(estimator.regParam, reg_param_values)
    grid_builder.addGrid(estimator.elasticNetParam, elastic_net_values)
    validator = CrossValidator(
        estimator=estimator,
        estimatorParamMaps=grid_builder.build(),
        evaluator=BinaryClassificationEvaluator(),
        numFolds=5,
    )
    return validator.fit(documents).bestModel
    
    

In [6]:
def extract_metric(model, documents, output_path):
    """Score ``model`` on ``documents`` and persist the area under the ROC curve.

    The metric is written as a one-row, tab-delimited CSV with an ``auc``
    header under ``<output_path>/metric`` (coalesced to a single part).

    Args:
        model: fitted Spark ML model exposing ``transform``; its output must
            contain a ``rawPrediction`` column.
        documents: DataFrame holding the ``label`` column plus whatever
            feature column the model reads.
        output_path: directory prefix for the ``metric`` output.

    Returns:
        The area under ROC reported by the evaluator, or ``None`` when any
        argument is ``None``.
    """
    if model is None or documents is None or output_path is None:
        return None
    # Rank on the raw scores, like the evaluator used during cross-validation
    # in run_model. Feeding the hard 0/1 `prediction` column instead collapses
    # the ROC curve to a single threshold.
    model_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')
    predictions = model.transform(documents)
    metric = model_eval.evaluate(predictions)
    # Build the one-row frame directly: DataFrame.append was removed in pandas 2.0.
    metricdf = pd.DataFrame({'auc': [metric]})
    # Resolve the active context here instead of relying on a notebook-global `sc`.
    metricdf = SQLContext(SparkContext.getOrCreate()).createDataFrame(metricdf)
    metricdf.coalesce(1).write.format('csv').option('delimiter', '\t').option('header', 'true').save(output_path + '/' + 'metric')
    return metric


In [7]:
def write_model(sc, model, output_path):
    """Persist ``model`` under ``<output_path>/model``.

    ``sc`` takes no part in the save itself, but a ``None`` value for it - or
    for any other argument - turns the call into a no-op.

    Always returns ``None``.
    """
    if any(arg is None for arg in (model, sc, output_path)):
        return None
    model.save('/'.join((output_path, 'model')))
    return None

In [8]:
def write_model_weigths(model, output_path, vocabulary=None):
    """Write each vocabulary term next to its model coefficient as a TSV.

    Output goes to ``<output_path>/model_weights`` as a single tab-delimited
    CSV part with the header columns ``model_vocab`` and ``model_weights``.

    Args:
        model: fitted model exposing ``coefficients`` (a vector providing
            ``toArray``).
        output_path: directory prefix for the ``model_weights`` output.
        vocabulary: terms aligned by position with the coefficients. When
            omitted, the notebook-level ``vocab`` variable is used, which is
            what two-argument callers have always relied on.

    Returns:
        ``None``.

    Raises:
        NameError: if ``vocabulary`` is omitted and no global ``vocab`` exists.
    """
    from itertools import zip_longest

    if model is None or output_path is None:
        return None
    if vocabulary is None:
        # Backward-compatible fallback for callers that pass only two arguments.
        vocabulary = vocab
    weights = model.coefficients.toArray().tolist()
    # Both sequences already live on the driver, so pair them by position here
    # instead of indexing two DataFrames through a global window and joining.
    # zip_longest keeps the outer-join behaviour: a length mismatch yields nulls.
    rows = list(zip_longest(vocabulary, weights))
    schema = StructType([StructField("model_vocab", StringType(), True),
                         StructField("model_weights", FloatType(), True)])
    # Resolve the active context here instead of relying on a notebook-global `sc`.
    modeldf = SQLContext(SparkContext.getOrCreate()).createDataFrame(rows, schema)
    modeldf.coalesce(1).write.format('csv').option('delimiter', '\t').option('header', 'true').save(output_path + '/' + 'model_weights')
    return None
    

In [None]:
# End-to-end run: load -> featurize -> split -> train -> evaluate -> persist.
INPUT_PATH = 'test'
OUTPUT_PATH = 'output'
SPLIT_SEED = 42  # fixed so the train/test partition is the same on every run

# getOrCreate keeps this cell re-runnable: constructing a second SparkContext
# while one is already active raises an error.
# Do not rename `sc` or `vocab`: helper functions may resolve them as
# notebook-level globals.
sc = SparkContext.getOrCreate(SparkConf().setAppName("category_classifier_on_spark"))
documents = read_data(sc, INPUT_PATH)
documents, vocab = transform_data(documents)
trainDF, testDF = documents.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
model = run_model(trainDF)
metric = extract_metric(model, testDF, OUTPUT_PATH)
write_model(sc, model, OUTPUT_PATH)
write_model_weigths(model, OUTPUT_PATH)

In [9]:
# def run_logistic_regression(sc, input_path, output_path):
#     if sc is None or input_path is None or output_path is None:
#         return None
#     documents = read_data(sc, input_path)
#     documents, vocab = transform_data(documents)
#     trainDF, testDF = documents.randomSplit([0.8,0.2])
#     model = run_model(trainDF)
#     metric = extract_metric(model, testDF, output_path)
#     write_model(sc, model, output_path)
#     write_model_weigths(model, output_path)
#     return None


In [None]:
# def main(args):
#     if args is None:
#         return None
#     sc = SparkContext(conf = SparkConf().setAppName('LR_on_spark').set('spark.driver.maxResultSize', '4g'))
#     run_logistic_regression(sc, args.input_path, args.output_path)

In [None]:
# if __name__ == "__main__":
#     parser = argparse.ArgumentParser()
#     parser.add_argument('--input_path', help='input_path', default='input_path')
#     parser.add_argument('--output_path', help='output_path', default='output_path')
#     args = parser.parse_args()
#     main(args) 