In [38]:
import pyspark

In [39]:
from pyspark.sql import SparkSession

In [40]:
spark = SparkSession.builder.appName("Experiment").getOrCreate()

In [41]:
spark

In [42]:
data_path = "C:\\Users\\Pratiush\\Desktop\\PratLib\\PratLib\\data\\sales_data.csv"

# Reading a CSV file into a DataFrame
df = spark.read.csv(data_path, header=True, inferSchema=True)

In [43]:
df.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              NULL|              NULL|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              NULL|              NULL|    1422

In [44]:
## Module 1
from pyspark.ml.feature import OneHotEncoder as SparkOneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import when,col
import warnings

class OneHotEncoder:
    def __init__(self, **kwargs):
        self.encoder = None
        self.indexer = None
        self.indexer_output_col = None
        self.encoder_output_col = None

    def fit(self, df, input_col):
        self.indexer_output_col = f"{input_col}_index"
        self.encoder_output_col = f"{input_col}_encoded"
        
        self.indexer = StringIndexer(inputCol=input_col, outputCol=self.indexer_output_col).fit(df)
        indexed_df = self.indexer.transform(df)
        
        self.encoder = SparkOneHotEncoder(inputCol=self.indexer_output_col, outputCol=self.encoder_output_col).fit(indexed_df)
        return self

    def transform(self, df):
        if not self.encoder or not self.indexer:
            raise AttributeError("The encoder has not been fitted yet.")
        
        indexed_df = self.indexer.transform(df)
        new_df = self.encoder.transform(indexed_df)
        new_df = new_df.drop(self.indexer_output_col)
        return new_df

class LabelEncoder:
    def __init__(self, **kwargs):
        self.indexer = StringIndexer(**kwargs)

    def fit(self, df, input_col):
        self.indexer.setInputCol(input_col).setOutputCol(f"{input_col}_new")
        self.model = self.indexer.fit(df)
        return self
            
    def transform(self, df):
        return self.model.transform(df)

class map:
    def __init__(self):
        pass
    def mapper(self, df, column_name, dic):
        for key, value in dic.items():
            df = df.withColumn(
                column_name,
                when(df[column_name] == key, value).otherwise(df[column_name])
            )
        return df
        
class Cast:
    def __init__(self):
        pass

    def cast_col(self, df, feature, datatype):
        df = df.withColumn(
            feature, col(feature).cast(datatype) 
        )
        return df

In [45]:
## Module 2
from pyspark.ml.feature import StandardScaler as SparkStandardScaler, MinMaxScaler as SparkMinMaxScaler

class StandardScaler:
    def __init__(self, **kwargs):
        self.scaler = SparkStandardScaler(**kwargs)

    def fit(self, df, input_col):
        self.scaler.setInputCol(input_col).setOutputCol(f"{input_col}_scaled")
        self.model = self.scaler.fit(df)
        return self

    def transform(self, df):
        return self.model.transform(df)

class MinMaxScaler:
    def __init__(self, **kwargs):
        self.scaler = SparkMinMaxScaler(**kwargs)

    def fit(self, df, input_col):
        self.scaler.setInputCol(input_col).setOutputCol(f"{input_col}_scaled")
        self.model = self.scaler.fit(df)
        return self

    def transform(self, df):
        return self.model.transform(df)


In [46]:
# pratlib/preprocessing/imputer.py
from pyspark.ml.feature import Imputer as SparkImputer

class Imputer:
    def __init__(self, **kwargs):
        self.imputer = SparkImputer(**kwargs)

    def fit(self, df, input_cols, strategy):
        self.imputer.setInputCols([input_cols]).setOutputCols([f"{input_cols}_imputed"]).setStrategy(strategy)
        self.model = self.imputer.fit(df)
        return self

    def transform(self, df):
        return self.model.transform(df)

In [109]:
from pyspark.ml.feature import VectorAssembler

class Vectorizer:
    def __init__(self,**kwargs):
        self.model = VectorAssembler(**kwargs)

    def vectorize(self,df, input_col):
        self.model.setInputCols(input_col).setOutputCol("Independent Features")
        transformed = self.model.transform(df)
        return transformed

In [47]:
df = df.drop('User_ID')

In [48]:
ohe = OneHotEncoder()
ohe.fit(df,"Gender")
df = ohe.transform(df)

In [49]:
df.select('Age').distinct().collect()

[Row(Age='18-25'),
 Row(Age='26-35'),
 Row(Age='0-17'),
 Row(Age='46-50'),
 Row(Age='51-55'),
 Row(Age='36-45'),
 Row(Age='55+')]

In [50]:
map = map()

In [51]:
df = map.mapper(df, "Age", {'0-17':0, '18-25':1,"26-35":2, "36-45":3,"46-50":4,"51-55":5,"55+":6})

In [52]:
df.dtypes

[('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int'),
 ('Gender_encoded', 'vector')]

In [53]:
cast = Cast()

In [54]:
df = cast.cast_col(df,"Age","int")

In [55]:
df.dtypes

[('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int'),
 ('Gender_encoded', 'vector')]

In [56]:
df = map.mapper(df, "Stay_In_Current_City_Years", {'4+':4})

