<a href="https://colab.research.google.com/github/mansueli/mongo-2-postgres/blob/main/mongodump2postgres.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![Supabase](https://raw.githubusercontent.com/supabase/supabase/master/packages/common/assets/images/supabase-logo-wordmark--light.svg)



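## Connection URIs

Replace the `<...>` placeholders in the cell below with your own Supabase and MongoDB credentials before running it.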

In [None]:
#Source DB variables:
%env supabase_uri=postgresql://postgres:<pass>@db.<ref>.supabase.co:5432/postgres
%env mongo_uri=mongodb+srv://<user>:<ref>@cluster012.mongodb.net/?retryWrites=true&w=majority

## Setting up the environment

In [1]:
!pip install mongo &>log
!pip install psycopg2 &>log
!sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
!wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
!wget --quiet -O - https://pgp.mongodb.com/server-7.0.asc | sudo gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor
!echo "deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/7.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-7.0.list
!sudo apt-get update &>log
!sudo apt -y install postgresql &>log
!sudo apt install mongodb-org-tools &>log

OK
deb [ arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/7.0 multiverse


## Creating the MongoDB backup

In [3]:
!mongodump --uri "$mongo_uri" -o ./mongo-backup &>log

## Running the migration

In [None]:
import os
import bson
import json
import psycopg2
from psycopg2 import sql, extensions, connect
from datetime import datetime
from bson.decimal128 import Decimal128

# Define PostgreSQL connection
# The target URI is read from the `supabase_uri` environment variable; a
# KeyError is raised here if that variable has not been set.
postgres_conn_string = os.environ['supabase_uri']
pg_conn = connect(postgres_conn_string)
# Autocommit mode: each statement is committed as soon as it is executed, so
# no explicit commit() calls are needed.
pg_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
pg_cur = pg_conn.cursor()

# Mapping MongoDB types to PostgreSQL types
# Keys look like Python/BSON type names of decoded values; "default" is the
# fallback column type.
SQL_DATA_TYPE = {
  "str": "TEXT",
  "ObjectId": "TEXT",
  "datetime.datetime": "TIMESTAMP WITH TIME ZONE",
  "datetime": "TIMESTAMP WITH TIME ZONE",
  "int": "INT",
  "list": "JSONB",
  "dict": "JSONB",
  "bool": "Boolean",
  "float": "NUMERIC",
  "default": "TEXT",
  "NoneType":"TEXT",
  "Decimal128":"NUMERIC",
}

# Define your backup directory here
# This must be the output directory that was given to mongodump.
mongo_backup_dir = "./mongo-backup"

# Function to handle circular references within lists and dictionaries
def detect_and_handle_circular_references(obj, seen=None):
    """Return a copy of ``obj`` with circular container references replaced.

    Lists and dicts are rebuilt recursively. Whenever a list or dict is
    reached again while it is still being traversed (i.e. it contains itself,
    directly or indirectly), that inner occurrence is replaced with the string
    ``'CircularReference'``.

    Args:
        obj: Any value; only ``list`` and ``dict`` instances are traversed.
        seen: Set of ``id()`` values of the containers currently being
            traversed. Callers normally leave this as ``None``.

    Returns:
        ``obj`` itself for non-container values, otherwise a new list/dict
        with the same contents and cycles replaced by the marker string.
    """
    # Scalars cannot form cycles. Tracking their ids would misreport repeated
    # values (small ints and interned strings share one id) as circular.
    if not isinstance(obj, (list, dict)):
        return obj

    if seen is None:
        seen = set()

    obj_id = id(obj)
    if obj_id in seen:
        return 'CircularReference'  # Replace circular references with a marker

    seen.add(obj_id)
    try:
        if isinstance(obj, list):
            return [detect_and_handle_circular_references(item, seen) for item in obj]
        return {key: detect_and_handle_circular_references(value, seen) for key, value in obj.items()}
    finally:
        # Only ancestors on the current path count as "seen"; forgetting the
        # container on the way out lets the same object appear again in a
        # sibling position without being flagged.
        seen.discard(obj_id)

# Function to convert Decimal128 to string within lists and dictionaries
def convert_decimal_to_str(obj):
    """Convert ``Decimal128`` values to strings, descending into lists and dicts.

    Args:
        obj: A ``Decimal128``, a list, a dict, or any other value.

    Returns:
        ``str(obj)`` for a ``Decimal128``; a new list/dict with every nested
        value converted the same way for containers; ``obj`` unchanged for
        anything else.
    """
    if isinstance(obj, list):
        return [convert_decimal_to_str(item) for item in obj]
    if isinstance(obj, dict):
        return {key: convert_decimal_to_str(value) for key, value in obj.items()}
    if isinstance(obj, Decimal128):
        return str(obj)
    return obj

def _escape_copy_text(text):
    """Escape a string for use as one field in PostgreSQL COPY text format."""
    # The backslash must be escaped first so the escapes added below survive.
    return (text.replace('\\', '\\\\')
                .replace('\t', '\\t')
                .replace('\n', '\\n')
                .replace('\r', '\\r'))


def _serialize_value(value):
    """Render one decoded BSON value as a COPY text-format field."""
    if value is None:
        return '\\N'  # COPY text-format marker for NULL
    if isinstance(value, datetime):  # Serialize datetime objects to ISO 8601
        value = value.isoformat()
    return _escape_copy_text(str(value))


def _collect_field_names(documents):
    """Return the distinct top-level field names of the documents, in first-seen order."""
    names = {}
    for doc in documents:
        for field in doc:
            names.setdefault(field, None)
    return list(names)


# Iterate over all databases in the MongoDB backup directory.
# For every <database>/<collection>.bson file this:
#   1. decodes the documents,
#   2. creates the table "<database>_<collection>" with one column per
#      top-level field (all columns use SQL_DATA_TYPE["default"]),
#   3. writes the rows to "<table>.csv" (tab-separated, COPY text format,
#      with a header line for human readers),
#   4. bulk-loads that file through the open psycopg2 connection.
# The cursor and connection are closed when the loop ends, even on error.
try:
    for database_name in os.listdir(mongo_backup_dir):
        database_dir = os.path.join(mongo_backup_dir, database_name)
        # Skip non-directory entries
        if not os.path.isdir(database_dir):
            continue

        # Iterate over all collections in the current database directory
        for collection_file_name in os.listdir(database_dir):
            # Ignore files that do not end in .bson
            if not collection_file_name.endswith('.bson'):
                continue

            # Strip only the trailing extension; the name itself may contain ".bson".
            collection_name = collection_file_name[:-len('.bson')]

            with open(os.path.join(database_dir, collection_file_name), 'rb') as f:
                data = bson.decode_all(f.read())

            # Full field names are used as column names. They are never split
            # or truncated, so names such as "_id" or "first_name" stay intact.
            field_names = _collect_field_names(data)

            # Construct table name as "database_collection"
            table_name = f"{database_name}_{collection_name}"
            table_ident = sql.Identifier(table_name)

            pg_cur.execute(
                sql.SQL("CREATE TABLE IF NOT EXISTS {} ()").format(table_ident))

            # Add any missing columns; IF NOT EXISTS makes this safe to re-run.
            for field in field_names:
                pg_cur.execute(
                    sql.SQL("ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}").format(
                        table_ident,
                        sql.Identifier(field),
                        sql.SQL(SQL_DATA_TYPE["default"])))  # Use the default data type

            # Serialize the collection to a tab-separated file
            csv_path = table_name + '.csv'
            with open(csv_path, 'w', encoding='utf-8', newline='\n') as f:
                # Header line with column names enclosed in double-quotes
                f.write('\t'.join('"' + _escape_copy_text(name) + '"' for name in field_names) + '\n')

                for document in data:
                    row = []
                    for field in field_names:
                        try:
                            # A field absent from this document is loaded as NULL.
                            row.append(_serialize_value(document.get(field)))
                        except Exception:
                            print(f"Error for Field: {field}")
                            print(f"Value before serialization: {document.get(field)!r}")
                            raise  # Halt processing; the data cannot be exported faithfully
                    f.write('\t'.join(row) + '\n')

            # An empty collection has no columns, so there is nothing to load.
            if not field_names:
                continue

            # Load through the existing connection instead of shelling out to
            # psql: no shell quoting of the connection URI is involved, and the
            # explicit column list maps fields correctly even if the table
            # already existed with a different column order.
            copy_stmt = sql.SQL("COPY {} ({}) FROM STDIN WITH (FORMAT text)").format(
                table_ident,
                sql.SQL(', ').join(sql.Identifier(name) for name in field_names))
            with open(csv_path, 'r', encoding='utf-8', newline='') as f:
                f.readline()  # Skip the header line; it is not data
                pg_cur.copy_expert(copy_stmt, f)
finally:
    # Close PostgreSQL connection
    pg_cur.close()
    pg_conn.close()
