conda create -n pyspark python=3.6.8 pip wheel pandas matplotlib ipykernel

In [1]:
import os
import sys

os.environ["PYSPARK_SUBMIT_ARGS"]='pyspark-shell --master yarn ' # local[0]'
os.environ["PYSPARK_PYTHON"]='/usr/bin/python3'
#PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"]='/usr/bin/python3'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2.3.1.0.0-78
      /_/

Using Python version 3.6.8 (default, Dec 30 2018 01:22:34)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql.types import *

In [3]:
train_schema = StructType(
   fields = [
      StructField("uid", StringType(), True),
      StructField("gender_age", StringType(), True),
      StructField("visits", ArrayType(
          StructType([
               StructField("timestamp", LongType(), True),
               StructField("url", StringType(), True)
               ])
      ),True)
   ])

In [4]:
test_schema = StructType(
   fields = [
      StructField("uid", StringType(), True),
      StructField("visits", ArrayType(
          StructType([
               StructField("timestamp", LongType(), True),
               StructField("url", StringType(), True)
               ])
      ),True)
])

https://stackoverflow.com/questions/41399399/serialize-a-custom-transformer-using-python-to-be-used-within-a-pyspark-ml-pipel

https://www.slideshare.net/SparkSummit/building-custom-ml-pipelinestages-for-feature-selection-with-marc-kaminski

https://stackoverflow.com/questions/42140980/spark-ml-pipelines-unseen-label-exception-when-classifying-new-examples?rq=1

