In [1]:
import findspark
# Initialise findspark before the pyspark import in the next cell.
findspark.init()

In [2]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session used by every cell below. The
# application name is chosen to match the logistic-regression model that this
# notebook trains.
spark = SparkSession.builder.appName("log_reg").getOrCreate()

In [3]:
# Load the CSV, treating the first line as the header and letting Spark infer
# the type of each column.
df = spark.read.csv(
    "logisticregressiondata/Log_Reg_dataset.csv",
    header=True,
    inferSchema=True,
)

In [4]:
# Report the dataset shape as a (row count, column count) tuple.
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

(20000, 6)


### EDA

In [6]:
# Print each column's name, type and nullability.
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Repeat_Visitor: integer (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Web_pages_viewed: integer (nullable = true)
 |-- Status: integer (nullable = true)



In [7]:
# Summary statistics per column: show only the first 4 summary rows and do not
# truncate the cell values.
summary_stats = df.describe()
summary_stats.show(n=4, truncate=False)

+-------+-------+-----------------+-----------------+--------+-----------------+------------------+
|summary|Country|Age              |Repeat_Visitor   |Platform|Web_pages_viewed |Status            |
+-------+-------+-----------------+-----------------+--------+-----------------+------------------+
|count  |20000  |20000            |20000            |20000   |20000            |20000             |
|mean   |null   |28.53955         |0.5029           |null    |9.5533           |0.5               |
|stddev |null   |7.888912950773227|0.500004090187782|null    |6.073903499824976|0.5000125004687693|
|min    |Brazil |17               |0                |Bing    |1                |0                 |
+-------+-------+-----------------+-----------------+--------+-----------------+------------------+
only showing top 4 rows



In [13]:
# Number of rows for each Platform value.
platform_counts = df.groupBy('Platform').count()
platform_counts.show()

+--------+-----+
|Platform|count|
+--------+-----+
|   Yahoo| 9859|
|    Bing| 4360|
|  Google| 5781|
+--------+-----+



### Feature Engineering

In [14]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [15]:
# List the column names currently present in df.
df.columns

['Country', 'Age', 'Repeat_Visitor', 'Platform', 'Web_pages_viewed', 'Status']

In [16]:
# Fit a StringIndexer that maps every Platform string to a numeric index,
# written to a new "Platform_num" column when the fitted model is applied.
platform_string_indexer = StringIndexer(inputCol="Platform", outputCol="Platform_num")
search_engine_indexer = platform_string_indexer.fit(df)

In [17]:
# Apply the fitted indexer, adding the Platform_num column to df.
# NOTE(review): df is rebound to the transformed frame, so re-running this cell
# without reloading df will presumably fail because Platform_num already
# exists — confirm.
df=search_engine_indexer.transform(df)

In [30]:
# Preview the first 3 rows, which now include Platform_num.
df.show(3)

+-------+---+--------------+--------+----------------+------+------------+
|Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|Platform_num|
+-------+---+--------------+--------+----------------+------+------------+
|  India| 41|             1|   Yahoo|              21|     1|         0.0|
| Brazil| 28|             1|   Yahoo|               5|     0|         0.0|
| Brazil| 40|             0|  Google|               3|     0|         1.0|
+-------+---+--------------+--------+----------------+------+------------+
only showing top 3 rows



In [20]:
# Row counts per platform index, smallest group first.
platform_num_counts = df.groupBy('Platform_num').count()
platform_num_counts.orderBy('count', ascending=True).show()

+------------+-----+
|Platform_num|count|
+------------+-----+
|         2.0| 4360|
|         1.0| 5781|
|         0.0| 9859|
+------------+-----+



In [21]:
from pyspark.ml.feature import OneHotEncoder

In [31]:
# One-hot encode Platform_num into the sparse vector column P_vector.
# dropLast=False keeps one vector slot per category instead of dropping the
# last category.
encoder = OneHotEncoder(inputCol="Platform_num", outputCol="P_vector", dropLast=False)
ohe = encoder.fit(df)  # fit the encoder on the current DataFrame
df = ohe.transform(df)

In [33]:
# Preview the first 3 rows, which now include the one-hot P_vector column.
df.show(3)

+-------+---+--------------+--------+----------------+------+------------+-------------+
|Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|Platform_num|     P_vector|
+-------+---+--------------+--------+----------------+------+------------+-------------+
|  India| 41|             1|   Yahoo|              21|     1|         0.0|(3,[0],[1.0])|
| Brazil| 28|             1|   Yahoo|               5|     0|         0.0|(3,[0],[1.0])|
| Brazil| 40|             0|  Google|               3|     0|         1.0|(3,[1],[1.0])|
+-------+---+--------------+--------+----------------+------+------------+-------------+
only showing top 3 rows



In [35]:
# Row counts per one-hot platform vector, largest group first (at most 5 rows).
p_vector_counts = df.groupBy('P_vector').count()
p_vector_counts.orderBy('count', ascending=False).show(5)

+-------------+-----+
|     P_vector|count|
+-------------+-----+
|(3,[0],[1.0])| 9859|
|(3,[1],[1.0])| 5781|
|(3,[2],[1.0])| 4360|
+-------------+-----+



In [38]:
# Fit a StringIndexer that maps every Country string to a numeric index,
# written to a new "Country_Num" column when the fitted model is applied.
country_string_indexer = StringIndexer(inputCol='Country', outputCol="Country_Num")
country_indexer = country_string_indexer.fit(df)

In [39]:
# Apply the fitted indexer, adding the Country_Num column to df.
# NOTE(review): df is rebound to the transformed frame, so re-running this cell
# without reloading df will presumably fail because Country_Num already
# exists — confirm.
df = country_indexer.transform(df)

In [41]:
# Row counts per country, largest group first.
country_counts = df.groupBy('Country').count()
country_counts.orderBy('count', ascending=False).show()

+---------+-----+
|  Country|count|
+---------+-----+
|Indonesia|12178|
|    India| 4018|
|   Brazil| 2586|
| Malaysia| 1218|
+---------+-----+



In [43]:
# Row counts per country index, largest group first.
country_num_counts = df.groupBy('Country_Num').count()
country_num_counts.orderBy('count', ascending=False).show()

+-----------+-----+
|Country_Num|count|
+-----------+-----+
|        0.0|12178|
|        1.0| 4018|
|        2.0| 2586|
|        3.0| 1218|
+-----------+-----+



In [44]:
# One-hot encode Country_Num into the sparse vector column Country_Num_vector.
# dropLast=False keeps one vector slot per category instead of dropping the
# last category.
country_encoder = OneHotEncoder(
    inputCol="Country_Num",
    outputCol="Country_Num_vector",
    dropLast=False,
)
ohe = country_encoder.fit(df)  # fit the encoder on the current DataFrame
df = ohe.transform(df)

In [46]:
# Preview the first row, which now includes Country_Num and Country_Num_vector.
df.show(1)

+-------+---+--------------+--------+----------------+------+------------+-------------+-----------+------------------+
|Country|Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|Platform_num|     P_vector|Country_Num|Country_Num_vector|
+-------+---+--------------+--------+----------------+------+------------+-------------+-----------+------------------+
|  India| 41|             1|   Yahoo|              21|     1|         0.0|(3,[0],[1.0])|        1.0|     (4,[1],[1.0])|
+-------+---+--------------+--------+----------------+------+------------+-------------+-----------+------------------+
only showing top 1 row



In [50]:
# Row counts per one-hot country vector, largest group first.
country_vector_counts = df.groupBy('Country_Num_vector').count()
country_vector_counts.orderBy('count', ascending=False).show()

+------------------+-----+
|Country_Num_vector|count|
+------------------+-----+
|     (4,[0],[1.0])|12178|
|     (4,[1],[1.0])| 4018|
|     (4,[2],[1.0])| 2586|
|     (4,[3],[1.0])| 1218|
+------------------+-----+



In [52]:
from pyspark.ml.feature import VectorAssembler


In [55]:
# List the column names now in df, including the index and one-hot columns
# added above.
df.columns

['Country',
 'Age',
 'Repeat_Visitor',
 'Platform',
 'Web_pages_viewed',
 'Status',
 'Platform_num',
 'P_vector',
 'Country_Num',
 'Country_Num_vector']

In [59]:
# Combine the two one-hot vectors and the three numeric columns into a single
# "features" vector column.
feature_columns = [
    'P_vector',
    'Country_Num_vector',
    'Repeat_Visitor',
    'Age',
    'Web_pages_viewed',
]
df_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [60]:
# Apply the assembler, adding the "features" vector column to df.
df = df_assembler.transform(df)

In [63]:
# Print the schema again to confirm the columns added during feature engineering.
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Repeat_Visitor: integer (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Web_pages_viewed: integer (nullable = true)
 |-- Status: integer (nullable = true)
 |-- Platform_num: double (nullable = false)
 |-- P_vector: vector (nullable = true)
 |-- Country_Num: double (nullable = false)
 |-- Country_Num_vector: vector (nullable = true)
 |-- features: vector (nullable = true)



In [64]:
# Preview the assembled feature vectors next to the label. The label is
# referenced by its schema name "Status" so the lookup does not depend on
# Spark's case-insensitive column resolution.
df.select('features', 'Status').show()

+--------------------+------+
|            features|status|
+--------------------+------+
|(10,[0,4,7,8,9],[...|     1|
|(10,[0,5,7,8,9],[...|     0|
|(10,[1,5,8,9],[1....|     0|
|(10,[2,3,7,8,9],[...|     1|
|(10,[1,6,8,9],[1....|     1|
|(10,[1,5,8,9],[1....|     0|
|(10,[1,5,8,9],[1....|     0|
|(10,[1,3,8,9],[1....|     0|
|(10,[0,3,8,9],[1....|     0|
|(10,[2,3,7,8,9],[...|     1|
|(10,[1,6,7,8,9],[...|     1|
|(10,[0,3,7,8,9],[...|     1|
|(10,[0,3,7,8,9],[...|     1|
|(10,[2,3,8,9],[1....|     0|
|(10,[0,4,7,8,9],[...|     1|
|(10,[2,3,8,9],[1....|     0|
|(10,[0,3,7,8,9],[...|     1|
|(10,[0,3,7,8,9],[...|     1|
|(10,[0,6,8,9],[1....|     0|
|(10,[1,3,8,9],[1....|     0|
+--------------------+------+
only showing top 20 rows



### Splitting the Dataset

In [65]:
# Keep only the two columns the model needs: the feature vector and the label.
model_df = df.select('features', 'Status')

In [68]:
# Split the rows roughly 75% / 25% into training and test sets. A fixed seed
# keeps the random split, and every metric computed from it, repeatable when
# the notebook is re-run on the same data.
train_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)

In [70]:
# Number of rows that landed in the training split.
train_df.count()

14994

In [72]:
# Class balance of the training split: row count per Status value.
train_status_counts = train_df.groupBy('Status').count()
train_status_counts.show()

+------+-----+
|Status|count|
+------+-----+
|     1| 7491|
|     0| 7503|
+------+-----+



### Build and Train Logistic Regression Model

In [74]:
from pyspark.ml.classification import LogisticRegression

In [75]:
# Fit a logistic regression on the training split. "Status" is the label; the
# features column name is left at the estimator's default.
log_reg_estimator = LogisticRegression(labelCol='Status')
log_reg = log_reg_estimator.fit(train_df)

In [76]:
# Evaluate the fitted model on the training split and keep the resulting
# predictions frame.
train_result = log_reg.evaluate(train_df).predictions

In [78]:
# Show up to 10 training rows where both the label and the prediction are 1,
# together with the predicted probability vector.
is_true_positive = (train_result["Status"] == 1) & (train_result['prediction'] == 1)
true_positive_rows = train_result.filter(is_true_positive)
true_positive_rows.select(['Status', 'prediction', 'probability']).show(10)

+------+----------+--------------------+
|Status|prediction|         probability|
+------+----------+--------------------+
|     1|       1.0|[0.42845950545094...|
|     1|       1.0|[0.26065765891857...|
|     1|       1.0|[0.26065765891857...|
|     1|       1.0|[0.26065765891857...|
|     1|       1.0|[0.14222064321610...|
|     1|       1.0|[0.14222064321610...|
|     1|       1.0|[0.07233374381514...|
|     1|       1.0|[0.07233374381514...|
|     1|       1.0|[0.07233374381514...|
|     1|       1.0|[0.07233374381514...|
+------+----------+--------------------+
only showing top 10 rows



### Evaluate Logistic Regression Model on Test Data

In [79]:
# Evaluate the fitted model on the held-out test split and keep the resulting
# predictions frame; the metrics below are computed from it.
Result = log_reg.evaluate(test_df).predictions

In [80]:
# Print the schema of the test predictions frame.
Result.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Status: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [81]:
# Compare the true label with the predicted label for the first test rows.
label_vs_prediction = Result.select('Status', 'prediction')
label_vs_prediction.show()

+------+----------+
|Status|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     1|       0.0|
|     1|       0.0|
|     0|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
+------+----------+
only showing top 20 rows



In [82]:
# True positives: label is 1 and the model predicted 1.
tp = Result.filter((Result.Status == 1) & (Result.prediction == 1)).count()

In [84]:
# True negatives: label is 0 and the model predicted 0.
tn = Result.filter((Result.Status == 0) & (Result.prediction == 0)).count()

In [85]:
# False positives: label is 0 but the model predicted 1.
fp = Result.filter((Result.Status == 0) & (Result.prediction == 1)).count()

In [88]:
# False negatives: label is 1 but the model predicted 0.
fn = Result.filter((Result.Status == 1) & (Result.prediction == 0)).count()

In [90]:
# Accuracy = correct predictions / all scored test rows.
# The denominator is the sum of the four confusion-matrix counts (tp, tn, fp,
# fn) computed in the preceding cells, which avoids launching another Spark
# count job over Result. The numerator is cast to float before dividing, as
# the recall and precision cells do.
total_scored = tp + tn + fp + fn
accuracy = float(tp + tn) / total_scored

In [91]:
# Display the test-set accuracy.
print(accuracy)

0.9388733519776269


In [92]:
# Recall = TP / (TP + FN): the share of rows labelled 1 that the model predicted as 1.
actual_positives = tp + fn
recall = float(tp) / actual_positives
print(recall)

0.9382223993622958


In [93]:
# Precision = TP / (TP + FP): the share of rows predicted as 1 that are labelled 1.
predicted_positives = tp + fp
precision = float(tp) / predicted_positives
print(precision)

0.9397205588822355
