In [None]:
import html
import json
import os
import re
import subprocess
import time
import traceback
from datetime import datetime

import ipywidgets as widgets
import matplotlib.pyplot as plt
import networkx as nx
import nltk
import numpy as np
import torch
import torch.nn as nn
from IPython.display import display, clear_output, HTML
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoModelForCausalLM, AutoTokenizer, DistilBertTokenizer, DistilBertModel


In [None]:

# Download the NLTK data packages requested by this notebook: the 'punkt'
# tokenizer models and the 'stopwords' corpus. quiet=True suppresses the
# downloader's progress output; the value of the last call is what the cell
# displays.
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)


True

In [None]:
# Set TOKENIZERS_PARALLELISM to "false" for this process so that the
# tokenizers parallelism warning is not emitted.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [None]:
# Blog Classifier Model
class BlogClassifier(nn.Module):
    """DistilBERT encoder followed by a small two-layer classification head.

    Args:
        pretrained_model_name: Name or path handed to
            ``DistilBertModel.from_pretrained``.
        num_classes: Number of output logits.
    """

    def __init__(self, pretrained_model_name="distilbert-base-uncased", num_classes=4):
        super().__init__()
        self.base_model = DistilBertModel.from_pretrained(pretrained_model_name)
        self.dropout = nn.Dropout(0.1)

        # Size the head from the loaded encoder instead of hard-coding 768, so a
        # DistilBERT checkpoint with a different width also works. For the
        # default checkpoint this is still 768, and the layer names/indices are
        # unchanged, so previously saved state dicts keep loading.
        hidden_size = self.base_model.config.dim

        # Custom classification head with intermediate layer
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)
        )

    def forward(self, input_ids, attention_mask):
        """Return classification logits for a batch of tokenized inputs.

        Args:
            input_ids: Token id tensor forwarded to the encoder.
            attention_mask: Attention mask tensor forwarded to the encoder.

        Returns:
            Logits produced by the classification head from the hidden state
            at sequence position 0 of every batch element.
        """
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        # Position 0 of the sequence is used as the pooled representation
        # (presumably the [CLS] token for BERT-style tokenizers).
        pooled_output = outputs.last_hidden_state[:, 0]
        pooled_output = self.dropout(pooled_output)
        return self.classifier(pooled_output)

In [None]:



