In [1]:
import psycopg2 as pg
import psycopg2.extensions
from psycopg2.extras import execute_values
import copy
import time
import gzip
import json

In [21]:
# Connect to the local PostgreSQL database; this notebook-level connection is
# reused by the ad-hoc cells further down.
# NOTE(review): host, user and password are hardcoded here — consider reading
# them from environment variables before sharing the notebook.
connection = pg.connect(host="localhost", user="postgres", password="example", dbname="postgres")
# Register the UNICODE typecaster, scoped to this connection.
pg.extensions.register_type(pg.extensions.UNICODE, connection)

In [22]:
# Open a cursor on the notebook-level connection.
cursor = connection.cursor()


In [3]:
# Gzipped exports used as input by the import functions.
# NOTE(review): absolute, machine-specific paths — adjust before running elsewhere.
path_to_authors = r"C:\Users\marve\authors.jsonl.gz"
path_to_conversations = r"C:\Users\marve\conversations.jsonl.gz"

In [4]:
# Peek at the first line of the conversations export and keep it in
# `conversation`. The file is opened in binary mode, so the line is raw bytes.
with gzip.open(path_to_conversations,'r') as f:
    for line in f:
        conversation = line
        # Only the first line is needed; stop reading immediately.
        break

Začneme najprv s importom dát používateľov...

In [14]:
def exists(obj, attr, is_id=False):
    """Tell whether ``obj`` holds a usable value under ``attr``.

    Args:
        obj: Mapping-like object exposing ``keys()`` and item access.
        attr: Key to look up.
        is_id: When true, an empty string is also treated as missing.

    Returns:
        ``True`` if the key is present, its value is not ``None`` and (for
        ids) not the empty string; ``False`` otherwise.
    """
    if attr not in obj.keys():
        return False

    value = obj[attr]
    if value is None:
        return False

    # Identifiers additionally reject the empty string.
    return not (is_id and value == "")

In [41]:
# Translation table sending NUL and every lone surrogate (U+D800..U+DFFF) to
# the Unicode replacement character. Surrogates cannot be encoded as UTF-8;
# NUL was already being replaced by the previous implementation.
_INVALID_CODEPOINTS = dict.fromkeys([0x00, *range(0xD800, 0xE000)], "\uFFFD")


def make_string_valid(string):
    """Return ``string`` as text that can always be encoded as UTF-8.

    The previous implementation round-tripped through
    ``encode("utf-8")``/``decode(..., errors="replace")``. The encode step
    raised ``UnicodeEncodeError`` on lone surrogates (which ``json.loads`` can
    produce from escapes such as ``"\\ud83d"``), so the ``errors="replace"`` on
    the decode side could never take effect. Here the offending code points
    are replaced directly.

    Args:
        string: Value to sanitise; it is converted with ``str()`` first.

    Returns:
        The text with every NUL character and lone surrogate replaced by
        U+FFFD. All other characters are left untouched.
    """
    return str(string).translate(_INVALID_CODEPOINTS)

In [None]:
def create_tables(cursor):
    """Create the ``conversations`` table through ``cursor``.

    The foreign key previously pointed at ``roles (role_id)`` via a
    ``role_id`` column; neither exists in this schema, so the statement could
    never succeed. It now ties ``author_id`` to ``authors (id)``, which means
    the ``authors`` table has to exist before this function is called.

    Args:
        cursor: Open database cursor; only its ``execute`` method is used.
            Committing is left to the caller.
    """
    cursor.execute("""
        CREATE TABLE conversations (
        id int8 PRIMARY KEY,
        author_id int8 NOT NULL,
        content text NOT NULL,
        possibly_sensitive bool NOT NULL,
        language varchar(3) NOT NULL,
        source text NOT NULL,
        retweet_count int4,
        reply_count int4,
        like_count int4,
        quote_count int4,
        created_at TIMESTAMPTZ,
        FOREIGN KEY (author_id) REFERENCES authors (id)
        );
    """)
    

