In [1]:
##########################################################################
# Program Name : train_model_pyspark.ipynb
# Purpose : train a model
# Kaggle Dataset Source : obertvici/indonesia-top-ecommerce-unicorn-tweets
# Location of Dataset Loaded : Linux HDFS
# Data Processsing Tools: pyspark
###########################################################################

import subprocess
import os
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lower, regexp_replace, count
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import lit
import tensorflow as tf
import numpy as np
from keras.layers import Embedding
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pickle

def run_command(command):
    """Utility function to run shell commands"""
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    if process.returncode == 0:
        print(f"Success: {command}\nOutput:\n{stdout.decode()}")
    else:
        print(f"Error: {command}\nError Message:\n{stderr.decode()}")

# Restart DFS
print("Stopping DFS...")
run_command("stop-dfs.sh")

print("Starting DFS...")
run_command("start-dfs.sh")

# Check running Java services
print("Checking Java processes...")
run_command("jps")

# Check Safe Mode status
print("Checking Safe Mode status...")
run_command("hdfs dfsadmin -safemode get")

# Leave Safe Mode if it's ON
print("Leaving Safe Mode if necessary...")
run_command("hdfs dfsadmin -safemode leave")

# Create HDFS directories
hdfs_dirs = [
    "kaggle/datasets",
    "kaggle/splits"
]

for hdfs_dir in hdfs_dirs:
    print(f"Creating HDFS directory: {hdfs_dir}")
    run_command(f"hdfs dfs -mkdir -p {hdfs_dir}")

# Download dataset from Kaggle
kaggle_dataset_path = "../kaggle/datasets"
dataset_name = "indonesia-top-ecommerce-unicorn-tweets"
print("Downloading dataset from Kaggle...")
run_command(f"kaggle datasets download -d robertvici/{dataset_name} -p {kaggle_dataset_path}")

# Unzip the downloaded dataset
zip_file_path = f"{kaggle_dataset_path}/{dataset_name}.zip"
print("Unzipping dataset...")
run_command(f"unzip -o {zip_file_path} -d {kaggle_dataset_path}")

# Upload files to HDFS
print("Uploading Datasets JSON files to HDFS...")
run_command(f"hdfs dfs -put {kaggle_dataset_path}/*.json kaggle/datasets")

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("E-Commerce Engagement Prediction ML") \
    .getOrCreate()

# Load datasets from HDFS
blibli_df = spark.read.json('hdfs://localhost:9000/user/dataeng/kaggle/datasets/bliblidotcom.json')
bukalapak_df = spark.read.json('hdfs://localhost:9000/user/dataeng/kaggle/datasets/bukalapak.json')
lazadaID_df = spark.read.json('hdfs://localhost:9000/user/dataeng/kaggle/datasets/lazadaID.json')
shopeeID_df = spark.read.json('hdfs://localhost:9000/user/dataeng/kaggle/datasets/ShopeeID.json')
tokopedia_df = spark.read.json('hdfs://localhost:9000/user/dataeng/kaggle/datasets/tokopedia.json')

# Add a new column to identify the company source
blibli_df = blibli_df.withColumn('source', lit('blibli'))
bukalapak_df = bukalapak_df.withColumn('source', lit('bukalapak'))
lazadaID_df = lazadaID_df.withColumn('source', lit('lazadaID'))
shopeeID_df = shopeeID_df.withColumn('source', lit('shopeeID'))
tokopedia_df = tokopedia_df.withColumn('source', lit('tokopedia'))

# Merge datasets using union (axis=0 equivalent in Spark)
merged_df = blibli_df.union(bukalapak_df).union(lazadaID_df).union(shopeeID_df).union(tokopedia_df)

# Clean tweet text
def clean_text(text):
    return text.lower().replace("#", "").strip()

clean_text_udf = udf(clean_text, StringType())

# Apply text cleaning and create new features
data_cleaned = merged_df.withColumn("clean_tweet", clean_text_udf(col("tweet"))) \
                       .withColumn("engagement", col("replies_count") + col("retweets_count") + col("likes_count"))

