In [1]:
import pandas as pd
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from collections import Counter, defaultdict
from nltk.stem import WordNetLemmatizer
import copy

# Download necessary NLTK data
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')


# Load data
train_data = pd.read_csv('train.txt', delimiter=' ::: ', engine='python')
test_data = pd.read_csv('test.txt', delimiter=' ::: ', engine='python')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Qasem\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Qasem\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Qasem\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


Importing train data and
Preprocessing

In [94]:
stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()
# Preprocessing function
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    tokens = word_tokenize(text)
    filtered_tokens = [lemmatizer.lemmatize(word) for word in tokens if word not in stop_words]
    return ' '.join(filtered_tokens)

# Preprocess data

preprocess_data = copy.deepcopy(train_data.head(1000))
preprocess_data['description'] = preprocess_data['description'].apply(preprocess_text)
data = preprocess_data


In [55]:
# from imblearn.under_sampling import RandomUnderSampler

# X_train = copy.deepcopy(data['description'])
# y_train = copy.deepcopy(data['genre'])

# # تبدیل داده‌ها به فرمت مناسب
# X_train = X_train.values.reshape(-1, 1)

# # ایجاد نمونه‌های کاهشی
# rus = RandomUnderSampler(random_state=42)
# X_resampled, y_resampled = rus.fit_resample(X_train, y_train)

# # تبدیل داده‌های کاهشی به DataFrame
# resampled_train_data = pd.DataFrame({
#     'description': X_resampled.flatten(),
#     'genre': y_resampled
# })

# des = []
# genre = []
# for i in range(len(X_resampled)):
#     des.append(X_resampled[i][0])
#     genre.append(y_resampled[i][0])

# n = pd.DataFrame({'description':des, 'genre':y_resampled})
# data = n

In [95]:

# Precompute IDF values
def compute_idf(corpus):
    idf_dict = defaultdict(int)
    for document in corpus:
        unique_words = set(document.split())
        for word in unique_words:
            idf_dict[word] += 1
    total_documents = len(corpus)
    idf_dict = {word: np.log10(total_documents / count) for word, count in idf_dict.items()}
    return idf_dict

# Compute TF-IDF
corpus = data['description'].tolist()

idf_dict = compute_idf(corpus)

def compute_tf(text):
    tokens = text.split()
    tf_text = Counter(tokens)
    tf_text = {word: count / len(tokens) for word, count in tf_text.items()}
    return tf_text

# Train-test split function
def train_test_split(X, y, test_size=0.2, random_state=42):
    np.random.seed(random_state)
    indices = np.random.permutation(len(X))
    test_size = int(len(X) * test_size)
    test_indices = indices[:test_size]
    train_indices = indices[test_size:]
    X_train = X[train_indices]
    X_val = X[test_indices]
    y_train = y.iloc[train_indices]
    y_val = y.iloc[test_indices]
    return X_train, X_val, y_train, y_val



# tfidf_matrix = []
# for text in corpus:
#     tfidf_text = {}
#     computed_tf = compute_tf(text)
#     for word in computed_tf:
#         tfidf_text[word] = computed_tf[word] * idf_dict.get(word, 0)
#     tfidf_matrix.append(tfidf_text)

# # Convert the tfidf_matrix to a consistent format
# unique_words = set(word for doc in tfidf_matrix for word in doc.keys())
# word_index = {word: i for i, word in enumerate(unique_words)}
# tfidf_vectors = []
# for doc in tfidf_matrix:
#     vector = np.zeros(len(unique_words))
#     for word, tfidf in doc.items():
#         vector[word_index[word]] = tfidf
#     tfidf_vectors.append(vector)

# tfidf_vectors = np.array(tfidf_vectors)


In [119]:
class knn_model:
    def __init__(self,k=5):
        self.k = k
        
    def euclidean_distance(self,a, b):
        return np.sqrt(np.sum((a - b) ** 2))

    def fit(self, corpus):
        # Compute TF-IDF
        
        idf_dict = compute_idf(corpus)
        tfidf_matrix = []
        for text in corpus:
            tfidf_text = {}
            computed_tf = compute_tf(text)
            for word in computed_tf:
                tfidf_text[word] = computed_tf[word] * idf_dict.get(word, 0)
            tfidf_matrix.append(tfidf_text)

        # Convert the tfidf_matrix to a consistent format
        unique_words = set(word for doc in tfidf_matrix for word in doc.keys())
        
        word_index = {word: i for i, word in enumerate(unique_words)}
        tfidf_vectors = []
        for doc in tfidf_matrix:
            vector = np.zeros(len(unique_words))
            for word, tfidf in doc.items():
                vector[word_index[word]] = tfidf
            tfidf_vectors.append(vector)

        self.tfidf_vectors = np.array(tfidf_vectors)
        self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(self.tfidf_vectors, data['genre'], test_size=0.3, random_state=42)
        return self.tfidf_vectors

    # KNN predict function
    def predict(self, x_test):
        distances = []
        for i in range(len(X_train)):
            dist = self.euclidean_distance(self.X_train[i], x_test)
            distances.append((dist, self.y_train.iloc[i]))
        
        distances.sort(key=lambda x: x[0])
        neighbors = distances[:self.k]
        output_values = [neighbor[1] for neighbor in neighbors]
        prediction = Counter(output_values).most_common(1)[0]
        return prediction

