# Use BulkWriter for Data Import (3): Use BulkImport

This notebook shows how to use PyMilvus' BulkImport API to import your prepared data into a Zilliz Cloud collection.

## Before you start

Ensure that you have:

- Installed the dependencies, including PyMilvus and the MinIO Python Client.
- Created an output folder to store the BulkWriter output.

In [1]:
%pip install pymilvus minio


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


# Import dependencies



In [7]:
from urllib.parse import urlparse
import time, json

from minio import Minio

from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
    utility,
    bulk_import,
    get_import_progress,
    list_import_jobs,
)

# Check the prepared data files you have

ACCESS_KEY = "YOUR_OBJECT_STORAGE_ACCESS_KEY"
SECRET_KEY = "YOUR_OBJECT_STORAGE_SECRET_KEY"
BUCKET_NAME = "YOUR_OBJECT_STORAGE_BUCKET_NAME"
REMOTE_PATH = "DATA_FILES_PATH_IN_BLOCK_STORAGE"

## Check prepared data

Once you have prepared your data with a LocalBulkWriter and uploaded the generated files to your object storage, or prepared it with a RemoteBulkWriter and obtained the path to the remote folder, you are ready to import the data into a Zilliz Cloud collection.

To check that the files are in place, list the objects under the remote path:

In [3]:
client = Minio(
    endpoint="storage.googleapis.com", # GCS endpoint; use 's3.amazonaws.com' for AWS S3
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    secure=True)

objects = client.list_objects(
    bucket_name=BUCKET_NAME,
    prefix=REMOTE_PATH,
    recursive=True
)

print([obj.object_name for obj in objects])

['numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/1/claps.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/1/id.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/1/link.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/1/publication.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/1/reading_time.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/1/responses.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/1/title.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/1/vector.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/3a3437ca-1bbb-4d3b-a39d-eafcc3dc3887/1/claps.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/3a3437ca-1bbb-4d3b-a39d-eafcc3dc3887/1/id.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/3a3437ca-1bbb-4d3b-a39d-eafcc3dc3887/1/link.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/3a3437ca-1bbb-4d3b-a39d-eafcc3dc3887/1/publication.npy', 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/3a3437ca-1bbb-4d3
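A flat listing like the one above can be hard to scan by eye. As a minimal sketch, assuming each data folder should hold one `.npy` file per collection field, the hypothetical helper below groups object names by parent folder and reports the folders that contain every expected field. The function name and the sample paths are illustrative and are not part of PyMilvus or MinIO.

```python
from collections import defaultdict
from posixpath import basename, dirname


def complete_folders(object_names, field_names):
    """Return the folders that hold a .npy file for every expected field."""
    found = defaultdict(set)
    for name in object_names:
        if name.endswith(".npy"):
            # Record the file stem (the field name) under its parent folder.
            found[dirname(name)].add(basename(name)[: -len(".npy")])
    expected = set(field_names)
    return sorted(folder for folder, stems in found.items() if expected <= stems)


# Hypothetical listing, shaped like the object names printed above.
sample_names = [
    "numpy-files/batch/1/id.npy",
    "numpy-files/batch/1/vector.npy",
    "numpy-files/batch/2/id.npy",
]
print(complete_folders(sample_names, ["id", "vector"]))
```

In this notebook, the first argument could be `[obj.object_name for obj in objects]` and the second the field names of the collection schema.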

## Create collection and import data

Once your data files are ready, connect to a Zilliz Cloud cluster, create a collection out of the schema, and import the data from the files in the storage bucket.

Since Zilliz Cloud does not allow cross-cloud data transmission, you need to create your cluster on the same public cloud that hosts your prepared dataset.
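Because of this same-cloud requirement, the object URL scheme follows from the cloud that hosts the cluster. The sketch below wraps that mapping in one hypothetical function, `object_url_for`, using the `gcp`/`aws`/`ali` prefixes and the `gs`/`s3`/`oss` schemes that appear in this notebook's setup code. The endpoint in the example is a made-up placeholder, and trimming slashes from the path is a choice made here, not something the notebook does.

```python
from urllib.parse import urlparse

# Region-prefix to URL-scheme mapping, mirroring the branches in this notebook.
SCHEMES = {"gcp": "gs", "aws": "s3", "ali": "oss"}


def object_url_for(cluster_endpoint, bucket, path):
    """Build the object URL for the cloud named in the cluster endpoint."""
    host = urlparse(cluster_endpoint).netloc
    for label in host.split("."):
        for prefix, scheme in SCHEMES.items():
            if label.startswith(prefix):
                # Trim slashes so the result always has exactly one trailing slash.
                return f"{scheme}://{bucket}/{path.strip('/')}/"
    raise ValueError(f"no supported cloud region found in {host!r}")


# Hypothetical endpoint, used only to illustrate the parsing.
print(object_url_for(
    "https://in01-0123456789abcde.gcp-us-west1.vectordb.zillizcloud.com:443",
    "my-bucket",
    "numpy-files/",
))
```

Raising an error for an unrecognized host keeps a mismatched endpoint from silently producing an unusable URL.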

In [8]:
# set up your collection

CLUSTER_ENDPOINT = "YOUR_CLUSTER_ENDPOINT"
CLUSTER_TOKEN = "YOUR_CLUSTER_TOKEN"
COLLECTION_NAME = "medium_articles"
API_KEY = "YOUR_CLUSTER_TOKEN"
# Parse the endpoint host once; a cluster host is expected to start with "in".
ENDPOINT_HOST = urlparse(CLUSTER_ENDPOINT).netloc
IS_CLUSTER_HOST = ENDPOINT_HOST.startswith("in")
CLUSTER_ID = ENDPOINT_HOST.split(".")[0] if IS_CLUSTER_HOST else None
# Take the first host label that names a supported cloud, or None if there is none.
CLOUD_REGION = next((x for x in ENDPOINT_HOST.split(".") if x.startswith(("gcp", "aws", "ali"))), None) if IS_CLUSTER_HOST else None

if CLOUD_REGION is None:
    raise Exception("Invalid cluster endpoint")
elif CLOUD_REGION.startswith("gcp"):
    OBJECT_URL = f"gs://{BUCKET_NAME}/{REMOTE_PATH}/"
elif CLOUD_REGION.startswith("aws"):
    OBJECT_URL = f"s3://{BUCKET_NAME}/{REMOTE_PATH}/"
elif CLOUD_REGION.startswith("ali"):
    OBJECT_URL = f"oss://{BUCKET_NAME}/{REMOTE_PATH}/"

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="link", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="reading_time", dtype=DataType.INT64),
    FieldSchema(name="publication", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="claps", dtype=DataType.INT64),
    FieldSchema(name="responses", dtype=DataType.INT64)
]

schema = CollectionSchema(fields)

connections.connect(
    uri=CLUSTER_ENDPOINT,
    token=CLUSTER_TOKEN,
    secure=True
)

collection = Collection(COLLECTION_NAME, schema)

collection.create_index(
    field_name="vector",
    index_params={
        "index_type": "AUTOINDEX",
        "metric_type": "L2"
    }
)

collection.load()

# bulk-import your data from the prepared data files

res = bulk_import(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    object_url=OBJECT_URL,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
    cluster_id=CLUSTER_ID,
    collection_name=COLLECTION_NAME
)

print(res.json())

{'code': 200, 'data': {'jobId': '0f7fe853-d93e-4681-99f2-4719c63585cc'}}
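Before using the returned job ID, it may help to confirm that the request was accepted. The following is a minimal sketch, assuming that a `code` other than 200 in the response body signals a failure. That reading is inferred from the sample output above rather than from a documented contract, and `extract_job_id` is a hypothetical helper.

```python
def extract_job_id(payload):
    """Return the job ID from a bulk-import response body, or raise."""
    # Assumption: any code other than 200 means the request was not accepted.
    if payload.get("code") != 200:
        raise RuntimeError(f"bulk import was not accepted: {payload}")
    return payload["data"]["jobId"]


# Response body copied from the output above.
sample = {"code": 200, "data": {"jobId": "0f7fe853-d93e-4681-99f2-4719c63585cc"}}
print(extract_job_id(sample))
```

In this notebook, the argument would be `res.json()`.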


## Check bulk-import progress

You can check the progress of a specified bulk-import job.

In [9]:
job_id = res.json()['data']['jobId']
res = get_import_progress(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    job_id=job_id,
    cluster_id=CLUSTER_ID
)

# check the bulk-import progress

while res.json()["data"]["readyPercentage"] < 1:
    time.sleep(5)

    res = get_import_progress(
        url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
        api_key=API_KEY,
        job_id=job_id,
        cluster_id=CLUSTER_ID
    )

print(res.json())

{'code': 200, 'data': {'collectionName': 'medium_articles', 'fileName': 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/3a3437ca-1bbb-4d3b-a39d-eafcc3dc3887/1/', 'fileSize': 26571700, 'readyPercentage': 1, 'completeTime': '2023-11-03T10:41:32Z', 'errorMessage': None, 'jobId': '0f7fe853-d93e-4681-99f2-4719c63585cc', 'details': [{'fileName': 'numpy-files/04923874-dfe8-44e6-85ab-f329db9379a2/3a3437ca-1bbb-4d3b-a39d-eafcc3dc3887/1/', 'fileSize': 26571700, 'readyPercentage': 1, 'completeTime': '2023-11-03T10:41:32Z', 'errorMessage': None}]}}
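The loop above keeps polling until `readyPercentage` reaches 1, so it would never exit if a job failed or stalled. As a hedged sketch, the hypothetical helper below adds a timeout and stops early when `errorMessage` is set. Both field names come from the response shown above; the rest, including the `fetch_progress` callable and the default interval and timeout, is an assumption made for illustration.

```python
import time


def wait_for_import(fetch_progress, interval=5.0, timeout=600.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_progress() until the job completes, fails, or times out."""
    deadline = clock() + timeout
    while True:
        data = fetch_progress()["data"]
        if data.get("errorMessage"):
            # Stop polling as soon as the service reports an error.
            raise RuntimeError(data["errorMessage"])
        if data["readyPercentage"] >= 1:
            return data
        if clock() >= deadline:
            raise TimeoutError("bulk import did not finish before the timeout")
        sleep(interval)


# Simulated responses stand in for real API calls in this demo.
fake_responses = iter([
    {"code": 200, "data": {"readyPercentage": 0.4, "errorMessage": None}},
    {"code": 200, "data": {"readyPercentage": 1, "errorMessage": None}},
])
print(wait_for_import(lambda: next(fake_responses), sleep=lambda _: None))
```

In this notebook, `fetch_progress` could be a lambda that calls `get_import_progress` with the same arguments as above and returns `.json()` of the result.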


## List all bulk-import jobs

You can list all bulk-import jobs to learn about their details.

In [10]:
res = list_import_jobs(
    url=f"controller.api.{CLOUD_REGION}.zillizcloud.com",
    api_key=API_KEY,
    cluster_id=CLUSTER_ID,
    page_size=10,
    current_page=1,
)

print(res.json())

{'code': 200, 'data': {'tasks': [{'collectionName': 'medium_articles', 'jobId': '0f7fe853-d93e-4681-99f2-4719c63585cc', 'state': 'ImportCompleted'}, {'collectionName': 'medium_articles', 'jobId': '9d0bc230-6b99-4739-a872-0b91cfe2515a', 'state': 'ImportCompleted'}, {'collectionName': 'medium_articles', 'jobId': '53632e6c-c078-4476-b840-10c4793d9c08', 'state': 'ImportCompleted'}, {'collectionName': 'medium_articles', 'jobId': '95e7d4c4-cf60-4ce1-ac49-145459ee0f99', 'state': 'ImportCompleted'}, {'collectionName': 'medium_articles', 'jobId': 'ddca617e-8f2f-4612-9d6a-12b6edb69833', 'state': 'ImportCompleted'}, {'collectionName': 'medium_articles', 'jobId': '79fb0137-9e28-48e0-b7b1-e96706bb921f', 'state': 'ImportCompleted'}, {'collectionName': 'YOUR_COLLECTION_NAME', 'jobId': 'dd391fed-822f-4e17-b5a7-8a43d49f1eb7', 'state': 'ImportCompleted'}, {'collectionName': 'YOUR_COLLECTION_NAME', 'jobId': 'cf11ac48-2e1e-47d3-ab88-0e38736d9629', 'state': 'ImportCompleted'}, {'collectionName': 'YOUR_COLL
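The call above fetches a single page of up to `page_size` jobs. To walk through every page, one option is a small generator like the sketch below. It assumes that the last page holds fewer than `page_size` tasks, or that a page past the end returns an empty `tasks` list; neither is confirmed by the output above. `iter_import_jobs` and `fetch_page` are hypothetical names.

```python
def iter_import_jobs(fetch_page, page_size=10):
    """Yield every job by requesting consecutive pages until one is short."""
    page = 1
    while True:
        tasks = fetch_page(page_size=page_size, current_page=page)["data"]["tasks"]
        yield from tasks
        # Assumption: a page with fewer than page_size tasks is the last one.
        if len(tasks) < page_size:
            break
        page += 1


# Simulated paging over five fake jobs, two per page.
fake_jobs = [{"jobId": str(i), "state": "ImportCompleted"} for i in range(5)]


def fake_page(page_size, current_page):
    start = (current_page - 1) * page_size
    return {"code": 200, "data": {"tasks": fake_jobs[start:start + page_size]}}


print([job["jobId"] for job in iter_import_jobs(fake_page, page_size=2)])
```

In this notebook, `fetch_page` could wrap `list_import_jobs` with the `url`, `api_key`, and `cluster_id` arguments used above and return `.json()` of the result.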