In [2]:
from sqlalchemy import create_engine, inspect
from sqlalchemy.types import *
import re
from urllib.parse import quote_plus
import ast

In [3]:
type_mappings = {
    "postgres": {
        "smallint": "SMALLINT",
        "integer": "INT",
        "int": "INT",
        "bigint": "BIGINT",
        "decimal": "DECIMAL",  
        "numeric": "DECIMAL", 
        "real": "FLOAT",
        "double precision": "DOUBLE",
        "money": "STRING",
        "char": "STRING",
        "varchar": "STRING",
        "text": "STRING",
        "bytea": "BINARY",
        "boolean": "BOOLEAN",
        "date": "DATE",
        "timestamp": "TIMESTAMP",
        "timestamp with time zone": "TIMESTAMP",
        "timestamp without time zone": "TIMESTAMP",
        "time": "STRING",
        "time with time zone": "STRING",
        "interval": "STRING",
        "json": "STRING",
        "jsonb": "STRING",
        "uuid": "STRING",
        "inet": "STRING",
        "cidr": "STRING",
        "macaddr": "STRING",
        "xml": "STRING",
        "array": "STRING",
        "point": "STRING",
        "line": "STRING",
        "lseg": "STRING",
        "box": "STRING",
        "path": "STRING",
        "polygon": "STRING",
        "circle": "STRING",
        "tsvector": "STRING",
        "tsquery": "STRING",
        "bit": "STRING",
        "bit varying": "STRING"
    },
    "oracle": {
        "number": "DECIMAL",  
        "float": "DOUBLE",
        "binary_float": "FLOAT",
        "binary_double": "DOUBLE",
        "char": "STRING",
        "varchar2": "STRING",
        "nchar": "STRING",
        "nvarchar2": "STRING",
        "clob": "STRING",
        "nclob": "STRING",
        "blob": "BINARY",
        "raw": "BINARY",
        "long": "STRING",
        "long raw": "BINARY",
        "date": "DATE",
        "timestamp": "TIMESTAMP",
        "timestamp with time zone": "TIMESTAMP",
        "timestamp with local time zone": "TIMESTAMP",
        "interval year to month": "STRING",
        "interval day to second": "STRING",
        "rowid": "STRING",
        "urowid": "STRING",
        "xmltype": "STRING",
        "anydata": "STRING",
        "bfile": "STRING",
        "boolean": "BOOLEAN"
    },
    "mysql": {
        "tinyint": "TINYINT",
        "smallint": "SMALLINT",
        "mediumint": "INT",
        "int": "INT",
        "bigint": "BIGINT",
        "decimal": "DECIMAL", 
        "numeric": "DECIMAL",  
        "float": "FLOAT",
        "double": "DOUBLE",
        "bit": "BOOLEAN",
        "char": "STRING",
        "varchar": "STRING",
        "binary": "BINARY",
        "varbinary": "BINARY",
        "blob": "BINARY",
        "tinyblob": "BINARY",
        "mediumblob": "BINARY",
        "longblob": "BINARY",
        "text": "STRING",
        "tinytext": "STRING",
        "mediumtext": "STRING",
        "longtext": "STRING",
        "json": "STRING",
        "enum": "STRING",
        "set": "STRING",
        "date": "DATE",
        "datetime": "TIMESTAMP",
        "timestamp": "TIMESTAMP",
        "time": "STRING",
        "year": "INT",
        "geometry": "STRING",
        "point": "STRING",
        "linestring": "STRING",
        "polygon": "STRING",
        "multipoint": "STRING",
        "multilinestring": "STRING",
        "multipolygon": "STRING",
        "geometrycollection": "STRING"
    },
    "sqlserver": {
        "int": "INT",
        "bigint": "BIGINT",
        "smallint": "SMALLINT",
        "tinyint": "TINYINT",
        "bit": "BOOLEAN",
        "decimal": "DECIMAL", 
        "numeric": "DECIMAL",  
        "float": "DOUBLE",
        "real": "FLOAT",
        "money": "DECIMAL(19, 4)",
        "smallmoney": "DECIMAL(10, 4)",
        "char": "STRING",
        "varchar": "STRING",
        "nchar": "STRING",
        "nvarchar": "STRING",
        "text": "STRING",
        "ntext": "STRING",
        "binary": "BINARY",
        "varbinary": "BINARY",
        "image": "BINARY",
        "datetime": "TIMESTAMP",
        "smalldatetime": "TIMESTAMP",
        "date": "DATE",
        "time": "STRING",
        "timestamp": "BINARY",
        "uniqueidentifier": "STRING",
        "xml": "STRING"
    }
}

