# AMS 598 Project
Jiecheng Song

In [1]:
# Loading required module
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, countDistinct

## Set up spark context and SparkSession

In [2]:
# One SparkSession for the whole notebook; getOrCreate() returns the existing session
# when the cell is re-run instead of starting a new one.
spark = SparkSession.builder.appName("AMS 598 Project 3 Linear regression").getOrCreate()

## Load dataset

The data were cleaned in Excel beforehand: the commas in the 'fiProductClassDesc' variable were replaced.

In [3]:
# read data
# Folder holding the cleaned CSVs. Set the AMS598_DATA_DIR environment variable to run
# the notebook on another machine; the default is the original author's location.
import os
DATA_DIR = os.environ.get("AMS598_DATA_DIR", "/Users/jiechengsong/Documents/2021 fall/AMS 598/project3")
train_filename = os.path.join(DATA_DIR, "Train_project3.csv")
test_filename = os.path.join(DATA_DIR, "Test_project3.csv")
train_data = spark.read.options(header=True, inferSchema=True).csv(train_filename)
test_data = spark.read.options(header=True, inferSchema=True).csv(test_filename)

In [4]:
# Column names taken from the training CSV header (includes the target SalePrice).
train_data.columns

['SalesID',
 'SalePrice',
 'MachineID',
 'ModelID',
 'datasource',
 'auctioneerID',
 'YearMade',
 'MachineHoursCurrentMeter',
 'UsageBand',
 'saledate',
 'fiModelDesc',
 'fiBaseModel',
 'fiSecondaryDesc',
 'fiModelSeries',
 'fiModelDescriptor',
 'ProductSize',
 'fiProductClassDesc',
 'state',
 'ProductGroup',
 'ProductGroupDesc',
 'Drive_System',
 'Enclosure',
 'Forks',
 'Pad_Type',
 'Ride_Control',
 'Stick',
 'Transmission',
 'Turbocharged',
 'Blade_Extension',
 'Blade_Width',
 'Enclosure_Type',
 'Engine_Horsepower',
 'Hydraulics',
 'Pushblock',
 'Ripper',
 'Scarifier',
 'Tip_Control',
 'Tire_Size',
 'Coupler',
 'Coupler_System',
 'Grouser_Tracks',
 'Hydraulics_Flow',
 'Track_Type',
 'Undercarriage_Pad_Width',
 'Stick_Length',
 'Thumb',
 'Pattern_Changer',
 'Grouser_Type',
 'Backhoe_Mounting',
 'Blade_Type',
 'Travel_Controls',
 'Differential_Type',
 'Steering_Controls']

In [33]:
# Number of columns in the training set.
len(train_data.columns)

53

In [34]:
# Number of rows in the training set.
train_data.count()

300000

In [5]:
# Test columns: the same as the training columns except that there is no SalePrice target.
test_data.columns

['SalesID',
 'MachineID',
 'ModelID',
 'datasource',
 'auctioneerID',
 'YearMade',
 'MachineHoursCurrentMeter',
 'UsageBand',
 'saledate',
 'fiModelDesc',
 'fiBaseModel',
 'fiSecondaryDesc',
 'fiModelSeries',
 'fiModelDescriptor',
 'ProductSize',
 'fiProductClassDesc',
 'state',
 'ProductGroup',
 'ProductGroupDesc',
 'Drive_System',
 'Enclosure',
 'Forks',
 'Pad_Type',
 'Ride_Control',
 'Stick',
 'Transmission',
 'Turbocharged',
 'Blade_Extension',
 'Blade_Width',
 'Enclosure_Type',
 'Engine_Horsepower',
 'Hydraulics',
 'Pushblock',
 'Ripper',
 'Scarifier',
 'Tip_Control',
 'Tire_Size',
 'Coupler',
 'Coupler_System',
 'Grouser_Tracks',
 'Hydraulics_Flow',
 'Track_Type',
 'Undercarriage_Pad_Width',
 'Stick_Length',
 'Thumb',
 'Pattern_Changer',
 'Grouser_Type',
 'Backhoe_Mounting',
 'Blade_Type',
 'Travel_Controls',
 'Differential_Type',
 'Steering_Controls']

