In [35]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.feature import HashingTF, Tokenizer, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql.types import NumericType, StructField, StructType
from pyspark.sql.functions import lit


# Reuse the active Spark session if one exists; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [36]:
# Read the sample CSV, treating its first row as the column names, then
# preview the first rows. No schema is supplied or inferred here.
df = (
    spark.read
    .option("header", 'True')
    .csv("./base_ga_amostra.csv")
)
df.show()

+--------+------+-------------+-------------+-----------+----------+-----------+----------+-----------+
| hitType|  page|     clientId|eventCategory|eventAction|eventLabel|  utmSource| utmMedium|utmCampaign|
+--------+------+-------------+-------------+-----------+----------+-----------+----------+-----------+
|pageview|/teste|cookie('_ga')|         None|       None|      None|str(thread)|str(linha)| Harlem DP6|
|pageview|/teste|cookie('_ga')|         None|       None|      None|str(thread)|str(linha)| Harlem DP6|
|pageview|/teste|cookie('_ga')|         None|       None|      None|str(thread)|str(linha)| Harlem DP6|
|pageview|/teste|cookie('_ga')|         None|       None|      None|str(thread)|str(linha)| Harlem DP6|
|pageview|/teste|cookie('_ga')|         None|       None|      None|str(thread)|str(linha)| Harlem DP6|
|pageview|/teste|cookie('_ga')|         None|       None|      None|str(thread)|str(linha)| Harlem DP6|
|pageview|/teste|cookie('_ga')|         None|       None|      N

In [37]:
# One StringIndexer per column, mapping each string value to a numeric index
# written to "<column>_index". handleInvalid="keep" gives values not seen
# during fitting an extra index instead of raising an error.
#
# The indexers are deliberately left UNFITTED: Pipeline.fit() fits them on the
# training split only, so category information from the test split cannot
# leak into the model. Iterating df.columns directly (rather than through
# set()) keeps the stage order stable from run to run.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    for column in df.columns
]

# Assemble the feature vector. Only utmMedium_index is used for now; the other
# "<column>_index" outputs of the indexers above are produced but not fed to
# the classifier.
assembler = VectorAssembler(
    inputCols=['utmMedium_index'],
    outputCol='features_index',
)

# Decision tree that predicts the indexed hitType from the assembled features.
dt = DecisionTreeClassifier(featuresCol="features_index", labelCol="hitType_index")  # TODO: remove

In [38]:
# Build a fresh list instead of aliasing `indexers`. Appending to an alias
# mutates `indexers` itself, so re-running this cell would keep adding
# duplicate assembler/classifier stages to the pipeline.
stages = [
    *indexers,
    assembler,
    dt,  # TODO: remove
]
pipeline = Pipeline(stages=stages)

In [39]:
# 80/20 train/test split. The fixed seed keeps the split, and every result
# derived from it, reproducible across runs.
train, test = df.randomSplit([0.8, 0.2], seed=42)

# Fit all pipeline stages on the training split only.
model = pipeline.fit(train)

In [40]:
# Run the fitted pipeline over the held-out split.
pred = model.transform(dataset=test)

In [41]:
# Show the original hitType next to the classifier's probability vector and
# predicted class index.
(
    pred
    .select('hitType', 'probability', 'prediction')
    .show()
)

+--------+-----------+----------+
| hitType|probability|prediction|
+--------+-----------+----------+
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
+--------+-----------+----------+
only showing top 20 rows



In [42]:
# Persist the fitted pipeline, replacing anything previously saved at this path.
(
    model.write()
    .overwrite()
    .save("pipeline_model")
)

In [43]:
# Reload the persisted pipeline from disk.
pipelineModel = PipelineModel.read().load('./pipeline_model')

In [44]:
# Score the held-out split with the reloaded pipeline and show the original
# hitType next to the probability vector and predicted class index.
test_predicted = pipelineModel.transform(dataset=test)
(
    test_predicted
    .select('hitType', 'probability', 'prediction')
    .show()
)

+--------+-----------+----------+
| hitType|probability|prediction|
+--------+-----------+----------+
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
|pageview|  [1.0,0.0]|       0.0|
+--------+-----------+----------+
only showing top 20 rows

