In [10]:
import psycopg2
import os
from datetime import date
import pandas as pd

processing_date = str(date.today()) #execution date -> 'YYYY-MM-DD' or str(date.today())

def save_postgres_to_csv(table, processing_date):
      current_path = os.getcwd()
      source = 'postgres'
      source_path = os.path.join(current_path, source)

      #create source folder if not exists
      try:
            os.mkdir(source_path)
            print(f'path created: ', source_path)
      except:
            pass

      #create specific table folder if not exists
      table_path = os.path.join(source_path, table)

      try:
            os.mkdir(table_path)
            print(f'path created: ', table_path)
      except:
            pass

      date_path = os.path.join(table_path, processing_date)

      #create processing date folder if not exists
      try:
            os.mkdir(date_path)
            print(f'path created: ', date_path)
      except:
            pass

      filename = f"{table}.csv"      
      final_path = os.path.join(date_path, filename)
    
      cur = conn.cursor()
      sql = f"COPY (select * from {table}) TO STDOUT WITH CSV HEADER"  
      
      with open(final_path, "w", encoding = 'utf-8') as file:
            cur.copy_expert(sql, file)
            file.close     

def save_csv_to_csv(table, processing_date):
      current_path = os.getcwd()
      existing_file_path = f'data\{table}.csv'
      csv_source_path = os.path.join(current_path, existing_file_path)

      csv_reader = pd.read_csv(csv_source_path, sep=',')
      
      source = 'csv'
      source_path = os.path.join(current_path, source)

      #create source folder if not exists
      try:
            os.mkdir(source_path)
            print(f'path created: ', source_path)
      except:
            pass

      #create specific table folder if not exists
      table_path = os.path.join(source_path, table)

      try:
            os.mkdir(table_path)
            print(f'path created: ', table_path)
      except:
            pass

      date_path = os.path.join(table_path, processing_date)

      #create processing date folder if not exists
      try:
            os.mkdir(date_path)
            print(f'path created: ', date_path)
      except:
            pass

      filename = f"{table}.csv"
      final_path = os.path.join(date_path, filename)
    
      csv_reader.to_csv(final_path, sep = ',', index = False, encoding = 'utf-8')

try:
      conn = psycopg2.connect(host = "localhost", database="northwind",
                        user="northwind_user", password="thewindisblowing")
except Exception as e:
      print(e)

if conn is not None:
      print('Connection established to PostgreSQL')
      postgres_tables = ['categories', 'customer_customer_demo', 'customer_demographics', 'customers', 'employee_territories', 'employees',
                   'orders', 'products', 'region', 'shippers', 'suppliers', 'territories', 'us_states']

      for table in postgres_tables:
            print(f"Processing the {table} table")
            save_postgres_to_csv(table, processing_date)
      conn.close()

csv_table = 'order_details'
save_csv_to_csv(csv_table, processing_date)

Connection established to PostgreSQL
Processing the categories table
Processing the customer_customer_demo table
Processing the customer_demographics table
Processing the customers table
Processing the employee_territories table
Processing the employees table
Processing the orders table
Processing the products table
Processing the region table
Processing the shippers table
Processing the suppliers table
Processing the territories table
Processing the us_states table


In [11]:
import pymongo
import os
from datetime import date
import pandas as pd

processing_date = str(date.today()) #execution date -> 'YYYY-MM-DD' or str(date.today())

def csv_to_mongo(table, source, processing_date):
    current_path = os.getcwd()
    filename = f"{table}.csv"
    file_path = os.path.join(current_path, source, table, processing_date, filename)
    
    #validate step 1 execution
    try: 
        data = pd.read_csv(file_path, sep = ',', encoding='utf-8')
    except:
        raise Exception(f"not able to read csv {table} table from the {processing_date} date")

    client = pymongo.MongoClient("mongodb+srv://northwind_user:thewindisblowing@cluster0.ti9zwzb.mongodb.net/?retryWrites=true&w=majority")

    if data.empty:
        print(f'{table} is an empty table, not added to the mongo collection')
    else:
        #create mongodb database if not exists
        if "northwind_db" not in client.list_database_names():
            client["northwind_db"]
            print("northwind_db dabatase created")

        #create or overwrite mongodb collection 
        db = client["northwind_db"]
        collection = db[f"{table}_{processing_date}_collection"]
        
        if f"{table}_{processing_date}_collection" not in client["northwind_db"].list_collection_names():
            collection
            print(f"{table}_{processing_date}_collection created")
        else:
            collection.drop()
            collection
            print(f"{table}_{processing_date}_collection overwritten")

        collection.insert_many(data.to_dict('records'))

    client.close()

