In [64]:
import psycopg2
from dotenv import load_dotenv
import os 
from io import StringIO
from psycopg2.sql import SQL, Identifier

# Load the DB connection settings (USER, PASSWORD, HOST, PORT) from .env.
# override=True makes .env values win over variables already present in the
# process environment; without it, the shell's own USER (the OS login name)
# silently shadows the USER defined in .env.
load_dotenv(override=True)

True

In [65]:
def getopenconnection(dbname='postgres'):
    """Open and return a new psycopg2 connection to ``dbname``.

    The remaining connection parameters are read from the environment
    variables USER, PASSWORD, HOST and PORT (a missing variable yields None,
    letting psycopg2/libpq fall back to its own defaults).

    NOTE(review): USER is usually already set by the login shell; unless
    load_dotenv() is called with override=True, that value takes precedence
    over a USER entry in .env — confirm which one is intended.

    :param dbname: name of the database to connect to (default 'postgres').
    :return: an open psycopg2 connection; the caller is responsible for closing it.
    """
    connect_kwargs = {
        param: os.getenv(env_name)
        for param, env_name in (
            ("user", "USER"),
            ("password", "PASSWORD"),
            ("host", "HOST"),
            ("port", "PORT"),
        )
    }
    return psycopg2.connect(dbname=dbname, **connect_kwargs)
 

def create_db(dbname):
    """
    Create the PostgreSQL database ``dbname`` unless it already exists.

    Connects (via getopenconnection) to the 'postgres' maintenance database in
    autocommit mode, because PostgreSQL refuses to run CREATE DATABASE inside a
    transaction block. Prints whether the database was created or already existed.

    :param dbname: name of the database to create.
    :return: None
    :raises psycopg2.Error: printed, then re-raised, if any database call fails.
    """
    connection = None
    cursor = None
    try:
        connection = getopenconnection(dbname='postgres')
        connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = connection.cursor()

        # Catalog lookup with a bound parameter, so the name is never spliced into SQL text.
        cursor.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s", (dbname,))
        already_exists = cursor.fetchone() is not None

        if already_exists:
            print(f"Database '{dbname}' already exists")
        else:
            # Identifiers cannot be passed as query parameters; Identifier() quotes the name safely.
            cursor.execute(SQL("CREATE DATABASE {}").format(Identifier(dbname)))
            print(f"Database '{dbname}' created successfully")
    except psycopg2.Error as err:
        print(err)
        raise
    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()

In [66]:
def preprocess_line(line):
    """Convert one ``::``-separated ratings line into a tab-separated COPY row.

    Only the first three ``::`` fields (user id, movie id, rating) are kept;
    anything after them is dropped (for MovieLens ratings.dat this is the
    timestamp). Surrounding whitespace, including the newline, is stripped.

    :param line: raw line from the ratings file.
    :return: ``"<user>\\t<movie>\\t<rating>\\n"``, or ``""`` for a blank line so that
        stray empty lines (e.g. an extra one at end of file) are skipped instead
        of aborting the whole load.
    :raises ValueError: if a non-blank line has fewer than three ``::`` fields.
    """
    stripped = line.strip()
    if not stripped:
        return ""
    parts = stripped.split("::")
    if len(parts) < 3:
        raise ValueError(f"malformed ratings line (expected at least 3 '::' fields): {line!r}")
    return f"{parts[0]}\t{parts[1]}\t{parts[2]}\n"

def _copy_buffer_into_ratings(cur, buffer):
    """COPY the tab-separated rows accumulated in ``buffer`` into ``ratings``, then empty the buffer."""
    buffer.seek(0)
    cur.copy_from(buffer, 'ratings', sep='\t', columns=('userid', 'movieid', 'rating'))
    # Reset in place so the next batch reuses the same StringIO object.
    buffer.seek(0)
    buffer.truncate()


def LoadRatings(ratings_file_path, dbname='assignment1', append=False, batch_size=200_000):
    """
    Bulk-load a ``::``-separated ratings file into the ``ratings`` table.

    Ensures database ``dbname`` exists (via create_db), creates
    ``ratings(UserId INTEGER, MovieId INTEGER, Rating FLOAT)`` if needed, then
    converts each line with preprocess_line and streams the rows into PostgreSQL
    with COPY in batches of ``batch_size`` lines to bound memory use.

    By default the table is emptied first (TRUNCATE) so that re-running the
    cell reloads the data instead of appending a second copy of every row.
    Table creation, truncation and all COPY batches share one transaction, so on
    any error it is rolled back and the ratings table is left as it was before
    the call.

    :param ratings_file_path: path to the ratings file.
    :param dbname: target database (default 'assignment1').
    :param append: if True, keep existing rows and append to them.
    :param batch_size: number of input lines buffered per COPY call (must be >= 1).
    :return: None
    :raises ValueError: if ``batch_size`` < 1.
    :raises Exception: any database, I/O or parsing error is printed, rolled back and re-raised.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    conn = None
    cur = None
    try:
        create_db(dbname=dbname)

        conn = getopenconnection(dbname=dbname)
        cur = conn.cursor()

        cur.execute("""
            CREATE TABLE IF NOT EXISTS ratings (UserId INTEGER, MovieId INTEGER, Rating FLOAT)
        """)
        if not append:
            # TRUNCATE is transactional in PostgreSQL: a failed load restores the previous rows.
            cur.execute("TRUNCATE TABLE ratings")

        buffer = StringIO()
        with open(ratings_file_path, 'r') as file:
            for i, line in enumerate(file, 1):
                buffer.write(preprocess_line(line))
                if i % batch_size == 0:
                    _copy_buffer_into_ratings(cur, buffer)
        # Flush the final, partially filled batch (if any).
        if buffer.tell():
            _copy_buffer_into_ratings(cur, buffer)

        conn.commit()
    except Exception as e:
        # Database, file and parsing errors all need the same handling.
        print(e)
        if conn:
            conn.rollback()
        raise
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()



In [None]:
# Load the MovieLens 10M ratings file (ml-10M100K) into the `ratings` table.
# Observed runtime: ~7.3s for one run (machine-dependent).
LoadRatings(ratings_file_path='./ml-10m/ml-10M100K/ratings.dat')

Database 'assignment1' already exists
