<a href="https://colab.research.google.com/github/mansueli/Supa-Migrate/blob/mansueli-patch-1/mongo2supabase.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![Supabase](https://raw.githubusercontent.com/supabase/supabase/master/packages/common/assets/images/supabase-logo-wordmark--light.svg)



## Installing the requirements




In [9]:
!pip install mongo
!pip install psycopg2



## Set the connection URIs:

In [10]:
#Source DB variables:
%env supabase_uri=
%env mongo_uri=
# Leave this field empty to run for all databases
%env mongo_db=sample_mflix
#Examples:
#%env supabase_uri=postgresql://postgres:password@db.ref.supabase.co:5432/postgres
#%env mongo_uri=mongodb+srv://nacho:password@cluster001.xxxx.mongodb.net/?retryWrites=true&w=majority
#%env mongo_db=sample_analytics


#Source DB variables:
%env supabase_uri=postgresql://postgres:MXJBqrlHYkPGcMRr@db.lpghttpzbkfdjuwnirei.supabase.co:5432/postgres
%env mongo_uri=mongodb+srv://rodrigo:OQ3YBH7blAfgBYsF@cluster012.jf3yl2c.mongodb.net/?retryWrites=true&w=majority

env: supabase_uri=
env: mongo_uri=
env: mongo_db=sample_mflix
env: supabase_uri=postgresql://postgres:MXJBqrlHYkPGcMRr@db.lpghttpzbkfdjuwnirei.supabase.co:5432/postgres
env: mongo_uri=mongodb+srv://rodrigo:OQ3YBH7blAfgBYsF@cluster012.jf3yl2c.mongodb.net/?retryWrites=true&w=majority


In [None]:
#@title #Running the Migration: { display-mode: "form" }
from bson.decimal128 import Decimal128
import pymongo
import psycopg2
from psycopg2.extensions import AsIs
import json
from datetime import datetime
from psycopg2 import sql, extensions, connect, Error
from bson import ObjectId
import os

mongo_url = os.environ['mongo_uri']
supabase_url = os.environ['supabase_uri']

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

psycopg2.extensions.register_adapter(Decimal128, lambda val: AsIs(str(val.to_decimal())))

# Connect to MongoDB
mongo_client = pymongo.MongoClient(mongo_url)
# Connect to PostgreSQL
pg_conn = connect(supabase_url)
pg_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
pg_cur = pg_conn.cursor()

# Mapping MongoDB types to PostgreSQL types
SQL_DATA_TYPE = {
    "string": "TEXT",
    "ObjectId": "TEXT",
    "datetime": "TIMESTAMP WITH TIME ZONE",
    "int": "INT",
    "list": "JSONB",
    "dict": "JSONB",
    "bool": "Boolean",
    "float": "NUMERIC",
    "default": "TEXT",
}

# Store the type of each field
field_types = {}

# Get the list of database names from MongoDB
mongo_db_manual = os.environ['mongo_db']
mongo_db_names = []
if(len(mongo_db_manual)>0):
  mongo_db_names.append(mongo_db_manual)
else:
  mongo_db_names = mongo_client.list_database_names()

# Iterate over all MongoDB databases
for db_name in mongo_db_names:
    print("Starting to migrate :"+ str(db_name))
    mongo_db = mongo_client[db_name]

    # Iterate over all collections in the current database
    for collection_name in mongo_db.list_collection_names():
        # Skip system collections
        if collection_name.startswith("system."):
            continue

        collection = mongo_db[collection_name]
        # Create table in PostgreSQL if it doesn't exist
        pg_cur.execute(sql.SQL("CREATE TABLE IF NOT EXISTS {} ()").format(
            sql.Identifier(collection_name)))

        # Iterate over all documents in the collection
        cursor = collection.find()
        for document in cursor:
            # For each document, build a list of fields and a list of values
            fields = []
            values = []
            for field, value in document.items():
                # Determine PostgreSQL type based on Python type
                if isinstance(value, ObjectId):
                    pg_type = SQL_DATA_TYPE["ObjectId"]
                    value = str(value)
                else:
                    pg_type = SQL_DATA_TYPE.get(type(value).__name__, SQL_DATA_TYPE["default"])

                # Add type suffix to field name if a new type is encountered
                field_with_type = field
                if field in field_types:
                    if type(value).__name__ not in field_types[field]:
                        field_types[field].add(type(value).__name__)
                        field_with_type = f"{field}_{type(value).__name__}"
                else:
                    field_types[field] = {type(value).__name__}

                # Add column in PostgreSQL if it doesn't exist
                try:
                    pg_cur.execute(sql.SQL("ALTER TABLE {} ADD COLUMN {} {}").format(
                        sql.Identifier(collection_name),
                        sql.Identifier(field_with_type),
                        sql.SQL(pg_type)))
                except Error:
                    pass  # Column already exists, no action needed

                # Add field and value to the lists
                fields.append(sql.Identifier(field_with_type))
                if isinstance(value, list) or isinstance(value, dict):
                    value = json.dumps(value, cls=DateTimeEncoder)
                values.append(value)

            # Insert data into PostgreSQL
            pg_cur.execute(sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                sql.Identifier(collection_name),
                sql.SQL(', ').join(fields),
                sql.SQL(', ').join(sql.Placeholder() * len(values))),
                values)

