
# ⭐ 1- Intro to Data Cleaning and Preparation ⭐
In this chapter, you will: 

•	Exercise 1: Load data into a Spark DataFrame (DF) 

•	Exercise 2: Query the DF using SQL to get a feel for the data 

•	Exercise 3: Filter and transform the Data 



In [118]:
!pip install pyspark
from pyspark.sql import SparkSession 




In [119]:
!git clone https://github.com/lichen79/SparkML.git

Cloning into 'SparkML'...
remote: Enumerating objects: 27, done.
remote: Counting objects: 100% (27/27), done.
remote: Compressing objects: 100% (19/19), done.
remote: Total 27 (delta 9), reused 24 (delta 8), pack-reused 0
Unpacking objects: 100% (27/27), done.


In [120]:
%cd /content/final_train_data
%ls

/content/final_train_data
part-00000-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00001-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00002-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00003-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00004-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00005-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00006-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00007-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00008-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00009-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00010-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00011-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00012-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00013-237f8144-372f-450c-8fa4-c75baa8670c2-c000.snappy.parquet
part-00014-237f8144-37

In [121]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Intro") \
    .getOrCreate()

## Exercise 1: Load the data:

In [122]:
df = spark.read.csv('SparkML/detecting-twitter-bot-data/training_bot_data.csv', header=True)

### What is the size of the data? Use the `count()` function

In [123]:
# understand what is the data size:
df.count()

2840

#### Immutability

A Spark DataFrame is **immutable**.

What does that mean?
It means that no transformation we apply to a DataFrame changes the DataFrame itself!

Instead, each transformation returns a new DataFrame.
Run the next commands to get a feel for working with DataFrames.

Don't worry if you don't understand everything completely; the next exercises go deeper into it.

In [124]:
df.limit(2).toPandas()

Unnamed: 0,id,id_str,screen_name,location,description,url,followers_count,friends_count,listed_count,created_at,favourites_count,verified,statuses_count,lang,status,default_profile,default_profile_image,has_extended_profile,name,bot
0,8.16e+17,"""""""815745789754417152""""""","""""""HoustonPokeMap""""""","""""""Houston","TX""""""","""""""Rare and strong PokŽmon in Houston",TX. See more PokŽmon at https://t.co/dnWuDbFR...,"""""""https://t.co/dnWuDbFRkt""""""",1291,0,10,"""""""Mon Jan 02 02:25:26 +0000 2017""""""",0,FALSE,78554.0,"""""""en""""""","""{ """"created_at"""": """"Sun Mar 12 15:44:04 ...","""""id"""": 840951532543737900","""""id_str"""": """"840951532543737856""""","""""text"""": """"[Southeast Houston] Chansey ..."
1,4843621225.0,4843621225,kernyeahx,"Templeville town, MD, USA",From late 2014 Socium Marketplace will make sh...,,1,349,0,2/1/2016 7:37,38,FALSE,31,en,,TRUE,FALSE,FALSE,Keri Nelson,1


In [125]:
df_new = df.select('bot')

In [126]:
df_new.limit(2).toPandas()

Unnamed: 0,bot
0,"""""text"""": """"[Southeast Houston] Chansey ..."
1,1


You probably noticed that `df_new` and `df` are different!
They reference two different DataFrames.

Try the next command:

In [127]:
df.select('bot').limit(2).toPandas()

Unnamed: 0,bot
0,"""""text"""": """"[Southeast Houston] Chansey ..."
1,1


In [128]:
df.limit(2).toPandas()

Unnamed: 0,id,id_str,screen_name,location,description,url,followers_count,friends_count,listed_count,created_at,favourites_count,verified,statuses_count,lang,status,default_profile,default_profile_image,has_extended_profile,name,bot
0,8.16e+17,"""""""815745789754417152""""""","""""""HoustonPokeMap""""""","""""""Houston","TX""""""","""""""Rare and strong PokŽmon in Houston",TX. See more PokŽmon at https://t.co/dnWuDbFR...,"""""""https://t.co/dnWuDbFRkt""""""",1291,0,10,"""""""Mon Jan 02 02:25:26 +0000 2017""""""",0,FALSE,78554.0,"""""""en""""""","""{ """"created_at"""": """"Sun Mar 12 15:44:04 ...","""""id"""": 840951532543737900","""""id_str"""": """"840951532543737856""""","""""text"""": """"[Southeast Houston] Chansey ..."
1,4843621225.0,4843621225,kernyeahx,"Templeville town, MD, USA",From late 2014 Socium Marketplace will make sh...,,1,349,0,2/1/2016 7:37,38,FALSE,31,en,,TRUE,FALSE,FALSE,Keri Nelson,1


The last two `toPandas()` commands printed different results.

### Why?

`df.select('bot')` returns a reference to a new immutable DataFrame; `df` itself is unchanged. AHA!

Let's have `df_new` and `df` point to the same DataFrame.
By doing this, we drop the only reference to the DataFrame that `df_new` used to point to, so it can be erased from memory.

If we wish to access it again, we will need to rerun the logic.
Bear that in mind when working with `Apache Spark`.

In [129]:
df_new = df
df_new.limit(2).toPandas()

Unnamed: 0,id,id_str,screen_name,location,description,url,followers_count,friends_count,listed_count,created_at,favourites_count,verified,statuses_count,lang,status,default_profile,default_profile_image,has_extended_profile,name,bot
0,8.16e+17,"""""""815745789754417152""""""","""""""HoustonPokeMap""""""","""""""Houston","TX""""""","""""""Rare and strong PokŽmon in Houston",TX. See more PokŽmon at https://t.co/dnWuDbFR...,"""""""https://t.co/dnWuDbFRkt""""""",1291,0,10,"""""""Mon Jan 02 02:25:26 +0000 2017""""""",0,FALSE,78554.0,"""""""en""""""","""{ """"created_at"""": """"Sun Mar 12 15:44:04 ...","""""id"""": 840951532543737900","""""id_str"""": """"840951532543737856""""","""""text"""": """"[Southeast Houston] Chansey ..."
1,4843621225.0,4843621225,kernyeahx,"Templeville town, MD, USA",From late 2014 Socium Marketplace will make sh...,,1,349,0,2/1/2016 7:37,38,FALSE,31,en,,TRUE,FALSE,FALSE,Keri Nelson,1


By the way, `limit(2)` also returns a reference to a new DataFrame, this one with at most 2 rows.

Interesting! This is what **immutability** means!
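
To make the idea concrete without Spark, here is a minimal plain-Python analogy. `FrozenTable` is a hypothetical stand-in for a DataFrame (it is not part of PySpark), and the rows are invented: every method returns a new object and the original stays untouched.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FrozenTable:
    """Hypothetical stand-in for an immutable DataFrame (not a Spark class)."""
    columns: tuple
    rows: tuple

    def select(self, *names):
        # Build a NEW table holding only the requested columns.
        idx = [self.columns.index(n) for n in names]
        new_rows = tuple(tuple(r[i] for i in idx) for r in self.rows)
        return FrozenTable(tuple(names), new_rows)

    def limit(self, n):
        # Build a NEW table with at most n rows.
        return FrozenTable(self.columns, self.rows[:n])


table = FrozenTable(("screen_name", "bot"), (("a", 1), ("b", 0), ("c", 1)))
bots = table.select("bot")
first_two = table.limit(2)

print(table.columns)        # the original still has both columns
print(bots.columns)         # the new table has only 'bot'
print(len(first_two.rows))  # the limited table has 2 rows
```

As with `df` and `df_new`, keeping a result means binding it to a name; otherwise the new object is simply discarded.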


## Exercise 2: Get a feel for the data

Look at 2 records from the DataFrame to understand the values better before filtering. Use the `take()` function:

```python
df.take(2)  # replace 2 with the number of rows you want to see
```

In [130]:
df.take(2)

[Row(id='8.16E+17', id_str='"""815745789754417152"""', screen_name='"""HoustonPokeMap"""', location='"""Houston', description=' TX"""', url='"""Rare and strong PokŽmon in Houston', followers_count=' TX. See more PokŽmon at https://t.co/dnWuDbFRkt"""', friends_count='"""https://t.co/dnWuDbFRkt"""', listed_count='1291', created_at='0', favourites_count='10', verified='"""Mon Jan 02 02:25:26 +0000 2017"""', statuses_count='0', lang='FALSE', status='78554', default_profile='"""en"""', default_profile_image='"{      ""created_at"": ""Sun Mar 12 15:44:04 +0000 2017""', has_extended_profile='      ""id"": 840951532543737900', name='      ""id_str"": ""840951532543737856""', bot='      ""text"": ""[Southeast Houston] Chansey (F) (IV: 73%) until 11:11:37AM at 2511 Winbern St https://t.co/HYRIyq4mF7 https://t.co/bydOOKsEEI""'),
 Row(id='4843621225', id_str='4843621225', screen_name='kernyeahx', location='Templeville town, MD, USA', description='From late 2014 Socium Marketplace will make shoppin

Check out the schema structure of the DataFrame.

What are the value types?
Use:

```python
df.printSchema()
```

In [131]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- id_str: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- description: string (nullable = true)
 |-- url: string (nullable = true)
 |-- followers_count: string (nullable = true)
 |-- friends_count: string (nullable = true)
 |-- listed_count: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- favourites_count: string (nullable = true)
 |-- verified: string (nullable = true)
 |-- statuses_count: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- status: string (nullable = true)
 |-- default_profile: string (nullable = true)
 |-- default_profile_image: string (nullable = true)
 |-- has_extended_profile: string (nullable = true)
 |-- name: string (nullable = true)
 |-- bot: string (nullable = true)
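
Two things stand out so far: every column is a string, and the rows printed by `take(2)` look shifted (for example `location='"""Houston'` followed by `description=' TX"""'`). A plausible cause of the shift is quoted text that contains commas being split into separate fields. The plain-Python sketch below uses an invented two-line sample, not the real file, to show the difference between naive comma splitting and a quote-aware CSV parser, and that a CSV parser hands back strings unless types are converted explicitly.

```python
import csv
import io

# Invented sample: the location field is quoted and contains a comma.
sample = 'screen_name,location,followers_count\nHoustonPokeMap,"Houston, TX",1291\n'

# Naive splitting breaks the quoted field into two pieces.
naive = sample.splitlines()[1].split(",")

# A quote-aware parser keeps the field together.
parsed = next(csv.DictReader(io.StringIO(sample)))

print(len(naive))                                # 4 pieces instead of 3
print(parsed["location"])                        # Houston, TX
print(type(parsed["followers_count"]).__name__)  # str
```

Spark's CSV reader exposes options such as `quote`, `escape`, and `multiLine` that govern the same behavior; whether they fully repair this particular file has not been verified here.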



Run the next function:

```python
df.limit(5).toPandas()
```

What happened here? The `toPandas` function took the Spark DataFrame and converted it into a pandas DataFrame.

#### Run this function only on small data sets and when exploring the data. 
#### Otherwise, you might run out of memory! 


In [132]:

df.limit(25).toPandas()

Unnamed: 0,id,id_str,screen_name,location,description,url,followers_count,friends_count,listed_count,created_at,favourites_count,verified,statuses_count,lang,status,default_profile,default_profile_image,has_extended_profile,name,bot
0,8.16e+17,"""""""815745789754417152""""""","""""""HoustonPokeMap""""""","""""""Houston","TX""""""","""""""Rare and strong PokŽmon in Houston",TX. See more PokŽmon at https://t.co/dnWuDbFR...,"""""""https://t.co/dnWuDbFRkt""""""",1291,0,10,"""""""Mon Jan 02 02:25:26 +0000 2017""""""",0,FALSE,78554,"""""""en""""""","""{ """"created_at"""": """"Sun Mar 12 15:44:04 ...","""""id"""": 840951532543737900","""""id_str"""": """"840951532543737856""""","""""text"""": """"[Southeast Houston] Chansey ..."
1,4843621225.0,4843621225,kernyeahx,"Templeville town, MD, USA",From late 2014 Socium Marketplace will make sh...,,1,349,0,2/1/2016 7:37,38,FALSE,31,en,,TRUE,FALSE,FALSE,Keri Nelson,1
2,4303727112.0,4303727112,mattlieberisbot,,"Inspired by the smart, funny folks at @replyal...",https://t.co/P1e1o0m4KC,1086,0,14,Fri Nov 20 18:53:22 +0000 2015,0,FALSE,713,en,"""{'retweeted': False, 'is_quote_status': False...",'truncated': False,'in_reply_to_user_id': None,'created_at': 'Mon Mar 13 16:00:00 +0000 2017','contributors': None,'in_reply_to_status_id_str': None
3,3063139353.0,3063139353,sc_papers,,,,33,0,8,2/25/2015 20:11,0,FALSE,676,en,Construction of human anti-tetanus single-chai...,TRUE,TRUE,FALSE,single cell papers,1
4,2955142070.0,2955142070,lucarivera16,"Dublin, United States",Inspiring cooks everywhere since 1956.,,11,745,0,1/1/2015 17:44,146,FALSE,185,en,,FALSE,FALSE,FALSE,lucarivera16,1
5,8.41e+17,8.41E+17,dantheimprover,"Austin, TX",Just a guy trying to do good by telling everyo...,,1,186,0,13/03/2017 22:53,0,FALSE,11,en,"""Status(_api=<tweepy.api.API object at 0x10192...",'in_reply_to_status_id': None,'in_reply_to_status_id_str': None,'in_reply_to_user_id': None,'in_reply_to_user_id_str': None,'in_reply_to_screen_name': None
6,2482834658.0,2482834658,_all_of_us_,in a machine.,bot by @rubicon,,193,0,19,Wed May 07 22:29:25 +0000 2014,0,FALSE,6068,en,"""{u'contributors': None, u'truncated': False, ...",u'retweeted': False,u'coordinates': None,u'entities': {u'symbols': [],u'user_mentions': [],u'hashtags': []
7,3333573622.0,3333573622,KatamariItems,,[Bot rolled up by @BeachEpisode] Cataloguing e...,,8227,2,89,Thu Jun 18 22:07:31 +0000 2015,26,FALSE,2597,en,"""{u'contributors': None, u'truncated': False, ...",u'retweeted': False,u'coordinates': None,u'entities': {u'symbols': [],u'user_mentions': [],u'hashtags': []
8,2996105102.0,2996105102,AutophagyPapers,,Twitterbot for #Autophagy papers. Curated by @...,,275,0,17,1/25/2015 17:34,23,FALSE,9922,en,Feeding Schedule And Proteolysis Regulate Auto...,FALSE,FALSE,FALSE,Autophagy Papers,1
9,3271095818.0,3271095818,HSC_papers,,,,51,3,9,7/7/2015 15:23,0,FALSE,2515,en,Functional Selectivity in Cytokine Signaling R...,TRUE,FALSE,FALSE,Hematopoiesis,1


How many rows have missing values? Run the next command to figure it out!

```python
import pyspark.sql.functions as f
from functools import reduce
df.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))).count()
```



In [133]:
import pyspark.sql.functions as f
from functools import reduce
df.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in df.columns))).count()

1780
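
The `reduce` call above ORs together one `isNull()` condition per column, so a row is counted if any of its columns is null. The same idea in plain Python, on invented rows where `None` plays the role of null:

```python
from functools import reduce

columns = ["screen_name", "location", "followers_count"]
rows = [
    {"screen_name": "a", "location": None, "followers_count": "10"},
    {"screen_name": "b", "location": "TX", "followers_count": "3"},
    {"screen_name": None, "location": None, "followers_count": "7"},
]


def has_null(row):
    # One boolean per column, combined with OR, like x | y on Spark columns.
    return reduce(lambda x, y: x or y, (row[c] is None for c in columns))


rows_with_null = sum(1 for row in rows if has_null(row))
print(rows_with_null)  # 2
```

A row with two missing values is still counted once, which is why this number can be smaller than the sum of the per-column null counts.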

### Distinct Value

Get the number of distinct `id` values; it should equal the size of the data.

Try both the `id` and `id_str` fields.

Use the next code and adjust it according to the field:

```python
df.select("field_name").distinct().count()
```


What happened here? Is the result the same as the size of the data set?
Don't worry; we will fix that soon!

In [134]:
df.select("id").distinct().count()

2403

In [135]:
df.select("id_str").distinct().count()

2439
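
When the distinct count is lower than the row count, some values repeat. The plain-Python sketch below, on invented IDs, compares the two numbers and lists the repeated values. The scientific-notation string mimics values such as `8.16E+17` seen in the data, where several different IDs may have collapsed into the same text; that explanation is an assumption, not something verified against the file.

```python
from collections import Counter

ids = ["4843621225", "8.16E+17", "4303727112", "8.16E+17", "3063139353"]

total = len(ids)
distinct = len(set(ids))
repeated = [value for value, n in Counter(ids).items() if n > 1]

print(total, distinct)  # 5 4
print(repeated)         # ['8.16E+17']
```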

### Is Null

How many rows have null on the `screen_name` column?

Use `where` with the column's `.isNull()` function to get the DataFrame with null values for `column_name`.

Count it! Use the count method for that.

Code sample:
```python
df.where(f.col('column_name').isNull()).count()
```

In [136]:
df.where(f.col('screen_name').isNull()).count()

5

<details><summary>Answer</summary>
<p>

#### 5

</p>
</details>

### Standard Deviation

As part of the data exploration phase, the standard deviation (stddev) is a must!

Calculate **stddev** for `followers_count`.

### Notice! 
Some rows have None/Null for `followers_count`, we can:

1. Ignore and not calculate the stddev for them

**OR** 

2. Give them a default value

**OR** 

3. Filter them entirely out of our training data.

Start by counting how many rows have null for `followers_count`:

Run this:
```python
df.where(f.col('followers_count').isNull()).count()
```



In [137]:
df.where(f.col('followers_count').isNull()).count()

45

We go with:  `2. Give them a default value`
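
The choice matters because it changes the statistic. The plain-Python sketch below uses invented follower counts, with `None` standing for null. For a single column, options 1 and 3 give the same numbers, so it compares leaving the nulls out with filling them with 0. `statistics.stdev` is the sample standard deviation, which is also what Spark's `stddev` computes.

```python
import statistics

followers = [10, None, 30, None, 50]

# Options 1 and 3: leave the nulls out of the calculation.
known = [x for x in followers if x is not None]

# Option 2: replace each null with a default value of 0.
filled = [0 if x is None else x for x in followers]

print(statistics.mean(known), statistics.stdev(known))
print(statistics.mean(filled), statistics.stdev(filled))
```

Filling with 0 pulls the mean down and changes the spread, so a default value is a modeling decision and not a neutral cleanup step.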

#### Give default values: fill null values with fillna()

Give the null cells a default value
using [fillna](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.fillna).

Notice the matching type requirement:
if a column is of type string, the default value must also be of type string.
At the moment, all our fields are of type string.

Code sample:
```python
df_defaultvalue = df.fillna({'column_name':'0'})
```

<details><summary>Answer</summary>
<p>

df_defaultvalue = df.fillna({'followers_count':'0'})

</p>
</details>

Remember to validate yourself with count:

```python
df_defaultvalue.where(f.col('followers_count').isNull()).count()
```

In [138]:
df_defaultvalue = df.fillna({'followers_count':'0'})

In [139]:
df_defaultvalue.where(f.col('followers_count').isNull()).count()

0

2nd phase of **standard deviation** calculation is:

Casting data to numbers!

Cast it to integer:

In the code sample, replace the `column_name` with `followers_count`:
```python
from pyspark.sql.types import IntegerType

data_df = df_defaultvalue.withColumn("column_name", df_defaultvalue["column_name"].cast(IntegerType()))
```

In [140]:
from pyspark.sql.types import IntegerType

data_df = df_defaultvalue.withColumn("followers_count", df_defaultvalue["followers_count"].cast(IntegerType()))

Calculate the standard deviation!

Use the `pyspark.sql.functions` methods; [here are the docs](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)

Check out the **describe** functionality. It provides the `count`, `mean`, `stddev`, `min` and `max` calculations in one function!

**Remember** - Use the last DataFrame that you created, with the casting and default values.


`describe` can take specific fields, or calculate statistics for all fields.

Code Example:
```python
df.describe(['age']).show()
df.describe().toPandas().transpose()

```

In the code example, change `age` to `followers_count` and run it!


In [141]:
data_df.describe(['followers_count']).show()

+-------+-----------------+
|summary|  followers_count|
+-------+-----------------+
|  count|             2781|
|   mean|995260.6181229773|
| stddev|5604474.389024296|
|    min|                0|
|    max|         96321564|
+-------+-----------------+
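
Notice that `count` is 2781 although the DataFrame has 2840 rows and the nulls were filled before the cast. A likely explanation is that the cast itself turned non-numeric strings into nulls, which is how Spark's cast behaves in its non-ANSI mode. The plain-Python sketch below imitates that behavior on invented values; `to_int_or_none` is a hypothetical helper, not a Spark function.

```python
def to_int_or_none(text):
    """Hypothetical helper: parse an integer, or return None if parsing fails."""
    try:
        return int(text)
    except (TypeError, ValueError):
        return None


raw = ["1291", "0", " TX. See more", None, "349"]
casted = [to_int_or_none(x) for x in raw]
non_null = [x for x in casted if x is not None]

print(casted)         # [1291, 0, None, None, 349]
print(len(non_null))  # 3
```

If that is what happened here, filling defaults again after the cast (or filtering the unparsable rows) would be needed to remove the remaining nulls.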



In [142]:
data_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
id,2840,1.4824597889799824E17,3.1005777294299475E17,},«I started something I couldn’t finish - The S...
id_str,2836,1.42975365019679824E17,3.0581818876766093E17,"visit http://t.co/o5SYTE11ku.""",https://t.co/wfdAJoxfYZ
screen_name,2835,539.5,660.4263774259778,"""""""1lovetakes2""""""",ÓATVI_ABÒ
location,1816,11242.444444444445,31093.281232088997,,"€¡stanbul, TÌ_rkiye"
description,2433,9.0,9.899494936611665,"CA""""""",‰÷Ï_ÙÕ_ ì_¥Ð _ö´‰¼§â‰_£_¥ _ÙÔ_ÙÕ
url,1540,0.5142857142857142,0.6584933146039915,82% notification for all top tiered pokemon,"the force is with me"""" - dead blind guy"""
followers_count,2781,995260.6181229773,5604474.389024296,0,96321564
friends_count,2797,11659.697800216372,266249.06763620034,European-style board games,https://t.co/BkV5HL3bmA
listed_count,2797,3238.1019748653503,17316.69703229031,"#lifestyle #exercise #food #holistic #natural""""""",


This data is dirty! 

Have you noticed the weird behavior of `id` and `id_str`?

Run `.distinct().count()` on each, and count how many blank values each one has.

Which one has the most distinct values? Is that number the same as the number of rows in the DataFrame?


Use the code sample and remember to replace the column name accordingly:
```python
df.select("id_str").distinct().count()
df.select("id").distinct().count()
```

In [143]:
data_df.where(f.col('id_str').isNull()).count()

4

In [144]:
data_df.where(f.col('id').isNull()).count()

0

In [145]:
df.select("id_str").distinct().count()

2439

In [146]:
df.select("id").distinct().count()

2403

You probably discovered that we can trust neither `id` nor `id_str`!

Oops! What should we do? Do we need them at all?

Continue to Exercise 3!

---

## Exercise 3: Filter the DataFrame 

You have reached the last section of cleaning and preparing the data 🎊


In this exercise - you filter, cast, and add a default value to necessary fields using the Spark functionality.

You are going to use the DataFrame that you created in chapters (2,3, and 4!)📙

Follow the instructions. For any questions, please use 👉 the Q&A chat.  

---

Start with casting:
Run the next commands:

In [147]:
from pyspark.sql.types import IntegerType, BooleanType

casted_df = data_df.withColumn("friends_count", data_df["friends_count"].cast(IntegerType()))
casted_df = casted_df.withColumn("listed_count", casted_df["listed_count"].cast(IntegerType()))
casted_df = casted_df.withColumn("favourites_count", casted_df["favourites_count"].cast(IntegerType()))
casted_df = casted_df.withColumn("statuses_count", casted_df["statuses_count"].cast(IntegerType()))
casted_df = casted_df.withColumn("verified", casted_df["verified"].cast(BooleanType()))
casted_df = casted_df.withColumn("default_profile", casted_df["default_profile"].cast(BooleanType()))
casted_df = casted_df.withColumn("has_extended_profile", casted_df["has_extended_profile"].cast(BooleanType()))
casted_df = casted_df.withColumn("default_profile_image", casted_df["default_profile_image"].cast(BooleanType()))

What happened here? Check it out:

In [148]:
casted_df.limit(5).toPandas()
df = casted_df 

✅ **Task :** 

#### When and withColumn functionality

Let's fix the weird behavior of the `id_str` and `id` fields.


Now that we know that there are some blank values for `id_str`, let's try to fill them in with `id` values.

To achieve that, we will use the `when` function:

```python
from pyspark.sql.functions import when
new_df = df.select(when(df['age'].isNull(), 3).otherwise(df['age']))

```

Use `when` with the `withColumn` functionality:

```python
# withColumn is a DataFrame method, so it needs no extra import
new_df = df.withColumn('age2', df['age'] + 2)

```


Put `when` and `withColumn` together:

```python
new_df = df.withColumn('new_column_name', when(df['column_that_we_check'].isNotNull(), df['column_if_true'])
                       .otherwise(df['column_if_false']))
```
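
Row by row, `when(...).otherwise(...)` behaves like a conditional expression. The plain-Python sketch below shows the same fallback logic on invented rows with hypothetical `nickname` and `name` columns, where `None` stands for null:

```python
rows = [
    {"name": "Keri Nelson", "nickname": "keri"},
    {"name": "Autophagy Papers", "nickname": None},
]

# Equivalent of: when(nickname is not null, nickname).otherwise(name)
display_names = [
    row["nickname"] if row["nickname"] is not None else row["name"]
    for row in rows
]

print(display_names)  # ['keri', 'Autophagy Papers']
```

Spark also offers `pyspark.sql.functions.coalesce`, which returns the first non-null of its arguments and can express this kind of fallback in one call.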


Replace the **age** column from the examples with `id_str` and `id` as needed.


**Remember!** A DataFrame is an immutable object. Each function that transforms a DataFrame creates another DataFrame and returns a reference to the new one. Remember to work with your latest DataFrame and validate yourself!



[Docs for when](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when), 
[Docs for withColumn](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn)


In [149]:
from pyspark.sql.functions import when
test = df.withColumn('id_str',when(df['id'].isNotNull(),df['id']).otherwise(df['id_str']))

<details><summary>Hint</summary>
<p>

Use the isNull function with when, like this:
    
```python
when(df['id_str'].isNull(),df['id']).otherwise(df['id_str'])

```  

</p>
</details>

<details><summary>Answer</summary>
<p>
    
```python
from pyspark.sql.functions import when
test = df.withColumn('id_str',when(df['id_str'].isNull(),df['id']).otherwise(df['id_str']))

```  
</p>
</details>

In [150]:
# Happy with the results? Replace your old DataFrame reference with the next command:
df = test

In [151]:
# validate yourself
df.select("id_str").distinct().count()

2403

---

✅ **Task :** 

#### drop functionality

Drop the `id` column with the `drop` function:

```python
new_df = df.drop('column_name')
```

And validate the schema of the new DataFrame:

```python
new_df.schema
```

After validating the new DataFrame, overwrite the reference to clear memory:

```python
df = new_df
```

In [152]:
new_df = df.drop('id')

In [153]:
df = new_df

---

✅ **Task :**  

Drop the next fields:

* `default_profile_image`
* `has_extended_profile`
* `url`
* `created_at`
* `lang`

You can drop field by field, or provide all the fields to drop in one function call.

```python
updated_df = df.drop('age','history','another_column_to_drop')
```




**Remember** to validate yourself with the schema and overwrite the DataFrame reference

In [154]:
new_df = df.drop('default_profile_image','has_extended_profile','url','created_at','lang')

In [155]:
# What did you get? Happy with the results?

new_df.limit(15).toPandas()

Unnamed: 0,id_str,screen_name,location,description,followers_count,friends_count,listed_count,favourites_count,verified,statuses_count,status,default_profile,name,bot
0,8.16e+17,"""""""HoustonPokeMap""""""","""""""Houston","TX""""""",,,1291,10,,0,78554,,"""""id_str"""": """"840951532543737856""""","""""text"""": """"[Southeast Houston] Chansey ..."
1,4843621225.0,kernyeahx,"Templeville town, MD, USA",From late 2014 Socium Marketplace will make sh...,1.0,349.0,0,38,False,31,,True,Keri Nelson,1
2,4303727112.0,mattlieberisbot,,"Inspired by the smart, funny folks at @replyal...",1086.0,0.0,14,0,False,713,"""{'retweeted': False, 'is_quote_status': False...",,'contributors': None,'in_reply_to_status_id_str': None
3,3063139353.0,sc_papers,,,33.0,0.0,8,0,False,676,Construction of human anti-tetanus single-chai...,True,single cell papers,1
4,2955142070.0,lucarivera16,"Dublin, United States",Inspiring cooks everywhere since 1956.,11.0,745.0,0,146,False,185,,False,lucarivera16,1
5,8.41e+17,dantheimprover,"Austin, TX",Just a guy trying to do good by telling everyo...,1.0,186.0,0,0,False,11,"""Status(_api=<tweepy.api.API object at 0x10192...",,'in_reply_to_user_id_str': None,'in_reply_to_screen_name': None
6,2482834658.0,_all_of_us_,in a machine.,bot by @rubicon,193.0,0.0,19,0,False,6068,"""{u'contributors': None, u'truncated': False, ...",,u'user_mentions': [],u'hashtags': []
7,3333573622.0,KatamariItems,,[Bot rolled up by @BeachEpisode] Cataloguing e...,8227.0,2.0,89,26,False,2597,"""{u'contributors': None, u'truncated': False, ...",,u'user_mentions': [],u'hashtags': []
8,2996105102.0,AutophagyPapers,,Twitterbot for #Autophagy papers. Curated by @...,275.0,0.0,17,23,False,9922,Feeding Schedule And Proteolysis Regulate Auto...,False,Autophagy Papers,1
9,3271095818.0,HSC_papers,,,51.0,3.0,9,0,False,2515,Functional Selectivity in Cytokine Signaling R...,True,Hematopoiesis,1


In [156]:
df = new_df

---

✅ **Task :**  

#### Drop duplicates and describe functionality

Sometimes we accidentally get duplicated data. Drop all duplicates by using the
`dropDuplicates` function:

```python
new_df = df.dropDuplicates()
```

---

In [157]:
new_df = df.dropDuplicates()
new_df.count()

2840
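
Called with no arguments, `dropDuplicates()` removes only rows that are identical in every column. The count is still 2840, the same as the original row count, so no fully identical rows were found. The plain-Python sketch below shows the same idea on invented rows:

```python
rows = [
    ("kernyeahx", 1, 349),
    ("sc_papers", 33, 0),
    ("kernyeahx", 1, 349),   # exact duplicate of the first row
    ("kernyeahx", 2, 349),   # differs in one column, so it is kept
]

# dict.fromkeys keeps the first occurrence of each row and preserves order.
unique_rows = list(dict.fromkeys(rows))

print(len(rows), len(unique_rows))  # 4 3
```

To treat rows as duplicates based on a few columns only, `dropDuplicates` also accepts a list of column names.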

Use `describe` to understand what the data looks like now.
Remember that statistics such as `mean` and `stddev` are only meaningful for numerical columns.


Use the next code sample and adjust it to your needs:
```python
new_df.describe('column_name').show()
```

In [158]:
new_df.describe('favourites_count').show()

+-------+------------------+
|summary|  favourites_count|
+-------+------------------+
|  count|              2749|
|   mean|2006.8162968352128|
| stddev|16182.205112382235|
|    min|                 0|
|    max|            714021|
+-------+------------------+



In [159]:
# Happy with the results?

df = new_df


The Machine Learning algorithm you are going to use doesn't take text/strings as input.

Hence, we convert String columns to boolean or numerical values.


Turn all String columns into boolean or numerical ones; if that is not possible, drop them.

---

✅ **Task :** 

Most of our data can be translated to _Integer_: 1 if a value exists, 0 if it is missing.

Implement that logic for the next columns:

* location
* status
* screen_name
* name
    
    
    
Code sample:
```python
df = df.withColumn('column_name',when(df['column_name'].isNull(),0).otherwise(1))
```

Run the next command to make it happen! 
    

In [160]:
df = df.withColumn('location',when(df['location'].isNull(),0).otherwise(1))
df = df.withColumn('status',when(df['status'].isNull(),0).otherwise(1))
df = df.withColumn('screen_name',when(df['screen_name'].isNull(),0).otherwise(1))
df = df.withColumn('name',when(df['name'].isNull(),0).otherwise(1))

---

✅ **Task :**

Adapt the `bot` column.

`bot` is the data classification column, which indicates whether the row represents a bot or not.

1. Cast it into Integer.
2. Set 1 or 0: 1 for bot and 0 for non-bot.

If we don't know what it is, use 0.

Run the next commands, and remember to validate yourself!

In [161]:
df_bot = df.withColumn('bot',df['bot'].cast(IntegerType()))
df_bot.limit(5).toPandas()

Unnamed: 0,id_str,screen_name,location,description,followers_count,friends_count,listed_count,favourites_count,verified,statuses_count,status,default_profile,name,bot
0,3366974463.0,1,1,A bot that creates hilarious joax via Google's...,1058,1,83,0,False,7678,1,,1,
1,7.19e+17,1,0,Bot by @czircon. #botALLY,26,0,6,0,False,6832,1,,1,
2,2651152393.0,1,0,"daily writing assignments, posts twice a day ~...",148,0,22,0,False,1723,1,,1,
3,2203838767.0,1,0,This is a bot to explain things like love and ...,116,0,39,0,False,8421,1,,1,
4,1948423238.0,1,0,It's lonely out in space. Rookie @exoriders pa...,51,0,19,0,False,13622,1,,1,


In [162]:
df_bot = df_bot.withColumn('bot',when(df_bot['bot'].isNull(),0).otherwise(df_bot['bot']))
df_bot.limit(5).toPandas()

Unnamed: 0,id_str,screen_name,location,description,followers_count,friends_count,listed_count,favourites_count,verified,statuses_count,status,default_profile,name,bot
0,3366974463.0,1,1,A bot that creates hilarious joax via Google's...,1058,1,83,0,False,7678,1,,1,0
1,7.19e+17,1,0,Bot by @czircon. #botALLY,26,0,6,0,False,6832,1,,1,0
2,2651152393.0,1,0,"daily writing assignments, posts twice a day ~...",148,0,22,0,False,1723,1,,1,0
3,2203838767.0,1,0,This is a bot to explain things like love and ...,116,0,39,0,False,8421,1,,1,0
4,1948423238.0,1,0,It's lonely out in space. Rookie @exoriders pa...,51,0,19,0,False,13622,1,,1,0


Do the same with the other boolean fields.
Run the next commands:

In [163]:
df_bot = df_bot.withColumn('verified',df_bot['verified'].cast(IntegerType()))
df_bot = df_bot.withColumn('default_profile',df_bot['default_profile'].cast(IntegerType()))

df_bot = df_bot.withColumn('verified',when(df_bot['verified'].isNull(),0).otherwise(df_bot['verified']))
df_bot = df_bot.withColumn('default_profile',when(df_bot['default_profile'].isNull(),0).otherwise(df_bot['default_profile']))

df_bot.limit(5).toPandas()

Unnamed: 0,id_str,screen_name,location,description,followers_count,friends_count,listed_count,favourites_count,verified,statuses_count,status,default_profile,name,bot
0,3366974463.0,1,1,A bot that creates hilarious joax via Google's...,1058,1,83,0,0,7678,1,0,1,0
1,7.19e+17,1,0,Bot by @czircon. #botALLY,26,0,6,0,0,6832,1,0,1,0
2,2651152393.0,1,0,"daily writing assignments, posts twice a day ~...",148,0,22,0,0,1723,1,0,1,0
3,2203838767.0,1,0,This is a bot to explain things like love and ...,116,0,39,0,0,8421,1,0,1,0
4,1948423238.0,1,0,It's lonely out in space. Rookie @exoriders pa...,51,0,19,0,0,13622,1,0,1,0


How many bots and non-bots do we have in the data?

Run the next commands to check it out!

In [164]:
df_bot.where(df['bot']==0).count()

190

In [165]:
df_bot.where(df['bot']==1).count()

322
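
The two counts above add up to 512, while the DataFrame has 2840 rows, so it is worth double-checking which `bot` column the filter used: the condition is built from `df['bot']`, not from `df_bot['bot']`. One way to cross-check is to count every label in a single pass and confirm that the totals cover all rows. The sketch below does this in plain Python on invented labels; in Spark the analogous call would be `df_bot.groupBy('bot').count()`.

```python
from collections import Counter

labels = [1, 0, 1, 1, 0, 0, 1]

counts = Counter(labels)
print(counts[0], counts[1])                  # 3 4
print(sum(counts.values()) == len(labels))   # True: every row is accounted for
```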

In [166]:
#Happy with the results? 

df = df_bot

---

✅ **Task :** 

#### drop N/A functionality - dropna()


`dropna` drops rows that contain a null value in the given columns.


1. Drop the `id_str` column completely
2. Drop rows with N/A for `description`:

Code example:
```python
df_new = df.drop('id_str')

# in order to avoid errors, drop rows with null/None or N/A for description
df_new = df_new.dropna(subset=['description'])
# validate yourself
df_new.count()

```

In [167]:
df_new = df.drop('id_str')



In [168]:
# in order to avoid errors, drop rows with null/None or N/A for description
df_new = df_new.dropna(subset=['description'])
# validate yourself
df_new.count()

2433

In [169]:
# Happy with the results?

df = df_new

In [170]:
# Run the next command; we will need it for chapter number 4
sub = df.selectExpr('description','bot as label')
sub.write.parquet("train_data_only_description")


AnalysisException: ignored

**Save updated DataFrame to file**
To optimize, speed up queries, and maintain schema information, save the DataFrame as a parquet file. 

>Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON, supported by many data processing systems


---

✅ **Task :** 

#### User Defined Function - udf functionality

`description` is the only string column left.
The Spark pattern mining algorithm takes an Array of unique Strings as input.

Hence, to run pattern mining on `description`, you implement a function that takes the description column string
and turns it into an Array of unique Strings.

To do that, you will create a UDF - user defined function.



Code example for guidance:

```python
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf

def split_and_set(some_str):
    raise NotImplementedError("your Python code goes here")

# connect everything together: 
# set the udf
list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))

# call udf from withColumn:
new_df = df.withColumn('description', list_udf(df['description']))

#validate yourself!
new_df.take(2)

#all good?
df = new_df
```


You might get errors. In the exception stack trace,
search for `AnalysisException` and try to understand the problem.

Think about what will happen if you run the code twice.

Assign the result to a new DataFrame variable so you won't lose your results.


<details><summary>Answer</summary>
<p>

```python
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf

def split_and_set(some_str):
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str


list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))

new_df = df.withColumn('description', list_udf(df['description']))
df = new_df
```
    
</p>
</details>


In [171]:
# check out this python code. Run it. What did you get?
# will this work for the task?
# how do you combine it with UDF?

def split_and_set(some_str):
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str

some_str = '[csds b lol,a]'

split_and_set(split_and_set(some_str))

['b', 'lola', 'csds']
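
The result above is a flat list because the second call receives a list, not a `str`, so `split_and_set` hands it back unchanged. The plain-Python sketch below (no Spark needed) reproduces that and also shows one possible refinement. `split_and_set_clean` is a hypothetical helper, not part of the notebook: calling `split()` without an argument avoids the empty-string tokens that `split(' ')` produces for repeated spaces, and `sorted` makes the order reproducible.

```python
def split_and_set(some_str):
    # Same logic as the notebook cell above.
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str


def split_and_set_clean(some_str):
    # Hypothetical variant: split() with no argument collapses runs of
    # whitespace, so no empty-string tokens end up in the result.
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return sorted(set(some_str.split()))
    return some_str


once = split_and_set('[csds b lol,a]')
twice = split_and_set(once)           # a list goes in, the same list comes out

text = 'a  bot by  a bot'             # note the double spaces
with_empty = split_and_set(text)      # contains '' because of split(' ')
without_empty = split_and_set_clean(text)

print(sorted(once), twice is once)
print(sorted(with_empty))
print(without_empty)
```

Empty tokens of this kind are visible in the description samples later in the notebook (lists that start with `[, by, ...`), so it may be worth deciding whether you want them before running pattern mining.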

In [172]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf

def split_and_set(some_str):
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str

list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))

new_df = df.withColumn('description', list_udf(df['description']))
df = new_df



# Validate yourself and save to Parquet!

Before saving the DataFrame to Parquet, look at a sample of the data to validate it.

In [173]:
df.limit(5).toPandas()

Unnamed: 0,screen_name,location,description,followers_count,friends_count,listed_count,favourites_count,verified,statuses_count,status,default_profile,name,bot
0,1,1,"[creates, A, autocomplete, joax, that, Googles...",1058,1,83,0,0,7678,1,0,1,0
1,1,0,"[#botALLY, by, @czircon, Bot]",26,0,6,0,0,6832,1,0,1,0
2,1,0,"[posts, writing, twice, by, @cblgh~, a, ~bot, ...",148,0,22,0,0,1723,1,0,1,0
3,1,0,"[by, explain, a, things, This, hate, to, is, B...",116,0,39,0,0,8421,1,0,1,0
4,1,0,"[out, ten-after, @exoriders, off, Its, Rookie,...",51,0,19,0,0,13622,1,0,1,0


### Happy with the results? Save updated DataFrame to file

In [174]:
# happy with the results? write to file!
# run this command
df.write.parquet("final_train_data")

AnalysisException: ignored

In this run the write fails because the `final_train_data` directory already exists (it is listed at the start of the notebook), and Spark's default save mode refuses to write to an existing path. To replace it, set the save mode explicitly, for example `df.write.mode("overwrite").parquet("final_train_data")`.

# Well Done! 👏👏👏


## You just finished:  Intro to Data Cleaning and Preparation 


## Next Chapter:  Apache Spark ML - Create train and test set 

In [175]:
df_test = spark.read.csv("SparkML/detecting-twitter-bot-data/testing_bot_data.csv", header= True)

In [176]:
df_test.schema

StructType(List(StructField(id,StringType,true),StructField(id_str,StringType,true),StructField(screen_name,StringType,true),StructField(location,StringType,true),StructField(description,StringType,true),StructField(url,StringType,true),StructField(followers_count,StringType,true),StructField(friends_count,StringType,true),StructField(listed_count,StringType,true),StructField(created_at,StringType,true),StructField(favourites_count,StringType,true),StructField(verified,StringType,true),StructField(statuses_count,StringType,true),StructField(lang,StringType,true),StructField(status,StringType,true),StructField(default_profile,StringType,true),StructField(default_profile_image,StringType,true),StructField(has_extended_profile,StringType,true),StructField(name,StringType,true)))

In [177]:
df_test.count()

578

In [178]:
df_test.limit(25).toPandas()

Unnamed: 0,id,id_str,screen_name,location,description,url,followers_count,friends_count,listed_count,created_at,favourites_count,verified,statuses_count,lang,status,default_profile,default_profile_image,has_extended_profile,name
0,2281292622.0,2281292622.0,__keating,brooklyn,lgbt editor at @buzzfeed. shannon.keating@buzz...,https://t.co/QneJmYRyhj,4466,1295,111.0,Tue Jan 07 23:26:52 +0000 2014,1579.0,True,3036,en,"""{'created_at': 'Tue Apr 11 15:31:51 +0000 201...",'truncated': False,'entities': {'hashtags': [],'symbols': [],'user_mentions': [{'screen_name': 'Carrasquillo'
1,2344040251.0,2344040251.0,_callme_Dani,"Los Angeles, CA",News Curation Editor @BuzzFeedNews I do a lot ...,,295,1016,10.0,Fri Feb 14 19:45:56 +0000 2014,300.0,False,618,en,"""{'created_at': 'Tue Apr 11 00:56:02 +0000 201...",'truncated': False,'entities': {'hashtags': [],'symbols': [],'user_mentions': [{'screen_name': 'elliesunak...
2,765871267.0,765871267.0,_little_britt_,,Family comes first! Also I am in love with piz...,https://t.co/E7DE1cJB7e,1001678,3017,14.0,8/18/2012 15:13,13040.0,True,3329,en,"""{'place': None, 'retweeted': False, 'favorite...",'created_at': 'Sat Apr 08 19:18:41 +0000 2017','id': 850790016838238210,'lang': 'en',"'retweet_count': 2037}"""
3,4772373433.0,4772373433.0,134k5,,@BuzzFeedJapan ��� @cnet_japan / DM��܋�㋁_��_��...,https://t.co/Cbguzs2PjT,445,487,17.0,Sun Jan 17 07:11:45 +0000 2016,1112.0,False,46,ja,"""{'created_at': 'Sat Apr 08 08:41:08 +0000 201...",'in_reply_to_status_id': 850628293522894849,'in_reply_to_status_id_str': '850628293522894...,'in_reply_to_user_id': 2249898907,'in_reply_to_user_id_str': '2249898907'
4,1324548560.0,,2181chrom_bot,自分の天幕,これはFE覚醒のクロム…つまり俺がツイ廃なbotらしい。よく分からんがネタ要素しかないそうだ...,http://t.co/10Swf6luED,187,68,13.0,Wed Apr 03 13:00:42 +0000 2013,,,690359,ja,"""{u'lang': u'ja', u'text': u'@2181lucina_bot \...",u'in_reply_to_status_id': 851191070100606976,u'in_reply_to_screen_name': u'2181lucina_bot',u'id_str': u'851191507486986241',u'urls': []
5,2561341789.0,,2LA1R_bot,,ふれあ語をつぶやくbotです たまに中の人(ふれあ)もつぶやきます,,80,87,,Wed Jun 11 13:12:06 +0000 2014,,,20167,ja,"""{u'lang': u'ja', u'text': u'\u3010\u3075\u308...",u'id_str': u'851191514206097409',u'urls': [],"u'id': 851191514206097409}""",TRUE
6,347810134.0,,3pei_bot,三河屋,■ちわーす！三河屋でーす！三郎くんには負けませんｗｗｗ■サザエさんに過去登場、三河屋さんへ勤...,http://twpf.jp/3pei_bot,2020,1978,56.0,Wed Aug 03 11:52:59 +0000 2011,,,968182,ja,"""{u'lang': u'ja', u'text': u'@kazenoraby \uff6...",u'in_reply_to_status_id': 851191255841124352,u'in_reply_to_screen_name': u'kazenoraby',u'id_str': u'851191511765176320',u'urls': []
7,856303860.0,,94kichi_bot,,94 チャック・ウィルソンと愉快な仲間たちの笑いあり涙ありなちょこっとキチガイツイートを集め...,,70,80,2.0,Mon Oct 01 12:39:46 +0000 2012,,,76735,ja,"""{u'lang': u'ja', u'text': u'\u307f\u3093\u306...",u'id_str': u'851191524062670848',u'urls': [],"u'id': 851191524062670848}""",TRUE
8,8.32875e+17,,A3_Dekasegi_bot,ビロード駅前,シトロン「A3!出稼ぎ日誌ダヨー！みんなの出稼ぎ中のあんなことやこんなことをまとめた日誌ネ！...,https://t.co/t171JmIrjL,181,144,2.0,Sat Feb 18 08:50:03 +0000 2017,,,1960,ja,"""{u'lang': u'ja', u'text': u'\u30b7\u30c8\u30e...",u'id_str': u'851191498854998016',u'urls': [],"u'id': 851191498854998016}""",TRUE
9,88856792.0,,aamir_khan,Mumbai,Actor.,https://t.co/l1dUhQjS8Y,20419393,9,6.0,Tue Nov 10 05:08:56 +0000 2009,,True,468,en,"""{u'lang': u'en', u'text': u'Hey guys, doing s...",u'id_str': u'849903030598344704',u'urls': [],u'media': [{u'expanded_url': u'https://twitte...,u'display_url': u'pic.twitter.com/uYmd8FKOVH'


Clean and prepare the testing data as well.
Remember that here we don't have the `bot` value.

You will not drop id, as we will use it to compare results later.

Execute the next commands:


In [179]:
from pyspark.sql.types import IntegerType, ArrayType, BooleanType, StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import when


# Dropping irrelevant columns and duplicates
df_test = df_test.drop('default_profile_image','has_extended_profile','url','created_at','lang','id','id_str')
df_test = df_test.dropDuplicates()


In [180]:

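# First Transformation: cast count columns to integers and flag columns to booleans
# (followers_count is read from CSV as a string too, so cast it like the other counts)
df_test = df_test.withColumn("followers_count", df_test["followers_count"].cast(IntegerType()))
df_test = df_test.withColumn("friends_count", df_test["friends_count"].cast(IntegerType()))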
df_test = df_test.withColumn("listed_count", df_test["listed_count"].cast(IntegerType()))
df_test = df_test.withColumn("favourites_count", df_test["favourites_count"].cast(IntegerType()))
df_test = df_test.withColumn("statuses_count", df_test["statuses_count"].cast(IntegerType()))
df_test = df_test.withColumn("verified", df_test["verified"].cast(BooleanType()))
df_test = df_test.withColumn("default_profile", df_test["default_profile"].cast(BooleanType()))


In [181]:

# Second Transformation
df_test = df_test.withColumn('default_profile',df_test['default_profile'].cast(IntegerType()))
df_test = df_test.withColumn('name',when(df_test['name'].isNull(),0).otherwise(1))
df_test = df_test.withColumn('verified',df_test['verified'].cast(IntegerType()))


In [182]:

# Third Transformation
df_test = df_test.withColumn('verified',when(df_test['verified'].isNull(),0).otherwise(df_test['verified']))
df_test = df_test.withColumn('default_profile',when(df_test['default_profile'].isNull(),0).otherwise(df_test['default_profile']))
df_test = df_test.withColumn('location',when(df_test['location'].isNull(),0).otherwise(1))
df_test = df_test.withColumn('status',when(df_test['status'].isNull(),0).otherwise(1))
df_test = df_test.withColumn('screen_name',when(df_test['screen_name'].isNull(),0).otherwise(1))


In [183]:
# Fourth Transformation
df_test = df_test.dropna(subset=['description'])

def split_and_set(some_str):
    if isinstance(some_str, str):
        some_str = ''.join(c for c in some_str if c not in "[](){}<>,'/.")
        return list(set(some_str.split(' ')))
    return some_str

list_udf = udf(lambda y: split_and_set(y), ArrayType(StringType()))
df_test = df_test.withColumn('description', list_udf(df_test['description']))


In [184]:

# Fifth Transformation - fill NA:
df_test = df_test.fillna({'followers_count':0,'statuses_count':0,'favourites_count':0,'listed_count':0,'friends_count':0,})


Save to parquet:

Code sample:
```python
df_test.write.parquet("test_data")
```

In [185]:
df_test.write.parquet("test_data")

The `test_data` file that you saved contains no information about which accounts are bots.

We can use it to compare various algorithms and see how they behave.
However, since our training is supervised, we would like to test the model with labeled data.
This will help us evaluate our model.

Hence, you will split the training data into training and test sets.

In [186]:
# Load the train data:
df_train = spark.read.parquet("final_train_data")

Split the training data into training and test sets, holding 30% out for testing.

Use the `randomSplit` function:

```python
(trainingData, testData) = some_data.randomSplit((0.7, 0.3))
```

<details><summary>Hint</summary>
<p>

Use randomSplit function:
    
```python
(trainingData, testData) = data.randomSplit((0.7, 0.3))

```  
</p>
</details>

Remember to validate yourself with `count`.

In [187]:
(trainingData, testData) = df_train.randomSplit([0.7, 0.3])

In [188]:
trainingData.count()

1689

In [189]:
testData.count()

744
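
The two counts add up to the full data set, but the ratio is only approximately 70/30 (here 1689 and 744 rows). That is expected: the split is random per row, not an exact cut. The plain-Python sketch below imitates the idea with a hypothetical helper `random_split`. It is not Spark's implementation, only an illustration of why the sizes vary and why fixing a seed makes the split repeatable. In PySpark, `randomSplit` accepts a `seed` argument for the same purpose.

```python
import random


def random_split(rows, weights, seed=None):
    # Illustrative only: assign each row to a bucket by drawing one
    # uniform random number and comparing it with the cumulative weights.
    rng = random.Random(seed)
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last bucket catches any draw not claimed earlier.
            if draw < bound or i == len(bounds) - 1:
                buckets[i].append(row)
                break
    return buckets


rows = list(range(2433))
train, test = random_split(rows, [0.7, 0.3], seed=42)
train_again, test_again = random_split(rows, [0.7, 0.3], seed=42)

print(len(train), len(test))          # close to 70/30, rarely exact
print(train == train_again)           # same seed, same split
```

Without a seed, re-running the split cell gives slightly different counts each time, which also changes the results of every later cell.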

Save the split data for the next Chapter.

In [190]:
testData.write.parquet("classified_test_data")

In [191]:
trainingData.write.parquet("classified_train_data")

# Well Done! 👏👏👏


## You just finished:  Apache Spark ML - Create train and test set 


## Next exercise: Apache Spark ML and create machine learning models

# 3 - Apache Spark ML- Create machine learning models

In this chapter, you will:

• Build your first ML model with Spark ML

• Learn more Spark functionality and how to use it

In [192]:
from pyspark.sql import SparkSession 

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("ApacheSparkML") \
    .getOrCreate()

df_train = spark.read.parquet("classified_train_data")
df_test = spark.read.parquet("classified_test_data")



In [193]:
df_train.limit(5).toPandas()

Unnamed: 0,screen_name,location,description,followers_count,friends_count,listed_count,favourites_count,verified,statuses_count,status,default_profile,name,bot
0,0,1,[25],,1,,,0,,1,0,1,0
1,1,0,"[, by, @inky, a, https:tcoMJyd6NkYaf, bot, sou...",93.0,0,24.0,0.0,0,5145.0,1,0,1,0
2,1,0,"[, by, @inky, micropoetry, a, bot]",10020.0,27,326.0,0.0,0,14477.0,1,0,1,0
3,1,0,"[, by, @mcmoots, your, Swifties, Follow, tweet...",110.0,111,14.0,0.0,0,3609.0,1,0,1,0
4,1,0,"[, by, before, thats, two, i, musical, kneel, ...",40.0,0,9.0,25.0,0,1868.0,1,0,1,0


✅ **Task :**

### Frequent Patterns Growth algorithm - FPGrowth()

Your data is clean and organized. It's your turn to create your **first** ML model with Spark.

Run the next code; it runs the **Frequent Patterns Growth** algorithm to extract patterns if those exist.

Tweak the `minSupport` and `minConfidence` parameters.


<details><summary>Hint</summary>
<p>

Change `minSupport` and `minConfidence` to 0.1 and see what happens.

</p>
</details>



In [194]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="description", minSupport=0.09, minConfidence=0.09)
fpGrowth_model = fpGrowth.fit(df_train)

# Display frequent itemsets.
fpGrowth_model.freqItemsets.show()

# Display generated association rules.
fpGrowth_model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
fpGrowth_model.transform(df_train).toPandas().transpose()


+-----+----+
|items|freq|
+-----+----+
|[for]| 184|
|[the]| 327|
|  [a]| 252|
|[bot]| 155|
| [in]| 216|
|[and]| 374|
|   []| 219|
| [to]| 213|
| [of]| 338|
|  [I]| 157|
| [by]| 323|
+-----+----+

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+



Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,...,1649,1650,1651,1652,1653,1654,1655,1656,1657,1658,1659,1660,1661,1662,1663,1664,1665,1666,1667,1668,1669,1670,1671,1672,1673,1674,1675,1676,1677,1678,1679,1680,1681,1682,1683,1684,1685,1686,1687,1688
screen_name,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1
location,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,1,1,1,1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1
description,[25],"[, by, @inky, a, https:tcoMJyd6NkYaf, bot, sou...","[, by, @inky, micropoetry, a, bot]","[, by, @mcmoots, your, Swifties, Follow, tweet...","[, by, before, thats, two, i, musical, kneel, ...","[, by, gene, @pra514, Drosophila, elements, pa...","[, follow, _Ùªâ, @jalfiebutler, for, back, Fol...","[, here, @thricedotted, an, there, image, or]","[, mass, by, completely, against, fight, @thri...","[, starting, @avoision, Twitter, posting, with...","[, your, for, uninterrupted, the, location, an...","[, Üó__Öô, we, means, Wheever, Teiru, -, what,...","[-, Snapchat, Parineetichopra, ChopraParineeti...","[30, mins, @chevalier_cygne, for, not, with, b...","[@LethargicJOE, this, get, your, follow, You, ...","[@jeffthompson_, guitar, by, Twitter, generati...",[@mihynus],"[ADVICE, B‰ÐÓOT, SO‰Ð_URCE, NOT, GOO‹ÎÇD, FOR,...","[Adherent, Sci-Fi, Pakistani, Patriotic, Crick...","[CB, papers, SCFBM, twitterfeed, with, Briefin...","[Corp, Mad, Im, #digitalarchival, Pretty, Ludw...","[Corporation, Intel, President, -]","[Destigmatizing, a, misunderstood, our, unappr...","[EDT, planet, universe, our, home, discover, w...","[Experimentand, diversos, origen, Mexicano, Me...","[FREE, FOLLOW, BC, YOU, THIS, GAVE, SHE, @Cata...",[Female],"[Front-end, Architect]","[Here:, Law, Relationships, the, of, Attractio...","[I, Skype:willrowan1, heimlich, dancing, dont,...","[I, You, what, do, know, who, am, and]","[I, a, of, am, made, chocolate, moose]","[I, here, am, https:tcoeT03DwzhH6, Hello]","[Lesser, Bot, the, From, of, style, endless, «...","[Memphis, TN]","[NEE, I, ILL, ASS, ACK, KICK, FOLLOW, SLOGGIN,...","[Nature, plant, Science, new, automatically, t...","[Out, Coming, https:tcobUj2nsdaAn, Play, Autho...","[PST, 3-10, a, Complaints:, times, @beaugunder...","[Papers, by, mostly, fed, -, Run, @TomEdWhite,...",...,"[teaching, by, University, Stanford, feed, lea...","[that, for, @Dartmouth, the, Banker, were, Its...","[things, to, designs, people, help, 
Inspiring,...","[using, by, @notinventedhere, that, malfunctio...","[weee, rock, willll, you!, weeee]","[work, year, since, research, SACE, Kenya, Afr...","[writing, get, job, down, grow, them, a, thing...","[youre, here, with, me, now]","[äó¢, at, boy, https:tco17vp1IHXJF, https:tcou...",[‰ª»],"[‰Û¢, by, memes, #botALLY, bot, the, 2016, @ck...","[, by, at, @mattlaschneider, host, a, my, serv...","[#LeoFacts, your, Right, off, Start, -, #Leo, ...","[16, KWK]","[Great, oceanographer, Lakes, banjo, player]","[at, School, Affairs, NYU, Events, &, Director...","[cant, You, spell, other, without, words]","[erudition, notepad, Twitter, my, public, is, ...","[fav, by, @robdubbin, former, think, things, T...","[forward, lookin, to, the, weekend!, Everybodys]","[super, Twitterbot, microscopy, for, to, resol...","[, England""""""]","[, NSW""""""]","[, Verizon, driver, CART, and, NASCAR, Indycar...","[Arizona, , West, my, crew, to, Coast, in, the...","[Believer!, Dreamer!, MICAn, !]","[Founder, CEO, &, @Kryptnostic]","[Lin"""""", """"""Katie]","[Mittens, Ponies, Doorbells, Roses, Noodles, S...","[Post, Washington, reporter, Investigative]","[Programming, Anime, äóñ, English, other, Thai...","[account, papers, for, sequencing, twitter, em...","[at, Humor, Performer, @ucbtny, editor, @ToolT...","[at, hats, Inc, Galois, of, many, Wearer]","[by, fintech, for, @Aden_76, little, twitter, ...","[doooof, bot, Bidoof]","[elevate, 2, my, to, married, about, Proud, of...",[http:tcoYfHoThSDVQ],"[it, TWITTER, SHAWNNA, Booking:, Getting, to, ...","[link, #SpeakResponsibly, Host, @FoxSoccer, TV..."
followers_count,,93,10020,110,40,9,34,314,213,136,29,0,8.23432e+06,560,189,119,292,50,102,284,358,2376,742,2.22708e+07,0,107,0,1986,259,52,374816,32,17,4937,61191,0,1609,226643,337,173,...,493450,4309,1,633,649,325,2.53082e+06,6298,55859,2623,16,183,489055,96,27,32,72,250,280,14,307,,,1.18236e+06,2,115,289,0,481,4,41,803,2982,0,7230,6027,5,89,122108,458212
friends_count,1,0,27,111,0,0,787,0,0,1,1,44,254,0,366,1,2225,2,218,29,5000,19,38,263,32,541,1,4055,991,337,1090,17,564,5,1194,26,0,347,2,0,...,509,2316,328,43,30,104,840,3780,598,5,1,1,741,241,118,30,0,523,0,0,4,970,918,272,465,93,124,,0,492,72,31,794,247,947,0,662,329,5790,710
listed_count,,24,326,14,9,7,0,50,23,21,9,0,3716,17,0,21,0,10,1,31,12,94,27,87505,0,0,0,909,0,0,2651,2,0,135,1169,0,60,1809,42,7,...,6483,296,0,65,68,18,35966,82,549,52,8,16,1221,2,1,2,20,8,28,7,14,1642,0,9485,0,2,32,,78,0,2,34,95,0,1019,36,1,2,923,4944
favourites_count,,0,0,0,25,0,7,0,0,5,0,45,3,0,1,0,0,0,294,1,73,1,3,2329,59,151,0,1175,0,2503,7960,3,0,0,5313,39,0,2469,12,2,...,51,2583,27,76,0,12,2137,85334,92644,409,0,1,53,600,28,11,0,3313,0,0,5,,,2498,56,74,2524,,0,57,2737,0,4950,28,162,2,169,3,223,4586
verified,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,1,0,0,1,0,0,...,1,0,0,0,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,1
statuses_count,,5145,14477,3609,1868,130,2,2978,5685,34446,105,69,6123,17873,36,18719,0,1228,354,4865,1236,14,367778,46185,82,650,0,10207,0,247,22667,173,4,1641,16821,57,1670,7937,7870,1188,...,15252,5690,26,1627,213,151,104891,46617,65098,3952,3222,3511,17634,527,29,12,2233,596,6413,129,2060,,,9677,76,126,2770,,6391,76,9615,808,1501,24,75122,116980,208,498,24891,7554
status,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1


What did you get?



<details><summary>Check me after running the previous code multiple times</summary>
<p>

When tweaking the minSupport=0.21 and minConfidence=0.1

You get:

|items|freq|
|----|:----:|
|[and]| 389|


</p>
</details>
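
To read the output above: `minSupport` is the minimum fraction of rows an itemset must appear in to count as frequent, and `minConfidence` is the minimum value of support(antecedent and consequent together) divided by support(antecedent) for a rule to be kept. The association rules table stays empty when no itemset with two or more items reaches `minSupport`, which is what the single-item results above suggest. The sketch below computes both measures by hand on a made-up list of token lists. The data and the helper names are illustrative assumptions, not notebook output.

```python
def support(transactions, itemset):
    # Fraction of transactions that contain every item of the itemset.
    itemset = set(itemset)
    hits = sum(1 for t in transactions if itemset <= set(t))
    return hits / len(transactions)


def confidence(transactions, antecedent, consequent):
    # support(antecedent and consequent) / support(antecedent)
    both = support(transactions, set(antecedent) | set(consequent))
    return both / support(transactions, antecedent)


# Toy "description" column: each row is a list of unique tokens.
toy = [
    ['a', 'bot', 'by'],
    ['a', 'bot'],
    ['bot', 'by'],
    ['the', 'of'],
]

print(support(toy, ['bot']))               # 3 of 4 rows
print(support(toy, ['a', 'bot']))          # 2 of 4 rows
print(confidence(toy, ['a'], ['bot']))     # every row with 'a' also has 'bot'
```

Lowering `minSupport` lets rarer token combinations through, at the cost of more (and noisier) itemsets and rules.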

---

✅ **Task :**

### LinearRegression functionality

It's your turn to create your **second** ML model with Spark - Linear Regression.




<details><summary>What is linear regression used for?</summary>
<p>
Linear regression is a common Statistical Data Analysis technique. It is used to determine the extent to which there is a linear relationship between a dependent variable and one or more independent variables.
</p>
</details>


**But**, before jumping right into it, you should know:

Spark ML Linear Regression **input** is:
1. `label` of type Double - our classification
2. `features` of type `Vector[Double]` - a single column named `features` that holds all feature values as one Vector of Doubles.

Hence, you will transform all _numeric_ columns into one Vector.

Leave the `description` column out, as it is not relevant for your next task.

To create the `features` column, use `VectorAssembler`:
``` python
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features",handleInvalid = "skip")
new_df = vecAssembler.transform(dataFrame)
new_df.show()

```
<details><summary><b>Did you know!</b></summary>
<p>

Vector has two types in Spark:
    
    1. Dense Vector
    2. Sparse Vector
    
A _sparse vector_ stores only the non-zero values together with their indices, which saves space when most values in the vector are zero.

A _dense vector_ stores every value, which is the better fit when most of the values are non-zero.

</p>
</details>
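
The two forms hold the same numbers and differ only in storage. A sparse vector is written as `(size, [indices], [values])`, which is the `(11,[0,2,4,7,8,10...` notation that appears in the `features` column later on, while a dense vector lists every value. The plain-Python sketch below converts between the two. `to_sparse` and `to_dense` are hypothetical helpers for illustration, not Spark functions.

```python
def to_sparse(dense):
    # Keep only the non-zero entries, remembering where they were.
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


def to_dense(size, indices, values):
    # Rebuild the full list, filling the gaps with zeros.
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# Eleven feature values in the same column order the notebook uses.
row = [1.0, 0.0, 93.0, 0.0, 24.0, 0.0, 0.0, 5145.0, 1.0, 0.0, 1.0]
size, indices, values = to_sparse(row)

print(size, indices, values)
print(to_dense(size, indices, values) == row)
```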


🤔 **Question**

Have you noticed the `handleInvalid` parameter?  
```python
handleInvalid = "skip"
```




What happens if you remove it?

Validate yourself with `new_df.show()`

Notice! You have a new DataFrame now. 

Remember to check yourself and work with the new DataFrame

In [195]:
from pyspark.ml.feature import VectorAssembler

train = df_train.drop('description')
vecAssembler = VectorAssembler(inputCols=['screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name'], outputCol="features", handleInvalid = "skip")
df_train = vecAssembler.transform(train)
df_train.toPandas().transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,...,1573,1574,1575,1576,1577,1578,1579,1580,1581,1582,1583,1584,1585,1586,1587,1588,1589,1590,1591,1592,1593,1594,1595,1596,1597,1598,1599,1600,1601,1602,1603,1604,1605,1606,1607,1608,1609,1610,1611,1612
screen_name,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1
location,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,...,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1
followers_count,93,10020,110,40,9,34,314,213,136,29,0,8234316,560,189,119,292,50,102,284,358,2376,742,22270820,0,107,0,1986,259,52,374816,32,17,4937,61191,0,1609,226643,337,173,3211350,...,310,130170,16,493450,4309,1,633,649,325,2530820,6298,55859,2623,16,183,489055,96,27,32,72,250,280,14,307,1182355,2,115,289,481,4,41,803,2982,0,7230,6027,5,89,122108,458212
friends_count,0,27,111,0,0,787,0,0,1,1,44,254,0,366,1,2225,2,218,29,5000,19,38,263,32,541,1,4055,991,337,1090,17,564,5,1194,26,0,347,2,0,23,...,1056,986,1859,509,2316,328,43,30,104,840,3780,598,5,1,1,741,241,118,30,0,523,0,0,4,272,465,93,124,0,492,72,31,794,247,947,0,662,329,5790,710
listed_count,24,326,14,9,7,0,50,23,21,9,0,3716,17,0,21,0,10,1,31,12,94,27,87505,0,0,0,909,0,0,2651,2,0,135,1169,0,60,1809,42,7,5707,...,66,8281,0,6483,296,0,65,68,18,35966,82,549,52,8,16,1221,2,1,2,20,8,28,7,14,9485,0,2,32,78,0,2,34,95,0,1019,36,1,2,923,4944
favourites_count,0,0,0,25,0,7,0,0,5,0,45,3,0,1,0,0,0,294,1,73,1,3,2329,59,151,0,1175,0,2503,7960,3,0,0,5313,39,0,2469,12,2,751,...,11675,1860,262,51,2583,27,76,0,12,2137,85334,92644,409,0,1,53,600,28,11,0,3313,0,0,5,2498,56,74,2524,0,57,2737,0,4950,28,162,2,169,3,223,4586
verified,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,1,0,0,1,0,0,1,...,0,1,0,1,0,0,0,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,1,1
statuses_count,5145,14477,3609,1868,130,2,2978,5685,34446,105,69,6123,17873,36,18719,0,1228,354,4865,1236,14,367778,46185,82,650,0,10207,0,247,22667,173,4,1641,16821,57,1670,7937,7870,1188,8867,...,3335,55222,336,15252,5690,26,1627,213,151,104891,46617,65098,3952,3222,3511,17634,527,29,12,2233,596,6413,129,2060,9677,76,126,2770,6391,76,9615,808,1501,24,75122,116980,208,498,24891,7554
status,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1
default_profile,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,1,0,...,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,0,0,1,0,0,0,1,0,0,1,0,0,0


---


✅ **Task :** 

To run the ML training phase on the feature vector, you need a DataFrame that contains only the columns the model needs.

`bot` and `features` are the **only** columns that we care about.

Drop the rest with the `drop` function:
    
```python

output = df_train.drop("val1","val2")
```



In [196]:
output_train = df_train.drop('screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name')
output_train.show()

+---+--------------------+
|bot|            features|
+---+--------------------+
|  0|(11,[0,2,4,7,8,10...|
|  0|[1.0,0.0,10020.0,...|
|  0|[1.0,0.0,110.0,11...|
|  0|[1.0,0.0,40.0,0.0...|
|  1|[1.0,0.0,9.0,0.0,...|
|  0|[1.0,0.0,34.0,787...|
|  0|(11,[0,2,4,7,8,10...|
|  0|(11,[0,2,4,7,8,10...|
|  0|[1.0,0.0,136.0,1....|
|  0|[1.0,0.0,29.0,1.0...|
|  0|(11,[0,3,5,7,8,10...|
|  0|[1.0,0.0,8234316....|
|  0|(11,[0,2,4,7,8,10...|
|  0|[1.0,0.0,189.0,36...|
|  0|[1.0,0.0,119.0,1....|
|  1|(11,[0,2,3,9,10],...|
|  0|[1.0,0.0,50.0,2.0...|
|  0|[1.0,0.0,102.0,21...|
|  1|[1.0,0.0,284.0,29...|
|  0|[1.0,0.0,358.0,50...|
+---+--------------------+
only showing top 20 rows



After turning the _numeric_ columns into one `features` column and dropping `description`,
all that is left is to create the `label` column.


Create a new DataFrame with a `label` column.
Code sample:
```python
df_for_lr  = output_train.selectExpr("features", "bot as label")
df_for_lr.show()

df_for_lr.toPandas().transpose()
```


Notice that `df_for_lr` is now your new DataFrame for creating the `LinearRegression` model.

In [197]:
df_for_lr  = output_train.selectExpr("features", "bot as label")
df_for_lr.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(11,[0,2,4,7,8,10...|    0|
|[1.0,0.0,10020.0,...|    0|
|[1.0,0.0,110.0,11...|    0|
|[1.0,0.0,40.0,0.0...|    0|
|[1.0,0.0,9.0,0.0,...|    1|
|[1.0,0.0,34.0,787...|    0|
|(11,[0,2,4,7,8,10...|    0|
|(11,[0,2,4,7,8,10...|    0|
|[1.0,0.0,136.0,1....|    0|
|[1.0,0.0,29.0,1.0...|    0|
|(11,[0,3,5,7,8,10...|    0|
|[1.0,0.0,8234316....|    0|
|(11,[0,2,4,7,8,10...|    0|
|[1.0,0.0,189.0,36...|    0|
|[1.0,0.0,119.0,1....|    0|
|(11,[0,2,3,9,10],...|    1|
|[1.0,0.0,50.0,2.0...|    0|
|[1.0,0.0,102.0,21...|    0|
|[1.0,0.0,284.0,29...|    1|
|[1.0,0.0,358.0,50...|    0|
+--------------------+-----+
only showing top 20 rows



Run the next code.
Check where it uses the new DataFrame, `df_for_lr`.

It fits a linear regression model on that DataFrame.

Tweak the `maxIter`,`regParam` and `elasticNetParam` !
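
As a rough guide to these parameters: `maxIter` caps the number of optimization iterations, `regParam` sets the overall regularization strength, and `elasticNetParam` mixes the L1 and L2 penalties (0 is pure L2, 1 is pure L1). Spark's documentation describes the penalty as `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`. The plain-Python sketch below evaluates that expression for a made-up weight vector. The helper name and the weights are illustrative assumptions.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    # L1 part: sum of absolute values; L2 part: half the sum of squares.
    l1 = sum(abs(w) for w in weights)
    l2 = sum(w * w for w in weights) / 2.0
    return reg_param * (elastic_net_param * l1 + (1.0 - elastic_net_param) * l2)


weights = [0.5, -2.0, 0.0]

print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8))  # mostly L1
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.0))  # pure L2
print(elastic_net_penalty(weights, reg_param=0.0, elastic_net_param=0.8))  # no penalty
```

A large L1 share tends to push coefficients to exactly zero, which is worth keeping in mind when you read the coefficients printed by the next cell.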

In [198]:
from pyspark.ml.regression import LinearRegression


lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(df_for_lr)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.023465709340704275,0.0]
Intercept: 0.13172978524419984
numIterations: 6
objectiveHistory: [0.5, 0.49998502993646804, 0.49994162718375174, 0.49994135814201496, 0.49994135647429916, 0.49994135646396143]
+--------------------+
|           residuals|
+--------------------+
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|  0.8448045054150959|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|-0.13172978524419984|
|  0.8448045054150959|
|-0.13172978524419984|
|-0.13172978524419984|
|  0.8448045054150959|
|-0.13172978524419984|
+--------------------+
only showing top 20 rows

RMSE: 0.654855
r2: 0.007697


What did you get?

What does it look like?

What is `r2`? `r2` is short for R-squared: 
>R-squared is a statistical measure of how close the data are to the fitted regression line. It is also known as the coefficient of determination, or the coefficient of multiple determination for multiple regression.

What is RMSE?
> Root Mean Square Error (RMSE) is the standard deviation of the residuals (prediction errors). Residuals are a measure of how far from the regression line data points are; RMSE is a measure of how spread out these residuals are. In other words, it tells you how concentrated the data is around the line of best fit.
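
Both metrics can be computed by hand from labels and predictions, which helps when interpreting them. The plain-Python sketch below uses made-up numbers (not notebook output) and hypothetical helper names: RMSE is the square root of the mean squared residual, and R-squared is one minus the ratio of the residual sum of squares to the total sum of squares around the label mean.

```python
import math


def rmse(labels, predictions):
    # Square root of the mean squared residual.
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


def r_squared(labels, predictions):
    # 1 - (residual sum of squares / total sum of squares).
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


labels = [0.0, 0.0, 1.0, 1.0]
always_mean = [0.5, 0.5, 0.5, 0.5]     # a model that ignores the features
decent = [0.1, 0.2, 0.8, 0.9]          # a model that tracks the labels

print(rmse(labels, always_mean), r_squared(labels, always_mean))
print(rmse(labels, decent), r_squared(labels, decent))
```

A model that always predicts the label mean gets an R-squared of 0 and an RMSE equal to the labels' (population) standard deviation, which makes a useful baseline for judging a fitted model.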

Try to play with the parameters and watch how they change.

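RMSE alone is meaningless until we compare it with statistics of the actual `label` values, such as mean, standard deviation, min and max.

Compare `RMSE` with the `describe()` output below.
An RMSE of about 0.655 is almost the same as the label's standard deviation (about 0.658), so the model does little better than always predicting the mean, which matches the very low `r2`.
Note also that `max` is 19, so `label` still holds values other than 0 and 1.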


In [199]:
df_for_lr.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|               1613|
|   mean|0.13391196528208307|
| stddev| 0.6575930882121828|
|    min|                  0|
|    max|                 19|
+-------+-------------------+



You built 2 machine learning models!

However, you didn't get the best results.

It's OK!

And absolutely normal.

In chapter 4 you will learn how to improve them.

### To load the models later, save them to file:

In [200]:
lrModel.save("linearRegression_model")

In [201]:
fpGrowth_model.save("fpGrowth_model")

## Well Done! 👏👏👏
## You just finished: Apache Spark ML and create machine learning models
## Next Chapter: Evaluating ML models and using pipelines 

# 4 - Evaluating models and intro to Pipelines

In this chapter, you will:

• Learn to evaluate your ML model

• Learn to use Pipelines

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=30219f59837dbda0ba5ba608711d18c1bb158a0aeeab9f761248c395d103b7be
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [202]:
from pyspark.sql import SparkSession 
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.fpm import FPGrowth, FPGrowthModel


from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Pipelines") \
    .getOrCreate()

#### Load machine learning model :


Load the models from previous Chapter:

In [203]:
lr_model = LinearRegressionModel.load('linearRegression_model')

In [204]:
fpgrowth_model = FPGrowthModel.load('fpGrowth_model')

Evaluate the models:

For evaluation, load classified test data

In [205]:
df_test = spark.read.parquet("classified_test_data")

[From the docs:](https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html)

While there are many different types of classification algorithms, the evaluation of classification models all shares similar principles. 

In a supervised classification problem, there exists a _true output_ and a _model-generated predicted output_ for each data row. 



## Exercise 1: Evaluate your ML model 

### RegressionEvaluator functionality:


✅ **Task :** 


Start with predicting the outcome.
Use the model's `transform` function:

```python
    model.transform(vectorOfFeatures).select('prediction').show()
```

Notice that `transform` takes a DataFrame with a vector column of features as input.

`prediction` represents whether the row is a bot or not:

- 1: bot
- 0: human

In [206]:
from pyspark.ml.feature import VectorAssembler

test = df_test.drop('description')
vecAssembler = VectorAssembler(inputCols=['screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name'], outputCol="features", handleInvalid = "skip")
test_df_with_vector = vecAssembler.transform(test)

model_test_prediction = lr_model.transform(test_df_with_vector)
model_test_prediction.select('bot','prediction').show()

+---+-------------------+
|bot|         prediction|
+---+-------------------+
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  1|0.13172978524419984|
+---+-------------------+
only showing top 20 rows



The model gave us a score for how likely a specific row is
to be a bot. In the output above, every displayed row gets roughly 0.1317.

It is up to us to define the **threshold** for classifying a bot.
If the model outputs 0.9, will that satisfy us? How certain do we want to be in the classification?

**RegressionEvaluator** is the evaluator for regression-based models

Use `RegressionEvaluator` to evaluate the model.

```python
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="bot",metricName="r2")
R2 = lr_evaluator.evaluate(model_test_prediction)
```

Check out R2 :
>R-squared is a statistical measure of how close the data are to the fitted regression line. It is also known as the coefficient of determination, or the coefficient of multiple determination for multiple regression. 100% indicates that the model explains all the variability of the response data around its mean

From: [RegressionAnalysis](https://blog.minitab.com/blog/adventures-in-statistics-2/regression-analysis-how-do-i-interpret-r-squared-and-assess-the-goodness-of-fit)

Notice `metricName` param:

`RegressionEvaluator` supports:

- `rmse` (default): root mean squared error
- `mse`: mean squared error
- `r2`: R squared metric
- `mae`: mean absolute error
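
To make the four metric names concrete, here is a minimal plain-Python sketch of what each one computes on a handful of (label, prediction) pairs. The helper name `regression_metrics` and the sample numbers are made up for illustration; in the notebook, `RegressionEvaluator` does this work on the DataFrame.

```python
import math

def regression_metrics(labels, predictions):
    """Compute mse, rmse, mae and r2 for paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    ss_res = sum(e * e for e in errors)                   # unexplained variation
    ss_tot = sum((y - mean_label) ** 2 for y in labels)   # total variation
    r2 = 1.0 - ss_res / ss_tot
    return {"mse": mse, "rmse": math.sqrt(mse), "mae": mae, "r2": r2}

# Illustrative data only: two humans (0.0) and two bots (1.0).
labels = [0.0, 0.0, 1.0, 1.0]
predictions = [0.25, 0.25, 0.75, 0.75]
metrics = regression_metrics(labels, predictions)
print(metrics)
```

For `rmse`, `mse`, and `mae`, lower is better; for `r2`, values closer to 1 are better.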


**Notice!** Here we work with the training data and select both `bot` and `prediction` to get a feel for the classifier.

In [207]:
from pyspark.ml.feature import VectorAssembler

df_train = spark.read.parquet("classified_train_data")

train = df_train.drop('description')
vecAssemblerTrain = VectorAssembler(inputCols=['screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name'], outputCol="features", handleInvalid = "skip")
vecAssemblerTrain = vecAssemblerTrain.transform(train)

model_train_prediction = lr_model.transform(vecAssemblerTrain)
model_train_prediction.select('bot','prediction').show()

lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="bot",metricName="r2")
R2 = lr_evaluator.evaluate(model_train_prediction)

print("R Squared (R2) on train data = %g" % R2)


+---+-------------------+
|bot|         prediction|
+---+-------------------+
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  1|0.15519549458490411|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  1|0.15519549458490411|
|  0|0.13172978524419984|
|  0|0.13172978524419984|
|  1|0.15519549458490411|
|  0|0.13172978524419984|
+---+-------------------+
only showing top 20 rows

R Squared (R2) on train data = 0.00769653


Looking back at the `prediction` output, we see that the raw values don't help us much.

The `prediction` output is a continuous number; in the rows shown above it stays between 0 and 1.


However, we expect 1 or 0: bot or human.
What can we do?
Decide on a threshold.


For example, every prediction above 0.8 is a bot, and everything below 0.8 is a human.

Or maybe every prediction above 0.14?
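
To see how much the choice of threshold matters, here is a small plain-Python sketch that applies both candidate thresholds to the two prediction values that appear in the output above. The `classify` helper is a hypothetical name used only for this illustration.

```python
def classify(scores, threshold):
    """Label a score as bot (1) when it is strictly above the threshold, else human (0)."""
    return [1 if s > threshold else 0 for s in scores]

# The two distinct prediction values shown in the tables above.
scores = [0.13172978524419984, 0.15519549458490411, 0.13172978524419984]

labels_at_080 = classify(scores, 0.8)   # strict threshold
labels_at_014 = classify(scores, 0.14)  # lenient threshold
print(labels_at_080)
print(labels_at_014)
```

With 0.8 nothing is flagged as a bot, while 0.14 flags the 0.155 row, so the threshold has to be chosen with the actual range of the model's predictions in mind.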

---

✅ Task :

Use the model's summary statistics.

For example, check the RMSE (Root Mean Squared Error)
for both the training and the test data.


Code sample:

```python
def getLRSummary(df):
    df = df.drop('description')
    vecAssembler = VectorAssembler(inputCols=['screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name'], outputCol="features", handleInvalid = "skip")
    vecAssembler = vecAssembler.transform(df)
    output_test = vecAssembler.drop('screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name')
    output_test  = output_test.selectExpr("features", "bot as label")
    # evaluate function returns LinearRegressionSummary instance that holds the evaluate results
    return lr_model.evaluate(output_test)
```


Here are the docs for the [`r2` property](https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegressionSummary.r2).

Check on both training and test set:

In [208]:

def getLRSummary(df):
    df = df.drop('description')
    vecAssembler = VectorAssembler(inputCols=['screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name'], outputCol="features", handleInvalid = "skip")
    vecAssembler = vecAssembler.transform(df)
    output = vecAssembler.drop('screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name')
    output  = output.selectExpr("features", "bot as label")
    # evaluate function returns LinearRegressionSummary instance that holds the evaluated results
    return lr_model.evaluate(output)
    

train_results = getLRSummary(df_train)
print("Root Mean Squared Error (RMSE) on train data = %g" % train_results.rootMeanSquaredError)

test_results = getLRSummary(df_test)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_results.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on train data = 0.654855
Root Mean Squared Error (RMSE) on test data = 0.296212


`LinearRegressionSummary` gives you a summary of the model's evaluation metrics.

In [209]:
print("r2 on test data = %g" % test_results.r2)
print("r2 on train data = %g" % train_results.r2)

r2 on test data = 0.017768
r2 on train data = 0.00769653


#### What did you learn?
What other evaluation metrics can you get out of a `LinearRegressionSummary` instance?



**Reminder**
What is r2? R Square: 
>R-squared is a statistical measure of how close the data are to the fitted regression line. It is also known as the coefficient of determination, or the coefficient of multiple determination for multiple regression.

R squared measures how much of the variability in `bot` / `label` can be explained by the model.
We must be cautious when judging performance on the training set alone, to avoid overfitting the model to the training set.
Overfitting can create a model that is good only for the training set and not for the test set.


What is RMSE?
> Root Mean Square Error (RMSE) is the standard deviation of the residuals (prediction errors). Residuals are a measure of how far from the regression line data points are; RMSE is a measure of how spread out these residuals are. In other words, it tells you how concentrated the data is around the line of best fit.
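
Written out for $n$ rows with labels $y_i$, predictions $\hat{y}_i$, and mean label $\bar{y}$, the two metrics in this reminder follow the standard definitions below. The symbols are notation introduced here, not names from the Spark API.

$$
\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2},
\qquad
R^2 = 1 - \frac{\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}{\sum_{i=1}^{n}\left(y_i - \bar{y}\right)^2}
$$

Read this way, the `r2` values printed above (about 0.008 and 0.018) say the model explains less than 2 percent of the variability in `bot`.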


## Exercise 2: Build Simple Spark ML Pipelines

ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help us create and tune practical machine learning pipelines.

In the previous exercise, you worked with Linear regression.

Logistic regression is used when the dependent variable is binary.
In our case, `bot` is binary: yes or no.

Linear regression is used to predict a continuous dependent variable.
This explains the weak results we received.
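
As a rough illustration of the difference: binary logistic regression passes a linear score through the sigmoid function, so the output always lands between 0 and 1 and can be read as a probability. The sketch below is plain Python with made-up input scores, not Spark code.

```python
import math

def sigmoid(z):
    """Squash any real-valued score into the open interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

# Illustrative scores only: strongly negative, neutral, strongly positive.
for z in (-4.0, 0.0, 4.0):
    print(z, round(sigmoid(z), 4))
```

A score of 0 maps to exactly 0.5, which is why 0.5 is the usual default cut-off between the two classes.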


### Spark ML Pipelines

Start with a simple ML Pipeline:

In [210]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer


Remember that in Chapter 1 we split `description` into a list?
Let's do it with the `Tokenizer` functionality instead!

`Tokenizer` is part of `pyspark.ml.feature`. `pyspark.ml.feature` gives us a lot of out-of-the-box functionality for feature extraction. Feature extraction is the _data-science_ way of transforming existing columns into new ones.

Load saved data:

In [211]:
data = spark.read.parquet('train_data_only_description')
(trainingData, testData) = data.randomSplit([0.7, 0.3])

### Tokenizer, HashingTF, and Logistic Regression

✅ **Task :** 

Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr:


Use the next code sample and adjust it to your needs :
```python
tokenizer = Tokenizer(inputCol="description", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

```
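
To get a feel for what the first two stages produce, here is a simplified plain-Python sketch of tokenizing a description and turning the tokens into a fixed-length count vector with the hashing trick. It is not Spark's implementation: the `tokenize` and `hashing_tf` helpers are hypothetical, `zlib.crc32` stands in for the MurmurHash3 function Spark uses, and 16 buckets is an arbitrary small size chosen for readability.

```python
import zlib

def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

def hashing_tf(tokens, num_features=16):
    """Hash each token to a bucket index and count the tokens that land in each bucket."""
    counts = [0] * num_features
    for token in tokens:
        # Assumption: crc32 is used here only because it is deterministic and in the stdlib.
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] += 1
    return counts

words = tokenize("Get the latest news about the bot")
features = hashing_tf(words)
print(words)
print(features)
```

Because the vector length is fixed up front, different words can collide in the same bucket; a larger `numFeatures` reduces collisions at the cost of a wider vector.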


Now that we understand that Linear Regression might not be good enough for our data science purposes, we are going to work with Logistic Regression. This is your **3rd** Machine Learning model with Spark ML 🎉

In [212]:
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="description", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


Call fit on Pipeline to get the model:

If it fails here, validate that `description` doesn't have null values.
Our HashingTF doesn't know how to handle null values.
If those exist, create a new DataFrame without them and use the new DataFrame to build the model.

In case you need it:
``` python
    # dropna's first positional argument is `how`, so pass the column through `subset`
    trainingData = trainingData.dropna(subset=['description'])
```

In [213]:
# Fit the pipeline to training documents.
model = pipeline.fit(trainingData)

Make predictions:

In [214]:
# Make predictions on test documents and print columns of interest.
prediction = model.transform(testData)
selected = prediction.select("description", "probability", "prediction")
for row in selected.collect():
    description, prob, prediction = row
    print("(%s) --> prob=%s, prediction=%f" % (description, str(prob), prediction))

( Northern Ireland""") --> prob=[0.9981671472843673,0.0012341744254552567,5.091466127993252e-05,0.0001421179285769968,5.091466127993252e-05,5.091466127993252e-05,5.091466127993252e-05,5.091466127993252e-05,5.091466127993252e-05,5.091466127993252e-05,0.00010015773264089402], prediction=0.000000
("""Ai__""") --> prob=[0.946808670577785,0.048697351826142016,0.0003768676894340745,0.0011070322277598893,0.0003768676894340745,0.0003768676894340745,0.0003768676894340745,0.0003768676894340745,0.0003768676894340745,0.0003768676894340745,0.0007488715422750489], prediction=0.000000
("""Dunia Tempat Berkumpulnya Para Pecinta One Piece! | Get the latest news about #OnePiece here! | One Piece Chapter 858 |""") --> prob=[0.9975359354820996,0.0018704456857287601,5.144130882591114e-05,0.0001365485513803895,5.144130882591114e-05,5.144130882591114e-05,5.144130882591114e-05,5.144130882591114e-05,5.144130882591114e-05,5.144130882591114e-05,9.698111900906137e-05], prediction=0.000000
("""I am one with the fo

In the text output search for
`prediction=1`

And write in the chat which description got classified as bot!



## Exercise 3: Put everything together

### CrossValidator, BinaryClassificationEvaluator and ParamGridBuilder functionality

✅ **Task :** 


`CrossValidator` gives us the ability to train and test on multiple training and test sets within one function call:
`fit`.

It runs the evaluation phase and chooses the best parameters.

Read about `CrossValidator` in the [docs](https://spark.apache.org/docs/latest/ml-tuning.html) and integrate it into your pipeline.

In the docs, search for `CrossValidator` `python` example.

Copy the example to the notebook and adjust it to your needs.

From the docs:
>`CrossValidator` - K-fold cross validation performs model selection by splitting the dataset into a set of non-overlapping randomly partitioned folds which are used as separate training and test datasets e.g., with k=3 folds, K-fold cross validation will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. Each fold is used as the test set exactly once.
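
The splitting described in the quote can be sketched in a few lines of plain Python. This is only an illustration of the idea: the `k_fold_pairs` helper is hypothetical, and it assigns rows to folds by striding instead of the random partitioning Spark performs.

```python
def k_fold_pairs(rows, k):
    """Yield k (training, test) pairs; each row is in the test set exactly once."""
    folds = [rows[i::k] for i in range(k)]  # k non-overlapping folds
    for i in range(k):
        test = folds[i]
        training = [row for j, fold in enumerate(folds) if j != i for row in fold]
        yield training, test

rows = list(range(9))
pairs = list(k_fold_pairs(rows, 3))
for training, test in pairs:
    print(len(training), len(test))
```

With 9 rows and k=3, every pair trains on 6 rows (2/3 of the data) and tests on the remaining 3 (1/3).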

<details><summary>Can't find the example? Click here!</summary>
<p>

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)
```
    
</p>
</details>


<details><summary>Answer</summary>
<p>

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()


crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)  # use 3+ folds in practice


# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(trainingData)

prediction = cvModel.transform(testData)
selected = prediction.select("description", "probability", "prediction")
for row in selected.collect():
    print(row)
   
```
    
</p>
</details>

In [215]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()


crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)  # use 3+ folds in practice


# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(trainingData)

prediction = cvModel.transform(testData)
selected = prediction.select("description", "probability", "prediction")
for row in selected.collect():
    print(row)


Row(description=' Northern Ireland"""', probability=DenseVector([0.8605, 0.1323, 0.0006, 0.0018, 0.0006, 0.0006, 0.0006, 0.0006, 0.0006, 0.0006, 0.0012]), prediction=0.0)
Row(description='"""Ai__"""', probability=DenseVector([0.8948, 0.0988, 0.0005, 0.0016, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0005, 0.0011]), prediction=0.0)
Row(description='"""Dunia Tempat Berkumpulnya Para Pecinta One Piece! | Get the latest news about #OnePiece here! | One Piece Chapter 858 |"""', probability=DenseVector([0.9886, 0.0098, 0.0001, 0.0004, 0.0001, 0.0001, 0.0001, 0.0001, 0.0001, 0.0001, 0.0003]), prediction=0.0)
Row(description='"""I am one with the force', probability=DenseVector([0.9305, 0.0646, 0.0004, 0.0012, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0004, 0.0008]), prediction=0.0)
Row(description='"""One piece_______One piece___Twitter________bot___1_______________________________________________________________________________________________RT_____________"""', probability=DenseVector([0.97

In the text output search for
`prediction=1`

Write in the chat which `description` got classified as a bot!

Notice that some of the `description` values contain the word `bot`.

This means your algorithm picked up on it without being told directly to search for the word bot 🤓

---

In the last task, you used `BinaryClassificationEvaluator` since it is better suited to our needs: it works with binary labels, bot or human.
`ParamGridBuilder` is a utility that helps us construct a parameter grid for our algorithm. It lets us test out multiple models built with different parameter values.

`ParamGridBuilder` is part of the `pyspark.ml.tuning` module.
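
Conceptually, the grid is the Cartesian product of the value lists you pass to `addGrid`. The plain-Python sketch below rebuilds the same 3 x 2 grid from this exercise with `itertools.product`; the dictionary keys are shorthand labels for this illustration, not Spark `Param` objects.

```python
from itertools import product

# The same candidate values used in the exercise above.
num_features_options = [10, 100, 1000]
reg_param_options = [0.1, 0.01]

# One dictionary per combination of values.
param_grid = [
    {"numFeatures": n, "regParam": r}
    for n, r in product(num_features_options, reg_param_options)
]
print(len(param_grid))
for params in param_grid:
    print(params)
```

Each of the 6 settings is trained and evaluated once per fold, so with `numFolds=3` the cross-validation fits 18 models before refitting the best setting on the full training data. Keep that in mind when you grow the grid.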


## Well Done! 👏👏👏
## You just finished chapter 4: Spark ML - Evaluating models and intro to Pipelines
## This was the last chapter for **Apache Spark ML First Steps**

## I hope you enjoyed it!


[@adipolak](https://twitter.com/AdiPolak)