Open this notebook in Google Colab: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/santoshborse/pydatanyc2024/blob/main/data-prep.ipynb)

### 1 - Setup


In [27]:
import os

if os.getenv("COLAB_RELEASE_TAG"):
   print("Running in Colab")
   RUNNING_IN_COLAB = True
else:
   print("NOT in Colab")
   RUNNING_IN_COLAB = False
!wget -O 'my_utils.py' 'https://raw.githubusercontent.com/sujee/data-prep-kit/intro-example1/examples/notebooks/intro/my_utils.py'

Running in Colab
--2024-10-31 02:10:40--  https://raw.githubusercontent.com/sujee/data-prep-kit/intro-example1/examples/notebooks/intro/my_utils.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1856 (1.8K) [text/plain]
Saving to: ‘my_utils.py’


2024-10-31 02:10:40 (29.8 MB/s) - ‘my_utils.py’ saved [1856/1856]



In [29]:
## Configuration
class MyConfig:
    pass

MY_CONFIG = MyConfig ()

MY_CONFIG.INPUT_DATA_DIR = 'input/solar-system'

MY_CONFIG.OUTPUT_FOLDER = "output"
MY_CONFIG.OUTPUT_FOLDER_FINAL = os.path.join(MY_CONFIG.OUTPUT_FOLDER , "output_final")

## Embedding model
MY_CONFIG.EMBEDDING_MODEL = 'sentence-transformers/all-MiniLM-L6-v2'

## RAY CONFIGURATION
### For local runs, we can use more parallelism
### For google colab, be conservative

if RUNNING_IN_COLAB:
  MY_CONFIG.RAY_RUNTIME_WORKERS = 2
  MY_CONFIG.RAY_NUM_CPUS =  0.3
  MY_CONFIG.RAY_MEMORY_GB = 2  # GB
else:  # local run
  num_cpus_available =  os.cpu_count()
  print(f"{num_cpus_available = }")

  MY_CONFIG.RAY_RUNTIME_WORKERS = 2
  MY_CONFIG.RAY_NUM_CPUS =  0.8
  MY_CONFIG.RAY_MEMORY_GB = 2  # GB
  # MY_CONFIG.RAY_RUNTIME_WORKERS = num_cpus_available // 3

print ('MY_CONFIG.RAY_RUNTIME_WORKERS:', MY_CONFIG.RAY_RUNTIME_WORKERS)
print ('MY_CONFIG.RAY_NUM_CPUS:', MY_CONFIG.RAY_NUM_CPUS)
print ('MY_CONFIG.RAY_MEMORY_GB:', MY_CONFIG.RAY_MEMORY_GB)

MY_CONFIG.RAY_RUNTIME_WORKERS: 2
MY_CONFIG.RAY_NUM_CPUS: 0.3
MY_CONFIG.RAY_MEMORY_GB: 2


### 2. Install required Packages

In [13]:
!pip install ipykernel datasets humanfriendly
!pip3 install 'data-prep-toolkit[ray]==0.2.2.dev1'



Collecting data-prep-toolkit-transforms==0.2.2.dev1 (from data-prep-toolkit-transforms[resize]==0.2.2.dev1)
  Downloading data_prep_toolkit_transforms-0.2.2.dev1-py3-none-any.whl.metadata (8.2 kB)
Collecting argparse (from data-prep-toolkit>=0.2.2.dev1->data-prep-toolkit-transforms==0.2.2.dev1->data-prep-toolkit-transforms[resize]==0.2.2.dev1)
  Using cached argparse-1.4.0-py2.py3-none-any.whl.metadata (2.8 kB)