pg_cur.close()
pg_conn.close()


## Alternative approach (greedy)

## Mega greedy

In [12]:
from bson.decimal128 import Decimal128
import pymongo
import psycopg2
from psycopg2.extensions import AsIs
import json
from datetime import datetime
from psycopg2 import sql, extensions, connect, Error
from bson import ObjectId
import os

mongo_url = os.environ['mongo_uri']
supabase_url = os.environ['supabase_uri']

class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, ObjectId):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal128):
            return str(obj)
        if isinstance(obj, complex):
            return [obj.real, obj.imag]
        return json.JSONEncoder.default(self, obj)


psycopg2.extensions.register_adapter(Decimal128, lambda val: AsIs(str(val.to_decimal())))

# Connect to MongoDB
mongo_client = pymongo.MongoClient(mongo_url)
# Connect to PostgreSQL
pg_conn = connect(supabase_url)
pg_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
pg_cur = pg_conn.cursor()

# Mapping MongoDB types to PostgreSQL types
SQL_DATA_TYPE = {
    "str": "TEXT",
    "ObjectId": "TEXT",
    "datetime.datetime": "TIMESTAMP WITH TIME ZONE",
    "datetime": "TIMESTAMP WITH TIME ZONE",
    "int": "INT",
    "list": "JSONB",
    "dict": "JSONB",
    "bool": "Boolean",
    "float": "NUMERIC",
    "default": "TEXT",
    "NoneType": "TEXT",
    "Decimal128": "NUMERIC",
}

# Store the type of each field
field_types = {}

# Function to check if a table exists in PostgreSQL
def table_exists(cursor, table_name):
    cursor.execute(
        sql.SQL("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = %s)"),
        [table_name]
    )
    return cursor.fetchone()[0]

# Function to reprocess a failed batch
def reprocess_batch(collection_name, batch):
    # Initialize fields and values lists
    for document in batch:
        if isinstance(document, str):
            continue  # Skip string documents, as they can't be processed

        fields = []
        values = []

        # Iterate through documents in the batch
        for field, value in document.items():
            # Determine PostgreSQL type based on Python type
            if isinstance(value, ObjectId):
                pg_type = SQL_DATA_TYPE["ObjectId"]
                value = str(value)
            else:
                pg_type = SQL_DATA_TYPE.get(type(value).__name__, SQL_DATA_TYPE["default"])

            # Add type suffix to field name if a new type is encountered
            field_with_type = field
            if field in field_types:
                if type(value).__name__ not in field_types[field]:
                    field_types[field].add(type(value).__name__)
                    field_with_type = f"{field}_{type(value).__name__}"
            else:
                field_types[field] = {type(value).__name__}

            # Add column in PostgreSQL if it doesn't exist
            try:
                pg_cur.execute(sql.SQL("ALTER TABLE {} ADD COLUMN {} {}").format(
                    sql.Identifier(collection_name),
                    sql.Identifier(field_with_type),
                    sql.SQL(pg_type)))
            except Error:
                pass  # Column already exists, no action needed

            # Add field and value to the lists
            fields.append(sql.Identifier(field_with_type))
            if isinstance(value, list) or isinstance(value, dict):
                value = json.dumps(value, cls=CustomEncoder)
            values.append(value)

        try:
            pg_cur.execute(
                sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                    sql.Identifier(collection_name),
                    sql.SQL(', ').join(fields),
                    sql.SQL(', ').join(sql.Placeholder() * len(values))),
                values)
        except Exception as e:
            print("Error during migration:", str(e))
            # Handle the error appropriately


