# DB transformer

First, set up a Postgres instance and load the dvdrental sample schema into it. This can be done using the following commands:

docker run --name some-postgres --rm -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust -d postgres:12.15
    
docker exec some-postgres apt update -y

docker exec some-postgres apt install wget unzip -y

docker exec some-postgres wget https://www.postgresqltutorial.com/wp-content/uploads/2019/05/dvdrental.zip

docker exec some-postgres unzip dvdrental.zip

docker exec some-postgres psql -U postgres -c "CREATE DATABASE dvdrental;"

docker exec -i some-postgres pg_restore -U postgres -d dvdrental dvdrental.tar

docker exec some-postgres psql -U postgres -d dvdrental -c "create schema target;"


In [None]:
!apt update -y -q
!apt install build-essential libpq-dev graphviz graphviz-dev -y
!pip install psycopg2
!pip install eralchemy2
!pip install graphviz
!pip install openai

In [None]:
from eralchemy2 import render_er

## Draw from Postgres database
render_er("postgresql+psycopg2://postgres@0.0.0.0:5432/dvdrental", 'erd_from_postgres.png')


In [None]:
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

img = np.asarray(Image.open('erd_from_postgres.png'))
plt.rcParams["figure.figsize"] = (15, 10)
plt.imshow(img)
plt.show()


In [None]:
conn.close()

In [None]:
import psycopg2
conn = psycopg2.connect("host=0.0.0.0 dbname=dvdrental user=postgres")
cur = conn.cursor()


In [None]:
from flask import request
import openai
openai.api_key=""

In [None]:
# Configuration consumed by createtargetbymap().
#
# columnmaps: 'schema.table.column' of a target column -> 'schema.table.column'
# of the source column; the target column takes the source column's data type.
columnmaps={}
columnmaps['target.info.email']='public.customer.email'
columnmaps['target.info.address']='public.address.address'

columnmaps['target.movie.title']='public.film.title'
columnmaps['target.movie.category']='public.category.name'
columnmaps['target.movie.language']='public.language.name'
columnmaps['target.movie.rental_date']='public.rental.rental_date'

columnmaps['target.sales.logon']='public.staff.username'
columnmaps['target.sales.first_name']='public.staff.first_name'
columnmaps['target.sales.store_id']='public.staff.store_id'

# keymaps: 'schema.table' of an additional table -> {column name: referenced
# 'schema.table.column'}; the table is only created if columnmaps did not
# already produce a table of that name.
keymaps={}
keymaps['target.salesxmovie']={'movie_id':'target.movie.movie_id','sales_id':'target.sales.sales_id'}

# constraints: referencing 'schema.table.column' -> referenced
# 'schema.table.column'; each entry becomes one foreign key.
constraints={}
constraints['target.salesxmovie.movie_id']='target.movie.movie_id'
constraints['target.salesxmovie.sales_id']='target.sales.sales_id'
constraints['target.info.info_id']='target.movie.movie_id'


In [None]:
def gettype(schema,table,column):
    """Return the data type of one column as listed in information_schema.columns.

    Uses the module-level cursor ``cur``.

    Args:
        schema: Schema name of the table.
        table: Table name.
        column: Column name.

    Returns:
        The ``data_type`` value of the first matching row.

    Raises:
        IndexError: If no such column is listed.
    """
    # Values are passed as query parameters so that quotes or other special
    # characters in the names cannot change the statement.
    sql=("select data_type from information_schema.columns "
         "where table_schema=%s and table_name=%s and column_name=%s;")
    cur.execute(sql,(schema,table,column))
    row=cur.fetchone()
    if row is None:
        # Same exception type the old a[0][0] lookup produced, with a message
        # that says which column was missing.
        raise IndexError("no column "+str(schema)+"."+str(table)+"."+str(column)
                         +" in information_schema.columns")
    return row[0]

In [None]:
def buildtable(schema,table,columns,types):
    """Build a CREATE TABLE statement for schema.table.

    Every entry of ``columns`` becomes a NOT NULL column whose type is the
    entry at the same position in ``types``. A final ``<table>_id INTEGER
    PRIMARY KEY`` column is always appended.

    Returns:
        The statement as a single string.
    """
    pieces=["create table "+schema+"."+table+"("]
    for position,name in enumerate(columns):
        pieces.append(" "+name+" "+types[position]+" NOT NULL,")
    pieces.append(" "+table+"_id INTEGER PRIMARY KEY);")
    return "".join(pieces)