In [35]:
# Number of columns in the test set (one fewer than train: no SalePrice).
len(test_data.columns)

52

In [36]:
# Number of rows in the test set.
test_data.count()

101125

## Combine train and test regressors for preprocessing (so dummy variables are generated from the same set of levels)

In [6]:
# Stack train and test regressors so the categorical encoders are fitted on all levels.
train_features = train_data.drop('SalePrice')
# unionByName matches columns by name rather than position, so a reordered CSV cannot
# silently misalign columns; the assert makes sure both frames share the same columns.
assert set(train_features.columns) == set(test_data.columns), "train/test regressors differ"
df = train_features.unionByName(test_data)

## Glimpse of the data

In [7]:
# Preview the first five rows (long values truncated), then the inferred schema.
df.show(n=5, truncate=True)
df.printSchema()

+-------+---------+-------+----------+------------+--------+------------------------+---------+---------------+-----------+-----------+---------------+-------------+-----------------+-----------+--------------------+--------------+------------+------------------+------------+----------+-------------------+--------+-------------------+-----+------------+------------+---------------+-----------+--------------+-----------------+----------+---------+------+---------+-----------+-------------------+-------------------+-------------------+-------------------+---------------+----------+-----------------------+------------+-----+---------------+------------+----------------+----------+---------------+-----------------+-----------------+
|SalesID|MachineID|ModelID|datasource|auctioneerID|YearMade|MachineHoursCurrentMeter|UsageBand|       saledate|fiModelDesc|fiBaseModel|fiSecondaryDesc|fiModelSeries|fiModelDescriptor|ProductSize|  fiProductClassDesc|         state|ProductGroup|  ProductGroupDes

In [8]:
# number of samples (training and test rows combined)
df.count()

401125

In [9]:
# Number of distinct values in every column, shown as a one-row table.
distinct_counts = [countDistinct(c).alias(c) for c in df.columns]
df.select(distinct_counts).show()

+-------+---------+-------+----------+------------+--------+------------------------+---------+--------+-----------+-----------+---------------+-------------+-----------------+-----------+------------------+-----+------------+----------------+------------+---------+-----+--------+------------+-----+------------+------------+---------------+-----------+--------------+-----------------+----------+---------+------+---------+-----------+---------+-------+--------------+--------------+---------------+----------+-----------------------+------------+-----+---------------+------------+----------------+----------+---------------+-----------------+-----------------+
|SalesID|MachineID|ModelID|datasource|auctioneerID|YearMade|MachineHoursCurrentMeter|UsageBand|saledate|fiModelDesc|fiBaseModel|fiSecondaryDesc|fiModelSeries|fiModelDescriptor|ProductSize|fiProductClassDesc|state|ProductGroup|ProductGroupDesc|Drive_System|Enclosure|Forks|Pad_Type|Ride_Control|Stick|Transmission|Turbocharged|Blade_Ext

In [10]:
# Non-null value count per column; count() on a column skips nulls.
df.select([count(col(c)).alias(c) for c in df.columns]).show()

+-------+---------+-------+----------+------------+--------+------------------------+---------+--------+-----------+-----------+---------------+-------------+-----------------+-----------+------------------+------+------------+----------------+------------+---------+------+--------+------------+-----+------------+------------+---------------+-----------+--------------+-----------------+----------+---------+------+---------+-----------+---------+-------+--------------+--------------+---------------+----------+-----------------------+------------+-----+---------------+------------+----------------+----------+---------------+-----------------+-----------------+
|SalesID|MachineID|ModelID|datasource|auctioneerID|YearMade|MachineHoursCurrentMeter|UsageBand|saledate|fiModelDesc|fiBaseModel|fiSecondaryDesc|fiModelSeries|fiModelDescriptor|ProductSize|fiProductClassDesc| state|ProductGroup|ProductGroupDesc|Drive_System|Enclosure| Forks|Pad_Type|Ride_Control|Stick|Transmission|Turbocharged|Blade