corpus = data['description'].tolist()
k = knn_model()
tfidf_vectors = k.fit(corpus)       
X_train, X_val, y_train, y_val = train_test_split(tfidf_vectors, data['genre'], test_size=0.3, random_state=42)

In [122]:
y_pred = [k.predict(x)[0] for x in X_val]

# Classification report function
def classification_report(y_true, y_pred):
    unique_genres = {x:0 for x in list(data['genre'].unique())}
    labels = list(set(y_true))
    y_pred_list = list(y_pred)
    
    for x in y_pred_list:
        unique_genres[x]+=1
    
    report = {}
    for label in labels:
        
        true_positive = sum((y_true == label) & (y_pred == label))
        false_positive = sum((y_true != label) & (y_pred == label))
        false_negative = sum((y_true == label) & (y_pred != label))
        precision = true_positive / (true_positive + false_positive) if true_positive + false_positive > 0 else 0
        recall = true_positive / (true_positive + false_negative) if true_positive + false_negative > 0 else 0
        f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0
        report[label] = {
            'precision': precision,
            'recall': recall,
            'f1-score': f1_score,
            'support': unique_genres[label]
            
        }
        report = pd.DataFrame(report)
    
    return report.T

print(classification_report(np.array(y_val), np.array(y_pred)))

             precision    recall  f1-score  support
crime         0.000000  0.000000  0.000000      0.0
western       0.000000  0.000000  0.000000      0.0
sci-fi        0.000000  0.000000  0.000000      0.0
animation     0.000000  0.000000  0.000000      0.0
comedy        0.250000  0.090909  0.133333      4.0
documentary   0.311828  0.966667  0.471545     93.0
drama         0.000000  0.000000  0.000000      2.0
action        0.000000  0.000000  0.000000      0.0
horror        0.000000  0.000000  0.000000      0.0
mystery       0.000000  0.000000  0.000000      0.0
family        0.000000  0.000000  0.000000      0.0
adventure     0.000000  0.000000  0.000000      0.0
reality-tv    0.000000  0.000000  0.000000      0.0
romance       0.000000  0.000000  0.000000      0.0
short         0.000000  0.000000  0.000000      1.0
thriller      0.000000  0.000000  0.000000      0.0


In [76]:
# ذخیره مدل و بردار ساز برای استفاده در آینده
import joblib
joblib.dump(k, 'knn_model.pkl')
joblib.dump(tfidf_vectors, 'tfidf_vectorizer.pkl')

['tfidf_vectorizer.pkl']

In [77]:
# بارگذاری مدل و بردار ساز
model = joblib.load('knn_model.pkl')
vectorizer = joblib.load('tfidf_vectorizer.pkl')

# Test prediction
model.fit(corpus)

# Preprocess and vectorize function
def vectorizer_transform(description):
    description = preprocess_text(description[0])
    computed_tf = compute_tf(description)
    tfidf_vector = np.zeros(len(unique_words))
    for word in description.split():
        if word in word_index:
            tfidf_vector[word_index[word]] = computed_tf[word] * idf_dict.get(word, 0)
    return tfidf_vector

# Predict genre function
def predict_genre(description):
    description_vector = vectorizer_transform([description])
    predicted_genre = model.predict( description_vector)
    return predicted_genre[0]



index = 86
new_description = test_data['description'][index]
predicted_genre = predict_genre(new_description)
print(f"Predicted Genre: {predicted_genre}")
print(test_data['genre'][index])

Predicted Genre: documentary
comedy


In [112]:
d = {'documentary': {'precision': 0.2803030303030303, 'recall': 0.9487179487179487, 'f1-score': 0.4327485380116959, 'support': 78}, 'animation': {'precision': 0, 'recall': 0.0, 'f1-score': 0, 'support': 3}}
df= pd.DataFrame(d)
df

Unnamed: 0,documentary,animation
precision,0.280303,0.0
recall,0.948718,0.0
f1-score,0.432749,0.0
support,78.0,3.0
