## INSTALLATION

In [None]:
# Install the headless OpenJDK 11 JDK with apt (Debian/Ubuntu); -qq and the
# redirect to /dev/null keep the cell output quiet.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
# Download the Spark 3.2.4 binary distribution built for Hadoop 2.7.
# -nc skips the download when the archive is already present; -q silences wget.
# The line below is an earlier, disabled attempt with a different archive name.
# !wget -nc -q https://dlcdn.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.tgz
!wget -nc -q https://dlcdn.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.7.tgz
# Unpack the archive into the current working directory.
!tar xf spark-3.2.4-bin-hadoop2.7.tgz

In [None]:
 # API for interacting with the Spark
!pip install pyspark==3.2.4

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.2.4
  Downloading pyspark-3.2.4.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m27.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.4-py2.py3-none-any.whl size=282040940 sha256=373aef36d067f2214a755616f7931fb09c37833c3646c6d6c7707c7bfec144b0
  Stored in directory: /root/.cache/pip/wheels/23/b7/78/7ee2975da06bd91492903dc71a17dd87ee851b9ae5b69c85b4
Successfully built pyspark
Installing collected packages: py4j

## Set Environment Variables


In [None]:
# List the unpacked Spark distribution to confirm the archive was extracted
# under /content (the location SPARK_HOME is set to later in the notebook).
!ls /content/spark-3.2.4-bin-hadoop2.7/

bin   data	jars	    LICENSE   NOTICE  R		 RELEASE  yarn
conf  examples	kubernetes  licenses  python  README.md  sbin


In [None]:
import os, sys

# Single source of truth for the unpacked Spark distribution; every path
# below is derived from it instead of repeating the literal.
SPARK_HOME = "/content/spark-3.2.4-bin-hadoop2.7"

# Point the environment at the Java and Spark installations.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = SPARK_HOME

# Make the PySpark sources and the bundled py4j archive importable.
# The membership check keeps the cell idempotent: re-running it does not
# pile duplicate entries onto sys.path.
for lib_path in (
    f"{SPARK_HOME}/python",
    f"{SPARK_HOME}/python/lib/py4j-0.10.9.5-src.zip",
):
    if lib_path not in sys.path:
        sys.path.append(lib_path)

In [None]:
# import dependencies
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer, Bucketizer, OneHotEncoder, VectorAssembler, StringIndexer, MinMaxScalerModel, \
StandardScaler, Imputer, Tokenizer,StopWordsRemover, MinMaxScaler, PolynomialExpansion
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, ClusteringEvaluator, RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.mllib.evaluation import RankingMetrics
from random import randint
from pprint import pprint
import matplotlib.pyplot as plt
import numpy as np; import pandas as pd
from pyspark.sql.functions import concat, lit



In [None]:
# Build (or reuse) a Spark session that runs locally on every available core.
# NOTE(review): the driver is configured with 80g of memory — confirm the
# runtime actually provides that much RAM.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "80g")
    .appName("PROJECT")
    .getOrCreate()
)
# Handle to the underlying SparkContext of the session.
sc = spark.sparkContext

In [None]:
# Display the session object to confirm it was created.
spark

## DATA PREPARATION

In [None]:
from google.colab import drive

# Mount Google Drive so the files under MyDrive are reachable as ordinary paths.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
# Root folder of the project data on the mounted Drive. File names are appended
# with plain string concatenation (path + "file"), so the trailing slash matters.
path = "/content/drive/MyDrive/Big_Data_Project/"

In [None]:
# path = '/Project Bigdata/DATA/'

In [None]:
# Item catalogue: one row per article with its class name in `labels`.
df = spark.read.csv(path + "select_item.csv", header=True, inferSchema=True)

# Prefix every article_id with '0' (the result is a string column).
# NOTE(review): presumably inferSchema parses the id as a number and drops its
# leading zero, which this restores — confirm against the raw CSV.
df = df.withColumn('article_id', F.concat(F.lit('0'), F.col('article_id')))
df.show()



+----------+------+
|article_id|labels|
+----------+------+
|0751399004|Blazer|
|0560559001|Blazer|
|0768285002|Blazer|
|0783144005|Blazer|
|0719378001|Blazer|
|0598636001|Blazer|
|0783245003|Blazer|
|0837368001|Blazer|
|0768433001|Blazer|
|0636455009|Blazer|
|0611745008|Blazer|
|0501288005|Blazer|
|0711031001|Blazer|
|0568601008|Blazer|
|0718908001|Blazer|
|0630542002|Blazer|
|0593009001|Blazer|
|0749815001|Blazer|
|0746292003|Blazer|
|0892327002|Blazer|
+----------+------+
only showing top 20 rows



