In [1]:
#Testing pyspark installation
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\spark-3.0.0-bin-hadoop2.7\\spark-3.0.0-bin-hadoop2.7'

In [2]:
#Initiate Spark Context
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


In [3]:
conf = pyspark.SparkConf().setAppName('SparkApp').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [4]:
#Example Test Code
numeric_val = sc.parallelize([1,2,3,4])
numeric_val.map(lambda x: x*x*x).collect()

[1, 8, 27, 64]

In [6]:
from pyspark.ml.feature import Word2Vec

In [7]:
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.031109690852463248,-0.020681756734848025,0.009653565101325513]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [0.01842194888740778,0.03821451057280813,0.06269602593965828]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.005502792075276375,0.001944764330983162,0.008235244452953339]



# Krish exercise

In [45]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName('Customers').getOrCreate()

In [46]:
spark

In [47]:
from pyspark.ml.regression import LinearRegression

In [48]:
dataset=spark.read.csv("C:\\Users\\2304373.UNIPHOREIND\\Pictures\\pyspark_exercises\\PysparkRegressions-master\\Ecommerce_Customers.csv",inferSchema=True,header=True)

In [49]:
dataset

DataFrame[Email: string, Address: string, Avg Session Length: double, Time on App: double, Time on Website: double, Length of Membership: double, Yearly Amount Spent: double]

In [50]:
dataset.show() # like head()

+--------------------+--------------------+------------------+-----------+---------------+--------------------+-------------------+
|               Email|             Address|Avg Session Length|Time on App|Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+------------------+-----------+---------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|       34.49726773|12.65565115|    39.57766802|         4.082620633|         587.951054|
|   hduke@hotmail.com|4547 Archer Commo...|       31.92627203|11.10946073|    37.26895887|         2.664034182|        392.2049334|
|    pallen@yahoo.com|24645 Valerie Uni...|       33.00091476|11.33027806|    37.11059744|         4.104543202|        487.5475049|
|riverarebecca@gma...|1414 David Throug...|       34.30555663|13.71751367|    36.72128268|         3.120178783|         581.852344|
|mstephens@davidso...|14023 Rodriguez P...|       33.33067252|12.79518855|  

In [51]:
dataset.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [52]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [53]:
featureassembler=VectorAssembler(inputCols=["Avg Session Length","Time on App","Time on Website","Length of Membership"],outputCol="Independent Features")

In [54]:
output=featureassembler.transform(dataset)

In [55]:
output.show()

