In [1]:
import geopandas as gpd
from pipeline.utils.db_conn import db_connection
from pipeline.utils.db_conn import *
from pipeline.utils.read_sql import read_sql_file
import warnings
import os
from dotenv import load_dotenv
warnings.filterwarnings('ignore')

load_dotenv()

DIR_TEMP_DATA = os.getenv("DIR_TEMP_DATA")
DIR_EXTRACT_QUERY = os.getenv("DIR_EXTRACT_QUERY")

try:
    tables_to_extract = ['public.pub_bangunan', 
                            'public.pub_pengawasan_dan_penindakan']

    src_engine, _ = db_connection()

    extract_query = read_sql_file(
            file_path = f'{DIR_EXTRACT_QUERY}/all-tables.sql'
        )

    for index, table_name in enumerate(tables_to_extract):
        # Read data into DataFrame
        df = gpd.read_postgis(extract_query.format(table_name = table_name), src_engine, geom_col='geometry')

        # Write DataFrame to CSV
        df.to_file(f"{DIR_TEMP_DATA}/{table_name}.geojson", index=False, driver='GeoJSON')

except Exception as e:
    print(f"Error: {e}")

In [2]:
gdf_temp = gpd.read_file(f"{DIR_TEMP_DATA}/{table_name}.geojson")
gdf_temp.head()