In [None]:
# Encode the string class names in `labels` as numeric indices in a new
# `label` column. Pipeline and StringIndexer come from the notebook's import
# cell, so they are not re-imported here.
label_stringIdx = StringIndexer(inputCol = "labels", outputCol = "label")
pipeline = Pipeline(stages=[label_stringIdx])

# Drop any `label` column left behind by a previous run of this cell: the
# transform refuses to write an output column that already exists, so without
# this the cell only works once per kernel. On a fresh frame the drop is a no-op.
df_unindexed = df.drop("label")
pipelineFit = pipeline.fit(df_unindexed)
df = pipelineFit.transform(df_unindexed)
df.show(5)

+----------+------+-----+
|article_id|labels|label|
+----------+------+-----+
|0751399004|Blazer|  0.0|
|0560559001|Blazer|  0.0|
|0768285002|Blazer|  0.0|
|0783144005|Blazer|  0.0|
|0719378001|Blazer|  0.0|
+----------+------+-----+
only showing top 5 rows



In [None]:
# Row count per encoded class — a quick check of how balanced the classes are.
label_counts = df.groupBy('label').count()
label_counts.show()

+-----+-----+
|label|count|
+-----+-----+
|  8.0| 1000|
|  0.0| 1000|
|  7.0| 1000|
|  1.0| 1000|
|  4.0| 1000|
| 11.0| 1000|
| 14.0| 1000|
|  3.0| 1000|
|  2.0| 1000|
| 17.0| 1000|
| 10.0| 1000|
| 13.0| 1000|
|  6.0| 1000|
|  5.0| 1000|
| 15.0| 1000|
|  9.0| 1000|
| 16.0| 1000|
| 12.0| 1000|
+-----+-----+



In [None]:
# Pre-extracted image feature arrays available in the project folder
# (file -> length of each feature vector):
#   image_array18000_86.npy  -> 2048 features
#   image_array18000_128.npy -> 8192 features
#   image_array18000_224.npy -> 51200 features

# Both files are read relative to `path` so the data location is defined in
# one place. [::2] keeps every second row of each array; applying the same
# stride to both keeps features and names aligned row for row.
features = np.load(path + 'image_array18000_86.npy')[::2]
names = np.load(path + 'image_name.npy')[::2]

In [None]:
# Sanity check: both arrays should have the same number of rows.
features.shape,names.shape

((9000, 2048), (9000,))

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.linalg import Vectors

# The rows are paired positionally below, so a length mismatch would silently
# attach vectors to the wrong ids or drop rows. Fail loudly instead.
assert len(names) == len(features), "image names and feature rows must line up one-to-one"

# Convert the numpy data into Spark-friendly Python values:
# integer ids and one dense ML vector per image.
names = [int(x) for x in names]
features = [Vectors.dense(row) for row in features]
schema = StructType([
    StructField("names", IntegerType()),
    StructField("features", VectorUDT(), True),
])

data = list(zip(names, features))
df2 = spark.createDataFrame(data, schema)
# Prefix the id with '0' so it has the same string form as df.article_id,
# which is the key used to join the two frames.
df2 = df2.withColumn('names', concat(lit('0'), df2['names']))
df2.show()