In [57]:
df = cast.cast_col(df, "Stay_In_Current_City_Years", "int")

In [58]:
df.dtypes

[('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'int'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int'),
 ('Gender_encoded', 'vector')]

In [59]:
ohe1 = OneHotEncoder()
ohe1.fit(df,"City_Category")
df = ohe1.transform(df)

In [60]:
df.columns

['Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase',
 'Gender_encoded',
 'City_Category_encoded']

In [61]:
imp = Imputer()

In [80]:
imp.fit(df, 'Product_Category_2', 'mode')

<__main__.Imputer at 0x1d8d102bc90>

In [81]:
df = imp.transform(df)

In [94]:
imp2 = Imputer()
imp.fit(df, 'Product_Category_3', 'mode')
df = imp.transform(df)

In [95]:
df.dtypes

[('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'int'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int'),
 ('Gender_encoded', 'vector'),
 ('City_Category_encoded', 'vector'),
 ('Product_Category_2_imputed', 'int'),
 ('Product_Category_3_imputed', 'int')]

In [96]:
df.select('Product_Category_3_imputed').show()

+--------------------------+
|Product_Category_3_imputed|
+--------------------------+
|                        16|
|                        14|
|                        16|
|                        16|
|                        16|
|                        16|
|                        17|
|                        16|
|                        16|
|                        16|
|                        16|
|                        16|
|                        16|
|                         5|
|                        14|
|                        16|
|                         4|
|                        16|
|                        16|
|                        15|
+--------------------------+
only showing top 20 rows



In [97]:
df.show()

+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+--------------+---------------------+--------------------------+--------------------------+
|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|Gender_encoded|City_Category_encoded|Product_Category_2_imputed|Product_Category_3_imputed|
+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+--------------+---------------------+--------------------------+--------------------------+
| P00069042|     F|  0|        10|            A|                         2|             0|                 3|              NULL|              NULL|    8370|     (1,[],[])|            (2,[],[])|                         8|                        16|
| P00248

In [98]:
df.columns

['Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase',
 'Gender_encoded',
 'City_Category_encoded',
 'Product_Category_2_imputed',
 'Product_Category_3_imputed']

In [110]:
vect = Vectorizer()
vectors = vect.vectorize(df,['Age','Occupation','City_Category_encoded','Stay_In_Current_City_Years','Marital_Status','Product_Category_1','Product_Category_2_imputed','Product_Category_3_imputed','Gender_encoded','City_Category_encoded'])

