In [1]:
# Stop yarn and dfs
!stop-yarn.sh
!stop-dfs.sh

Stopping nodemanagers
Stopping resourcemanager
Stopping namenodes on [0.0.0.0]
Stopping datanodes
Stopping secondary namenodes [worker01]


In [2]:
# Restart dfs and yarn
!start-dfs.sh
!start-yarn.sh

Starting namenodes on [0.0.0.0]
Starting datanodes
Starting secondary namenodes [worker01]
Starting resourcemanager
Starting nodemanagers


In [3]:
# Check java services running
!jps

43666 NameNode
44006 SecondaryNameNode
44343 NodeManager
43817 DataNode
44687 Jps


In [4]:
# Check Safe mode: should be OFF
!hdfs dfsadmin -safemode get


Safe mode is ON


In [5]:
# If Safe mode is ON --> execute hdfs dfsadmin -safemode leave
!hdfs dfsadmin -safemode leave


Safe mode is OFF


In [6]:
# Check hdfs home files
!hdfs dfs -ls

Found 7 items
drwxr-xr-x   - hduser supergroup          0 2024-12-19 08:31 e-commerce
drwxr-xr-x   - hduser supergroup          0 2024-10-10 21:11 output
-rw-r--r--   3 hduser supergroup      12331 2024-10-26 10:57 taxi_zone_lookup.csv
drwxr-xr-x   - hduser supergroup          0 2024-12-19 08:44 test.json
drwxr-xr-x   - hduser supergroup          0 2024-12-19 08:44 train.json
drwxr-xr-x   - hduser supergroup          0 2024-12-19 08:44 validate.json
drwxr-xr-x   - hduser supergroup          0 2024-11-05 09:20 warehouse


In [7]:
# Create hdfs directories
!hdfs dfs -mkdir -p \
    e-commerce/datasets \
    e-commerce/splits \
    e-commerce/models \
    e-commerce/outputs

In [8]:
!hdfs dfs -ls e-commerce/

Found 4 items
drwxr-xr-x   - hduser supergroup          0 2024-12-19 08:44 e-commerce/datasets
drwxr-xr-x   - hduser supergroup          0 2024-12-19 08:27 e-commerce/models
drwxr-xr-x   - hduser supergroup          0 2024-12-19 08:29 e-commerce/outputs
drwxr-xr-x   - hduser supergroup          0 2024-12-19 08:27 e-commerce/splits


In [9]:
# Download kaggle dataset into hdfs

