In [13]:
import os
import pandas as pd

# Extract

In [14]:
from abc import ABC, abstractmethod
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Tweeter Data ETL").getOrCreate()

class ExtractAbstract(ABC):
    def __init__(self, path):
        self.path = path

    def extractCsv(self):
        pass

    def extractParquet(self):
        pass

class Extract(ExtractAbstract):

    def extractCsv(self):
        df = spark.read.option("header", 'true')\
        .option("inferSchema", 'true').\
        csv(self.path)
        return df

    def extractParquet(self):
        df = spark.read.option("header", 'true')\
        .option("inferSchema", 'true').\
        parquet(self.path)
        return df

In [15]:
class ExtractWorkflow:
    def __init__(self, file_format:str):
        self.file_format = file_format

    def workflow(self, path:str):
        """Extract data with sepcified format and returns dataframe"""
        extractor = Extract(path=path)
        if self.file_format == "csv":
            data = extractor.extractCsv()
        elif self.file_format == "parquet":
            data = extractor.extractParquet()
        return data


# Transform

In [16]:
from abc import ABC

class TransformAbstract(ABC):
    def __init__(self, df, cols:list):
        self.df = df
        self.cols = cols

    def featureExtraction(self):
        pass
   
    def dropNa(self):
        pass

    def splitByCatefory(self):
        pass
   
class Transform(TransformAbstract):
    def featureExtraction(self):
        """ cols: list (This is list of column names which are considered for data transformation)
            Extract Required Features only and returns dataframd"""
        self.df = self.df.select(self.cols)
        return self
    
    def dropNa(self):
        """Dops rows which contains None value and returns dataframe"""
        self.df = self.df.na.drop()
        return self.df
    
    def splitByCategory(self, target:str) -> dict:
        """ target: str (this is the name of column on which we are applying splitBy)
            Accepts target column name (string) and returns dictionary with splitted df by category"""
        print(self.df.show())
        print("target", target)
        unique_values = [row[target] for row in self.df.select(target).distinct().collect()]
        print(f"columns = {unique_values}")
        temp = {}
        for col in unique_values:
            temp[col] = self.df[self.df[target] == col]
        return temp

In [17]:
class TransformWorkflow:
    def __init__(self, cols:list, target:str):
        self.cols = cols
        self.target = target

    def workflow(self, df) -> dict:
        """featureExtraction > dropNa > splitByCategory
            Returns: tuple (full_df, splitted_by_category_dict)"""
        transformer = Transform(df=df, cols=self.cols)
        transformer.featureExtraction()
        full_df = transformer.dropNa()
        splitted_by_category_dict = transformer.splitByCategory(target=self.target)
        print("splitted_by_category_dict", splitted_by_category_dict)
        return {"full_df":full_df, "splitted_df":splitted_by_category_dict}
    

# Load

In [18]:
from abc import ABC
from pyspark.sql import DataFrame
import os

class LoadAbstract(ABC):
    def __init__(self, folder_path: str):
        self.folder_path = folder_path

    def loadData(self, path: str):
        pass

class Load(LoadAbstract):

    def sparkToDf(self, spark_df, cols:list):
        temp = []
        for row in list(spark_df.collect()[:]):
            temp.append(list(row[:]))
        df = pd.DataFrame(temp, columns=cols)
        return df

    def singleCsvLoader(self, df: DataFrame, file_name: str, cols:list):
        """df (spark dataframe), file_name (string)
            Returns: None
            Stores df into specified file_name (ex. train.csv)"""
        
        path = os.path.join(self.folder_path, file_name) + ".csv"
        df = self.sparkToDf(spark_df=df, cols = cols)
        print(f"Dataframe name = {file_name}")
        df.head()
        df.to_csv(path, index=False)
    
    def multiCsvLoader(self, data: dict, cols:list):
        """data (dict of spark dataframe and file_name)
            Returns: None
            Stores each df in 'data' dict into the specified file_name."""
        
        for file_name, df in data.items():       
            path = os.path.join(self.folder_path, file_name) + ".csv"
            df = self.sparkToDf(spark_df=df, cols = cols)
            print(f"Dataframe with sentiment {file_name}")
            df.head()
            sampler = SampleIndices(y=y)
            indices = sampler.getSamples(sample_rate=0.2, stratify=True)
            df.iloc[indices].to_csv(path, index=False)



In [19]:
class Pipe:
    TRAIN_PATH = "data/row_data/twitter_training.csv"
    TEST_PATH = "data/row_data/twitter_validation.csv"
    OUTPUT_DATA = "data/processed_data"

    def pipeline_runner(self, file_format:str, cols:list, target:str):
        extractor = ExtractWorkflow(file_format=file_format)
        train_df = extractor.workflow(path=self.TRAIN_PATH)
        test_df = extractor.workflow(path=self.TEST_PATH)

        transform_workflow = TransformWorkflow(cols=cols, target=target)
        transformed_dict_train = transform_workflow.workflow(df=train_df)
        transformed_dict_test = transform_workflow.workflow(df=test_df)

        loader = Load(folder_path=self.OUTPUT_DATA)

        # Stores full Dataframe
        loader.singleCsvLoader(df=transformed_dict_train['full_df'], file_name="training", cols=cols)
        loader.singleCsvLoader(df=transformed_dict_test['full_df'], file_name="testing", cols=cols)
        
        # Stores data with split by category 
        loader.multiCsvLoader(data=transformed_dict_train['splitted_df'], cols=cols)
        loader.multiCsvLoader(data=transformed_dict_test['splitted_df'], cols=cols)

In [None]:
class SampleIndices:
    def __init__(self, y):
        self.y = y

    def stratifiedIndices(self, sample_rate:float) -> dict:
        label_indices = []
        for label in self.y.unique():
            label_indices.append(self.y[self.y == label][:int(sample_rate * len(self.y[self.y == label]))].index.tolist())
        label_indices = [val for sublist in label_indices for val in sublist] 
        return label_indices

    def getSamples(self, sample_rate:float=0.7, stratify:bool=True) -> pd.DataFrame:
        """ Sample_rate: float range from 0.1 to 1.0, default=70
            statify: bool, default True """
        if stratify:
            label_indices = self.stratifiedIndices(sample_rate)
            return label_indices
        return self.y.index.tolist()[:int(sample_rate * len(self.y))]

In [24]:
import os
pipe_runner = Pipe()
pipe_runner.pipeline_runner(file_format='csv', cols=["topics", "sentiments", "tweets"], target='sentiments')

+-----------+----------+--------------------+
|     topics|sentiments|              tweets|
+-----------+----------+--------------------+
|Borderlands|  Positive|im getting on bor...|
|Borderlands|  Positive|I am coming to th...|
|Borderlands|  Positive|im getting on bor...|
|Borderlands|  Positive|im coming on bord...|
|Borderlands|  Positive|im getting on bor...|
|Borderlands|  Positive|im getting into b...|
|Borderlands|  Positive|So I spent a few ...|
|Borderlands|  Positive|So I spent a coup...|
|Borderlands|  Positive|So I spent a few ...|
|Borderlands|  Positive|So I spent a few ...|
|Borderlands|  Positive|2010 So I spent a...|
|Borderlands|  Positive|                 was|
|Borderlands|   Neutral|Rock-Hard La Varl...|
|Borderlands|   Neutral|Rock-Hard La Varl...|
|Borderlands|   Neutral|Rock-Hard La Varl...|
|Borderlands|   Neutral|Rock-Hard La Vita...|
|Borderlands|   Neutral|Live Rock - Hard ...|
|Borderlands|   Neutral|I-Hard like me, R...|
|Borderlands|  Positive|that was t