In [2]:
cd ../

/app


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


In [3]:
from abc import ABC, abstractmethod
import dask.dataframe as dd
from src.utils.utils import get_logger  
from dask_ml.model_selection import train_test_split
import os
from typing import Optional

In [61]:
class DatasetReader(ABC):
    required_columns = {"text", "label", "split", "dataset_name"}
    split_names = {"train", "dev", "test"}
    
    def __init__(self, dataset_dir:str, dataset_name:str) -> None:
        self.logger = get_logger(self.__class__.__name__)
        self.dataset_dir=dataset_dir
        self.dataset_name=dataset_name
        
        
    # def read_data(self) -> dd.core.DataFrame:
    #     train_df, dev_df, test_df = self._read_data()
    #     df = self.assign_split_names_to_data_frames_and_merge(train_df, dev_df, test_df)
    #     df["dataset_name"] = self.dataset_name
    #     if any(required_column not in df.columns.values for required_column in self.required_columns):
    #         raise ValueError(f"Dataset must contain all required columns: {self.required_columns}")
    #     unique_split_names = set(df["split"].unique().compute().tolist())
    #     if unique_split_names != self.split_names:
    #         raise ValueError(f"Dataset must contain all required split names: {self.split_names}")
    #     final_df: dd.core.DataFrame = df[list(self.required_columns)]
    #     return final_df
    def read_data(self) -> dd.core.DataFrame:
        train_df, dev_df, test_df = self._read_data()
        df = self.assign_split_names_to_data_frames_and_merge(train_df, dev_df, test_df)
        df["dataset_name"] = self.dataset_name
        df = df.compute()
        if any(required_column not in df.columns.values for required_column in self.required_columns):
            raise ValueError(f"Dataset must contain all required columns: {self.required_columns}")
        unique_split_names = set(df["split"].unique().tolist()) 
        print(unique_split_names)
        if unique_split_names != self.split_names:
            raise ValueError(f"Dataset must contain all required split names: {self.split_names}")
        final_df: dd.core.DataFrame = df[list(self.required_columns)]
        return final_df
        
        
    @abstractmethod
    def _read_data(self) -> tuple[dd.core.DataFrame, dd.core.DataFrame, dd.core.DataFrame]:
        """
        Read and split dataset into 3 splits: train, dev, test.
        The return value must be a dd.core.DataFrame, with required columns: self.required_columns
        """
        
    def assign_split_names_to_data_frames_and_merge(self, train_df:dd.core.DataFrame, dev_df: dd.core.DataFrame, test_df:dd.core.DataFrame) -> dd.core.DataFrame:
       train_df["split"] = "train"
       dev_df["split"] = "dev"
       test_df["split"] = "test"
       return dd.concat([train_df, dev_df, test_df])
        
        
    def split_dataset(
        self, df: dd.core.DataFrame, test_size: float, stratify_column: Optional[str] = None
    ) -> tuple[dd.core.DataFrame, dd.core.DataFrame]:
        if stratify_column is None:
            return train_test_split(df, test_size=test_size, random_state=1234, shuffle=True)  # type: ignore
        unique_column_values = df[stratify_column].unique()
        first_dfs = []
        second_dfs = []
        for unique_set_value in unique_column_values:
            sub_df = df[df[stratify_column] == unique_set_value]
            sub_first_df, sub_second_df = train_test_split(sub_df, test_size=test_size, random_state=1234, shuffle=True)
            first_dfs.append(sub_first_df)
            second_dfs.append(sub_second_df)

        first_df = dd.concat(first_dfs)  # type: ignore
        second_df = dd.concat(second_dfs)  # type: ignore
        return first_df, second_df

    # def get_remote_data_url(self, dataset_path: str) -> str:
    #     dataset_url: str = get_url(path=dataset_path, repo=self.dvc_remote_repo, rev=self.version)
    #     return dataset_url
        
        