# Get the list of database names from MongoDB
mongo_db_manual = os.environ['mongo_db']
mongo_db_names = []
if len(mongo_db_manual) > 0:
    mongo_db_names.append(mongo_db_manual)
else:
    mongo_db_names = mongo_client.list_database_names()

# Iterate over all MongoDB databases
for db_name in mongo_db_names:
    print("Starting to migrate: " + str(db_name))
    mongo_db = mongo_client[db_name]

    # Iterate over all collections in the current database
    for collection_name in mongo_db.list_collection_names():
        # Skip system collections
        if collection_name.startswith("system."):
            continue

        collection = mongo_db[collection_name]
        # Create table in PostgreSQL if it doesn't exist
        if not table_exists(pg_cur, collection_name):
            pg_cur.execute(sql.SQL("CREATE TABLE {} ()").format(
                sql.Identifier(collection_name)))

        # Process each batch of documents
        documents = collection.find().limit(250)
        first_flag = True
        batch_values = []  # To store values for batch insertion
        for document in documents:
            try:
                # For each document, build a list of fields and a list of values
                fields = []
                values = []
                for field, value in document.items():
                    # Determine PostgreSQL type based on Python type
                    if first_flag:
                        if isinstance(value, ObjectId):
                            pg_type = SQL_DATA_TYPE["ObjectId"]
                            value = str(value)
                        else:
                            pg_type = SQL_DATA_TYPE.get(type(value).__name__, SQL_DATA_TYPE["default"])
                        # Add type suffix to field name if a new type is encountered
                        field_with_type = field
                        if field in field_types:
                            if type(value).__name__ not in field_types[field]:
                                field_types[field].add(type(value).__name__)
                                field_with_type = f"{field}_{type(value).__name__}"
                        else:
                            field_types[field] = {type(value).__name__}
                        # Add column in PostgreSQL if it doesn't exist
                        try:
                            pg_cur.execute(sql.SQL("ALTER TABLE {} ADD COLUMN {} {}").format(
                                sql.Identifier(collection_name),
                                sql.Identifier(field_with_type),
                                sql.SQL(pg_type)))
                        except Error:
                            pass  # Column already exists, no action needed
                    else:
                        # For non-first documents, simply prepare values for batch insertion
                        # Handle ObjectId type separately
                        if isinstance(value, ObjectId):
                            pg_type = SQL_DATA_TYPE["ObjectId"]
                            value = str(value)
                        else:
                            pg_type = SQL_DATA_TYPE.get(type(value).__name__, SQL_DATA_TYPE["default"])
                        # Add field and value to the lists
                        fields.append(sql.Identifier(field_with_type))
                        if isinstance(value, list) or isinstance(value, dict):
                            value = json.dumps(value, cls=CustomEncoder)
                        values.append(value)

                # If it's not the first document, add values to the batch
                if not first_flag:
                    batch_values.append(values)

            except Exception as e:
                print("Error during migration:", str(e))
            finally:
                first_flag = False  # Set the flag to False after processing the first document

        # After processing all documents, insert batch_values into the database
        if batch_values:
            try:
                placeholders = sql.SQL(', ').join(sql.Placeholder() * len(fields))
                insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                    sql.Identifier(collection_name),
                    sql.SQL(', ').join(fields),
                    placeholders)

                # Execute the insert query for each set of values in batch_values
                for values in batch_values:
                    pg_cur.execute(insert_query, values)
            except Exception as e:
                print("Error when trying to insert:", str(e))
                reprocess_batch(collection, batch)

Starting to migrate: sample_mflix
Error when trying to insert: tuple index out of range
Error when trying to insert: syntax error at or near "'5a9427648b0beebeb6957a22'"
LINE 1: ...", "date", "date", "date", "date", "date") VALUES '5a9427648...
                                                             ^

Error when trying to insert: syntax error at or near "'59a47286cfa9a3a73e51e734'"
LINE 1: ...ters" ("location", "location", "location") VALUES '59a47286c...
                                                             ^

Error when trying to insert: syntax error at or near "'59b99dbacfa9a34dcd7885c1'"
LINE 1: ...word", "password", "password", "password") VALUES '59b99dbac...
                                                             ^

