In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
sc = pyspark.SparkContext()

In [3]:
# this looks ugly to me, but everyone does it...
from pyspark.sql.types import *

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .appName('Test face') \
                    .getOrCreate()

In [5]:
# load file as a text file because the spark.read.text option is rubbish
housing = sc.textFile('C:\\Users\\mroberts\\Documents\\AmesHousing.txt')

In [6]:
housing.take(5)

['Order\tPID\tMS SubClass\tMS Zoning\tLot Frontage\tLot Area\tStreet\tAlley\tLot Shape\tLand Contour\tUtilities\tLot Config\tLand Slope\tNeighborhood\tCondition 1\tCondition 2\tBldg Type\tHouse Style\tOverall Qual\tOverall Cond\tYear Built\tYear Remod/Add\tRoof Style\tRoof Matl\tExterior 1st\tExterior 2nd\tMas Vnr Type\tMas Vnr Area\tExter Qual\tExter Cond\tFoundation\tBsmt Qual\tBsmt Cond\tBsmt Exposure\tBsmtFin Type 1\tBsmtFin SF 1\tBsmtFin Type 2\tBsmtFin SF 2\tBsmt Unf SF\tTotal Bsmt SF\tHeating\tHeating QC\tCentral Air\tElectrical\t1st Flr SF\t2nd Flr SF\tLow Qual Fin SF\tGr Liv Area\tBsmt Full Bath\tBsmt Half Bath\tFull Bath\tHalf Bath\tBedroom AbvGr\tKitchen AbvGr\tKitchen Qual\tTotRms AbvGrd\tFunctional\tFireplaces\tFireplace Qu\tGarage Type\tGarage Yr Blt\tGarage Finish\tGarage Cars\tGarage Area\tGarage Qual\tGarage Cond\tPaved Drive\tWood Deck SF\tOpen Porch SF\tEnclosed Porch\t3Ssn Porch\tScreen Porch\tPool Area\tPool QC\tFence\tMisc Feature\tMisc Val\tMo Sold\tYr Sold\tSale

In [7]:
# we're going to get the schema details from the header
header = housing.first()

In [8]:
header.split('\t')

['Order',
 'PID',
 'MS SubClass',
 'MS Zoning',
 'Lot Frontage',
 'Lot Area',
 'Street',
 'Alley',
 'Lot Shape',
 'Land Contour',
 'Utilities',
 'Lot Config',
 'Land Slope',
 'Neighborhood',
 'Condition 1',
 'Condition 2',
 'Bldg Type',
 'House Style',
 'Overall Qual',
 'Overall Cond',
 'Year Built',
 'Year Remod/Add',
 'Roof Style',
 'Roof Matl',
 'Exterior 1st',
 'Exterior 2nd',
 'Mas Vnr Type',
 'Mas Vnr Area',
 'Exter Qual',
 'Exter Cond',
 'Foundation',
 'Bsmt Qual',
 'Bsmt Cond',
 'Bsmt Exposure',
 'BsmtFin Type 1',
 'BsmtFin SF 1',
 'BsmtFin Type 2',
 'BsmtFin SF 2',
 'Bsmt Unf SF',
 'Total Bsmt SF',
 'Heating',
 'Heating QC',
 'Central Air',
 'Electrical',
 '1st Flr SF',
 '2nd Flr SF',
 'Low Qual Fin SF',
 'Gr Liv Area',
 'Bsmt Full Bath',
 'Bsmt Half Bath',
 'Full Bath',
 'Half Bath',
 'Bedroom AbvGr',
 'Kitchen AbvGr',
 'Kitchen Qual',
 'TotRms AbvGrd',
 'Functional',
 'Fireplaces',
 'Fireplace Qu',
 'Garage Type',
 'Garage Yr Blt',
 'Garage Finish',
 'Garage Cars',
 'Garage 

In [9]:
# get a field list to pass to the StructType class
# NOTE: it would also be a good idea to remove spaces from field names here
fields = [StructField(field_name.replace(' ','_').replace('/','_'), StringType(), True) for field_name in header.split('\t')]

In [10]:
for i, f in enumerate(fields):
    print(i, ' ', f)

0   StructField(Order,StringType,true)
1   StructField(PID,StringType,true)
2   StructField(MS_SubClass,StringType,true)
3   StructField(MS_Zoning,StringType,true)
4   StructField(Lot_Frontage,StringType,true)
5   StructField(Lot_Area,StringType,true)
6   StructField(Street,StringType,true)
7   StructField(Alley,StringType,true)
8   StructField(Lot_Shape,StringType,true)
9   StructField(Land_Contour,StringType,true)
10   StructField(Utilities,StringType,true)
11   StructField(Lot_Config,StringType,true)
12   StructField(Land_Slope,StringType,true)
13   StructField(Neighborhood,StringType,true)
14   StructField(Condition_1,StringType,true)
15   StructField(Condition_2,StringType,true)
16   StructField(Bldg_Type,StringType,true)
17   StructField(House_Style,StringType,true)
18   StructField(Overall_Qual,StringType,true)
19   StructField(Overall_Cond,StringType,true)
20   StructField(Year_Built,StringType,true)
21   StructField(Year_Remod_Add,StringType,true)
22   StructField(Roof_Style,S

In [11]:
# here's our schema object
schema = StructType(fields)

In [12]:
# change datatype manually
schema[81].dataType = FloatType()

In [13]:
schema[81]

StructField(SalePrice,FloatType,true)

In [14]:
# we need to remove the header from the rdd
housing_header_rdd = housing.filter(lambda l: 'PID' in l)

In [15]:
housing_header_rdd.collect()

['Order\tPID\tMS SubClass\tMS Zoning\tLot Frontage\tLot Area\tStreet\tAlley\tLot Shape\tLand Contour\tUtilities\tLot Config\tLand Slope\tNeighborhood\tCondition 1\tCondition 2\tBldg Type\tHouse Style\tOverall Qual\tOverall Cond\tYear Built\tYear Remod/Add\tRoof Style\tRoof Matl\tExterior 1st\tExterior 2nd\tMas Vnr Type\tMas Vnr Area\tExter Qual\tExter Cond\tFoundation\tBsmt Qual\tBsmt Cond\tBsmt Exposure\tBsmtFin Type 1\tBsmtFin SF 1\tBsmtFin Type 2\tBsmtFin SF 2\tBsmt Unf SF\tTotal Bsmt SF\tHeating\tHeating QC\tCentral Air\tElectrical\t1st Flr SF\t2nd Flr SF\tLow Qual Fin SF\tGr Liv Area\tBsmt Full Bath\tBsmt Half Bath\tFull Bath\tHalf Bath\tBedroom AbvGr\tKitchen AbvGr\tKitchen Qual\tTotRms AbvGrd\tFunctional\tFireplaces\tFireplace Qu\tGarage Type\tGarage Yr Blt\tGarage Finish\tGarage Cars\tGarage Area\tGarage Qual\tGarage Cond\tPaved Drive\tWood Deck SF\tOpen Porch SF\tEnclosed Porch\t3Ssn Porch\tScreen Porch\tPool Area\tPool QC\tFence\tMisc Feature\tMisc Val\tMo Sold\tYr Sold\tSale

In [16]:
housing.count()

2931

In [17]:
# doing the removal
housingNoHead = housing.subtract(housing_header_rdd)
housingNoHead.count()

2930

In [18]:
housingNoHead.take(1)

['12\t0527165230\t020\tRL\t\t7980\tPave\tNA\tIR1\tLvl\tAllPub\tInside\tGtl\tGilbert\tNorm\tNorm\t1Fam\t1Story\t6\t7\t1992\t2007\tGable\tCompShg\tHdBoard\tHdBoard\tNone\t0\tTA\tGd\tPConc\tGd\tTA\tNo\tALQ\t935\tUnf\t0\t233\t1168\tGasA\tEx\tY\tSBrkr\t1187\t0\t0\t1187\t1\t0\t2\t0\t3\t1\tTA\t6\tTyp\t0\tNA\tAttchd\t1992\tFin\t2\t420\tTA\tTA\tY\t483\t21\t0\t0\t0\t0\tNA\tGdPrv\tShed\t500\t3\t2010\tWD \tNormal\t185000']

In [19]:
# finally we create a nice dataframe using the schema
# bearing in mind that we have to convert the column to float values
housingNoHead2 = housingNoHead.map(lambda l: l.split('\t'))
housingNoHead3 = housingNoHead2.map(lambda l: l[:len(l) - 1] + [float(l[-1])] )
df3 = spark.createDataFrame(housingNoHead3, schema)
#housingNoHead2.take(1)

In [20]:
# having a peak
df3.take(5)

[Row(Order='12', PID='0527165230', MS_SubClass='020', MS_Zoning='RL', Lot_Frontage='', Lot_Area='7980', Street='Pave', Alley='NA', Lot_Shape='IR1', Land_Contour='Lvl', Utilities='AllPub', Lot_Config='Inside', Land_Slope='Gtl', Neighborhood='Gilbert', Condition_1='Norm', Condition_2='Norm', Bldg_Type='1Fam', House_Style='1Story', Overall_Qual='6', Overall_Cond='7', Year_Built='1992', Year_Remod_Add='2007', Roof_Style='Gable', Roof_Matl='CompShg', Exterior_1st='HdBoard', Exterior_2nd='HdBoard', Mas_Vnr_Type='None', Mas_Vnr_Area='0', Exter_Qual='TA', Exter_Cond='Gd', Foundation='PConc', Bsmt_Qual='Gd', Bsmt_Cond='TA', Bsmt_Exposure='No', BsmtFin_Type_1='ALQ', BsmtFin_SF_1='935', BsmtFin_Type_2='Unf', BsmtFin_SF_2='0', Bsmt_Unf_SF='233', Total_Bsmt_SF='1168', Heating='GasA', Heating_QC='Ex', Central_Air='Y', Electrical='SBrkr', 1st_Flr_SF='1187', 2nd_Flr_SF='0', Low_Qual_Fin_SF='0', Gr_Liv_Area='1187', Bsmt_Full_Bath='1', Bsmt_Half_Bath='0', Full_Bath='2', Half_Bath='0', Bedroom_AbvGr='3',

In [21]:
# trying a filter
df3.filter(df3.Order == '1').show()

+-----+----------+-----------+---------+------------+--------+------+-----+---------+------------+---------+----------+----------+------------+-----------+-----------+---------+-----------+------------+------------+----------+--------------+----------+---------+------------+------------+------------+------------+----------+----------+----------+---------+---------+-------------+--------------+------------+--------------+------------+-----------+-------------+-------+----------+-----------+----------+----------+----------+---------------+-----------+--------------+--------------+---------+---------+-------------+-------------+------------+-------------+----------+----------+------------+-----------+-------------+-------------+-----------+-----------+-----------+-----------+-----------+------------+-------------+--------------+----------+------------+---------+-------+-----+------------+--------+-------+-------+---------+--------------+---------+
|Order|       PID|MS_SubClass|MS_Zoning|L

In [22]:
# define as a table for sql querying
df3.createTempView('housing')

In [23]:
sample = spark.sql("select * from housing where order = '1'")
sample.show()

+-----+----------+-----------+---------+------------+--------+------+-----+---------+------------+---------+----------+----------+------------+-----------+-----------+---------+-----------+------------+------------+----------+--------------+----------+---------+------------+------------+------------+------------+----------+----------+----------+---------+---------+-------------+--------------+------------+--------------+------------+-----------+-------------+-------+----------+-----------+----------+----------+----------+---------------+-----------+--------------+--------------+---------+---------+-------------+-------------+------------+-------------+----------+----------+------------+-----------+-------------+-------------+-----------+-----------+-----------+-----------+-----------+------------+-------------+--------------+----------+------------+---------+-------+-----+------------+--------+-------+-------+---------+--------------+---------+
|Order|       PID|MS_SubClass|MS_Zoning|L

In [24]:
# quick method to include all but a few columns in select and group by clauses
c = df3.columns
c = [col.replace(' ','_').replace('/','_') for col in c if col not in ['SalePrice','PID','Order']]
c = ', '.join(c)
strSQL = "select {}, sum(SalePrice) as SalePrice from housing group by {} limit 1".format(c, c)
strSQL

'select MS_SubClass, MS_Zoning, Lot_Frontage, Lot_Area, Street, Alley, Lot_Shape, Land_Contour, Utilities, Lot_Config, Land_Slope, Neighborhood, Condition_1, Condition_2, Bldg_Type, House_Style, Overall_Qual, Overall_Cond, Year_Built, Year_Remod_Add, Roof_Style, Roof_Matl, Exterior_1st, Exterior_2nd, Mas_Vnr_Type, Mas_Vnr_Area, Exter_Qual, Exter_Cond, Foundation, Bsmt_Qual, Bsmt_Cond, Bsmt_Exposure, BsmtFin_Type_1, BsmtFin_SF_1, BsmtFin_Type_2, BsmtFin_SF_2, Bsmt_Unf_SF, Total_Bsmt_SF, Heating, Heating_QC, Central_Air, Electrical, 1st_Flr_SF, 2nd_Flr_SF, Low_Qual_Fin_SF, Gr_Liv_Area, Bsmt_Full_Bath, Bsmt_Half_Bath, Full_Bath, Half_Bath, Bedroom_AbvGr, Kitchen_AbvGr, Kitchen_Qual, TotRms_AbvGrd, Functional, Fireplaces, Fireplace_Qu, Garage_Type, Garage_Yr_Blt, Garage_Finish, Garage_Cars, Garage_Area, Garage_Qual, Garage_Cond, Paved_Drive, Wood_Deck_SF, Open_Porch_SF, Enclosed_Porch, 3Ssn_Porch, Screen_Porch, Pool_Area, Pool_QC, Fence, Misc_Feature, Misc_Val, Mo_Sold, Yr_Sold, Sale_Type,

In [25]:
spark.sql(strSQL).show()

+-----------+---------+------------+--------+------+-----+---------+------------+---------+----------+----------+------------+-----------+-----------+---------+-----------+------------+------------+----------+--------------+----------+---------+------------+------------+------------+------------+----------+----------+----------+---------+---------+-------------+--------------+------------+--------------+------------+-----------+-------------+-------+----------+-----------+----------+----------+----------+---------------+-----------+--------------+--------------+---------+---------+-------------+-------------+------------+-------------+----------+----------+------------+-----------+-------------+-------------+-----------+-----------+-----------+-----------+-----------+------------+-------------+--------------+----------+------------+---------+-------+-----+------------+--------+-------+-------+---------+--------------+---------+
|MS_SubClass|MS_Zoning|Lot_Frontage|Lot_Area|Street|Alley|

In [26]:
sc.stop()