# Task 2 -  Data Modeling and Transformation (Transform)
**Under this Task the following activities are accomplished
- the scraped JSON files from my data lake was loaded into a raw schema in my PostgreSQL database.
- Installing  dbt and its PostgreSQL adapter and setting  up a DBT project.
- by Initialize a DBT project connection  to  PostgreSQL database.
- Develop dbt Models in Layers:


In [1]:
# import dependencies
import json
from pathlib import Path
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
import logging
import sys 
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

In [2]:
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('load_json_to_postgres.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

Function Definition:
It defines the function load_json_to_postgres with two arguments:
data_lake_path: The path to the directory where the JSON files are located (default is 'data/raw/telegram_messages').
db_url: The connection URL for the PostgreSQL database (default is 'postgresql://user:password@localhost:5432/kara_db').
Database Connection and Schema Creation:
It uses the sqlalchemy library to create a connection engine to the PostgreSQL database using the provided db_url.
It then connects to the database and executes SQL commands to:
Create a schema named raw if it doesn't already exist.
Create a table named telegram_messages within the raw schema if it doesn't exist. This table is designed to store the data from the JSON files with specific columns like message_id, channel_name, date, text, etc.
JSON File Collection:
It uses the pathlib module to find all JSON files (.json) within the specified data_lake_path and its subdirectories.
If no JSON files are found, it logs a warning and returns 0, indicating that no records were loaded.
Iterating and Loading JSON Files:
It iterates through each found JSON file.
For each file:
It extracts the channel_name and date_str from the file's path based on a presumed directory structure (e.g., data/raw/telegram_messages/YYYY-MM-DD/channel_name/file.json).
It opens and loads the JSON data from the file into a Python list of dictionaries.
It converts this list of dictionaries into a pandas DataFrame.
It adds a channel_name column to the DataFrame with the extracted channel name.
It adds a load_date column with the current timestamp.
It uses the df.to_sql() method to append the DataFrame's data to the telegram_messages table in the raw schema of the PostgreSQL database.
Error Handling and Logging:
It includes try...except blocks to handle potential errors during file processing or database operations.
It uses the logging module to log information about the process (e.g., database connection, number of records loaded from each file) and any errors that occur.
Return Value:
The function returns the total number of records successfully loaded from all the JSON fil

In [14]:
import json
import os
import psycopg2
from psycopg2.extras import Json

# Database connection parameters
db_params = {
    'dbname': 'kara_medical_db',
    'user': 'postgres',
    'password': 'n5090',
    'host': 'localhost',
    'port': '5432'
}

# Data lake directory
data_lake_path = 'F:/Intelligent_Ethiopian_Medical_Business_peplines-/data/raw/telegram_messages'

def create_raw_table():
    try:
        conn = psycopg2.connect(**db_params)
        cur = conn.cursor()
        cur.execute("""
            CREATE SCHEMA IF NOT EXISTS raw;
            DROP TABLE IF EXISTS raw.telegram_messages CASCADE;
            CREATE TABLE raw.telegram_messages (
                id SERIAL PRIMARY KEY,
                json_data JSONB NOT NULL,
                file_name VARCHAR(255),
                channel_name VARCHAR(255),
                loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)
        conn.commit()
        print("Raw table created successfully.")
    except Exception as e:
        print(f"Error creating raw table: {e}")
    finally:
        cur.close()
        conn.close()

def load_json_files():
    try:
        conn = psycopg2.connect(**db_params)
        cur = conn.cursor()
        json_files = [f for f in os.listdir(data_lake_path) if f.endswith('.json')]
        if len(json_files) != 3:
            print(f"Warning: Found {len(json_files)} JSON files, expected 3: {json_files}")
        for file in json_files:
            file_path = os.path.join(data_lake_path, file)
            # Derive channel_name from file name (e.g., 'channel1' from 'channel1.json')
            channel_name = os.path.splitext(file)[0]
            with open(file_path, 'r', encoding='utf-8') as f:
                try:
                    json_data = json.load(f)
                    # Handle list of messages
                    if isinstance(json_data, list):
                        for item in json_data:
                            cur.execute(
                                "INSERT INTO raw.telegram_messages (json_data, file_name, channel_name) VALUES (%s, %s, %s)",
                                (Json(item), file, channel_name)
                            )
                    else:
                        cur.execute(
                            "INSERT INTO raw.telegram_messages (json_data, file_name, channel_name) VALUES (%s, %s, %s)",
                            (Json(json_data), file, channel_name)
                        )
                except json.JSONDecodeError as je:
                    print(f"Error decoding JSON in {file_path}: {je}")
        conn.commit()
        print("JSON data loaded successfully.")
    except Exception as e:
        print(f"Error loading JSON data: {e}")
    finally:
        cur.close()
        conn.close()

if __name__ == '__main__':
    create_raw_table()
    load_json_files()

Raw table created successfully.
JSON data loaded successfully.


In [None]:
psql -U postgres -d kara_medical_db -c "SELECT 1;"


In [None]:
def load_json_to_postgres(data_lake_path='F:/Intelligent_Ethiopian_Medical_Business_peplines-/data/raw/telegram_messages',
                         db_url='postgresql://postgres:n5090@localhost:5432/kara_db'):
    """
    Load JSON files from the data lake into a PostgreSQL raw schema.

    Args:
        data_lake_path (str): Path to the data lake directory
        db_url (str): PostgreSQL connection URL

    Returns:
        int: Number of records loaded
    """
    try:
        # Initialize database connection
        engine = create_engine(db_url)
        logger.info("Connected to PostgreSQL database")

        # Create raw schema if not exists
        with engine.connect() as conn:
            conn.execute("CREATE SCHEMA IF NOT EXISTS raw")
            conn.execute("""
                CREATE TABLE IF NOT EXISTS raw.telegram_messages (
                    message_id BIGINT,
                    channel_name TEXT,
                    date TIMESTAMP,
                    text TEXT,
                    sender_id BIGINT,
                    views INTEGER,
                    forwards INTEGER,
                    media_path TEXT,
                    load_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.commit()

        # Collect all JSON files
        json_files = list(Path(data_lake_path).glob('**/*.json'))
        if not json_files:
            logger.warning("No JSON files found in data lake")
            return 0

        total_records = 0
        for json_file in json_files:
            try:
                # Extract channel name and date from path
                channel_name = json_file.parent.name
                date_str = json_file.parent.parent.name  # YYYY-MM-DD

                # Load JSON data
                with open(json_file, 'r', encoding='utf-8') as f:
                    messages = json.load(f)

                # Convert to DataFrame
                df = pd.DataFrame(messages)
                df['channel_name'] = channel_name
                df['load_date'] = pd.Timestamp.now()

                # Write to PostgreSQL
                df.to_sql('telegram_messages', engine, schema='raw', if_exists='append', index=False)
                logger.info(f"Loaded {len(df)} records from {json_file}")
                total_records += len(df)

            except Exception as e:
                logger.error(f"Error processing {json_file}: {str(e)}")
                continue

        logger.info(f"Total records loaded: {total_records}")
        return total_records

    except Exception as e:
        logger.error(f"Fatal error in load_json_to_postgres: {str(e)}")
        raise

In [None]:
def main():
    """
    Main function to run the JSON loader in Colab.
    """
    # Replace with your PostgreSQL credentials
    db_url = input("Enter PostgreSQL connection URL (e.g., postgresql://user:password@localhost:5432/): ")
kara_medical_db
    # Load data
    records_loaded = load_json_to_postgres(data_lake_path='F:/Intelligent_Ethiopian_Medical_Business_peplines-/data/raw/telegram_messages', db_url=db_url)

    
if __name__ == "__main__":
    main()

2025-08-19 19:56:38,139 - INFO - Connected to PostgreSQL database
2025-08-19 19:56:38,758 - ERROR - Fatal error in load_json_to_postgres: Not an executable object: 'CREATE SCHEMA IF NOT EXISTS raw'


ObjectNotExecutableError: Not an executable object: 'CREATE SCHEMA IF NOT EXISTS raw'

In [11]:
import os
import json
import psycopg2

# PostgreSQL connection
conn = psycopg2.connect(
    dbname="kara_medical_db",
    user="postgres",
    password="n5090",
    host="localhost",
    port="5432"
)
cur = conn.cursor()

# Create raw schema if not exists
cur.execute("CREATE SCHEMA IF NOT EXISTS raw;")

# Drop old table (optional, good for dev to avoid missing columns issue)
cur.execute("DROP TABLE IF EXISTS raw.telegram_messages;")

# Create raw table with correct schema
cur.execute("""
CREATE TABLE raw.telegram_messages (
    message_id BIGINT,
    sender_id BIGINT,
    message_text TEXT,
    posted_at TIMESTAMPTZ,
    views INT,
    forwards INT,
    media_url TEXT,
    raw_json JSONB,
    PRIMARY KEY (sender_id, message_id)  -- composite key for ON CONFLICT
);
""")
conn.commit()

# Load JSON files
data_dir = "F:/Intelligent_Ethiopian_Medical_Business_peplines-/data/raw/telegram_messages"
for file in os.listdir(data_dir):
    if file.endswith(".json"):
        with open(os.path.join(data_dir, file), "r", encoding="utf-8") as f:
            messages = json.load(f)
            for msg in messages:
                cur.execute("""
                    INSERT INTO raw.telegram_messages
                    (message_id, sender_id, message_text, posted_at, views, forwards, media_url, raw_json)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (sender_id, message_id) DO NOTHING;
                """, (
                    msg.get("message_id"),
                    msg.get("sender_id"),
                    msg.get("text"),
                    msg.get("date"),
                    msg.get("views"),
                    msg.get("forwards"),
                    msg.get("media"),
                    json.dumps(msg)
                ))

conn.commit()
cur.close()
conn.close()



In [13]:

conn = psycopg2.connect(
    dbname="kara_medical_db",
    user="postgres",
    password="n5090",
    host="localhost",
    port="5432"
)
cur = conn.cursor()

cur.execute("SELECT COUNT(*) FROM raw.telegram_messages;")
print("Total messages:", cur.fetchone()[0])

cur.close()
conn.close()


Total messages: 276
