In [1]:
from pyspark import SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, IntegerType, DateType, StringType, StructField, BooleanType, TimestampType, FloatType
from pyspark.sql import functions as F

import numpy as np

import tensorflow as tf
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import Conv1D, GlobalMaxPooling1D
from tensorflow.keras import backend as K

from sparknlp.annotator import * 
from sparknlp.common import * 
from sparknlp.base import * 

In [2]:
# Explicit schema for the sarcasm headline records (every field is nullable):
# the article link, the headline text, and the integer is_sarcastic label
sarcasm_schema = StructType([
    StructField("article_link", StringType(), True),
    StructField("headline", StringType(), True),
    StructField("is_sarcastic", IntegerType(), True),
])


In [3]:
# Read the labeled sarcasm headlines from the uploaded JSON file, parsed with sarcasm_schema
sarcasm_df = (
    spark.read
    .format("json")
    .schema(sarcasm_schema)
    .load("/FileStore/tables/Sarcasm_Headlines_Dataset.json")
)

In [4]:
# Preview the loaded DataFrame to sanity-check the parsed columns
sarcasm_df.show()

In [5]:
# Spark NLP annotator stages, chained by column name:
#   headline -> doc -> sentence -> token
docs = (
    DocumentAssembler()
    .setInputCol("headline")
    .setOutputCol("doc")
)
sentences = (
    SentenceDetector()
    .setInputCols(["doc"])
    .setOutputCol("sentence")
)
tokens = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

In [6]:
# Assemble the three annotators into a single pipeline, in execution order
sar_pipeline = Pipeline(
    stages=[
        docs,
        sentences,
        tokens,
    ]
)

In [7]:
# Fit the pipeline on the raw headlines, then annotate them with the fitted model
sar_model = sar_pipeline.fit(sarcasm_df)
processed = sar_model.transform(sarcasm_df)
# Mark the annotated frame for caching; count() is an action, so it runs a job over it
processed = processed.persist()
processed.count()

In [8]:
# Build the training and test sets with a seeded 75/25 random split,
# then print the row count of each (train first, test second)
train, test = processed.randomSplit([0.75, 0.25], seed=25)
print(train.count(), test.count(), sep="\n")

In [9]:
# Load the default pretrained word-embeddings model and apply it to both the train and test splits.
# The glove_100d model file may need to be downloaded locally on the first run.
glove = WordEmbeddingsModel.pretrained()
train_features, test_features = (glove.transform(split) for split in (train, test))

In [10]:
# Convert the train/test sets to pandas so they can be turned into numpy arrays for model training.
# Each split is collected ONCE with features and labels together: separate toPandas() calls are
# independent Spark jobs, so nothing ties the row order of one result to the other. A single
# collection keeps every embedding paired with its own label and halves the number of jobs.
train_pdf = train_features.select('embeddings', 'is_sarcastic').toPandas()
test_pdf = test_features.select('embeddings', 'is_sarcastic').toPandas()

# Single-column frames consumed by the following cells; .copy() so that adding a column
# to them later does not write into a slice of the combined frame
x_train_df = train_pdf[['embeddings']].copy()
y_train_df = train_pdf[['is_sarcastic']].copy()
x_test_df = test_pdf[['embeddings']].copy()
y_test_df = test_pdf[['is_sarcastic']].copy()

In [11]:
# Plain Python helper (applied through pandas, not a Spark UDF) that gathers the
# 'embeddings' entry of every item in a row into one numpy array
def get_features(row):
  """Return a numpy array built from the 'embeddings' value of each item in `row`.

  `row` is any iterable whose items support lookup by the key 'embeddings'.
  The values are stacked in iteration order.
  """
  return np.array([annotation['embeddings'] for annotation in row])

In [12]:
# Add a 'features' column to both frames by running get_features on each 'embeddings' entry
for frame in (x_train_df, x_test_df):
  frame['features'] = frame['embeddings'].apply(get_features)

In [13]:
# Pull the numpy arrays out of the pandas frames for use as model inputs:
# x_* hold the 'features' values, y_* hold the is_sarcastic labels
x_train, y_train = x_train_df['features'].values, y_train_df['is_sarcastic'].values
x_test, y_test = x_test_df['features'].values, y_test_df['is_sarcastic'].values

In [14]:
# Hyperparameters for the model

# length every sequence is padded/truncated to (150 token vectors per headline)
maxlen = 150
# number of samples per training batch
batch_size = 48
# number of filters in the convolutional layer
filters = 100
# width of the kernel used by the convolutional layer
kernel_size = 3
# number of units in the hidden dense layer
hidden_dims = 250
# number of training epochs
epochs = 10

In [15]:
# Pad/truncate the train and test features to the same length (maxlen) as float32 arrays for Keras.
# dtype must be given here: pad_sequences builds its output as int32 by default, which would
# truncate the fractional embedding values to integers before any later cast could preserve them.
x_train = sequence.pad_sequences(x_train, maxlen=maxlen, dtype='float32')
x_test = sequence.pad_sequences(x_test, maxlen=maxlen, dtype='float32')

In [16]:
# Create base model: an empty Keras Sequential container; the layers are added in the next cell
model = Sequential()

In [17]:
# Stack the network layers on the Sequential model, in order:
#   1. Conv1D with `filters` filters of width `kernel_size` (ReLU, stride 1, no padding)
#   2. global max pooling over the convolution output
#   3. hidden dense layer of `hidden_dims` units, followed by dropout (0.2) and ReLU
#   4. a single output unit squashed by a sigmoid
for layer in (
    Conv1D(filters=filters,
           kernel_size=kernel_size,
           strides=1,
           padding='valid',
           activation='relu'),
    GlobalMaxPooling1D(),
    Dense(hidden_dims),
    Dropout(0.2),
    Activation('relu'),
    Dense(1),
    Activation('sigmoid'),
):
  model.add(layer)


In [18]:
# Configure training: binary cross-entropy loss, Adam optimizer, tracking accuracy, MAE and MSE
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy', 'mae', 'mse'],
)

# Train on the training arrays; the test arrays are passed as the validation data
model.fit(
    x=x_train,
    y=y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(x_test, y_test),
)

In [19]:
%sh
# Recreate /tmp/sarcasm as an empty directory (remove any previous copy, then create it)
# NOTE(review): the copy cell further down reads file:/tmp/sarcasm.h5, not a path inside
# /tmp/sarcasm -- confirm whether this directory is still needed
rm -rf /tmp/sarcasm
mkdir /tmp/sarcasm

In [20]:

# Save the trained model as an HDF5 file on the local filesystem.
# The path has to match the source of the dbutils.fs.cp call that follows
# ("file:/tmp/sarcasm.h5"): model.save writes a local file, and that copy step is
# what places it under dbfs:/FileStore.
path = '/tmp/sarcasm.h5'
model.save(path)



In [21]:


# Copy the locally saved model file into DBFS FileStore, then list the copied file to confirm it
local_model_uri = "file:/tmp/sarcasm.h5"
dbfs_model_uri = "dbfs:/FileStore/sarcasm.h5"
dbutils.fs.cp(local_model_uri, dbfs_model_uri)
display(dbutils.fs.ls(dbfs_model_uri))

path,name,size
dbfs:/FileStore/sarcasm.h5,sarcasm.h5,703448


In [22]:
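# Download link for the exported model file:
# https://community.cloud.databricks.com/files/tmp/sarcasm.h5?o=6852060676645499
# NOTE(review): the copy step above writes dbfs:/FileStore/sarcasm.h5, while this URL has a
# /files/tmp/ path -- confirm it points at the same file.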