In [1]:
from pyspark.sql.functions import regexp_replace, col, udf
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
import plotly.graph_objects as go
import plotly
import datetime


In [2]:

def timestamp_transform(x):
  """Return the hour of day (0-23) for a Unix epoch timestamp, in UTC.

  Args:
    x: seconds since the Unix epoch (int or float), or None.

  Returns:
    The hour component as an int, or None when ``x`` is None.
  """
  # Spark hands None to a Python UDF for null cells; propagate the null
  # instead of letting fromtimestamp() raise a TypeError.
  if x is None:
    return None
  # Convert explicitly in UTC so the result does not depend on the local
  # timezone of whichever machine evaluates the function.
  return datetime.datetime.fromtimestamp(x, tz=datetime.timezone.utc).hour
#spark.udf.register("timestamp_transform", timestamp_transform)
# Expose timestamp_transform as a Spark UDF. No return type is passed, so
# udf() falls back to its default return type.
format_timestamp_udf = udf(timestamp_transform)

In [3]:
def cat(cat_list, df):
  """Index and one-hot encode the given categorical columns.

  For every column name ``c`` in ``cat_list`` a StringIndexer writes
  ``c_indexed`` and a OneHotEncoder (dropLast=True) writes
  ``c_indexed_encoded``. All indexers run before all encoders in a single
  Pipeline that is fitted on ``df``.

  Args:
    cat_list: names of the columns to encode.
    df: DataFrame containing those columns.

  Returns:
    A DataFrame holding only the ``*_indexed_encoded`` columns, in the
    order of ``cat_list``.
  """
  print("categorized varibales function begin")

  index_stages = []
  encode_stages = []
  encoded_names = []
  for name in cat_list:
    # String values -> numeric category index.
    indexer = StringIndexer(inputCol=name, outputCol="{0}_indexed".format(name))
    indexed_name = indexer.getOutputCol()
    # Category index -> one-hot vector.
    encoder = OneHotEncoder(dropLast=True,
                            inputCol=indexed_name,
                            outputCol="{0}_encoded".format(indexed_name))
    index_stages.append(indexer)
    encode_stages.append(encoder)
    encoded_names.append(encoder.getOutputCol())

  fitted = Pipeline(stages=index_stages + encode_stages).fit(df)
  return fitted.transform(df).select(encoded_names)

In [4]:
def text_processing(df):
  """Fit a text pipeline on the "title" column and return its Word2Vec output.

  Pipeline stages, in order:
    1. RegexTokenizer: "title" -> "words".
    2. StopWordsRemover: "words" -> "filtered".
    3. HashingTF: "filtered" -> "tf" (200 hashed features).
    4. Word2Vec: "filtered" -> "w2v".

  Args:
    df: DataFrame with a "title" column.

  Returns:
    A DataFrame with the single column "w2v". The "tf" column is produced
    by the pipeline but is not part of the result.
  """
  stages = [
    RegexTokenizer(inputCol="title", outputCol="words", pattern="\\W"),
    StopWordsRemover(inputCol="words", outputCol="filtered"),
    HashingTF(inputCol="filtered", outputCol="tf", numFeatures=200),
    Word2Vec(inputCol="filtered", outputCol="w2v"),
  ]
  fitted = Pipeline(stages=stages).fit(df)

  # Of the two text representations (tf and w2v), only w2v is kept.
  embedded = fitted.transform(df).select("w2v")
  print("text transform done")
  return embedded

In [5]:
# Permit joins that carry no join condition (Cartesian products) in this session.
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [6]:
def _categorical_stages(cat_cols):
  # Build StringIndexer + OneHotEncoder stages for each categorical column;
  # returns (stages, names of the encoded output columns).
  indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in cat_cols
  ]
  encoders = [
    OneHotEncoder(dropLast=True, inputCol=indexer.getOutputCol(),
                  outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers
  ]
  return indexers + encoders, [encoder.getOutputCol() for encoder in encoders]


def _text_stages():
  # Build tokenizer -> stop-word remover -> Word2Vec stages for the "title"
  # column; returns (stages, name of the embedding output column).
  tok = RegexTokenizer(inputCol="title", outputCol="words", pattern="\\W")
  remover = StopWordsRemover(inputCol="words", outputCol="filtered")
  w2v = Word2Vec(inputCol="filtered", outputCol="w2v")
  return [tok, remover, w2v], w2v.getOutputCol()


def features(df, cat_cols, int_cols):
  """Build a scaled feature vector and label for every row of ``df``.

  Steps:
    1. Cast "created_utc" to integer, derive "hour_of_day" from it with
       ``format_timestamp_udf`` and drop "created_utc".
    2. One-hot encode ``cat_cols`` plus "hour_of_day".
    3. Embed the "title" column with Word2Vec.
    4. Assemble the encoded columns, the embedding and ``int_cols`` into
       "features" and standardise it into "scaledFeatures".

  All stages run in a single Pipeline over the same DataFrame, so every
  output row corresponds to exactly one input row. Joining separately
  transformed DataFrames without a key would produce a Cartesian product
  instead of a column-wise concatenation.

  Args:
    df: input DataFrame; must contain "created_utc", "title", "score" and
      every column named in ``cat_cols`` and ``int_cols``.
    cat_cols: names of categorical columns to one-hot encode. Not modified.
    int_cols: names of columns passed to the VectorAssembler as they are.

  Returns:
    A DataFrame with the columns "scaledFeatures" and "score".
  """
  # Change created_utc to the hour of the day.
  with_hour = (
    df.withColumn("created_utc", col("created_utc").cast("integer"))
      .withColumn("hour_of_day", format_timestamp_udf(col("created_utc")))
      .drop("created_utc")
  )

  # Hour of day is treated as one more categorical variable. Work on a copy
  # so the caller's list is left untouched and repeated calls stay valid.
  all_cat_cols = list(cat_cols)
  if "hour_of_day" not in all_cat_cols:
    all_cat_cols.append("hour_of_day")

  cat_stages, encoded_cols = _categorical_stages(all_cat_cols)
  text_stages, text_col = _text_stages()

  # Same column order as before: encoded categoricals, text embedding, integers.
  input_cols = encoded_cols + [text_col] + list(int_cols)
  va = VectorAssembler(inputCols=input_cols, outputCol="features")
  # The scaler's output column name must match the column selected below.
  scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                          withStd=True, withMean=True)

  print("VA begins")
  pipeline = Pipeline(stages=cat_stages + text_stages + [va, scaler])
  model = pipeline.fit(with_hour)
  print("VA done")

  # The label travels with its own row, so no join is needed to attach it.
  output_df = model.transform(with_hour).select("scaledFeatures", "score")
  print("Scaling Done")

  return output_df