class BlogAnalysisPipeline:
    def __init__(self, repo_url="https://github.com/siddhamapple/instruction_tuned_model"):
        """Initialize the complete blog analysis pipeline.

        Clones the model repository into ``instruction_tuned_model`` when that
        directory does not exist yet, then loads the blog classifier, its
        DistilBERT tokenizer, the instruction-following language model (the
        Alpaca-trained variant if it loads, otherwise the base one) and a
        TF-IDF vectorizer. Also makes sure ``analysis_results/`` exists.

        Args:
            repo_url: Git URL of the repository holding the model files.

        Raises:
            subprocess.CalledProcessError: If ``git clone`` exits with a
                non-zero status.
            OSError: If ``git`` cannot be executed.
        """
        print("Loading models...")

        # Clone repository if needed
        if not os.path.exists("instruction_tuned_model"):
            # Argument list instead of a shell string: repo_url cannot inject
            # shell syntax, and "--" stops it being parsed as a git option.
            # The target directory is explicit so it always matches the paths
            # used below, whatever the repository is called. check=True turns
            # a failed clone into an immediate, clear error.
            subprocess.run(
                ["git", "clone", "--", repo_url, "instruction_tuned_model"],
                check=True,
            )

        # Initialize the blog classifier
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")
        self.classifier = self.load_classifier("instruction_tuned_model/blog_classifier/blog_classifier_model.pt")
        self.bert_tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
        self.categories = ['World', 'Sports', 'Business', 'Sci/Tech']

        # Initialize the instruction model - try Alpaca model first, then fall back to regular model
        try:
            self.tokenizer = AutoTokenizer.from_pretrained("instruction_tuned_model/models_alpaca")
            self.model = AutoModelForCausalLM.from_pretrained("instruction_tuned_model/models_alpaca")
            print("Using Alpaca-trained instruction model")
        except Exception:
            # Best-effort fallback for any load failure, but (unlike a bare
            # except) Ctrl-C / SystemExit still propagate.
            self.tokenizer = AutoTokenizer.from_pretrained("instruction_tuned_model/models")
            self.model = AutoModelForCausalLM.from_pretrained("instruction_tuned_model/models")
            print("Using base instruction model")

        self.model.to(self.device)

        # Initialize the TF-IDF vectorizer for keyword extraction
        self.vectorizer = TfidfVectorizer(stop_words='english', ngram_range=(1, 2))

        # Create directory for saved results
        os.makedirs("analysis_results", exist_ok=True)

        print("All components loaded successfully!")

    def load_classifier(self, model_path):
        """Load the blog classifier model."""
        model = BlogClassifier(num_classes=4).to(self.device)
        model.load_state_dict(torch.load(model_path, map_location=self.device))
        model.eval()
        return model

    def classify_blog(self, blog_text):
        """Classify the blog post using the classifier model."""
        # Tokenize the input text
        encoding = self.bert_tokenizer(
            blog_text,
            truncation=True,
            max_length=128,
            padding='max_length',
            return_tensors='pt'
        )

        input_ids = encoding['input_ids'].to(self.device)
        attention_mask = encoding['attention_mask'].to(self.device)

        # Get prediction
        with torch.no_grad():
            logits = self.classifier(input_ids, attention_mask)
            _, predicted = torch.max(logits, 1)
            pred_idx = predicted.item()

        # Return the predicted category
        return self.categories[pred_idx]

    def extract_keywords_tfidf(self, text, n=5):
        """Extract keywords from text using TF-IDF."""
        # Clean text
        text = re.sub(r'[^\w\s]', ' ', text.lower())

        try:
            # Fit and transform the text
            tfidf_matrix = self.vectorizer.fit_transform([text])
            feature_names = self.vectorizer.get_feature_names_out()

            # Get top keywords based on TF-IDF scores
            tfidf_scores = zip(feature_names, tfidf_matrix.toarray()[0])
            sorted_keywords = sorted(tfidf_scores, key=lambda x: x[1], reverse=True)

            # Return top N keywords with scores
            return [(keyword, score) for keyword, score in sorted_keywords[:n]]
        except:
            # Fallback to simple word frequency if TF-IDF fails
            words = text.split()
            stop_words = set(stopwords.words('english'))
            word_freq = {}
            for word in words:
                if len(word) > 3 and word not in stop_words:
                    word_freq[word] = word_freq.get(word, 0) + 1

            sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
            return [(word, freq/len(words)) for word, freq in sorted_words[:n]]

    def process_instruction(self, blog_text, instruction, max_length=512):
        """Process an instruction on the blog text."""
        # Format input with instruction template
        prompt = f"""
Below is a blog post. {instruction}

Blog: {blog_text}

Response:
"""

        # Generate response
        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        # Generate with parameters to avoid repetition
        output = self.model.generate(
            **inputs,
            max_length=len(inputs["input_ids"][0]) + max_length,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            repetition_penalty=1.5,
            no_repeat_ngram_size=3,
            pad_token_id=self.tokenizer.eos_token_id
        )

        response = self.tokenizer.decode(output[0], skip_special_tokens=True)

        # Extract just the response part
        response_text = response.split("Response:")[-1].strip()

        # Post-process to remove repetitions
        response_text = re.sub(r'(.{30,}?)\1+', r'\1', response_text)

        return response_text

    def generate_mindmap(self, text, central_topic, n=5):
        """Generate a mind map from the provided text."""
        # Extract keywords using TF-IDF
        keywords = self.extract_keywords_tfidf(text, n)

        # Create graph
        G = nx.Graph()

        # Add central node (blog category)
        G.add_node(central_topic, size=30, color='#FF5733')  # Vibrant orange for center

        # Add keyword nodes directly connected to the central node
        for i, (keyword, score) in enumerate(keywords):
            # Scale node size based on importance
            node_size = 15 + 10 * score  # Larger nodes for more important keywords

            # Color based on importance
            color_intensity = min(0.9, 0.5 + score * 0.5)  # Between 0.5 and 0.9
            color = plt.cm.Blues(color_intensity)

            G.add_node(keyword, size=node_size, color=color, score=score)
            G.add_edge(central_topic, keyword, weight=score*5)

        return G, keywords

    def visualize_mindmap(self, graph, keywords):
        """Draw the mind map on a new 10x10 inch pyplot figure.

        Args:
            graph: ``networkx`` graph whose first node is the central topic.
                Node attributes ``size`` and ``color`` and the edge attribute
                ``weight`` are used when present (defaults otherwise).
            keywords: List of ``(keyword, score)`` pairs; the first three are
                listed in a legend box in the lower-left corner.

        Returns:
            The ``matplotlib.pyplot`` module, whose current figure is the mind
            map just drawn. The figure is not closed here; callers that no
            longer need it should close it.
        """
        plt.figure(figsize=(10, 10))

        # Get central node (first node)
        central_node = next(iter(graph.nodes()))

        # Lay the nodes out with a seeded (reproducible) spring layout
        pos = nx.spring_layout(graph, k=0.5, iterations=50, seed=42)

        # Force central node to the middle. spring_layout centres its output on
        # the origin by default, so the middle is (0, 0) -- (0.5, 0.5) would
        # push the hub into the upper-right quadrant.
        pos[central_node] = np.array([0.0, 0.0])

        # Get node attributes (the stored size is squared to get the marker area)
        node_sizes = [graph.nodes[node].get('size', 10)**2 for node in graph.nodes()]
        node_colors = [graph.nodes[node].get('color', 'lightblue') for node in graph.nodes()]

        # Get edge weights for width
        edge_weights = [graph[u][v].get('weight', 1) for u, v in graph.edges()]

        # Draw edges with varying width based on weight
        nx.draw_networkx_edges(
            graph, pos,
            width=[w/5 for w in edge_weights],
            alpha=0.7,
            edge_color='gray'
        )

        # Draw nodes
        nx.draw_networkx_nodes(
            graph, pos,
            node_size=node_sizes,
            node_color=node_colors,
            alpha=0.9
        )

        # Central node gets a larger, bold label
        central_label = {central_node: central_node}
        nx.draw_networkx_labels(graph, pos, labels=central_label, font_size=16, font_weight='bold')

        # Keyword nodes share a smaller font
        keyword_labels = {node: node for node in graph.nodes() if node != central_node}
        nx.draw_networkx_labels(graph, pos, labels=keyword_labels, font_size=12)

        plt.axis('off')

        # Add a title
        plt.title(f"Keywords for {central_node}", fontsize=16, pad=20)

        # Add a legend showing top 3 keywords with scores
        legend_text = "Top Keywords:\n" + "\n".join([f"{kw} ({score:.3f})" for kw, score in keywords[:3]])
        plt.figtext(0.02, 0.02, legend_text, fontsize=12, bbox=dict(facecolor='white', alpha=0.8))

        # Run tight_layout last so it can account for the title added above.
        plt.tight_layout()

        return plt

    def process_blog(self, blog_text, instruction_type="summarize", n=3):
        """Process a blog post with the complete pipeline."""
        # Step 1: Classify the blog
        category = self.classify_blog(blog_text)
        print(f"Blog Category: {category}")

        # Step 2: Process the instruction
        if instruction_type == "summarize":
            instruction = f"Summarize this blog in {n} sentences."
            result = self.process_instruction(blog_text, instruction)
            return {
                "category": category,
                "summary": result
            }
        elif instruction_type == "keywords":
            # For keywords, use our TF-IDF extractor directly
            keywords_with_scores = self.extract_keywords_tfidf(blog_text, n)
            keywords = [kw for kw, _ in keywords_with_scores]

            # Also generate a mindmap visualization for keywords
            graph, kw_with_scores = self.generate_mindmap(blog_text, category, n)
            plt = self.visualize_mindmap(graph, kw_with_scores)

            return {
                "category": category,
                "keywords": keywords,
                "mind_map_plot": plt
            }
        elif instruction_type == "mindmap":
            # Generate a keyword-focused mindmap
            graph, keywords = self.generate_mindmap(blog_text, category, n)
            plt = self.visualize_mindmap(graph, keywords)

            return {
                "category": category,
                "keywords": [kw for kw, _ in keywords],
                "mind_map_plot": plt
            }
        elif instruction_type == "main_points":
            instruction = f"Extract the {n} main points from this blog."
            result = self.process_instruction(blog_text, instruction)
            return {
                "category": category,
                "result": result
            }
        else:
            # Custom instruction
            result = self.process_instruction(blog_text, instruction_type)
            return {
                "category": category,
                "result": result
            }

    def save_results(self, result, blog_text, instruction_type):
        """Save analysis results to files."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_filename = f"analysis_results/blog_analysis_{timestamp}"

        # Save results as JSON
        json_data = {
            "timestamp": timestamp,
            "category": result.get("category", ""),
            "instruction_type": instruction_type,
            "blog_text": blog_text[:500] + "..." if len(blog_text) > 500 else blog_text
        }

        if "summary" in result:
            json_data["summary"] = result["summary"]
        if "keywords" in result:
            json_data["keywords"] = result["keywords"]
        if "result" in result:
            json_data["result"] = result["result"]

        # Save JSON file
        with open(f"{base_filename}.json", "w") as f:
            json.dump(json_data, f, indent=2)

        # Save HTML report
        html_content = f"""
        <html>
        <head>
            <title>Blog Analysis Results</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                h1 {{ color: #333; }}
                h2 {{ color: #666; }}
                .category {{ background-color: #f0f0f0; padding: 10px; border-radius: 5px; }}
                .summary {{ background-color: #e6f7ff; padding: 10px; border-radius: 5px; }}
                .keywords {{ background-color: #f0fff0; padding: 10px; border-radius: 5px; }}
                .result {{ background-color: #fff0f0; padding: 10px; border-radius: 5px; }}
                .blog-text {{ background-color: #f9f9f9; padding: 15px; border: 1px solid #ddd;
                             border-radius: 5px; margin-top: 20px; white-space: pre-wrap; }}
            </style>
        </head>
        <body>
            <h1>Blog Analysis Results</h1>
            <p>Analysis performed on: {timestamp}</p>

            <h2>Category</h2>
            <div class="category">{result.get("category", "")}</div>
        """

        if "summary" in result:
            html_content += f"""
            <h2>Summary</h2>
            <div class="summary">{result["summary"]}</div>
            """

        if "keywords" in result:
            html_content += f"""
            <h2>Keywords</h2>
            <div class="keywords">{', '.join(result["keywords"])}</div>
            """

        if "result" in result:
            html_content += f"""
            <h2>Result</h2>
            <div class="result">{result["result"]}</div>
            """

        html_content += f"""
            <h2>Original Blog Text</h2>
            <div class="blog-text">{blog_text}</div>
        </body>
        </html>
        """

        with open(f"{base_filename}.html", "w") as f:
            f.write(html_content)

        # Save mindmap if present
        if "mind_map_plot" in result:
            result["mind_map_plot"].savefig(f"{base_filename}_mindmap.png")

        return {
            "json_path": f"{base_filename}.json",
            "html_path": f"{base_filename}.html",
            "mindmap_path": f"{base_filename}_mindmap.png" if "mind_map_plot" in result else None
        }

In [None]:


# Create a simple interface
def create_interface():
    """Build and display the ipywidgets front end for ``BlogAnalysisPipeline``.

    Creates the input widgets (blog text, task, count, custom instruction,
    save format), constructs the pipeline, wires the "Analyze Blog" and
    "Save Results" buttons to their handlers and displays everything.
    If the pipeline cannot be constructed, the error is shown in the status
    line and analysing reports that the pipeline is unavailable.
    """
    # Create input widget for blog text
    blog_input = widgets.Textarea(
        value='',
        placeholder='Enter your blog post here...',
        description='Blog:',
        disabled=False,
        layout=widgets.Layout(width='100%', height='200px')
    )

    # Create dropdown for instruction type
    instruction_dropdown = widgets.Dropdown(
        options=['summarize', 'keywords', 'mindmap', 'main_points', 'custom'],
        value='summarize',
        description='Task:',
        disabled=False,
    )

    # Create slider for count
    count_slider = widgets.IntSlider(
        value=3,
        min=1,
        max=10,
        step=1,
        description='Count:',
        disabled=False,
        continuous_update=False,
        orientation='horizontal',
        readout=True,
        readout_format='d'
    )

    # Create text input for custom instruction
    custom_instruction = widgets.Text(
        value='',
        placeholder='Enter custom instruction here...',
        description='Custom:',
        disabled=True,
        layout=widgets.Layout(width='100%')
    )

    # Create save format selector
    save_format = widgets.Dropdown(
        options=['html', 'json', 'both'],
        value='both',
        description='Save format:',
        disabled=False,
    )

    # The custom instruction field is only editable while the 'custom' task is selected
    def toggle_custom_field(change):
        custom_instruction.disabled = change.new != 'custom'

    instruction_dropdown.observe(toggle_custom_field, names='value')

    # Create output area
    output_area = widgets.Output()

    # Create buttons
    analyze_button = widgets.Button(description="Analyze Blog")
    save_button = widgets.Button(description="Save Results", disabled=True)

    # Initialize pipeline
    try:
        pipeline = BlogAnalysisPipeline()
        initialization_status = widgets.HTML(value="<span style='color:green'>✓ Pipeline initialized successfully</span>")
    except Exception as e:
        pipeline = None
        # The message is rendered as HTML, so escape the exception text.
        initialization_status = widgets.HTML(value=f"<span style='color:red'>✗ Error initializing pipeline: {html.escape(str(e))}</span>")

    # Store the last result
    last_result = {}
    last_blog_text = ""
    last_instruction_type = ""

    # Define button click handlers
    def on_analyze_click(b):
        """Run the selected task on the entered blog text and show the outcome."""
        nonlocal last_result, last_blog_text, last_instruction_type

        with output_area:
            clear_output()

            if pipeline is None:
                print("Error: Pipeline not initialized. Please check the error message above.")
                return

            if not blog_input.value.strip():
                print("Error: Please enter a blog post to analyze.")
                return

            print("Processing blog...")

            # Get instruction
            if instruction_dropdown.value == 'custom':
                if not custom_instruction.value.strip():
                    print("Error: Please enter a custom instruction.")
                    return
                instruction = custom_instruction.value
            else:
                instruction = instruction_dropdown.value

            # Process blog
            try:
                start_time = time.time()
                result = pipeline.process_blog(
                    blog_input.value,
                    instruction_type=instruction,
                    n=count_slider.value
                )
                end_time = time.time()

                # Store the result for saving later
                last_result = result
                last_blog_text = blog_input.value
                last_instruction_type = instruction
                save_button.disabled = False

                # Display results
                print(f"Category: {result['category']}")
                print(f"Processing time: {end_time - start_time:.2f} seconds")

                if 'summary' in result:
                    print(f"\nSummary:\n{result['summary']}")
                elif 'keywords' in result:
                    print(f"\nKeywords: {', '.join(result['keywords'])}")
                elif 'result' in result:
                    print(f"\nResult:\n{result['result']}")

                if 'mind_map_plot' in result:
                    # The pipeline hands back the pyplot module, whose `.figure`
                    # attribute is the figure *factory function*, not the drawn
                    # figure. Resolve the actual Figure before displaying it.
                    plot = result['mind_map_plot']
                    figure = plot.gcf() if hasattr(plot, 'gcf') else plot
                    display(figure)
            except Exception as e:
                print(f"Error processing blog: {str(e)}")
                print(traceback.format_exc())

    def on_save_click(b):
        """Save the most recent analysis and show links to the written files."""
        with output_area:
            if not last_result:
                print("No results to save. Please analyze a blog first.")
                return

            print("Saving results...")
            try:
                saved_paths = pipeline.save_results(
                    last_result,
                    last_blog_text,
                    last_instruction_type
                )

                # save_results always writes both reports; honour the
                # "Save format" selector by dropping the one not requested.
                skipped_key = {'html': 'json_path', 'json': 'html_path'}.get(save_format.value)
                if skipped_key and saved_paths.get(skipped_key):
                    os.remove(saved_paths[skipped_key])
                    saved_paths[skipped_key] = None

                print("Results saved to:")
                for key, path in saved_paths.items():
                    if path:
                        print(f"- {key}: {path}")

                # Create download links
                links_markup = "<h3>Download Links:</h3>"
                for key, path in saved_paths.items():
                    if path:
                        filename = os.path.basename(path)
                        links_markup += f'<p><a href="{path}" download="{filename}" target="_blank">{filename}</a></p>'

                display(HTML(links_markup))
            except Exception as e:
                print(f"Error saving results: {str(e)}")
                print(traceback.format_exc())

    analyze_button.on_click(on_analyze_click)
    save_button.on_click(on_save_click)

    # Display all widgets
    display(initialization_status)
    display(blog_input)
    display(widgets.HBox([instruction_dropdown, count_slider]))
    display(custom_instruction)
    display(widgets.HBox([analyze_button, save_button, save_format]))
    display(output_area)



# Run the interface: constructs the BlogAnalysisPipeline and displays the
# widgets in this cell's output.
create_interface()


Loading models...
Using device: cpu
Using Alpaca-trained instruction model
All components loaded successfully!


HTML(value="<span style='color:green'>✓ Pipeline initialized successfully</span>")

Textarea(value='', description='Blog:', layout=Layout(height='200px', width='100%'), placeholder='Enter your b…

HBox(children=(Dropdown(description='Task:', options=('summarize', 'keywords', 'mindmap', 'main_points', 'cust…

Text(value='', description='Custom:', disabled=True, layout=Layout(width='100%'), placeholder='Enter custom in…

HBox(children=(Button(description='Analyze Blog', style=ButtonStyle()), Button(description='Save Results', dis…

Output()