Downloading data_prep_toolkit_transforms-0.2.2.dev1-py3-none-any.whl (275 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m275.2/275.2 kB[0m [31m15.2 MB/s[0m eta [36m0:00:00[0m
[?25hUsing cached argparse-1.4.0-py2.py3-none-any.whl (23 kB)
Installing collected packages: argparse, data-prep-toolkit-transforms
Successfully installed argparse-1.4.0 data-prep-toolkit-transforms-0.2.2.dev1


###

In [3]:
from huggingface_hub import hf_hub_download
!mkdir -p 'yelp-dataset'
hf_hub_download(repo_type="dataset", repo_id="yelp/yelp_review_full", filename="yelp_review_full/test-00000-of-00001.parquet", local_dir="./yelp-dataset")
hf_hub_download(repo_type="dataset", repo_id="yelp/yelp_review_full", filename="yelp_review_full/train-00000-of-00001.parquet", local_dir="./yelp-dataset")
input_folder = "yelp-dataset/yelp_review_full"


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


### 3 - Inspect input


In [18]:
from my_utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(input_folder)

print ("Output dimensions (rows x columns)= ", output_df.shape)

output_df.head(5)

Output dimensions (rows x columns)=  (700000, 2)


Unnamed: 0,label,text
0,0,Holly cow this place sucks! First of all I was...
1,0,"This place is terrible! First, I had a hard t..."
2,3,"I love three things in life: My mom, my bulld..."
3,0,This is a very popular (Why?) casual restauran...
4,2,This place was my favorite till I ordered the ...


### 4. Resize

In [None]:
!pip install 'data-prep-toolkit-transforms[resize]==0.2.2.dev1'

In [7]:
STAGE = "1"
output_folder =  "yelp-dataset/s1-resized"
from data_processing.utils import ParamsUtils


In [4]:


import os
import sys

from data_processing.runtime.pure_python import PythonTransformLauncher
from resize_transform_python import ResizePythonTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    # resize configuration
    # "resize_max_mbytes_per_table":  0.02,
    "resize_max_rows_per_table": 35000,
}
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = PythonTransformLauncher(ResizePythonTransformConfiguration())

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

20:34:44 INFO - Split file parameters are : {'max_rows_per_table': 35000, 'max_mbytes_per_table': -1, 'size_type': 'disk'}
INFO:resize_transform:Split file parameters are : {'max_rows_per_table': 35000, 'max_mbytes_per_table': -1, 'size_type': 'disk'}
20:34:44 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
20:34:44 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
20:34:44 INFO - data factory data_ is using local data access: input_folder - yelp-dataset/yelp_review_full output_folder - yelp-dataset/s1-resized
INFO:data_processing.data_access.data_access_factory_base4930187d-0650-4796-9eb6-ccd2ecf870a6:data factory data_ is using local data access: input_folder - yelp-dataset/yelp_review_full output_folder - yelp-dataset/s1-resized
20:34:44 INFO - data factory data_ max_files -1, n_sample -1
INFO:data_processing.data_access.data_access_factory_base4930187d-0650-4796-9eb6-ccd2ecf8

✅ Stage:1 completed successfully


In [5]:
STAGE = "2"
input_folder = output_folder
output_folder =  "yelp-dataset/s2-hap"

In [3]:
!pip install 'data-prep-toolkit-transforms[ray,hap]==0.2.2.dev1'

Collecting argparse (from data-prep-toolkit>=0.2.2.dev1->data-prep-toolkit-transforms==0.2.2.dev1->data-prep-toolkit-transforms[hap,ray]==0.2.2.dev1)
  Using cached argparse-1.4.0-py2.py3-none-any.whl.metadata (2.8 kB)
Using cached argparse-1.4.0-py2.py3-none-any.whl (23 kB)
Installing collected packages: argparse
Successfully installed argparse-1.4.0


In [None]:
import sys, os
from data_processing.runtime.pure_python import PythonTransformLauncher
from hap_transform_python import HAPPythonTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # hap_params
    "model_name_or_path": 'ibm-granite/granite-guardian-hap-38m',
    "annotation_column": "hap_score",
    "doc_text_column": "text",
    "inference_engine": "CPU",
    "max_length": 512,
    "batch_size": 128,
}
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = PythonTransformLauncher(HAPPythonTransformConfiguration())

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

### DocID

In [16]:
STAGE = "3"
input_folder = "yelp-dataset/s1-resized"
output_folder =  "yelp-dataset/s3-docid"

In [17]:
from data_processing.runtime.pure_python import PythonTransformLauncher
from doc_id_transform_python import DocIDPythonTransformRuntimeConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    # doc id configuration
    "doc_id_doc_column": "text",
    "doc_id_hash_column": "text_hash",
    "doc_id_int_column": "text_id",
}
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = PythonTransformLauncher(DocIDPythonTransformRuntimeConfiguration())

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

01:55:20 INFO - Doc id parameters are : {'doc_column': 'text', 'hash_column': 'text_hash', 'int_column': 'text_id', 'start_id': 0}
INFO:doc_id_transform_base:Doc id parameters are : {'doc_column': 'text', 'hash_column': 'text_hash', 'int_column': 'text_id', 'start_id': 0}
01:55:20 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
01:55:20 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
01:55:20 INFO - data factory data_ is using local data access: input_folder - yelp-dataset/s1-resized output_folder - yelp-dataset/s3-docid
INFO:data_processing.data_access.data_access_factory_baseea9a25ae-7c77-428d-b7b3-aee2631fe0f4:data factory data_ is using local data access: input_folder - yelp-dataset/s1-resized output_folder - yelp-dataset/s3-docid
01:55:20 INFO - data factory data_ max_files -1, n_sample -1
INFO:data_processing.data_access.data_access_factory_baseea9a25ae-7c77-428d-b7b3-aee

✅ Stage:3 completed successfully


In [19]:
from my_utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Output dimensions (rows x columns)= ", output_df.shape)

output_df.head(5)

Output dimensions (rows x columns)=  (700000, 4)


Unnamed: 0,label,text,text_hash,text_id
0,0,Holly cow this place sucks! First of all I was...,39e53208ab17c688f585dd7621b513df1fb343e74d44c8...,210000
1,0,"This place is terrible! First, I had a hard t...",91449714d66720fcc918d8857c1ace665e0d17c6a86cc1...,210001
2,3,"I love three things in life: My mom, my bulld...",be54478dd50ba786408e6e161890b31c41c3626e81afa1...,210002
3,0,This is a very popular (Why?) casual restauran...,3f02ab344b8388acf591c469bf0f276faba15dd38bf0a2...,210003
4,2,This place was my favorite till I ordered the ...,03a2a95e5fd517f398b2af49a4d2f3f188f79f7ccca422...,210004


### Lang ID

In [20]:
STAGE = "4"
input_folder = "yelp-dataset/s3-docid"
output_folder =  "yelp-dataset/s4-lang-detected"

In [22]:
!pip install fasttext

Collecting fasttext
  Downloading fasttext-0.9.3.tar.gz (73 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/73.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.4/73.4 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting pybind11>=2.2 (from fasttext)
  Using cached pybind11-2.13.6-py3-none-any.whl.metadata (9.5 kB)
Using cached pybind11-2.13.6-py3-none-any.whl (243 kB)
Building wheels for collected packages: fasttext
  Building wheel for fasttext (pyproject.toml) ... [?25l[?25hdone
  Created wheel for fasttext: filename=fasttext-0.9.3-cp310-cp310-linux_x86_64.whl size=4296183 sha256=310efbf553e6a32fa1e4ce4f4654dd1355ae72923eb4aebea4539d9f4790f6b4
  Stored in directory: /root/.cache/pip/wheels/0d/a2/00/81db54d3e6a8199b829d58

In [25]:
import os
import sys

from data_processing.utils import ParamsUtils
from data_processing_ray.runtime.ray import RayTransformLauncher
from lang_id_transform import (
    content_column_name_cli_param,
    model_credential_cli_param,
    model_kind_cli_param,
    model_url_cli_param,
    output_lang_column_name_cli_param,
    output_score_column_name_cli_param,
)
from lang_id_transform_ray import LangIdentificationRayTransformConfiguration
from lang_models import KIND_FASTTEXT


local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 2,
    "runtime_creation_delay": 0,
    # lang_id params
    model_credential_cli_param: "PUT YOUR OWN HUGGINGFACE CREDENTIAL",
    model_kind_cli_param: KIND_FASTTEXT,
    model_url_cli_param: "facebook/fasttext-language-identification",
    content_column_name_cli_param: "text",
    output_lang_column_name_cli_param: "ft_lang",
    output_score_column_name_cli_param: "ft_score",
}
if __name__ == "__main__":
    # Set the simulated command line args
    sys.argv = ParamsUtils.dict_to_req(d=params)
    # create launcher
    launcher = RayTransformLauncher(LangIdentificationRayTransformConfiguration())
    # Launch the ray actor(s) to process the input
    launcher.launch()

02:05:36 INFO - lang_id parameters are : {'model_credential': 'PUT YOUR OWN HUGGINGFACE CREDENTIAL', 'model_kind': 'fasttext', 'model_url': 'facebook/fasttext-language-identification', 'content_column_name': 'text', 'output_lang_column_name': 'ft_lang', 'output_score_column_name': 'ft_score'}
INFO:lang_id_transform:lang_id parameters are : {'model_credential': 'PUT YOUR OWN HUGGINGFACE CREDENTIAL', 'model_kind': 'fasttext', 'model_url': 'facebook/fasttext-language-identification', 'content_column_name': 'text', 'output_lang_column_name': 'ft_lang', 'output_score_column_name': 'ft_score'}
02:05:36 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
02:05:36 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
02:05:36 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
INFO:data_processing_ray.runtime.ray.execution_configuration:number of workers 2 worker opti

[36m(orchestrate pid=94591)[0m created [Actor(RayTransformFileProcessor, 2501229f7070e8db269e2aaf01000000), Actor(RayTransformFileProcessor, 6df10c9b88452f959479e7e801000000)], alive []


[36m(orchestrate pid=94591)[0m Traceback (most recent call last):
[36m(orchestrate pid=94591)[0m   File "/usr/local/lib/python3.10/dist-packages/data_processing_ray/runtime/ray/transform_orchestrator.py", line 89, in orchestrate
[36m(orchestrate pid=94591)[0m     processors = RayUtils.create_actors(
[36m(orchestrate pid=94591)[0m   File "/usr/local/lib/python3.10/dist-packages/data_processing_ray/runtime/ray/ray_utils.py", line 121, in create_actors
[36m(orchestrate pid=94591)[0m     raise UnrecoverableException(f"out of {len(actors)} created actors only {len(alive)} alive")
[36m(orchestrate pid=94591)[0m data_processing.utils.unrecoverable.UnrecoverableException: out of 2 created actors only 0 alive
[36m(orchestrate pid=94591)[0m 02:07:46 ERROR - Exception during execution out of 2 created actors only 0 alive: None
[36m(orchestrate pid=94591)[0m Traceback (most recent call last):
[36m(orchestrate pid=94591)[0m   File "/usr/local/lib/python3.10/dist-packages/data_proc