## Set environment variables

In [None]:
import os
# Tell PySpark which Python interpreter to use (here: an interpreter inside
# a project virtualenv). NOTE(review): the path is machine-specific.
os.environ.update(
    PYSPARK_PYTHON="/Users/snehalnair/Workspace/blog/image_processing/imageenv/bin/python"
)

## Initialize the Spark session

In [1]:
from pyspark.sql import SparkSession
# Reuse the already-running Spark session if there is one; otherwise start
# a new session registered under the application name 'vla'.
spark = (
    SparkSession.builder
    .appName('vla')
    .getOrCreate()
)

## Import required libraries

In [2]:
from operator import itemgetter
from pyspark.sql import SparkSession
from typing import Iterable

import pandas as pd
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import (
    CountVectorizer,
    StringIndexer,
    VectorAssembler,
    Binarizer,
    HashingTF,
    StandardScaler
)
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT
from pyspark.sql import functions as fn
from pyspark.sql.types import ArrayType, DoubleType, FloatType, IntegerType, StringType, BinaryType, StructType, StructField
from pyspark.sql.window import Window
import ast

In [None]:
from elephas.ml_model import ElephasEstimator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import *

## Set up the pipeline

In [8]:
# Ordered list of pipeline stages; each cell section below appends to it.
stages = []


class TrimByMinVisits(Transformer):
    """Drop visitors that have fewer than a minimum number of visits.

    The number of rows per ``fullVisitorId`` is counted with a window
    function, and only rows whose visitor reaches the threshold are kept.

    Args:
        minv: Minimum number of rows a visitor must have to be kept. Values
            of 1 or less disable the filter and the input is returned as is.

    Returns:
        From ``_transform``: the filtered DataFrame. When the filter is
        active the result carries an extra ``rank`` column holding the
        per-visitor row count (kept for backward compatibility).
    """

    def __init__(self, minv: int):
        self.minvisits = minv
        super(TrimByMinVisits, self).__init__()

    def _transform(self, df: fn.DataFrame) -> fn.DataFrame:
        # Every visitor present in the data has at least one row, so a
        # threshold of 1 (or less) can never remove anything.
        if self.minvisits <= 1:
            return df

        per_visitor = Window.partitionBy('fullVisitorId')
        visit_count = fn.count('fullVisitorId').over(per_visitor)

        # Inclusive comparison: a visitor with exactly `minvisits` rows meets
        # the minimum. The previous strict `>` dropped such visitors, which
        # contradicted both the documented contract and the `<= 1` guard.
        return df.select("*", visit_count.alias("rank")).where(
            "rank >= {}".format(self.minvisits)
        )


stages += [TrimByMinVisits(MIN_VISITS)]

class TimeBetweenVisits(Transformer):
    """Add a ``timeBetweenVisits`` column with the gap to the previous visit.

    Rows are partitioned by ``fullVisitorId`` and ordered by
    ``visitStartTime``. For each row the previous row's ``visitStartTime``
    is subtracted from the current one; when that difference is null
    (e.g. there is no previous row) the gap is set to 0.

    Returns:
        From ``_transform``: the input DataFrame plus ``timeBetweenVisits``.
    """

    def _transform(self, df: fn.DataFrame) -> fn.DataFrame:
        start_col = 'visitStartTime'
        visitor_timeline = Window.partitionBy('fullVisitorId').orderBy(start_col)

        # Temporarily materialise the previous visit's start time.
        previous_start = fn.lag(fn.col(start_col)).over(visitor_timeline)
        with_previous = df.withColumn("prev_value", previous_start)

        gap = fn.col(start_col) - fn.col('prev_value')
        gap_or_zero = fn.when(fn.isnull(gap), 0).otherwise(gap)

        # The helper column is only needed to compute the gap.
        return with_previous.withColumn(
            "timeBetweenVisits", gap_or_zero
        ).drop('prev_value')


stages += [TimeBetweenVisits()]

class AssertDataType(Transformer):
    """Project the DataFrame onto a fixed set of columns with explicit types.

    The input is registered as the temporary view ``df`` (replacing any
    existing view of that name) and queried through the module-level
    ``spark`` session. Only the columns listed in the query survive; each
    is cast to the type the later pipeline stages work with.

    Returns:
        From ``_transform``: a DataFrame containing exactly the cast columns.
    """

    def _transform(self, df: fn.DataFrame) -> fn.DataFrame:
        df.createOrReplaceTempView("df")
        # pagePathLevel2 is now cast from its own source column; it used to
        # be built from pagePathLevel3, silently duplicating level 3.
        return spark.sql("""select 
          cast(fullVisitorId as string) as fullVisitorId, 
          cast(visitStartTime as int) as visitStartTime,
          cast(channelGrouping as string) as channelGrouping,
          cast(medium as string) as medium,
          cast(source as string) as source,
          cast(city as string) as city,
          cast(deviceCategory as string) as deviceCategory,
          cast(timeOnSite as float) as timeOnSite,
          cast(timeBetweenVisits as float) as timeBetweenVisits,
          cast(pagePathLevel1 as array<string>) as pagePathLevel1,
          cast(pagePathLevel2 as array<string>) as pagePathLevel2,
          cast(pagePathLevel3 as array<string>) as pagePathLevel3,
          cast(contentGroup4 as array<string>) as contentGroup4,
          cast(visitsIn0 as double) as visitsIn0,
          cast(transactionsIn0 as double) as transactionsIn0
          from df""")


