# Spark Machine Learning


## Start Spark Session

In [1]:
from pyspark.sql import SparkSession


#######################
# Create Spark session
#######################
spark = SparkSession.builder \
    .appName("Trending Video Prediction") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()




24/02/22 10:35:08 WARN Utils: Your hostname, MacBook-Pro-von-Philip.local resolves to a loopback address: 127.0.0.1; using 192.168.2.35 instead (on interface en0)
24/02/22 10:35:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/22 10:35:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/22 10:35:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Data load und Data prep


In [3]:
###################################
# Load the dataset
###################################
file_path = "../ScraperData/DE.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

########################################
# Show the first few rows of the dataset
########################################
df.show()




+-----------+--------------------+-------------------+--------------------+--------------------+----------+-------------+--------------------+----------+-------------+-----------------+
|   video_id|               title|        publishedAt|           channelId|        channelTitle|categoryId|trending_date|                tags|view_count|comment_count|comments_disabled|
+-----------+--------------------+-------------------+--------------------+--------------------+----------+-------------+--------------------+----------+-------------+-----------------+
|jhMP8RSv4ws|Final-Auftritt vo...|2024-02-17 22:40:00|UCvbR8mrSZ1BXf2LN...|     Das Supertalent|        24|     24.18.02|Supertalent|Das S...|    514610|          336|            False|
|xeLMS48vrZU|Anna Ermakova - '...|2024-02-17 22:50:00|UCvbR8mrSZ1BXf2LN...|     Das Supertalent|        24|     24.18.02|Supertalent|Das S...|    487691|          332|            False|
|CxpUiuI9O4s|Der unfassbare Fa...|2024-02-18 12:00:08|UCKGMHVipEvuZudh

In [4]:
################################################################
# Display the schema of the dataset to understand the data types
################################################################
df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: string (nullable = true)



In [5]:
#################
# Data Prep
#################


######################################
# Change the Datatypes of the columns
######################################

from pyspark.sql.functions import when

# Convert "view_count" column to integer type
df = df.withColumn("view_count", df["view_count"].cast("int"))
# Convert "comment_count" column to integer type
df = df.withColumn("comment_count", df["comment_count"].cast("int"))
# Convert "categoryId" column to integer type
df = df.withColumn("categoryId", df["categoryId"].cast("int"))
# Convert "comments_disabled" column to boolean type
df = df.withColumn("comments_disabled", when(df["comments_disabled"] == "True", True).otherwise(False))


# Show the DataFrame schema
df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: boolean (nullable = false)



In [7]:

##############################################
# Load JSON file and convert it to a DataFrame
##############################################

import json

category_id_df = spark.read.json('../ScraperData/DE_category_id.json')

In [8]:
category_id_df.printSchema()


###############################################
# Sub-Spark-Session for Handling Corrupt Error
###############################################


#Sucessfully solved 

root
 |-- _corrupt_record: string (nullable = true)



In [10]:
########################################################
# Load Json File and prepare for merging with the DE_csv 
########################################################

from pyspark.sql.functions import explode, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType


################################################
# Schema der json definieren
################################################
schema = StructType([
    StructField("kind", StringType(), True),
    StructField("etag", StringType(), True),
    StructField("items", ArrayType(StructType([
        StructField("id", StringType(), True),
        StructField("snippet", StructType([
            StructField("title", StringType(), True)
        ]), True)
    ]), True), True)
])

#################
# Load Json
#################
df_test = spark.read.schema(schema) \
    .option("multiLine", True) \
    .json('../ScraperData/DE_category_id.json')

##########################################
# Select and transform to get id and title
##########################################
df_transformed = df_test.select(explode("items").alias("item")).select(
    col("item.id").alias("id"),
    col("item.snippet.title").alias("title")
)

################################
# Show the transformed dataframe
################################
df_transformed.show(truncate=False)



