In [None]:
import re
import nltk
import os
import pandas as pd
from kafka import KafkaConsumer, KafkaProducer
import io
import threading
import time
import os
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
# Directory that holds the NLTK resources used by this notebook.
nltk_data_path = "D:/nltk_data"  # Change this to your desired directory

# exist_ok=True replaces the separate "exists?" check, so the cell is safe to
# re-run and cannot fail if the directory appears between check and creation.
os.makedirs(nltk_data_path, exist_ok=True)

# Register the directory with NLTK's search path exactly once, always through
# the variable above so the two can never drift apart.
if nltk_data_path not in nltk.data.path:
    nltk.data.path.append(nltk_data_path)

# Download the required NLTK data to the custom path.
for resource in ('punkt', 'stopwords', 'wordnet', 'omw-1.4'):
    nltk.download(resource, download_dir=nltk_data_path)

from nltk.tokenize import word_tokenize

In [None]:
def clean_text(text):
    """Remove HTML tags, then every character that is not an ASCII letter or whitespace.

    Args:
        text (str): Raw input text.

    Returns:
        str: ``text`` without ``<...>`` tags, digits, punctuation or other
        non-letter characters. Removed characters are deleted, not replaced
        by a space, so ``"state-of-the-art"`` becomes ``"stateoftheart"``.
    """
    # `[^>]*` also matches newlines (unlike `.*?` without re.DOTALL), so a tag
    # that is broken across lines is removed as a whole instead of leaking its
    # name and attributes into the cleaned text.
    text = re.sub(r'<[^>]*>', '', text)  # Remove HTML tags
    text = re.sub(r'[^a-zA-Z\s]', '', text)  # Remove special characters and punctuation
    return text

# Single WordNet lemmatizer instance shared by every call below.
lemmatizer = WordNetLemmatizer()

def lemmatize_tokens(tokens):
    """Return a list with the WordNet lemma of each token, in the original order.

    ``lemmatize`` is called without a part-of-speech argument, so NLTK's
    default applies (noun, per the NLTK documentation): e.g. "dogs" -> "dog".
    Verb or adjective forms such as "running" / "better" are therefore
    generally returned unchanged.
    """
    return list(map(lemmatizer.lemmatize, tokens))

def tokenize_text(text):
    """Split ``text`` into English word/punctuation tokens with NLTK's ``word_tokenize``.

    ``preserve_line=True`` is passed so the text is tokenized as a single
    line rather than being split into sentences first.

    Example:
        "The dogs are running fast." -> ['The', 'dogs', 'are', 'running', 'fast', '.']
    """
    return word_tokenize(text, language='english', preserve_line=True)

def handle_missing_data(text, placeholder='Missing'):
    """Replace missing or blank values with a placeholder.

    Args:
        text: Cell value to check. Usually a string, but pandas may hand over
            ``None``/``NaN`` for empty cells or a number for numeric-looking
            cells.
        placeholder (str): Value returned for missing/blank input.

    Returns:
        str: ``placeholder`` when ``text`` is null or contains only
        whitespace (e.g. ``" "`` -> ``"Missing"``); otherwise ``text`` itself,
        converted with ``str()`` if it was not already a string.
    """
    if not isinstance(text, str):
        if pd.isnull(text):
            return placeholder
        # Non-null, non-string values (ints, floats, ...) used to crash on
        # `.strip()`; convert them so the rest of the pipeline gets a string.
        text = str(text)
    if text.strip() == "":
        return placeholder
    return text

def to_lowercase(tokens):
    """Return a new list holding the lowercase form of every token, in order."""
    lowered = []
    for token in tokens:
        lowered.append(token.lower())
    return lowered

# Stopword sets keyed by language, filled on first use. Loading the corpus is
# the expensive part, and remove_stopwords runs once per row under
# DataFrame.apply, so the set must not be rebuilt on every call.
_STOP_WORDS_CACHE = {}

def remove_stopwords(tokens): # remove a, an, the, this, etc.
    """Remove English stopwords.

    Args:
        tokens: Iterable of tokens. The comparison is case-sensitive, so
            tokens should already be lowercase to match the stopword list.

    Returns:
        list: The tokens that are not in NLTK's English stopword list, in
        their original order.
    """
    stop_words = _STOP_WORDS_CACHE.get('english')
    if stop_words is None:
        stop_words = frozenset(stopwords.words('english'))
        _STOP_WORDS_CACHE['english'] = stop_words
    return [token for token in tokens if token not in stop_words]

