# In [None]:
from neo4j import GraphDatabase
import os
import json
from py2neo import Graph, Node, Relationship
import chardet
from datetime import datetime
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from queue import Queue
import time
import pandas as pd

# Thread-local storage: get_graph_connection() stores one py2neo Graph handle
# per thread on this object under the attribute name "graph".
thread_local = threading.local()

'''
Run these statements in your Neo4j database before running this script:
CREATE CONSTRAINT unique_user_id IF NOT EXISTS FOR (u:User) REQUIRE u.user_id IS UNIQUE;
CREATE CONSTRAINT unique_tweet_id IF NOT EXISTS FOR (t:Tweet) REQUIRE t.tweet_id IS UNIQUE;
'''
# Neo4j connection parameters, read by get_graph_connection().
# NOTE(review): the credentials are hard-coded in the source; consider loading
# them from the environment before sharing or deploying this script.
uri = "bolt://localhost:7687"
username = "neo4j"
password = "12345678"
database = "neo4j"

# Worker-pool sizing
MAX_THREADS = 4  # max_workers for every ThreadPoolExecutor created in this file; adjust to your system's capabilities
MAX_CONNECTIONS = 4  # Meant to match MAX_THREADS; not referenced anywhere else in the visible code

# Thread-safe progress counter: process_tweet_file() puts one item per tweet
# file it finishes, and process_event_folders() drains the queue to count them.
processed_items = Queue()

def get_graph_connection():
    """Return the py2neo ``Graph`` cached for the calling thread.

    The handle lives on the module-level ``thread_local`` object. The first
    call made from a thread builds a ``Graph`` from the module-level ``uri``,
    ``username``, ``password`` and ``database`` settings; later calls from the
    same thread return that same object.
    """
    try:
        return thread_local.graph
    except AttributeError:
        # Nothing cached on this thread yet; fall through and create it.
        pass

    connection = Graph(uri, auth=(username, password), name=database)
    thread_local.graph = connection
    return connection

def convert_timestamp(timestamp):
    """Convert a ``'Wed Jan 07 11:11:33 +0000 2015'`` style string to ISO 8601.

    The ``+0000`` offset is matched literally, so the result is a naive
    ISO string such as ``'2015-01-07T11:11:33'``.

    Returns:
        The ISO-formatted string, or ``None`` when ``timestamp`` is empty or
        cannot be parsed (a message is printed in both cases).
    """
    source_format = '%a %b %d %H:%M:%S +0000 %Y'
    try:
        if timestamp:
            parsed = datetime.strptime(timestamp, source_format)
            return parsed.isoformat()
        print("No Stamp Found")
        return None
    except Exception as e:
        print(f"Error converting timestamp {timestamp}: {e}")
        return None