+---+--------------------+
|id |title               |
+---+--------------------+
|1  |Film & Animation    |
|2  |Autos & Vehicles    |
|10 |Music               |
|15 |Pets & Animals      |
|17 |Sports              |
|18 |Short Movies        |
|19 |Travel & Events     |
|20 |Gaming              |
|21 |Videoblogging       |
|22 |People & Blogs      |
|23 |Comedy              |
|24 |Entertainment       |
|25 |News & Politics     |
|26 |Howto & Style       |
|27 |Education           |
|28 |Science & Technology|
|30 |Movies              |
|31 |Anime/Animation     |
|32 |Action/Adventure    |
|33 |Classics            |
+---+--------------------+
only showing top 20 rows



In [11]:
##################################################################################
# Titel zu CategoryTitel ändern, dass keine doppelten Columns im Join-Prozess sind
##################################################################################
# Rename the 'title' column to 'Filmtitel'
df_transformed_renamed = df_transformed.withColumnRenamed("title", "categoryTitle")

# Show the DataFrame to verify the change
df_transformed_renamed.show()

+---+--------------------+
| id|       categoryTitle|
+---+--------------------+
|  1|    Film & Animation|
|  2|    Autos & Vehicles|
| 10|               Music|
| 15|      Pets & Animals|
| 17|              Sports|
| 18|        Short Movies|
| 19|     Travel & Events|
| 20|              Gaming|
| 21|       Videoblogging|
| 22|      People & Blogs|
| 23|              Comedy|
| 24|       Entertainment|
| 25|     News & Politics|
| 26|       Howto & Style|
| 27|           Education|
| 28|Science & Technology|
| 30|              Movies|
| 31|     Anime/Animation|
| 32|    Action/Adventure|
| 33|            Classics|
+---+--------------------+
only showing top 20 rows



In [14]:
###################################################
# Save the transformed DataFrame to a new JSON file
###################################################
df_transformed_renamed.write.mode("overwrite").json("../ScraperData/cleanDEcategory.json")

In [15]:
##############################
# Print the column names of df
##############################
print(df.columns)

['video_id', 'title', 'publishedAt', 'channelId', 'channelTitle', 'categoryId', 'trending_date', 'tags', 'view_count', 'comment_count', 'comments_disabled']


In [16]:
###################################################################
# Merge CSV und Json mit category
###################################################################
df_joined = df.join(df_transformed_renamed, df.categoryId == df_transformed_renamed.id, 'left') \
               #.withColumnRenamed("title", "category")

df_final = df_joined.drop("id")


df_final.show()



+-----------+--------------------+-------------------+--------------------+--------------------+----------+-------------+--------------------+----------+-------------+-----------------+--------------------+
|   video_id|               title|        publishedAt|           channelId|        channelTitle|categoryId|trending_date|                tags|view_count|comment_count|comments_disabled|       categoryTitle|
+-----------+--------------------+-------------------+--------------------+--------------------+----------+-------------+--------------------+----------+-------------+-----------------+--------------------+
|jhMP8RSv4ws|Final-Auftritt vo...|2024-02-17 22:40:00|UCvbR8mrSZ1BXf2LN...|     Das Supertalent|        24|     24.18.02|Supertalent|Das S...|    514610|          336|            false|       Entertainment|
|xeLMS48vrZU|Anna Ermakova - '...|2024-02-17 22:50:00|UCvbR8mrSZ1BXf2LN...|     Das Supertalent|        24|     24.18.02|Supertalent|Das S...|    487691|          332|     

In [17]:
#################################
# Print the schema of the dataset
#################################
df_final.printSchema()


root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: boolean (nullable = false)
 |-- categoryTitle: string (nullable = true)



In [18]:
####################################################################
# Entferne Spalten, die wir nicht nutzen werden für die Models etc.
####################################################################

# Zeiten, für andere Model wichtig, wenn Zeit von Realease des Videos bis es in den Trends erscheint wichtig ist. || Oder, wenn das Datum wichtig ist zu der ein bestimmtes Video trendet, bzw. der Zeituraum
#"publishedAt","trending_date"
df = df_final.drop("video_id", "categoryId")



In [19]:
df.show(2)