In [None]:
def createtargetbymap(columnmaps,keymaps,constraints):
    """Drop and recreate the target schema with its tables and foreign keys.

    Uses the module-level ``cur``/``conn`` and the helpers ``gettype`` and
    ``buildtable``.

    Args:
        columnmaps: Maps 'schema.table.column' of a target column to the
            'schema.table.column' of its source column. The target column is
            created with the source column's data type.
        keymaps: Maps 'schema.table' of an additional table to a dict whose
            values are referenced 'schema.table.column' names. The table is
            created only if columnmaps did not already yield a table of that
            name.
        constraints: Maps a referencing 'schema.table.column' to the
            referenced 'schema.table.column'; one foreign key is added per
            entry.

    Raises:
        ValueError: If columnmaps is empty (there is then no target schema
            to create).
    """
    if not columnmaps:
        # Without at least one mapping the target schema name is unknown.
        raise ValueError("columnmaps must contain at least one mapping")

    def split3(name):
        # 'schema.table.column' -> (schema, table, column)
        parts=name.split(".")
        return parts[0],parts[1],parts[2]

    # Collect, per target table, its column names and their types.
    targetmeta={}
    for target,source in columnmaps.items():
        targetschema,targettable,targetcolumn=split3(target)
        sourceschema,sourcetable,sourcecolumn=split3(source)
        meta=targetmeta.setdefault(targettable,{'columns':[],'types':[]})
        meta['columns'].append(targetcolumn)
        meta['types'].append(gettype(sourceschema,sourcetable,sourcecolumn))

    # NOTE: targetschema is the schema of the LAST columnmaps key; all tables
    # from columnmaps are created in that one schema.
    # Dropping/creating the schema is best effort: a failure is printed and
    # the function carries on, as before.
    for sql in ("DROP SCHEMA IF EXISTS "+targetschema+" CASCADE;",
                "CREATE SCHEMA "+targetschema+";"):
        try:
            cur.execute(sql)
        except Exception as err:
            print(err)
        conn.commit()

    targettables=[]
    for targettable,meta in targetmeta.items():
        targettables.append(targettable)
        cur.execute(buildtable(targetschema,targettable,meta['columns'],meta['types']))
        conn.commit()

    for k,refs in keymaps.items():
        parts=k.split(".")
        targetschema=parts[0]
        targettable=parts[1]
        columns=[]
        types=[]
        for ref in refs.values():
            tschema,ttable,tcolumn=split3(ref)
            # NOTE(review): the new column is named after the referenced
            # column (tcolumn); the keymaps key itself is not used as the
            # column name. Confirm this is intended.
            columns.append(tcolumn)
            types.append(gettype(tschema,ttable,tcolumn))
        if targettable not in targettables:
            cur.execute(buildtable(targetschema,targettable,columns,types))
            conn.commit()

    # Now alter for constraints
    for k,ref in constraints.items():
        fschema,ftable,fcolumn=split3(k)
        tschema,ttable,tcolumn=split3(ref)
        sql="alter table "+fschema+"."+ftable + " add constraint fk_"+ftable+"_"+ttable+" foreign key ("+fcolumn+") references "\
        + tschema +"."+ttable+"("+tcolumn+");"
        cur.execute(sql)
        conn.commit()

In [None]:
createtargetbymap(columnmaps,keymaps,constraints)