def process_annotations(annotations_file):
    """Load and return the parsed contents of a JSON annotations file.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the platform's default text encoding.

    Args:
        annotations_file: Path of the JSON file to read.

    Returns:
        The decoded JSON value.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(annotations_file, 'r', encoding='utf-8') as file:
        return json.load(file)

def preprocess_text(text):
    """Reduce free text to lowercase ASCII letters separated by single spaces.

    ``None`` yields ``""``; any other value is passed through ``str()`` first.
    URLs, @mentions, #hashtags and every character that is not an ASCII letter
    or whitespace are removed, whitespace runs are collapsed, and the result
    is stripped. If anything raises, a message is printed and ``""`` is
    returned.
    """
    try:
        if text is None:
            return ""

        # The order matters: URLs, mentions and hashtags must go before the
        # punctuation pass, which would otherwise break them into letters.
        substitutions = (
            (r'http\S+|www\S+', ''),                  # URLs
            (r'@[A-Za-z0-9_]+|#[A-Za-z0-9_]+', ''),   # mentions and hashtags
            (r'[^a-zA-Z\s]', ''),                     # digits, punctuation, symbols
            (r'\s+', ' '),                            # collapse whitespace runs
        )

        cleaned = str(text).lower()
        for pattern, replacement in substitutions:
            cleaned = re.sub(pattern, replacement, cleaned)
        return cleaned.strip()
    except Exception as e:
        print(f"Error in text preprocessing: {e}")
        return ""

def load_annotation(annotation_path):
    """Load the annotation JSON file from the specified path.

    The file is opened directly rather than checked with ``os.path.exists``
    first, so there is no gap between the check and the open. It is decoded
    as UTF-8 explicitly so the result does not depend on the platform's
    default text encoding.

    Args:
        annotation_path: Path of the ``annotation.json`` file.

    Returns:
        The parsed JSON content, or ``{}`` when the file is missing or cannot
        be read or parsed (a message is printed in those cases).
    """
    try:
        with open(annotation_path, "r", encoding="utf-8") as annotation_file:
            return json.load(annotation_file)
    except FileNotFoundError:
        print(f"Annotation file not found: {annotation_path}")
        return {}
    except Exception as e:
        # Best effort: a broken annotation must not abort the whole import.
        print(f"Error loading annotation file {annotation_path}: {e}")
        return {}
'''
def convert_annotations(annotation, string=True, is_rumour=True):
    """Convert annotation dictionary to a label based on its contents."""
    if not annotation or not isinstance(annotation, dict):
        print("No annotation File found")
        return "unknown" if string else 4
    
    if not is_rumour:
        return "non-rumour" if string else 3  # Assign a distinct label for non-rumours

    if 'misinformation' in annotation.keys() and 'true' in annotation.keys():
        if int(annotation['misinformation']) == 0 and int(annotation['true']) == 0:
            return "unverified" if string else 2
        elif int(annotation['misinformation']) == 0 and int(annotation['true']) == 1:
            return "true" if string else 1
        elif int(annotation['misinformation']) == 1 and int(annotation['true']) == 0:
            return "false" if string else 0
        elif int(annotation['misinformation']) == 1 and int(annotation['true']) == 1:
            print("Both misinformation and true are set to 1!")
            return None
'''    
def convert_annotations(annotation, string=True, is_rumour=True):
    """Map an annotation record to a veracity label.

    The label is derived from the ``'misinformation'`` and ``'true'`` entries,
    each converted with ``int()``:

    ====================  ========  ==============  =======
    misinformation        true      string label    numeric
    ====================  ========  ==============  =======
    0                     0         "unverified"    2
    0                     1         "true"          1
    1                     0         "false"         0
    1                     1         None (printed)  None
    0 (no 'true' key)     -         "unverified"    2
    1 (no 'true' key)     -         "false"         0
    ====================  ========  ==============  =======

    Any other combination returns ``None``. That includes a missing
    ``'misinformation'`` key, a ``None`` annotation, and flag values outside
    {0, 1}. The last case previously raised ``UnboundLocalError`` because no
    branch assigned the label.

    Args:
        annotation: Mapping with optional ``'misinformation'`` / ``'true'``
            entries, or ``None``.
        string: When true return the string label, otherwise the numeric one.
        is_rumour: Accepted for interface compatibility; it does not
            influence the result.

    Returns:
        The label in the requested form, or ``None`` when none applies.

    Raises:
        ValueError: If a flag that is read cannot be converted with ``int()``.
    """
    if annotation is None:
        print('No annotations')
        return None

    keys = annotation.keys()
    has_misinformation = 'misinformation' in keys
    has_true = 'true' in keys

    if has_misinformation and has_true:
        flags = (int(annotation['misinformation']), int(annotation['true']))
        if flags == (1, 1):
            print ("OMG! They both are 1!")
            print(annotation['misinformation'])
            print(annotation['true'])
            return None
        # (misinformation, true) -> (string label, numeric label)
        labels = {
            (0, 0): ("unverified", 2),
            (0, 1): ("true", 1),
            (1, 0): ("false", 0),
        }
        pair = labels.get(flags)
    elif has_misinformation:
        # Records that carry only the misinformation flag.
        labels = {
            0: ("unverified", 2),
            1: ("false", 0),
        }
        pair = labels.get(int(annotation['misinformation']))
    elif has_true:
        print ('Has true not misinformation')
        return None
    else:
        print('No annotations')
        return None

    if pair is None:
        # Flag values outside {0, 1}: no label applies.
        return None
    return pair[0] if string else pair[1]

def create_event_node(event_name):
    """Merge an ``Event`` node for ``event_name`` and return it.

    The node carries a single ``name`` property and is merged through the
    calling thread's graph connection on label ``Event`` and key ``name``.
    A confirmation line is printed after the merge.
    """
    label = "Event"
    node = Node(label, name=event_name)
    get_graph_connection().merge(node, label, "name")
    print(f"Created Event node: {event_name}")
    return node

def create_user_node(user_data):
    """Merge a ``User`` node built from a tweet's ``user`` object.

    Three fixes over the original:

    * ``user_id`` is taken from ``id_str`` (falling back to ``str(id)``), the
      same string form the mention and tweet code in this file uses, so an
      author and a mention of the same account merge into one node.
    * ``favourites_count`` is read from the ``favourites_count`` field; the
      misspelled ``favourite_count`` is kept only as a fallback.
    * Counters that are present but null are treated as 0 so the score
      arithmetic cannot fail on ``None``.

    Args:
        user_data: Mapping holding the user fields of a tweet.

    Returns:
        The merged node, or ``None`` if anything fails (the error is printed).
    """
    try:
        graph = get_graph_connection()

        user_id = user_data.get("id_str")
        if user_id is None and user_data.get("id") is not None:
            user_id = str(user_data.get("id"))

        followers = user_data.get("followers_count") or 0
        friends = user_data.get("friends_count") or 0
        favourites = (
            user_data.get("favourites_count")
            or user_data.get("favourite_count")
            or 0
        )
        is_verified = user_data.get("verified")

        user_node = Node(
            "User",
            user_id=user_id,
            screen_name=user_data.get("screen_name"),
            verified=is_verified,
            follower_count=followers,
            description=preprocess_text(user_data.get("description", "")),
            friends_count=friends,
            favourites_count=favourites,
            created_at=convert_timestamp(user_data.get("created_at")),
            # Heuristic score: followers weigh 2, friends 0.5, plus a flat
            # bonus of 100 for verified accounts.
            influencer_score=(
                followers * 2 +
                friends * 0.5 +
                (100 if is_verified else 0)
            )
        )
        graph.merge(user_node, "User", "user_id")
        return user_node
    except Exception as e:
        print(f"Error creating user node: {e}")
        return None

def create_tweet_node(tweet_data, annotation_label):
    """Merge a ``Tweet`` node for ``tweet_data`` and link it to its parent.

    Hashtag, mention and URL counts are taken from the raw text; the stored
    ``text`` property is the cleaned form from ``preprocess_text``.
    ``annotation_label`` is stored exactly as passed in. When the tweet has
    an ``in_reply_to_status_id_str``, the parent tweet is looked up (a
    placeholder node holding only ``tweet_id`` is merged if none is found)
    and a ``REPLIED_TO`` relationship stamped with the tweet's creation time
    is merged.

    Returns:
        The merged tweet node, or ``None`` if anything fails (the error is
        printed).
    """
    try:
        graph = get_graph_connection()

        raw_text = tweet_data.get("text", "")
        if pd.notnull(raw_text):
            hashtag_count = len(re.findall(r'#\w+', raw_text))
            mention_count = len(re.findall(r'@\w+', raw_text))
            url_count = len(re.findall(r'http[s]?://\S+', raw_text))
        else:
            hashtag_count = mention_count = url_count = 0

        properties = {
            "tweet_id": tweet_data.get("id_str"),
            "hashtag_count": hashtag_count,
            "mention_count": mention_count,
            "url_count": url_count,
            "text": preprocess_text(raw_text),
            "user_id": tweet_data.get("user", {}).get("id_str"),
            "favorite_count": tweet_data.get("favorite_count", 0),
            "retweet_count": tweet_data.get("retweet_count", 0),
            "retweeted": tweet_data.get("retweeted", False),
            "annotation_label": annotation_label,
        }
        tweet_node = Node("Tweet", **properties)
        graph.merge(tweet_node, "Tweet", "tweet_id")

        parent_id = tweet_data.get("in_reply_to_status_id_str")
        if not parent_id:
            return tweet_node

        # Look the parent up first and only merge a bare placeholder when no
        # node is found.
        parent_node = graph.nodes.match("Tweet", tweet_id=parent_id).first()
        if not parent_node:
            parent_node = Node("Tweet", tweet_id=parent_id)
            graph.merge(parent_node, "Tweet", "tweet_id")

        reply_edge = Relationship(tweet_node, "REPLIED_TO", parent_node)
        reply_edge["timestamp"] = convert_timestamp(tweet_data.get("created_at"))
        graph.merge(reply_edge)

        return tweet_node
    except Exception as e:
        print(f"Error creating tweet node: {e}")
        return None

def create_relationships(user_node, tweet_node, event_node, date):
    """Merge the ``WROTE`` and ``RELATED_TO`` relationships for one tweet.

    Nothing happens unless all three nodes are truthy. Otherwise
    ``(user)-[:WROTE {timestamp: date}]->(tweet)`` and
    ``(tweet)-[:RELATED_TO {start_time: date}]->(event)`` are merged through
    the calling thread's graph connection; a failure is printed, not raised.
    """
    if not all([user_node, tweet_node, event_node]):
        return

    graph = get_graph_connection()
    try:
        wrote = Relationship(user_node, "WROTE", tweet_node)
        wrote["timestamp"] = date
        graph.merge(wrote)

        related_to = Relationship(tweet_node, "RELATED_TO", event_node)
        related_to["start_time"] = date
        graph.merge(related_to)
    except Exception as e:
        print(f"Error creating relationships: {e}")

def process_tweet_file(file_path, event_node, annotation_label):
    """Import one tweet JSON file into the graph.

    The file is read once as bytes, decoded with the encoding chardet detects
    (UTF-8 when chardet reports none) and parsed as JSON. Files without a
    ``user`` object are ignored. Otherwise the function:

    * creates the user and tweet nodes,
    * merges the ``WROTE`` / ``RELATED_TO`` relationships,
    * merges a ``User`` node and a ``MENTIONS`` relationship for each entry
      in ``entities.user_mentions``,
    * puts one item on ``processed_items``.

    If the tweet node could not be created the file is skipped with an
    explicit message, instead of failing later on a ``None`` subscript. A
    missing ``created_at`` gives ``None`` timestamps rather than aborting the
    file. Any other error is printed and swallowed so one bad file does not
    stop the import.

    Args:
        file_path: Path of the tweet JSON file.
        event_node: Event node the tweet is linked to.
        annotation_label: Label stored on the tweet node.
    """
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read()

        # Decode the bytes already in memory instead of reopening the file.
        encoding = chardet.detect(raw_data)['encoding'] or "utf-8"
        tweet_data = json.loads(raw_data.decode(encoding))

        user_data = tweet_data.get("user")
        if not user_data:
            return

        user_node = create_user_node(user_data)
        tweet_node = create_tweet_node(tweet_data, annotation_label)
        if tweet_node is None:
            print(f"Skipping tweet file {file_path}: tweet node could not be created")
            return

        print(f"annotation label of tweet is {tweet_node['annotation_label']}")

        # Parsed once; the same timestamp is reused for every relationship.
        created_at = convert_timestamp(tweet_data.get("created_at"))
        create_relationships(user_node, tweet_node, event_node, created_at)

        # Process mentions
        graph = get_graph_connection()
        for mentioned_user in tweet_data.get("entities", {}).get("user_mentions", []):
            mentioned_user_node = Node(
                "User",
                user_id=mentioned_user["id_str"],
                screen_name=mentioned_user["screen_name"],
            )
            graph.merge(mentioned_user_node, "User", "user_id")
            mention_relationship = Relationship(tweet_node, "MENTIONS", mentioned_user_node)
            mention_relationship["timestamp"] = created_at
            graph.merge(mention_relationship)

        processed_items.put(1)
    except Exception as e:
        print(f"Error processing tweet file {file_path}: {e}")
def process_tweet_folders_threaded(tweet_folder_path, event_name, tweet_id, category, event_node):
    """Import every tweet file of one thread folder through a thread pool.

    The thread's annotation is loaded from
    ``all-rnr-annotated-threads/<event_name>-all-rnr-threads/<category>/<tweet_id>/annotation.json``
    and converted to a label. Each ``*.json`` file whose name does not start
    with ``_`` under the ``reactions`` and ``source-tweets`` sub-folders of
    ``tweet_folder_path`` is then submitted to ``process_tweet_file`` with
    that label. The call returns once every submitted task has finished;
    task errors are printed.
    """
    annotation_path = os.path.join(
        "all-rnr-annotated-threads",
        f"{event_name}-all-rnr-threads",
        category,
        tweet_id,
        "annotation.json",
    )

    annotation_data = load_annotation(annotation_path)
    print(f"Loaded annotation data: {annotation_data}")
    annotation_label = convert_annotations(annotation_data, is_rumour=(category == "rumours"))
    print(f"Annotation label is : {annotation_label}")

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        pending = []
        for tweet_type in ('reactions', 'source-tweets'):
            type_dir = os.path.join(tweet_folder_path, tweet_type)
            if not os.path.exists(type_dir):
                continue
            for entry in os.listdir(type_dir):
                if entry.startswith("_") or not entry.endswith(".json"):
                    continue
                pending.append(
                    executor.submit(
                        process_tweet_file,
                        os.path.join(type_dir, entry),
                        event_node,
                        annotation_label,
                    )
                )

        # Block until every file of this thread folder has been handled.
        for finished in as_completed(pending):
            try:
                finished.result()
            except Exception as e:
                print(f"Error in thread: {e}")

def process_subfolders_threaded(event_path, event_name, event_node):
    """Fan the thread folders of one event out over a thread pool.

    Every directory found under ``<event_path>/rumours`` and
    ``<event_path>/non-rumours`` is submitted to
    ``process_tweet_folders_threaded``, with the sub-folder name passed as
    the category. The call returns once all submitted tasks have finished;
    task errors are printed.
    """
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        pending = []
        for category in ('rumours', 'non-rumours'):
            category_dir = os.path.join(event_path, category)
            if not os.path.exists(category_dir):
                continue
            for tweet_id in os.listdir(category_dir):
                thread_dir = os.path.join(category_dir, tweet_id)
                if not os.path.isdir(thread_dir):
                    continue
                pending.append(
                    executor.submit(
                        process_tweet_folders_threaded,
                        thread_dir,
                        event_name,
                        tweet_id,
                        category,
                        event_node,
                    )
                )

        # Block until every thread folder of this event has been handled.
        for finished in as_completed(pending):
            try:
                finished.result()
            except Exception as e:
                print(f"Error in thread: {e}")

def process_event_folders(base_directory):
    """Import every event folder under ``base_directory`` and report progress.

    For each sub-directory the event name is the folder name with
    ``-all-rnr-threads`` removed. An ``Event`` node is created, the event's
    thread folders are processed, and the ``processed_items`` queue is
    drained into a running total. After each event the cumulative count,
    elapsed time and average rate are printed. The rate is reported as 0.00
    when no measurable time has elapsed, which avoids a division by zero.

    Args:
        base_directory: Directory containing one folder per event.

    Raises:
        OSError: If ``base_directory`` cannot be listed.
    """
    start_time = time.time()
    processed_count = 0

    for event_folder in os.listdir(base_directory):
        event_path = os.path.join(base_directory, event_folder)
        if not os.path.isdir(event_path):
            continue

        event_name = event_folder.replace('-all-rnr-threads', '')
        event_node = create_event_node(event_name)

        process_subfolders_threaded(event_path, event_name, event_node)

        # All workers for this event have finished, so the queue holds
        # exactly the items they reported.
        while not processed_items.empty():
            processed_items.get()
            processed_count += 1

        elapsed_time = time.time() - start_time
        rate = processed_count / elapsed_time if elapsed_time > 0 else 0.0
        print(f"Processed {processed_count} items in {elapsed_time:.2f} seconds")
        print(f"Average processing rate: {rate:.2f} items/second")

def print_statistics():
    """Print node counts for the whole graph and for each node label.

    Four count queries are run through the calling thread's graph
    connection: all nodes, ``Event``, ``User`` and ``Tweet``. A count is
    reported as 0 when a query returns no rows.
    """
    graph = get_graph_connection()

    # (Cypher query, result column, noun used in the printed line)
    reports = (
        ("MATCH (n) RETURN COUNT(n) AS total_nodes", "total_nodes", "nodes"),
        ("MATCH (e:Event) RETURN COUNT(e) AS total_events", "total_events", "events"),
        ("MATCH (u:User) RETURN COUNT(u) AS total_users", "total_users", "users"),
        ("MATCH (t:Tweet) RETURN COUNT(t) AS total_tweets", "total_tweets", "tweets"),
    )

    for query, column, noun in reports:
        rows = graph.run(query).data()
        count = rows[0][column] if rows else 0
        print(f"Total number of {noun}: {count}")

# Script entry point: import the dataset, then print database statistics.
if __name__ == "__main__":
    # Dataset root, given as a path relative to the current working directory.
    base_directory = "all-rnr-annotated-threads"
    
    try:
        process_event_folders(base_directory)
        print("\nProcessing complete. Generating statistics...")
        print_statistics()
    except Exception as e:
        # Top-level boundary: report the failure as a single line.
        print(f"An error occurred during processing: {e}")
    finally:
        # Printed whether or not the run succeeded.
        print("Script execution completed")