NamedTuple

In [1]:
from collections import namedtuple

# Lightweight immutable record with three path fields, addressable by
# position or by name.
DataIngestionArtifact = namedtuple(
    "DataIngestionArtifact",
    "feature_store_file_path metadata_file_path download_dir",
)

In [4]:
# Build a sample artifact; keyword arguments make the field/value pairing explicit.
DI = DataIngestionArtifact(
    feature_store_file_path="path_for_data_ingestion",
    metadata_file_path="path_for_metadata_file",
    download_dir="path_for_download_dir",
)

In [5]:
# Positional access: index 0 is the first declared field (feature_store_file_path).
DI[0]

'path_for_data_ingestion'

In [6]:
# Named access: the same tuple can be read by field name.
DI.metadata_file_path

'path_for_metadata_file'

In [7]:
# `_fields` lists the field names in declaration order.
DI._fields

('feature_store_file_path', 'metadata_file_path', 'download_dir')

### PySpark Basics

In [3]:
from dotenv import load_dotenv
# Runs before the project import below; presumably spark_manager reads its
# settings from environment variables at import time — TODO confirm.
load_dotenv()
from finance_complaint.config.spark_manager import spark_session

In [4]:
# Location of the ingested feature-store data read in the next cell.
# NOTE(review): this is a machine-specific absolute path; the notebook will not
# run elsewhere until it is pointed at a local copy of the data.
file_path = (
    "/home/captain/INeuron/Industry Ready/Finance Project/Repo/"
    "finance-complaint/finance_artifact/data_ingestion/feature_store/"
    "finance_complaint"
)

In [5]:
# Load the Parquet data at `file_path` into a Spark DataFrame.
dataset = spark_session.read.parquet(file_path)

                                                                                

In [7]:
# List the column names of the loaded frame.
dataset.columns

['company',
 'company_public_response',
 'company_response',
 'complaint_id',
 'complaint_what_happened',
 'consumer_consent_provided',
 'consumer_disputed',
 'date_received',
 'date_sent_to_company',
 'issue',
 'product',
 'state',
 'sub_issue',
 'sub_product',
 'submitted_via',
 'tags',
 'timely',
 'zip_code']

In [8]:
# A bare DataFrame displays only its schema summary (column names and types),
# not rows; every column in this data is typed as string.
dataset

DataFrame[company: string, company_public_response: string, company_response: string, complaint_id: string, complaint_what_happened: string, consumer_consent_provided: string, consumer_disputed: string, date_received: string, date_sent_to_company: string, issue: string, product: string, state: string, sub_issue: string, sub_product: string, submitted_via: string, tags: string, timely: string, zip_code: string]

In [9]:
from pyspark.sql.functions import col

In [11]:
# Complaints per state: expose `state` as `g_state`, count rows per value, then
# copy the generated `count` column under the name `freq_count`.
df = (
    dataset
    .select(col('state').alias('g_state'))
    .groupby('g_state')
    .count()
    .withColumn('freq_count', col('count'))
)

In [13]:
# Remove the generated `count` column; `freq_count` already holds the same values.
df = df.drop('count')

In [14]:
# Print the frame as a text table; only the first 20 rows are shown and long
# string values are truncated.
df.show()

+--------------------+----------+
|             g_state|freq_count|
+--------------------+----------+
|UNITED STATES MIN...|        29|
|                  SC|      8868|
|                  AZ|      7988|
|                  LA|      7880|
|                  MN|      2912|
|                  NJ|     13640|
|                  DC|      1585|
|                  OR|      1821|
|                  VA|     10562|
|                null|       582|
|                  RI|       831|
|                  KY|      1969|
|                  WY|       187|
|                  NH|       514|
|                  MI|     10549|
|                  NV|      6869|
|                  WI|      3417|
|                  ID|       520|
|                  CA|     43178|
|                  CT|      3236|
+--------------------+----------+
only showing top 20 rows



In [15]:
# Materialise every row of the aggregated frame as a Python list of Row objects.
df.collect()

