## Load Data in Chunks

Let us understand the overall process of loading CSV data into MongoDB in chunks, with attribute-level mapping.
* Read the data from the file into a Pandas DataFrame.
* Drop the fields that are not required and rename the remaining fields as per the target structure.
* Load the data into MongoDB using bulk inserts (`insert_many`).
* When loading into the target collection, it is often better to load the data in chunks rather than all at once. For example, if we have 10,000 records to load, it is good practice to insert them in smaller batches. The right chunk size depends on several factors, such as record size and available memory.
* For this demo, we will load 6 records at a time rather than loading all 20 records in one shot.
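* Pandas can also read the file itself in chunks, so that the whole file never has to be held in memory. The following is a minimal sketch, assuming a CSV with a header row. The in-memory sample data and the names `sample_csv` and `chunk_sizes` are made up for illustration and are not part of the demo data set.

```python
import io
import pandas as pd

# Hypothetical sample: 20 rows with a header, standing in for a real file
sample_csv = io.StringIO(
    'id,name\n' + ''.join(f'{i},customer_{i}\n' for i in range(20))
)

# With chunksize set, read_csv returns an iterator of DataFrames
# instead of one DataFrame holding all the rows
chunk_sizes = []
for chunk in pd.read_csv(sample_csv, chunksize=6):
    chunk_sizes.append(len(chunk))

print(chunk_sizes)
```

Reading 20 rows 6 at a time gives three full chunks and one partial chunk of 2 rows. The rest of this section reads the whole file first and chunks the records afterwards.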

In [None]:
import pandas as pd
customers = pd.read_csv('/data/ecomm/customers/part-00000')

* Get the list of fields to be dropped, as well as the mapping between source and target columns.

In [None]:
column_mapping_str = '''{
    "customer_first_name": {"target_field_name": "FirstName", "is_required": true},
    "customer_last_name": {"target_field_name": "LastName", "is_required": true},
    "customer_email": {"target_field_name": "Email", "is_required": true},
    "product_name": {"is_required": false},
    "product_subscription": {"is_required": false}
}'''

import json
column_mapping = json.loads(column_mapping_str)

# Source fields that are not required and will be dropped
columns_to_be_dropped = [
    col for col, props in column_mapping.items()
    if not props['is_required']
]

# (source field, properties) pairs for the required fields
required_columns_list = [
    (col, props) for col, props in column_mapping.items()
    if props['is_required']
]

# Source field name -> target field name for the required fields
required_columns_mapping = {
    col: props['target_field_name']
    for col, props in required_columns_list
}

* Drop the columns that are not required and rename the remaining columns as per the target structure.

In [None]:
# Drop the fields that are not required and rename the others as per the mapping
customers_target = customers. \
    drop(columns=columns_to_be_dropped). \
    rename(columns=required_columns_mapping)
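
* To see the effect of the drop and rename on its own, here is a small self-contained sketch. The two-row `sample` DataFrame and its values are hypothetical. Only the column names are taken from the mapping above.

```python
import pandas as pd

# Hypothetical two-row sample using the source column names from the mapping
sample = pd.DataFrame({
    'customer_first_name': ['Ann', 'Bob'],
    'customer_last_name': ['Lee', 'Ray'],
    'customer_email': ['ann@example.com', 'bob@example.com'],
    'product_name': ['Basic', 'Pro'],
    'product_subscription': ['monthly', 'yearly'],
})

# drop and rename both return new DataFrames; sample itself is unchanged
sample_target = (
    sample
    .drop(columns=['product_name', 'product_subscription'])
    .rename(columns={
        'customer_first_name': 'FirstName',
        'customer_last_name': 'LastName',
        'customer_email': 'Email',
    })
)

print(list(sample_target.columns))
```

Only the three required fields remain, now under their target names.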

* Create MongoDB Connection.

In [None]:
import pymongo, getpass, configparser

username = getpass.getuser()
config = configparser.ConfigParser()
config.read(f'/home/{username}/.jupyterenv')

client = pymongo.MongoClient(
    host='pylabsmd.itversity.com', 
    username=f'{username}_scratch_user', 
    password=config['DEFAULT']['MONGO_SCRATCH_PASS'], 
    authSource='admin'
)

* Clean up the data in the collection before loading so that we do not end up with duplicate data.

In [None]:
client[f'{username}_scratch_db']['customers'].delete_many({})

In [None]:
for doc in client[f'{username}_scratch_db']['customers'].find({}):
    print(doc)

In [None]:
customers_target.to_dict?

In [None]:
customers_list = customers_target.to_dict(orient='records')
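
* With `orient='records'`, `to_dict` returns one dict per row, which is the shape `insert_many` expects. A minimal sketch on a hypothetical two-row DataFrame (the values are made up):

```python
import pandas as pd

# Hypothetical sample with two of the target field names
sample_target = pd.DataFrame({
    'FirstName': ['Ann', 'Bob'],
    'LastName': ['Lee', 'Ray'],
})

# One dict per row, keyed by column name
sample_records = sample_target.to_dict(orient='records')
print(sample_records)
```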

In [None]:
len(customers_list)

In [None]:
customers_list_range = list(range(0, len(customers_list), 6))

In [None]:
customers_list_range[:-1]

In [None]:
customers_list_range[1:]

In [None]:
list(zip(customers_list_range[:-1], customers_list_range[1:]))

* Generate the lower and upper bounds of each chunk.

In [None]:
chunks = list(zip(customers_list_range[:-1], customers_list_range[1:]))
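
* As a worked example of the bounds, assume 20 records and a chunk size of 6, as stated in the overview. The names `record_count`, `starts`, `full_chunks` and `last_chunk` are used only in this sketch.

```python
record_count = 20  # assumed, matching the 20 records mentioned in the overview
chunk_size = 6

# Start offset of every chunk, including the last, partial one
starts = list(range(0, record_count, chunk_size))

# Pairing each start with the next start gives the full chunks only
full_chunks = list(zip(starts[:-1], starts[1:]))

# The remaining records run from the last start to the end of the list
last_chunk = (starts[-1], record_count)

print(starts)
print(full_chunks)
print(last_chunk)
```

The `zip` covers records 0 to 18 in three chunks of 6. The remaining 2 records, from 18 to 20, have to be handled separately.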

In [None]:
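# Start with ub = 0 so that the last step still works when there are
# fewer records than one chunk and `chunks` is empty
ub = 0
for lb, ub in chunks:
    print(f'Processing from {lb} to {ub}')
    print(customers_list[lb:ub])

print(f'Processing last set from {ub} to {len(customers_list)}')
print(customers_list[ub:])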

* Store data into MongoDB collection in chunks.

In [None]:
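customers_collection = client[f'{username}_scratch_db']['customers']

# Start with ub = 0 so that the last step still works when `chunks` is empty
ub = 0
for lb, ub in chunks:
    print(f'Processing from {lb} to {ub}')
    customers_collection.insert_many(customers_list[lb:ub])

# insert_many raises an error for an empty list, so skip it if nothing is left
if customers_list[ub:]:
    print(f'Processing last set from {ub} to {len(customers_list)}')
    customers_collection.insert_many(customers_list[ub:])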
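
* The last, partial chunk can also be handled inside the loop itself by slicing with a fixed step. The following is a sketch of that alternative, not the approach used above. `iter_chunks` is a hypothetical helper and the sample records are made up.

```python
def iter_chunks(records, chunk_size):
    """Yield consecutive slices of records, each at most chunk_size long."""
    for lb in range(0, len(records), chunk_size):
        # Slicing past the end of a list is safe, so the last chunk
        # is simply shorter than the others
        yield records[lb:lb + chunk_size]


# Hypothetical sample: 20 small documents
sample_records = [{'n': i} for i in range(20)]

chunk_lengths = [len(chunk) for chunk in iter_chunks(sample_records, 6)]
print(chunk_lengths)
```

Each yielded chunk could be passed to `insert_many` in place of the slices above. An empty list yields no chunks at all, so `insert_many` is never called with an empty list.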

In [None]:
for doc in client[f'{username}_scratch_db']['customers'].find({}):
    print(doc)

In [None]:
client[f'{username}_scratch_db']['customers'].count_documents({})