In [None]:
def import_conversation_table(path_to_conversation_export, log_step=10000):
    """Prepare the database for the conversations import.

    At the moment this only opens a dedicated connection and creates the
    conversations table via ``create_tables``; no rows are read from the
    export yet.

    Changes from the earlier draft: the log line named 'authors' (copy-paste),
    the ``CREATE TABLE`` was never committed, and the connection and cursor
    were never closed.

    Args:
        path_to_conversation_export: Path to the conversations export.
            Not used yet.
        log_step: Intended progress-logging interval. Not used yet.
    """
    print("...Importing 'conversations' data...")

    connection = pg.connect(host="localhost", user="postgres", password="example", dbname="postgres")
    try:
        pg.extensions.register_type(pg.extensions.UNICODE, connection)
        cursor = connection.cursor()
        try:
            create_tables(cursor)
            # Without an explicit commit the DDL is discarded when the
            # connection closes.
            connection.commit()
        finally:
            cursor.close()
    finally:
        connection.close()


In [16]:
def import_author_table(path_to_author_export, log_step=10000):
    """Create the ``authors`` table and load it from a gzipped JSON-lines export.

    Each line of the export is parsed with ``json.loads`` and converted by
    ``migrate_author_entity``; lines for which that returns ``None`` are
    skipped. Work is committed every ``log_step`` lines and once more after
    the last line.

    Fixes over the previous version:
      * a final commit is issued, so the rows after the last full batch (or
        everything, for small exports) are no longer lost on close;
      * ``log_step`` is honoured instead of a hardcoded 10000;
      * elapsed minutes are computed as ``(now - start) / 60`` rather than
        ``now - start / 60``;
      * cursor and connection are closed even if the import raises.

    Args:
        path_to_author_export: Path to the gzipped authors export.
        log_step: Number of lines between commits / progress messages.
    """
    print("...Importing 'authors' data...")
    start_time = time.time()
    prev_block_time = start_time

    create_table_string = """
        CREATE TABLE authors (
	    id int8 PRIMARY KEY,
        name varchar(255),
        username varchar(255),
        description text,
        followers_count int4,
        following_count int4,
        tweet_count int4,
        listed_count int4
        );
    """

    insert_author_string = """
                    INSERT INTO authors (id, name, username, description, 
                    followers_count, following_count, tweet_count, 
                    listed_count) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """

    connection = pg.connect(host="localhost", user="postgres", password="example", dbname="postgres")
    try:
        pg.extensions.register_type(pg.extensions.UNICODE, connection)
        cursor = connection.cursor()
        try:
            cursor.execute(create_table_string)

            with gzip.open(path_to_author_export,'r') as f:
                for it, author_json_str in enumerate(f):
                    author_obj = json.loads(author_json_str)
                    author_row = migrate_author_entity(author_obj)
                    if author_row is not None:
                        cursor.execute(insert_author_string, author_row)

                    # Commit and report progress once per batch of lines.
                    if it % log_step == 0 and it != 0:
                        connection.commit()

                        time_check = time.time()
                        print(f"Time elapsed since the beggining: {(time_check - start_time) / 60}m | Time spent on the last block: {time_check - prev_block_time}")
                        prev_block_time = time_check

            # Flush whatever was inserted after the last periodic commit.
            connection.commit()
        finally:
            cursor.close()
    finally:
        connection.close()
    print("...Finished importing 'authors' data...")
        

In [36]:
def migrate_author_entity(original_obj):
    """Convert one raw author object into a row for the ``authors`` table.

    The previous version listed the metric names without separating commas,
    so Python concatenated ``"following_count" "tweet_count" "listed_count"``
    into a single bogus key. Those three metrics were therefore never
    defaulted or converted, and building the row raised ``KeyError`` whenever
    any of them was absent. The names are now separate entries.

    The input object is only read, never modified.

    Args:
        original_obj: Parsed author object (mapping). ``id`` is required;
            ``name``, ``username``, ``description`` and the entries of
            ``public_metrics`` are optional.

    Returns:
        ``None`` when ``id`` is missing, ``None`` or empty; otherwise a list
        ``[id, name, username, description, followers_count,
        following_count, tweet_count, listed_count]`` with ``None`` for every
        missing optional value.

    Raises:
        ValueError: If ``id`` or a present metric cannot be converted by
            ``int()``.
    """
    if not exists(original_obj, "id", is_id=True):
        return None
    author_id = int(original_obj["id"])

    nullable_string_attributes = ("name", "username", "description")
    string_values = [
        make_string_valid(original_obj[key]) if exists(original_obj, key) else None
        for key in nullable_string_attributes
    ]

    public_metrics = (
        "followers_count",
        "following_count",
        "tweet_count",
        "listed_count",
    )
    # A missing or null metrics object behaves like one with no entries.
    if exists(original_obj, "public_metrics"):
        metrics_source = original_obj["public_metrics"]
    else:
        metrics_source = {}
    metric_values = [
        int(metrics_source[key]) if exists(metrics_source, key) else None
        for key in public_metrics
    ]

    return [author_id, *string_values, *metric_values]

