# Machine Learning Pipeline - Sarcasm Detection Pipeline V1 (Base)

## Import Required Library

In [1]:
import tensorflow as tf
import tensorflow_transform as tft

import json

import pandas as pd
import zipfile as zf
import os

from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, Tuner
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import LatestBlessedModelStrategy 
from tfx.dsl.components.common.resolver import Resolver 


from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.types import Channel 

from tfx.components import Tuner
from tfx.components import Evaluator

from tfx.proto import trainer_pb2
from tfx.proto import example_gen_pb2

## Data Configuration

### Download Dataset

In [2]:
!kaggle datasets download -d rmisra/news-headlines-dataset-for-sarcasm-detection

!mkdir raw

Dataset URL: https://www.kaggle.com/datasets/rmisra/news-headlines-dataset-for-sarcasm-detection
License(s): Attribution 4.0 International (CC BY 4.0)
Downloading news-headlines-dataset-for-sarcasm-detection.zip to c:\Users\Rahfi\MLOps-Pipeline-Projects\SarcasmDetectionV1-Pipeline




  0%|          | 0.00/3.30M [00:00<?, ?B/s]
 30%|███       | 1.00M/3.30M [00:00<00:01, 1.23MB/s]
 61%|██████    | 2.00M/3.30M [00:01<00:00, 1.92MB/s]
 91%|█████████ | 3.00M/3.30M [00:01<00:00, 2.04MB/s]
100%|██████████| 3.30M/3.30M [00:01<00:00, 1.99MB/s]
A subdirectory or file raw already exists.


In [3]:
!move news-headlines-dataset-for-sarcasm-detection.zip raw/

        1 file(s) moved.


### Extract File

In [4]:
files = "raw/news-headlines-dataset-for-sarcasm-detection.zip"
zip = zf.ZipFile(files, 'r')
zip.extractall('raw/')
zip.close()

### Data Converting

In [5]:
import json
import pandas as pd

!mkdir data
# Path to the JSON files
file_paths = [r"raw\Sarcasm_Headlines_Dataset_v2.json", r"raw\Sarcasm_Headlines_Dataset_v2.json"]
# Initialize a list to store the data
data_list = []
# Read and process each line as a separate JSON object
for file_path in file_paths:
    with open(file_path, 'r') as file:
        for line in file:
            data_list.append(json.loads(line))

# Normalize JSON data
df = pd.json_normalize(data_list)

A subdirectory or file data already exists.


### Export Data

In [6]:
df.to_csv("raw/data.csv", index=False)

### Data Loading

In [7]:
dataset = pd.read_csv("raw/data.csv")

In [8]:
dataset.head()

Unnamed: 0,is_sarcastic,headline,article_link
0,1,thirtysomething scientists unveil doomsday clo...,https://www.theonion.com/thirtysomething-scien...
1,0,dem rep. totally nails why congress is falling...,https://www.huffingtonpost.com/entry/donna-edw...
2,0,eat your veggies: 9 deliciously different recipes,https://www.huffingtonpost.com/entry/eat-your-...
3,1,inclement weather prevents liar from getting t...,https://local.theonion.com/inclement-weather-p...
4,1,mother comes pretty close to using word 'strea...,https://www.theonion.com/mother-comes-pretty-c...


### Data Assesing

In [9]:
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 57238 entries, 0 to 57237
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   is_sarcastic  57238 non-null  int64 
 1   headline      57238 non-null  object
 2   article_link  57238 non-null  object
dtypes: int64(1), object(2)
memory usage: 1.3+ MB


In [10]:
def data_assesing(data):
    print(f"Total NaN/Null Data per Column:\n{data.isna().sum().sort_values(ascending=False)}\n")
    print(f"Data Shape:\n{data.shape}")
    print(f"\nTotal Duplicated Data: {data.duplicated().sum()}")

data_assesing(dataset)

Total NaN/Null Data per Column:
is_sarcastic    0
headline        0
article_link    0
dtype: int64

