In [1]:
import pandas as pd
import numpy as np
import os
!pip install psycopg2
import psycopg2 as pg
import pandas.io.sql as sql
import sqlalchemy



In [2]:
# host is the bridge gateway of dbnet
DB_HOST = "192.168.0.1" 
DB_PORT = "5432"
DB_NAME = "warenkorb_db"
DB_SCHEMA = ""
DB_ADMIN_USER = "db_admin" 
DB_ADMIN_PASSWORD = "db_admin_0815_pw"
DB_USER = "db_user" 
DB_PASSWORD = "db_user_pw" 
DB_USER_GRANTS = "SELECT" #"SELECT, INSERT, UPDATE, DELETE"

zip_file = "data.zip"
zip_file_dir = "./data"
table_file_ending = ".csv"
table_file_delimiter = ","
# definition of special types
table_field_configs = {}

type_mapping = {
    "int64": "INT",
    "float64": "FLOAT",
    "object": "TEXT"
}

class DBConfig:
    db_str = None
    
    def __init__(self, db_host = DB_HOST, db_port = DB_PORT, db_name = DB_NAME, db_schema = DB_SCHEMA, db_admin_user = DB_ADMIN_USER, db_admin_password = DB_ADMIN_PASSWORD, db_user = DB_USER, db_password = DB_PASSWORD, db_user_grants = DB_USER_GRANTS):
        self.db_host = db_host
        self.db_port = db_port
        self.db_name = db_name
        self.db_schema = db_schema
        self.db_admin_user = db_admin_user
        self.db_admin_password = db_admin_password
        self.db_user = db_user
        self.db_password = db_password
        self.db_user_grants = db_user_grants
        
        self.create_db()
        
        self.db_str = 'postgresql://{username}:{password}@{host}:{port}/{dbname}'.format(username=db_admin_user,password=db_admin_password,host=db_host,port=db_port,dbname=db_name)
        print("Using db connection "+self.db_str)
        
        self.conn = pg.connect(self.db_str)
        self.conn.autocommit = True
        self.cur = self.conn.cursor()
        self.engine = sqlalchemy.create_engine(self.db_str)
    
    def dispose(self):
        # close communication with the PostgreSQL database server
        self.cur.close()
        self.cur = None
        # commit the changes
        self.conn.commit()
        # close the connection
        self.conn.close()
        self.conn = None
        
        self.engine.dispose()
        
    def create_db(self):
        db_str = 'postgresql://{username}:{password}@{host}:{port}'.format(username=self.db_admin_user,password=self.db_admin_password,host=self.db_host,port=self.db_port)
        print("Using db connection "+db_str+" for DB creation.")
        
        conn = pg.connect(db_str)
        conn.autocommit = True
        cur = conn.cursor()   
        
        drop_command = "DROP DATABASE IF EXISTS " +self.db_name
        create_command = "CREATE DATABASE " +self.db_name
        cur.execute(drop_command)
        cur.execute(create_command)
        conn.commit()
        
        drop_command = "DROP ROLE IF EXISTS " +self.db_user
        create_command = "CREATE ROLE " +self.db_user +" LOGIN PASSWORD '" + self.db_password +"'"
        cur.execute(drop_command)
        cur.execute(create_command)
        conn.commit()
        
        grant_command_01 = "GRANT USAGE ON SCHEMA public TO "+ self.db_user
        grant_command_02 = "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT "+ self.db_user_grants +" ON TABLES TO "+ self.db_user
        cur.execute(grant_command_01)
        cur.execute(grant_command_02)
        conn.commit()
        
        if self.db_schema is not None and "" != self.db_schema:
            drop_command = "DROP SCHEMA IF EXISTS "+ self.db_schema
            create_command = "CREATE SCHEMA "+ self.db_schema
            cur.execute(drop_command)
            cur.execute(create_command)
            conn.commit()
    
        cur.close()
        conn.close()
        
class TableConfig:
    name = None
    content = None
    types = {}
    
    def get_table_name(self, db_config):
        if db_config.db_schema is None or "" is db_config.db_schema:
            return self.name.lower()
        else:
            return db_config.db_schema.lower() + "." + self.name.lower()
    
    
def define_types(table_config, table_fields_config):
    print("Handle table '"+ table_config.name +"'")
    first_column = True
    table_config.content.columns = table_config.content.columns.str.lower()
    
    for column in table_config.content.columns[0:]:
        print("Handle field '"+ table_config.get_table_name(db_config) + "."+ column +"'.")
        if table_fields_config is not None:
            field_config = table_fields_config.get(column, None)
            
            if field_config is not None:
                for key, value in field_config.items():
                    print("Handle field '"+ table_config.name + "."+ column +"' having '"+ str(key) +" = "+ str(value) +"'.")
                    
                    if "format" == key:
                        print("Field '"+ table_config.name +"."+ column +"' is of type date.")
                        table_config.content[column] = pd.to_datetime(table_config.content[column], format=value)
                        table_config.content[column] = table_config.content[column].apply(lambda x: x.date())
                        
                        table_config.types[column] = "DATE"
                        if "%H" in key:
                            table_config.types[column] = "TIMESTAMP"
                        
                    elif "numeric" == key:
                        print("Field '"+ table_config.name +"."+ column +"' is of numeric type.")
                        table_config.content[column] = pd.to_numeric(table_config.content[column], errors='coerce')
                    elif "type" == key:
                        print("Field '"+ table_config.name +"."+ column +"' is of type = '"+str(value) +"'.'")
                        table_config.content[column] = table_config.content[column].astype(value)
                    else:
                        print("!!!!\Field '"+ table_config.name +"."+ column +"' has an unknown conversion defined '"+ str(key) +" = "+ str(value) +"'.\n!!!!")

        if column not in table_config.types:
            typ = type_mapping.get(str(table_config.content[column].dtype), None)
            
            if typ is not None:
                table_config.types[column] = typ

            else:
                print("!!!!\nThere is not type defined for '"+str(table_config.content[column].dtype) +" in "+ str(type_mapping) +"'.\n!!!!")

            
        if first_column:
            #print("Table "+ table_config.name +" has index "+ column)
            #table_config.content.set_index(column, inplace=True, verify_integrity=True)
            first_column = False
             
                