stages += [AssertDataType()]


class FillNa(Transformer):
    """Replace null values with 0.

    Delegates to Spark's null-filling API with the numeric fill value 0.

    Returns:
        From ``_transform``: the DataFrame with nulls filled.
    """

    def _transform(self, df: fn.DataFrame) -> fn.DataFrame:
        # `df.na.fill` and `df.fillna` are aliases of each other.
        return df.na.fill(0)


stages += [FillNa()]


# Numeric columns: wrap each column in its own vector column
# ("<col>_vect"), the input format StandardScaler works on.
scalervect_assembler = [
    VectorAssembler(inputCols=[num_col], outputCol=f"{num_col}_vect")
    for num_col in VAR_TYPES['X_NUM_COLS']
]
stages.extend(scalervect_assembler)


# Numeric columns: scale each "<col>_vect" to unit standard deviation
# without centering, producing "<col>_scaled".
scaler = [
    StandardScaler(
        inputCol=f"{num_col}_vect",
        outputCol=f"{num_col}_scaled",
        withStd=True,
        withMean=False,
    )
    for num_col in VAR_TYPES['X_NUM_COLS']
]
stages.extend(scaler)


# String columns: map each label to a numeric index ("<col>_index").
# Labels not seen during fitting are kept rather than raising an error.
indexers = [
    StringIndexer(
        inputCol=str_col,
        outputCol=f"{str_col}_index",
        handleInvalid="keep",
    )
    for str_col in VAR_TYPES['STRING_COLS']
]
stages.extend(indexers)


# Array columns: hash the terms of each array into a term-frequency vector
# of VOCABSIZE_ARRAYCOLS dimensions ("<col>_vector").
hashingtf = [
    HashingTF(
        inputCol=arr_col,
        outputCol=f"{arr_col}_vector",
        numFeatures=VOCABSIZE_ARRAYCOLS,
    )
    for arr_col in VAR_TYPES['ARRAY_COLS']
]
stages.extend(hashingtf)


# Target columns: binarize against a threshold of 0, producing
# "<col>_binary" (values above 0 become 1.0, the rest 0.0).
binarizer = [
    Binarizer(
        inputCol=target_col,
        outputCol=f"{target_col}_binary",
        threshold=0,
    )
    for target_col in VAR_TYPES['Y_COLS']
]
stages.extend(binarizer)

class GroupBySortByStructPad(Transformer):
    """Collapse each visitor's rows into one row of per-column arrays.

    Steps performed by ``_transform``:
      1. Add 1 to a fixed list of index/binary columns.
         NOTE(review): presumably this frees the value 0 for padding in a
         later stage - confirm.
      2. Rank rows per ``fullVisitorId`` by ``visitStartTime`` (dense rank)
         and keep those with rank <= ``padlen``.
      3. Group by ``fullVisitorId``, collect the selected columns as structs
         and sort the resulting array.
         NOTE(review): the sort follows the struct's field order, so the
         first entry of ``finalcols_list`` is presumably the visit start
         time - confirm.
      4. Select every struct field out of the sorted array, yielding one
         array column per entry of ``finalcols_list``.

    Args:
        finalcols_list: Names of the columns to carry into the output.
        padlen: Highest per-visitor rank that is kept.
    """

    def __init__(self, finalcols_list: Iterable[str], padlen: int):
        self.finalcols_list = finalcols_list
        self.fcol_select = [
            f'sortedByStartTime.{name}' for name in self.finalcols_list
        ]
        self.padlength = padlen
        super(GroupBySortByStructPad, self).__init__()

    def _transform(self, df: fn.DataFrame) -> fn.DataFrame:
        shifted = df
        for name in (
            'channelGrouping_index',
            'medium_index',
            'deviceCategory_index',
            'source_index',
            'city_index',
            'visitsIn0_binary',
            'transactionsIn0_binary',
        ):
            shifted = shifted.withColumn(name, fn.col(name) + 1)

        visitor_timeline = Window.partitionBy('fullVisitorId').orderBy('visitStartTime')
        ranked = shifted.withColumn("rank", fn.dense_rank().over(visitor_timeline))
        kept = ranked.where('rank <= {}'.format(self.padlength))

        visit_structs = fn.collect_list(fn.struct(self.finalcols_list))
        grouped = kept.groupBy('fullVisitorId').agg(
            fn.array_sort(visit_structs).alias("sortedByStartTime")
        )
        return grouped.select(self.fcol_select)