+--------------------+--------------------+------------------+-----------+---------------+--------------------+-------------------+--------------------+
|               Email|             Address|Avg Session Length|Time on App|Time on Website|Length of Membership|Yearly Amount Spent|Independent Features|
+--------------------+--------------------+------------------+-----------+---------------+--------------------+-------------------+--------------------+
|mstephenson@ferna...|835 Frank TunnelW...|       34.49726773|12.65565115|    39.57766802|         4.082620633|         587.951054|[34.49726773,12.6...|
|   hduke@hotmail.com|4547 Archer Commo...|       31.92627203|11.10946073|    37.26895887|         2.664034182|        392.2049334|[31.92627203,11.1...|
|    pallen@yahoo.com|24645 Valerie Uni...|       33.00091476|11.33027806|    37.11059744|         4.104543202|        487.5475049|[33.00091476,11.3...|
|riverarebecca@gma...|1414 David Throug...|       34.30555663|13.71751367|    36.7

In [56]:
output.select("Independent Features").show()

+--------------------+
|Independent Features|
+--------------------+
|[34.49726773,12.6...|
|[31.92627203,11.1...|
|[33.00091476,11.3...|
|[34.30555663,13.7...|
|[33.33067252,12.7...|
|[33.87103788,12.0...|
|[32.0215955,11.36...|
|[32.73914294,12.3...|
|[33.9877729,13.38...|
|[31.93654862,11.8...|
|[33.99257277,13.3...|
|[33.87936082,11.5...|
|[29.53242897,10.9...|
|[33.19033404,12.9...|
|[32.38797585,13.1...|
|[30.73772037,12.6...|
|[32.1253869,11.73...|
|[32.33889932,12.0...|
|[32.18781205,14.7...|
|[32.61785606,13.9...|
+--------------------+
only showing top 20 rows



In [57]:
finalized_data=output.select("Independent Features","Yearly Amount Spent")

In [58]:
finalized_data.show()

+--------------------+-------------------+
|Independent Features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.49726773,12.6...|         587.951054|
|[31.92627203,11.1...|        392.2049334|
|[33.00091476,11.3...|        487.5475049|
|[34.30555663,13.7...|         581.852344|
|[33.33067252,12.7...|         599.406092|
|[33.87103788,12.0...|        637.1024479|
|[32.0215955,11.36...|        521.5721748|
|[32.73914294,12.3...|        549.9041461|
|[33.9877729,13.38...|         570.200409|
|[31.93654862,11.8...|        427.1993849|
|[33.99257277,13.3...|        492.6060127|
|[33.87936082,11.5...|        522.3374046|
|[29.53242897,10.9...|        408.6403511|
|[33.19033404,12.9...|        573.4158673|
|[32.38797585,13.1...|        470.4527333|
|[30.73772037,12.6...|        461.7807422|
|[32.1253869,11.73...|        457.8476959|
|[32.33889932,12.0...|        407.7045475|
|[32.18781205,14.7...|        452.3156755|
|[32.61785606,13.9...|        605.0610388|
+----------

In [59]:
train_data,test_data=finalized_data.randomSplit([0.75,0.25])

In [60]:
regressor=LinearRegression(featuresCol='Independent Features', labelCol='Yearly Amount Spent')
regressor=regressor.fit(train_data)

In [61]:
regressor.coefficients

DenseVector([26.064, 38.733, 0.3046, 61.1358])

In [62]:
regressor.intercept

-1056.151739932916

In [63]:
pred_results=regressor.evaluate(test_data)

In [64]:
pred_results.predictions.show(40)

+--------------------+-------------------+------------------+
|Independent Features|Yearly Amount Spent|        prediction|
+--------------------+-------------------+------------------+
|[30.97167564,11.7...|        494.6386098|  487.083082865684|
|[31.06132516,12.3...|        487.5554581|  493.100253191707|
|[31.12809005,13.2...|        557.2526867| 563.7121488618864|
|[31.1695068,13.97...|        427.3565308| 417.6667401588868|
|[31.30919264,11.9...|        432.7207178|429.63783382347697|
|[31.44744649,10.1...|        418.6027421|425.46115441307984|
|[31.5171218,10.74...|        275.9184207|280.67240719261827|
|[31.66104982,11.3...|        416.3583536| 417.2269130360196|
|[31.8209982,10.77...|         424.675281|416.92115882747635|
|[31.82934646,11.2...|         385.152338|384.32590664061104|
|[31.87455169,10.2...|        392.2852442| 397.8827530299618|
|[31.8854063,11.28...|         390.103273|399.18198762115594|
|[31.90962683,11.3...|        563.4460357| 551.0163389166153|
|[31.926

# Accessing example files in spark

In [37]:
#Testing pyspark installation
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()


'C:\\spark-3.0.0-bin-hadoop2.7\\spark-3.0.0-bin-hadoop2.7'

In [38]:
from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("C:\\spark-3.0.0-bin-hadoop2.7\\spark-3.0.0-bin-hadoop2.7\\data\\mllib\\sample_lda_libsvm_data.txt")

In [39]:
dataset

DataFrame[label: double, features: vector]

In [40]:
# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)

The lower bound on the log likelihood of the entire corpus: -820.7717059783324
The upper bound on perplexity: 3.156814253762817
The topics described by their top-weighted terms:
+-----+-----------+---------------------------------------------------------------+
|topic|termIndices|termWeights                                                    |
+-----+-----------+---------------------------------------------------------------+
|0    |[0, 3, 5]  |[0.0988907776639746, 0.09840811080140031, 0.09815784171623272] |
|1    |[1, 4, 9]  |[0.17372581481263155, 0.1541451633717008, 0.14809304265019235] |
|2    |[1, 2, 0]  |[0.10763518719710427, 0.09290001778648123, 0.09285769471788125]|
|3    |[1, 7, 8]  |[0.10278233443057151, 0.10257897851841583, 0.09964766524551634]|
|4    |[6, 7, 1]  |[0.10991496236314273, 0.10762017498569953, 0.0934346815074767] |
|5    |[8, 2, 6]  |[0.10265802360235843, 0.10194550822222663, 0.09453499013679116]|
|6    |[3, 7, 8]  |[0.1061840707195339, 0.10315547141829495, 0.098

# AI university (Linear regression)

In [5]:
data = spark.read.csv('C:\\Users\\2304373.UNIPHOREIND\\Pictures\\pyspark_exercises\\Data_Science_Bootcamp-master\\Student_Grades_Data.csv', header=True, inferSchema=True)

In [6]:
data.printSchema()

root
 |-- Time_to_Study: integer (nullable = true)
 |-- Grades: double (nullable = true)



In [7]:
#Display first few rows of data
data.show()

+-------------+------+
|Time_to_Study|Grades|
+-------------+------+
|            1|   1.5|
|            5|   2.7|
|            7|   3.1|
|            3|   2.1|
|            2|   1.8|
|            9|   3.9|
|            6|   2.9|
|           12|   4.5|
|           11|   4.3|
|            2|   1.8|
|            4|   2.4|
|            8|   3.5|
|           13|   4.8|
|            9|   3.9|
|           14|   5.0|
|           10|   4.1|
|            6|   2.9|
|           12|   4.5|
|            1|   1.5|
|            4|   2.4|
+-------------+------+
only showing top 20 rows



In [8]:
#Create a Feature array by omitting the last column
feature_cols = data.columns[:-1] 
from pyspark.ml.feature import VectorAssembler
vect_assembler = VectorAssembler(inputCols=feature_cols,outputCol="features")


In [9]:
#Utilize Assembler created above in order to add the feature column
data_w_features = vect_assembler.transform(data)

In [10]:

#Display the data having additional column named features. Had it been multiple linear regression problem, you could see all the
# independent variable values combined in one list
data_w_features.show()

+-------------+------+--------+
|Time_to_Study|Grades|features|
+-------------+------+--------+
|            1|   1.5|   [1.0]|
|            5|   2.7|   [5.0]|
|            7|   3.1|   [7.0]|
|            3|   2.1|   [3.0]|
|            2|   1.8|   [2.0]|
|            9|   3.9|   [9.0]|
|            6|   2.9|   [6.0]|
|           12|   4.5|  [12.0]|
|           11|   4.3|  [11.0]|
|            2|   1.8|   [2.0]|
|            4|   2.4|   [4.0]|
|            8|   3.5|   [8.0]|
|           13|   4.8|  [13.0]|
|            9|   3.9|   [9.0]|
|           14|   5.0|  [14.0]|
|           10|   4.1|  [10.0]|
|            6|   2.9|   [6.0]|
|           12|   4.5|  [12.0]|
|            1|   1.5|   [1.0]|
|            4|   2.4|   [4.0]|
+-------------+------+--------+
only showing top 20 rows



In [11]:
#Select only Features and Label from previous dataset as we need these two entities for building machine learning model
finalized_data = data_w_features.select("features","Grades")

finalized_data.show()

+--------+------+
|features|Grades|
+--------+------+
|   [1.0]|   1.5|
|   [5.0]|   2.7|
|   [7.0]|   3.1|
|   [3.0]|   2.1|
|   [2.0]|   1.8|
|   [9.0]|   3.9|
|   [6.0]|   2.9|
|  [12.0]|   4.5|
|  [11.0]|   4.3|
|   [2.0]|   1.8|
|   [4.0]|   2.4|
|   [8.0]|   3.5|
|  [13.0]|   4.8|
|   [9.0]|   3.9|
|  [14.0]|   5.0|
|  [10.0]|   4.1|
|   [6.0]|   2.9|
|  [12.0]|   4.5|
|   [1.0]|   1.5|
|   [4.0]|   2.4|
+--------+------+
only showing top 20 rows



In [12]:
#Split the data into training and test model with 70% obs. going in training and 30% in testing
train_dataset, test_dataset = finalized_data.randomSplit([0.7, 0.3])

In [13]:
#Peek into training data
train_dataset.describe().show()

+-------+------------------+
|summary|            Grades|
+-------+------------------+
|  count|                38|
|   mean|3.1657894736842107|
| stddev|1.0710632039768841|
|    min|               1.5|
|    max|               5.0|
+-------+------------------+



In [14]:
#Peek into test_dataset
test_dataset.describe().show()

+-------+------------------+
|summary|            Grades|
+-------+------------------+
|  count|                12|
|   mean|               3.4|
| stddev|1.2380336315008864|
|    min|               1.5|
|    max|               5.0|
+-------+------------------+



In [15]:
#Import Linear Regression class called LinearRegression
from pyspark.ml.regression import LinearRegression

In [16]:
#Create the Linear Regression object named having feature column as features and Label column as Time_to_Study
LinReg = LinearRegression(featuresCol="features", labelCol="Grades")

In [17]:
#Train the model on the training using fit() method.
model = LinReg.fit(train_dataset)

In [18]:
#Predict the Grades using the evulate method
pred = model.evaluate(test_dataset)

In [19]:
#Show the predicted Grade values along side actual Grade values
pred.predictions.show()

+--------+------+------------------+
|features|Grades|        prediction|
+--------+------+------------------+
|   [1.0]|   1.5|1.5572708476912447|
|   [1.0]|   1.5|1.5572708476912447|
|   [3.0]|   2.1|2.1005926946933133|
|   [5.0]|   2.7|2.6439145416953815|
|   [6.0]|   2.9| 2.915575465196416|
|   [8.0]|   3.5|3.4588973121984843|
|  [10.0]|   4.1|4.0022191592005525|
|  [10.0]|   4.1|4.0022191592005525|
|  [10.0]|   4.1|4.0022191592005525|
|  [12.0]|   4.5| 4.545541006202621|
|  [13.0]|   4.8| 4.817201929703655|
|  [14.0]|   5.0|  5.08886285320469|
+--------+------+------------------+



In [20]:
#Find out coefficient value
coefficient = model.coefficients
print ("The coefficient of the model is : %a" %coefficient)

The coefficient of the model is : DenseVector([0.2717])


In [21]:
#Find out intercept Value
intercept = model.intercept
print ("The Intercept of the model is : %f" %intercept)

The Intercept of the model is : 1.285610


In [22]:
#Evaluate the model using metric like Mean Absolute Error(MAE), Root Mean Square Error(RMSE) and R-Square
from pyspark.ml.evaluation import RegressionEvaluator
evaluation = RegressionEvaluator(labelCol="Grades", predictionCol="prediction")

# Root Mean Square Error
rmse = evaluation.evaluate(pred.predictions, {evaluation.metricName: "rmse"})
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = evaluation.evaluate(pred.predictions, {evaluation.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluation.evaluate(pred.predictions, {evaluation.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluation.evaluate(pred.predictions, {evaluation.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 0.065
MSE: 0.004
MAE: 0.056
r2: 0.997


In [23]:
#Create Unlabeled dataset  to contain only feature column
unlabeled_dataset = test_dataset.select('features')

In [24]:
#Display the content of unlabeled_dataset
unlabeled_dataset.show()

+--------+
|features|
+--------+
|   [1.0]|
|   [1.0]|
|   [3.0]|
|   [5.0]|
|   [6.0]|
|   [8.0]|
|  [10.0]|
|  [10.0]|
|  [10.0]|
|  [12.0]|
|  [13.0]|
|  [14.0]|
+--------+



In [25]:
#Predict the model output for fresh & unseen test data using transform() method
new_predictions = model.transform(unlabeled_dataset)

In [26]:
#Display the new prediction values
new_predictions.show()

+--------+------------------+
|features|        prediction|
+--------+------------------+
|   [1.0]|1.5572708476912447|
|   [1.0]|1.5572708476912447|
|   [3.0]|2.1005926946933133|
|   [5.0]|2.6439145416953815|
|   [6.0]| 2.915575465196416|
|   [8.0]|3.4588973121984843|
|  [10.0]|4.0022191592005525|
|  [10.0]|4.0022191592005525|
|  [10.0]|4.0022191592005525|
|  [12.0]| 4.545541006202621|
|  [13.0]| 4.817201929703655|
|  [14.0]|  5.08886285320469|
+--------+------------------+



# multiple linear regression

In [135]:
data = spark.read.csv('C:\\Users\\2304373.UNIPHOREIND\\Pictures\\pyspark_exercises\\Data_Science_Bootcamp-master\\Regression_Algorithms\\Multiple_Linear_Regression\\Restaurant_Profit_Data.csv', header=True, inferSchema=True)

In [136]:
data.printSchema()

root
 |-- Miscellaneous_Expenses: double (nullable = true)
 |-- Food_Innovation_Spend: double (nullable = true)
 |-- Advertising: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Profit: double (nullable = true)



In [137]:
data.show()

+----------------------+---------------------+-----------+-------+---------+
|Miscellaneous_Expenses|Food_Innovation_Spend|Advertising|   City|   Profit|
+----------------------+---------------------+-----------+-------+---------+
|              138671.8|             167497.2|   475918.1|Chicago|202443.83|
|             153151.59|             164745.7|  448032.53| Mumbai|201974.06|
|             102919.55|            155589.51|  412068.54|  Tokyo|201232.39|
|             120445.85|            146520.41|  387333.62|Chicago|193083.99|
|              93165.77|            144255.34|  370302.42|  Tokyo|176369.94|
|             101588.71|             134024.9|  366995.36|Chicago|167173.12|
|             148972.87|            136763.46|  131850.82| Mumbai|166304.51|
|             147304.06|            132446.13|  328010.68|  Tokyo| 165934.6|
|             150492.95|            122690.52|  315747.29|Chicago|162393.77|
|             110453.17|            125482.88|  309115.62| Mumbai|159941.96|

In [138]:
#Display data types of the data columns.
data.dtypes

[('Miscellaneous_Expenses', 'double'),
 ('Food_Innovation_Spend', 'double'),
 ('Advertising', 'double'),
 ('City', 'string'),
 ('Profit', 'double')]

In [139]:
categorical_cols = [item[0] for item in data.dtypes if item[1].startswith('string')]
print(categorical_cols)

numerical_cols = [item[0] for item in data.dtypes if item[1].startswith('int') | item[1].startswith('double')][:-1]
print(numerical_cols)

['City']
['Miscellaneous_Expenses', 'Food_Innovation_Spend', 'Advertising']


In [140]:
print(str(len(categorical_cols)) + '  categorical features')
print(str(len(numerical_cols)) + '  numerical features')

1  categorical features
3  numerical features


In [141]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

# replaced OneHotEncoderEstimator due to the update

In [142]:
from pyspark.ml.feature import OneHotEncoder # replaced OneHotEncoderEstimator

In [143]:
#from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
stages = []
for categoricalCol in categorical_cols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    OHencoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "_catVec"])
stages += [stringIndexer, OHencoder]
assemblerInputs = [c + "_catVec" for c in categorical_cols] + numerical_cols
Vectassembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [Vectassembler]

# Do not run below code another time you will get error. if you get the error by running mistakely, execute from first importing data (Restaurant_Profit_Data.csv), then the error will be resolved

In [144]:
from pyspark.ml import Pipeline
import pandas as pd
cols = data.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(data)
data = pipelineModel.transform(data)
selectedCols = ['features']+cols
data = data.select(selectedCols)
pd.DataFrame(data.take(5), columns=data.columns)

Unnamed: 0,features,Miscellaneous_Expenses,Food_Innovation_Spend,Advertising,City,Profit
0,"[1.0, 0.0, 138671.8, 167497.2, 475918.1]",138671.8,167497.2,475918.1,Chicago,202443.83
1,"[0.0, 1.0, 153151.59, 164745.7, 448032.53]",153151.59,164745.7,448032.53,Mumbai,201974.06
2,"[0.0, 0.0, 102919.55, 155589.51, 412068.54]",102919.55,155589.51,412068.54,Tokyo,201232.39
3,"[1.0, 0.0, 120445.85, 146520.41, 387333.62]",120445.85,146520.41,387333.62,Chicago,193083.99
4,"[0.0, 0.0, 93165.77, 144255.34, 370302.42]",93165.77,144255.34,370302.42,Tokyo,176369.94


In [27]:
from pyspark.sql import SparkSession

In [29]:
import pyspark.sql.functions as F