There are 341027 distinct MachineID values among 401125 samples, so MachineID should not be used as a feature. MachineHoursCurrentMeter is numeric, and the other variables can be treated as categorical. The sale date can be separated into year and month, which are then treated as categorical (this may work better). Track_Type can be removed, since it is all null.

## Split saledate into month and year

In [11]:
from pyspark.sql.functions import substring, length
from pyspark.sql.functions import udf

# NOTE(review): assumes saledate was read as a string shaped like 'M/D/YYYY h:mm'
# (inferred from the slicing this replaces) -- confirm against the CSV.
def _sale_month(saledate):
    '''Return the month token of a sale date string (text before the first '/'); None stays None.'''
    if saledate is None:
        return None
    return saledate.split('/')[0]


def _sale_year(saledate):
    '''Return the year token of a sale date string; None stays None.

    Splitting (rather than fixed negative slicing) keeps the result free of the trailing
    space and still correct when the time part has a two-digit hour.
    '''
    if saledate is None:
        return None
    # Date part before any time component, then its last '/'-separated field.
    return saledate.split(' ')[0].split('/')[-1]


# udf() without a returnType yields string columns; month/year are used as categorical levels.
udf1 = udf(_sale_month)
udf2 = udf(_sale_year)
df = df.withColumn("month", udf1("saledate"))
df = df.withColumn("year", udf2("saledate"))

## Remove unused variables and apply simple imputation

In [12]:
# saledate is now represented by month/year; MachineID is nearly unique per sale, so it is
# not used as a feature. Use the exact column name: drop() is a no-op for names it cannot
# resolve, and 'machineID' only matched because Spark resolves names case-insensitively by default.
# NOTE(review): the markdown also says Track_Type should be removed, but it is not dropped here -- confirm intent.
df = df.drop('saledate', 'MachineID')

In [13]:
# Missing meter readings are imputed as 0. The column may hold the literal string 'NA'
# and/or real nulls. replace()/fillna() only affect columns whose type matches the value,
# so fill with both '0' (string column) and 0 (numeric column) to cover either inferred schema.
hours_col = 'MachineHoursCurrentMeter'
df = (df.replace('NA', '0', subset=[hours_col])
        .fillna('0', subset=[hours_col])
        .fillna(0, subset=[hours_col]))

In [14]:
from pyspark.sql.types import DoubleType
# The meter reading is the only continuous regressor: store it as double.
hours = "MachineHoursCurrentMeter"
df = df.withColumn(hours, df[hours].cast(DoubleType()))

In [15]:
from pyspark.sql.types import StringType
# Every column except the row key (SalesID) and the numeric meter reading is categorical.
# Select by name instead of position so a column reorder cannot silently change the set.
categorical_cols = [c for c in df.columns if c not in ('SalesID', 'MachineHoursCurrentMeter')]
# Cast to string BEFORE filling: fillna('NA') only touches string-typed columns, so
# filling first would leave nulls in any column Spark inferred as numeric, and
# StringIndexer rejects nulls by default.
df = df.select([df[c].cast(StringType()).alias(c) if c in categorical_cols else df[c]
                for c in df.columns])
df = df.fillna('NA', subset=categorical_cols)

In [16]:
# Non-null value count per column after imputation (expected to equal the row count everywhere).
df.select([count(col(c)).alias(c) for c in df.columns]).show()

