# Udemy Spark and Python for Big Data with PySpark

# Linear Regression

Finding the necessary amount of crew needed for a ship class.

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('sparkproj')
    .getOrCreate()
)

In [8]:
# Load the cruise-ship CSV: the first row is the header and column types are inferred.
df = spark.read.csv(
    'cruise_ship_info.csv',
    header=True,
    inferSchema=True,
)

In [9]:
# Peek at the first row to see the column names and value types.
df.head(1)

[Row(Ship_name='Journey', Cruise_line='Azamara', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55)]

Cruise_line is a string column, so convert it to a numeric index with StringIndexer.

In [10]:
from pyspark.ml.feature import StringIndexer

In [12]:
# Indexer that encodes the Cruise_line strings as a numeric Cruise_line_index column.
indexer = StringIndexer(
    inputCol='Cruise_line',
    outputCol='Cruise_line_index',
)

In [14]:
# Fit the indexer on df, then append the index column to that same frame.
df_index = (
    indexer
    .fit(df)
    .transform(df)
)

In [17]:
# The original string column is not needed once Cruise_line_index exists.
df_index = df_index.drop('Cruise_line')

In [18]:
# Confirm Cruise_line is gone and Cruise_line_index is present.
df_index.head(1)

[Row(Ship_name='Journey', Age=6, Tonnage=30.276999999999997, passengers=6.94, length=5.94, cabins=3.55, passenger_density=42.64, crew=3.55, Cruise_line_index=16.0)]

In [19]:
# NOTE(review): without .show() this cell only displays the resulting DataFrame's
# schema repr, not the summary statistics themselves.
df_index.describe()

DataFrame[summary: string, Ship_name: string, Age: string, Tonnage: string, passengers: string, length: string, cabins: string, passenger_density: string, crew: string, Cruise_line_index: string]

In [22]:
# Check the column types after indexing.
df_index.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)
 |-- Cruise_line_index: double (nullable = false)



In [10]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [37]:
# List the columns to choose the model's input features from.
df_index.columns

['Ship_name',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density',
 'crew',
 'Cruise_line_index']

Leave out Ship_name, which cannot help the model, then assemble the remaining feature columns into a single vector and use crew as the label column.

In [40]:
# Pack every numeric predictor into a single 'features' vector column.
# Ship_name is not included, and crew stays separate as the label.
assembler = VectorAssembler(
    inputCols=[
        'Age',
        'Tonnage',
        'passengers',
        'length',
        'cabins',
        'passenger_density',
        'Cruise_line_index',
    ],
    outputCol='features',
)

In [43]:
# Add the assembled 'features' column to the indexed frame.
output = assembler.transform(df_index)

In [50]:
# Keep only the model inputs: the feature vector and the crew label.
# The result must stay a DataFrame (no .collect()): collect() returns a plain
# Python list of Rows, which has no randomSplit() and cannot be passed to fit().
x_y_data = output.select('features', 'crew')

In [None]:
# 70/30 train/test split; the fixed seed keeps the split repeatable between runs.
data_train, data_test = x_y_data.randomSplit([0.7, 0.3], seed=42)

In [69]:
# Summary statistics for the training split.
data_train.describe().show()

+-------+------------------+
|summary|              crew|
+-------+------------------+
|  count|               103|
|   mean|7.8573786407767106|
| stddev|3.4941003389649508|
|    min|              0.59|
|    max|              19.1|
+-------+------------------+



In [60]:
# Linear regression estimator with crew as the label column.
model = LinearRegression(labelCol='crew')

In [61]:
# Fit on the training split only.
trained_model = model.fit(data_train)

In [65]:
# Score the fitted model on the held-out test split.
results = trained_model.evaluate(data_test)

In [67]:
# Test-set metrics, displayed as (RMSE, R^2, MAE).
(
    results.rootMeanSquaredError,
    results.r2,
    results.meanAbsoluteError,
)

(0.6802418377966485, 0.9626069664570869, 0.5629717340191953)

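Great result: R² is 0.96, meaning about 96% of the variance in crew is explained by the model. RMSE is 0.68 against a mean crew value of 7.86, so predictions are typically off by less than one crew unit (about 9% of the mean). Note that RMSE describes the typical error size; it is not an upper bound on the error.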

In [77]:
# Correlation of crew with passengers, Tonnage and cabins, in that order.
(
    df_index.corr('crew', 'passengers'),
    df_index.corr('crew', 'Tonnage'),
    df_index.corr('crew', 'cabins'),
)

(0.9152341306065384, 0.927568811544939, 0.9508226063578497)

In [78]:
# Correlation of crew with passenger_density and length, in that order.
(
    df_index.corr('crew', 'passenger_density'),
    df_index.corr('crew', 'length'),
)

(-0.15550928421699717, 0.8958566271016579)

Since crew is so highly correlated with several of the features (cabins, Tonnage, passengers and length), it makes sense that the model achieved R² ≈ 0.96.

# Logistic Regression

Finding customers with risk of churn

In [144]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [79]:
# Load the customer-churn CSV (header row, inferred types).
# This rebinds df, which previously held the cruise-ship data.
df = spark.read.csv(
    'customer_churn.csv',
    header=True,
    inferSchema=True,
)

In [80]:
# Peek at the first customer record.
df.head(1)

[Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1)]

In [108]:
# Remove the name, address, company and onboarding-date columns,
# leaving only the numeric columns and the Churn label.
df = df.drop(
    'Names',
    'Location',
    'Company',
    'Onboard_date',
)

In [109]:
# Verify that only numeric columns and the Churn label remain.
df.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Churn: integer (nullable = true)



In [110]:
# List the remaining columns to pick the feature inputs from.
df.columns

['Age', 'Total_Purchase', 'Account_Manager', 'Years', 'Num_Sites', 'Churn']

In [111]:
# Assemble the five customer attributes into one 'features' vector; Churn is the label.
assembler = VectorAssembler(
    inputCols=[
        'Age',
        'Total_Purchase',
        'Account_Manager',
        'Years',
        'Num_Sites',
    ],
    outputCol='features',
)

In [112]:
# Add the assembled 'features' column to the churn frame.
output = assembler.transform(df)

In [121]:
# Keep only the feature vector and the Churn label for modelling.
model_data = output.select('features','Churn')

In [122]:
# 70/30 train/test split; the fixed seed keeps the split repeatable between runs.
train_data, test_data = model_data.randomSplit([0.7, 0.3], seed=42)

In [125]:
# Logistic regression estimator with Churn as the label column.
log_reg = LogisticRegression(labelCol='Churn')

In [126]:
# Fit on the training split only.
fitted_model = log_reg.fit(train_data)

In [166]:
# One coefficient per assembled input, in assembler order:
# Age, Total_Purchase, Account_Manager, Years, Num_Sites.
fitted_model.coefficients

DenseVector([0.0649, 0.0001, 0.5717, 0.5154, 1.2228])

In [130]:
# Evaluate the fitted model on the held-out test split.
test_results = fitted_model.evaluate(test_data)

In [175]:
# Keep the predictions DataFrame in the variable and display it separately:
# DataFrame.show() prints the rows and returns None, so assigning its result
# would leave test_results_pred set to None.
test_results_pred = test_results.predictions
test_results_pred.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|Churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[28.0,9090.43,1.0...|    0|[1.64552115331272...|[0.83828480232749...|       0.0|
|[29.0,5900.78,1.0...|    0|[4.28871053346174...|[0.98646315212829...|       0.0|
|[29.0,8688.17,1.0...|    1|[2.84540982580951...|[0.94508092526674...|       0.0|
|[29.0,9617.59,0.0...|    0|[4.69866540800888...|[0.99097477287805...|       0.0|
|[30.0,8874.83,0.0...|    0|[3.41438437328706...|[0.96815107063565...|       0.0|
|[30.0,13473.35,0....|    0|[2.83325050355735...|[0.94444639413915...|       0.0|
|[31.0,5387.75,0.0...|    0|[2.88053749685883...|[0.94687590722290...|       0.0|
|[31.0,11743.24,0....|    0|[6.99918291998165...|[0.99908820477869...|       0.0|
|[32.0,9885.12,1.0...|    1|[1.95816667703517...|[0.87633440784036...|       0.0|
|[32.0,10716.75,

In [176]:
# Predictions DataFrame for the test split (features, Churn, rawPrediction,
# probability, prediction).
test_results_pred = test_results.predictions

In [224]:
# Area-under-ROC evaluator for the churn model.
# AUC needs a continuous score, so read the model's 'rawPrediction' column;
# the thresholded 0/1 'prediction' column would reduce the ROC curve to a
# single operating point and distort the reported AUC.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                          labelCol='Churn')

In [226]:
# Score the test predictions with the evaluator; the value is stored in auc
# but not displayed by this cell.
auc = evaluator.evaluate(test_results_pred)

# Tree methods - GBT , RTC & DTC

Finding the chemical causing dog food to spoil

In [248]:
# NOTE(review): getOrCreate() returns the already-running session when one exists,
# so spark2 is presumably the same session as `spark` and appName('tree') may
# have no effect — confirm.
spark2 = SparkSession.builder.appName('tree').getOrCreate()

In [3]:
# Load the dog-food CSV (header row, inferred types).
# This rebinds df, which previously held the churn data.
df = spark2.read.csv(
    'dog_food.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Peek at the first five rows: chemicals A-D and the Spoiled label.
df.head(5)

[Row(A=4, B=2, C=12.0, D=3, Spoiled=1.0),
 Row(A=5, B=6, C=12.0, D=7, Spoiled=1.0),
 Row(A=6, B=2, C=13.0, D=6, Spoiled=1.0),
 Row(A=4, B=2, C=12.0, D=1, Spoiled=1.0),
 Row(A=4, B=2, C=12.0, D=3, Spoiled=1.0)]

In [5]:
# Check the inferred column types.
df.printSchema()

root
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: double (nullable = true)
 |-- D: integer (nullable = true)
 |-- Spoiled: double (nullable = true)



In [6]:
# List the columns to pick the feature inputs from.
df.columns

['A', 'B', 'C', 'D', 'Spoiled']

In [7]:
from pyspark.ml.classification import GBTClassifier , DecisionTreeClassifier , RandomForestClassifier

In [11]:
# Assemble the four chemical columns into one 'features' vector; Spoiled is the label.
assembler = VectorAssembler(
    inputCols=['A', 'B', 'C', 'D'],
    outputCol='features',
)

In [12]:
# Add the assembled 'features' column to the dog-food frame.
output = assembler.transform(df)

In [13]:
# Keep only the feature vector and the Spoiled label for modelling.
final_data = output.select('features' , 'Spoiled')

In [14]:
# Display the first rows of the modelling frame.
final_data.show()

+-------------------+-------+
|           features|Spoiled|
+-------------------+-------+
| [4.0,2.0,12.0,3.0]|    1.0|
| [5.0,6.0,12.0,7.0]|    1.0|
| [6.0,2.0,13.0,6.0]|    1.0|
| [4.0,2.0,12.0,1.0]|    1.0|
| [4.0,2.0,12.0,3.0]|    1.0|
|[10.0,3.0,13.0,9.0]|    1.0|
| [8.0,5.0,14.0,5.0]|    1.0|
| [5.0,8.0,12.0,8.0]|    1.0|
| [6.0,5.0,12.0,9.0]|    1.0|
| [3.0,3.0,12.0,1.0]|    1.0|
| [9.0,8.0,11.0,3.0]|    1.0|
|[1.0,10.0,12.0,3.0]|    1.0|
|[1.0,5.0,13.0,10.0]|    1.0|
|[2.0,10.0,12.0,6.0]|    1.0|
|[1.0,10.0,11.0,4.0]|    1.0|
| [5.0,3.0,12.0,2.0]|    1.0|
| [4.0,9.0,11.0,8.0]|    1.0|
| [5.0,1.0,11.0,1.0]|    1.0|
|[4.0,9.0,12.0,10.0]|    1.0|
| [5.0,8.0,10.0,9.0]|    1.0|
+-------------------+-------+
only showing top 20 rows



In [18]:
# 70/30 train/test split; the fixed seed keeps the split repeatable between runs.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

In [19]:
# Three tree-based classifiers, all predicting the Spoiled label:
# gradient-boosted trees, a single decision tree, and a random forest.
gbt_model, dtc_model, rtc_model = (
    GBTClassifier(labelCol='Spoiled'),
    DecisionTreeClassifier(labelCol='Spoiled'),
    RandomForestClassifier(labelCol='Spoiled'),
)

In [20]:
# Fit each classifier on the training split, in GBT, decision tree, random forest order.
fitted_gbt, fitted_dtc, fitted_rtc = [
    estimator.fit(train_data)
    for estimator in (gbt_model, dtc_model, rtc_model)
]

In [21]:
# Score the held-out test split with each fitted model.
gbt_pred, dtc_pred, rtc_pred = [
    fitted.transform(test_data)
    for fitted in (fitted_gbt, fitted_dtc, fitted_rtc)
]

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [24]:
# Binary-classification evaluator with Spoiled as the label column.
# NOTE(review): the name `eval` shadows Python's builtin eval() for the rest of
# the notebook; a more specific name would be safer, but later cells use this one.
eval = BinaryClassificationEvaluator(labelCol='Spoiled')

In [25]:
# Print one evaluation score per model, in GBT, decision tree, random forest order.
for predictions in (gbt_pred, dtc_pred, rtc_pred):
    print(eval.evaluate(predictions))

0.9991452991452991
0.9777777777777779
1.0


In [36]:
# Random-forest feature importances, indexed in assembler order (0=A, 1=B, 2=C, 3=D).
fitted_rtc.featureImportances

SparseVector(4, {0: 0.0359, 1: 0.0349, 2: 0.887, 3: 0.0422})

Since the feature at index 2 (chemical C) carries about 89% of the random forest's feature importance, it is the chemical most likely causing the food to spoil.

# KMeans

Finding the number of hackers (2 or 3) attacking a system, given that each hacker made a similar number of attacks.

In [28]:
from pyspark.ml.clustering import KMeans

In [31]:
# Load the hacking-session CSV (header row, inferred types).
df2 = spark.read.csv(
    'hack_data.csv',
    header=True,
    inferSchema=True,
)

In [32]:
# Peek at the first session record.
df2.head()

Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37)

In [33]:
# Check the inferred column types; Location is the only string column.
df2.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)



In [34]:
# List the columns to pick the clustering features from.
df2.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed']

In [39]:
# Assemble every numeric session column into one 'features' vector.
# Location is not included.
assembler2 = VectorAssembler(
    inputCols=[
        'Session_Connection_Time',
        'Bytes Transferred',
        'Kali_Trace_Used',
        'Servers_Corrupted',
        'Pages_Corrupted',
        'WPM_Typing_Speed',
    ],
    outputCol='features',
)

The file says Location is probably useless because attackers can change their IP location, so it is left out of the features.

In [40]:
# Add the assembled 'features' column to the session frame.
all_data = assembler2.transform(df2)

In [42]:
# Confirm the new 'features' vector column was appended.
all_data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)



In [43]:
# Clustering is unsupervised, so only the feature vector is kept.
vec_data = all_data.select('features')

In [46]:
from pyspark.ml.feature import StandardScaler

In [50]:
# Scaler that reads 'features' and writes the rescaled vectors to 'scaledfeatures'.
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaledfeatures',
)

In [51]:
# Fit the scaler on the feature vectors.
scale_model = scaler.fit(vec_data)

In [52]:
# Append the 'scaledfeatures' column produced by the fitted scaler.
scaled_data = scale_model.transform(vec_data)

In [54]:
# Two candidate clusterings on the scaled features: one per hypothesised
# number of hackers (2 or 3).
km2 = KMeans(featuresCol='scaledfeatures', k=2)
km3 = KMeans(featuresCol='scaledfeatures', k=3)

In [55]:
# Fit both candidate clusterings on the scaled feature vectors.
fitted_km2 = km2.fit(scaled_data)
fitted_km3 = km3.fit(scaled_data)

In [56]:
# Assign every session to a cluster under each clustering.
km2_pred = fitted_km2.transform(scaled_data)
km3_pred = fitted_km3.transform(scaled_data)

In [64]:
# Number of sessions assigned to each cluster when k=2.
(
    km2_pred
    .groupBy('prediction')
    .count()
    .show()
)

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+



In [65]:
# Number of sessions assigned to each cluster when k=3.
(
    km3_pred
    .groupBy('prediction')
    .count()
    .show()
)

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   83|
|         2|  167|
|         0|   84|
+----------+-----+



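With k=2 the two clusters are exactly the same size (167 sessions each), while with k=3 the clusters are uneven (83, 167 and 84). Since each hacker made a similar number of attacks, there were 2 attackers.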

# Recommender system

Building a movie recommendation system

In [68]:
from pyspark.ml.recommendation import ALS

In [71]:
from pyspark.ml.evaluation import RegressionEvaluator

In [69]:
# Load the MovieLens ratings CSV (header row, inferred types).
df3 = spark.read.csv(
    'movielens_ratings.csv',
    header=True,
    inferSchema=True,
)

In [70]:
# Peek at the first rating record (movieId, rating, userId).
df3.head(1)

[Row(movieId=2, rating=3.0, userId=0)]

In [76]:
# 80/20 train/test split; the fixed seed keeps the split repeatable between runs.
train, test = df3.randomSplit([0.8, 0.2], seed=42)

In [80]:
# ALS recommender over (userId, movieId, rating) triples.
# coldStartStrategy='drop' removes predictions for users or movies that were not
# seen during fitting; with a random split those rows would otherwise receive NaN
# predictions and make any RMSE computed with RegressionEvaluator NaN as well.
als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId',
          ratingCol='rating', coldStartStrategy='drop')

In [None]:
# Fit the ALS model on the training split.
# NOTE(review): this rebinds `model`, which earlier held the LinearRegression estimator.
model = als.fit(train)