# Select relevant features
selected_data = data_cleaned.select(
    col("clean_tweet").alias("text"),
    col("replies_count").alias("replies"),
    col("retweets_count").alias("retweets"),
    col("likes_count").alias("likes"),
    col("engagement").alias("target"),
    col("hashtags"),    
    col("source")
)

# Split dataset
print("Splitting dataset into train, validate, and test")
train_data, validate_data, test_data = selected_data.randomSplit([0.7, 0.15, 0.15], seed=42)

# Save splits on HDFS for later use
splits_dataset_path = "../kaggle/splits"
train_data.write.json(f"{splits_dataset_path}/train.json", mode="overwrite")
validate_data.write.json(f"{splits_dataset_path}/validate.json", mode="overwrite")
test_data.write.json(f"{splits_dataset_path}/test.json", mode="overwrite")

# Remove datasets on local file system
print("Removing local dataset files...")
run_command(f"rm -r ../kaggle")

# Change null value with 0 (if any)
merged_df = merged_df.fillna({"likes_count": 0, "replies_count": 0, "retweets_count": 0})

from pyspark.sql import functions as F

# Check negative value
merged_df.filter((F.col("likes_count") < 0) | (F.col("replies_count") < 0) | (F.col("retweets_count") < 0)).show()

# Change negative value with 0 (if any)
for col in ["likes_count", "replies_count", "retweets_count"]:
    merged_df = merged_df.withColumn(col, F.when(F.col(col) < 0, 0).otherwise(F.col(col)))
    
# Matching target engagement definitions in Spark DataFrame
blibli_df = blibli_df.withColumn("engagement", F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))
bukalapak_df = bukalapak_df.withColumn("engagement", F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))
lazadaID_df = lazadaID_df.withColumn("engagement",   F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))
shopeeID_df = shopeeID_df.withColumn("engagement",   F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))
tokopedia_df = tokopedia_df.withColumn("engagement", F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))

# Load train data (convert Spark DataFrame to Pandas)
train_df = train_data.toPandas()

# Tokenize and vectorize text (fit on original text, not the padded sequences)
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(train_df["text"])  # Fit tokenizer on the raw text data

# Convert texts to sequences
X_train = tokenizer.texts_to_sequences(train_df["text"])

# Pad the sequences to ensure uniform length
X_train = tf.keras.preprocessing.sequence.pad_sequences(X_train, padding='post')

y_train = np.array(train_df["target"])


# Define a simple Neural Network model
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=5000, output_dim=64),
    tf.keras.layers.GlobalAveragePooling1D(),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dense(1, activation="linear")
])

# Compile the model
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

vocab_size = 42500  # Customize with your tokenizer
embedding_dim = 128
embedding_layer = Embedding(input_dim=vocab_size, output_dim=embedding_dim)

# Example: Tokenize and pad the input text
max_vocab_size = 5000
tokenizer = Tokenizer(num_words=max_vocab_size, oov_token="<UNK>")
tokenizer.fit_on_texts(train_df["text"])  # 'train_df["text"]' should be a list of strings

# Example of saving the tokenizer
with open('../models/tokenizer.pkl', 'wb') as file:
    pickle.dump(tokenizer, file)

X_train_sequences = tokenizer.texts_to_sequences(train_df["text"])
X_train = pad_sequences(X_train_sequences, padding='post')

# Ensure y_train is in the correct format (e.g., a numpy array)
y_train = np.array(train_df["target"])  # Adjust this based on your target column

# Train the model 
model.fit(X_train, y_train, epochs=10, batch_size=32)

# Save the model with a valid file extension in local server
model.save("../models/e-commerce-engagement-model.keras")  # For the native Keras format

# Export to save model
model.export("../saved_model/1")
print("Final Model ==> saved_model/1")

# Stop the Spark session
spark.stop()

print("")
print("All tasks completed successfully!")