def create_table(db_config, table_config):
    drop_command = "DROP TABLE IF EXISTS "+ table_config.get_table_name(db_config)
    create_command = "CREATE TABLE IF NOT EXISTS "+ table_config.get_table_name(db_config) +" ("
    #grant_command_01 = "GRANT "+ db_config.db_user_grants +" ON ALL TABLES IN SCHEMA "+ db_config.db_name +" TO "+ db_config.db_user
    #grant_command_01 = "GRANT ALL PRIVILEGES ON DATABASE "+ db_config.db_name +" TO "+ db_config.db_user
    #grant_command_01 = "GRANT "+ db_config.db_user_grants +" PRIVILEGES ON ALL TABLES IN SCHEMA public TO "+ db_config.db_user
    #grant_command_02 = "GRANT "+ db_config.db_user_grants +" PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO "+ db_config.db_user
    grant_command_01 = "GRANT "+ db_config.db_user_grants +" ON ALL TABLES IN SCHEMA public TO "+ db_config.db_user
    
    first_column = True
    column = table_config.content.columns[0]
    if not table_config.content[column].is_unique:
        create_command += "index INT PRIMARY KEY NOT NULL"
        first_column = False
    
    for column in table_config.content.columns[0:]:
        print("Handle field '"+ table_config.get_table_name(db_config) + "."+ column +"' being of type '"+ str(table_config.content[column].dtype) +"'.")
        
        if not first_column:
            create_command += ","
            
        create_command += column + " "
        create_command += str(table_config.types[column])
        
        if first_column:
            create_command += " PRIMARY KEY NOT NULL"
            table_config.content.set_index(column, inplace=True, verify_integrity=True)
            first_column = False
    
    create_command += ")"
    
    print("Execute "+ drop_command)
    db_config.cur.execute(drop_command)
    print("Execute "+ create_command)
    db_config.cur.execute(create_command)
    print("Execute "+ grant_command_01)
    db_config.cur.execute(grant_command_01)
    #print("Execute "+ grant_command_02)
    #db_config.cur.execute(grant_command_02)
    db_config.conn.commit()
    
    
def add_data(db_config, table_config):
    if db_config.db_schema is None or db_config.db_schema == "":
        table_config.content.to_sql(table_config.name, db_config.engine, if_exists='append', chunksize=1000)
    else:
        table_config.content.to_sql(table_config.name, db_config.engine, schema=db_config.db_schema, if_exists='append', chunksize=1000)

In [3]:
# unpack the zip file
from shutil import unpack_archive
os.makedirs(zip_file_dir, exist_ok=True)
unpack_archive(zip_file, '.')

db_config = DBConfig()

for root, dirs, files in os.walk(zip_file_dir):
    for file in files:
        if file.endswith(table_file_ending):
            table_config = TableConfig()
            
            table_config.name = os.path.splitext(file)[0]
            table = pd.read_csv(os.path.join(root, file), delimiter=table_file_delimiter)
            table_config.content = table  
            #print("Read table content of '"+table_config.name +"'.")
            
            define_types(table_config, table_field_configs.get(table_config.name, None))
            create_table(db_config, table_config)
            add_data(db_config, table_config)

# remove data files
from shutil import rmtree
rmtree(zip_file_dir, ignore_errors=True)

db_config.dispose()

Using db connection postgresql://db_admin:db_admin_0815_pw@192.168.0.1:5432 for DB creation.
Using db connection postgresql://db_admin:db_admin_0815_pw@192.168.0.1:5432/warenkorb_db
Handle table 'order_products_train'
Handle field 'order_products_train.order_id'.
Handle field 'order_products_train.product_id'.
Handle field 'order_products_train.add_to_cart_order'.
Handle field 'order_products_train.reordered'.
Handle field 'order_products_train.order_id' being of type 'int64'.
Handle field 'order_products_train.product_id' being of type 'int64'.
Handle field 'order_products_train.add_to_cart_order' being of type 'int64'.
Handle field 'order_products_train.reordered' being of type 'int64'.
Execute DROP TABLE IF EXISTS order_products_train
Execute CREATE TABLE IF NOT EXISTS order_products_train (index INT PRIMARY KEY NOT NULL,order_id INT,product_id INT,add_to_cart_order INT,reordered INT)
Execute GRANT SELECT ON ALL TABLES IN SCHEMA public TO db_user
Handle table 'departments'
Handle fi