+-------+-------+----------+------------+--------+------------------------+---------+-----------+-----------+---------------+-------------+-----------------+-----------+------------------+------+------------+----------------+------------+---------+------+--------+------------+------+------------+------------+---------------+-----------+--------------+-----------------+----------+---------+------+---------+-----------+---------+-------+--------------+--------------+---------------+----------+-----------------------+------------+------+---------------+------------+----------------+----------+---------------+-----------------+-----------------+------+------+
|SalesID|ModelID|datasource|auctioneerID|YearMade|MachineHoursCurrentMeter|UsageBand|fiModelDesc|fiBaseModel|fiSecondaryDesc|fiModelSeries|fiModelDescriptor|ProductSize|fiProductClassDesc| state|ProductGroup|ProductGroupDesc|Drive_System|Enclosure| Forks|Pad_Type|Ride_Control| Stick|Transmission|Turbocharged|Blade_Extension|Blade_Widt

## Encode the data as sparse feature vectors (dummy variables plus the continuous column)

In [17]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

In [18]:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol,dropLast=False):
    '''
    Get dummy variables and concat with continuous variables for ml modeling.

    Each categorical column is mapped to integer indices with StringIndexer and then
    one-hot encoded with OneHotEncoder. The encoded vectors and the continuous columns
    are assembled into a single 'features' vector column. The pipeline is fitted on,
    and applied to, the same dataframe.

    :param df: the dataframe
    :param indexCol: name of an identifier column to carry into the output, or a falsy
                     value (e.g. None) to omit it
    :param categoricalCols: the name list of the categorical data (None means none)
    :param continuousCols:  the name list of the numerical data (None means none)
    :param labelCol:  the name of label column, copied into a column named 'label';
                      a falsy value (e.g. None) for unsupervised learning
    :param dropLast:  the flag of drop last column, forwarded to every OneHotEncoder
    :return: dataframe with columns [indexCol,] 'features'[, 'label'] -- the bracketed
             columns appear only when the corresponding argument is given
    :reference: https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf
    :author: Wenqiang Feng
    :revised by: Jiecheng Song
    '''
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    # Accept None or any iterable for the column lists.
    categoricalCols = list(categoricalCols or [])
    continuousCols = list(continuousCols or [])

    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in categoricalCols]
    # Spark's own OneHotEncoder default is dropLast=True; here the caller chooses
    # (this function's default, False, keeps every level).
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()),
                              dropLast=dropLast)
                for indexer in indexers]

    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + continuousCols,
                                outputCol="features")

    pipeline = Pipeline(stages=indexers + encoders + [assembler])

    model = pipeline.fit(df)
    data = model.transform(df)

    # Build the output column list instead of enumerating every (indexCol, labelCol) combination.
    output_cols = [indexCol] if indexCol else []
    output_cols.append('features')
    if labelCol:
        # supervised learning: expose the target under the conventional name 'label'
        data = data.withColumn('label', col(labelCol))
        output_cols.append('label')
    return data.select(*output_cols)

In [19]:
# One-hot encode everything except the row key and the continuous meter reading, which
# is appended as-is. Columns are chosen by name, not by position, so a reordered frame
# cannot silently change the feature set.
categorical_cols = [c for c in df.columns if c not in ('SalesID', 'MachineHoursCurrentMeter')]
df_dummy = get_dummy(df, 'SalesID', categorical_cols, ['MachineHoursCurrentMeter'], None, dropLast=True)

In [20]:
# Preview: each SalesID with its sparse encoded feature vector.
df_dummy.show()

