# Apache Spark

Apache Spark is a simple, fast, scalable, unified analytics engine for large-scale data processing. It is currently one of the most popular tools for big data analytics. Hadoop MapReduce, although still used by many companies, is an older technology, and it is generally slower than Spark because it writes data out to disk during intermediate steps, whereas Spark can keep intermediate data in memory. The fundamental data abstraction in Spark is the RDD (Resilient Distributed Dataset).
Spark can be used from Python, Java, Scala, and R, and it also supports SQL.

# Features of RDD

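### In-memory computation
### Lazy evaluation
### Fault tolerance
### Immutability
### Partitioning
### Persistence
### Coarse-grained operations
Transformations such as `map`, `filter`, and `groupBy` apply the same operation to every element of the dataset rather than updating individual records.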
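To make lazy evaluation, immutability, partitioning, and lineage-based fault tolerance concrete, here is a toy single-machine model in plain Python. It is an illustrative sketch, not Spark's implementation: `ToyRDD`, its methods, and `square` are hypothetical names that only mirror the spirit of PySpark's `parallelize`, `map`, `filter`, and `collect`. Persistence and distribution across machines are not modeled.

```python
# A toy, single-machine model of RDD ideas (not Spark's implementation).
class ToyRDD:
    """An immutable, partitioned collection with lazy transformations."""

    def __init__(self, partitions, lineage=()):
        self._partitions = partitions  # list of lists: the data split into partitions
        self._lineage = lineage        # tuple of ("map" | "filter", function) steps

    @classmethod
    def parallelize(cls, data, num_partitions=2):
        # Partitioning: split the input into roughly equal chunks.
        data = list(data)
        size = max(1, -(-len(data) // num_partitions))  # ceiling division
        return cls([data[i:i + size] for i in range(0, len(data), size)])

    # Transformations return a NEW ToyRDD with one more lineage step.
    # Nothing is computed yet (lazy evaluation) and self is never modified (immutability).
    def map(self, fn):
        return ToyRDD(self._partitions, self._lineage + (("map", fn),))

    def filter(self, pred):
        return ToyRDD(self._partitions, self._lineage + (("filter", pred),))

    def _compute(self, partition):
        # Replay the lineage on one partition; a lost partition could be
        # rebuilt the same way (the idea behind RDD fault tolerance).
        for op, fn in self._lineage:
            if op == "map":
                partition = [fn(x) for x in partition]
            else:
                partition = [x for x in partition if fn(x)]
        return partition

    # Actions trigger the actual computation.
    def collect(self):
        return [x for part in self._partitions for x in self._compute(part)]

    def count(self):
        return len(self.collect())


calls = []  # records every element the map function actually processes

def square(x):
    calls.append(x)
    return x * x

base = ToyRDD.parallelize(range(1, 7), num_partitions=3)
evens = base.map(square).filter(lambda x: x % 2 == 0)
calls_before_action = len(calls)  # 0: defining transformations ran nothing
result = evens.collect()          # [4, 16, 36]
calls_after_action = len(calls)   # 6: the action processed every element
```

`base` still holds the original numbers after `evens` is built, which is the immutability point: each transformation describes a new dataset instead of changing the old one.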

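# Why should we use Spark over Hadoop?
Spark has been reported to run workloads up to 100 times faster than Hadoop MapReduce in memory and up to 10 times faster on disk. It also sorts large datasets much faster than Hadoop.
Hadoop, by contrast, stores data across multiple nodes (in HDFS) and processes it in batches with MapReduce. Spark is also much faster for machine learning applications, which make many passes over the same data and benefit from keeping it in memory.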

# Spark Components
## Core
Spark Core contains the basic functionality of Spark, including components for task scheduling, memory management, fault recovery, interacting with storage systems, and more. Spark Core is also home to the API that defines resilient distributed datasets (RDDs), which are Spark's main programming abstraction.

## MLlib
MLlib is a scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, and underlying optimization primitives.


## Streaming
Enables processing of live data streams. Spark Streaming provides an API for manipulating data streams that is similar to Spark Core’s RDD API.

## SQL

Package for working with structured data. It allows querying data via SQL as well as the Apache Hive Query Language (HQL). It supports various data sources, such as Hive tables, Parquet, JSON, and CSV.

![](111.png)


# Pyspark

PySpark is the Python API for Spark: it lets you use Spark's functionality from Python. PySpark is a particularly flexible tool for exploratory big data analysis because it integrates with the rest of the Python data analysis ecosystem, including pandas, NumPy, and Matplotlib.


# How Spark runs

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program). In PySpark code the SparkContext is usually created through a SparkSession, which wraps it.

Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos, YARN or Kubernetes), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.
![](pyspark.png)
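The driver/executor split can be imitated on one machine. The sketch below is only an analogy under simplifying assumptions: a thread pool stands in for the executors, the functions `split_into_partitions`, `task`, and `run_job` are hypothetical, and there is no cluster manager, task serialization, or failure handling as in real Spark.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, num_partitions):
    # The "driver" splits the dataset into partitions; each becomes one task.
    return [data[i::num_partitions] for i in range(num_partitions)]

def task(partition):
    # The work an "executor" runs on one partition: a partial sum of squares.
    return sum(x * x for x in partition)

def run_job(data, num_executors=2, num_partitions=4):
    partitions = split_into_partitions(list(data), num_partitions)
    # A thread pool stands in for executor processes on worker nodes.
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        partial_results = list(executors.map(task, partitions))
    # The driver combines the per-partition results, like a reduce action.
    return sum(partial_results)

total = run_job(range(1, 11))  # 1^2 + 2^2 + ... + 10^2 = 385
```

More partitions than executors is intentional: as in Spark, tasks queue up and each executor works through several of them.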


### Spark runs on Hadoop YARN, Apache Mesos, Kubernetes, in standalone mode, and in the cloud.

In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [9]:
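import findspark
# findspark.init()  # uncomment only if Spark is installed outside pip (SPARK_HOME); it must run before importing pyspark
import pyspark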

In [10]:
# for comparison, read the same file with pandas first
import pandas as pd
pd.read_csv("Book1.csv")

| Unnamed: 0 | Name | Age |
|---|---|---|
| 0 | sai | 20 |
| 1 | nikhil | 24 |
| 2 | juluri | 23 |
| 3 | sumith | 21 |
| 4 | kushal | 12 |


In [11]:
# every PySpark program needs a SparkSession, the entry point to the DataFrame API
# import the class used to build one
from pyspark.sql import SparkSession

In [14]:
# create (or reuse) a SparkSession and give the application a name
spark = SparkSession.builder.appName('dataframe').getOrCreate()


In [24]:
## read the CSV: header='true' takes column names from the first row, inferSchema=True detects column types
df_pyspark = spark.read.option('header', 'true').csv('Book1.csv', inferSchema=True)


In [25]:
## check the schema
df_pyspark.printSchema()

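`printSchema()` shows the column types that `inferSchema=True` detected; without that option every CSV column is read as a string. A rough plain-Python sketch of the idea, assuming only three candidate types (Spark's real inference recognizes more, such as long, boolean, and timestamp); `infer_type` and `infer_schema` are hypothetical helpers:

```python
import csv
import io

def infer_type(values):
    """Return the narrowest of integer, double, string that fits every non-empty value."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"                      # nothing to go on: keep it as text
    for type_name, parse in (("integer", int), ("double", float)):
        try:
            for v in non_empty:
                parse(v)
        except ValueError:
            continue                         # some value did not parse; try a wider type
        return type_name
    return "string"

def infer_schema(csv_text):
    # header=true: the first row provides the column names
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, body = rows[0], rows[1:]
    columns = list(zip(*body))               # transpose rows into columns
    return {name: infer_type(col) for name, col in zip(header, columns)}

schema = infer_schema("Name,Age\nsai,20\nnikhil,24\n")  # {'Name': 'string', 'Age': 'integer'}
```

Inference costs an extra pass over the data, which is why Spark makes it opt-in.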

In [None]:
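# getting the column names (columns is a property, not a method)
print(df_pyspark.columns)
# picking a single column: select returns a new DataFrame, show() displays it
df_pyspark.select('Name').show()
# picking multiple columns
df_pyspark.select(['Name', 'Age']).show()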

In [None]:
# unlike pandas, df_pyspark['Name'] is a Column expression, not data, and has no show(); use select instead
df_pyspark.select('Name').show()

In [None]:
# summary statistics (count, mean, stddev, min, max) for each column
df_pyspark.describe().show()

In [None]:
# adding a column: withColumn(name, column_expression) returns a new DataFrame
df_pyspark.withColumn("age after 2 years", df_pyspark['Age'] + 2).show()

In [None]:
# dropping a column (drop also returns a new DataFrame)
df_pyspark.drop('Age').show()

In [None]:
# renaming a column
df_pyspark.withColumnRenamed('Name', 'new name').show()

# Handling missing values

### dropping null (missing) values

In [None]:
# dropping rows with missing values
df_pyspark.na.drop().show()  # with no arguments (how='any'), every row containing a null is dropped

In [None]:
df_pyspark.na.drop(how="any").show()  # drop a row if ANY of its values is null (the default)
df_pyspark.na.drop(how="all").show()  # drop a row only if ALL of its values are null

# thresh
# keep only rows that have at least thresh non-null values; thresh overrides how
df_pyspark.na.drop(how="any", thresh=2).show()

# subset
# only the listed columns are checked: a row is dropped when it has a null in one of them
df_pyspark.na.drop(how="any", subset=['Experience']).show()
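The `how`, `thresh`, and `subset` rules can be checked on a small example. Below is a plain-Python sketch on a list of dicts, where `None` stands for null; `na_drop` and the rows are hypothetical (only the names come from Book1.csv), and Spark additionally treats NaN in floating-point columns as missing, which the sketch does not.

```python
def na_drop(rows, how="any", thresh=None, subset=None):
    """Plain-Python model of DataFrame.na.drop on a list of dicts (None means null)."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)  # columns to check
        non_null = sum(row[c] is not None for c in columns)
        if thresh is not None:
            keep = non_null >= thresh        # thresh overrides how
        elif how == "any":
            keep = non_null == len(columns)  # drop if any checked value is null
        else:
            keep = non_null > 0              # drop only if all checked values are null
        if keep:
            kept.append(row)
    return kept


# Hypothetical rows: names from Book1.csv, other values made up
rows = [
    {"Name": "sai", "Age": 20, "Experience": 2},
    {"Name": "nikhil", "Age": None, "Experience": None},
    {"Name": None, "Age": None, "Experience": None},
    {"Name": "juluri", "Age": 23, "Experience": None},
    {"Name": "sumith", "Age": None, "Experience": 5},
]

def names(result):
    return [r["Name"] for r in result]

drop_any = names(na_drop(rows))                            # ['sai']
drop_all = names(na_drop(rows, how="all"))                 # ['sai', 'nikhil', 'juluri', 'sumith']
drop_thresh = names(na_drop(rows, thresh=2))               # ['sai', 'juluri', 'sumith']
drop_subset = names(na_drop(rows, subset=["Experience"]))  # ['sai', 'sumith']
```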

### filling missing values

In [None]:
df_pyspark.na.fill('value').show()     # a string value fills the nulls in string columns only
df_pyspark.na.fill(0, ['Age']).show()  # fill only the selected column(s); the value must match the column type (Age is numeric)
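PySpark documents that when `na.fill` gets a string value, non-string columns in the subset are simply ignored, so a string cannot fill a numeric column such as `Age`. A plain-Python sketch of that type-matching rule, with a hypothetical `na_fill` helper and made-up rows (it does not model dict values or boolean columns):

```python
def na_fill(rows, value, schema, subset=None):
    """Plain-Python model of DataFrame.na.fill on a list of dicts (None means null).

    schema maps each column name to str, int or float. A str value only fills
    str columns and a number only fills numeric columns; columns whose type
    does not match the value are silently left unchanged.
    """
    if isinstance(value, bool) or not isinstance(value, (str, int, float)):
        raise TypeError("this sketch only handles str, int and float values")
    columns = subset if subset is not None else list(schema)
    value_is_str = isinstance(value, str)
    targets = [c for c in columns if (schema[c] is str) == value_is_str]
    filled = []
    for row in rows:
        new_row = dict(row)                    # leave the input rows untouched
        for c in targets:
            if new_row[c] is None:
                new_row[c] = schema[c](value)  # convert to the column's type (sketch only)
        filled.append(new_row)
    return filled


schema = {"Name": str, "Age": int}  # hypothetical column types
rows = [{"Name": "sai", "Age": 20}, {"Name": None, "Age": None}]

fill_strings = na_fill(rows, "Missing", schema)                   # only Name gets filled
fill_age_with_text = na_fill(rows, "Missing", schema, ["Age"])    # no change: type mismatch
fill_age_with_number = na_fill(rows, 0, schema, ["Age"])          # Age null becomes 0
```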

In [None]:
# Imputer: replace missing values in numeric columns with a column statistic (here the mean)
from pyspark.ml.feature import Imputer
input_cols = ['Age', 'Experience', 'Salary']
imputer = Imputer(inputCols=input_cols,
                  outputCols=["{}_imputed".format(c) for c in input_cols]).setStrategy("mean")

In [None]:
# fit() computes each column's mean; transform() adds the *_imputed columns
imputer.fit(df_pyspark).transform(df_pyspark).show()
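The fit/transform split is the key idea: `fit` computes one replacement value per column from the non-missing values, and `transform` writes a new column that uses it wherever the original is missing. A plain-Python sketch with a hypothetical `impute` helper and made-up rows; note that for the `median` strategy Spark computes the median approximately, while this sketch uses the exact one.

```python
from statistics import mean, median

def impute(rows, input_cols, strategy="mean"):
    """Plain-Python model of Imputer: returns (surrogates, new_rows)."""
    summarize = {"mean": mean, "median": median}[strategy]
    # "fit": one surrogate value per column, computed from its non-null values
    surrogates = {c: summarize([r[c] for r in rows if r[c] is not None])
                  for c in input_cols}
    # "transform": add a <col>_imputed column, keeping observed values as they are
    new_rows = []
    for r in rows:
        new_row = dict(r)
        for c in input_cols:
            new_row[f"{c}_imputed"] = r[c] if r[c] is not None else surrogates[c]
        new_rows.append(new_row)
    return surrogates, new_rows


# Hypothetical rows with some missing values
rows = [
    {"Age": 20, "Salary": 30000},
    {"Age": None, "Salary": 20000},
    {"Age": 24, "Salary": None},
    {"Age": 22, "Salary": 10000},
]
surrogates, imputed = impute(rows, ["Age", "Salary"])  # means: Age 22, Salary 20000
```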

# Filter Operations
Filter operations are an important data preprocessing step: they retrieve only the records that satisfy some condition.

In [None]:
# people with a salary of at most 20,000
df_pyspark.filter("Salary<=20000").show()

In [None]:
df_pyspark.filter("Salary<=20000").select(['Name','Age']).show()  # filter, then keep only the Name and Age columns

In [None]:
# combining two conditions: & means and, | means or; wrap each condition in parentheses
df_pyspark.filter((df_pyspark['Salary']<=20000) &
                  (df_pyspark['Salary']>=15000)).show()
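The parentheses around each condition are required, not cosmetic: in Python, `&` binds more tightly than comparison operators such as `<=` and `>=`. The same precedence applies to PySpark `Column` expressions, so an unparenthesized condition does not build the intended filter. The plain-Python sketch below shows the effect with an ordinary integer standing in for a hypothetical salary value:

```python
salary = 18000  # a hypothetical value inside the 15,000-20,000 range

# With parentheses, each comparison is evaluated first, then combined with &.
with_parens = (salary <= 20000) & (salary >= 15000)

# Without them, & is applied first, so Python reads this as the chained
# comparison: salary <= (20000 & salary) >= 15000
without_parens = salary <= 20000 & salary >= 15000

bitwise_part = 20000 & salary  # 17920: the bitwise AND of the two integers
```

Here `with_parens` is `True` while `without_parens` is `False`, even though the salary is in range.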

In [None]:
# negating a condition with ~ (not)
df_pyspark.filter(~(df_pyspark['Salary']<=20000)).show()

# PySpark GroupBy and Aggregate Functions

In [None]:
# groupBy is always followed by an aggregate function
df_pyspark.groupBy('Name').sum().show()  # sums every numeric column (e.g. Salary) for each name

In [None]:
df_pyspark.groupBy('departments').sum().show()  # total of each numeric column (e.g. Salary) per department

In [None]:
df_pyspark.groupBy('departments').count().show()  # number of employees in each department

In [None]:
# Aggregate
df_pyspark.agg({'Salary': 'sum'}).show()  # {column: aggregate function} pairs, computed over the whole DataFrame
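What `groupBy(...).sum()` and `groupBy(...).count()` compute can be spelled out in plain Python. A sketch with hypothetical `group_by_sum` / `group_by_count` helpers and made-up employee rows; Spark returns a DataFrame with result columns named like `sum(Salary)` and `count` instead of Python dicts.

```python
from collections import defaultdict

def group_by_sum(rows, key):
    """Model of df.groupBy(key).sum(): sum every numeric column within each group."""
    sums = defaultdict(dict)
    for row in rows:
        group = sums[row[key]]
        for col, value in row.items():
            # strings and nulls (None) are skipped, like non-numeric columns in Spark
            if col != key and isinstance(value, (int, float)):
                group[col] = group.get(col, 0) + value
    return dict(sums)

def group_by_count(rows, key):
    """Model of df.groupBy(key).count(): number of rows in each group."""
    counts = defaultdict(int)
    for row in rows:
        counts[row[key]] += 1
    return dict(counts)


# Hypothetical employee rows
rows = [
    {"Name": "sai", "departments": "Data Science", "Salary": 10000},
    {"Name": "sai", "departments": "IOT", "Salary": 5000},
    {"Name": "nikhil", "departments": "Data Science", "Salary": 20000},
]
by_department = group_by_sum(rows, "departments")
by_name = group_by_sum(rows, "Name")
headcount = group_by_count(rows, "departments")
total_salary = sum(r["Salary"] for r in rows)  # like agg({'Salary': 'sum'}) with no grouping
```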

# MLlib
`VectorAssembler` combines several independent feature columns into a single vector column, which is the input format that Spark ML estimators such as `LinearRegression` expect.
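Conceptually, the assembler turns several columns into one feature vector per row. A plain-Python sketch of that reshaping, with a hypothetical `assemble` helper and made-up rows; Spark stores the result as an ML `Vector` rather than a Python list.

```python
def assemble(rows, input_cols, output_col):
    """Model of VectorAssembler.transform: add output_col holding the input
    columns' values, in the given order, as one list of floats."""
    assembled = []
    for row in rows:
        if any(row[c] is None for c in input_cols):
            # Spark's default handleInvalid="error" also rejects null inputs
            raise ValueError(f"null value in {input_cols} for row {row}")
        new_row = dict(row)
        new_row[output_col] = [float(row[c]) for c in input_cols]
        assembled.append(new_row)
    return assembled


rows = [{"Age": 31, "Experience": 10, "Salary": 30000},  # hypothetical rows
        {"Age": 24, "Experience": 3, "Salary": 20000}]
output = assemble(rows, ["Age", "Experience"], "Independent Features")
# like output.select('Independent Features', 'Salary'): features plus the label
final_data = [(r["Independent Features"], r["Salary"]) for r in output]
```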

In [None]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=['Age', 'Experience'], outputCol='Independent Features')

In [None]:
output=featureassembler.transform(df_pyspark)

In [None]:
output.show()

In [None]:
final_data = output.select('Independent Features', 'Salary')

In [None]:
final_data.show()

In [None]:
from pyspark.ml.regression import LinearRegression  # using linear regression
train_data, test_data = final_data.randomSplit([0.75, 0.25])  # train/test split
regressor = LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor = regressor.fit(train_data)  # fit() returns a fitted LinearRegressionModel

In [None]:
#coefficients
regressor.coefficients

In [None]:
# intercept
regressor.intercept
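For intuition about what `coefficients` and `intercept` mean, here is ordinary least squares for a single feature in plain Python, using the closed-form formulas slope = Σ(x − x̄)(y − ȳ) / Σ(x − x̄)² and intercept = ȳ − slope · x̄. With two input features (Age and Experience) Spark fits one coefficient per feature, so `regressor.coefficients` is a vector; `fit_simple_linear_regression` and the data below are hypothetical.

```python
def fit_simple_linear_regression(xs, ys):
    """Ordinary least squares for one feature: y is approximated by slope * x + intercept."""
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    sxx = sum((x - x_mean) ** 2 for x in xs)
    if sxx == 0:
        raise ValueError("x values must not all be equal")
    slope = sxy / sxx
    intercept = y_mean - slope * x_mean
    return slope, intercept


# Made-up points lying exactly on y = 2x + 1
slope, intercept = fit_simple_linear_regression([1, 2, 3, 4], [3, 5, 7, 9])
```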

In [None]:
#predictions
pred_results=regressor.evaluate(test_data)

In [None]:
pred_results.predictions.show()

In [None]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError
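`meanAbsoluteError` and `meanSquaredError` are averages of the prediction errors on the test set: MAE = (1/n) · Σ|yᵢ − ŷᵢ| and MSE = (1/n) · Σ(yᵢ − ŷᵢ)². The summary also exposes `rootMeanSquaredError` (√MSE) and `r2`. A plain-Python sketch with made-up labels and predictions; `regression_metrics` is a hypothetical helper.

```python
import math

def regression_metrics(labels, predictions):
    """Return (MAE, MSE, RMSE) for paired labels and predictions."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    errors = [y - p for y, p in zip(labels, predictions)]
    mae = sum(abs(e) for e in errors) / len(errors)
    mse = sum(e * e for e in errors) / len(errors)
    return mae, mse, math.sqrt(mse)


# Made-up values: errors are -2, 2, -3
mae, mse, rmse = regression_metrics([10.0, 20.0, 30.0], [12.0, 18.0, 33.0])
```

Here MAE = 7/3 and MSE = 17/3. Because MSE squares each error, it penalizes the single error of 3 more heavily than MAE does.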