In [1]:
import pandas as pd

In [2]:
"""This module is used for running zero shot classification with multiple GPUs"""
import pandas as pd
import numpy as np
from transformers import pipeline
from transformers import set_seed
from tqdm.notebook import tqdm
import math


def candidate_labels_to_columns(candidate_labels: list, sort: bool = True) -> list:
    """Convert candidate labels to column names

    Args:
        candidate_labels (list): list of candidate labels

    Returns: list of column names
    """
    zero_shot_columns = [
        label.replace(" ", "_").replace("/", "_").lower() for label in candidate_labels
    ]  # replace spaces with underscores
    if sort:
        zero_shot_columns.sort()  # sort columns
    return zero_shot_columns


class model_inference:
    def __init__(
        self,
        text_docs,
        candidate_labels,
        model,
        tokeniser,
        multi_label=True,
        batch_size=4,
        seed_value=42,
        gpu_id=-1,
        text_column_name="zero_shot_text"
    ):
        self.text_docs = text_docs
        self.candidate_labels = candidate_labels
        self.multi_label = multi_label
        self.batch_size = batch_size
        self.seed_value = seed_value
        self.model = model
        self.tokenizer = tokeniser
        self.gpu_id = gpu_id
        self.text_column_name = text_column_name

    def _load_model(self):
        """Load model into memory

        Returns:
            transformers.pipelines.ZeroShotClassificationPipeline: zero shot learning model
        """
        set_seed(self.seed_value)
        classifier = pipeline(
            "zero-shot-classification",
            model=self.model,
            tokenizer=self.tokenizer,
            framework="pt",
            device=self.gpu_id,
        )
        return classifier

    def _split_data_batches(self, text_docs: list) -> list:
        """Split data into batches of a specified size

        Args:
            text_docs (list): list of text documents of type str

        Returns:
            list: list of np.array containting text documents
        """
        data_batches = np.array_split(
            text_docs,
            math.ceil(len(text_docs) / self.batch_size),
        )
        return data_batches

    def _predict_data_batches(self, data_chunks) -> list:
        """Make predictions in batches

        Args:
            data_chunks (list): list of np.array containting text documents

        Returns:
            list: list of model results
        """
        results = []
        text_desc = (
            "Classifying with CPU" if self.gpu_id == -1 else f"Classifying with GPU {self.gpu_id}"
        )
        for data in tqdm(
            data_chunks,
            total=len(data_chunks),
            desc=text_desc,
        ):
            chunk_size = len(data)
            result = self.classifier(
                list(data), self.candidate_labels, multi_label=self.multi_label
            )
            results.extend([result]) if chunk_size == 1 else results.extend(result)
        return results

    def _convert_model_results_df(self, results) -> pd.DataFrame:
        """Convert model results into a pandas dataframe

        Args:
            results (list): list of model results

        Returns:
            pd.DataFrame: dataframe of model results
        """
        # initialise dictionary with keys for each label and a text column
        # all items are initialised as empty lists so that they can be appended
        model_results = {label: [] for label in self.candidate_labels + [self.text_column_name]}

        # loop through all results and add scores and input text to dictionary
        for result in results:
            # append input text to dictionary
            model_results[self.text_column_name] += [result["sequence"]]
            # loop through all labels and add scores to dictionary
            for i, label in enumerate(result["labels"]):
                model_results[label] += [result["scores"][i]]

        # convert dictionary to pandas dataframe
        df = pd.DataFrame(model_results)
        # update column names by replacing spaces with underscores
        df.columns = candidate_labels_to_columns(df.columns, sort=False)
        return df

    def run(self):
        """Run zero shot learning pipeline

        Returns:
            pd.DataFrame: model results
        """
        # load model on GPU
        self.classifier = self._load_model()
        # split data into batches
        data_chunks = self._split_data_batches(self.text_docs)
        # label data in batches
        results = self._predict_data_batches(data_chunks)
        # convert results to dataframe
        df = self._convert_model_results_df(results)
        # sort columns
        sorted_columns = candidate_labels_to_columns(self.candidate_labels)
        sorted_columns.append(self.text_column_name)
        df = df[sorted_columns]
        return df