[Row(g_state='UNITED STATES MINOR OUTLYING ISLANDS', freq_count=29),
 Row(g_state='SC', freq_count=8868),
 Row(g_state='AZ', freq_count=7988),
 Row(g_state='LA', freq_count=7880),
 Row(g_state='MN', freq_count=2912),
 Row(g_state='NJ', freq_count=13640),
 Row(g_state='DC', freq_count=1585),
 Row(g_state='OR', freq_count=1821),
 Row(g_state='VA', freq_count=10562),
 Row(g_state=None, freq_count=582),
 Row(g_state='RI', freq_count=831),
 Row(g_state='KY', freq_count=1969),
 Row(g_state='WY', freq_count=187),
 Row(g_state='NH', freq_count=514),
 Row(g_state='MI', freq_count=10549),
 Row(g_state='NV', freq_count=6869),
 Row(g_state='WI', freq_count=3417),
 Row(g_state='ID', freq_count=520),
 Row(g_state='CA', freq_count=43178),
 Row(g_state='CT', freq_count=3236),
 Row(g_state='NE', freq_count=607),
 Row(g_state='MT', freq_count=428),
 Row(g_state='NC', freq_count=16768),
 Row(g_state='VT', freq_count=182),
 Row(g_state='MD', freq_count=11770),
 Row(g_state='DE', freq_count=2614),
 Row(g_s

In [16]:
# Take the first Row of the collected list.
# NOTE(review): this re-collects the whole frame to read one row; `df.first()`
# would avoid that — confirm it returns the same row before switching.
df.collect()[0]

Row(g_state='UNITED STATES MINOR OUTLYING ISLANDS', freq_count=29)

In [17]:
# Row fields can be read as attributes, here the grouped state value.
df.collect()[0].g_state

'UNITED STATES MINOR OUTLYING ISLANDS'

In [18]:
# Same attribute-style access for the per-state count.
df.collect()[0].freq_count

29

In [20]:
# Round trip: rebuild a DataFrame from the collected Row objects and display it.
spark_session.createDataFrame(df.collect()).show()

[Stage 21:>                                                         (0 + 1) / 1]

+--------------------+----------+
|             g_state|freq_count|
+--------------------+----------+
|UNITED STATES MIN...|        29|
|                  SC|      8868|
|                  AZ|      7988|
|                  LA|      7880|
|                  MN|      2912|
|                  NJ|     13640|
|                  DC|      1585|
|                  OR|      1821|
|                  VA|     10562|
|                null|       582|
|                  RI|       831|
|                  KY|      1969|
|                  WY|       187|
|                  NH|       514|
|                  MI|     10549|
|                  NV|      6869|
|                  WI|      3417|
|                  ID|       520|
|                  CA|     43178|
|                  CT|      3236|
+--------------------+----------+
only showing top 20 rows



                                                                                

### PySpark Data Transformer

In [None]:
from pyspark.ml import Transformer
from pyspark.ml.pipeline import Pipeline
from pyspark.sql import DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Read data as pyspark dataframe

In [None]:


class SquareTransformer(Transformer):
    """Pipeline stage that appends the square of a numeric column.

    Configure the stage with ``setInputCol`` and ``setOutputCol`` and then call
    ``transform(dataset)`` directly or place the stage in a ``Pipeline``.

    The square is computed with native column arithmetic on the input cast to
    double, so the output column is always of double type and null inputs
    produce null outputs.
    """

    def __init__(self):
        super(SquareTransformer, self).__init__()
        # Explicit defaults let _transform report a missing setter call clearly
        # instead of failing with an AttributeError.
        self.inputCol = None
        self.outputCol = None

    def setInputCol(self, inputCol: str) -> None:
        """Set the name of the column whose values are squared."""
        self.inputCol = inputCol

    def setOutputCol(self, outputCol: str) -> None:
        """Set the name of the column that receives the squared values."""
        self.outputCol = outputCol

    def _transform(self, dataset: DataFrame) -> DataFrame:
        """Return ``dataset`` with the squared column added.

        This is the abstract hook that ``Transformer.transform`` dispatches to;
        implementing it (rather than overriding ``transform``) keeps the base
        class's handling of the optional ``params`` argument intact.

        Args:
            dataset: Frame containing the configured input column.

        Returns:
            A new DataFrame with the output column added (or replaced, if a
            column of that name already exists).

        Raises:
            ValueError: If the input or output column has not been set.
        """
        if self.inputCol is None or self.outputCol is None:
            raise ValueError(
                "SquareTransformer requires setInputCol() and setOutputCol() "
                "to be called before transform()."
            )
        # Cast once so integer and numeric-string inputs square to a double.
        value = dataset[self.inputCol].cast(DoubleType())
        return dataset.withColumn(self.outputCol, value * value)


In [None]:
# Small numeric frame to exercise the stage. The complaints `dataset` loaded
# earlier has no 'col1' column, so the stage could not resolve its input there.
numbers_df = spark_session.createDataFrame([(1.0,), (2.0,), (3.0,)], ['col1'])

# Create an instance of the SquareTransformer class
square_transformer = SquareTransformer()

# Set the input and output columns for the transformer
square_transformer.setInputCol('col1')
square_transformer.setOutputCol('squared')

# Create a PySpark pipeline and add the square_transformer to it
pipeline = Pipeline(stages=[square_transformer])

# Fit the pipeline to the demo frame and apply the transformation
transformed_dataset = pipeline.fit(numbers_df).transform(numbers_df)


True