+--------------------+-------------------+--------------------+---------------+-------------+--------------------+----------+-------------+-----------------+-------------+
|               title|        publishedAt|           channelId|   channelTitle|trending_date|                tags|view_count|comment_count|comments_disabled|categoryTitle|
+--------------------+-------------------+--------------------+---------------+-------------+--------------------+----------+-------------+-----------------+-------------+
|Final-Auftritt vo...|2024-02-17 22:40:00|UCvbR8mrSZ1BXf2LN...|Das Supertalent|     24.18.02|Supertalent|Das S...|    514610|          336|            false|Entertainment|
|Anna Ermakova - '...|2024-02-17 22:50:00|UCvbR8mrSZ1BXf2LN...|Das Supertalent|     24.18.02|Supertalent|Das S...|    487691|          332|            false|Entertainment|
+--------------------+-------------------+--------------------+---------------+-------------+--------------------+----------+-------------+-

In [20]:
##############################################
# Check, ob ein Video Comments deaktiviert hat
##############################################

df_true_check = df.filter(df.comments_disabled == True)
print(df_true_check.count())

26


In [21]:
####################################
# Entfernen weiterer Spalten
####################################

df = df.drop("channelId","comments_disabled")


In [22]:
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- categoryTitle: string (nullable = true)



## Feature Engineering


In [23]:
from pyspark.sql.functions import when



##############################################################
# Festlegen ab wann ein Video als trending für das Model zählt
##############################################################
df = df.withColumn('is_trending', when((col('view_count') > 500000) & (col('comment_count') > 1000), 1).otherwise(0))


df.show(15)


+--------------------+-------------------+--------------------+-------------+--------------------+----------+-------------+---------------+-----------+
|               title|        publishedAt|        channelTitle|trending_date|                tags|view_count|comment_count|  categoryTitle|is_trending|
+--------------------+-------------------+--------------------+-------------+--------------------+----------+-------------+---------------+-----------+
|Final-Auftritt vo...|2024-02-17 22:40:00|     Das Supertalent|     24.18.02|Supertalent|Das S...|    514610|          336|  Entertainment|          0|
|Anna Ermakova - '...|2024-02-17 22:50:00|     Das Supertalent|     24.18.02|Supertalent|Das S...|    487691|          332|  Entertainment|          0|
|Der unfassbare Fa...|2024-02-18 12:00:08|      Simplicissimus|     24.18.02|Simplicissimus|2 ...|    191084|          607|  Entertainment|          0|
|6 MÄDELS BLIND DA...|2024-02-18 12:00:00|               CEDDO|     24.18.02|ceddotalk|c

In [24]:
#########################################
# Scraper druckt pro Tag 200 neue Entries
#########################################
row_count = df.count()

print(f"The DataFrame has {row_count} rows.")

The DataFrame has 1000 rows.


In [25]:
###########################################################
# Übersicht der Anzahl trendender Videos in einer Kategorie
###########################################################


from pyspark.sql.functions import count


trending_videos_df = df.filter(df.is_trending == 1)


In [26]:
trending_count_by_category = trending_videos_df.groupBy("categoryTitle").agg(count("is_trending").alias("trending_count"))
trending_count_by_category = trending_count_by_category.orderBy("trending_count", ascending=False)

trending_count_by_category.show()

+--------------------+--------------+
|       categoryTitle|trending_count|
+--------------------+--------------+
|               Music|            69|
|              Sports|            57|
|       Entertainment|            38|
|              Gaming|            33|
|      People & Blogs|            23|
|              Comedy|            15|
|Science & Technology|            13|
|    Film & Animation|             9|
|    Autos & Vehicles|             8|
|     News & Politics|             5|
|           Education|             2|
|       Howto & Style|             2|
+--------------------+--------------+



## Feature Transformation

In [27]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder


#######################################
# One-Hot-Encoding
#######################################
indexer = StringIndexer(inputCol="categoryTitle", outputCol="category_index").fit(df)
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")

###########################
# Assemble features
###########################
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["category_vec", "view_count", "comment_count"], outputCol="features")




In [28]:
from pyspark.ml import Pipeline

##########################
# Pipeline Rahmen setzen
##########################
pipeline = Pipeline(stages=[indexer,encoder,assembler])

In [29]:

########################
# Fit Pipline to Data
########################
pipline_model = pipeline.fit(df)

###############################
# Transforms the Data
###############################
df_transformed = pipline_model.transform(df)




In [30]:

df_transformed.select("features", "is_trending").show()