Unnamed: 0,id,shape_leng,shape_area,kode_bangu,nama_bangu,alamat,kelurahan,kecamatan,kab_kota,tujuan,...,keluraha_1,kab_kota_1,provinsi_1,ukuran_ban,luas_tanah,jumlah_lan,jumlah_bas,sumber,link_foto,geometry
0,1,125.311981,606.096072,A1,Bangunan A1,Jalan Taman Haji,AA,AA,Jakarta Barat,Non Rumah Tinggal,...,,,,0.0,0.0,0,0,,,"MULTIPOLYGON Z (((11895574.951 -693184.8183 0,..."
1,2,48.334468,142.575286,B1,Bangunan B1,Jalan Taman Haji,BB,BB,Jakarta Barat,Rumah Tinggal,...,,,,0.0,0.0,0,0,,,"MULTIPOLYGON Z (((11895592.9421 -693160.777 0,..."
2,3,51.584819,166.010625,D784,Bangunan D,,,,Jakarta Barat,Rumah Tinggal,...,,,,0.0,0.0,0,0,,,MULTIPOLYGON Z (((11895607.5898 -693050.1235 0...


In [3]:
from pipeline.utils.db_conn import db_connection
from pipeline.utils.read_sql import read_sql_file
from sqlalchemy.orm import sessionmaker
import sqlalchemy
import pandas as pd
import geopandas as gpd
import os
import logging
from geoalchemy2 import Geography, Geometry  # Pastikan diimpor dengan benar
from pipeline.utils.read_list_schema import list_schemas, list_tables


# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

try:
    # Define db connection engine
    _, dwh_engine = db_connection()

    # Define DIR
    DIR_TEMP_DATA = os.getenv("DIR_TEMP_DATA")
    DIR_LOAD_QUERY = os.getenv("DIR_LOAD_QUERY")

    #-------------------------------Truncate tables in public schema ------------------------
    # Read query to truncate public schema in dwh
    truncate_query = read_sql_file(
        file_path=f'{DIR_LOAD_QUERY}/public-truncate_table.sql'
    )

    # Split the SQL queries if multiple queries are present
    truncate_query = truncate_query.split(';')

    # Remove newline characters and leading/trailing whitespaces
    truncate_query = [query.strip() for query in truncate_query if query.strip()]

    # Create session
    Session = sessionmaker(bind=dwh_engine)
    session = Session()

    # Execute each query
    for query in truncate_query:
        query = sqlalchemy.text(query)
        session.execute(query)

    # Commit the transaction
    session.commit()

    # Close session
    session.close()

    #-------------------------------Part Of Load to public schema ------------------------
    # Data to be loaded into public schema
    bangunan = gpd.read_file(f'{DIR_TEMP_DATA}/public.pub_bangunan.geojson')
    pengawasan = gpd.read_file(f'{DIR_TEMP_DATA}/public.pub_pengawasan_dan_penindakan.geojson')

    # Reproject
    # bangunan_rep = bangunan.to_crs(epsg=3857)
    # pengawasan = pengawasan.to_crs(epsg=3857)

    dtype = {'geometry': Geometry(geometry_type='MULTIPOLYGON', srid=4326)}

    # Load to public schema
    # Load category tables
    bangunan_rep.to_postgis('bangunan',
                        con=dwh_engine,
                        if_exists='append',
                        index=False,
                        schema='public',
                        dtype=dtype)

    # Load subcategory tables
    bangunan_query = read_sql_file(
        file_path=f'{DIR_LOAD_QUERY}/stg-bangunan.sql'
    )

    ## Load Into Staging schema
    # List query
    load_stg_queries = [bangunan_query]

    # Create session
    Session = sessionmaker(bind=dwh_engine)
    session = Session()

    # Execute each query
    for query in load_stg_queries:
        query = sqlalchemy.text(query)
        session.execute(query)

    session.commit()

    # Close session
    session.close()

except Exception as e:
    logger.error(f"Error loading data: {e}", exc_info=True)

ERROR:__main__:Error loading data: (psycopg2.errors.UndefinedTable) relation "public.bangunan" does not exist

[SQL: TRUNCATE TABLE public.bangunan CASCADE]
(Background on this error at: https://sqlalche.me/e/20/f405)
Traceback (most recent call last):
  File "/home/ubuntu/datawh/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/home/ubuntu/datawh/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 941, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UndefinedTable: relation "public.bangunan" does not exist


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/tmp/ipykernel_3111152/3093129159.py", line 44, in <module>
    session.execute(query)
  File "/home/ubuntu/datawh/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
    return self._execute_internal(
  File "/home/ubuntu/dataw

In [6]:

try:
    # Define db connection engine
    _, dwh_engine = db_connection()
    
except Exception as e:
    logger.error(f"Error loading data: {e}", exc_info=True)

In [7]:
dwh_engine

Engine(postgresql://postgres:***@localhost:5434/dwh)

In [14]:
from sqlalchemy import create_engine, inspect
def list_tables(engine):
    try:
        # Use SQLAlchemy's inspect function to get table information
        inspector = inspect(engine)
        tables = inspector.get_table_names()
        return tables
    except Exception as e:
        logger.error(f"Error listing tables: {e}", exc_info=True)
        return []

tables = list_tables(dwh_engine)



['bangunan', 'spatial_ref_sys', 'pengawasan_dan_penindakan']

In [17]:
import logging
from sqlalchemy import create_engine, inspect

# Initialize logger
logger = logging.getLogger(__name__)

def db_connection():
    # Replace with your actual connection string
    connection_string = 'postgresql://postgres:mypassword@localhost:5434/dwh'
    
    # Create an engine
    engine = create_engine(connection_string)
    
    return engine

def read_sql_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()

def list_tables(engine, schema):
    try:
        inspector = inspect(engine)
        tables = inspector.get_table_names(schema=schema)
        return tables
    except Exception as e:
        logger.error(f"Error listing tables in schema {schema}: {e}", exc_info=True)
        return []

def truncate_tables(engine, schema, tables):
    try:
        Session = sessionmaker(bind=engine)
        session = Session()
        
        for table in tables:
            query = sqlalchemy.text(f'TRUNCATE TABLE {schema}.{table} CASCADE;')
            logger.info(f"Executing query: {query}")
            session.execute(query)
        
        session.commit()
        session.close()
    except Exception as e:
        logger.error(f"Error truncating tables in schema {schema}: {e}", exc_info=True)

try:
    # Define db connection engine
    dwh_engine = db_connection()

    # Define DIR
    DIR_TEMP_DATA = os.getenv("DIR_TEMP_DATA")
    DIR_LOAD_QUERY = os.getenv("DIR_LOAD_QUERY")

    # Read query to truncate public schema in dwh
    truncate_query = read_sql_file(
        file_path=f'{DIR_LOAD_QUERY}/public-truncate_table.sql'
    )

    # Split the SQL queries if multiple queries are present
    truncate_query = truncate_query.split(';')

    # Remove newline characters and leading/trailing whitespaces
    truncate_query = [query.strip() for query in truncate_query if query.strip()]

    # Create session
    Session = sessionmaker(bind=dwh_engine)
    session = Session()

    # Execute each query
    for query in truncate_query:
        query = sqlalchemy.text(query)
        logger.info(f"Executing query: {query}")
        session.execute(query)

    # Commit the transaction
    session.commit()

    # Close session
    session.close()

    # List all tables in the public schema
    public_schema = 'public'
    tables = list_tables(dwh_engine, public_schema)

    # Truncate tables in the public schema
    truncate_tables(dwh_engine, public_schema, tables)

except Exception as e:
    logger.error(f"Error loading data: {e}", exc_info=True)

INFO:__main__:Executing query: TRUNCATE TABLE public.bangunan CASCADE
INFO:__main__:Executing query: TRUNCATE TABLE public.pengawasan_dan_penindakan CASCADE
INFO:__main__:Executing query: TRUNCATE TABLE public.bangunan CASCADE;
INFO:__main__:Executing query: TRUNCATE TABLE public.spatial_ref_sys CASCADE;
INFO:__main__:Executing query: TRUNCATE TABLE public.pengawasan_dan_penindakan CASCADE;