Data Shape:
(57238, 3)

Total Duplicated Data: 28621


### Data Cleaning

In [11]:
dataset = pd.DataFrame(dataset.drop(columns=["article_link"]))

In [12]:
data_assesing(dataset)

Total NaN/Null Data per Column:
is_sarcastic    0
headline        0
dtype: int64

Data Shape:
(57238, 2)

Total Duplicated Data: 28735


In [13]:
dataset.head(len(dataset))

Unnamed: 0,is_sarcastic,headline
0,1,thirtysomething scientists unveil doomsday clo...
1,0,dem rep. totally nails why congress is falling...
2,0,eat your veggies: 9 deliciously different recipes
3,1,inclement weather prevents liar from getting t...
4,1,mother comes pretty close to using word 'strea...
...,...,...
57233,1,jews to celebrate rosh hashasha or something
57234,1,internal affairs investigator disappointed con...
57235,0,the most beautiful acceptance speech this week...
57236,1,mars probe destroyed by orbiting spielberg-gat...


### Export Data Part 2

In [15]:
!mkdir data
dataset.to_csv("data/data.csv", index=False)

A subdirectory or file data already exists.


## Set Variables

In [None]:
PIPELINE_NAME = "sarcasm-pipeline"
SCHEMA_PIPELINE_NAME = "sarcasm-tfdv-schema"

PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)

METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

# from absl import logging
# logging.set_verbosity(logging.INFO)

In [None]:
DATA_ROOT = "data"

In [None]:
interactive_context = InteractiveContext(pipeline_root=PIPELINE_ROOT)

## Data Ingestion

In [None]:
output = example_gen_pb2.Output(
    split_config = example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name="train", hash_buckets=8),
        example_gen_pb2.SplitConfig.Split(name="eval", hash_buckets=2)
    ])
)
example_gen = CsvExampleGen(input_base=DATA_ROOT, output_config=output)

In [None]:
interactive_context.run(example_gen)

## Data Validation

### Create Statistic Summary

In [None]:
statistics_gen = StatisticsGen(
    examples=example_gen.outputs["examples"]
)
interactive_context.run(statistics_gen)

In [None]:
interactive_context.show(statistics_gen.outputs["statistics"])

### Create Data Schema

In [None]:
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs["statistics"]
)
interactive_context.run(schema_gen)

In [None]:
interactive_context.show(schema_gen.outputs["schema"])

### Checking Anomalies in Dataset

In [None]:
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)
interactive_context.run(example_validator)

In [None]:
interactive_context.show(example_validator.outputs['anomalies'])

## Data Preprocessing

In [None]:
TRANSFORM_MODULE_FILE = "transform.py"

In [None]:
%%writefile {TRANSFORM_MODULE_FILE}

import tensorflow as tf

LABEL_KEY = "is_sarcastic"
FEATURE_KEY = "headline"

def transformed_name(key):
    """Renaming transformed features"""
    return key + "_xf"
def preprocessing_fn(inputs):
    """
    Preprocess input features into transformed features
    
    Args:
        inputs: map from feature keys to raw features.
    
    Return:
        outputs: map from feature keys to transformed features.    
    """
    
    outputs = {}
    
    outputs[transformed_name(FEATURE_KEY)] = tf.strings.lower(inputs[FEATURE_KEY])
    
    outputs[transformed_name(LABEL_KEY)] = tf.cast(inputs[LABEL_KEY], tf.int64)
    
    return outputs

In [None]:
transform  = Transform(
    examples=example_gen.outputs['examples'],
    schema= schema_gen.outputs['schema'],
    module_file=os.path.abspath(TRANSFORM_MODULE_FILE)
)
interactive_context.run(transform)

## Model Development

In [None]:
TRAINER_MODULE_FILE = "trainer.py"

In [None]:
%%writefile {TRAINER_MODULE_FILE}
import tensorflow as tf
import tensorflow_transform as tft 
from tensorflow.keras import layers
import os  
import tensorflow_hub as hub
from tfx.components.trainer.fn_args_utils import FnArgs
 