stages += [GroupBySortByStructPad(VAR_TYPES['FINAL_COLS'], PAD_LEN)]


class PadVector(Transformer):
    """Right-pad array columns with zero elements up to a fixed length.

    Args:
        pad_cols: Names of the array columns to pad.
        pad_len: Target length. Arrays that are already this long or longer
            are returned unchanged.
    """

    def zero_value(self, dataType):
        """Return the padding element for an array element type.

        Returns 0.0 for doubles and an empty sparse vector of size
        ``VOCABSIZE_ARRAYCOLS`` for ML vectors. Any other type yields
        ``None``, i.e. the array is padded with nulls.
        """
        if dataType == DoubleType():
            return 0.0
        elif dataType == VectorUDT():
            return Vectors.sparse(VOCABSIZE_ARRAYCOLS, [])
        return None

    def __init__(self, pad_cols: Iterable[str], pad_len: int):
        super(PadVector, self).__init__()
        self.pad_cols = pad_cols
        self.pad_len = pad_len

    def _transform(self, df: fn.DataFrame) -> fn.DataFrame:
        field_datatype = {
            f.name: f.dataType for f in df.schema.fields if f.name in self.pad_cols
        }
        target_len = self.pad_len
        padded = df
        for name in self.pad_cols:
            array_type = field_datatype[name]
            # Resolve the filler once on the driver and bind it (and the
            # target length) as defaults, so the UDF does not close over
            # `self` or the loop variable.
            filler = self.zero_value(array_type.elementType)

            def pad(ar, _filler=filler, _target=target_len):
                # A non-positive multiplier yields [], leaving `ar` as is.
                return ar + [_filler] * (_target - len(ar))

            padded = padded.withColumn(name, fn.udf(pad, array_type)(name))
        return padded


# Pad to the same length the grouping stage truncates to; a separate
# hard-coded length would produce ragged arrays whenever PAD_LEN exceeds it.
stages += [PadVector(pad_cols=VAR_TYPES['FINAL_COLS'], pad_len=PAD_LEN)]

class MergeArrayVector(Transformer):
    """Merge the three page-path vector columns into one summed vector.

    ``_transform`` concatenates ``pagePathLevel1_vector``,
    ``pagePathLevel2_vector`` and ``pagePathLevel3_vector`` into the helper
    column ``pagePath_vector_arr`` and then sums all vectors of that array
    element-wise into the dense vector column ``pagePath_vector``.
    NOTE(review): assumes each input column is an array of equally sized
    ML vectors - confirm against the upstream stages.

    Returns:
        From ``_transform``: the input DataFrame plus ``pagePath_vector_arr``
        and ``pagePath_vector``. ``pagePath_vector`` is null when the
        concatenated array is null or empty.
    """

    def _transform(self, df: fn.DataFrame) -> fn.DataFrame:
        def merge_array_vector(arr):
            # The UDF is declared to return a vector, so an empty or null
            # array must map to null; returning the list itself (as before)
            # cannot be serialised as a vector.
            if not arr:
                return None
            total = arr[0].toArray()
            for vec in arr[1:]:
                # Rebind rather than `+=` so the array backing the first
                # vector is never modified in place.
                total = total + vec.toArray()
            return Vectors.dense(total)

        merge_array_vector_udf = fn.udf(merge_array_vector, VectorUDT())
        return df.withColumn(
            "pagePath_vector_arr",
            fn.concat(
                fn.col("pagePathLevel1_vector"),
                fn.col("pagePathLevel2_vector"),
                fn.col("pagePathLevel3_vector"),
            )
        ).withColumn(
            "pagePath_vector",
            merge_array_vector_udf(fn.col("pagePath_vector_arr"))
        )


stages += [MergeArrayVector()]

class MergeIndexFeatures(Transformer):
    """Combine the categorical index columns into one ``index_features`` vector.

    ``source_index``, ``medium_index``, ``channelGrouping_index`` and
    ``deviceCategory_index`` are concatenated in that order and the result
    is converted to a dense ML vector.
    NOTE(review): assumes each of the four columns is an array of numbers
    at this point in the pipeline - confirm.

    Returns:
        From ``_transform``: the input DataFrame plus ``index_features``.
    """

    def _transform(self, df: fn.DataFrame) -> fn.DataFrame:
        to_dense_vector = fn.udf(Vectors.dense, VectorUDT())
        concatenated = fn.concat(
            fn.col("source_index"),
            fn.col("medium_index"),
            fn.col("channelGrouping_index"),
            fn.col("deviceCategory_index"),
        )
        return df.withColumn("index_features", to_dense_vector(concatenated))


stages += [MergeIndexFeatures()]