+-------+--------------------+
|SalesID|            features|
+-------+--------------------+
|1139246|(13064,[1142,5220...|
|1139248|(13064,[40,5220,5...|
|1139249|(13064,[81,5220,5...|
|1139251|(13064,[1411,5220...|
|1139253|(13064,[53,5220,5...|
|1139255|(13064,[0,5220,52...|
|1139256|(13064,[214,5220,...|
|1139261|(13064,[47,5220,5...|
|1139272|(13064,[1872,5220...|
|1139275|(13064,[37,5220,5...|
|1139278|(13064,[0,5220,52...|
|1139282|(13064,[1243,5220...|
|1139283|(13064,[68,5220,5...|
|1139284|(13064,[14,5220,5...|
|1139290|(13064,[60,5220,5...|
|1139291|(13064,[3,5220,52...|
|1139292|(13064,[572,5220,...|
|1139299|(13064,[1473,5220...|
|1139301|(13064,[170,5220,...|
|1139304|(13064,[72,5220,5...|
+-------+--------------------+
only showing top 20 rows



In [21]:
# Attach the target (SalePrice, renamed to 'label') to the encoded training rows via SalesID.
train_labels = train_data.select('SalesID', col('SalePrice').alias('label'))
train_dummy = train_labels.join(df_dummy, on='SalesID', how='inner')

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Single-stage pipeline; the fitted PipelineModel is reused later to transform the test set.
# NOTE(review): the summary below reports 100 total iterations, which equals Spark's default
# maxIter, so the optimizer may have stopped before converging -- consider raising maxIter.
lr = LinearRegression(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(train_dummy)

In [23]:
def modelsummary(model):
    '''
    Print the training fit statistics of a fitted linear regression model.

    :param model: fitted model whose ``summary`` attribute provides meanSquaredError,
                  rootMeanSquaredError, r2, r2adj and totalIterations
                  (e.g. a pyspark LinearRegressionModel)
    :return: None; the statistics are printed as three '##' lines
    '''
    summary = model.summary
    print("##", "Mean squared error: % .6f" % summary.meanSquaredError,
          ", RMSE: % .6f" % summary.rootMeanSquaredError)
    print("##", "Multiple R-squared: %f" % summary.r2,
          ",Total iterations: %i" % summary.totalIterations)
    print("##", "Adjusted R-squared: %f" % summary.r2adj)

In [24]:
# The fitted LinearRegressionModel is the last (and only) stage of the pipeline model.
lr_model = model.stages[-1]
modelsummary(lr_model)

## Mean squared error:  86634417.443971 , RMSE:  9307.761140
## Multiple R-squared: 0.828548 ,Total iterations: 100
## Adjusted R-squared: 0.820742


## Prediction

In [25]:
# Encoded feature rows for the test SalesIDs (no label column).
test_dummy = test_data.select('SalesID').join(df_dummy, on='SalesID', how='inner')

In [26]:
# Apply the fitted pipeline (linear regression) to the encoded test rows; adds a 'prediction' column.
predictions = model.transform(test_dummy)

In [27]:
# Preview the first predicted sale prices.
predictions.show()

+-------+--------------------+------------------+
|SalesID|            features|        prediction|
+-------+--------------------+------------------+
|2241104|(13064,[289,5218,...|30118.352230247372|
|2241111|(13064,[289,5218,...| 22312.43482435544|
|2241112|(13064,[289,5218,...|31764.693843827685|
|2241114|(13064,[289,5218,...|15897.740068360246|
|2241118|(13064,[289,5218,...| 25277.99325016539|
|2241120|(13064,[289,5218,...|31920.622300369956|
|2241121|(13064,[289,5218,...| 22147.81689702234|
|2241124|(13064,[289,5218,...| 23804.44169599517|
|2241128|(13064,[289,5218,...|29654.152828422317|
|2241129|(13064,[289,5218,...| 20388.41163263911|
|2241135|(13064,[289,5218,...| 36311.01271867051|
|2241136|(13064,[289,5218,...|30869.137141512845|
|2241143|(13064,[713,5218,...| 86899.05905747288|
|2241144|(13064,[713,5218,...| 88842.34353923445|
|2241145|(13064,[713,5218,...|  79661.2283191076|
|2241146|(13064,[713,5218,...| 87305.39750075944|
|2241149|(13064,[713,5218,...| 85152.19538035501|


In [30]:
# Spark writes a directory named prediction.csv holding one part file (coalesce(1)) with a
# header row. mode('overwrite') keeps the cell re-runnable; the default save mode raises
# an error when the output path already exists.
(predictions
    .select(['SalesID', 'prediction'])
    .coalesce(1)
    .write
    .mode('overwrite')
    .option("header", "true")
    .csv('prediction.csv'))