In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=583849bfe89e618c96e4615900b9b513de3c13efe10c482be49a2caf254a362c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:

from pyspark.sql.functions import explode
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import split, udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
from pyspark.ml.feature import OneHotEncoder

import os
import sys

# Point both PySpark interpreter variables at the interpreter running this notebook
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

# Start (or reuse) the Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("TVShowRecommendation")
    .getOrCreate()
)

# Read the TV-show CSV with a header row and inferred column types
data = spark.read.csv("/content/tv_shows.csv", header=True, inferSchema=True)
# Work on a 50-row sample only
data = data.limit(50)
data.show()

# Platform columns whose value distributions are overlaid in one figure
columns_to_plot = ["Netflix", "Hulu", "Prime Video", "Disney+"]
# Pull just those columns onto the driver as a pandas frame
data_collect = data.select(*columns_to_plot).toPandas()

# One semi-transparent histogram per platform column on shared axes
fig, ax = plt.subplots(figsize=(12, 6))
for platform_name in columns_to_plot:
    ax.hist(data_collect[platform_name], bins=20, alpha=0.5, label=platform_name)

ax.set_title("Distribution of Columns")
ax.set_xlabel("Value")
ax.set_ylabel("Frequency")
ax.legend()
plt.show()

# Split the title column into individual words.
# Spark hands SQL NULLs to a Python UDF as None, so a missing Title is mapped
# to an empty word list instead of letting `None.split()` abort the whole job.
words = udf(lambda x: x.split() if x is not None else [], ArrayType(StringType()))
tv_shows = data.withColumn("words", words(data.Title))

# Explode the array of words to create a row for each word.
# explode() emits no row for an empty array, so shows without a title simply
# contribute no training rows.
tv_shows_exploded = tv_shows.select("ID", "words", "Title")
tv_shows_exploded = tv_shows_exploded.withColumn("word", explode("words"))


# Fit a StringIndexer on the exploded words and attach a numeric index per word
string_indexer = StringIndexer().setInputCol("word").setOutputCol("indexed_words")
model = string_indexer.fit(tv_shows_exploded)
indexed_words = model.transform(tv_shows_exploded)

print(indexed_words)

# One-hot encode the word index; dropLast is disabled so every category keeps its own slot
encoder = OneHotEncoder(inputCol="indexed_words", outputCol="encoded_words", dropLast=False)
ohe = encoder.fit(indexed_words)
encoded_words = ohe.transform(indexed_words)

# Wrap the encoded vector into the "features" column the classifier reads
assembler = VectorAssembler().setInputCols(["encoded_words"]).setOutputCol("features")
encoded_words_assembled = assembler.transform(encoded_words)

# One fixed seed for every stochastic step below (split, forest, CV folds) so
# that a Restart & Run All reproduces the same accuracy.
SEED = 42

# Split the dataset into training and testing sets
(training, testing) = encoded_words_assembled.randomSplit([0.8, 0.2], seed=SEED)

# Define the random forest classifier; the show ID is used directly as the class label
rf = RandomForestClassifier(labelCol="ID", featuresCol="features", seed=SEED)

# Define the evaluation metric
evaluator = MulticlassClassificationEvaluator(
    labelCol="ID",
    predictionCol="prediction",
    metricName="accuracy",
)

# Define the hyperparameter grid for cross-validation (3 x 3 = 9 candidates)
param_grid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 50, 100])
    .addGrid(rf.maxDepth, [2, 5, 10])
    .build()
)

# Define the cross-validation pipeline
pipeline = Pipeline(stages=[rf])

# Create a CrossValidator; the seed fixes how rows are assigned to folds
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=SEED,
)

# Run cross-validation and choose the best set of parameters
cv_model = crossval.fit(training)

# Make predictions on the testing set
predictions = cv_model.transform(testing)

# Evaluate the model
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


# Define a function to recommend streaming platforms for a given TV show
def recommend_platforms(title):
    """Recommend up to three streaming platforms for a TV show title.

    The title is split into words; every word known to the fitted word
    indexer is pushed through the same feature stages used for training
    (``model`` -> ``ohe`` -> ``assembler``) and scored by the cross-validated
    classifier ``cv_model``. That classifier was trained with
    ``labelCol="ID"``, so its probability vector scores show IDs; those scores
    are then projected onto the platform columns of ``data`` to rank platforms.

    Args:
        title: Show title as a string. ``None`` or a blank string yields ``[]``.

    Returns:
        list[str]: Platform column names with a positive score, best first,
        at most three entries. Empty when no word of the title was seen while
        fitting the word indexer.
    """
    platform_cols = ("Netflix", "Hulu", "Prime Video", "Disney+")

    # Split the title into individual words; nothing to do for blank input
    title_words = title.split() if title else []
    if not title_words:
        return []

    # The indexer was fitted with its default handling of unseen labels, which
    # raises on unknown words, so drop words it has never seen.
    known_words = set(model.labels)
    word_rows = [(word,) for word in title_words if word in known_words]
    if not word_rows:
        return []

    # Index -> one-hot encode -> assemble, using the column names the fitted
    # stages expect ("word" -> "indexed_words" -> "encoded_words" -> "features")
    word_df = spark.createDataFrame(word_rows, ["word"])
    features_df = assembler.transform(ohe.transform(model.transform(word_df)))

    # Score with the trained classifier (not the word indexer)
    scored_rows = cv_model.transform(features_df).select("probability").collect()

    # Accumulate class probabilities over all words. Position i of the
    # probability vector is class i, i.e. the show whose ID equals i.
    show_scores = {}
    for scored in scored_rows:
        for show_id, prob in enumerate(scored["probability"].toArray()):
            if prob > 0:
                show_scores[show_id] = show_scores.get(show_id, 0.0) + float(prob)

    # Credit each platform with the score of every show it carries.
    # NOTE(review): assumes the platform columns are 0/1 availability flags - confirm.
    platform_scores = dict.fromkeys(platform_cols, 0.0)
    for show in data.select("ID", *platform_cols).collect():
        weight = show_scores.get(show["ID"], 0.0)
        if not weight:
            continue
        for name in platform_cols:
            if show[name]:
                platform_scores[name] += weight

    # Keep platforms with a positive score, highest first, top three only
    ranked = sorted(
        ((name, score) for name, score in platform_scores.items() if score > 0),
        key=lambda item: item[1],
        reverse=True,
    )
    return [name for name, _ in ranked[:3]]
# Try the recommender on a sample title and show the result
sample_recommendation = recommend_platforms("The Lion King")
print(sample_recommendation)

# Release the Spark session
spark.stop()