In [1]:
import xml.etree.ElementTree as ET
input_files = ["users", "badges", "tags", "posts", "posttags", "comments", "dummy"]

db_config = {
    'host': 'localhost',
    'database': 'ubuntu2',
    'user': 'postgres',
    'password': 'root',
    'port': '5432'
}

data_directory = 'C:\\Users\\mr2714\\OneDrive - rit.edu\\Python_Projects\\BigData\\data\\'

chunk_size = 5

print("hello")

hello


In [2]:
posts_query = """
    INSERT INTO posts (Id, ParentId, OwnerUserId, AcceptedAnswerId, Title, Body, Score, ViewCount, CreationDate)
    VALUES %s
    """
post_tags_query = """
    INSERT INTO posttags (PostId, TagId)
    VALUES %s
    """

In [3]:
import psycopg2
import psycopg2.extras as extras
import os

def connect():
    return psycopg2.connect(
        dbname=db_config['database'],
        user=db_config['user'],
        password=db_config['password'],
        host=db_config['host'],
        port=db_config['port']
    )

# method to connect and execute an sql file script for database
def exec_sql_file(path):
    # full_path = os.path.join(os.path.dirname(__file__), f'./{path}')
    current_directory = os.getcwd()
    full_path = os.path.join(current_directory, f'./{path}')
    conn = connect()
    cur = conn.cursor()
    with open(full_path, 'r') as file:
        cur.execute(file.read())
    conn.commit()
    conn.close()

# method to connect and get one record from db; based on provide query
def exec_get_one(sql, args={}):
    conn = connect()
    cur = conn.cursor()
    cur.execute(sql, args)
    one = cur.fetchone()
    conn.close()
    return one

# method to connect and get all records from db; based on provide query
def exec_get_all(sql, args={}):
    conn = connect()
    cur = conn.cursor()
    cur.execute(sql, args)
    # https://www.psycopg.org/docs/cursor.html#cursor.fetchall
    list_of_tuples = cur.fetchall()
    conn.close()
    return list_of_tuples

# method to execute a query; based on provided query
def exec_commit(sql, args={}):
    conn = connect()
    cur = conn.cursor()
    result = cur.execute(sql, args)
    conn.commit()
    conn.close()
    return result

# method to excute a bulk query (i.e., to insert chunk of data togather) based on provided query and input data
def execute_df_values(sql, tuples):
    conn = connect()
    cur = conn.cursor()
    result = extras.execute_values(cur, sql, tuples)
    conn.commit()
    conn.close()
    return result


In [4]:
def create_schema():
    # Run the script file to create schema. 
    exec_sql_file('create_schema.sql')

# create_schema()

In [5]:
# This method removes entries with non-existent FK Ids from chunk_data.
def remove_invalid_entries_links(chunk_data, valid_ids, loc):
    # return only data that contains valid ids
    return [data for data in chunk_data if int(data[loc]) in valid_ids]

# This methods removes entries with non-existent FK Ids from chunk_data but ignore entries where the FK Id is None.
def remove_invalid_entries_links_ignore_none(chunk_data, valid_ids, loc):
    # return only data that contains valid ids and None ids
    return [data for data in chunk_data if data[loc] is None or int(data[loc]) in valid_ids]

# This method extract ids from chunk_data on give location to compare FK ids with existing Ids in the database.
def extract_ids_from_chunk(chunk_data, loc):
    # return ids extracted from given location from chunk_data
    return [int(data[loc]) for data in chunk_data]


# This method extract ids from chunk_data on give location to compare FK ids with existing Ids in the database. Only includes non-None values.
def extract_ids_from_chunk_none(chunk_data, loc):
    # # return ids extracted from given location from chunk_data only where is not None
    return [int(data[loc]) for data in chunk_data if data[loc] is not None]

# This method checks which ids are valid by querying the database and returns only valid_ids to ensure fk is not violated.
def check_valid_fk_ids(table, ids):
    valid_ids = set()
    if ids:
        # Make sure that only to check for valid table names to avoid SQL injection. 
        if table.lower() not in {t.lower() for t in input_files}:
            raise ValueError("Invalid table name")
        sql = "SELECT Id FROM " + table + " WHERE Id IN %s"
        result = exec_get_all(sql, (tuple(ids),))
        valid_ids = [row[0] for row in result]
    return valid_ids