In [144]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class Url2DomainTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):

    def __init__(self, inputCol="visits.url", outputCol="urls"):
        super(Url2DomainTransformer, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
    
     
    def _transform(self, dataset):
        import re
        from urllib.parse import urlparse
        from urllib.request import urlretrieve, unquote
        from pyspark.sql import functions as F 
        
        def url2domain(url):
            url = re.sub('(http(s)*://)+', 'http://', url)
            parsed_url = urlparse(unquote(url.strip()))
            if parsed_url.scheme not in ['http','https']: return None
            netloc = re.search("(?:www\.)?(.*)", parsed_url.netloc).group(1)
            if netloc is not None: return str(netloc.encode('utf8')).strip()
            return None    
        
        url2domain_udf = F.udf(lambda xx: [ url2domain(x) for x in xx],
                   ArrayType(StringType()))
        dataset = dataset.withColumn(self.outputCol,url2domain_udf(self.inputCol))
        return dataset

    

In [158]:
class SelectFields(Transformer, DefaultParamsReadable, DefaultParamsWritable):

    #@keyword_only
    def __init__(self, selectFields=["uid","gender_age","urls"]):
        super(SelectFields, self).__init__()
        self.selectFields = selectFields
        
    def _transform(self, dataset):
        return dataset.select(self.selectFields)



In [7]:
label_strings = ['M:25-34',
 'F:25-34',
 'M:35-44',
 'F:35-44',
 'F:18-24',
 'F:45-54',
 'M:45-54',
 'M:18-24',
 'F:>=55',
 'M:>=55']

In [8]:
training = "lab04/lab04_train_merged_labels.json"
df_train = spark.read.json(training, train_schema)

In [9]:
df_train.select("visits.url").show()

+--------------------+
|                 url|
+--------------------+
|[http://zebra-zoy...|
|[http://sweetradi...|
|[http://ru.orifla...|
|[http://translate...|
|[https://mail.ram...|
|[https://cfire.ma...|
|[http://www.msn.c...|
|[http://www.gazpr...|
|[http://lifenews....|
|[https://www.goog...|
|[http://muz4in.ne...|
|[http://kosmetist...|
|[http://android.m...|
|[http://tsn.ua/po...|
|[http://www.jobin...|
|[http://www.abc-p...|
|[http://easygames...|
|[http://www.ratan...|
|[http://sam-zdrav...|
|[http://www.msn.c...|
+--------------------+
only showing top 20 rows



In [10]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform

class SetValueTransformer(
    Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable,
):
    value = Param(
        Params._dummy(),
        "value",
        "value to fill",
    )

    @keyword_only
    def __init__(self, outputCols=None, value=0.0):
        super(SetValueTransformer, self).__init__()
        self._setDefault(value=0.0)
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, outputCols=None, value=0.0):
        """
        setParams(self, outputCols=None, value=0.0)
        Sets params for this SetValueTransformer.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, value):
        """
        Sets the value of :py:attr:`value`.
        """
        return self._set(value=value)

    def getValue(self):
        """
        Gets the value of :py:attr:`value` or its default value.
        """
        return self.getOrDefault(self.value)

    def _transform(self, dataset):
        
        dataset = dataset.withColumn(self.getOutputCols()[0], lit(self.getValue()))
        return dataset

In [148]:
#indexer.fit(df_train.limit(10)).labels
df_train_cut = df_train.limit(10)
#labels = indexer.fit(df_train_cut).labels


In [131]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier

In [181]:
url2domain_transformer = Url2DomainTransformer(outputCol="urls",inputCol="visits.url")
select_transformer = SelectFields(selectFields=["uid","gender_age","urls"])
indexer = StringIndexer(inputCol="gender_age", outputCol="label", handleInvalid="keep")
pipeline_transform = Pipeline(stages =  [url2domain_transformer, select_transformer, indexer])

In [193]:


cv = CountVectorizer(inputCol="urls", outputCol="features")

#labels = indexer.fit(df_train_cut).labels
#df_train_cut=indexer.fit(df_train_cut).transform(df_train_cut)

#transformer = SetValueTransformer(outputCols=["a"])

lr = LogisticRegression(labelCol='label', probabilityCol='lr_probability', predictionCol='lr_prediction', rawPredictionCol='lr_rawPrediction')

#rf = RandomForestClassifier(labelCol='label', probabilityCol='rf_probability', predictionCol='rf_prediction', rawPredictionCol='rf_rawPrediction')

#lr_label_converter = IndexToString(inputCol="lr_prediction", outputCol="lr_gender_age", labels=label_strings)

#rf_label_converter = IndexToString(inputCol="rf_prediction", outputCol="rf_gender_age", labels=label_strings)
lr_label_converter = IndexToString(inputCol="lr_prediction", outputCol="lr_gender_age", labels=label_strings)

pipeline = Pipeline(stages =  [url2domain_transformer,select_transformer, indexer, cv, lr, lr_label_converter])



In [161]:
model_trasform = pipeline_transform.fit(df_train_cut)
model_trasform.write().overwrite().save("tst_custom_transformer_model")

In [162]:
from pyspark.ml import PipelineModel
model_transform_reloaded =  PipelineModel.load("tst_custom_transformer_model")

In [163]:
df_train_cut_transformed = model_transform_reloaded.transform(df_train_cut)

In [164]:
df_train_cut_transformed.show()

+--------------------+----------+--------------------+-----+
|                 uid|gender_age|                urls|label|
+--------------------+----------+--------------------+-----+
|d50192e5-c44e-4ae...|   F:18-24|[b'zebra-zoya.ru'...|  1.0|
|d502331d-621e-472...|   M:25-34|[b'sweetrading.ru...|  4.0|
|d50237ea-747e-48a...|   F:25-34|[b'ru.oriflame.co...|  0.0|
|d502f29f-d57a-46b...|   F:25-34|[b'translate-tatt...|  0.0|
|d503c3b2-a0c2-4f4...|    M:>=55|[b'mail.rambler.r...|  2.0|
|d5090ddf-5648-487...|   F:25-34|[b'cfire.mail.ru'...|  0.0|
|d50bcef8-16ff-4e8...|   F:25-34|[b'msn.com', b'ms...|  0.0|
|d50e23dc-0cbd-488...|   F:18-24|[b'gazprom.ru', b...|  1.0|
|d50fdabb-4208-441...|   F:45-54|[b'lifenews.ru', ...|  3.0|
|d511b480-23a6-482...|   F:18-24|[b'google.ru', b'...|  1.0|
+--------------------+----------+--------------------+-----+



In [165]:
model = pipeline.fit(df_train_cut_transformed)

In [194]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol='lr_rawPrediction', labelCol='label', metricName='areaUnderROC'
    )

In [195]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#regParam for L2 regularization, https://craftappmobile.com/l1-vs-l2-regularization/
#elasticNetParam https://en.wikipedia.org/wiki/Elastic_net_regularization, https://www.quora.com/What-is-elastic-net-regularization-in-machine-learning

grid = ParamGridBuilder() \
    .addGrid(LogisticRegression.regParam, [0.1, 0.01]) \
    .addGrid(LogisticRegression.elasticNetParam, [0.1, 0.01]) \
    .build()


In [196]:
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=lr_evaluator,
    numFolds=3,
    parallelism=2
)

In [130]:
df_train_cut.show()

+--------------------+----------+--------------------+
|                 uid|gender_age|              visits|
+--------------------+----------+--------------------+
|d50192e5-c44e-4ae...|   F:18-24|[[1419688144068, ...|
|d502331d-621e-472...|   M:25-34|[[1419717886224, ...|
|d50237ea-747e-48a...|   F:25-34|[[1418840296062, ...|
|d502f29f-d57a-46b...|   F:25-34|[[1418217864467, ...|
|d503c3b2-a0c2-4f4...|    M:>=55|[[1427272415001, ...|
|d5090ddf-5648-487...|   F:25-34|[[1419777541435, ...|
|d50bcef8-16ff-4e8...|   F:25-34|[[1426704753001, ...|
|d50e23dc-0cbd-488...|   F:18-24|[[1419613709992, ...|
|d50fdabb-4208-441...|   F:45-54|[[1427203859001, ...|
|d511b480-23a6-482...|   F:18-24|[[1427237735001, ...|
+--------------------+----------+--------------------+



In [197]:
cvModel = crossval.fit(df_train.limit(10))

In [180]:
print(indexer.explainParams())

handleInvalid: how to handle invalid data (unseen or NULL values) in features and label column of string type. Options are 'skip' (filter out rows with invalid data), error (throw an error), or 'keep' (put invalid data in a special additional bucket, at index numLabels). (default: error)
inputCol: input column name. (current: gender_age)
outputCol: output column name. (default: StringIndexer_41b393718a28418251d5__output, current: label)
stringOrderType: How to order labels of string column. The first label after ordering is assigned an index of 0. Supported options: frequencyDesc, frequencyAsc, alphabetDesc, alphabetAsc. (default: frequencyDesc)


In [198]:
cvModel.bestModel.write().overwrite().save("tst_cv_bestmodel_model")

In [199]:
bestmodel_reloaded =  PipelineModel.load("tst_cv_bestmodel_model")

In [200]:
bestmodel_reloaded.transform(df_train.limit(10)).show()

+--------------------+----------+--------------------+-----+--------------------+--------------------+--------------------+-------------+-------------+
|                 uid|gender_age|                urls|label|            features|    lr_rawPrediction|      lr_probability|lr_prediction|lr_gender_age|
+--------------------+----------+--------------------+-----+--------------------+--------------------+--------------------+-------------+-------------+
|dd387df7-f50e-4b7...|   F:25-34|[b'zdorovie43.gor...|  1.0|(128,[0,3,4,9,12,...|[3.08571958527996...|[3.47993295511760...|          5.0|      F:45-54|
|dd3aa566-00da-491...|   M:45-54|[b'mail.rambler.r...|  3.0|(128,[0,5,7,26,55...|[4.27123181104901...|[9.77632606185703...|          5.0|      F:45-54|
|dd3adce0-6207-479...|   F:25-34|[b'pokupkalux.ru'...|  1.0|(128,[1,2,6,11,13...|[-1.8633393060618...|[6.46458303077854...|          1.0|      F:25-34|
|dd3bebdf-69ea-4fd...|   M:35-44|[b'biznes-doman.c...|  4.0|    (128,[81],[1.0])|[5.3049

In [174]:
StringIndexer().explainParams()

"handleInvalid: how to handle invalid data (unseen or NULL values) in features and label column of string type. Options are 'skip' (filter out rows with invalid data), error (throw an error), or 'keep' (put invalid data in a special additional bucket, at index numLabels). (default: error)\ninputCol: input column name. (undefined)\noutputCol: output column name. (default: StringIndexer_478e9078f8887914014c__output)\nstringOrderType: How to order labels of string column. The first label after ordering is assigned an index of 0. Supported options: frequencyDesc, frequencyAsc, alphabetDesc, alphabetAsc. (default: frequencyDesc)"

In [115]:
df_train_cut = df_train.limit(10)

In [83]:
df_train_cut.show()

+--------------------+----------+--------------------+-----+
|                 uid|gender_age|              visits|label|
+--------------------+----------+--------------------+-----+
|dd387df7-f50e-4b7...|   F:25-34|[[1419606827095, ...|  1.0|
|dd3aa566-00da-491...|   M:45-54|[[1426958952000, ...|  3.0|
|dd3adce0-6207-479...|   F:25-34|[[1414428818001, ...|  1.0|
|dd3bebdf-69ea-4fd...|   M:35-44|[[1422207817000, ...|  4.0|
|dd41e32e-202a-424...|   M:25-34|[[1418813830629, ...|  0.0|
|dd45f6b7-675c-414...|   F:18-24|[[1419921965314, ...|  2.0|
|dd472fa9-d1ee-4a5...|   M:25-34|[[1418674371440, ...|  0.0|
|dd474635-14fd-483...|   F:25-34|[[1418405925940, ...|  1.0|
|dd489bce-c115-463...|   M:25-34|[[1427219718001, ...|  0.0|
|dd4b6f79-3d65-4f5...|   M:25-34|[[1427186182000, ...|  0.0|
+--------------------+----------+--------------------+-----+



In [88]:
select_transformer.selectFields

['uid', 'gender_age', 'urls']

In [89]:
df_train_cut

DataFrame[uid: string, gender_age: string, visits: array<struct<timestamp:bigint,url:string>>, label: double]

In [87]:
df_train_cut

DataFrame[uid: string, gender_age: string, visits: array<struct<timestamp:bigint,url:string>>, label: double]

In [112]:
cvModel.bestModel.write().overwrite().save('lab04s_model')

In [53]:
pipeline.explainParams()

'stages: a list of pipeline stages (current: [Url2DomainTransformer_447b82347803217117ef, SelectFields_47998934796a145d7cf0, CountVectorizer_461ea09a378ed3faa2e4, StringIndexer_4b4aad7b8abe1f399a9c, LogisticRegression_44ce84cc5481de82497c, IndexToString_41cfb7bd197702f8c352])'

In [52]:
pipeline.explainParam(712)

ValueError: Cannot resolve 712 as a param.

In [33]:
cvModel = crossval.fit(df_train)

In [34]:
cvModel.write.('lab04s_model')

AttributeError: 'Url2DomainTransformer' object has no attribute '_to_java'

Проверю модель на урлах, открытых в моём браузере

In [4]:
data = {
  "uid": "bd7a30e1-a25d-4cbf-a03f-61748cbe540e",
  "visits": [
    {
      "url": "https://mail.google.com/mail/u/0/#inbox",
      "timestamp": 1419775945781
    }
   ,  
   {
      "url": "https://lk-de.newprolab.com/",
      "timestamp": 1419775945781
    }
   ,  
   {
      "url": "https://yandex.ru/pogoda/moscow/maps/temperature?via=mmapwb&le_TemperatureBalloons=0&le_WindParticles=1&ll=25.976425_49.047348&z=4",
      "timestamp": 1419775945781
    }
  ,  
   {
      "url": "https://translate.yandex.ru/?lang=en-ru&text=derivation",
      "timestamp": 1419775945781
    }
 ,  
   {
      "url": "https://web.whatsapp.com/",
      "timestamp": 1419775945781
    }
,  
   {
      "url": "https://app.slack.com/client/TNG296ABE/CPPRL95HU/thread/CP73F91ST-1571040655.075700",
      "timestamp": 1419775945781
    }
,  
   {
      "url": "https://github.com/newprolab/content_dataengineer5/blob/master/labs/de_lab_04.md",
      "timestamp": 1419775945781
    }
  ]
}

In [5]:
rdd = sc.parallelize([data])

In [6]:
df_test = spark.read.json(rdd)

In [7]:
df_test.show()

+--------------------+--------------------+
|                 uid|              visits|
+--------------------+--------------------+
|bd7a30e1-a25d-4cb...|[[1419775945781, ...|
+--------------------+--------------------+



In [15]:
df_test = df_test.withColumn("urls",url2domain_udf(df_test["visits"].getField("url"))) 

In [16]:
df_test = df_test.select(["uid", "urls"])

In [17]:
model_reloaded.transform(df_test).show(1,False)

+------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------+
|uid                                 |urls                                                                                                                                    |features                              |rawPrediction                                                                                                                                 