!kaggle datasets download -d robertvici/indonesia-top-ecommerce-unicorn-tweets -p ~/kaggle-datasets
!unzip -o ~/kaggle-datasets/indonesia-top-ecommerce-unicorn-tweets.zip -d ~/kaggle-datasets
!hdfs dfs -put ~/kaggle-datasets/*.json e-commerce/datasets/


Dataset URL: https://www.kaggle.com/datasets/robertvici/indonesia-top-ecommerce-unicorn-tweets
License(s): copyright-authors
indonesia-top-ecommerce-unicorn-tweets.zip: Skipping, found more recently modified local copy (use --force to force download)
Archive:  /home/hduser/kaggle-datasets/indonesia-top-ecommerce-unicorn-tweets.zip
  inflating: /home/hduser/kaggle-datasets/ShopeeID.json  
  inflating: /home/hduser/kaggle-datasets/bliblidotcom.json  
  inflating: /home/hduser/kaggle-datasets/bukalapak.json  
  inflating: /home/hduser/kaggle-datasets/lazadaID.json  
  inflating: /home/hduser/kaggle-datasets/tokopedia.json  
put: `e-commerce/datasets/bliblidotcom.json': File exists
put: `e-commerce/datasets/bukalapak.json': File exists
put: `e-commerce/datasets/lazadaID.json': File exists
put: `e-commerce/datasets/ShopeeID.json': File exists
put: `e-commerce/datasets/tokopedia.json': File exists


In [10]:
# Show list of dataset files downloaded
!hdfs dfs -ls /user/hduser/ecommerce/datasets


ls: `/user/hduser/ecommerce/datasets': No such file or directory


In [11]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lower, regexp_replace, count
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import lit
import tensorflow as tf
import numpy as np
from keras.layers import Embedding
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pickle


2024-12-19 08:55:24.925342: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-19 08:55:24.939970: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1734573324.956688   42933 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1734573324.961771   42933 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-19 08:55:24.979109: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [12]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("E-Commerce Engagement Prediction ML") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/19 08:55:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [13]:
# Initialize Spark session
spark = SparkSession.builder.appName("eCommerce Data").getOrCreate()

# Load datasets
blibli_df = spark.read.json('e-commerce/datasets/bliblidotcom.json')
bukalapak_df = spark.read.json('e-commerce/datasets/bukalapak.json')
lazadaID_df = spark.read.json('e-commerce/datasets/lazadaID.json')
shopeeID_df = spark.read.json('e-commerce/datasets/ShopeeID.json')
tokopedia_df = spark.read.json('e-commerce/datasets/tokopedia.json')

# Add a new column to identify the company source
blibli_df = blibli_df.withColumn('source', lit('bukalapak'))
bukalapak_df = bukalapak_df.withColumn('source', lit('bukalapak'))
lazadaID_df = lazadaID_df.withColumn('source', lit('bukalapak'))
shopeeID_df = shopeeID_df.withColumn('source', lit('shopee'))
tokopedia_df = tokopedia_df.withColumn('source', lit('tokopedia'))

# Merge datasets using union (axis=0 equivalent in Spark)
merged_df = blibli_df.union(bukalapak_df).union(lazadaID_df).union(shopeeID_df).union(tokopedia_df)

# Show merged data
merged_df.show(10)


24/12/19 08:55:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/12/19 08:55:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+-------------------+-------------+----------+---+--------------------+-------------------+-----------+--------------------+-----------------+----------+----+--------------------+-----+---------+-------------+--------------------+-------+------------+----------+--------------+---------+--------+--------+----------+---------+---------+--------------------+--------------------+---------+-------+----------+------------+-----+
|cashtags|    conversation_id|   created_at|      date|geo|            hashtags|                 id|likes_count|                link|         mentions|      name|near|              photos|place|quote_url|replies_count|            reply_to|retweet|retweet_date|retweet_id|retweets_count|   source|    time|timezone|trans_dest|trans_src|translate|               tweet|                urls|  user_id|user_rt|user_rt_id|    username|video|
+--------+-------------------+-------------+----------+---+--------------------+-------------------+-----------+----------------

In [14]:
# DataFrame structure
print("Schema of the merged DataFrame:")
merged_df.printSchema()


Schema of the merged DataFrame:
root
 |-- cashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- conversation_id: string (nullable = true)
 |-- created_at: long (nullable = true)
 |-- date: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
 |-- likes_count: long (nullable = true)
 |-- link: string (nullable = true)
 |-- mentions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)
 |-- near: string (nullable = true)
 |-- photos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- place: string (nullable = true)
 |-- quote_url: string (nullable = true)
 |-- replies_count: long (nullable = true)
 |-- reply_to: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- user_id: string (nullable = true)
 |    |    |-- userna

In [15]:
# Number of rows
row_count = merged_df.count()
print(f"Number of rows in the merged DataFrame: {row_count}")


Number of rows in the merged DataFrame: 541180


                                                                                

In [16]:
# Clean tweet text
def clean_text(text):
    return text.lower().replace("#", "").strip()

clean_text_udf = udf(clean_text, StringType())

# Apply text cleaning and create new features
data_cleaned = merged_df.withColumn("clean_tweet", clean_text_udf(col("tweet"))) \
                       .withColumn("engagement", col("replies_count") + col("retweets_count") + col("likes_count"))

# Select relevant features
selected_data = data_cleaned.select(
    col("clean_tweet").alias("text"),
    col("replies_count").alias("replies"),
    col("retweets_count").alias("retweets"),
    col("likes_count").alias("likes"),
    col("engagement").alias("target"),
    col("hashtags"),    
    col("source")
)

# Show processed data
selected_data.show()

+--------------------+-------+--------+-----+------+--------------------+---------+
|                text|replies|retweets|likes|target|            hashtags|   source|
+--------------------+-------+--------+-----+------+--------------------+---------+
|sudah kepikiran m...|      0|       0|    1|     1| [#ramadanlebihbaik]|bukalapak|
|belanja pake kart...|      0|       0|    0|     0| [#ramadanlebihbaik]|bukalapak|
|udah seminggu ram...|      0|       0|    1|     1|[#ramadan, #ramad...|bukalapak|
|catet tanggal mai...|      0|       1|    1|     2|                  []|bukalapak|
|yuk! gabung di ng...|      1|       1|    1|     3|    [#ngobrolbareng]|bukalapak|
|udah bosen banget...|      1|       2|    1|     4|                  []|bukalapak|
|long weekend! bak...|      1|       1|    3|     5|       [#dirumahaja]|bukalapak|
|selamat hari buru...|      0|       0|    3|     3|       [#mayday2020]|bukalapak|
|cek di sini, ya h...|      0|       0|    0|     0|                  []|buk

In [17]:
# Split dataset
train_data, validate_data, test_data = selected_data.randomSplit([0.7, 0.15, 0.15], seed=42)

# Save splits for later use
train_data.write.json("e-commerce/splits/train.json", mode="overwrite")
validate_data.write.json("commerce/splits/validate.json", mode="overwrite")
test_data.write.json("commerce/splits/test.json", mode="overwrite")


                                                                                

In [18]:
# check null values

merged_df.filter(
    (merged_df.likes_count.isNull()) | 
    (merged_df.replies_count.isNull()) | 
    (merged_df.retweets_count.isNull())
).show()


+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+-------------+--------+-------+------------+----------+--------------+------+----+--------+----------+---------+---------+-----+----+-------+-------+----------+--------+-----+
|cashtags|conversation_id|created_at|date|geo|hashtags| id|likes_count|link|mentions|name|near|photos|place|quote_url|replies_count|reply_to|retweet|retweet_date|retweet_id|retweets_count|source|time|timezone|trans_dest|trans_src|translate|tweet|urls|user_id|user_rt|user_rt_id|username|video|
+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+-------------+--------+-------+------------+----------+--------------+------+----+--------+----------+---------+---------+-----+----+-------+-------+----------+--------+-----+
+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+

In [19]:
# Change null value with 0 (if any)
merged_df = merged_df.fillna({"likes_count": 0, "replies_count": 0, "retweets_count": 0})


In [20]:
from pyspark.sql import functions as F

# Check negative value
merged_df.filter((F.col("likes_count") < 0) | (F.col("replies_count") < 0) | (F.col("retweets_count") < 0)).show()


24/12/19 08:55:45 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+-------------+--------+-------+------------+----------+--------------+------+----+--------+----------+---------+---------+-----+----+-------+-------+----------+--------+-----+
|cashtags|conversation_id|created_at|date|geo|hashtags| id|likes_count|link|mentions|name|near|photos|place|quote_url|replies_count|reply_to|retweet|retweet_date|retweet_id|retweets_count|source|time|timezone|trans_dest|trans_src|translate|tweet|urls|user_id|user_rt|user_rt_id|username|video|
+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+-------------+--------+-------+------------+----------+--------------+------+----+--------+----------+---------+---------+-----+----+-------+-------+----------+--------+-----+
+--------+---------------+----------+----+---+--------+---+-----------+----+--------+----+----+------+-----+---------+

In [21]:
# Change negative value with 0 (if any)
for col in ["likes_count", "replies_count", "retweets_count"]:
    merged_df = merged_df.withColumn(col, F.when(F.col(col) < 0, 0).otherwise(F.col(col)))
    

In [22]:
# Matching target engagement definitions in Spark DataFrame
blibli_df = blibli_df.withColumn("engagement", F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))
bukalapak_df = bukalapak_df.withColumn("engagement", F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))
lazadaID_df = lazadaID_df.withColumn("engagement",   F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))
shopeeID_df = shopeeID_df.withColumn("engagement",   F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))
tokopedia_df = tokopedia_df.withColumn("engagement", F.col("likes_count") + F.col("replies_count") + F.col("retweets_count"))


In [23]:
# import pandas as pd

# Load train data (convert Spark DataFrame to Pandas)
train_df = train_data.toPandas()

# Tokenize and vectorize text (fit on original text, not the padded sequences)
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(train_df["text"])  # Fit tokenizer on the raw text data

# Convert texts to sequences
X_train = tokenizer.texts_to_sequences(train_df["text"])

# Pad the sequences to ensure uniform length
X_train = tf.keras.preprocessing.sequence.pad_sequences(X_train, padding='post')

y_train = np.array(train_df["target"])


                                                                                

In [24]:
# Define a simple Neural Network model
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=5000, output_dim=64),
    tf.keras.layers.GlobalAveragePooling1D(),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dense(1, activation="linear")
])

In [25]:
# Compile the model
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

2024-12-19 08:55:57.327175: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:152] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected


In [26]:
vocab_size = 42500  # Customize with your tokenizer
embedding_dim = 128

embedding_layer = Embedding(input_dim=vocab_size, output_dim=embedding_dim)


In [27]:
# Example: Tokenize and pad the input text
max_vocab_size = 5000
tokenizer = Tokenizer(num_words=max_vocab_size, oov_token="<UNK>")
tokenizer.fit_on_texts(train_df["text"])  # 'train_df["text"]' should be a list of strings

# Example of saving the tokenizer
with open('tokenizer.pkl', 'wb') as file:
    pickle.dump(tokenizer, file)

X_train_sequences = tokenizer.texts_to_sequences(train_df["text"])
X_train = pad_sequences(X_train_sequences, padding='post')

# Ensure y_train is in the correct format (e.g., a numpy array)
y_train = np.array(train_df["target"])  # Adjust this based on your target column

# Train the model !!!
model.fit(X_train, y_train, epochs=10, batch_size=32)


Epoch 1/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 2ms/step - loss: 814507.5000 - mae: 30.1946
Epoch 2/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 2ms/step - loss: 1376708.3750 - mae: 35.0661
Epoch 3/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 2ms/step - loss: 818767.5000 - mae: 27.2493
Epoch 4/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 2ms/step - loss: 502845.1875 - mae: 24.4029
Epoch 5/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 2ms/step - loss: 515621.8750 - mae: 25.4223
Epoch 6/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 2ms/step - loss: 769788.1250 - mae: 29.2789
Epoch 7/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 2ms/step - loss: 698755.2500 - mae: 26.9724
Epoch 8/10
[1m11833/11833[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 2ms/step - loss: 708657.6250 - mae:

<keras.src.callbacks.history.History at 0x7f2b353bda50>

In [28]:
# Save the model with a valid file extension in local server
model.save("e-commerce-engagement_model.keras")  # For the native Keras format

In [29]:
# Export to save model
model.export("saved_model")

INFO:tensorflow:Assets written to: saved_model/assets


INFO:tensorflow:Assets written to: saved_model/assets


Saved artifact at 'saved_model'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 60), dtype=tf.float32, name='keras_tensor')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  139823553030544: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139823553031120: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139823553031888: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139823553031312: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139823553033232: TensorSpec(shape=(), dtype=tf.resource, name=None)


## API Gateway with Flask/FastAPI

Create endpoint to call TensorFlow Serving API:

1. pip install flask requests

2. Create predict_model.py
   
   ```
   from flask import Flask, request, jsonify
    import pickle
    import numpy as np
    from tensorflow.keras.preprocessing.sequence import pad_sequences
    from tensorflow.keras.models import load_model
    
    # Initialize Flask app
    app = Flask(__name__)
    
    # Load tokenizer and model
    with open('tokenizer.pkl', 'rb') as file:
        tokenizer = pickle.load(file)
    
    model = load_model('engagement_model.keras')
    
    # Set a maximum sequence length (should match what was used during training)
    MAX_SEQUENCE_LENGTH = 20
    
    @app.route('/predict', methods=['POST'])
    def predict():
        data = request.get_json()
        input_text = data.get('text', '')
    
        if not input_text:
            return jsonify({'error': 'No text provided'}), 400
    
        try:
            # Convert text to sequences
            sequences = tokenizer.texts_to_sequences([input_text])
            
            # Pad the sequences to ensure consistent input shape
            padded_input = pad_sequences(sequences, maxlen=MAX_SEQUENCE_LENGTH, padding='post')
    
            # Convert to NumPy array (Keras requires this)
            processed_input = np.array(padded_input)
    
            # Perform prediction
            prediction = model.predict(processed_input)
    
            # Return prediction
            return jsonify({'prediction': prediction.tolist()})
    
        except Exception as e:
            return jsonify({'error': str(e)}), 500
    
    
   if __name__ == "__main__":
        app.run(host='0.0.0.0', port=5001)

    ```

3. Run API

    ```
    python predict_model.py
   
    ```
  

# Testing

In [7]:
# Use cURL or Postman for testing:
!curl -X POST -H "Content-Type: application/json" -d '{"text": "Hadiah langsung"}' http://localhost:5000/predict

{"prediction":[[23.862144470214844]]}


In [8]:
!curl -X POST -H "Content-Type: application/json" -d '{"text": "Hadiah langsung"}' http://192.168.241.128:5000/predict

{"prediction":[[23.862144470214844]]}


## Features & Target

- Features:- 
    - clean_tweet: Preprocessed tweet text.
    - hashtags: The number or presence of hashtags.
    - replies_count
    - retweets_count
    - likes_count
      
- Target:
    - engagement: A combination of likes, replies, and retweets.
  