In [111]:
vectors.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|(12,[1,4,6,7,8],[...|
|(12,[1,4,6,7,8],[...|
|(12,[1,4,6,7,8],[...|
|(12,[1,4,6,7,8],[...|
|[6.0,16.0,0.0,1.0...|
|[2.0,15.0,0.0,0.0...|
|[4.0,7.0,1.0,0.0,...|
|[4.0,7.0,1.0,0.0,...|
|[4.0,7.0,1.0,0.0,...|
|[2.0,20.0,0.0,0.0...|
|[2.0,20.0,0.0,0.0...|
|[2.0,20.0,0.0,0.0...|
|[2.0,20.0,0.0,0.0...|
|[2.0,20.0,0.0,0.0...|
|(12,[0,1,4,6,7,8]...|
|(12,[0,1,4,6,7,8]...|
|(12,[0,1,4,6,7,8]...|
|(12,[0,1,4,6,7,8]...|
|[3.0,1.0,1.0,0.0,...|
|[2.0,12.0,0.0,1.0...|
+--------------------+
only showing top 20 rows



In [99]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['Age','Occupation','City_Category_encoded','Stay_In_Current_City_Years','Marital_Status','Product_Category_1','Product_Category_2_imputed','Product_Category_3_imputed','Gender_encoded','City_Category_encoded'],outputCol="Independent Features")
finalized_data=output.select("Independent Features","purchase")

In [100]:
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|(12,[1,4,6,7,8],[...|
|(12,[1,4,6,7,8],[...|
|(12,[1,4,6,7,8],[...|
|(12,[1,4,6,7,8],[...|
|[6.0,16.0,0.0,1.0...|
|[2.0,15.0,0.0,0.0...|
|[4.0,7.0,1.0,0.0,...|
|[4.0,7.0,1.0,0.0,...|
|[4.0,7.0,1.0,0.0,...|
|[2.0,20.0,0.0,0.0...|
|[2.0,20.0,0.0,0.0...|
|[2.0,20.0,0.0,0.0...|
|[2.0,20.0,0.0,0.0...|
|[2.0,20.0,0.0,0.0...|
|(12,[0,1,4,6,7,8]...|
|(12,[0,1,4,6,7,8]...|
|(12,[0,1,4,6,7,8]...|
|(12,[0,1,4,6,7,8]...|
|[3.0,1.0,1.0,0.0,...|
|[2.0,12.0,0.0,1.0...|
+--------------------+
only showing top 20 rows



In [101]:
finalized_data=output.select("Independent Features","purchase")

In [102]:
finalized_data.show()

+--------------------+--------+
|Independent Features|purchase|
+--------------------+--------+
|(12,[1,4,6,7,8],[...|    8370|
|(12,[1,4,6,7,8],[...|   15200|
|(12,[1,4,6,7,8],[...|    1422|
|(12,[1,4,6,7,8],[...|    1057|
|[6.0,16.0,0.0,1.0...|    7969|
|[2.0,15.0,0.0,0.0...|   15227|
|[4.0,7.0,1.0,0.0,...|   19215|
|[4.0,7.0,1.0,0.0,...|   15854|
|[4.0,7.0,1.0,0.0,...|   15686|
|[2.0,20.0,0.0,0.0...|    7871|
|[2.0,20.0,0.0,0.0...|    5254|
|[2.0,20.0,0.0,0.0...|    3957|
|[2.0,20.0,0.0,0.0...|    6073|
|[2.0,20.0,0.0,0.0...|   15665|
|(12,[0,1,4,6,7,8]...|    5378|
|(12,[0,1,4,6,7,8]...|    2079|
|(12,[0,1,4,6,7,8]...|   13055|
|(12,[0,1,4,6,7,8]...|    8851|
|[3.0,1.0,1.0,0.0,...|   11788|
|[2.0,12.0,0.0,1.0...|   19614|
+--------------------+--------+
only showing top 20 rows



In [103]:
from pyspark.ml.regression import LinearRegression
##train test split
train_data,test_data=finalized_data.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent Features', labelCol='purchase')
regressor=regressor.fit(train_data)

In [104]:
regressor.coefficients

DenseVector([124.2398, 6.0639, 84.8423, 350.5784, 10.1534, -59.8281, -412.3959, -74.3645, 113.8365, 489.1411, 84.8423, 350.5784])

In [105]:
regressor.intercept

9808.80073034847

In [106]:
pred_results=regressor.evaluate(test_data)

In [107]:
## Final comparison
pred_results.predictions.show()

+--------------------+--------+------------------+
|Independent Features|purchase|        prediction|
+--------------------+--------+------------------+
|(12,[0,1,4,6,7,8]...|    8329|  9967.46891558951|
|(12,[0,1,4,6,7,8]...|   11924|10318.137529794309|
|(12,[0,1,4,6,7,8]...|   11752| 9723.221570096885|
|(12,[0,1,4,6,7,8]...|   12741|10128.835149767056|
|(12,[0,1,4,6,7,8]...|   13187|10128.835149767056|
|(12,[0,1,4,6,7,8]...|    9896| 9905.741664880521|
|(12,[0,1,4,6,7,8]...|    8057| 9790.803779815447|
|(12,[0,1,4,6,7,8]...|    2798| 8739.440399062432|
|(12,[0,1,4,6,7,8]...|    8638|  8668.55407013916|
|(12,[0,1,4,6,7,8]...|    1885| 8594.189575176983|
|(12,[0,1,4,6,7,8]...|    2010| 8594.189575176983|
|(12,[0,1,4,6,7,8]...|    3464| 8594.189575176983|
|(12,[0,1,4,6,7,8]...|    3661| 8594.189575176983|
|(12,[0,1,4,6,7,8]...|    5278| 8594.189575176983|
|(12,[0,1,4,6,7,8]...|    5373| 8594.189575176983|
|(12,[0,1,4,6,7,8]...|    6934| 8594.189575176983|
|(12,[0,1,4,6,7,8]...|    6967|

In [108]:
### PErformance Metrics
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError

(0.12967787240737583, 3582.671384438788, 21870759.419125516)

In [35]:
# pratlib/vectorizer/vectorizer.py

from pyspark.ml.feature import VectorAssembler

class Vectorizer:
    def __init__(self):
        self.assembler = None
        self.output_col = "Independent Features"

    def fit_transform(self, df, input_cols, label_col):
        # self.output_col = "Independent Features"
        self.assembler = VectorAssembler(inputCols=input_cols, outputCol=self.output_col)
        transformed_df = self.assembler.transform(df)
        # out = df.select(self.output_col, label_col)
        return transformed_df

    # def get_features_and_label(self, df, label_col):
    #     return df.select(self.output_col, label_col)


In [31]:
vec = Vectorizer()

In [32]:
df = vec.fit_transform(df,['Age','Occupation','City_Category_encoded','Stay_In_Current_City_Years','Marital_Status','Product_Category_1','Product_Category_2_imputed','Product_Category_3_imputed','Gender_encoded','City_Category_encoded'], 'Purchase').show()

+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+--------------+---------------------+--------------------------+--------------------------+--------------------+
|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|Gender_encoded|City_Category_encoded|Product_Category_2_imputed|Product_Category_3_imputed|Independent Features|
+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+--------------+---------------------+--------------------------+--------------------------+--------------------+
| P00069042|     F|  0|        10|            A|                         2|             0|                 3|              NULL|              NULL|    8370|     (1,[],[])|            (2,[],[])|

In [74]:
# vec.get_features_and_label(df,"Purchase")

AttributeError: 'Vectorizer' object has no attribute 'get_features_and_label'

In [33]:
df.select("Independent Features")

AttributeError: 'NoneType' object has no attribute 'select'