In [None]:
import os
def givetabledef(schema,table):
    """Return the CREATE and ALTER statements pg_dump emits for one table.

    Runs ``pg_dump --schema-only`` against the dvdrental database on
    0.0.0.0 as user postgres and keeps every statement that starts on a line
    containing 'CREATE' or 'ALTER', up to and including the line that
    contains ';'.

    The dump is read directly from pg_dump's standard output; no dump.sql
    file is written.

    Args:
        schema: Schema name of the table.
        table: Table name.

    Returns:
        List of the selected dump lines, each with its trailing newline.

    Raises:
        FileNotFoundError: If the pg_dump executable cannot be found.
    """
    import io
    import subprocess

    # An argument list (no shell) is used so that schema/table values, which
    # may originate from model output, cannot inject shell commands.
    proc=subprocess.run(
        ["pg_dump","-U","postgres","--schema-only","-h","0.0.0.0","dvdrental",
         "-t",schema+"."+table],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    result=[]
    collecting=False
    for line in io.StringIO(proc.stdout):
        if ('CREATE' in line) or ('ALTER' in line):
            collecting=True
        if collecting:
            result.append(line)
            # A ';' ends the statement currently being collected.
            if ';' in line:
                collecting=False
    return result

In [None]:
def givetableconstraints(schema,table):
    """Return the ALTER statements pg_dump emits for one table, minus ownership.

    Runs ``pg_dump --schema-only`` against the dvdrental database on
    0.0.0.0 as user postgres and keeps every statement that starts on a line
    containing 'ALTER' but not 'OWNER TO postgres', up to and including the
    line that contains ';'.

    The dump is read directly from pg_dump's standard output; no dump.sql
    file is written.

    Args:
        schema: Schema name of the table.
        table: Table name.

    Returns:
        List of the selected dump lines, each with its trailing newline.

    Raises:
        FileNotFoundError: If the pg_dump executable cannot be found.
    """
    import io
    import subprocess

    # An argument list (no shell) is used so that schema/table values cannot
    # inject shell commands.
    proc=subprocess.run(
        ["pg_dump","-U","postgres","--schema-only","-h","0.0.0.0","dvdrental",
         "-t",schema+"."+table],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    result=[]
    collecting=False
    for line in io.StringIO(proc.stdout):
        if 'ALTER' in line and 'OWNER TO postgres' not in line:
            collecting=True
        if collecting:
            result.append(line)
            # A ';' ends the statement currently being collected.
            if ';' in line:
                collecting=False
    return result

In [None]:
givetableconstraints('public','address')

In [None]:
givetableconstraints('target','movie')

In [None]:
import random
import string

def generate_random_string(length):
    """Return ``length`` randomly chosen ASCII letters, lower-cased."""
    alphabet = string.ascii_letters
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked).lower()

random_string = generate_random_string(10)
print(random_string)


In [None]:
def getexampledata(schema,table,column="*"):
    """Return up to ten example rows of a table as strings.

    Uses the module-level cursor ``cur``.

    Args:
        schema: Schema name of the table.
        table: Table name.
        column: Select list; defaults to "*".

    Returns:
        One ``str(tuple)`` per row. For the default "*" the first column of
        each row is left out (presumably the id column - confirm); for an
        explicit select list the whole row is kept, so that a single
        requested column does not come back as an empty tuple.
    """
    # NOTE(review): identifiers are concatenated into the statement; only
    # call this with trusted schema/table/column names.
    sql="select "+column+" from "+schema+"."+table+" limit 10"
    cur.execute(sql)
    rows=cur.fetchall()
    if column=="*":
        return [str(row[1:]) for row in rows]
    return [str(row) for row in rows]


In [None]:
getexampledata('public','address')

In [None]:
def ddlemb(schema):
    """Embed the constraint DDL of every table in a schema.

    Lists the tables of ``schema`` from information_schema.tables (ordered by
    name), fetches each table's constraint statements with
    givetableconstraints() and requests an embedding for the joined text
    from the OpenAI embedding endpoint.

    Args:
        schema: Schema whose tables are embedded.

    Returns:
        Dict mapping table name to its embedding vector.
    """
    # The schema name is passed as a query parameter rather than being
    # concatenated into the statement.
    sql="SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;"
    cur.execute(sql,(schema,))
    tables=cur.fetchall()

    MODEL = "text-embedding-ada-002"

    emb={}
    for (name,) in tables:
        ddl=givetableconstraints(schema,name)
        r = openai.Embedding.create(input=[''.join(ddl)], engine=MODEL)
        emb[name]=r['data'][0]['embedding']
    return emb
    


# Fetch all pg_constraint rows that belong to the table public.address,
# joining pg_class (table name) and pg_namespace (schema name).
sql="SELECT con.* \
FROM pg_catalog.pg_constraint con \
INNER JOIN pg_catalog.pg_class rel \
ON rel.oid = con.conrelid \
INNER JOIN pg_catalog.pg_namespace nsp \
ON nsp.oid = connamespace \
WHERE nsp.nspname = 'public' \
AND rel.relname = 'address';"

cur.execute(sql)
a=cur.fetchall()
# Bare expression so the notebook displays the fetched rows.
a


In [None]:
givetabledef('public','address')

In [None]:
import re

class Prompt():
    """Builds a chat conversation that asks an OpenAI model for SQL.

    The instance accumulates chat messages (dicts with "role" and "content")
    in ``self.messages``, sends them with getreply(), extracts SQL from the
    reply and can run that SQL against the module-level connection.
    """

    def __init__(self,sourceschema,targetschema,targettable,columnmappings):
        """Store the schemas, the target table and the column mappings.

        Args:
            sourceschema: Name of the schema data is read from.
            targetschema: Name of the schema data is written to.
            targettable: Name of the table in the target schema.
            columnmappings: Dict 'target schema.table.column' ->
                'source schema.table.column'.
        """
        self.sourcetables=[]
        self.sourceschema=sourceschema
        self.targetschema=targetschema
        self.targettable=targettable
        self.columnmappings=columnmappings
        self.messages=[]
        # Query results recorded by addsqlandanswer(); must exist before
        # that method appends to it.
        self.interactions=[]

    def initialmessage(self):
        """Append the two system messages that set up the conversation."""
        prompt={
          "role": "system",
          "content": "Generate sql statements in a separate sql code block."
        }
        self.messages.append(prompt)

        prompt={
          "role": "system",
          "content": "You are a developer writing SQL queries."
        }
        self.messages.append(prompt)

        # NOTE(review): the target table DDL is fetched and wrapped in a
        # message, but the message is not appended to the conversation.
        s=givetabledef(self.targetschema,self.targettable)
        prompt={
        "role":"user",
        "content":''.join(s)
        }
        #self.messages.append(prompt)

    def ask4constraints(self):
        """Append a system message asking for a foreign-key lookup query.

        This prompt used to be defined under the name ask4info and was
        replaced by the later definition of the same name, so it could never
        be called; it is kept under its own name.
        """
        prompt={
          "role": "system",
          "content": "Your job is to move data from the schema public to the schema target." +\
            "Generate one sql statement which will give you some foreign key constraints."+\
            "You can querythe database pg_constraint and pg_attribute and use " +\
            " pg_get_constraintdef toward that task. You should limit the statement to the one "+\
            "schema and the one table that you think matters the most. Only return unique constraints "+\
            "and return the schema in the DDL."
        }
        self.messages.append(prompt)

    def ask4info(self):
        """Append a system message asking which ('schema','table') DDL would help."""
        prompt={
          "role": "system",
          "content": "Your job is to move data from the schema public to the schema target." +\
            "Give a pair ('schema','table') for which the DDL of the schema.table will help you"
        }
        self.messages.append(prompt)

    def ask4solution(self):
        """Append a system message asking for the data-moving SQL statement."""
        prompt={
          "role": "system",
          "content": "Your job is to move data from the schema public to the schema target." +\
            "Generate one sql statement which will do that, mapping the columns as described."
        }
        self.messages.append(prompt)

    def addmappings(self):
        """Append one introductory user message plus one per column mapping.

        All entries of ``self.columnmappings`` are sent, not only those that
        belong to ``self.targettable``.
        """
        prompt={
          "role": "user",
          "content": "Mappings between some of the "+self.sourceschema+" columns and columns in "+self.targetschema+"."+self.targettable+" are:"
        }
        self.messages.append(prompt)
        for target,source in self.columnmappings.items():
            prompt={
              "role": "user",
              "content": target+":"+source
            }
            self.messages.append(prompt)

    def addddl(self,list_text):
        """Append the DDL of each listed table as user messages.

        Args:
            list_text: Iterable of (schema, table) pairs.
        """
        for schema,tab in list_text:
            prompt={
            "role":"user",
            "content":"Table "+schema+"."+tab+" has DDL:"
            }
            self.messages.append(prompt)
            # Fetch the DDL from the schema named in the pair so that it
            # matches the table announced in the message above.
            s=givetabledef(schema,tab)
            prompt={
            "role":"user",
            "content":''.join(s)
            }
            self.messages.append(prompt)

    def extractsql(self,text):
        """Return the content of the first ```sql fenced block, or "" if none."""
        match = re.search(r"```sql([\s\S]*?)```", text,re.IGNORECASE)
        if match:
            return match.group(1)
        return ""

    def getreply(self,messages=None):
        """Send messages to the chat model and return the reply text.

        Args:
            messages: Messages to send; defaults to ``self.messages``.

        Returns:
            The content of the first choice of the response.
        """
        if messages is None:
            messages=self.messages
        r=openai.ChatCompletion.create(
          model="gpt-4",
          messages=messages
        )
        print(r)
        text=r['choices'][0]['message']['content']
        return(text)

    def testsql(self,sql,depth=1):
        """Recreate the target schema, run ``sql`` and return its rows as text.

        Uses the module-level ``cur``/``conn`` and the module-level
        ``columnmaps``/``keymaps``/``constraints``.

        Args:
            sql: Statement to execute.
            depth: Nothing is executed when it is 0 or less. When the
                statement fails and depth is greater than 2, the model is
                asked for a corrected statement, which is then tested with
                depth-1.

        Returns:
            One line per result row (each prefixed with a newline, values
            joined by commas); "" when there are no rows, the statement
            returns no result set, or it failed.
        """
        createtargetbymap(columnmaps,keymaps,constraints)
        row_text=""
        if depth>0:
            try:
                cur.execute(sql)
                # Statements without a result set (e.g. INSERT) leave
                # cur.description as None; calling fetchall() on them would
                # raise and cause the successful statement to be rolled back.
                if cur.description is not None:
                    rows=cur.fetchall()
                    row_text="".join("\n"+','.join(map(str, row)) for row in rows)
            except Exception as err:
                conn.rollback()
                print(err)
                if depth>2:
                    text=self.fixsql(sql,err)
                    sql=self.extractsql(text)
                    print("new sql is "+sql)
                    # Hand the result of the corrected statement back to the
                    # caller instead of discarding it.
                    row_text=self.testsql(sql,depth=depth-1)
        conn.commit()
        return row_text

    def fixsql(self,sql,err):
        """Ask the model to correct ``sql`` given the error it raised.

        Args:
            sql: The failing statement.
            err: The exception (or message) produced when running it.

        Returns:
            The model's reply text.
        """
        prompt={
            "role":"user",
            "content":"The sqlstatement "+sql + " gave the error: "+str(err)+". Can you correct the sqlstatement?"
        }
        text=self.getreply(messages=[prompt])
        return text

    def addsqlandanswer(self,sql):
        """Run ``sql`` and, if it produced rows, record statement and result."""
        row_text=self.testsql(sql)
        if row_text!="":
            prompt={"role":"user",
            "content":"The sqlstatement "+sql+" reached the following answer "+ row_text}
            self.messages.append(prompt)
            self.interactions.append(row_text)

    def buildsqlquestion(self):
        """Reset the conversation to the system messages plus the mappings."""
        self.messages=[]
        self.initialmessage()
        self.addmappings()

    def buildsql(self):
        """Run two rounds of: ask the model, extract its SQL, test it, feed back.

        Returns:
            None; the conversation is left in ``self.messages``.
        """
        # Imported here so the method does not depend on another notebook
        # cell having imported pprint first.
        from pprint import pprint

        self.buildsqlquestion()
        for _ in range(2):
            r=self.getreply()
            print("------------------")
            pprint(r)
            print("------------------")
            sql=self.extractsql(r)
            print("------------------")
            print(sql)
            print("------------------")
            prompt={"role":"user",
           "content":"You answered with this sql:"+sql}
            self.messages.append(prompt)
            text=self.testsql(sql)
            prompt={"role":"user",
           "content":"Which gave this answer:"+text}
            self.messages.append(prompt)
        return None
    


In [None]:
# Re-declare the mapping configuration and rebuild the target schema from it.
# columnmaps: target 'schema.table.column' -> source 'schema.table.column'.
columnmaps={}
columnmaps['target.info.email']='public.customer.email'
columnmaps['target.info.address']='public.address.address'

columnmaps['target.movie.title']='public.film.title'
columnmaps['target.movie.category']='public.category.name'
columnmaps['target.movie.language']='public.language.name'
columnmaps['target.movie.rental_date']='public.rental.rental_date'

columnmaps['target.sales.logon']='public.staff.username'
columnmaps['target.sales.first_name']='public.staff.first_name'
columnmaps['target.sales.store_id']='public.staff.store_id'

# keymaps: additional table -> {column name: referenced 'schema.table.column'}.
keymaps={}
keymaps['target.salesxmovie']={'movie_id':'target.movie.movie_id','sales_id':'target.sales.sales_id'}

# constraints: referencing column -> referenced column (one foreign key each).
constraints={}
constraints['target.salesxmovie.movie_id']='target.movie.movie_id'
constraints['target.salesxmovie.sales_id']='target.sales.sales_id'
constraints['target.info.info_id']='target.movie.movie_id'

createtargetbymap(columnmaps,keymaps,constraints)

In [None]:
p=Prompt(sourceschema='public',targetschema='target',targettable='info',columnmappings=columnmaps)

In [None]:
p.initialmessage()
p.addmappings()
p.ask4info()
p.messages

In [None]:
text=p.getreply()
text=text.replace("\n","")
import ast
list_text = ast.literal_eval(text)

In [None]:
list_text

In [None]:
p.addddl(list_text)

In [None]:
p.messages

In [None]:
p.ask4info()


In [None]:
p.messages

In [None]:
text=p.getreply()
text=text.replace("\n","")
import ast
list_text = ast.literal_eval(text)

In [None]:
list_text

In [None]:
from pprint import pprint
sql=p.extractsql(text)
#pprint(text)
pprint(sql)
#p.messages

In [None]:
sql

In [None]:
text=p.testsql(sql,depth=3)
print(text)

In [None]:
p.messages=[]
p.initialmessage()
p.addmappings()
prompt={
    "role":"user",
    "content":"Some database information is: "+''.join(text)}
p.messages.append(prompt)
p.ask4solution()
p.messages

In [None]:
len(str(p.messages))

In [None]:
text1=p.getreply()

In [None]:
#sql="select * from public.address limit 3;"

# NOTE: this first statement (all pg_constraint rows for public.address) is
# overwritten by the assignment below before anything is executed.
sql="SELECT con.* " + \
"FROM pg_catalog.pg_constraint con "+ \
"INNER JOIN pg_catalog.pg_class rel ON rel.oid = con.conrelid "+ \
"INNER JOIN pg_catalog.pg_namespace nsp ON nsp.oid = connamespace "+ \
"WHERE nsp.nspname = 'public' "+ \
"AND rel.relname = 'address'; "


# The statement that is actually run: constraint name and its definition
# (via pg_get_constraintdef) for every constraint on public.address.
sql="SELECT conname AS constraint_name, "+\
"       pg_get_constraintdef(c.oid) AS ddl "+\
"FROM   pg_constraint c "+\
"JOIN   pg_namespace n ON n.oid = c.connamespace "+\
"WHERE  conrelid = ("+\
"    SELECT oid "+\
"    FROM   pg_class cl "+\
"    JOIN   pg_namespace ns ON ns.oid = cl.relnamespace"+\
"    WHERE  nspname = 'public'"+\
"    AND    relname = 'address'"+\
");"

cur.execute(sql)
res=cur.fetchall()


In [None]:
for r in res:
    print(r)

In [None]:
p.messages

In [None]:
p.messages

In [None]:
p.ask4solution()

In [None]:
text=p.getreply()

In [None]:
import re


pattern = r"```sql([\s\S]*?)```"
sql_statement = re.search(pattern, text, re.IGNORECASE)

if sql_statement:
    print(sql_statement.group(1))
else:
    print("No SQL statement found.")



In [None]:
text

In [None]:
p.extractsql(r)

In [None]:
p.testsql(sql)

In [None]:
from pprint import pprint
pprint(r)

In [None]:
createweirdtarget02(constraint=False)

In [None]:
p=Prompt(sourceschema='public',targetschema='target',targettable='movie',columnmappings=columnmaps)

In [None]:
r=p.buildsql()

In [None]:
from pprint import pprint
pprint(r)

In [None]:
from pprint import pprint
pprint(r)

In [None]:
from pprint import pprint
pprint(r)

In [None]:
p.messages

In [None]:
r=openai.ChatCompletion.create(
  model="gpt-3.5-turbo-16k",
  #model="gpt-4",
  messages=p.messages
)
r


In [None]:
# Pull the reply text out of the raw ChatCompletion response and extract the
# SQL from it. Unlike Prompt.extractsql, this pattern is case-sensitive and
# requires a newline right after "sql" and right after the closing fence.
text=r['choices'][0]['message']['content']
match = re.search(r"sql\n([\s\S]*?)```\n", text) 
# Falls back to an empty string when the reply contains no matching block.
sql = ""
if match: 
    sql = match.group(1) 


In [None]:
sql

In [None]:
pprint(sql)

In [None]:
p.testsql(sql)