In [8]:
with open('/Users/nachanon/projects/hive_datatype/type_mappings.txt') as f:
    data = f.read()
type_mappings = ast.literal_eval(data)

In [9]:

def get_hive_type(db_type , column_type):
   
    match = re.match(r"\w+\((\d+),\s*(\d+)\)",column_type)
    if match:
        type_name = match.group().split('(')[0]
        precision = int(match.groups()[0])
        scale = int(match.groups()[1])
        if precision > 38:
            precision = 38
        if (precision >0 and precision <= 38) and (scale >=0 and scale <= precision):
            return f"DECIMAL({precision},{scale})"
    
    if db_type in type_mappings and column_type in type_mappings[db_type]:
        return type_mappings[db_type][column_type]
    
    
    return 'STRING'  

def convert_schema_to_hive(engine, inspector, db_schema, db_type):
    
    schema = {}
    
    for table_name in inspector.get_table_names(db_schema):
        columns = []
        for column in inspector.get_columns(table_name = table_name, schema = db_schema):
            hive_type = get_hive_type(db_type,str(column['type']).lower())
            columns.append({
                'name':column['name'],
                'hive_type':hive_type,
                'comment':column['comment']}
              )
        schema[table_name] = columns

    return schema

def generate_sql_ddl(hive_schema, schema_name, table_name, table_comment, location = '/staging/ois' , stored_as = 'PARQUET'):
    ddl = f"CREATE EXTERNAL TABLE IF NOT EXISTS {schema_name}.{table_name} (\n"
    cols= []
    for col in hive_schema[table_name]:
        comment = f"COMMENT '{col['comment']}'" if col['comment'] else ''
        cols.append(f"{col['name']} {col['hive_type']} {comment}")
        
    ddl += "    "
    ddl += ",\n    ".join(cols)
    ddl += "\n)\n"

    ddl += f"COMMENT '{table_comment['text']}'\n" if table_comment['text'] else ''
    ddl += f"STORED AS {stored_as}\n"
    ddl += f"LOCATION '{location}';"
    
    
    return ddl


In [10]:
conn_string = "postgresql://postgres:P%40ssw0rdsit@192.168.170.13:5432/postgres"
engine = create_engine(conn_string)
inspector = inspect(engine)
 
hive_schema = convert_schema_to_hive(engine, inspector, inspector.get_schema_names()[0],'postgres')
ddl = generate_sql_ddl(hive_schema, 'default','WaterPump',inspector.get_table_comment(table_name= 'WaterPump',schema = 'OIS'),'/user/bdaadmin')

In [11]:
hive_schema