+----------+--------------------+
|     names|            features|
+----------+--------------------+
|0751399004|[0.0,2.4363582134...|
|0768285002|[2.18542218208313...|
|0719378001|[2.28301310539245...|
|0783245003|[2.30664372444152...|
|0768433001|[0.44266173243522...|
|0611745008|[0.0,0.0,0.0,0.52...|
|0711031001|[1.25683736801147...|
|0718908001|[1.89222705364227...|
|0593009001|[0.0,2.8302984237...|
|0746292003|[1.89797222614288...|
|0504413001|[0.42037066817283...|
|0497369001|[1.76074838638305...|
|0747152002|[0.0,2.2628073692...|
|0813410001|[2.18276691436767...|
|0615176001|[0.0,0.0,0.0,2.20...|
|0728156022|[0.86103808879852...|
|0724699001|[0.00372925400733...|
|0588251002|[0.77098608016967...|
|0931981001|[0.36269199848175...|
|0740307001|[0.03329463303089...|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Record the Python version of the runtime for reproducibility.
!python -V

Python 3.9.16


In [None]:
# Attach each article's image feature vector to its label row. The inner join
# keeps only articles present in both frames.
join_condition = df["article_id"] == df2["names"]
df3 = (
    df.join(df2, on=join_condition, how="inner")
      .select('article_id', 'labels', 'label', 'features')
)
df3.show(5)

+----------+------+-----+--------------------+
|article_id|labels|label|            features|
+----------+------+-----+--------------------+
|0751399004|Blazer|  0.0|[0.0,2.4363582134...|
|0768285002|Blazer|  0.0|[2.18542218208313...|
|0719378001|Blazer|  0.0|[2.28301310539245...|
|0783245003|Blazer|  0.0|[2.30664372444152...|
|0768433001|Blazer|  0.0|[0.44266173243522...|
+----------+------+-----+--------------------+
only showing top 5 rows



In [None]:
# df3.write.parquet(path + "/dataframe.parquet")
# df3.write.option("header", "true").option("delimiter", "|").option("encoding", "UTF-8").format("csv").save(path + "df2048.csv")


## TRAIN

In [None]:
# Keep only the two columns the estimators consume.
model_df = df3.select('features', 'label')
# For quick experiments, train on a small sample instead:
# model_df = model_df.sample(0.01)
model_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,2.4363582134...|  0.0|
|[2.18542218208313...|  0.0|
|[2.28301310539245...|  0.0|
|[2.30664372444152...|  0.0|
|[0.44266173243522...|  0.0|
+--------------------+-----+
only showing top 5 rows



In [None]:
# 70/30 train/test split with a fixed seed.
train, test = model_df.randomSplit(weights=[0.7, 0.3], seed=42)

# Uncomment to check whether the target classes stay balanced after the split:
# train.groupBy('label').count().show()
# test.groupBy('label').count().show()

## LOGISTIC REGRESSION

### Hyper-Parameter Tuning

In [None]:
import itertools

# Candidate values per hyper-parameter, in the order the tuning loop unpacks them.
reg_param_values = [0, 0.1, 0.01, 0.001]
elastic_net_values = [0.0, 0.1, 0.3, 0.5, 1.0]
threshold_values = [0.5]
s = [reg_param_values, elastic_net_values, threshold_values]

# Full Cartesian product: one (regParam, elasticNetParam, threshold) tuple per run.
paramGrid = list(itertools.product(*s))
print('Total loop: ',len(paramGrid))

Total loop:  20


In [None]:
# Show every hyper-parameter combination that the tuning loop will try.
paramGrid

[(0, 0.0, 0.5),
 (0, 0.1, 0.5),
 (0, 0.3, 0.5),
 (0, 0.5, 0.5),
 (0, 1.0, 0.5),
 (0.1, 0.0, 0.5),
 (0.1, 0.1, 0.5),
 (0.1, 0.3, 0.5),
 (0.1, 0.5, 0.5),
 (0.1, 1.0, 0.5),
 (0.01, 0.0, 0.5),
 (0.01, 0.1, 0.5),
 (0.01, 0.3, 0.5),
 (0.01, 0.5, 0.5),
 (0.01, 1.0, 0.5),
 (0.001, 0.0, 0.5),
 (0.001, 0.1, 0.5),
 (0.001, 0.3, 0.5),
 (0.001, 0.5, 0.5),
 (0.001, 1.0, 0.5)]

In [None]:
# Manual grid search for logistic regression: fit one model per combination in
# paramGrid and score it on the held-out test split.
#   run_param_dict: run name -> [regParam, elasticNetParam, threshold]
#   run_dict:       run name -> {"accuracy", "precision", "recall", "f1"}
run_dict = {}
run_param_dict = {}

# One evaluator serves every run; only the metric name changes per call.
evaluator_LR = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
# Result key -> evaluator metric name.
metric_names = {
  "accuracy": "accuracy",
  "precision": "weightedPrecision",
  "recall": "weightedRecall",
  "f1": "f1",
}

for run_no, (reg_param, elastic_net_param, threshold) in enumerate(paramGrid, start=1):

  run_key = f'run_{run_no}'
  print(f'Run {run_no}')

  # NOTE(review): `threshold` is a binary-classification parameter; with the
  # multi-class labels used here it presumably has no effect — confirm.
  # Also, when regParam is 0 there is no penalty term, so elasticNetParam
  # cannot change the fit (those runs score identically).
  model = LogisticRegression(maxIter=20, regParam=reg_param, elasticNetParam=elastic_net_param, threshold=threshold, featuresCol='features', labelCol="label")
  p_model = model.fit(train)

  # Cache the predictions: each of the four metrics below runs its own Spark
  # job over this frame, and without the cache every job redoes the transform.
  df_test_LR = p_model.transform(test).cache()

  run_param_dict[run_key] = [p_model.getRegParam(), elastic_net_param, threshold]

  print('RegParam: ', p_model.getRegParam())
  print('ElasticNetParam: ', p_model.getElasticNetParam())
  print('threshold: ', p_model.getThreshold())
  print('-'*100)
  print('Model Evaluation')
  result_dict = {
    name: evaluator_LR.evaluate(df_test_LR, {evaluator_LR.metricName: metric})
    for name, metric in metric_names.items()
  }

  print("accuracy: " , result_dict["accuracy"] )
  print("precision: " ,result_dict["precision"])
  print("recall: " ,result_dict["recall"])
  print("f1: " ,result_dict["f1"])
  print('='*100)

  run_dict[run_key] = result_dict

  # Release the cached predictions before the next run.
  df_test_LR.unpersist()

Run 1
RegParam:  0.0
ElasticNetParam:  0.0
threshold:  0.5
----------------------------------------------------------------------------------------------------
Model Evaluation
accuracy:  0.5190457868410927
precision:  0.5214050393462718
recall:  0.5190457868410927
f1:  0.5185752525339519
Run 2
RegParam:  0.0
ElasticNetParam:  0.1
threshold:  0.5
----------------------------------------------------------------------------------------------------
Model Evaluation
accuracy:  0.5190457868410927
precision:  0.5214050393462718
recall:  0.5190457868410927
f1:  0.5185752525339519
Run 3
RegParam:  0.0
ElasticNetParam:  0.3
threshold:  0.5
----------------------------------------------------------------------------------------------------
Model Evaluation
accuracy:  0.5190457868410927
precision:  0.5214050393462718
recall:  0.5190457868410927
f1:  0.5185752525339519
Run 4
RegParam:  0.0
ElasticNetParam:  0.5
threshold:  0.5
-----------------------------------------------------------------------

In [None]:
# One column per run, holding the hyper-parameters recorded by the tuning loop;
# saved next to the project data and displayed.
lr_param = pd.DataFrame(run_param_dict)
lr_param.to_csv(f"{path}lr_param.csv")
lr_param

In [None]:
# Metrics table: one column per run, one row per metric.
result = pd.DataFrame(run_dict)
result

Unnamed: 0,run_1,run_2,run_3,run_4,run_5,run_6,run_7,run_8,run_9,run_10,run_11,run_12,run_13,run_14,run_15,run_16,run_17,run_18,run_19,run_20
accuracy,0.519046,0.519046,0.519046,0.519046,0.519046,0.559831,0.465564,0.290112,0.184302,0.048865,0.549827,0.554444,0.548673,0.515968,0.464794,0.52828,0.531743,0.538669,0.542132,0.547134
precision,0.521405,0.521405,0.521405,0.521405,0.521405,0.559202,0.478657,0.292021,0.097135,0.002388,0.550176,0.554283,0.548108,0.516379,0.464376,0.530699,0.53358,0.536666,0.541348,0.54616
recall,0.519046,0.519046,0.519046,0.519046,0.519046,0.559831,0.465564,0.290112,0.184302,0.048865,0.549827,0.554444,0.548673,0.515968,0.464794,0.52828,0.531743,0.538669,0.542132,0.547134
f1,0.518575,0.518575,0.518575,0.518575,0.518575,0.556086,0.462755,0.249725,0.114033,0.004553,0.548396,0.552737,0.546198,0.513098,0.46041,0.527907,0.530952,0.536484,0.540518,0.545237


In [None]:
# Transposed view: one row per run, one column per metric.
# NOTE(review): the saved output of this cell is a NameError, i.e. `result`
# did not exist when it was last executed — run the cell that builds it first.
result.T

NameError: ignored

In [None]:
# Persist the logistic-regression tuning metrics next to the project data.
result.to_csv(f"{path}lr_tuning_result.csv")

## RANDOM FOREST

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Baseline random forest. The explicit seed makes the trained forest
# repeatable; without one, results are not guaranteed to match across kernel
# restarts.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=4,
    maxBins=4,
    seed=42,
)
# Train model with Training Data
rfModel = rf.fit(train)
df_test_RF = rfModel.transform(test)

# Inspect the rows predicted as class 0, ordered by the probability column
# (descending).
df_test_RF.filter(df_test_RF['prediction'] == 0).orderBy("probability", ascending=False).show(n = 10, truncate = 30)

+------------------------------+-----+------------------------------+------------------------------+----------+
|                      features|label|                 rawPrediction|                   probability|prediction|
+------------------------------+-----+------------------------------+------------------------------+----------+
|[4.125407695770264,0.610043...|  0.0|[19.362866418470038,5.54028...|[0.19362866418470037,0.0554...|       0.0|
|[2.2469189167022705,0.08851...|  0.0|[17.880946476745358,5.99047...|[0.1788094647674536,0.05990...|       0.0|
|[0.9018087387084961,0.0,0.0...|  0.0|[17.689719756807335,6.18618...|[0.17689719756807337,0.0618...|       0.0|
|[0.017782196402549744,0.0,0...|  0.0|[17.591425944508693,6.22207...|[0.17591425944508696,0.0622...|       0.0|
|[1.1819677352905273,0.0,0.8...|  0.0|[16.846006293076815,6.02606...|[0.16846006293076818,0.0602...|       0.0|
|[1.1828473806381226,0.25439...|  0.0|[16.78066635149096,6.222847...|[0.16780666351490964,0.0622...|    

In [None]:
# Score the baseline random-forest predictions on the test split.
evaluator_RF = MulticlassClassificationEvaluator(predictionCol="prediction")
print("RANDOM FOREST: ")
for caption, metric in (
    ("accuracy: ", "accuracy"),
    ("precision: ", "weightedPrecision"),
    ("recall: ", "weightedRecall"),
    ("f1: ", "f1"),
):
    print(caption, evaluator_RF.evaluate(df_test_RF, {evaluator_RF.metricName: metric}))

RANDOM FOREST: 
accuracy:  0.31973836090804153
precision:  0.40484765391016936
recall:  0.3197383609080416
f1:  0.28521784089084723


### Hyper-Parameter Tuning

In [None]:
import itertools

# Candidate values per hyper-parameter, in the order the tuning loop unpacks them.
num_trees_values = list(range(100, 600, 100))   # numTrees
max_depth_values = list(range(2, 10, 2))        # maxDepth
min_instances_values = list(range(1, 4))        # minInstancesPerNode
s = [num_trees_values, max_depth_values, min_instances_values]

# Full Cartesian product: one (numTrees, maxDepth, minInstancesPerNode) tuple per run.
paramGrid = list(itertools.product(*s))
print('Total loop: ',len(paramGrid))
paramGrid



Total loop:  60


[(100, 2, 1),
 (100, 2, 2),
 (100, 2, 3),
 (100, 4, 1),
 (100, 4, 2),
 (100, 4, 3),
 (100, 6, 1),
 (100, 6, 2),
 (100, 6, 3),
 (100, 8, 1),
 (100, 8, 2),
 (100, 8, 3),
 (200, 2, 1),
 (200, 2, 2),
 (200, 2, 3),
 (200, 4, 1),
 (200, 4, 2),
 (200, 4, 3),
 (200, 6, 1),
 (200, 6, 2),
 (200, 6, 3),
 (200, 8, 1),
 (200, 8, 2),
 (200, 8, 3),
 (300, 2, 1),
 (300, 2, 2),
 (300, 2, 3),
 (300, 4, 1),
 (300, 4, 2),
 (300, 4, 3),
 (300, 6, 1),
 (300, 6, 2),
 (300, 6, 3),
 (300, 8, 1),
 (300, 8, 2),
 (300, 8, 3),
 (400, 2, 1),
 (400, 2, 2),
 (400, 2, 3),
 (400, 4, 1),
 (400, 4, 2),
 (400, 4, 3),
 (400, 6, 1),
 (400, 6, 2),
 (400, 6, 3),
 (400, 8, 1),
 (400, 8, 2),
 (400, 8, 3),
 (500, 2, 1),
 (500, 2, 2),
 (500, 2, 3),
 (500, 4, 1),
 (500, 4, 2),
 (500, 4, 3),
 (500, 6, 1),
 (500, 6, 2),
 (500, 6, 3),
 (500, 8, 1),
 (500, 8, 2),
 (500, 8, 3)]

In [None]:
# Manual grid search for the random forest: fit one model per combination in
# paramGrid and score it on the held-out test split.
#   run_param_dict: run name -> [numTrees, maxDepth, minInstancesPerNode]
#   run_dict:       run name -> {"accuracy", "precision", "recall", "f1"}
run_dict = {}
run_param_dict = {}

# One evaluator serves every run; only the metric name changes per call.
evaluator_RF = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
# Result key -> evaluator metric name.
metric_names = {
  "accuracy": "accuracy",
  "precision": "weightedPrecision",
  "recall": "weightedRecall",
  "f1": "f1",
}

for run_no, (num_trees, max_depth, min_instances) in enumerate(paramGrid, start=1):

  run_key = f'run_{run_no}'
  print(f'Run {run_no}')

  # Fixed seed so that differences between runs come from the hyper-parameters
  # rather than from random sampling.
  model = RandomForestClassifier(numTrees=num_trees, maxDepth=max_depth, minInstancesPerNode=min_instances, featuresCol='features', labelCol="label", seed=42)
  rf_model = model.fit(train)

  # Cache the predictions: each of the four metrics below runs its own Spark
  # job over this frame, and without the cache every job redoes the transform.
  df_test_rf = rf_model.transform(test).cache()

  # Record the forest's own hyper-parameters. A random-forest model has no
  # regParam, so the getRegParam() call copied from the logistic-regression
  # loop raised AttributeError here.
  run_param_dict[run_key] = [num_trees, max_depth, min_instances]

  # getNumTrees is a property on the fitted model, hence no call parentheses.
  print('numTrees: ', rf_model.getNumTrees)
  print('maxDepth: ', rf_model.getMaxDepth())
  print('minInstancesPerNode: ', rf_model.getMinInstancesPerNode())
  print('-'*100)

  print('Model Evaluation')
  result_dict = {
    name: evaluator_RF.evaluate(df_test_rf, {evaluator_RF.metricName: metric})
    for name, metric in metric_names.items()
  }

  print("accuracy: " , result_dict["accuracy"] )
  print("precision: " ,result_dict["precision"])
  print("recall: " ,result_dict["recall"])
  print("f1: " ,result_dict["f1"])
  print('='*100)

  run_dict[run_key] = result_dict

  # Release the cached predictions before the next run.
  df_test_rf.unpersist()

In [None]:
# One column per run, holding the hyper-parameters recorded by the tuning loop;
# saved next to the project data and displayed.
rf_param = pd.DataFrame(run_param_dict)
rf_param.to_csv(f"{path}rf_param.csv")
rf_param

In [None]:
# Metrics table for the random-forest grid search: one column per run, one row
# per metric. Written to its own file — the previous name,
# 'lr_tuning_result.csv', overwrote the logistic-regression results.
result = pd.DataFrame.from_dict(run_dict)
result.to_csv(path+'rf_tuning_result.csv')
result

In [None]:
# Accuracy of the predictions held in df_test_RF.
# NOTE(review): df_test_RF (upper-case RF) comes from the baseline forest cell,
# not from the tuning loop, which writes df_test_rf — confirm this is the
# frame intended here.
evaluator_RF = MulticlassClassificationEvaluator(predictionCol="prediction")
print("RANDOM FOREST:")
rf_accuracy = evaluator_RF.evaluate(df_test_RF, {evaluator_RF.metricName: "accuracy"})
print("accuracy: ", rf_accuracy)
# Disabled per-label sensitivity check:
# print("sensitivity: " , evaluator_RF.evaluate(df_test_RF, {evaluator_RF.metricName: "truePositiveRateByLabel"}))

RANDOM FOREST:
accuracy:  0.522077922077922


# **Save** model

In [None]:
# Persist the logistic-regression model to Drive. write().overwrite() replaces
# a model saved by an earlier run; plain save() raises when the target path
# already exists, which breaks re-running the notebook.
# NOTE(review): p_model is whichever model the tuning loop fitted last, not
# necessarily the best-scoring one — confirm this is the model to keep.
p_model.write().overwrite().save("/content/drive/MyDrive/logistic_regression_model")

In [None]:
# Persist the baseline random-forest model to Drive. write().overwrite()
# replaces a model saved by an earlier run; plain save() raises when the target
# path already exists, which breaks re-running the notebook.
rfModel.write().overwrite().save("/content/drive/MyDrive/random_forest_model")