# ALL ABOUT SPARK TUTORIAL


Apache Spark is described as a general engine for large-scale data processing. It is used to analyze big data on a local machine or on a cluster of computers, using programming languages such as Python, Scala, and Java. Clusters are commonly run on cloud service providers such as Amazon Web Services (AWS Elastic MapReduce) and Azure (Databricks).

### INSTALLING APACHE SPARK ON A PC
Spark is written in Scala, and Scala runs on the Java Virtual Machine (JVM), so we first need to install Java on our machine. On Windows, install Java into a new folder that we create on the local disk.

We will also need to install a Python environment such as Anaconda.

We will need to configure the paths to these environments for Spark to run. There are two main ways to run Spark code: we can save it as a .py script, or we can use a notebook (a .ipynb file), for example in VS Code. This tutorial takes the notebook approach. A plain Jupyter notebook works just as well.

## HOW SPARK WORKS
Before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). Since Spark 2.0, the recommended interface is the Dataset, which is similar to the RDD but benefits from more optimizations and generally performs better. The RDD interface is still supported. In Python, the Dataset API is used through DataFrames.
##### So how does an RDD (Resilient Distributed Dataset) work?
Let's imagine you have a large dataset that you want to analyze or process. Instead of trying to handle all that data on one computer, which might be slow or even impossible, you decide to spread it across multiple computers to speed things up.

Here's where the Resilient Distributed Dataset (RDD) comes into play. Think of RDD as a way to chop up your big dataset into smaller pieces and distribute those pieces across a bunch of computers, forming a kind of team of processors.

Now, RDD isn't just about spreading data around; it's also about making sure your data is safe. Let's say one of those computers fails or encounters an issue while processing its part of the data. RDD has a way to recover from that failure without losing the whole analysis. It's like having a backup plan in case one of your teammates needs a break.

So, in simple terms, RDD helps you efficiently handle and process large amounts of data by breaking it into manageable pieces, distributing those pieces to different computers, and ensuring that your analysis can withstand hiccups without losing all your progress. It's like teamwork for big data!
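
The idea can be sketched in plain Python. This is only a toy model and not how Spark is implemented: the helper names `make_partitions` and `compute_partition` are made up for illustration, and a real RDD tracks its lineage and recovers lost partitions automatically.

```python
# Toy model of an RDD: data split into partitions, plus a recipe (lineage)
# for recomputing any partition. Plain Python, no Spark involved.

def make_partitions(data, num_partitions):
    """Split a list into roughly equal chunks, one per (imaginary) worker."""
    size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]

def compute_partition(source_partition, func):
    """Apply the transformation to a single partition."""
    return [func(x) for x in source_partition]

def square(x):
    return x * x

source = make_partitions(list(range(1, 11)), num_partitions=3)

# Each worker processes its own partition independently.
results = [compute_partition(p, square) for p in source]

# Simulate losing the result held by worker 1 ...
results[1] = None

# ... and recover it by re-running the recipe on that partition only.
results[1] = compute_partition(source[1], square)

total = sum(sum(p) for p in results)
print(source)  # the input, split into partitions
print(total)   # sum of the squares of 1..10
```

Only the lost partition is recomputed; the other partitions keep their results, which is the "resilient" part of the name.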


##### Dataset
A Dataset is a distributed collection of data, and we work with it through Spark SQL. A DataFrame is a Dataset organized into named columns, much like a table in a relational database. Datasets are built on top of RDDs: they combine the strengths of RDDs with the performance benefits of the Spark optimizer.

###### BENEFITS OF DATASET

- Ease of use

- Optimization benefits: it is fast

- It supports both batch and real-time (streaming) processing through the same API

   

### What's new in Spark 3.0

1. It's faster, thanks to features such as adaptive query execution and dynamic partition pruning.

2. There is deeper Kubernetes support.

3. There is a binary file data source.

4. The RDD-based MLlib API remains in maintenance mode. (As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.)

5. GPU hardware can be used through accelerator-aware scheduling.

6. Spark GraphX can be used for analyzing graphs such as social networks and how people are connected.

7. There is ACID support for data lakes with Delta Lake.

### Advantages of using the Dataset API rather than the RDD
1. It's scalable (it can be used on many machines)
2. It's fast
3. It's popular (it is widely used)
4. It's easy to code in Python, Java, and Scala
5. Spark runs everywhere: on Hadoop, on Kubernetes, standalone, and in the cloud

###### Components built on top of Spark Core
1. MLlib

2. Spark Streaming

3. Spark SQL

4. GraphX

### Why are people moving to Spark SQL (DataFrames)?
- A DataFrame contains Row objects

- It can run SQL queries

- It can have a schema

- It can read and write JSON

- It can also communicate over a JDBC connection

- We can use a mapper function on each line to map a file with a header to a Dataset

- To use SQL, we infer the schema and register the DataFrame as a table

- We use built-in functions to avoid dropping down to RDDs

#### Why use Python?
1. There is no need to compile code and manage dependencies
2. There is less coding overhead
3. Chances are you already know Python
#### Benefits of using Spark with Scala
1. Scala is a popular choice for Spark
2. Spark is built in Scala, so coding in Scala is "native" to Spark
3. New features and libraries tend to be Scala-first

### Here is what our tutorial will look like
1. We will learn how to use PySpark from the basics

2. We will explore how to use RDDs and write some queries

3. We will move on to Spark SQL

4. We will cover the basics of MLlib and progress to more complex topics such as tuning algorithms

5. We will then learn about Spark Streaming

#### We will look at Spark SQL: the basics of working with Datasets and ways of querying data

In [1]:
#### After fully configuring Spark (Java, Python, and Hadoop), this import should run successfully
import pyspark

In [2]:
### We will first read the dataset with plain pandas, so that we can see how it differs from reading a DataFrame with Spark
import pandas as pd
df=pd.read_csv('data.csv')

In [3]:
#### Look at the first rows of our data (a small dataset created for simple manipulations; later parts of the tutorial use more complex datasets)
df.head()

Unnamed: 0,name,age
0,simon,34
1,brenda,12
2,sani,23
3,mark,45
4,john,79


In [4]:
#### First we need to import SparkSession
from pyspark.sql import SparkSession

In [5]:
## Create the Spark session and give it a name. Remember to stop every Spark session once we are done with it.
spark=SparkSession.builder.appName('data').getOrCreate()

In [6]:
spark

In [52]:
## Read the CSV file with PySpark using the code below. This is NOT the ideal way, since the header row ends up as part of the data
df_spark1 = spark.read.csv('data.csv')

In [53]:
df_spark1.show()

+------+----+
|   _c0| _c1|
+------+----+
|  name| age|
| simon|  34|
|brenda|  12|
|  sani|  23|
|  mark|  45|
|  john|  79|
|   sam|  67|
| bruce|  33|
| henry|  20|
+------+----+



In [54]:
## Look at the schema: every column is read as a string because we have not loaded our data correctly
df_spark1.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [55]:
### This is the ideal way to read the file in PySpark: header=True keeps the header row out of the data,
### and inferSchema=True lets PySpark recognize the correct data type of each column
df_sparks= spark.read.csv('data.csv',header=True,inferSchema=True)

In [56]:
df_sparks.show()

+------+----+
|  name| age|
+------+----+
| simon|  34|
|brenda|  12|
|  sani|  23|
|  mark|  45|
|  john|  79|
|   sam|  67|
| bruce|  33|
| henry|  20|
+------+----+



In [13]:
#### Now the data types are recognized correctly (age is an integer); before, every column was treated as a string
df_sparks.printSchema()

root
 |-- name: string (nullable = true)
 |--  age: integer (nullable = true)
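
To see why the two read options matter, here is a minimal plain-Python sketch of the same idea using the standard `csv` module. It is a toy model, not Spark's reader: the two-row `raw` string is a hypothetical stand-in for the file, and `infer` is a made-up helper that only knows about integers.

```python
import csv
import io

# Hypothetical file contents, modelled on the output shown above.
raw = "name,age\nsimon,34\nbrenda,12\n"

# Without header handling: every line is data and every value is a string.
rows_no_header = list(csv.reader(io.StringIO(raw)))

# With a header: the first line names the columns ...
rows_with_header = list(csv.DictReader(io.StringIO(raw)))

# ... and a simple "schema inference" pass converts numeric-looking values.
def infer(value):
    try:
        return int(value)
    except ValueError:
        return value

typed_rows = [{k: infer(v) for k, v in row.items()} for row in rows_with_header]

print(rows_no_header[0])  # the header line is treated as a data row
print(typed_rows[0])      # values now have proper types
```

Type inference needs an extra look at the data, which is the price paid for getting usable column types.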



In [14]:
type(df_sparks)

pyspark.sql.dataframe.DataFrame

In [15]:
### List the columns of our data
df_sparks.columns

['name', ' age']

In [57]:
## Select only the data from our 'name' column
df_sparks.select('name').show()

+------+
|  name|
+------+
| simon|
|brenda|
|  sani|
|  mark|
|  john|
|   sam|
| bruce|
| henry|
+------+



In [17]:
### Look at the data types of our columns
df_sparks.dtypes

[('name', 'string'), (' age', 'int')]

In [18]:
### Similar to pandas, PySpark can compute summary statistics of our data using the .describe() function
df_sparks.describe().show()

+-------+------+------------------+
|summary|  name|               age|
+-------+------+------------------+
|  count|     8|                 8|
|   mean|  NULL|            39.125|
| stddev|  NULL|23.381540337869712|
|    min|brenda|                12|
|    max| simon|                79|
+-------+------+------------------+



In [58]:
## This one is a little more complex: we rank the rows by age and add another column showing each row's rank
from pyspark.sql import Window
from pyspark.sql.functions import col, rank

# The column is named ' age' (with a leading space), as df_sparks.columns shows.
# With no partitionBy, Spark ranks across the whole DataFrame in a single partition.
windowSpec = Window.orderBy(col(' age').desc())

df_sparks = df_sparks.withColumn('age_rank', rank().over(windowSpec))

df_sparks.show()





+------+----+--------+
|  name| age|age_rank|
+------+----+--------+
|  john|  79|       1|
|   sam|  67|       2|
|  mark|  45|       3|
| simon|  34|       4|
| bruce|  33|       5|
|  sani|  23|       6|
| henry|  20|       7|
|brenda|  12|       8|
+------+----+--------+
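
All ages in this table are distinct, so the output does not show how `rank()` treats ties. The plain-Python sketch below models that behaviour: tied rows share a rank and the next rank is skipped. It is not Spark code, and the rows (including the tie between the two 45s) are made up for illustration.

```python
# Plain-Python model of rank() over a descending order (not Spark code).
# The tie between the two 45s is a made-up example.
ages = [("john", 79), ("mark", 45), ("amy", 45), ("simon", 34)]

ordered = sorted(ages, key=lambda row: row[1], reverse=True)

ranked = []
for position, (name, age) in enumerate(ordered, start=1):
    if ranked and ranked[-1][1] == age:
        rank = ranked[-1][2]   # same value as the previous row: reuse its rank
    else:
        rank = position        # otherwise the rank is the row position
    ranked.append((name, age, rank))

for row in ranked:
    print(row)
```

If gaps after ties are not wanted, `pyspark.sql.functions` also provides `dense_rank()`, which numbers the groups consecutively.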



In [36]:
## Drop a column. The column named here does not exist in df_sparks, so drop() leaves the DataFrame unchanged
df_sparks.drop("Experience after 2 years").show()

+------+----+
|  name| age|
+------+----+
| simon|  34|
|brenda|  12|
|  sani|  23|
|  mark|  45|
|  john|  79|
|   sam|  67|
| bruce|  33|
| henry|  20|
+------+----+



In [38]:
## Rename our 'name' column to 'Name'
df_sparks.withColumnRenamed('name','Name').show()

+------+----+
|  Name| age|
+------+----+
| simon|  34|
|brenda|  12|
|  sani|  23|
|  mark|  45|
|  john|  79|
|   sam|  67|
| bruce|  33|
| henry|  20|
+------+----+



In [59]:
spark.stop()

#### We will load another DataFrame and do some manipulation on it
### We will be handling missing values

In [40]:
## We will start another session
from pyspark.sql import SparkSession

In [42]:
### Remember we will need to initiate another SparkSession
spark=SparkSession.builder.appName('missing').getOrCreate()

In [43]:
spark

In [44]:
## As before, we load the data with header=True and inferSchema=True
df_sparks1= spark.read.csv('missing.csv',header=True,inferSchema=True)

In [48]:
## This dataset contains null values
df_sparks1.show()

+-----+----+----------+------+
| name| age|experience|salary|
+-----+----+----------+------+
|simon|  34|         2|  NULL|
| mark|  45|        22| 34000|
|  ivy|NULL|         1|  2000|
|  eva|  50|        34| 80000|
|  eve|  78|        35|  NULL|
|brian|  55|        23| 56000|
| josh|  66|      NULL| 78000|
+-----+----+----------+------+



In [49]:
### Drop the rows that contain null values. This approach is usually discouraged, since it drops the whole row even when only one value is missing
df_sparks1.na.drop().show()

+-----+---+----------+------+
| name|age|experience|salary|
+-----+---+----------+------+
| mark| 45|        22| 34000|
|  eva| 50|        34| 80000|
|brian| 55|        23| 56000|
+-----+---+----------+------+



In [51]:
### Passing how='all' drops a row only if all the values in that row are null
df_sparks1.na.drop(how='all').show()

+-----+----+----------+------+
| name| age|experience|salary|
+-----+----+----------+------+
|simon|  34|         2|  NULL|
| mark|  45|        22| 34000|
|  ivy|NULL|         1|  2000|
|  eva|  50|        34| 80000|
|  eve|  78|        35|  NULL|
|brian|  55|        23| 56000|
| josh|  66|      NULL| 78000|
+-----+----+----------+------+



In [52]:
### thresh sets the minimum number of non-null values a row needs in order to be kept (it overrides how); every row here has at least 2, so none is dropped
df_sparks1.na.drop(how='all',thresh=2).show()

+-----+----+----------+------+
| name| age|experience|salary|
+-----+----+----------+------+
|simon|  34|         2|  NULL|
| mark|  45|        22| 34000|
|  ivy|NULL|         1|  2000|
|  eva|  50|        34| 80000|
|  eve|  78|        35|  NULL|
|brian|  55|        23| 56000|
| josh|  66|      NULL| 78000|
+-----+----+----------+------+



In [53]:
### subset restricts the null check to the listed columns; in the code below a row is dropped when its 'salary' is null
df_sparks1.na.drop(how='all',subset=['salary']).show()

+-----+----+----------+------+
| name| age|experience|salary|
+-----+----+----------+------+
| mark|  45|        22| 34000|
|  ivy|NULL|         1|  2000|
|  eva|  50|        34| 80000|
|brian|  55|        23| 56000|
| josh|  66|      NULL| 78000|
+-----+----+----------+------+
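
The four `na.drop` calls above differ only in which rows they keep. As a rough check of the rules, here is a plain-Python model applied to the same rows. It is not Spark code: `na_drop` is a made-up helper and `None` stands for NULL.

```python
# Plain-Python model of DataFrame.na.drop (not Spark code), using the rows
# of missing.csv shown above. None stands for NULL.
columns = ["name", "age", "experience", "salary"]
rows = [
    ("simon", 34, 2, None),
    ("mark", 45, 22, 34000),
    ("ivy", None, 1, 2000),
    ("eva", 50, 34, 80000),
    ("eve", 78, 35, None),
    ("brian", 55, 23, 56000),
    ("josh", 66, None, 78000),
]

def na_drop(rows, how="any", thresh=None, subset=None):
    """Return the names of the rows that survive; thresh overrides how."""
    idx = [columns.index(c) for c in (subset or columns)]
    kept = []
    for row in rows:
        non_null = sum(row[i] is not None for i in idx)
        if thresh is not None:
            keep = non_null >= thresh      # needs at least `thresh` non-null values
        elif how == "any":
            keep = non_null == len(idx)    # dropped if any value is null
        else:                              # how == "all"
            keep = non_null > 0            # dropped only if every value is null
        if keep:
            kept.append(row[0])
    return kept

print(na_drop(rows))                                # how='any'
print(na_drop(rows, how="all"))
print(na_drop(rows, how="all", thresh=2))
print(na_drop(rows, how="all", subset=["salary"]))
```

The four printed lists contain the same names as the four tables above.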



### We will fill the missing (null) values using the mean or the median. The median is often preferred because it is less affected by skewed data. Below we fill our missing values first with the mean and then with the median



In [56]:
## `imputer` here is an Imputer built like the one in the next cell, but with the default strategy ("mean")
imputer.fit(df_sparks1).transform(df_sparks1).show()

+-----+----+----------+------+-----------+------------------+--------------+
| name| age|experience|salary|age_imputed|experience_imputed|salary_imputed|
+-----+----+----------+------+-----------+------------------+--------------+
|simon|  34|         2|  NULL|         34|                 2|         50000|
| mark|  45|        22| 34000|         45|                22|         34000|
|  ivy|NULL|         1|  2000|         54|                 1|          2000|
|  eva|  50|        34| 80000|         50|                34|         80000|
|  eve|  78|        35|  NULL|         78|                35|         50000|
|brian|  55|        23| 56000|         55|                23|         56000|
| josh|  66|      NULL| 78000|         66|                19|         78000|
+-----+----+----------+------+-----------+------------------+--------------+



In [57]:
### Now we fill using the median
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=['age', 'experience', 'salary'],
    outputCols=["{}_imputed".format(c) for c in ['age', 'experience', 'salary']]
).setStrategy("median")

In [58]:
## Now we have the original DataFrame combined with the new columns, where all the missing values are filled with the median
imputer.fit(df_sparks1).transform(df_sparks1).show()

+-----+----+----------+------+-----------+------------------+--------------+
| name| age|experience|salary|age_imputed|experience_imputed|salary_imputed|
+-----+----+----------+------+-----------+------------------+--------------+
|simon|  34|         2|  NULL|         34|                 2|         56000|
| mark|  45|        22| 34000|         45|                22|         34000|
|  ivy|NULL|         1|  2000|         50|                 1|          2000|
|  eva|  50|        34| 80000|         50|                34|         80000|
|  eve|  78|        35|  NULL|         78|                35|         56000|
|brian|  55|        23| 56000|         55|                23|         56000|
| josh|  66|      NULL| 78000|         66|                22|         78000|
+-----+----+----------+------+-----------+------------------+--------------+
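
The fill values in the two tables can be checked by hand. The sketch below recomputes them with Python's `statistics` module from the non-null values of each column. Two assumptions are inferred from the printed output rather than from the code: the mean is truncated to an integer because the columns are integers, and the median matches the lower median on this small table (Spark computes it approximately, so that match is an observation about this data, not a guarantee).

```python
import statistics

# Non-null values per column, taken from the table above.
observed = {
    "age": [34, 45, 50, 78, 55, 66],
    "experience": [2, 22, 1, 34, 35, 23],
    "salary": [34000, 2000, 80000, 56000, 78000],
}

# Mean fill value, truncated to an integer (assumption: integer columns).
mean_fill = {col: int(statistics.mean(vals)) for col, vals in observed.items()}

# Lower median: for an even count it picks the lower of the two middle values.
median_fill = {col: statistics.median_low(vals) for col, vals in observed.items()}

print(mean_fill)
print(median_fill)
```

The salary column shows why the median can be the safer choice: the single very low value (2000) pulls the mean down to 50000, while the median stays at 56000.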



#### FILTER OPERATIONS

In [59]:
### Filter the rows where the salary is less than or equal to 50000
df_sparks1.filter('salary<=50000').show()

+----+----+----------+------+
|name| age|experience|salary|
+----+----+----------+------+
|mark|  45|        22| 34000|
| ivy|NULL|         1|  2000|
+----+----+----------+------+



In [60]:
### Select only two columns from the rows that meet the condition above
df_sparks1.filter('salary<=50000').select(['name','age']).show()

+----+----+
|name| age|
+----+----+
|mark|  45|
| ivy|NULL|
+----+----+



In [64]:
### Filter the rows where the salary is between 20000 and 80000
df_sparks1.filter((df_sparks1['salary']<=80000)& (df_sparks1['salary']>=20000)).show()

+-----+---+----------+------+
| name|age|experience|salary|
+-----+---+----------+------+
| mark| 45|        22| 34000|
|  eva| 50|        34| 80000|
|brian| 55|        23| 56000|
| josh| 66|      NULL| 78000|
+-----+---+----------+------+



In [65]:
### Select only two columns from the rows that meet the condition above
df_sparks1.filter((df_sparks1['salary']<=80000)& (df_sparks1['salary']>=20000)).select(['name','age']).show()

+-----+---+
| name|age|
+-----+---+
| mark| 45|
|  eva| 50|
|brian| 55|
| josh| 66|
+-----+---+
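
Notice that simon and eve, whose salary is NULL, never appear in the filtered output: a comparison against NULL is neither true nor false, and `filter` keeps only rows where the condition is true. A minimal plain-Python sketch of that rule, with `None` standing for NULL and `between` as a made-up helper:

```python
# Plain-Python model of filtering with NULLs (not Spark code).
salaries = {"simon": None, "mark": 34000, "ivy": 2000, "eva": 80000,
            "eve": None, "brian": 56000, "josh": 78000}

def between(value, low, high):
    """Return None (unknown) when the value is missing, else a boolean."""
    if value is None:
        return None
    return low <= value <= high

# A row is kept only when the condition is exactly True; None is dropped.
kept = [name for name, s in salaries.items()
        if between(s, 20000, 80000) is True]
print(kept)
```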



In [66]:
### Stop the Spark session now that we are done with it
spark.stop()

### GROUP BY AND FUNCTIONS

In [67]:
### The previous session has been stopped, so we start another Spark session
from pyspark.sql import SparkSession

In [77]:
### We will have to build a new SparkSession
spark=SparkSession.builder.appName('dept').getOrCreate()

In [78]:
spark

In [79]:
### Load our data in Spark
df_spark2= spark.read.csv('department.csv',header=True,inferSchema=True)

In [80]:
df_spark2.show()

+-----+-------+------+
| name|   dept|salary|
+-----+-------+------+
|simon|   data| 20000|
|simon|bigdata| 40000|
|simon|     ux| 30000|
| mark|   data| 45000|
|  ben|     ux| 56000|
|  tom|bigdata| 22000|
|  ben|     ux| 34000|
|  tom|   data| 12000|
| mark|     ux|  1000|
+-----+-------+------+



In [81]:
### Look at our data types; the schema also shows whether each column is allowed to contain nulls
df_spark2.printSchema()

root
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: integer (nullable = true)



In [82]:
### Using the data above, compute the sum of the salary, grouping by name
df_spark2.groupBy('name').sum().show()

+-----+-----------+
| name|sum(salary)|
+-----+-----------+
|simon|      90000|
|  ben|      90000|
| mark|      46000|
|  tom|      34000|
+-----+-----------+



In [83]:
### Group the data by the dept column and sum the salary
df_spark2.groupBy('dept').sum().show()

+-------+-----------+
|   dept|sum(salary)|
+-------+-----------+
|   data|      77000|
|bigdata|      62000|
|     ux|     121000|
+-------+-----------+



In [85]:
### Compute the mean salary, grouping by name
df_spark2.groupBy('name').mean().show()

+-----+-----------+
| name|avg(salary)|
+-----+-----------+
|simon|    30000.0|
|  ben|    45000.0|
| mark|    23000.0|
|  tom|    17000.0|
+-----+-----------+



In [86]:
### Count the number of rows in each group of the dept column
df_spark2.groupBy('dept').count().show()

+-------+-----+
|   dept|count|
+-------+-----+
|   data|    3|
|bigdata|    2|
|     ux|    4|
+-------+-----+



In [87]:
### Aggregate the salaries of all the members across our departments
df_spark2.agg({'salary':'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|     260000|
+-----------+
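
The grouped results above can be reproduced by hand, which helps to see what `groupBy` followed by an aggregate computes. The sketch below does so in plain Python on the same nine rows. It is not Spark code, and `group_salaries` is a made-up helper.

```python
from collections import defaultdict

# Rows of department.csv as shown above: (name, dept, salary).
rows = [
    ("simon", "data", 20000), ("simon", "bigdata", 40000), ("simon", "ux", 30000),
    ("mark", "data", 45000), ("ben", "ux", 56000), ("tom", "bigdata", 22000),
    ("ben", "ux", 34000), ("tom", "data", 12000), ("mark", "ux", 1000),
]

def group_salaries(rows, key_index):
    """Collect the salaries that share the same key (name or dept)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key_index]].append(row[2])
    return groups

by_name = group_salaries(rows, key_index=0)
by_dept = group_salaries(rows, key_index=1)

sum_by_name = {k: sum(v) for k, v in by_name.items()}
mean_by_name = {k: sum(v) / len(v) for k, v in by_name.items()}
count_by_dept = {k: len(v) for k, v in by_dept.items()}
grand_total = sum(r[2] for r in rows)  # like agg({'salary': 'sum'})

print(sum_by_name)
print(mean_by_name)
print(count_by_dept)
print(grand_total)
```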



# RESILIENT DISTRIBUTED DATASET

Although RDDs are no longer the primary API, they cannot be ignored when working with Spark. They are still supported and still useful. We will manipulate data using both RDDs and Spark SQL, compare how the code is written and executed, and see which of the two approaches is easier.

### We will compare the RDD queries below with the equivalent DataFrame queries in Spark SQL and see how they differ from each other

In [3]:
# This script finds the average number of friends by age in our fakefriends.csv
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName("friends")
sc = SparkContext(conf=conf)
## This function takes the third and fourth columns (age and number of friends) and casts them
# to integers so we can do arithmetic on them; the first and second columns are ignored
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)
## Load our data
lines = sc.textFile('fakefriends.csv')

## Now we have a new RDD of (age, numFriends) pairs

rdd = lines.map(parseLine)

# mapValues leaves the key (age) untouched and turns each value (numFriends) into (numFriends, 1).
# reduceByKey then adds up, for each age, the friend totals and the counts.
totals = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

## Divide the total number of friends by the number of times that age was encountered to get the average
averages = totals.mapValues(lambda x: x[0] / x[1])
results = averages.collect()
for result in results:
    print(result)

(33, 325.3333333333333)
(26, 242.05882352941177)
(55, 295.53846153846155)
(40, 250.8235294117647)
(68, 269.6)
(59, 220.0)
(37, 249.33333333333334)
(54, 278.0769230769231)
(38, 193.53333333333333)
(27, 228.125)
(53, 222.85714285714286)
(57, 258.8333333333333)
(56, 306.6666666666667)
(43, 230.57142857142858)
(36, 246.6)
(22, 206.42857142857142)
(35, 211.625)
(45, 309.53846153846155)
(60, 202.71428571428572)
(67, 214.625)
(19, 213.27272727272728)
(30, 235.8181818181818)
(51, 302.14285714285717)
(25, 197.45454545454547)
(21, 350.875)
(42, 303.5)
(49, 184.66666666666666)
(48, 281.4)
(50, 254.6)
(39, 169.28571428571428)
(32, 207.9090909090909)
(58, 116.54545454545455)
(64, 281.3333333333333)
(31, 267.25)
(52, 340.6363636363636)
(24, 233.8)
(20, 165.0)
(62, 220.76923076923077)
(41, 268.55555555555554)
(44, 282.1666666666667)
(69, 235.2)
(65, 298.2)
(61, 256.22222222222223)
(28, 209.1)
(66, 276.44444444444446)
(46, 223.69230769230768)
(29, 215.91666666666666)
(18, 343.375)
(47, 233.22222222222
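
The `mapValues` and `reduceByKey` steps are the heart of this script. To make the data flow concrete, here is a plain-Python sketch of the same three steps. It is not Spark code, and the five `(age, numFriends)` pairs are made-up sample data.

```python
# Plain-Python model of mapValues + reduceByKey (not Spark code).
# The (age, numFriends) pairs are made-up sample data.
pairs = [(33, 385), (26, 2), (33, 100), (26, 281), (33, 491)]

# mapValues(lambda x: (x, 1)): keep the key, turn each value into (value, 1).
with_counts = [(age, (friends, 1)) for age, friends in pairs]

# reduceByKey: combine the values that share a key, two at a time.
totals = {}
for age, (friends, count) in with_counts:
    if age in totals:
        prev_friends, prev_count = totals[age]
        totals[age] = (prev_friends + friends, prev_count + count)
    else:
        totals[age] = (friends, count)

# mapValues(lambda x: x[0] / x[1]): total friends divided by number of rows.
averages = {age: total / count for age, (total, count) in totals.items()}

print(totals)
print(averages)
```

Carrying the count alongside the sum is what lets the average be computed in a single pass over the data.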

In [7]:
sc.stop()

### FILTERING IN RDD

In [8]:
## The following code finds the minimum temperature observed in the year 1800, using 1800.csv

# importing the dependencies
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf = conf)

# parseLine splits each line on commas, takes the station ID from the first field, the entry type from the third field,
# and the temperature from the fourth field, which it converts to a float in degrees Fahrenheit
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("1800.csv")
parsedLines = lines.map(parseLine)

# Keep only the TMIN entries; anything else is not passed on
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))

# For each station, reduce all of its temperatures to the minimum
minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
results = minTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))



ITE00100554	5.36F
EZE00100082	7.70F
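
The conversion in `parseLine` can be checked by hand. The sketch below assumes, as the `* 0.1` factor implies, that the file stores temperatures in tenths of a degree Celsius. The raw minimum readings of -148 and -135 are the ones printed by the DataFrame version of this example later in the tutorial, and the function name is made up for illustration.

```python
def tenths_celsius_to_fahrenheit(raw):
    """Convert a reading in tenths of a degree Celsius to Fahrenheit."""
    celsius = raw * 0.1
    return celsius * (9.0 / 5.0) + 32.0

# Raw minimum readings for the two stations in this data set.
for station, raw in [("ITE00100554", -148.0), ("EZE00100082", -135.0)]:
    print(station + "\t{:.2f}F".format(tenths_celsius_to_fahrenheit(raw)))
```

A raw value of -148 is -14.8 degrees Celsius, which is 5.36 degrees Fahrenheit, the value reported for station ITE00100554.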


In [12]:
sc.stop()

In [13]:
### The following RDD code adds up the total amount spent by each customer
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("SpendByCustomerSorted")
sc = SparkContext(conf = conf)

# The customer ID becomes our key and the amount spent becomes the value
def extractCustomerPricePairs(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

lines = sc.textFile("customer-orders.csv")
mappedInput = lines.map(extractCustomerPricePairs)

# Add up all the values encountered for each customer ID
totalByCustomer = mappedInput.reduceByKey(lambda x, y: x + y)

# Swap each pair to (total, customerID) so that sortByKey orders by amount spent, smallest first.
# Python 3 lambdas cannot unpack tuples, so we index into the pair instead.
flipped = totalByCustomer.map(lambda x: (x[1], x[0]))

totalByCustomerSorted = flipped.sortByKey()

results = totalByCustomerSorted.collect()
for result in results:
    print(result)

(3309.38, 45)
(3790.570000000001, 79)
(3924.230000000001, 96)
(4042.6499999999987, 23)
(4172.289999999998, 99)
(4178.500000000001, 75)
(4278.049999999997, 36)
(4297.260000000001, 98)
(4316.299999999999, 47)
(4327.729999999999, 77)
(4367.62, 13)
(4384.33, 48)
(4394.599999999999, 49)
(4475.569999999999, 94)
(4505.79, 67)
(4517.27, 50)
(4524.509999999999, 78)
(4561.069999999999, 5)
(4628.4, 57)
(4635.799999999997, 83)
(4642.259999999999, 91)
(4647.129999999999, 74)
(4652.939999999999, 84)
(4659.63, 3)
(4664.589999999998, 12)
(4681.919999999999, 66)
(4701.019999999999, 56)
(4707.41, 21)
(4727.860000000001, 80)
(4735.030000000001, 14)
(4735.200000000002, 37)
(4755.070000000001, 7)
(4756.8899999999985, 44)
(4765.05, 31)
(4812.489999999998, 82)
(4815.050000000002, 4)
(4819.700000000001, 10)
(4830.549999999999, 88)
(4836.859999999999, 20)
(4851.479999999999, 89)
(4876.840000000002, 95)
(4898.460000000002, 38)
(4904.209999999999, 76)
(4908.81, 86)
(4915.889999999999, 27)
(4921.27, 18)
(4945.299
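
The flip before `sortByKey` exists only because `sortByKey` sorts on the first element of each pair. A plain-Python sketch of the same step, using three customer totals from the output above (rounded to two decimals); this is not Spark code:

```python
# Plain-Python model of the flip-then-sortByKey step (not Spark code).
# (customerID, total) pairs for three customers, rounded to two decimals.
total_by_customer = [(79, 3790.57), (45, 3309.38), (96, 3924.23)]

# Swap (customer, total) to (total, customer) so the total becomes the key.
flipped = [(total, customer) for customer, total in total_by_customer]

# Sorting by key now orders customers from lowest to highest spend.
sorted_by_total = sorted(flipped)

# Equivalent without flipping: sort on the second element directly.
sorted_direct = sorted(total_by_customer, key=lambda pair: pair[1])

print(sorted_by_total)
print(sorted_direct)
```

PySpark RDDs also offer `sortBy`, which takes a key function in the same way as the second variant here, so the flip is a matter of style rather than a requirement.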

### We have now seen RDDs implemented in code. This is the original way of using Spark, and it is helpful to learn because it is still widely used for manipulating data. Next we will compare the RDD code above with Spark SQL, looking at which one is easier to implement and at the run time of the code

## Comparing the RDD to Spark SQL

In [14]:
sc.stop()

In [40]:
# The first RDD example found the average number of friends by age; here is the same task with Spark SQL (the DataFrame API)
from pyspark.sql import SparkSession
from pyspark.sql import functions as func

spark = SparkSession.builder.appName('friends').getOrCreate()

### Load our data in Spark
lines = spark.read.csv('fakefriends-header.csv', header=True, inferSchema=True)

# Select the columns "age" and "friends"
friendsbyage = lines.select("age", "friends")

# Group by 'age' and calculate the average number of friends, rounding to 2 decimal places
average_friends_by_age = friendsbyage.groupBy('age').agg(func.round(func.avg('friends'), 2).alias('average_friends'))

# Order the DataFrame by 'age'
sorted_average_friends = average_friends_by_age.orderBy('age')

# Show the result
sorted_average_friends.show()




+---+---------------+
|age|average_friends|
+---+---------------+
| 18|         343.38|
| 19|         213.27|
| 20|          165.0|
| 21|         350.88|
| 22|         206.43|
| 23|          246.3|
| 24|          233.8|
| 25|         197.45|
| 26|         242.06|
| 27|         228.13|
| 28|          209.1|
| 29|         215.92|
| 30|         235.82|
| 31|         267.25|
| 32|         207.91|
| 33|         325.33|
| 34|          245.5|
| 35|         211.63|
| 36|          246.6|
| 37|         249.33|
+---+---------------+
only showing top 20 rows



In [41]:
spark.stop()

In [42]:
## The following code finds the minimum temperature; we create a schema to read our data as a DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

spark = SparkSession.builder.appName("MinTemperatures").getOrCreate()

schema = StructType([
    StructField("stationID", StringType(), True),
    StructField("date", IntegerType(), True),
    StructField("measure_type", StringType(), True),
    StructField("temperature", FloatType(), True)])

# Read the file as a DataFrame
df = spark.read.schema(schema).csv("1800.csv")
df.printSchema()

# Filter out all but TMIN entries
minTemps = df.filter(df.measure_type == "TMIN")

# Select only stationID and temperature
stationTemps = minTemps.select("stationID", "temperature")

# Aggregate to find minimum temperature for every station
minTempsByStation = stationTemps.groupBy("stationID").min("temperature")
minTempsByStation.show()

# Convert temperature to fahrenheit and sort the dataset
minTempsByStationF = minTempsByStation.withColumn("temperature",
                                                  func.round(func.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2))\
                                                  .select("stationID", "temperature").sort("temperature")
                                                  
# Collect, format, and print the results
results = minTempsByStationF.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))
    
spark.stop()


root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)

+-----------+----------------+
|  stationID|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+

ITE00100554	5.36F
EZE00100082	7.70F


In [43]:
## We will also compute the total amount spent by each customer
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

spark = SparkSession.builder.appName("TotalSpentByCustomer").master("local[*]").getOrCreate()

# Create schema when reading customer-orders
customerOrderSchema = StructType([
    StructField("cust_id", IntegerType(), True),
    StructField("item_id", IntegerType(), True),
    StructField("amount_spent", FloatType(), True)
])

# Load the data into a Spark DataFrame
customersDF = spark.read.schema(customerOrderSchema).csv("customer-orders.csv")

totalByCustomer = customersDF.groupBy("cust_id").agg(func.round(func.sum("amount_spent"), 2) \
                                      .alias("total_spent"))

totalByCustomerSorted = totalByCustomer.sort("total_spent")

totalByCustomerSorted.show(totalByCustomerSorted.count())

spark.stop()


+-------+-----------+
|cust_id|total_spent|
+-------+-----------+
|     45|    3309.38|
|     79|    3790.57|
|     96|    3924.23|
|     23|    4042.65|
|     99|    4172.29|
|     75|     4178.5|
|     36|    4278.05|
|     98|    4297.26|
|     47|     4316.3|
|     77|    4327.73|
|     13|    4367.62|
|     48|    4384.33|
|     49|     4394.6|
|     94|    4475.57|
|     67|    4505.79|
|     50|    4517.27|
|     78|    4524.51|
|      5|    4561.07|
|     57|     4628.4|
|     83|     4635.8|
|     91|    4642.26|
|     74|    4647.13|
|     84|    4652.94|
|      3|    4659.63|
|     12|    4664.59|
|     66|    4681.92|
|     56|    4701.02|
|     21|    4707.41|
|     80|    4727.86|
|     14|    4735.03|
|     37|     4735.2|
|      7|    4755.07|
|     44|    4756.89|
|     31|    4765.05|
|     82|    4812.49|
|      4|    4815.05|
|     10|     4819.7|
|     88|    4830.55|
|     20|    4836.86|
|     89|    4851.48|
|     95|    4876.84|
|     38|    4898.46|
|     76| 

##### We have seen how the two versions compare: the DataFrame API is the easier of the two to code, and it generally runs faster as well, which is part of the reason why many developers in the industry are moving to it. Next we will look at writing more in-depth PySpark queries for manipulating big data.

## MACHINE LEARNING

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:

1. ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering

2. Featurization: feature extraction, transformation, dimensionality reduction, and selection

3. Pipelines: tools for constructing, evaluating, and tuning ML Pipelines

4. Persistence: saving and loading algorithms, models, and Pipelines

5. Utilities: linear algebra, statistics, data handling, etc.


As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.


#### What are the implications?

1. MLlib will still support the RDD-based API in spark.mllib with bug fixes.
2. MLlib will not add new features to the RDD-based API.
3. In the Spark 2.x releases, MLlib will add features to the DataFrames-based API to reach feature parity with the RDD-based API.

#### Why is MLlib switching to the DataFrame-based API?

1. DataFrames provide a more user-friendly API than RDDs. The many benefits of DataFrames include Spark Datasources, SQL/DataFrame queries, Tungsten and Catalyst optimizations, and uniform APIs across languages.
2. The DataFrame-based API for MLlib provides a uniform API across ML algorithms and across multiple languages.
3. DataFrames facilitate practical ML Pipelines, particularly feature transformations. See the Pipelines guide for details.

#### What is “Spark ML”?

“Spark ML” is not an official name, but it is occasionally used to refer to the MLlib DataFrame-based API. This is mainly due to the org.apache.spark.ml Scala package name used by the DataFrame-based API, and the “Spark ML Pipelines” term that the Spark project used initially to emphasize the pipeline concept.

#### Is MLlib deprecated?

No. MLlib includes both the RDD-based API and the DataFrame-based API. The RDD-based API is now in maintenance mode. But neither API is deprecated, nor MLlib as a whole.

These are some of the main points worth noting as we shift to the DataFrame API. You can refer to the official Spark documentation for further information.

#### Why MLlib is becoming more widely used for machine learning

1. Ease of use - Usable in Java, Scala, Python, and R.
2. Performance - High-quality algorithms, which the Spark project describes as up to 100x faster than MapReduce.
3. Runs everywhere - Spark runs on Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud, against diverse data sources.

##### In the code blocks below we will implement machine learning from the basics, then work towards more accurate models by tuning the hyperparameters. We will start with simple Python scripts and move towards more complex queries, keeping this machine learning tutorial as beginner friendly as possible. We will perform a regression task and a classification task in this tutorial. (Hope we are still learning.)

### PYSPARK REGRESSION MODEL ALGORITHM

##### In the following notebook we will predict the prices of houses in Paris based on: 1. square metres 2. number of rooms 3. whether it has a yard 4. whether it has a pool 5. number of floors 6. city code 7. city part range 8. number of previous owners 9. year made 10. whether it is newly built 11. whether it has a storm protector 12. basement size 13. attic size 14. garage size 15. whether it has a storage room 16. number of guest rooms

In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('housing').getOrCreate()
house_df = spark.read.csv('ParisHousing.csv', header = True, inferSchema = True)
house_df.printSchema()

root
 |-- squareMeters: integer (nullable = true)
 |-- numberOfRooms: integer (nullable = true)
 |-- hasYard: integer (nullable = true)
 |-- hasPool: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- cityCode: integer (nullable = true)
 |-- cityPartRange: integer (nullable = true)
 |-- numPrevOwners: integer (nullable = true)
 |-- made: integer (nullable = true)
 |-- isNewBuilt: integer (nullable = true)
 |-- hasStormProtector: integer (nullable = true)
 |-- basement: integer (nullable = true)
 |-- attic: integer (nullable = true)
 |-- garage: integer (nullable = true)
 |-- hasStorageRoom: integer (nullable = true)
 |-- hasGuestRoom: integer (nullable = true)
 |-- price: double (nullable = true)



In [8]:
# Look at the distribution of all the numerical columns in our dataset
house_df.describe().toPandas()


Unnamed: 0,summary,squareMeters,numberOfRooms,hasYard,hasPool,floors,cityCode,cityPartRange,numPrevOwners,made,isNewBuilt,hasStormProtector,basement,attic,garage,hasStorageRoom,hasGuestRoom,price
0,count,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0
1,mean,49870.1312,50.3584,0.5087,0.4968,50.2763,50225.4861,5.5101,5.5217,2005.4885,0.4991,0.4999,5033.1039,5028.0106,553.1212,0.503,4.9946,4993447.525749963
2,stddev,28774.37535029503,28.81669636927458,0.4999493023602426,0.5000147612582521,28.88917127111252,29006.675799293174,2.87202417160515,2.856666792700276,9.308089589340009,0.5000241918339955,0.5000249918746555,2876.7295448116365,2894.3322098165813,262.0501698906411,0.5000160013441162,3.1764098913678978,2877424.109945015
3,min,89.0,1.0,0.0,0.0,1.0,3.0,1.0,1.0,1990.0,0.0,0.0,0.0,1.0,100.0,0.0,0.0,10313.5
4,max,99999.0,100.0,1.0,1.0,100.0,99953.0,10.0,10.0,2021.0,1.0,1.0,10000.0,10000.0,1000.0,1.0,10.0,10006771.2


In [9]:

# Here we look at the correlation of each independent variable with our target column (price)

# Every column is numeric here; the dtype check skips any string columns
for name, dtype in house_df.dtypes:
    if dtype != 'string':
        print("Correlation to price ", name, house_df.stat.corr('price', name))

Correlation to price  squareMeters 0.9999993570640745
Correlation to price  numberOfRooms 0.009590905935479128
Correlation to price  hasYard -0.006119244882540526
Correlation to price  hasPool -0.00507034083386251
Correlation to price  floors 0.0016542562406504926
Correlation to price  cityCode -0.001539367348580816
Correlation to price  cityPartRange 0.008812911660535336
Correlation to price  numPrevOwners 0.016618826067943387
Correlation to price  made -0.007209526254690673
Correlation to price  isNewBuilt -0.010642774359518865
Correlation to price  hasStormProtector 0.0074959113342807394
Correlation to price  basement -0.003967482178851144
Correlation to price  attic -0.000599514077496332
Correlation to price  garage -0.017229051207338166
Correlation to price  hasStorageRoom -0.0034852993013792864
Correlation to price  hasGuestRoom -0.0006439241048174541
Correlation to price  price 1.0
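
`DataFrame.stat.corr` computes the Pearson correlation coefficient. As a rough illustration of what that number measures, here is a plain-Python sketch of the formula on a few made-up points. The helper name `pearson` and the sample values are assumptions for illustration; they are not part of the housing data or of Spark.

```python
import math

def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences (illustrative helper)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up values: price grows exactly linearly with area, rooms are unrelated
area = [50.0, 80.0, 120.0, 200.0]
price = [100.0 * a + 5000.0 for a in area]
rooms = [3.0, 1.0, 4.0, 2.0]

r_area = pearson(area, price)    # a perfectly linear relation gives 1 (up to rounding)
r_rooms = pearson(rooms, price)  # an unrelated column gives a value close to 0
print(r_area, r_rooms)
```

This mirrors the pattern in the output above: one column that almost determines the price, and many columns whose correlation is close to zero.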


In [10]:

house_df.dtypes

[('squareMeters', 'int'),
 ('numberOfRooms', 'int'),
 ('hasYard', 'int'),
 ('hasPool', 'int'),
 ('floors', 'int'),
 ('cityCode', 'int'),
 ('cityPartRange', 'int'),
 ('numPrevOwners', 'int'),
 ('made', 'int'),
 ('isNewBuilt', 'int'),
 ('hasStormProtector', 'int'),
 ('basement', 'int'),
 ('attic', 'int'),
 ('garage', 'int'),
 ('hasStorageRoom', 'int'),
 ('hasGuestRoom', 'int'),
 ('price', 'double')]

#### Missing values

#### Next we check whether our data has null values. If it did, we could use the methods from the earlier stages of this tutorial to fill in the missing values. Keep in mind that there is no single defined way of handling missing values; the approach mainly depends on the specific data we are using.

In [11]:

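# Import Spark's sum under an alias so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum
# For every column, count the rows where the value is null
null_counts = house_df.agg(*[spark_sum(col(c).isNull().cast("int")).alias(c) for c in house_df.columns])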

# Show the result
null_counts.show()

+------------+-------------+-------+-------+------+--------+-------------+-------------+----+----------+-----------------+--------+-----+------+--------------+------------+-----+
|squareMeters|numberOfRooms|hasYard|hasPool|floors|cityCode|cityPartRange|numPrevOwners|made|isNewBuilt|hasStormProtector|basement|attic|garage|hasStorageRoom|hasGuestRoom|price|
+------------+-------------+-------+-------+------+--------+-------------+-------------+----+----------+-----------------+--------+-----+------+--------------+------------+-----+
|           0|            0|      0|      0|     0|       0|            0|            0|   0|         0|                0|       0|    0|     0|             0|           0|    0|
+------------+-------------+-------+-------+------+--------+-------------+-------------+----+----------+-----------------+--------+-----+------+--------------+------------+-----+



#### Model building
##### As we can clearly see, all our data is in a numeric format, so we will not need to encode it. For general knowledge, we would otherwise have used the StringIndexer to encode string values so that our machine learning model can understand them. We handle this in the classification tutorial later on.

### We will be using a VectorAssembler. Just as the name suggests, it combines all our independent columns (the x values if we were building ML models in pandas) into one long vector. The VectorAssembler is commonly used in Spark machine learning pipelines to prepare features for model training. By combining multiple input columns into a single vector column, it enables easier processing and compatibility with Spark ML algorithms that expect input in vector form.

In [12]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = 
['squareMeters', 'numberOfRooms', 'hasYard', 'hasPool', 'floors', 'cityCode', 'cityPartRange', 'numPrevOwners', 'made', 'isNewBuilt', 'hasStormProtector', 'basement', 'attic', 'garage', 'hasStorageRoom', 'hasGuestRoom'
], outputCol = 'features')
vhouse_df = vectorAssembler.transform(house_df)

vhouse_df.take(1)

[Row(squareMeters=75523, numberOfRooms=3, hasYard=0, hasPool=1, floors=63, cityCode=9373, cityPartRange=3, numPrevOwners=8, made=2005, isNewBuilt=0, hasStormProtector=1, basement=4313, attic=9005, garage=956, hasStorageRoom=0, hasGuestRoom=7, price=7559081.5, features=DenseVector([75523.0, 3.0, 0.0, 1.0, 63.0, 9373.0, 3.0, 8.0, 2005.0, 0.0, 1.0, 4313.0, 9005.0, 956.0, 0.0, 7.0]))]
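
To make the `features` column less mysterious, here is a plain-Python sketch of what assembling does to a single row. It uses the first few values of the row shown above; the `assemble` helper is hypothetical and only imitates the idea, while Spark's VectorAssembler applies it to every row of the DataFrame.

```python
# Plain-Python illustration only, not the Spark implementation
row = {"squareMeters": 75523, "numberOfRooms": 3, "hasYard": 0, "hasPool": 1, "price": 7559081.5}
input_cols = ["squareMeters", "numberOfRooms", "hasYard", "hasPool"]

def assemble(row, input_cols):
    """Return a copy of the row with a 'features' list built from input_cols, in order."""
    features = [float(row[c]) for c in input_cols]
    return {**row, "features": features}

assembled = assemble(row, input_cols)
print(assembled["features"])  # [75523.0, 3.0, 0.0, 1.0]
```

The original columns stay in place and the order of `features` follows the order of the input column list, which matters later when reading coefficients or feature importances.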

In [13]:
column_list = house_df.columns
print(column_list)

['squareMeters', 'numberOfRooms', 'hasYard', 'hasPool', 'floors', 'cityCode', 'cityPartRange', 'numPrevOwners', 'made', 'isNewBuilt', 'hasStormProtector', 'basement', 'attic', 'garage', 'hasStorageRoom', 'hasGuestRoom', 'price']


In [14]:
vhouse_df = vhouse_df.select(['features', 'price'])
vhouse_df.show(3)

+--------------------+---------+
|            features|    price|
+--------------------+---------+
|[75523.0,3.0,0.0,...|7559081.5|
|[80771.0,39.0,1.0...|8085989.5|
|[55712.0,58.0,0.0...|5574642.1|
+--------------------+---------+
only showing top 3 rows



In [15]:
# Split our data into train (70%) and test (30%) DataFrames
splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
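
`randomSplit` assigns rows to the splits at random according to the normalized weights, so the resulting sizes are only approximately 70/30, and without a `seed` argument the split changes on every run. A rough plain-Python sketch of that idea follows. The helper `random_split` is hypothetical, and Spark's own sampling works per partition, so treat this as a conceptual model only.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split using cumulative, normalized weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any draw left over by floating-point rounding
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(10000))
train_rows, test_rows = random_split(rows, [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))  # roughly 7000 and 3000, not exactly
```

Passing a fixed seed, as the classification example later in this tutorial does, makes the split repeatable between runs.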

### LINEAR REGRESSION MODEL

In [16]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='price', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))


Coefficients: [100.00021861637728,-0.5175076000027911,2980.046796688874,2980.973072158858,54.85448717034218,-0.0013110530884183462,47.626320875614155,-1.1101269980230821,-3.1465764010972412,108.76773471426917,130.1536880715282,0.0014356327042063847,-0.006889181027848809,0.11070896922565554,33.642134737808966,-4.465488608949495]
Intercept: 6658.399968358891


In [17]:

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1900.416648
r2: 1.000000
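
The two metrics above are related: R2 compares the model's squared error with the spread of the target. The plain-Python sketch below writes out both formulas on made-up numbers (the helper names and sample values are assumptions for illustration). It then does a back-of-the-envelope check of why an RMSE of about 1,900 can still print as an R2 of 1.000000 when prices have a standard deviation of roughly 2.9 million, as in the `describe()` output above.

```python
import math

def rmse(y_true, y_pred):
    """Root mean squared error."""
    n = len(y_true)
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / n)

def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot

# Made-up prices and predictions, not taken from the dataset
y_true = [100.0, 200.0, 300.0, 400.0]
y_pred = [110.0, 190.0, 310.0, 390.0]
print(rmse(y_true, y_pred))  # every error is 10, so the RMSE is 10.0
print(r2(y_true, y_pred))    # about 0.992

# Rough check with the figures reported above. This is an approximation:
# the stddev comes from the full dataset, not only from the training split.
approx_r2 = 1.0 - (1900.416648 / 2877424.109945015) ** 2
print("%f" % approx_r2)  # prints 1.000000
```

So the printed R2 of 1.000000 is a rounded value: the error is tiny relative to the spread of the prices, not literally zero.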


In [23]:
import pickle
import numpy as np

In [21]:
house_df.head()

Row(squareMeters=75523, numberOfRooms=3, hasYard=0, hasPool=1, floors=63, cityCode=9373, cityPartRange=3, numPrevOwners=8, made=2005, isNewBuilt=0, hasStormProtector=1, basement=4313, attic=9005, garage=956, hasStorageRoom=0, hasGuestRoom=7, price=7559081.5)

In [13]:

lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","price","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-------+--------------------+
|        prediction|  price|            features|
+------------------+-------+--------------------+
|13183.681253584029|13229.1|[123.0,61.0,0.0,0...|
|16647.408403486486|15488.0|[128.0,38.0,0.0,1...|
|20891.090756537975|22670.7|[141.0,16.0,0.0,1...|
|17874.423979471714|17071.0|[143.0,27.0,0.0,0...|
|16317.835583469037|16799.2|[148.0,91.0,0.0,0...|
+------------------+-------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 1


In [14]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","price","features").show()

+------------------+-------+--------------------+
|        prediction|  price|            features|
+------------------+-------+--------------------+
|13183.681253584029|13229.1|[123.0,61.0,0.0,0...|
|16647.408403486486|15488.0|[128.0,38.0,0.0,1...|
|20891.090756537975|22670.7|[141.0,16.0,0.0,1...|
|17874.423979471714|17071.0|[143.0,27.0,0.0,0...|
|16317.835583469037|16799.2|[148.0,91.0,0.0,0...|
|23500.506945738438|22499.2|[153.0,71.0,0.0,1...|
|18481.649008751654|17363.0|[163.0,6.0,0.0,0....|
|27748.883387062084|26533.9|[187.0,96.0,1.0,1...|
|28275.707007199715|28295.6|[202.0,18.0,1.0,0...|
|24367.995726818004|24058.9|[211.0,10.0,0.0,0...|
|26747.636314698957|27438.4|[229.0,49.0,1.0,0...|
| 35917.63878875479|34373.4|[302.0,23.0,1.0,0...|
| 38013.89273764825|37972.0|[350.0,65.0,0.0,0...|
| 42207.31617907483|39316.3|[369.0,25.0,0.0,0...|
| 46270.79047106901|45524.8|[371.0,29.0,1.0,0...|
| 45420.40686515109|44847.8|[388.0,11.0,1.0,0...|
|49139.835387514206|54382.3|[405.0,38.0,0.0,1...|


#### Our model is performing at its best. An R2 of 1 on both the train and the test data is remarkable, and it is very rare. It can mean that the hyperparameters are well tuned, but it can also mean that the data is too perfect to be a realistic prediction problem: here price is almost perfectly correlated with squareMeters (about 0.9999993 in the correlation output above). We will rarely find data like this in the industry. However, if you ever perform a linear regression task on a dataset using Spark, this is a reasonable way to approach the problem.

### DECISION TREE REGRESSOR

In [15]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'price')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 91431.3


In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

# Assuming you already have the required imports and have trained the DecisionTreeRegressor

# Evaluate R2 score on train data
train_predictions = dt_model.transform(train_df)
train_evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="r2")
train_r2 = train_evaluator.evaluate(train_predictions)
print("R2 score on train data = %g" % train_r2)

# Evaluate R2 score on test data
test_predictions = dt_model.transform(test_df)
test_evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="r2")
test_r2 = test_evaluator.evaluate(test_predictions)
print("R2 score on test data = %g" % test_r2)

R2 score on train data = 0.999027
R2 score on test data = 0.998989


In [17]:
dt_model.featureImportances

SparseVector(16, {0: 1.0})
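
`SparseVector(16, {0: 1.0})` is a length-16 vector whose only non-zero entry sits at index 0. The indices follow the order of the `inputCols` given to the VectorAssembler, so index 0 is `squareMeters`. Below is a small plain-Python sketch of that mapping, with the sparse vector written as an ordinary dict for illustration.

```python
# Column order copied from the VectorAssembler inputCols above
input_cols = ['squareMeters', 'numberOfRooms', 'hasYard', 'hasPool', 'floors',
              'cityCode', 'cityPartRange', 'numPrevOwners', 'made', 'isNewBuilt',
              'hasStormProtector', 'basement', 'attic', 'garage',
              'hasStorageRoom', 'hasGuestRoom']

# Stand-in for SparseVector(16, {0: 1.0}): index -> importance, every other index is 0.0
sparse_importances = {0: 1.0}

importances = {name: sparse_importances.get(i, 0.0) for i, name in enumerate(input_cols)}
ranked = sorted(importances.items(), key=lambda kv: kv[1], reverse=True)
print(ranked[0])  # ('squareMeters', 1.0)
```

With a fitted model, `dt_model.featureImportances.toArray()` should give the same values as a dense array that can be paired with the column names in the same way. In other words, the tree splits only on `squareMeters`, which agrees with the correlation output earlier.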

### GRADIENT-BOOSTED TREE REGRESSOR

In [18]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'price', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'price', 'features').show()

+------------------+-------+--------------------+
|        prediction|  price|            features|
+------------------+-------+--------------------+
|127262.40527384522|13229.1|[123.0,61.0,0.0,0...|
| 97906.28473723664|15488.0|[128.0,38.0,0.0,1...|
|151952.45069790722|22670.7|[141.0,16.0,0.0,1...|
|144904.70835000914|17071.0|[143.0,27.0,0.0,0...|
|125369.92808927408|16799.2|[148.0,91.0,0.0,0...|
|153383.51573638432|22499.2|[153.0,71.0,0.0,1...|
|155024.78433768972|17363.0|[163.0,6.0,0.0,0....|
| 156321.1876818048|26533.9|[187.0,96.0,1.0,1...|
|176166.17372363282|28295.6|[202.0,18.0,1.0,0...|
|155417.36407659348|24058.9|[211.0,10.0,0.0,0...|
| 160866.8837550857|27438.4|[229.0,49.0,1.0,0...|
|144001.76410902606|34373.4|[302.0,23.0,1.0,0...|
|177090.16379258948|37972.0|[350.0,65.0,0.0,0...|
|164157.72139291026|39316.3|[369.0,25.0,0.0,0...|
|161809.16011527993|45524.8|[371.0,29.0,1.0,0...|
|170860.33814884422|44847.8|[388.0,11.0,1.0,0...|
|141449.04365600704|54382.3|[405.0,38.0,0.0,1...|


In [19]:
spark.stop()

##### As we can see, the R2 score of the decision tree regressor is also fairly good. Although we left the model's hyperparameters at their default values, it still performs well (you can look at how the decision tree model works in its official documentation).


#### That is all we will cover as far as regression techniques are concerned (hope we are still learning). Let's move on to our next type of supervised machine learning model: the classification model.


## CLASSIFICATION MODEL

#### In the following machine learning project we will build a model that predicts whether a customer will commit to a bank deposit based on a set of independent variables. This can be used to assess the credit risk of certain customers.



Input variables: age, job, marital, education, default, balance, housing, loan, contact, day, month, duration, campaign, pdays, previous, poutcome. Target variable: deposit (yes/no).



In [26]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('customer-churn').getOrCreate()
df = spark.read.csv('bank.csv', header = True, inferSchema = True)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [27]:
df.show()

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| 59|     admin.| married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|
| 56|     admin.| married|secondary|     no|     45|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|    yes|
| 41| technician| married|secondary|     no|   1270|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|    yes|
| 55|   services| married|secondary|     no|   2476|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| unknown|    yes|
| 54|     admin.| married| tertiary|     no|    184|     no|  

#### In the following cell we will check whether our data has missing or null values

In [28]:

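# Import Spark's sum under an alias so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum
# For every column, count the rows where the value is null
null_counts = df.agg(*[spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])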

# Show the result
null_counts.show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|      0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+



##### Next we will assess the target column to see whether it is balanced. If it is not, we will need to balance it, because otherwise our model would be biased towards one class. Keep in mind that this is a very important step when building classification models.

In [29]:

df.groupby('deposit').count().toPandas()

Unnamed: 0,deposit,count
0,no,5873
1,yes,5289
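
A quick arithmetic check on the counts above makes the balance concrete. This is plain Python on the two numbers from the output, nothing Spark-specific:

```python
counts = {"no": 5873, "yes": 5289}  # counts from the groupby output above
total = sum(counts.values())
shares = {label: n / total for label, n in counts.items()}
imbalance_ratio = max(counts.values()) / min(counts.values())

print(total)                                         # 11162
print({k: round(v, 3) for k, v in shares.items()})   # {'no': 0.526, 'yes': 0.474}
print(round(imbalance_ratio, 2))                     # 1.11
```

With roughly 53% "no" and 47% "yes", the two classes are close to balanced, so no resampling is applied in this tutorial.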


##### Here we will look at the summary statistics of our numeric columns. This comes in handy when dealing with outliers and when looking at the distribution of the various columns.

In [30]:

numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas()

Unnamed: 0,summary,age,balance,day,duration,campaign,pdays,previous
0,count,11162.0,11162.0,11162.0,11162.0,11162.0,11162.0,11162.0
1,mean,41.2319476796273,1528.5385235620856,15.658036194230425,371.9938183121304,2.508421429851281,51.33040673714388,0.8325568894463358
2,stddev,11.913369192215518,3225.413325946149,8.420739541006462,347.12838571630687,2.7220771816614824,108.75828197197715,2.292007218670508
3,min,18.0,-6847.0,1.0,2.0,1.0,-1.0,0.0
4,max,95.0,81204.0,31.0,3881.0,63.0,854.0,58.0


In [31]:

# Keep the columns used for modelling; 'day' and 'month' are left out
df = df.select('age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit')
cols = df.columns
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



#### We will start with category indexing, one-hot encoding and the VectorAssembler, a feature transformer that merges multiple columns into a vector column. The code is available at the Databricks site. It indexes each categorical column using the StringIndexer and then converts the indexed categories into one-hot encoded variables. The resulting output has the binary vectors appended to the end of each row. We use the StringIndexer again to encode our labels to label indices. Next, we use the VectorAssembler to combine all the feature columns into a single vector column.

In [32]:

from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')
stages += [label_stringIdx]

numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

#####  Vector Assembler: A VectorAssembler is created to assemble all the feature columns, including the one-hot encoded categorical columns and the numeric columns, into a single feature vector named 'features'. The input columns are specified as assemblerInputs, which is a concatenation of the one-hot encoded categorical column names and the numeric column names. The VectorAssembler is added to the stages list.

##### The stages list will contain all the necessary stages of the data preprocessing pipeline, including string indexing, one-hot encoding, label string indexing, and vector assembling. These stages can then be used in a Pipeline for further processing or training a machine learning model.
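
To give a feel for what the indexing and one-hot encoding stages produce, here is a plain-Python sketch on a made-up sample of the `marital` column. It imitates two documented defaults: StringIndexer orders labels by descending frequency, and OneHotEncoder drops the last category (`dropLast=True`), so that category becomes an all-zero vector. The helper names are hypothetical, and Spark stores the result as sparse vectors rather than Python lists.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to an index, most frequent first (ties broken alphabetically)."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    return {value: idx for idx, value in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

# Made-up sample of the 'marital' column
marital = ["married", "single", "married", "divorced", "married", "single"]
mapping = string_index(marital)
print(mapping)  # {'married': 0, 'single': 1, 'divorced': 2}

encoded = [one_hot(mapping[v], len(mapping)) for v in marital]
print(encoded[0], encoded[1], encoded[3])  # [1.0, 0.0] [0.0, 1.0] [0.0, 0.0]
```

Dropping the last category keeps the encoded columns from being perfectly redundant, which is why three categories need only two vector slots.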

##### We will then use a Pipeline to chain multiple transformers and estimators together to specify our algorithm workflow.

In [33]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [34]:
import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns)

Unnamed: 0,label,features,age,job,marital,education,default,balance,housing,loan,contact,duration,campaign,pdays,previous,poutcome,deposit
0,1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",59,admin.,married,secondary,no,2343,yes,no,unknown,1042,1,-1,0,unknown,yes
1,1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",56,admin.,married,secondary,no,45,no,no,unknown,1467,1,-1,0,unknown,yes
2,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",41,technician,married,secondary,no,1270,yes,no,unknown,1389,1,-1,0,unknown,yes
3,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...",55,services,married,secondary,no,2476,yes,no,unknown,579,1,-1,0,unknown,yes
4,1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",54,admin.,married,tertiary,no,184,no,no,unknown,673,2,-1,0,unknown,yes


In [35]:
train, test = df.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 7855
Test Dataset Count: 3307


#### DECISION TREE CLASSIFIER

##### Decision trees are widely used because they are easy to interpret, handle categorical features, extend to multiclass classification, and capture non-linearities and feature interactions.

In [36]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+---+----------+-----+--------------+----------+--------------------+
|age|       job|label| rawPrediction|prediction|         probability|
+---+----------+-----+--------------+----------+--------------------+
| 33|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 49|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 52|management|  0.0|[520.0,1931.0]|       1.0|[0.21215830273357...|
| 53|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 58|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 32|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 57|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 52|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 46|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 31|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
+---+----------+-----+--------------+----------+--------------------+
only showing top 10 rows
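
The `rawPrediction` and `probability` columns above are closely related. For a decision tree, the numbers shown are consistent with `rawPrediction` holding the training-row counts per class in the leaf that the row falls into, and `probability` being those counts normalized to sum to 1. A short plain-Python check using the first row of the output:

```python
raw_prediction = [2498.0, 481.0]  # first row of the output above: [count of class 0, count of class 1]
total = sum(raw_prediction)
probability = [count / total for count in raw_prediction]

# The predicted class is the index with the highest probability
prediction = float(probability.index(max(probability)))

print(probability)  # the first entry matches the 0.8385... shown above
print(prediction)   # 0.0
```

Because a tree with `maxDepth = 3` has only a handful of leaves, many rows share exactly the same probability, which is why the same values repeat down the column.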

In [37]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.7808118726917547
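
One way to read the area under ROC is as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it that way on made-up labels and scores. The `auc` helper is an illustration of this interpretation, not the routine that `BinaryClassificationEvaluator` runs internally.

```python
def auc(labels, scores):
    """Share of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up labels and predicted probabilities of class 1
labels = [0, 0, 1, 1, 0, 1]
scores = [0.1, 0.4, 0.35, 0.8, 0.2, 0.9]
print(auc(labels, scores))  # 8 of the 9 pairs are ordered correctly, so about 0.889
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so the 0.78 above sits between the two.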


We can see that our decision tree algorithm performs only moderately, with a test area under ROC of about 0.78. However, we can improve on this by using an ensemble method such as a random forest classifier, which combines multiple decision trees, so better performance is expected.

#### RANDOM FOREST CLASSIFIER

In [38]:

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+---+----------+-----+--------------------+----------+--------------------+
|age|       job|label|       rawPrediction|prediction|         probability|
+---+----------+-----+--------------------+----------+--------------------+
| 33|management|  0.0|[13.7531588468082...|       0.0|[0.68765794234041...|
| 49|management|  0.0|[13.4127191903380...|       0.0|[0.67063595951690...|
| 52|management|  0.0|[7.65584951235426...|       1.0|[0.38279247561771...|
| 53|management|  0.0|[11.6803776655343...|       0.0|[0.58401888327671...|
| 58|management|  0.0|[14.3894234094507...|       0.0|[0.71947117047253...|
| 32|management|  0.0|[13.8880450105089...|       0.0|[0.69440225052544...|
| 57|management|  0.0|[13.2319245839861...|       0.0|[0.66159622919930...|
| 52|management|  0.0|[16.8463444127981...|       0.0|[0.84231722063990...|
| 46|management|  0.0|[16.7636699072479...|       0.0|[0.83818349536239...|
| 31|management|  0.0|[13.3442134015346...|       0.0|[0.66721067007673...|
+---+-------
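
For a random forest, the numbers above are consistent with `rawPrediction` being the sum of the per-tree class probabilities, and `probability` being that sum divided by the number of trees. The check below assumes the default `numTrees` of 20, since the code above does not set it, and uses the first raw value as displayed (it is truncated by `show()`).

```python
num_trees = 20                 # assumed: RandomForestClassifier default, not set in the code above
raw_class0 = 13.7531588468082  # first rawPrediction entry as displayed above (truncated)

prob_class0 = raw_class0 / num_trees
print(round(prob_class0, 6))  # 0.687658, matching the 0.68765794... in the probability column
```

Unlike the single tree, the forest averages many trees, so the probabilities vary from row to row instead of repeating a few leaf values.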

In [39]:

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 0.8789997993785139


We can see that our model performs better when we use an ensemble model: the test area under ROC rises from about 0.78 to about 0.88, and a higher area under ROC means the model separates the two classes better. That brings us to the end of this tutorial as far as MLlib and machine learning are concerned. We have covered both classification and regression tasks, and from here you can approach other machine learning models in Spark in the same way. (Hope we are still learning.)

In [3]:
spark.stop()

## SPARK STREAMING

Spark Structured Streaming makes it easy to build streaming applications and pipelines with the same familiar Spark APIs.


Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
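The idea of an event-time window can be illustrated with a small sketch in plain Python. This is only a toy model of the concept, not Spark code: the helper `window_start`, the 10-second window size, and the sample events are all assumptions made for this example. Each event is assigned to a fixed (tumbling) window according to the time it happened, not the time it arrived.

```python
from collections import Counter
from datetime import datetime, timedelta


def window_start(event_time, size):
    """Return the start of the tumbling window of length `size`
    that contains `event_time` (windows are aligned to the Unix epoch)."""
    epoch = datetime(1970, 1, 1)
    offset = (event_time - epoch) % size
    return event_time - offset


# Hypothetical (event_time, word) records, listed in arrival order
events = [
    (datetime(2024, 1, 1, 12, 0, 3), "spark"),
    (datetime(2024, 1, 1, 12, 0, 8), "spark"),
    (datetime(2024, 1, 1, 12, 0, 12), "stream"),
    # Arrives last, but its event time still falls in the first window
    (datetime(2024, 1, 1, 12, 0, 4), "stream"),
]

size = timedelta(seconds=10)

# Count words per (window, word) pair, grouping by event time
counts = Counter((window_start(t, size), word) for t, word in events)

for (start, word), n in sorted(counts.items()):
    print(start.time(), word, n)
```

The last record arrives out of order, yet it is counted in the 12:00:00 window because grouping is done on the event time carried by the record.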

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs, thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. Since Spark 2.3, there is also a low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you can choose the mode based on your application requirements.
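The micro-batch idea can be sketched in a few lines of plain Python. This is a simplified illustration and not how Spark is implemented: the helper `micro_batches`, the batch size of 3, and the sample words are assumptions for this example. The stream is cut into small batches, only the new records are processed each time, and a running result is kept between batches.

```python
from collections import Counter


def micro_batches(records, batch_size):
    """Yield consecutive slices of the input, standing in for micro-batches."""
    for i in range(0, len(records), batch_size):
        yield records[i:i + batch_size]


# Hypothetical stream of words, in arrival order
stream = ["spark", "stream", "spark", "batch", "stream", "spark", "batch"]

running = Counter()  # state carried over from one micro-batch to the next
snapshots = []       # the result table after each micro-batch

for batch in micro_batches(stream, batch_size=3):
    running.update(batch)            # process only the newly arrived records
    snapshots.append(dict(running))  # the updated result so far

for i, snapshot in enumerate(snapshots, start=1):
    print("after batch", i, snapshot)

# The incremental result matches a single batch computation over all the data
batch_result = Counter(stream)
print("matches batch result:", running == batch_result)
```

The final running count is identical to counting the whole input at once, which is the sense in which a streaming query can be written like a batch query and updated incrementally.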


#### Advantages of using Spark for streaming

1. Easy to use - Spark Structured Streaming abstracts away complex streaming concepts such as incremental processing, checkpointing, and watermarks so that you can build streaming applications and pipelines without learning any new concepts or tools.
2. Unified batch and streaming APIs - Spark Structured Streaming provides the same structured APIs (DataFrames and Datasets) as Spark so that you don’t need to develop on or maintain two different technology stacks for batch and streaming. In addition, unified APIs make it easy to migrate your existing batch Spark jobs to streaming jobs.
3. Low latency and cost-effective - Spark Structured Streaming uses the same underlying architecture as Spark so that you can take advantage of all the performance and cost optimizations built into the Spark engine. With Spark Structured Streaming, you can build low-latency streaming applications and pipelines cost-effectively.

For more information about Spark Streaming and its components, refer to the Spark documentation, starting with the [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example).


We hope you have learned a lot from this tutorial. Keep on practising, and with time you will understand the Spark library and start implementing it in code. Remember that we need to understand this framework in order to understand big data technologies in depth and actually manipulate big data.

An increasingly common way of handling big data is to run the computation on clusters provided by cloud services such as AWS (Elastic MapReduce) and Azure (Databricks). The data is either loaded from an API and manipulated directly in these services, or loaded into buckets that act as data lakes and manipulated from there. Familiarise yourself with these cloud services and try these manipulations there; they can handle big data effectively, fast, and at scale. We end it there for now.


### Let's learn data