def check_valid_fk_ids_from_chunk(chunk_data, ids):
    valid_ids = set()
    if ids:
        ids_set = set(ids)
        valid_ids = {data[0] for data in chunk_data if data[0] in ids_set}
    return valid_ids

# This method removes entries with non-existent FK Ids from chunk_data.
def remove_invalid_entries_links_chunked(chunk_data, valid_ids, loc, chunk_size=5000):
    # return only data that contains valid ids and None ids
    valid_ids_set = set(valid_ids)
    filtered_data = []
    # Process in chunks
    for i in range(0, len(chunk_data), chunk_size):
        chunk = chunk_data[i:i + chunk_size]
        filtered_data.extend(data for data in chunk if int(data[loc]) in valid_ids_set)
    
    return filtered_data

In [6]:
def report_db_statistics():
    # loop over all the tables
    for table in input_files:
        # input files are known to avoid SQL injection
        query = "SELECT COUNT(*) FROM " + table + ";"
        result = exec_get_one(query)
        if result:
            print("Table: ", table, " Record Inserted: ", result[0])
report_db_statistics()

Table:  users  Record Inserted:  1450497
Table:  badges  Record Inserted:  1977517
Table:  tags  Record Inserted:  3155
Table:  posts  Record Inserted:  909005
Table:  posttags  Record Inserted:  1130897
Table:  comments  Record Inserted:  616366
Table:  dummy  Record Inserted:  910366


In [7]:
def remove_self_bulk_post(chunk_data, loc):
    # make s set of valid Ids (postIds)
    post_ids = {data[0] for data in chunk_data}
    # make a set of valid Ids based on location 1/3, excluding None
    parent_ids = {data[loc] for data in chunk_data if data[loc] is not None}
    # Find invalid Ids
    invalid_ids = parent_ids - post_ids
    # filter data to remove invalid Ids
    filtered_chunk_data = [data for data in chunk_data if data[loc] not in invalid_ids]
    return filtered_chunk_data

# This method inserts data in a dummy table to ensure that correct data is entered. 
# This is done to avoid memory; as reading whole post has body and tags which can get really big. 
def dummy_posts_insert(input_file):
    import xml.etree.ElementTree as ET

    input_file = data_directory + input_file
    context = ET.iterparse(input_file, events=("start", "end"))

    chunk_data = []
    # read the whole data into chunk 
    for event, elem in context:
        if elem.tag == 'row' and elem.get('Id') is not None and elem.get('OwnerUserId') is not None: 
                # Get posts data to insert into posts
            element_data = (
                    elem.get('Id'), elem.get('ParentId'),
                    elem.get('OwnerUserId'), elem.get('AcceptedAnswerId')
                )
            chunk_data.append(element_data)
            elem.clear()
    # remove posts with invalid parent id
    filter1_parents = remove_self_bulk_post(chunk_data, 1)
    # remove posts with invlaid users
    user_ids = extract_ids_from_chunk(filter1_parents, 2)
    valid_user_ids = check_valid_fk_ids('users', user_ids)
    filter_users = remove_invalid_entries_links_chunked(filter1_parents, valid_user_ids, 2)
    # remove posts with invalid answerid
    filter_answers = remove_self_bulk_post(filter_users, 3)
    # delete table and recreate a dummy 
    dummy_query = "INSERT INTO dummy (Id, ParentId, OwnerUserId, AcceptedAnswerId) VALUES %s"
    # insert data into dummy table 
    execute_df_values(dummy_query, filter_answers)