In [3]:
candidate_labels = [
  "data orchestration", 
  "container orchestration",
  "data transform", 
  "data ingestion",
  "scheduling",
  "ETL jobs", 
  "learning", 
  "streaming",
  "data lake",
  "data lakehouse",
  "data warehouse",
  "data mesh",
  "career",
  "CI/CD"
]

In [4]:
from transformers import pipeline
from transformers import AutoTokenizer, AutoConfig
import argparse
import yaml


def save_zero_shot_model(model_name="facebook/bart-large-mnli", model_path="./zero-shot-model/"):
    """Save pretrained zero shot model to disk

    Args:
        model_name (str, optional): Name of sequence classification model. Defaults to "facebook/bart-large-mnli".
        model_path (str, optional): Local path to save model. Defaults to "./zero-shot-model/".
    """
    # load model
    classifier = pipeline(
        "zero-shot-classification",
        model=model_name,
        config=AutoConfig.from_pretrained(model_name),
        tokenizer=AutoTokenizer.from_pretrained(model_name),
        framework="pt",
    )
    # save model
    classifier.save_pretrained(model_path)

In [5]:
save_zero_shot_model()

KeyboardInterrupt: 

In [6]:
text_column = 'post_combined_title_description'
date_column = 'post_created_utc_date'
incremental_interval = '3 day'
other_columns = 'id,post_created_utc_date'
zero_shot_model_path = './zero-shot-model/'
ext_zero_shot_table_schema = 'ext_staging'
seed_value = 42
batch_size = 4
multi_label_prediction = True

In [7]:
text_table = "blogs.reddit_dataengineering"
zero_shot_table = "blogs.reddit_zero_shot"
is_incremental = False

In [8]:
gpu_id = -1
sorted_zero_shot_columns = candidate_labels_to_columns(candidate_labels)
sorted_zero_shot_columns.append("zero_shot_text")
select_other_columns_query = ",".join([f"{text_table}.{col}" for col in other_columns.split(",")]) if other_columns else ""
incremental_query = f"""
WHERE {date_column} >= (
SELECT
    max({date_column}) - interval '{incremental_interval}'
FROM {zero_shot_table}
)
""" if is_incremental else ""

In [9]:
print(incremental_query)




In [10]:
Q = f"""
SELECT
    {select_other_columns_query},
    {text_table}.{text_column}
FROM
    {text_table}
{incremental_query};
"""

In [11]:
print(Q)


SELECT
    blogs.reddit_dataengineering.id,blogs.reddit_dataengineering.post_created_utc_date,
    blogs.reddit_dataengineering.post_combined_title_description
FROM
    blogs.reddit_dataengineering
;



In [12]:
import psycopg2
import os
from dotenv import load_dotenv

load_dotenv("./postgres/.env")

True

In [13]:
PG_USER = os.getenv('POSTGRES_USER')
PG_PW = os.getenv('POSTGRES_PASSWORD')
PG_DB = os.getenv('POSTGRES_DB')

In [14]:
postgres_conn_args = {
    "host": "localhost",
    "database": PG_DB,
    "user": PG_USER,
    "password": PG_PW,
    "port": 5432
}

In [17]:
pg_conn = psycopg2.connect(**postgres_conn_args)

In [18]:
with pg_conn:
    df = pd.read_sql(Q, pg_conn)

In [23]:
df[df.duplicated("post_combined_title_description")].iloc[0].post_combined_title_description

'The Guide to Data Versioning '

In [24]:
df[df.post_combined_title_description == 'The Guide to Data Versioning ']

Unnamed: 0,id,post_created_utc_date,post_combined_title_description
52,rk84l6,2021-12-19,The Guide to Data Versioning
252,ra64xg,2021-12-06,The Guide to Data Versioning


In [173]:
candidate_labels

['data orchestration',
 'container orchestration',
 'data transform',
 'data ingestion',
 'scheduling',
 'ETL jobs',
 'learning',
 'streaming',
 'data lake',
 'data lakehouse',
 'data warehouse',
 'data mesh',
 'career',
 'CI/CD']