In [30]:
# Convert a single raw author object into an `authors` row for the manual insert below.
# NOTE(review): `author` is not assigned in any cell visible here — presumably
# left over from an earlier kernel session; confirm before Restart & Run All.
author_row = migrate_author_entity(author)

In [32]:
# Fresh cursor on the notebook-level connection.
cursor = connection.cursor()

# Insert the single converted author row; values are passed as query
# parameters rather than formatted into the SQL text.
cursor.execute(
    """
    INSERT INTO authors (id, name, username, description, 
    followers_count, following_count, tweet_count, 
    listed_count) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """,
    author_row
)

In [33]:
# Commit the pending transaction on the notebook-level connection.
connection.commit()

In [None]:
# Draft DDL for the hashtag tables. These are bare string literals: nothing in
# this cell sends them to the database.
"""CREATE TABLE hashtags (
	id int8 PRIMARY KEY,
    tag text UNIQUE NOT NULL
);"""

# Link table with foreign keys to both conversations and hashtags.
"""
CREATE TABLE conversation_hashtags (
	id int8 PRIMARY KEY,
    conversation_id int8 NOT NULL,
    hashtag_id int8 NOT NULL,
    FOREIGN KEY (conversation_id) REFERENCES conversations (id),
    FOREIGN KEY (hashtag_id) REFERENCES hashtags (id)
);
"""

In [None]:
# Draft DDL for the context tables. These are bare string literals: nothing in
# this cell sends them to the database.
"""CREATE TABLE context_domains (
	id int8 PRIMARY KEY,
    name varchar(255) NOT NULL,
    description text
);"""

# Same column layout as context_domains.
"""CREATE TABLE context_entities (
	id int8 PRIMARY KEY,
    name varchar(255) NOT NULL,
    description text
);"""

# Link table with foreign keys to conversations, context_domains and
# context_entities.
"""
CREATE TABLE context_annotations (
	id int8 PRIMARY KEY,
    conversation_id int8 NOT NULL,
    context_domain_id int8 NOT NULL,
    context_entity_id int8 NOT NULL,
    FOREIGN KEY (conversation_id) REFERENCES conversations (id),
    FOREIGN KEY (context_domain_id) REFERENCES context_domains (id),
    FOREIGN KEY (context_entity_id) REFERENCES context_entities (id)
);
"""

In [None]:
# Draft DDL for the annotations table (bare string literal; not executed by
# this cell). The trailing comma after the last constraint was removed: a
# comma directly before the closing parenthesis is a SQL syntax error.
"""CREATE TABLE annotations (
	id int8 PRIMARY KEY,
    conversation_id int8 NOT NULL,
    value text NOT NULL,
    type text NOT NULL,
    probability numeric(4,3) NOT NULL,
    FOREIGN KEY (conversation_id) REFERENCES conversations (id)
);"""

In [None]:
# Draft DDL for the links table (bare string literal; not executed by this
# cell). The trailing comma after the last constraint was removed: a comma
# directly before the closing parenthesis is a SQL syntax error.
"""CREATE TABLE links (
	id int8 PRIMARY KEY,
    conversation_id int8 NOT NULL,
    url varchar(2048) NOT NULL,
    title text,
    description text,
    FOREIGN KEY (conversation_id) REFERENCES conversations (id)
);"""

In [None]:
# Draft DDL for the conversation_references table (bare string literal; not
# executed by this cell). Both conversation_id and parent_id reference
# conversations (id). The trailing comma after the last constraint was
# removed: a comma directly before the closing parenthesis is a SQL syntax
# error.
"""CREATE TABLE conversation_references (
	id int8 PRIMARY KEY,
    conversation_id int8 NOT NULL,
    parent_id int8 NOT NULL,
    type varchar(20) NOT NULL,
    FOREIGN KEY (conversation_id) REFERENCES conversations (id),
    FOREIGN KEY (parent_id) REFERENCES conversations (id)
);"""