In [8]:
# This method to insert post tags 
# get the input file; make sure the data directory is correct and the input file is present in the data directory;
# otherwise the execution will fail, this is applicable for all insert methods
def insert_post_tags(post_tags_chunk, pt_query):
    # first get all tags as they are very small to avoid excessive database connections
    tags = exec_get_all("select Id, TagName from tags")
    # create map for easier test
    tag_map = {tagname: tag_id for tag_id, tagname in tags}
    processed_chunk_data = []
    for post_id, tags_str in post_tags_chunk:
        if tags_str is not None:
            # clean data to get individual tags
            tag_list = tags_str.replace('<', ' ').replace('>', ' ').split()
            for tag in tag_list:
                # get id for each tag
                tag_id = tag_map.get(tag) 
                if tag_id is not None:
                    # we already know that post id is not null; so add it to chunk data if tag id is also not null
                    processed_chunk_data.append((int(post_id), int(tag_id)))
    # get the FK ids from the chunck; only inserting tags where post and tag ids exist
    post_ids = extract_ids_from_chunk(processed_chunk_data, -2)
    # compare ids with the ids already existing in the primary table for FK ids; provide the primary table; valid_ids are returned as set
    valid_post_ids = check_valid_fk_ids('posts', post_ids)
    # filter out invalid ids; provide the FK index 
    valid_chunk_data = remove_invalid_entries_links(processed_chunk_data, valid_post_ids, -2)
    # get the FK ids from the chunck; only inserting tags where post and tag ids exist
    tag_ids = extract_ids_from_chunk(valid_chunk_data, -1)
    # compare ids with the ids already existing in the primary table for FK ids; provide the primary table; valid_ids are returned as set
    valid_tag_ids = check_valid_fk_ids('tags', tag_ids)
    # get the FK ids from the chunck; provide the FK index
    valid_chunk_data = remove_invalid_entries_links(valid_chunk_data, valid_tag_ids, -1)
    # insert the valid data
    execute_df_values(pt_query, valid_chunk_data)

In [10]:
def insert_posts(input_file, query, pt_query, max_chunks=5):
    # first do a dummy for safe entry
    dummy_posts_insert(input_file)
    
    input_file = data_directory + input_file
    context = ET.iterparse(input_file, events=("start", "end"))
    
    chunk_count = 0
    elements_in_chunk = 0
    chunk_data = []
    post_tags_chunk = []

    for event, elem in context:
        if elem.tag == 'row':
            if elem.get('Id') is not None and elem.get('OwnerUserId') is not None: 
                # get posts data to insert into posts
                element_data = (
                    elem.get('Id'),elem.get('ParentId'),
                    elem.get('OwnerUserId'),elem.get('AcceptedAnswerId'),
                    elem.get('Title'),elem.get('Body'),
                    elem.get('Score'),elem.get('ViewCount'),
                    elem.get('CreationDate')
                )
                # get tags data to process post tags and then insert into post tags
                tags_data = (
                    elem.get('Id'),
                    elem.get('Tags')
                )
                chunk_data.append(element_data)
                post_tags_chunk.append(tags_data)
                elements_in_chunk += 1

            if elements_in_chunk >= chunk_size:
                chunk_count += 1
                #check valid users for posts
                user_ids = extract_ids_from_chunk(chunk_data, -7)
                valid_user_ids = check_valid_fk_ids('users', user_ids)
                valid_chunk_data = remove_invalid_entries_links(chunk_data, valid_user_ids, -7)
                # check valid parents
                parent_ids = extract_ids_from_chunk_none(valid_chunk_data, -8)
                valid_parent_ids = check_valid_fk_ids('dummy', parent_ids)
                valid_chunk_data = remove_invalid_entries_links_ignore_none(valid_chunk_data, valid_parent_ids, -8)
                # check valid answers
                answer_ids = extract_ids_from_chunk_none(valid_chunk_data, -6)
                valid_answer_ids = check_valid_fk_ids('dummy', answer_ids)
                valid_chunk_data = remove_invalid_entries_links_ignore_none(valid_chunk_data, valid_answer_ids, -6)
                # insert the valid data in posts
                execute_df_values(query, valid_chunk_data)
                # insert post_tags ; post tags processing happens in the insert post tag method
                insert_post_tags(post_tags_chunk, pt_query)
                chunk_data = []
                post_tags_chunk = []
                elements_in_chunk = 0

            elem.clear()

    if elements_in_chunk > 0:
        chunk_count += 1
        # extract valid users ids are added
        user_ids = extract_ids_from_chunk(chunk_data, -7)
        valid_user_ids = check_valid_fk_ids('users', user_ids)
        valid_chunk_data = remove_invalid_entries_links(chunk_data, valid_user_ids, -7)
        # check valid parents
        parent_ids = extract_ids_from_chunk_none(valid_chunk_data, -8)
        valid_parent_ids = check_valid_fk_ids('dummy', parent_ids)
        valid_chunk_data = remove_invalid_entries_links_ignore_none(valid_chunk_data, valid_parent_ids, -8)

        # check valid answers
        answer_ids = extract_ids_from_chunk_none(valid_chunk_data, -6)
        valid_answer_ids = check_valid_fk_ids('dummy', answer_ids)
        valid_chunk_data = remove_invalid_entries_links_ignore_none(valid_chunk_data, valid_answer_ids, -6)
        # insert the valid data in posts
        execute_df_values(query, valid_chunk_data)
        # insert post_tags ; post tags processing happens in the insert post tag method
        insert_post_tags(post_tags_chunk, pt_query)
    
    
    print(f"Processed {chunk_count} chunks for about {chunk_size} elements.")