In [211]:
zero_shot_model = model_inference(
    text_docs=df[text_column].values,
    candidate_labels=candidate_labels,
    model=zero_shot_model_path,
    tokeniser=zero_shot_model_path,
    multi_label=multi_label_prediction,
    batch_size=batch_size,
    seed_value=seed_value,
    gpu_id=gpu_id
)

In [None]:
df_results = zero_shot_model.run()

Classifying with CPU:   0%|          | 0/1913 [00:00<?, ?it/s]

In [177]:
df_results

Unnamed: 0,career,ci_cd,container_orchestration,data_ingestion,data_lake,data_lakehouse,data_mesh,data_orchestration,data_transform,data_warehouse,etl_jobs,learning,scheduling,streaming,zero_shot_text
0,0.098197,0.157531,0.171678,0.174216,0.160785,0.276192,0.376911,0.310253,0.148455,0.264665,0.022643,0.10046,0.055658,0.125571,Considerations for System Design to solve Scal...
1,0.962993,0.248706,0.080922,0.207498,0.235998,0.315218,0.311318,0.235031,0.320939,0.201492,0.112974,0.93502,0.247679,0.340568,Any advice for 20-30 years commitment to Data ...
2,0.878054,0.182176,0.045917,0.125504,0.162288,0.145944,0.148986,0.100657,0.191071,0.106329,0.078659,0.301565,0.086557,0.295934,Is data engineering stressful? Hi everyone!\n\...
3,0.259345,0.063874,0.015281,0.346872,0.150331,0.425382,0.053838,0.697812,0.130947,0.149373,0.000651,0.209318,0.087859,0.028639,Save NumPy Arrays to CSV Files
4,0.651201,0.149031,0.023593,0.051948,0.062665,0.142306,0.146887,0.088188,0.14468,0.031745,0.060156,0.503379,0.014783,0.147784,Is being a data engineer just a specialised so...
5,0.66614,0.015538,0.059496,0.488118,0.430132,0.363005,0.565051,0.529929,0.446419,0.420775,0.038153,0.703167,0.330835,0.488099,Need help to prepare for FAANG data engineer i...
6,0.238368,0.209683,0.071292,0.26585,0.149031,0.149039,0.043503,0.376699,0.327314,0.131192,0.121924,0.174928,0.361687,0.213443,Anyone using python ray.io framework in produc...


In [181]:
sample_text = df_results.iloc[0].zero_shot_text

In [None]:
list()

In [180]:
classifier = pipeline(
            "zero-shot-classification",
            model=zero_shot_model_path,
            tokenizer=zero_shot_model_path,
            framework="pt",
            device=gpu_id,
        )

In [183]:
df

Unnamed: 0,id,post_created_utc_date,post_combined_title_description
0,rou5l5,2021-12-26,Considerations for System Design to solve Scal...
1,roqnd3,2021-12-26,Any advice for 20-30 years commitment to Data ...
2,roq6gw,2021-12-26,Is data engineering stressful? Hi everyone!\n\...
3,roh6z4,2021-12-25,Save NumPy Arrays to CSV Files
4,rofnm0,2021-12-25,Is being a data engineer just a specialised so...
...,...,...,...
7646,9obxqc,2018-10-15,Iceberg: Improving The Utility Of Cloud-Native...
7647,9mz3eq,2018-10-10,Flink Vs Spark | Apache Flink is successor to ...
7648,9mrf65,2018-10-09,Insight Data Engineering Fellowship coming to ...
7649,9monpj,2018-10-09,"Fast, Scalable, and Flexible Data For Applicat..."


In [189]:
final_df = df.merge(
    df_results, left_on = "post_combined_title_description", right_on = "zero_shot_text"
).drop("post_combined_title_description", axis=1)

In [188]:
["a", "b"] + ["c"]

['a', 'b', 'c']

In [206]:
final_df_2 = df.iloc[0:7].join(df_results)

In [207]:
final_df_2 = final_df_2.drop("post_combined_title_description", axis=1)

In [208]:
final_df_2.iloc[0:7].shape

(7, 17)

In [210]:
final_df.equals(final_df_2)

True

In [204]:
result = final_df.compare(final_df_2)

In [205]:
result