+--------------------+-----------+
|            features|is_trending|
+--------------------+-----------+
|(14,[0,12,13],[1....|          0|
|(14,[0,12,13],[1....|          0|
|(14,[0,12,13],[1....|          0|
|(14,[0,12,13],[1....|          0|
|(14,[0,12,13],[1....|          0|
|(14,[0,12,13],[1....|          0|
|(14,[3,12,13],[1....|          0|
|(14,[1,12,13],[1....|          1|
|(14,[0,12,13],[1....|          0|
|(14,[0,12,13],[1....|          0|
|(14,[5,12,13],[1....|          1|
|(14,[0,12,13],[1....|          1|
|(14,[0,12,13],[1....|          0|
|(14,[3,12,13],[1....|          0|
|(14,[4,12,13],[1....|          1|
|(14,[3,12,13],[1....|          0|
|(14,[6,12,13],[1....|          0|
|(14,[3,12,13],[1....|          0|
|(14,[2,12,13],[1....|          0|
|(14,[7,12,13],[1....|          0|
+--------------------+-----------+
only showing top 20 rows



In [31]:
####################################################
# Splitting the data into Trainingsdata and Testdata
####################################################
(train_data, test_data) = df_transformed.randomSplit([0.7, 0.3])

## RandomForest

In [32]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


#####################################
# Initialize the Random Forest model
#####################################
rf = RandomForestClassifier(labelCol="is_trending", featuresCol="features")

#####################################
# Initialize evaluator
#####################################
evaluator = BinaryClassificationEvaluator(labelCol="is_trending")

#####################
# Parameter setzen
#####################
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [5, 10, 20])
             .addGrid(rf.numTrees, [20, 50, 100])
             .build())


#################################################
# Cross Validator setzen und bestes Model finden
#################################################
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cv_model = cv.fit(train_data)
bestModel = cv_model.bestModel
best_model_predictions = bestModel.transform(test_data)





In [33]:
best_model_predictions.show(1)

+--------------------+-------------------+-------------------+-------------+--------------------+----------+-------------+-------------+-----------+--------------+--------------+--------------------+--------------------+--------------------+----------+
|               title|        publishedAt|       channelTitle|trending_date|                tags|view_count|comment_count|categoryTitle|is_trending|category_index|  category_vec|            features|       rawPrediction|         probability|prediction|
+--------------------+-------------------+-------------------+-------------+--------------------+----------+-------------+-------------+-----------+--------------+--------------+--------------------+--------------------+--------------------+----------+
|1. FC Heidenheim ...|2024-02-19 00:00:16|sportstudio fußball|     24.20.02|Fußball|Fussball|...|    460727|          319|       Sports|          0|           1.0|(12,[1],[1.0])|(14,[1,12,13],[1....|[19.5624549049290...|[0.97812274524645...|

In [34]:
##########################
# Evaluate the best model
##########################
best_model_accuracy = evaluator.evaluate(best_model_predictions)
print(f"Best Model Accuracy: {best_model_accuracy}")

Best Model Accuracy: 0.9988359911535328


In [35]:
test_data.show() 

+--------------------+-------------------+--------------------+-------------+--------------------+----------+-------------+---------------+-----------+--------------+--------------+--------------------+
|               title|        publishedAt|        channelTitle|trending_date|                tags|view_count|comment_count|  categoryTitle|is_trending|category_index|  category_vec|            features|
+--------------------+-------------------+--------------------+-------------+--------------------+----------+-------------+---------------+-----------+--------------+--------------+--------------------+
|1. FC Heidenheim ...|2024-02-19 00:00:16| sportstudio fußball|     24.20.02|Fußball|Fussball|...|    460727|          319|         Sports|          0|           1.0|(12,[1],[1.0])|(14,[1,12,13],[1....|
|1. FC Heidenheim:...|2024-02-19 16:00:08|         Manu Thiele|     24.21.02|Fußball|Fußball N...|     88738|          493|         Sports|          0|           1.0|(12,[1],[1.0])|(14,[1,

## Model Comparison

In [36]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

##########################
# Initialize the models
##########################
lr = LogisticRegression(featuresCol='features', labelCol='is_trending')
dt = DecisionTreeClassifier(featuresCol='features', labelCol='is_trending')
rf = RandomForestClassifier(featuresCol='features', labelCol='is_trending')
gbt = GBTClassifier(featuresCol='features', labelCol='is_trending')


models = [lr, dt, rf, gbt]


evaluator = BinaryClassificationEvaluator(labelCol="is_trending", metricName="areaUnderROC")

################################
# Train and evaluate each model
################################
for model in models:
    # Train model
    model_fit = model.fit(train_data)  
    
    # Make predictions
    predictions = model_fit.transform(test_data)  
    
    # Evaluate model
    accuracy = evaluator.evaluate(predictions)
    print(f"Model: {model.__class__.__name__}, Accuracy: {accuracy}")


24/02/22 10:41:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/02/22 10:41:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


Model: LogisticRegression, Accuracy: 0.9827726690722849
Model: DecisionTreeClassifier, Accuracy: 1.0
Model: RandomForestClassifier, Accuracy: 0.9988359911535328
Model: GBTClassifier, Accuracy: 1.0


In [37]:
test_data.show() 

+--------------------+-------------------+--------------------+-------------+--------------------+----------+-------------+---------------+-----------+--------------+--------------+--------------------+
|               title|        publishedAt|        channelTitle|trending_date|                tags|view_count|comment_count|  categoryTitle|is_trending|category_index|  category_vec|            features|
+--------------------+-------------------+--------------------+-------------+--------------------+----------+-------------+---------------+-----------+--------------+--------------+--------------------+
|1. FC Heidenheim ...|2024-02-19 00:00:16| sportstudio fußball|     24.20.02|Fußball|Fussball|...|    460727|          319|         Sports|          0|           1.0|(12,[1],[1.0])|(14,[1,12,13],[1....|
|1. FC Heidenheim:...|2024-02-19 16:00:08|         Manu Thiele|     24.21.02|Fußball|Fußball N...|     88738|          493|         Sports|          0|           1.0|(12,[1],[1.0])|(14,[1,

## Model Testing on Fake Data


In [44]:
from pyspark.sql.functions import lit

# Step 1: Create a new DataFrame with the required structure
new_data = [
    {"categoryTitle": "Music","title": "New Video 1", 
      "view_count": 100000, "comment_count": 500,
      "publishedAt": "2024-02-18T11:00:08Z","channelTitle" : "h","tags": "ggg", "trending_date" : "24.18.02"}
    
]
new_df = spark.createDataFrame(new_data)

indexer = StringIndexer(inputCol="categoryTitle", outputCol="category_index").fit(df)
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")
assembler = VectorAssembler(inputCols=["category_vec", "view_count", "comment_count"], outputCol="features")
pipeline = Pipeline(stages=[indexer,encoder,assembler])
pipline_model = pipeline.fit(df)
new_df_transformed = pipline_model.transform(new_df)







In [45]:
new_df_transformed.printSchema()
train_data.printSchema()

root
 |-- categoryTitle: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- title: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- view_count: long (nullable = true)
 |-- category_index: double (nullable = false)
 |-- category_vec: vector (nullable = true)
 |-- features: vector (nullable = true)

root
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- categoryTitle: string (nullable = true)
 |-- is_trending: integer (nullable = false)
 |-- category_index: double (nullable = false)
 |-- category_vec: vector (nullable = true)
 |-- features: vector (nullable = true)



In [46]:
new_df_transformed.printSchema()

root
 |-- categoryTitle: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- title: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- view_count: long (nullable = true)
 |-- category_index: double (nullable = false)
 |-- category_vec: vector (nullable = true)
 |-- features: vector (nullable = true)



In [47]:

predictions = bestModel.transform(new_df_transformed)

# Show the predictions
predictions.select("title", "categoryTitle", "view_count", "comment_count", "features", "prediction").show()


+-----------+-------------+----------+-------------+--------------------+----------+
|      title|categoryTitle|view_count|comment_count|            features|prediction|
+-----------+-------------+----------+-------------+--------------------+----------+
|New Video 1|        Music|    100000|          500|(14,[2,12,13],[1....|       0.0|
+-----------+-------------+----------+-------------+--------------------+----------+