LABEL_KEY = "is_sarcastic"
FEATURE_KEY = "headline"
 
def transformed_name(key):
    """Renaming transformed features"""
    return key + "_xf"
 
def gzip_reader_fn(filenames):
    """Loads compressed data"""
    return tf.data.TFRecordDataset(filenames, compression_type='GZIP')
 
 
def input_fn(file_pattern, 
             tf_transform_output,
             num_epochs,
             batch_size=64)->tf.data.Dataset:
    """Get post_tranform feature & create batches of data"""
    
    # Get post_transform feature spec
    transform_feature_spec = (
        tf_transform_output.transformed_feature_spec().copy())
    
    # create batches of data
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transform_feature_spec,
        reader=gzip_reader_fn,
        num_epochs=num_epochs,
        label_key = transformed_name(LABEL_KEY))
    return dataset
 
# os.environ['TFHUB_CACHE_DIR'] = '/hub_chace'
# embed = hub.KerasLayer("https://tfhub.dev/google/universal-sentence-encoder/4")
 
# Vocabulary size and number of words in a sequence.
VOCAB_SIZE = 10000
SEQUENCE_LENGTH = 100
 
vectorize_layer = layers.TextVectorization(
    standardize="lower_and_strip_punctuation",
    max_tokens=VOCAB_SIZE,
    output_mode='int',
    output_sequence_length=SEQUENCE_LENGTH)
 
 
embedding_dim=16
def model_builder():
    """Build machine learning model"""
    inputs = tf.keras.Input(shape=(1,), name=transformed_name(FEATURE_KEY), dtype=tf.string)
    reshaped_narrative = tf.reshape(inputs, [-1])
    x = vectorize_layer(reshaped_narrative)
    x = layers.Embedding(VOCAB_SIZE, embedding_dim, name="embedding")(x)
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dense(32, activation="relu")(x)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    
    
    model = tf.keras.Model(inputs=inputs, outputs = outputs)
    
    model.compile(
        loss = 'binary_crossentropy',
        optimizer=tf.keras.optimizers.Adam(0.01),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    
    )
    
    # print(model)
    model.summary()
    return model 
 
 
def _get_serve_tf_examples_fn(model, tf_transform_output):
    
    model.tft_layer = tf_transform_output.transform_features_layer()
    
    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        
        feature_spec = tf_transform_output.raw_feature_spec()
        
        feature_spec.pop(LABEL_KEY)
        
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        
        transformed_features = model.tft_layer(parsed_features)
        
        # get predictions using the transformed features
        return model(transformed_features)
        
    return serve_tf_examples_fn
    
def run_fn(fn_args: FnArgs) -> None:
    
    log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
    
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir = log_dir, update_freq='batch'
    )
    
    es = tf.keras.callbacks.EarlyStopping(monitor='val_binary_accuracy', mode='max', verbose=1, patience=10)
    mc = tf.keras.callbacks.ModelCheckpoint(fn_args.serving_model_dir, monitor='val_binary_accuracy', mode='max', verbose=1, save_best_only=True)
    
    
    # Load the transform output
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
    
    # Create batches of data
    train_set = input_fn(fn_args.train_files, tf_transform_output, 10)
    val_set = input_fn(fn_args.eval_files, tf_transform_output, 10)
    vectorize_layer.adapt(
        [j[0].numpy()[0] for j in [
            i[0][transformed_name(FEATURE_KEY)]
                for i in list(train_set)]])
    
    # Build the model
    model = model_builder()
    
    
    # Train the model
    model.fit(x = train_set,
            validation_data = val_set,
            callbacks = [tensorboard_callback, es, mc],
            steps_per_epoch = 1000, 
            validation_steps= 1000,
            epochs=10)
    signatures = {
        'serving_default':
        _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(
                                    tf.TensorSpec(
                                    shape=[None],
                                    dtype=tf.string,
                                    name='examples'))
    }
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)