# Cassandra: Import Large Datasets

In this module, we import large datasets in CSV files into Cassandra.

## Dataset
Monthly behavior datasets from a multi-category online store (about 50 million records per month).

https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store

There are 4 types of logged events:
* view - a user viewed a product
* cart - a user added a product to the shopping cart
* remove_from_cart - a user removed a product from the shopping cart
* purchase - a user purchased a product. A session can contain multiple purchase events; this is normal, because together they form a single order.

## Cassandra Cluster

In a Cassandra cluster, there is no central primary (or master) node; all nodes in the cluster are peers. When the cluster first starts, nodes discover each other through mechanisms such as the Gossip protocol.

Once the topology is established, however, it is not static. The same Gossip mechanism detects when nodes are added to or removed from the cluster.

### Check Cluster Status

You can check the health of a Cassandra cluster with `nodetool status`. It shows how data is distributed among nodes, whether each node is up or down, node states, data load per node, number of tokens, and related information.

`nodetool info` shows information about a single node, including whether gossip is active, uptime, disk load, chunk cache information, generation number (times started), heap memory usage, and more.

Finally, `nodetool tpstats` shows thread pool usage statistics for each stage.

```bash
$ nodetool status
$ nodetool status keyspace
$ nodetool info
$ nodetool tpstats
$ nodetool tablestats keyspace.table
```

### Setup Cluster with Docker

1. Download

The official Cassandra image is available on [Docker Hub](https://hub.docker.com/_/cassandra).

```bash
$ docker pull cassandra
Using default tag: latest
latest: Pulling from library/cassandra
5544ebdc0c7b: Pull complete
9f11d3ecf1bb: Pull complete
67ea14bd9996: Pull complete
94e309096fff: Pull complete
9330c089b83e: Pull complete
f3837b8087ee: Pull complete
11c520a8aabe: Pull complete
ce53e4df33e6: Pull complete
f46e33d0cbbe: Pull complete
Digest: sha256:e3a505f2cb0f53b730d43ea06235b5fae14bf5006d904a096089baf77dbaf217
Status: Downloaded newer image for cassandra:latest
docker.io/library/cassandra:latest
```

2. Basic configurations

With `docker run`, we can pre-configure the cluster:

* `--name cassandra_node0`: name of the container to create
* `-d`: run in the background (detached mode)
* `--memory="8g" --cpus="2.0"`: limit the resources used by the container, in this case 8 GB of RAM and 2 CPUs (equivalent to 2 CPU cores).
* `-v E:/shared_data/v0:/var/lib/cassandra`: mount a host directory as Cassandra's data directory, so the data lives outside the container.
* `-p 9042:9042 -p 7000:7000 -p 7001:7001 -p 7199:7199`: publish the ports Cassandra uses: 9042 for CQL clients, 7000 for inter-node communication, 7001 for inter-node communication over TLS, and 7199 for JMX (used by `nodetool`).
* `-e CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch`: set the snitch, which tells Cassandra the data center and rack of each node. `GossipingPropertyFileSnitch` is the snitch recommended for production in `cassandra.yaml`; it shares each node's data center and rack with the other nodes through gossip.
* `-e CASSANDRA_BROADCAST_ADDRESS=192.168.137.101`: the IP address this node advertises to other nodes. It must be set on the seed node (the first node to be started) so that the other nodes can reach it.
* `-e CASSANDRA_SEEDS=192.168.137.101`: the seed node that other nodes contact first to learn about and exchange information with the cluster.
* `-e CASSANDRA_CLUSTER_NAME=cass_recommender`: set the cluster name
* `-e CASSANDRA_DC=dc1` and `-e CASSANDRA_RACK=rack1`: set the data center and rack of this node (only applied when `GossipingPropertyFileSnitch` is used)

Examples:

Seed node - node0
```bash
$ docker run --memory="8g" --cpus="2.0" \
    --name cassandra_node0 \
    -v E:/shared_data/v0:/var/lib/cassandra \
    -e CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch \
    -e CASSANDRA_BROADCAST_ADDRESS=192.168.137.101 \
    -e CASSANDRA_CLUSTER_NAME=cass_recommender \
    -e CASSANDRA_DC=dc1 \
    -p 9042:9042 -p 7000:7000 -p 7001:7001 -p 7199:7199 \
    -d cassandra:latest
```

Other nodes - node1

```bash
$ docker run --memory="8g" --cpus="3.0" \
    --name cassandra_node1 \
    -v E:/shared_data/v1:/var/lib/cassandra  \
    -e CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch \
    -e CASSANDRA_SEEDS=192.168.137.101 \
    -e CASSANDRA_CLUSTER_NAME=cass_recommender \
    -e CASSANDRA_DC=dc1 \
    -d cassandra:latest
```
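When the cluster has more than a couple of nodes, generating these commands helps avoid copy-paste mistakes in the environment variable names. The sketch below builds the `docker run` argument list from the options described above. `docker_run_args` is a hypothetical helper; its defaults mirror the examples (seed IP `192.168.137.101`, `E:/shared_data`, cluster `cass_recommender`), and node 0 is assumed to be the seed.

```python
import shlex
from typing import List


def docker_run_args(index: int, seed_ip: str, data_root: str = "E:/shared_data",
                    cluster: str = "cass_recommender", dc: str = "dc1",
                    rack: str = "rack1", memory: str = "8g",
                    cpus: str = "2.0") -> List[str]:
    """Arguments for `docker run` that create cassandra_node<index>.

    Assumption: node 0 is the seed. It advertises seed_ip and publishes the
    Cassandra ports; every other node only points to the seed.
    """
    args = [
        "docker", "run", f"--memory={memory}", f"--cpus={cpus}",
        "--name", f"cassandra_node{index}",
        "-v", f"{data_root}/v{index}:/var/lib/cassandra",
        "-e", "CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch",
        "-e", f"CASSANDRA_CLUSTER_NAME={cluster}",
        "-e", f"CASSANDRA_DC={dc}",
        "-e", f"CASSANDRA_RACK={rack}",
    ]
    if index == 0:
        # Seed node: advertise the host IP and publish CQL, gossip, TLS and JMX ports.
        args += ["-e", f"CASSANDRA_BROADCAST_ADDRESS={seed_ip}"]
        for port in (9042, 7000, 7001, 7199):
            args += ["-p", f"{port}:{port}"]
    else:
        # Other nodes: contact the seed first to join the cluster.
        args += ["-e", f"CASSANDRA_SEEDS={seed_ip}"]
    return args + ["-d", "cassandra:latest"]


if __name__ == "__main__":
    for i in range(2):
        # Print the shell command; pass the list to subprocess.run() to execute it.
        print(shlex.join(docker_run_args(i, "192.168.137.101")))
```

Building a list (rather than a single string) keeps each argument intact if it is later passed to `subprocess.run`.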

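To get the IP address of a running container, we can use `inspect` (the `-f` template prints only the IP address):
```bash
$ docker inspect cassandra_node0
$ docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' cassandra_node0
```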

3. External Storage

Without external storage mounted to a container, we may lose data if the container breaks or is deleted, and it is also hard to move or maintain the data. Hence, we can either create a volume with the `docker volume create` command, or bind-mount a local directory when creating the container:

```bash
$ docker volume create VOLUME_NAME
$ docker run -v VOLUME_NAME:/var/lib/cassandra IMAGE
$ docker run -v /local/dir:/var/lib/cassandra IMAGE
```


4. Run Cluster

First, start the seed node (node0), then start the other nodes. It takes time for the nodes to discover each other and join the cluster. Once all nodes have joined, we can check the status:

```
$ docker exec -it cassandra_node0 nodetool status
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address          Load        Tokens  Owns (effective)  Host ID                               Rack
UN  172.17.0.3       117.57 KiB  16      64.7%             d9f529c7-9b81-462e-8ec1-85938fba359d  rack1
UN  192.168.137.101  134.89 KiB  16      62.1%             ad7179b0-fd7c-4ce4-8903-4451d1a59af6  rack1

Datacenter: dc2
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address          Load        Tokens  Owns (effective)  Host ID                               Rack
UN  172.17.0.2       105.89 KiB  16      73.2%             8b37d127-3e34-4fe4-b9a8-aa415ee6023a  rack1
```

To open `cqlsh`, either start a shell in the container and run it there:

```bash
$ docker exec -it container_ID bash
root@64a8dddfaebf:/# cqlsh
```
or run it directly:

```bash
$ docker exec -it container_ID cqlsh
```


### Notes 

1. External Data Center

If some data centers run outside the current Docker environment, we need to publish port 7000 (`-p 7000:7000`), give the first node the machine's real IP (`CASSANDRA_BROADCAST_ADDRESS=192.168.137.101`), and tell the other nodes to use the first node as a seed (`CASSANDRA_SEEDS=192.168.137.101`).

Note: containers running inside Docker are not easy to reach from outside Docker, for example from another computer or a new virtual machine. In this case, we can only run one Cassandra instance per Docker host and assign the machine's IP to each container (bound to port 7000).

2. Recommended resources

A minimal production server requires at least 2 cores, and at least 8GB of RAM. Typical production servers have 8 or more cores and at least 32GB of RAM. 

We can set these limits when creating a new container:
```bash
$ docker run --memory="5g" --cpus="1.0" IMAGE
```

Or adjust them for an existing container:
```bash
$ docker container update --memory="5g" --cpus="1.0" CONTAINER
```
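Memory limits also interact with the JVM heap. When `MAX_HEAP_SIZE` is not set, `cassandra-env.sh` in Cassandra 3.x/4.0 computes a default heap of max(min(1/2 RAM, 1024 MB), min(1/4 RAM, 8192 MB)). The sketch below reproduces that rule so you can see what heap a given amount of RAM implies. Depending on the version and how memory is detected, the RAM figure the script sees may be the host's total memory rather than the container's `--memory` limit, so it can be safer to set `MAX_HEAP_SIZE` and `HEAP_NEWSIZE` explicitly with `-e`; check `cassandra-env.sh` in your image before relying on this.

```python
def default_max_heap_mb(system_memory_mb: int) -> int:
    """Default MAX_HEAP_SIZE in MB, following the rule documented in
    cassandra-env.sh (Cassandra 3.x/4.0):
    max(min(1/2 RAM, 1024 MB), min(1/4 RAM, 8192 MB)), using integer division.
    """
    half = system_memory_mb // 2
    quarter = half // 2
    return max(min(half, 1024), min(quarter, 8192))


if __name__ == "__main__":
    # RAM sizes mentioned in this section: 5 GB, 8 GB and 32 GB.
    for ram_gb in (5, 8, 32):
        print(f"{ram_gb} GB RAM -> {default_max_heap_mb(ram_gb * 1024)} MB heap")
```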

## Load data from CSV file

```python
import numpy as np
import pandas as pd
from pathlib import Path
import time

from datatools import DBWrapper, LoadDataToCassandra
```

```python
# Cassandra configurations
cassandra_cluster_ips = ["192.168.137.101"]
keyspace = "events_by_users"
table_name = "logs_all_events"


# CSV info and configuration
files = ["2019-Dec.csv"]  # ["2019-Oct.csv", "2019-Nov.csv", "2019-Dec.csv"]
path = "E:/coding/input/ecommerce-behavior-data-from-multi-category-store/"
DTYPE = {
    "event_time": str,
    "event_type": "category",
    "product_id": str,
    "category_id": str,
    "category_code": str,
    "brand": str,
    "price": float,
    "user_id": str,
    "user_session": str,
}

# Batch size when inserting into Cassandra = number of records per batch.
BATCH_SIZE = 10000
# The CSV file is large, so we load its rows in chunks.
CHUNK_SIZE = 10**6
```

## Processing Data and Importing to Cassandra

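Depending on the batch size (a larger `BATCH_SIZE` means a heavier batch query), we may need to raise the batch size thresholds in `cassandra.yaml`. The key names below are those of Cassandra 4.1 and later; earlier versions use `batch_size_warn_threshold_in_kb` and `batch_size_fail_threshold_in_kb`, with values in KiB.

```yaml
batch_size_warn_threshold: 10240KiB
batch_size_fail_threshold: 50240KiB
```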

To edit `cassandra.yaml` in a Docker container, copy it to the host, edit it, copy it back, and restart the node so the change takes effect (repeat for each node):

```bash
$ docker cp cassandra_node0:/etc/cassandra/cassandra.yaml ~/local/
$ nano ~/local/cassandra.yaml
$ docker cp ~/local/cassandra.yaml cassandra_node0:/etc/cassandra
$ docker restart cassandra_node0
```

In this setup, loading the 68 million records of the 9 GB CSV file into Cassandra took about 4.5 hours (270 minutes), a rate of roughly 4,000 records per second.
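As a rough, order-of-magnitude check of why the thresholds need raising, the sketch below estimates the size of one batch from the figures above (about 9 GB for 68 million rows, `BATCH_SIZE = 10000`). It assumes a row takes about as many bytes in a batch as its line in the CSV file, which is only an approximation, since Cassandra measures the serialized size of the mutations. The 5 KiB and 50 KiB values are the default warn and fail thresholds shipped in `cassandra.yaml`.

```python
# Rough estimate only: assumes a row in a batch takes about as many bytes
# as its line in the CSV file (Cassandra actually measures the serialized
# mutation size, which differs).
DEFAULT_WARN_KIB = 5    # default batch_size_warn_threshold
DEFAULT_FAIL_KIB = 50   # default batch_size_fail_threshold


def estimate_batch_kib(batch_rows: int, csv_bytes: float, csv_rows: float) -> float:
    """Approximate size of one batch in KiB from the average CSV row size."""
    bytes_per_row = csv_bytes / csv_rows
    return batch_rows * bytes_per_row / 1024


if __name__ == "__main__":
    # ~9 GB file, ~68 million rows, BATCH_SIZE = 10000 (figures from this import)
    kib = estimate_batch_kib(10_000, 9e9, 68e6)
    print(f"about {kib:,.0f} KiB per batch "
          f"({kib / DEFAULT_FAIL_KIB:.0f}x the default fail threshold)")
```

At roughly 1.3 MB per batch, such batches are far above the default fail threshold (which applies to batches spanning multiple partitions), but well below the 50240 KiB value configured above.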

```python
info = {
    "ips": cassandra_cluster_ips,
    "keyspace": keyspace,
    "table_name": table_name,
}
cassandra_conn = DBWrapper("cassandra", info)

# If the program stops midway, set SKIP_ROWS to resume the import
SKIP_ROWS = None  # None
loader = LoadDataToCassandra()

for name in files:
    file_path = Path(path + name)
    print(file_path)
    start = time.time()
    loader.save_to_cassandra(
        cassandra_conn,
        file_path,
        dtype=DTYPE,
        BATCH_SIZE=BATCH_SIZE,
        CHUNK_SIZE=CHUNK_SIZE,
        SKIP_ROWS=SKIP_ROWS,
    )
    print("Running time = ", time.time() - start)

cassandra_conn.disconnect()
```



```
E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv

DEBUG:datatools:E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv Current # of rows = 0
DEBUG:datatools:E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv Current # of rows = 1000000
DEBUG:datatools:E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv Current # of rows = 2000000
DEBUG:datatools:E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv Current # of rows = 3000000
DEBUG:datatools:E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv Current # of rows = 4000000
DEBUG:datatools:E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv Current # of rows = 5000000
DEBUG:datatools:E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv Current # of rows = 6000000
DEBUG:datatools:E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv Current # of rows = 7000000
DEBUG:datatools:E:\cod

Running time =  16282.749917507172
```
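The `datatools` module (`DBWrapper`, `LoadDataToCassandra`) is not shown here, so the following is only a sketch of what a chunked import could look like with pandas and the DataStax Python driver (`cassandra-driver`), not the author's implementation. It assumes the table's columns match the CSV columns and that the values' Python types match the CQL column types (for example, a `timestamp` column would need `datetime` values rather than the strings read via `DTYPE`). Rows are sent in unlogged batches of `batch_size`. For resuming, note that passing an integer `skiprows` to `pandas.read_csv` would also skip the header line, so the hypothetical `resume_skiprows` helper skips data lines `1..rows_done` instead.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Sequence, Tuple


def batched(rows: Iterable[Tuple], size: int) -> Iterator[List[Tuple]]:
    """Yield consecutive lists of at most `size` rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def resume_skiprows(rows_done: int) -> range:
    """Data lines to skip on resume; line 0 (the CSV header) is kept."""
    return range(1, rows_done + 1)


def import_csv(session, csv_path, insert_cql: str, columns: Sequence[str],
               dtype: dict, *, chunk_size: int, batch_size: int,
               rows_done: int = 0) -> int:
    """Stream csv_path into Cassandra; returns the total rows imported."""
    # Imported here so the helpers above can be used without these packages.
    import pandas as pd
    from cassandra.query import BatchStatement, BatchType

    prepared = session.prepare(insert_cql)  # one '?' per column, in order
    reader = pd.read_csv(csv_path, dtype=dtype, chunksize=chunk_size,
                         skiprows=resume_skiprows(rows_done))
    for chunk in reader:
        # Replace NaN with None so missing values are written as nulls.
        chunk = chunk.astype(object).where(chunk.notna(), None)
        rows = chunk[list(columns)].itertuples(index=False, name=None)
        for group in batched(rows, batch_size):
            batch = BatchStatement(batch_type=BatchType.UNLOGGED)
            for row in group:
                batch.add(prepared, row)
            session.execute(batch)
        rows_done += len(chunk)
        # Log progress so a failed run can be resumed with rows_done=...
        print(f"{csv_path}: {rows_done} rows imported")
    return rows_done
```

For example, with `session = Cluster(cassandra_cluster_ips).connect()` (from `cassandra.cluster`), call `import_csv(session, file_path, insert_cql, columns, DTYPE, chunk_size=CHUNK_SIZE, batch_size=BATCH_SIZE)`, where `insert_cql` is an `INSERT INTO keyspace.table_name (...) VALUES (?, ...)` statement with one `?` per entry in `columns`. Because Cassandra `INSERT`s are upserts, re-sending a partially imported chunk after a restart overwrites the same rows rather than duplicating them, provided the primary key comes from the CSV data.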


## Process Data

We can clean up a CSV file, filling in missing data with default values, and save the processed data to a new CSV file.

```python
from datatools import LoadDataFromCSV

SKIP_ROWS = None  # None
loader = LoadDataFromCSV()

for name in files:
    file_path = Path(path + name)
    file_path_new = Path(path + "cleaned_" + name)
    print(file_path)
    start = time.time()
    # Copy the header line into the new file
    with open(file_path, "r") as f:
        first_line = f.readline()
        with open(file_path_new, "w") as f_new:
            f_new.write(first_line)

    # Append the processed chunks without repeating the header
    for df in loader.load_by_chunk(
        file_path, dtype=DTYPE, chunksize=CHUNK_SIZE, skiprows=SKIP_ROWS
    ):
        df.to_csv(file_path_new, index=False, header=False, mode="a")

    print("Running time = ", time.time() - start)
```

```
E:\coding\input\ecommerce-behavior-data-from-multi-category-store\2019-Dec.csv
Running time =  428.40974593162537
```
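`LoadDataFromCSV` is also part of the unshown `datatools` module. As a dependency-free sketch of the same idea, the function below copies a CSV file row by row with Python's `csv` module and replaces empty fields with defaults. `clean_csv` and the `defaults` mapping (for example `{"category_code": "unknown", "brand": "unknown"}`) are hypothetical; choose default values that suit your schema.

```python
import csv
from typing import Dict, TextIO


def clean_csv(src: TextIO, dst: TextIO, defaults: Dict[str, str]) -> int:
    """Copy CSV rows from src to dst, filling empty fields from `defaults`.

    Writes the header first and returns the number of data rows written.
    """
    reader = csv.DictReader(src)
    writer = csv.DictWriter(dst, fieldnames=reader.fieldnames, lineterminator="\n")
    writer.writeheader()
    count = 0
    for row in reader:
        for column, default in defaults.items():
            # Only touch columns that exist in the file and are empty.
            if column in row and not row[column]:
                row[column] = default
        writer.writerow(row)
        count += 1
    return count
```

To clean a file on disk, open both files with `newline=""` and `encoding="utf-8"` and pass the file objects, e.g. `clean_csv(open_src, open_dst, {"brand": "unknown"})`. Streaming row by row keeps memory use constant regardless of file size.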


## Import CSV files: Other Methods

If the CSV files are well-formed, with no missing data or formatting errors, we can import them directly into Cassandra. If a CSV file needs processing, we can pre-process it with Pandas and save the result to a new CSV file.

1. `COPY` command in `cqlsh`

Besides using Pandas in Python, we can import CSV files directly with the `COPY` command in `cqlsh`. For example:

```
cqlsh> COPY keyspace.table_name (col1, col2, col3) FROM '~/data.csv' WITH HEADER=TRUE ;
```

Read more here: https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/cqlshCopy.html



2. DataStax Bulk Loader

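DataStax Bulk Loader (`dsbulk`) is a command-line tool from DataStax for loading data into and unloading data from Cassandra: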

```
$ dsbulk load -url ~/data.csv -k keyspace -t table_name
```

This loads data from `-url`, which can be a local file or an HTTP link, into `keyspace.table_name`.

More details:

* https://downloads.datastax.com/#bulk-loader
* https://docs.datastax.com/en/dsbulk/docs/reference/dsbulkCmd.html
* https://www.datastax.com/blog/datastax-bulk-loader-introduction-and-loading


## `COPY` command from `cqlsh`


In a bash shell inside the container (`docker exec -it cassandra_node0 bash`):

```bash
$ cqlsh
cqlsh> COPY events_by_users.demo_table (event_time, event_type, product_id, category_id, category_code, brand, price, user_id, user_session) FROM '/dir/cassandra/cleaned_2019-Nov.csv' WITH HEADER=TRUE ;
```

In my test run, the rate was about 5,000 records per second, but I ran into several problems:

1. `WriteTimeout` errors occurred. With a large number of records, the import can overload the cluster, and some data might be lost. `COPY` is better suited to small datasets, for example fewer than a few million records.

2. It cannot handle errors during the import, for example missing values, values in the wrong format (a string instead of an integer), or files corrupted in transfer, such as when moving them between servers over the network or copying them to a disk or USB drive.

3. It is hard to stop or resume the import.

4. Servers differ in how much batch throughput they can handle. If batches contain too many records, throughput can drop sharply, so the optimal batch size may need to be investigated.
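One way to work around problems 1 and 3 is to split a large file into smaller parts and run `COPY` once per part, so that a failure only affects one part and the import can resume from the next one. The sketch below uses Python's `csv` module; the `split_csv` name, the part-file naming and the default of 1,000,000 rows per part are assumptions to tune.

```python
import csv
from pathlib import Path
from typing import List


def split_csv(src: Path, out_dir: Path, rows_per_part: int = 1_000_000) -> List[Path]:
    """Split src into part files of at most rows_per_part data rows.

    Every part repeats the header row, so each can be loaded on its own.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    parts: List[Path] = []
    out = None
    try:
        with open(src, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            header = next(reader)
            for i, row in enumerate(reader):
                if i % rows_per_part == 0:
                    # Close the previous part and start a new one with the header.
                    if out is not None:
                        out.close()
                    part = out_dir / f"{src.stem}_part{len(parts):04d}.csv"
                    out = open(part, "w", newline="", encoding="utf-8")
                    writer = csv.writer(out)
                    writer.writerow(header)
                    parts.append(part)
                writer.writerow(row)
    finally:
        # Make sure the last part file is closed even if reading fails.
        if out is not None:
            out.close()
    return parts
```

Each part keeps the header, so it can be loaded with `COPY ... FROM '<part file>' WITH HEADER=TRUE` while keeping a record of which parts have finished.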

Below is the error I encountered; the cluster froze.

```
Exceeded maximum number of insert errors 1000
Failed to process 1005 rows; failed rows written to import_events_by_users_demo_table.err
Exceeded maximum number of insert errors 1000
Processed: 41925000 rows; Rate:   16906 rows/s; Avg. rate:    4767 rows/s
41923995 rows imported from 0 files in 0 day, 2 hours, 26 minutes, and 33.924 seconds (0 skipped).
```