{'WaterPump': [{'name': 'yyyymmdd', 'hive_type': 'BIGINT', 'comment': None},
  {'name': 'office_wwcode', 'hive_type': 'BIGINT', 'comment': None},
  {'name': 'office_mcode', 'hive_type': 'BIGINT', 'comment': None},
  {'name': 'water_type_code', 'hive_type': 'STRING', 'comment': None},
  {'name': 'water_type_name', 'hive_type': 'STRING', 'comment': None},
  {'name': 'water_id', 'hive_type': 'BIGINT', 'comment': None},
  {'name': 'water_name', 'hive_type': 'STRING', 'comment': None},
  {'name': 'plant_id', 'hive_type': 'BIGINT', 'comment': None},
  {'name': 'plant_name', 'hive_type': 'STRING', 'comment': None},
  {'name': 'untreated_water', 'hive_type': 'DOUBLE', 'comment': None},
  {'name': 'menu_water', 'hive_type': 'DOUBLE', 'comment': None}],
 'WaterLevel': [{'name': 'office_wwcode',
   'hive_type': 'BIGINT',
   'comment': None},
  {'name': 'office_id', 'hive_type': 'BIGINT', 'comment': None},
  {'name': 'office_showname', 'hive_type': 'STRING', 'comment': None},
  {'name': 'plant_id'

In [12]:
for table_name in inspector.get_table_names('OIS'):
    print(f"TABLE: {table_name}")
    for count, column in enumerate(inspector.get_columns(table_name=table_name, schema ='OIS')):
        
        print(f"NAME : {column['name']}   OLD DATA TYPE : {column['type']}   ----    HIVE DATA TYPE : {hive_schema[table_name][count]['hive_type']}")

TABLE: WaterPump
NAME : yyyymmdd   OLD DATA TYPE : BIGINT   ----    HIVE DATA TYPE : BIGINT
NAME : office_wwcode   OLD DATA TYPE : BIGINT   ----    HIVE DATA TYPE : BIGINT
NAME : office_mcode   OLD DATA TYPE : BIGINT   ----    HIVE DATA TYPE : BIGINT
NAME : water_type_code   OLD DATA TYPE : TEXT   ----    HIVE DATA TYPE : STRING
NAME : water_type_name   OLD DATA TYPE : TEXT   ----    HIVE DATA TYPE : STRING
NAME : water_id   OLD DATA TYPE : BIGINT   ----    HIVE DATA TYPE : BIGINT
NAME : water_name   OLD DATA TYPE : TEXT   ----    HIVE DATA TYPE : STRING
NAME : plant_id   OLD DATA TYPE : BIGINT   ----    HIVE DATA TYPE : BIGINT
NAME : plant_name   OLD DATA TYPE : TEXT   ----    HIVE DATA TYPE : STRING
NAME : untreated_water   OLD DATA TYPE : DOUBLE PRECISION   ----    HIVE DATA TYPE : DOUBLE
NAME : menu_water   OLD DATA TYPE : DOUBLE PRECISION   ----    HIVE DATA TYPE : DOUBLE
TABLE: WaterLevel
NAME : office_wwcode   OLD DATA TYPE : BIGINT   ----    HIVE DATA TYPE : BIGINT
NAME : offic

In [112]:
print(generate_sql_ddl(hive_schema, 'default','WaterPump',inspector.get_table_comment(table_name= 'WaterPump',schema = 'OIS'),'/user/bdaadmin'))


CREATE EXTERNAL TABLE IF NOT EXISTS default.WaterPump (
    yyyymmdd STRING ,
    office_wwcode STRING ,
    office_mcode STRING ,
    water_type_code STRING ,
    water_type_name STRING ,
    water_id STRING ,
    water_name STRING ,
    plant_id STRING ,
    plant_name STRING ,
    untreated_water STRING ,
    menu_water STRING 
)
STORED AS PARQUET
LOCATION '/user/bdaadmin';




In [128]:
schema_hive_all = {}
ddl = ''
for schema in inspector.get_schema_names():
    schema_hive_all[schema] =convert_schema_to_hive(engine, inspector, schema,'postgres')
    
for schema in schema_hive_all:
    for table in schema_hive_all[schema]:
        print(generate_sql_ddl(schema_hive_all[schema], schema, table,inspector.get_table_comment(table_name= table, schema=schema)))

CREATE EXTERNAL TABLE IF NOT EXISTS OIS.WaterPump (
    yyyymmdd BIGINT ,
    office_wwcode BIGINT ,
    office_mcode BIGINT ,
    water_type_code STRING ,
    water_type_name STRING ,
    water_id BIGINT ,
    water_name STRING ,
    plant_id BIGINT ,
    plant_name STRING ,
    untreated_water DOUBLE ,
    menu_water DOUBLE 
)
COMMENT 'k'
STORED AS PARQUET
LOCATION '/staging/ois';


CREATE EXTERNAL TABLE IF NOT EXISTS OIS.WaterLevel (
    office_wwcode BIGINT ,
    office_id BIGINT ,
    office_showname STRING ,
    plant_id BIGINT ,
    plant_name STRING ,
    water_id BIGINT ,
    water_name STRING ,
    machine_id BIGINT ,
    machine_code STRING ,
    category_code STRING ,
    category_name STRING ,
    category_style STRING ,
    group_code DOUBLE ,
    group_neme STRING ,
    untreated_date BIGINT ,
    volume DOUBLE ,
    level DOUBLE 
)
STORED AS PARQUET
LOCATION '/staging/ois';


CREATE EXTERNAL TABLE IF NOT EXISTS OIS.CustomerM09 (
    yyyymmdd BIGINT ,
    office_wwcode B

In [147]:
from urllib.parse import quote_plus
quote_plus(conn_string)

'postgresql%3A%2F%2Fpostgres%3AP%2540ssw0rdsit%40192.168.170.13%3A5432%2Ftest1'

In [237]:
# Test for DECIMAL(X,X)

In [144]:
#from urllib.parse import quote_plus
conn_string = "postgresql://postgres:P%40ssw0rdsit@192.168.170.13:5432/test1"
engine = create_engine(conn_string)
inspector = inspect(engine)

hive_schema = convert_schema_to_hive(engine, inspector, 'public','postgres')

In [16]:
inspector.get_table_names('OIS')

['WaterPump', 'WaterLevel', 'CustomerM09']

In [135]:
inspector.get_columns(schema = 'public',table_name = 'employee2')


[{'name': 'id',
  'type': INTEGER(),
  'nullable': False,
  'default': 'nextval(\'"public".employee2_id_seq\'::regclass)',
  'autoincrement': True,
  'comment': None},
 {'name': 'name',
  'type': VARCHAR(length=50),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'salary',
  'type': NUMERIC(precision=100, scale=2),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None}]

In [6]:
inspector.get_columns(table_name = 'employee',schema='public')

[{'name': 'id',
  'type': INTEGER(),
  'nullable': False,
  'default': 'nextval(\'"public".employee_id_seq\'::regclass)',
  'autoincrement': True,
  'comment': None},
 {'name': 'name',
  'type': VARCHAR(length=50),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'salary',
  'type': NUMERIC(precision=100, scale=2),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None}]

In [24]:
print(generate_sql_ddl(hive_schema, 'default', 'employee',inspector.get_table_comment('employee'), location = '/test_ddl' , stored_as = 'PARQUET'))

SyntaxError: incomplete input (403838789.py, line 1)

In [136]:
inspector.get_check_constraints(table_name = 'employee2',schema = 'public')

[]

In [146]:
inspector.get_unique_constraints('employee2')

[{'column_names': ['salary'], 'name': 'employee2_unique', 'comment': None},
 {'column_names': ['name'], 'name': 'employee_unique2', 'comment': None}]

In [19]:
inspector.get_columns(schema='OIS',table_name = 'WaterPump')

[{'name': 'yyyymmdd',
  'type': BIGINT(),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'office_wwcode',
  'type': BIGINT(),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'office_mcode',
  'type': BIGINT(),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'water_type_code',
  'type': TEXT(),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'water_type_name',
  'type': TEXT(),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'water_id',
  'type': BIGINT(),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'water_name',
  'type': TEXT(),
  'nullable': True,
  'default': None,
  'autoincrement': False,
  'comment': None},
 {'name': 'plant_id',
  'type': BIGINT(),
  'nullable': True,
  'default': None,
  'autoincr

In [22]:
hive_schema['WaterPump']

[{'name': 'yyyymmdd', 'hive_type': 'BIGINT', 'comment': None},
 {'name': 'office_wwcode', 'hive_type': 'BIGINT', 'comment': None},
 {'name': 'office_mcode', 'hive_type': 'BIGINT', 'comment': None},
 {'name': 'water_type_code', 'hive_type': 'STRING', 'comment': None},
 {'name': 'water_type_name', 'hive_type': 'STRING', 'comment': None},
 {'name': 'water_id', 'hive_type': 'BIGINT', 'comment': None},
 {'name': 'water_name', 'hive_type': 'STRING', 'comment': None},
 {'name': 'plant_id', 'hive_type': 'BIGINT', 'comment': None},
 {'name': 'plant_name', 'hive_type': 'STRING', 'comment': None},
 {'name': 'untreated_water', 'hive_type': 'DOUBLE', 'comment': None},
 {'name': 'menu_water', 'hive_type': 'DOUBLE', 'comment': None}]

In [31]:
!python --version

Python 3.12.4


In [33]:
!pip install thrift_sasl


Collecting thrift_sasl
  Using cached thrift_sasl-0.4.3-py2.py3-none-any.whl.metadata (1.2 kB)
Collecting pure-sasl>=0.6.2 (from thrift_sasl)
  Using cached pure_sasl-0.6.2-py3-none-any.whl
Using cached thrift_sasl-0.4.3-py2.py3-none-any.whl (8.3 kB)
Installing collected packages: pure-sasl, thrift_sasl
Successfully installed pure-sasl-0.6.2 thrift_sasl-0.4.3


In [38]:
from pyhive import hive
conn = hive.Connection(
    host='192.168.170.224', 
    port=10000, 
    username='bdaadmin',
    password='P@ssw0rdsit', 
    database='default',
    auth='LDAP'
)

In [46]:
ddl2 = re.sub(r'\s+', ' ', ddl.strip())
    
# Remove space before commas
ddl2 = re.sub(r'\s+,', ',', ddl2)

# Remove trailing semicolon and everything after it
ddl2 = re.sub(r';.*$', '', ddl2)

# Remove STORED AS and LOCATION clauses
ddl2 = re.sub(r'STORED AS.*', '', ddl2).strip()

In [47]:
def clean_ddl(ddl):
    ddl2 = re.sub(r'\s+', ' ', ddl.strip())
    
    ddl2 = re.sub(r'\s+,', ',', ddl2)
    
    ddl2 = re.sub(r';.*$', '', ddl2)
    
    ddl2 = re.sub(r'STORED AS.*', '', ddl2).strip()
    return ddl2

'CREATE EXTERNAL TABLE IF NOT EXISTS default.WaterPump ( yyyymmdd BIGINT, office_wwcode BIGINT, office_mcode BIGINT, water_type_code STRING, water_type_name STRING, water_id BIGINT, water_name STRING, plant_id BIGINT, plant_name STRING, untreated_water DOUBLE, menu_water DOUBLE )'

In [49]:
cursor = conn.cursor()

cursor.execute(ddl)

OperationalError: TExecuteStatementResp(status=TStatus(statusCode=3, infoMessages=["*org.apache.hive.service.cli.HiveSQLException:Error while compiling statement: FAILED: ParseException line 15:25 extraneous input ';' expecting EOF near '<EOF>':17:16", 'org.apache.hive.service.cli.operation.Operation:toSQLException:Operation.java:400', 'org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:187', 'org.apache.hive.service.cli.operation.SQLOperation:runInternal:SQLOperation.java:271', 'org.apache.hive.service.cli.operation.Operation:run:Operation.java:337', 'org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:439', 'org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:HiveSessionImpl.java:405', 'org.apache.hive.service.cli.CLIService:executeStatement:CLIService.java:257', 'org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:501', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1313', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1298', 'org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39', 'org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39', 'org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56', 'org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286', 'java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1149', 'java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:624', 'java.lang.Thread:run:Thread.java:748', "*org.apache.hadoop.hive.ql.parse.ParseException:line 15:25 extraneous input ';' expecting EOF near '<EOF>':21:5", 'org.apache.hadoop.hive.ql.parse.ParseDriver:parse:ParseDriver.java:212', 'org.apache.hadoop.hive.ql.parse.ParseDriver:parse:ParseDriver.java:166', 'org.apache.hadoop.hive.ql.Driver:compile:Driver.java:522', 'org.apache.hadoop.hive.ql.Driver:compileInternal:Driver.java:1356', 'org.apache.hadoop.hive.ql.Driver:compileAndRespond:Driver.java:1343', 'org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:185'], sqlState='42000', errorCode=40000, errorMessage="Error while compiling statement: FAILED: ParseException line 15:25 extraneous input ';' expecting EOF near '<EOF>'"), operationHandle=None)

In [50]:
ddl

"CREATE EXTERNAL TABLE IF NOT EXISTS default.WaterPump (\n    yyyymmdd BIGINT ,\n    office_wwcode BIGINT ,\n    office_mcode BIGINT ,\n    water_type_code STRING ,\n    water_type_name STRING ,\n    water_id BIGINT ,\n    water_name STRING ,\n    plant_id BIGINT ,\n    plant_name STRING ,\n    untreated_water DOUBLE ,\n    menu_water DOUBLE \n)\nSTORED AS PARQUET\nLOCATION '/user/bdaadmin';\n\n"