create_schema()
chunk_size = 10000
insert_posts("Posts.xml", posts_query, post_tags_query)


Creating FK for parent and answer id after insertion
Processed 93 chunks for about 10000 elements.


In [11]:
print("Creating FK for parent and answer id after insertion")
final_query = """ALTER TABLE Posts ADD CONSTRAINT fk_ParentId FOREIGN KEY (ParentId) REFERENCES Posts(Id) ON DELETE CASCADE; 
ALTER TABLE Posts ADD CONSTRAINT fk_AcceptedAnswerId FOREIGN KEY (AcceptedAnswerId) REFERENCES Posts(Id) ON DELETE CASCADE; """

exec_commit(final_query)

Creating FK for parent and answer id after insertion


DuplicateObject: constraint "fk_parentid" for relation "posts" already exists


In [1]:
print("hello")

hello


Missing postIds count: 4059
Adding FK postid to posts


In [2]:
input_string = "<postgre><database>"

# Remove the angle brackets and split the string based on the brackets
words = input_string.replace('<', ' ').replace('>', ' ').split()

# Filter out any empty strings
word_list = [word for word in words if word]

print(word_list)


['postgre', 'database']


In [None]:
import xml.etree.ElementTree as ET

def analyze_xml(file_path):
    key_info = {}
    total_records = 0
    non_empty_records = 0
    # Required keys to check for presence and non-null values
    required_keys = {'Id', 'OwnerUserId', 'ParentId', 'AcceptedAnswerId'}

    try:
        context = ET.iterparse(file_path, events=("end",))
        temp_unique_counts = {}
        for event, elem in context:
            if elem.tag == 'row':
                total_records += 1
                
                # Check if all required keys are present and non-null
                if all(key in elem.attrib and elem.attrib[key] != "" for key in required_keys):
                    non_empty_records += 1
                    for key, value in elem.attrib.items():
                        value_length = len(value)
                        if key not in key_info:
                            key_info[key] = {
                                "max_length": value_length,
                                "longest_value": value,
                                "null_count": 0, 
                                "unique_count": 0 
                            }
                            temp_unique_counts[key] = set() 
                        else:
                            if value_length > key_info[key]["max_length"]:
                                key_info[key]["max_length"] = value_length
                                key_info[key]["longest_value"] = value
                        # Track unique values up to a limit of 50
                        if key_info[key]["unique_count"] < 50:
                            if key not in temp_unique_counts:
                                temp_unique_counts[key] = set()
                            temp_unique_counts[key].add(value)
                            key_info[key]["unique_count"] = min(len(temp_unique_counts[key]), 50)

                # Update null counts for each key
                for key in key_info:
                    if key not in elem.attrib or elem.attrib[key] == "":
                        key_info[key]["null_count"] += 1

                elem.clear()

    except Exception as e:
        print(f"Error processing file {file_path}: {e}")

    return key_info, total_records, non_empty_records



file_path = data_directory + "Posts" + ".xml"
print("Exploring: ", file_path)
key_info, total_records, non_empty_records = analyze_xml(file_path)
print("total_records: ", total_records, " total_non_empty_records: ", non_empty_records)
for key, info in sorted(key_info.items()):
    print(f"Key: {key}\n")
    print(f"Max_Length: {info['max_length']}\n")
    if len(info['longest_value']) > 1000:
        print(f"Long post with more than 1000 chars\n")
    print(f"Null count: {info['null_count']}\n")
    print(f"Unique Count: {info['unique_count']}\n")
    