2025-01-19 16:53:59.915741: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-01-19 16:53:59.920287: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-01-19 16:53:59.933848: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-01-19 16:53:59.977163: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1737280440.032150   70770 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1737280440.04

Stopping DFS...
Success: stop-dfs.sh
Output:
Stopping namenodes on [0.0.0.0]
Stopping datanodes
Stopping secondary namenodes [dataeng-virtual-machine]

Starting DFS...
Success: start-dfs.sh
Output:
Starting namenodes on [0.0.0.0]
Starting datanodes
Starting secondary namenodes [dataeng-virtual-machine]

Checking Java processes...
Success: jps
Output:
71409 NameNode
71541 DataNode
28055 SparkSubmit
71978 Jps
40620 SparkSubmit
71756 SecondaryNameNode
43775 SparkSubmit

Checking Safe Mode status...
Success: hdfs dfsadmin -safemode get
Output:
Safe mode is ON

Leaving Safe Mode if necessary...
Success: hdfs dfsadmin -safemode leave
Output:
Safe mode is OFF

Creating HDFS directory: kaggle/datasets
Success: hdfs dfs -mkdir -p kaggle/datasets
Output:

Creating HDFS directory: kaggle/splits
Success: hdfs dfs -mkdir -p kaggle/splits
Output:

Downloading dataset from Kaggle...
Success: kaggle datasets download -d robertvici/indonesia-top-ecommerce-unicorn-tweets -p ../kaggle/datasets
Output:
Da

25/01/19 16:55:26 WARN Utils: Your hostname, dataeng-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.241.136 instead (on interface ens33)
25/01/19 16:55:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/19 16:55:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/19 16:55:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/01/19 16:55:29 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/01/19 16:55:29 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/01/19 16:55:43 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.yo

Splitting dataset into train, validate, and test


25/01/19 16:56:18 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+-------------+--------+-------+------------+----------+--------------+------+----+--------+----------+---------+---------+-----+----+-------+-------+----------+--------+-----+
|cashtags|conversation_id|created_at|date|geo|hashtags| id|likes_count|link|mentions|name|near|photos|place|quote_url|replies_count|reply_to|retweet|retweet_date|retweet_id|retweets_count|source|time|timezone|trans_dest|trans_src|translate|tweet|urls|user_id|user_rt|user_rt_id|username|video|
+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+-------------+--------+-------+------------+----------+--------------+------+----+--------+----------+---------+---------+-----+----+-------+-------+----------+--------+-----+
+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+

2025-01-19 16:57:24.118847: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:152] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected


Epoch 1/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m173s[0m 14ms/step - loss: 860127.8125 - mae: 29.6059
Epoch 2/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m171s[0m 14ms/step - loss: 818085.5000 - mae: 32.1735
Epoch 3/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m174s[0m 15ms/step - loss: 720687.6250 - mae: 27.6550
Epoch 4/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m170s[0m 14ms/step - loss: 688658.5625 - mae: 26.6733
Epoch 5/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m171s[0m 14ms/step - loss: 553800.1875 - mae: 25.7240
Epoch 6/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m171s[0m 14ms/step - loss: 456790.7500 - mae: 23.3270
Epoch 7/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m171s[0m 14ms/step - loss: 561675.6250 - mae: 24.1938
Epoch 8/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m171s[0m 14ms/step - loss: 653

INFO:tensorflow:Assets written to: ../saved_model/1/assets


Saved artifact at '../saved_model/1'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 60), dtype=tf.float32, name='keras_tensor')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  130643582587280: TensorSpec(shape=(), dtype=tf.resource, name=None)
  130643582587472: TensorSpec(shape=(), dtype=tf.resource, name=None)
  130643582588624: TensorSpec(shape=(), dtype=tf.resource, name=None)
  130643582586128: TensorSpec(shape=(), dtype=tf.resource, name=None)
  130643582589968: TensorSpec(shape=(), dtype=tf.resource, name=None)
Final Model ==> saved_model/1

All tasks completed successfully!