def preprocess_text(text):
    """Run the full text-normalisation pipeline and return one string.

    Steps, in order: substitute a placeholder for missing/blank input, strip
    HTML and non-letter characters, tokenize, lowercase, drop stopwords,
    lemmatize. The surviving tokens are joined with single spaces.
    """
    cleaned = clean_text(handle_missing_data(text))
    lowered_tokens = to_lowercase(tokenize_text(cleaned))
    kept_tokens = remove_stopwords(lowered_tokens)
    lemmas = lemmatize_tokens(kept_tokens)
    return ' '.join(lemmas)

In [None]:
# Address of the Kafka broker used by both the consumer and the producer.
kafka_broker = 'localhost:29092'

# Consumer for the 'processed_data' topic; message values arrive as bytes and
# are decoded to str as UTF-8.
# NOTE(review): no group_id is passed. The kafka-python docs state that offset
# commits are disabled when group_id is None, so enable_auto_commit may have
# no effect here — confirm this is intended.
consumer_settings = {
    'bootstrap_servers': [kafka_broker],
    'enable_auto_commit': True,
    'value_deserializer': lambda x: x.decode('utf-8'),
}
consumer = KafkaConsumer('processed_data', **consumer_settings)

# Producer used to publish the cleaned CSV; linger_ms and max_block_ms are
# given in milliseconds (5 s and 60 s respectively).
producer_settings = {
    'bootstrap_servers': [kafka_broker],
    'linger_ms': 5000,
    'acks': 'all',
    'max_block_ms': 60000,
}
producer = KafkaProducer(**producer_settings)

In [None]:
def consume_and_send():
    """Consume CSV payloads from Kafka, preprocess their text columns and republish them.

    For every message read from the module-level ``consumer``:

    1. the message value (CSV text) is parsed into a DataFrame,
    2. ``preprocess_text`` is applied to the ``title``,
       ``categories/keyword`` and ``author_tags`` columns,
    3. the cleaned frame is written to ``preprocess_data.csv``
       (overwritten for each message),
    4. the cleaned CSV text is sent, UTF-8 encoded, to the ``for_ML_data``
       topic through the module-level ``producer``.

    Any exception raised while consuming or processing ends the loop; it is
    printed and not re-raised.
    """
    output_file_path = 'preprocess_data.csv'
    text_columns = ('title', 'categories/keyword', 'author_tags')

    try:
        print("Waiting for data")
        for message in consumer:
            print(message)
            print(f"Received message: [{message.timestamp}:{message.offset}] {message.value}")

            # Parse the payload straight from memory. Writing it to disk first
            # with the platform's default encoding (not UTF-8 on e.g. Windows)
            # can fail or corrupt non-ASCII characters.
            df = pd.read_csv(io.StringIO(message.value))

            # Data cleaning steps
            for column in text_columns:
                df[column] = df[column].apply(preprocess_text)

            # Save cleaned data to CSV, with an explicit encoding so that the
            # read-back below decodes exactly what was written.
            df.to_csv(output_file_path, index=False, encoding='utf-8')
            print(f"Message saved to {output_file_path}")

            with open(output_file_path, 'r', encoding='utf-8') as csvfile:
                csv_content = csvfile.read()  # Read entire file content
            print(f"Sending CSV content to Kafka: {csv_content[:100]}...")  # Log first 100 characters
            producer.send('for_ML_data', csv_content.encode('utf-8'))
            # flush() blocks until buffered records have been sent, so no
            # extra sleep is needed before it.
            producer.flush()

    except Exception as e:
        print(f"Error while consuming: {e}")

# Start the consume -> preprocess -> publish loop. consume_and_send() reports
# its own processing errors; this guard only prints anything that still
# escapes from the call itself.
try:
    consume_and_send()
except Exception as call_error:
    print(f"Error while calling: {call_error}")

In [None]:
# Load the dataset and normalise its free-text columns with preprocess_text.
df = pd.read_csv('2018_cleaned_ml.csv')
for text_column in ('title', 'categories/keyword', 'author_tags'):
    df[text_column] = df[text_column].apply(preprocess_text)
df.head()

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer

# TF-IDF over the preprocessed titles; one dense vector per row is stored in
# the 'title_tfidf' column.
tfidf_vectorizer_titles = TfidfVectorizer(max_features=10000, min_df=0.00, max_df=0.01)
title_matrix = tfidf_vectorizer_titles.fit_transform(df['title'])
df['title_tfidf'] = list(title_matrix.toarray())

# Save extracted features
df.to_pickle("processed_features.pkl")

# Feature names (words) learned by the vectorizer, aligned with vector positions.
vocab = tfidf_vectorizer_titles.get_feature_names_out()
title_tfidf_vector = df['title_tfidf'][1]  # TF-IDF vector of the row labelled 1
nonzero_indices = [
    position
    for position in range(len(title_tfidf_vector))
    if title_tfidf_vector[position] != 0
]

# Print the words of that title together with their TF-IDF scores.
for index in nonzero_indices:
    print(f"Word: {vocab[index]}, TF-IDF Score: {title_tfidf_vector[index]}")

In [None]:
# Apply preprocessing to the free-text columns ('title', 'categories/keyword'
# and 'author_tags').
# NOTE(review): these columns are also preprocessed right after the CSV is
# loaded earlier in this notebook — confirm the second pass is intended.
for text_column in ('title', 'categories/keyword', 'author_tags'):
    df[text_column] = df[text_column].apply(preprocess_text)

# Save the cleaned DataFrame. The target folder is created first so that
# to_csv does not fail when 'data/' does not exist yet.
# NOTE(review): if the TF-IDF cell ran before this one, df also holds an
# array-valued 'title_tfidf' column that will be written as text — verify
# that it belongs in this file.
output_csv_path = 'data/2018_cleaned_processed.csv'
os.makedirs(os.path.dirname(output_csv_path), exist_ok=True)
df.to_csv(output_csv_path, index=False)

In [None]:
# Read the saved CSV back from disk and preview its first rows.
processed_csv_path = 'data/2018_cleaned_processed.csv'
new_df = pd.read_csv(processed_csv_path)
new_df.head()

In [None]:
# Feature extraction: TF-IDF for titles, expanded into one column per term.
# With float thresholds, terms found in more than 80% or in fewer than 1% of
# the titles are ignored.
tfidf_vectorizer_titles = TfidfVectorizer(max_features=10000, min_df=0.01, max_df=0.8)

# Fit once and reuse the result; fitting the same data a second time only
# repeats the work.
X_tfidf = tfidf_vectorizer_titles.fit_transform(df['title']).toarray()

# Get the feature names (the words)
header = tfidf_vectorizer_titles.get_feature_names_out()

# Convert the TF-IDF array to a DataFrame that shares df's index, so that the
# concat below lines rows up even if df does not have a default RangeIndex.
df_tfidf = pd.DataFrame(X_tfidf, columns=header, index=df.index)

# Append the per-term columns to the original frame. Any array-valued
# 'title_tfidf' column left by an earlier cell is dropped, as before.
df = pd.concat([df.drop(columns=['title_tfidf'], errors='ignore'), df_tfidf], axis=1)
df.to_csv("Test_TFIDF.csv", index=False)

df.head()

In [None]:
# Titles and tags get separate vectorizers built from the same settings.
tfidf_settings = dict(max_features=5000, min_df=0.01, max_df=0.8)

# TF-IDF for titles (sparse matrix) and the terms behind its columns
tfidf_vectorizer_titles = TfidfVectorizer(**tfidf_settings)
title_tfidf = tfidf_vectorizer_titles.fit_transform(df['title'])
title_feature_names = tfidf_vectorizer_titles.get_feature_names_out()

# TF-IDF for tags (sparse matrix) and the terms behind its columns
tfidf_vectorizer_tags = TfidfVectorizer(**tfidf_settings)
tags_tfidf = tfidf_vectorizer_tags.fit_transform(df['author_tags'])
tags_feature_names = tfidf_vectorizer_tags.get_feature_names_out()

In [None]:
from scipy.sparse import hstack

# Put the title and tag TF-IDF matrices side by side: one sparse feature
# matrix with the title columns first, then the tag columns.
X = hstack((title_tfidf, tags_tfidf))

# X itself stays sparse. Assign the dense form back to X only if the model
# needs dense input; the expression below just displays it.
X.toarray()