# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, regexp_replace, when
from pyspark.sql.types import DoubleType

# Create the Spark session used by the rest of the script; getOrCreate
# reuses a session that is already running.
spark = (
    SparkSession.builder
    .appName("Webtoon Data Cleaning and Exploration")
    .getOrCreate()
)

# Load the raw dataset: the first line is treated as the header and Spark
# infers the column types from the data.
df = spark.read.csv(
    path="C:/Users/pc/Desktop/Webtoon Dataset.csv",
    inferSchema=True,
    header=True,
)

# DATA INSPECTION
# Shape and schema of the raw data.
column_data_types = df.dtypes
num_columns = len(df.columns)
num_rows = df.count()

print(f"Number of Rows: {num_rows}")
print(f"Number of Columns: {num_columns}")
for column, data_type in column_data_types:
    print(f"Column: {column}, Data Type: {data_type}")

# Summary statistics for the three metric columns.
# NOTE(review): this runs before the "M"/"K" conversion further down, so
# Likes and Subscribers may still be text here -- confirm the statistics
# are meaningful at this point.
df.describe("Likes", "Rating", "Subscribers").show()

# List every distinct value of the Genre column.
df.select("Genre").distinct().show()

# DATA CLEANING
# Convert "M" and "K" indicators to numeric values
def convert_indicator_to_numeric(column_name):
    """Convert abbreviated counts such as "1.2M" or "850K" to plain numbers.

    Args:
        column_name: The column to convert, given either as a Spark
            ``Column`` or as the name of a column (``str``).

    Returns:
        A ``Column`` expression: the digits and decimal point of the trimmed
        value cast to double, multiplied by 1,000,000 when the trimmed value
        ends with "M", by 1,000 when it ends with "K", and by 1 otherwise.
    """
    # Accept a plain column name as well as a Column. On a str, ``endswith``
    # would be the Python string method and return a bool rather than a
    # Spark expression, which ``when`` cannot use.
    source = col(column_name) if isinstance(column_name, str) else column_name

    # Trim once and use the trimmed value for BOTH the digits and the suffix
    # test, so a value with trailing whitespace (e.g. "1.2M ") still gets
    # its multiplier applied.
    text = trim(source)

    # Keep only digits and the decimal point before casting.
    magnitude = regexp_replace(text, r'[^\d.]', '').cast(DoubleType())

    multiplier = (
        when(text.endswith('M'), 1000000)
        .when(text.endswith('K'), 1000)
        .otherwise(1)
    )
    return magnitude * multiplier
# Replace the "M"/"K"-suffixed values in both counter columns with numbers.
df = (
    df
    .withColumn("Likes", convert_indicator_to_numeric(col("Likes")))
    .withColumn("Subscribers", convert_indicator_to_numeric(col("Subscribers")))
)

# Drop rows with missing values, then remove duplicate rows.
cleaned_df = df.na.drop().dropDuplicates()
cleaned_df.show()


# DATA VISUALIZATION
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from pyspark.sql.functions import col

# Bring the columns needed for the plot into pandas.
scatter_data = cleaned_df.select("Name", "Likes", "Subscribers").toPandas()
likes = scatter_data["Likes"]
subscribers = scatter_data["Subscribers"]

# Correlation between the two metrics, formatted for the annotation.
correlation = likes.corr(subscribers)
correlation_text = f"Correlation: {correlation:.2f}"

# Degree-1 polynomial fit gives the slope and intercept of the trend line.
slope, intercept = np.polyfit(likes, subscribers, 1)
trend_line = slope * likes + intercept

# Draw the scatter plot and overlay the trend line.
fig, ax = plt.subplots(figsize=(10, 6))
ax.scatter(likes, subscribers, alpha=0.5, marker='o')
ax.set_xlabel("Likes")
ax.set_ylabel("Subscribers")
ax.set_title("Likes vs. Subscribers for Webtoons")
ax.grid(True)
ax.plot(likes, trend_line, color='red', label='Trend Line')

# Place the correlation text at the smallest Likes / largest Subscribers point.
ax.annotate(correlation_text, xy=(np.min(likes), np.max(subscribers)))

# The legend picks up the labelled trend line.
ax.legend()

plt.show()