class GHCDatasetReader(DatasetReader):
    def __init__(
        self,
        dataset_dir: str,
        dataset_name: str,
        dev_split_ratio: float,
        # gcp_project_id: str,
        # gcp_github_access_token_secret_id: str,
        # dvc_remote_repo: str,
        # github_user_name: str,
        # version: str,
    ) -> None:
        super().__init__(
            dataset_dir,
            dataset_name,
            # gcp_project_id,
            # gcp_github_access_token_secret_id,
            # dvc_remote_repo,
            # github_user_name,
            # version,
        )
        self.dev_split_ratio = dev_split_ratio

    def _read_data(self) -> tuple[dd.core.DataFrame, dd.core.DataFrame, dd.core.DataFrame]:
        self.logger.info("Reading GHC dataset...")
        train_tsv_path = os.path.join(self.dataset_dir, "ghc_train.tsv")
        # train_tsv_url = self.get_remote_data_url(train_tsv_path)
        # train_df = dd.read_csv(train_tsv_url, sep="\t", header=0)
        train_df = dd.read_csv(train_tsv_path, sep="\t", header=0)

        test_tsv_path = os.path.join(self.dataset_dir, "ghc_test.tsv")
        test_df = dd.read_csv(test_tsv_path, sep="\t", header=0)
        # test_tsv_url = self.get_remote_data_url(test_tsv_path)
        # test_df = dd.read_csv(test_tsv_url, sep="\t", header=0)
        
        train_df["label"] = (train_df["hd"] + train_df["cv"] + train_df["vo"] > 0).astype(int)
        test_df["label"] = (test_df["hd"] + test_df["cv"] + test_df["vo"] > 0).astype(int)

        train_df, dev_df = self.split_dataset(train_df, self.dev_split_ratio, stratify_column="label")

        return train_df, dev_df, test_df

In [62]:
dataset_reader = GHCDatasetReader(dataset_dir= "./data/raw/ghc", dataset_name="ghc", dev_split_ratio= 0.2)


In [63]:
dataset_reader.read_data()

{'test', 'dev', 'train'}


Unnamed: 0,dataset_name,text,split,label
13501,ghc,Boris Johnson has written another storming art...,train,0
14241,ghc,@Tripper,train,0
542,ghc,"Who changed the truth of God into a lie, and w...",train,0
2001,ghc,Thanks Ken! And thanks always for your photogr...,train,0
15944,ghc,First cousin marriages do not always produce g...,train,0
...,...,...,...,...
5505,ghc,"Trump To ""Counter"" DNC Lawsuit; Seeks Servers,...",test,0
5506,ghc,i guess eu is gonna have to back track a littl...,test,0
5507,ghc,A good read here.... https://countrysquire....,test,0
5508,ghc,The only way to change things is to have compa...,test,0


In [55]:
train_df, dev_df, test_df = dataset_reader._read_data()

In [28]:
len(train_df)

17676

In [8]:
len(dev_df)

4360

In [9]:
len(test_df)

5510

In [29]:
df = dataset_reader.assign_split_names_to_data_frames_and_merge(train_df, dev_df, test_df)
df.head(5)

Unnamed: 0,text,hd,cv,vo,label,split
13501,Boris Johnson has written another storming art...,0,0,0,0,train
14241,@Tripper,0,0,0,0,train
542,"Who changed the truth of God into a lie, and w...",0,0,0,0,train
2001,Thanks Ken! And thanks always for your photogr...,0,0,0,0,train
15944,First cousin marriages do not always produce g...,0,0,0,0,train


In [9]:
train_df = dd.read_csv("./data/raw/ghc/ghc_train.tsv", sep="\t", header=0)
test_df = dd.read_csv("./data/raw/ghc/ghc_test.tsv", sep="\t", header=0)


In [10]:
print(f"{train_df.shape=}")
print(f"{test_df.shape=}")

train_df.shape=(<dask_expr.expr.Scalar: expr=ReadCSV(28dabe4).size() // 4, dtype=int64>, 4)
test_df.shape=(<dask_expr.expr.Scalar: expr=ReadCSV(79b189f).size() // 4, dtype=int64>, 4)


In [11]:
train_df.head(10)

Unnamed: 0,text,hd,cv,vo
0,He most likely converted to islam due to his n...,0,0,0
1,So Ford lied about being a psychologist. Recor...,0,0,0
2,Jobs. Education. Ending abuse of Nation. CA43.,0,0,0
3,"I share a lot of your values, & like many who ...",0,0,0
4,I am so ready to get back to blogging! www.ben...,0,0,0
5,taking a look at new opportunity called FX Pro...,0,0,0
6,Reflecting back when I was in school with Spec...,0,0,0
7,"Let's be honest everyone, last year there were...",0,0,0
8,2007 Nuke plant in Syria,0,0,0
9,NBC's Chuck Todd Thinks He's Figured It All Ou...,0,0,0


In [7]:
test_df.head(10)

Unnamed: 0,text,hd,cv,vo
0,https://www.youtube.com/watch?v=kACWpKAKtak A ...,0,0,0
1,Very nice! I tend to get tired of the constant...,0,0,0
2,Watch today. https://circumcisionmovie.com/,0,0,0
3,""" Thinking Venues "" First Color Layer blocking...",0,0,0
4,What about death penalty for perpetrators and...,0,0,0
5,You have insecurities? Fuck you,0,0,0
6,Powerful Truths About Donald Trump the Media D...,0,0,0
7,Just saw an Oreo cookie commercial. Drool.,0,0,0
8,I bet a neural net with half a dozen nodes co...,0,0,0
9,"Country by country, the 'Prague Spring' that w...",0,0,0