table_names = ['categories', 'customer_customer_demo', 'customer_demographics', 'customers', 'employee_territories', 'employees',
                   'order_details', 'orders', 'products', 'region', 'shippers', 'suppliers', 'territories', 'us_states']

for table in table_names:
    print(f'Processing the {table} table')
    if table == 'order_details':
        csv_to_mongo(table, 'csv', processing_date)
    else:
        csv_to_mongo(table, 'postgres', processing_date)

Processing the categories table
categories_2023-01-31_collection overwritten
Processing the customer_customer_demo table
customer_customer_demo is an empty table, not added to the mongo collection
Processing the customer_demographics table
customer_demographics is an empty table, not added to the mongo collection
Processing the customers table
customers_2023-01-31_collection overwritten
Processing the employee_territories table
employee_territories_2023-01-31_collection overwritten
Processing the employees table
employees_2023-01-31_collection overwritten
Processing the order_details table
order_details_2023-01-31_collection overwritten
Processing the orders table
orders_2023-01-31_collection overwritten
Processing the products table
products_2023-01-31_collection overwritten
Processing the region table
region_2023-01-31_collection overwritten
Processing the shippers table
shippers_2023-01-31_collection overwritten
Processing the suppliers table
suppliers_2023-01-31_collection overwrit

In [13]:
import pymongo
import pandas as pd

client = pymongo.MongoClient("mongodb+srv://northwind_user:thewindisblowing@cluster0.ti9zwzb.mongodb.net/?retryWrites=true&w=majority")

db = client["northwind_db"]

table = 'orders'
collection = db[f"{table}_{processing_date}_collection"]
query = {}
cursor = collection.find(query)
orders_schema = {
    'order_id': 'string',
    'customer_id': 'string',
    'employee_id': 'string',
    'order_date': 'datetime64[ns]',
    'required_date': 'datetime64[ns]',
    'shipped_date': 'datetime64[ns]',
    'ship_via': 'string',
    'freight': 'float64',
    'ship_name': 'string',
    'ship_address': 'string',
    'ship_city': 'string',
    'ship_region': 'string',
    'ship_postal_code': 'string',
    'ship_country': 'string'
}
orders = pd.DataFrame(list(cursor)).drop(['_id'], axis=1).astype(orders_schema)

table = 'order_details'
collection = db[f"{table}_{processing_date}_collection"]
query = {}
cursor = collection.find(query)
order_details_schema = {
    'order_id': 'string', 
    'product_id': 'string',  
    'unit_price': 'float64',
    'quantity': 'int64',  
    'discount': 'float64'
}
order_details = pd.DataFrame(list(cursor)).drop(['_id'], axis=1).astype(order_details_schema)

final_df = orders.join(order_details.set_index('order_id'), on = 'order_id', how = 'left')

current_path = os.getcwd()
filename = f"full_order_table.csv"
file_path = os.path.join(current_path, filename)

final_df.to_csv(file_path, sep = ',', index = False, encoding = 'utf-8')

final_df

Unnamed: 0,order_id,customer_id,employee_id,order_date,required_date,shipped_date,ship_via,freight,ship_name,ship_address,ship_city,ship_region,ship_postal_code,ship_country,product_id,unit_price,quantity,discount
0,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,11,14.00,12,0.00
0,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,42,9.80,10,0.00
0,10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,72,34.80,5,0.00
1,10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,14,18.60,9,0.00
1,10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,51,42.40,40,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
829,11077,RATTC,1,1998-05-06,1998-06-03,NaT,2,8.53,Rattlesnake Canyon Grocery,2817 Milton Dr.,Albuquerque,NM,87110,USA,64,33.25,2,0.03
829,11077,RATTC,1,1998-05-06,1998-06-03,NaT,2,8.53,Rattlesnake Canyon Grocery,2817 Milton Dr.,Albuquerque,NM,87110,USA,66,17.00,1,0.00
829,11077,RATTC,1,1998-05-06,1998-06-03,NaT,2,8.53,Rattlesnake Canyon Grocery,2817 Milton Dr.,Albuquerque,NM,87110,USA,73,15.00,2,0.01
829,11077,RATTC,1,1998-05-06,1998-06-03,NaT,2,8.53,Rattlesnake Canyon Grocery,2817 Milton Dr.,Albuquerque,NM,87110,USA,75,7.75,4,0.00
