<a href="https://colab.research.google.com/github/paris3169/DataSciene-Projects/blob/main/pyspark_practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as np

In [None]:
!pip install pyspark

In [6]:
import pyspark

In [6]:
pyspark.__version__

'3.3.2'

In [5]:
#let's create a Spark Context
from pyspark import SparkContext

## Playing with Text with Pyspark Context and RDD  objects

In [9]:
%%writefile example.txt
first
second line
the third line
then a fourth line

Writing example.txt


In [6]:
sc=SparkContext()

In [7]:
sc

In [11]:
#let's define avaliable for the RDD text dataset
text_rdd=sc.textFile("example.txt")

In [12]:
text_rdd

example.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [17]:
text_rdd.collect()

['first', 'second line', 'the third line', 'then a fourth line']

In [13]:
#let's apply to the RDD text object a transformation using the map tranformation
words= text_rdd.map(lambda line:line.split())

In [15]:
words  #this is just another RDD onbject created by the map transformation

PythonRDD[4] at RDD at PythonRDD.scala:53

In [16]:
#let's apply now an action to this object to see some results
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

its' evident the difference when applying the map tranformation with the line split lambda expression

In [18]:
#using flatMap tranformation of RDD together with the collect action
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

In [74]:
%%writefile services.txt
#Eventid  Timestamp Customer  State Amount,
201 10/13/2017 100 NY 131 100
202 10/14/2017 101 NY 131 200
203 10/15/2017 102 CA 131 350
204 10/16/2017 103 NY 131 799
205 10/17/2017 104 CA 131 320
206 10/18/2017 105 CA 131 250

Overwriting services.txt


In [75]:
services=sc.textFile("services.txt")

In [76]:
services.take(3)

['#Eventid  Timestamp Customer  State Amount,',
 '201 10/13/2017 100 NY 131 100',
 '202 10/14/2017 101 NY 131 200']

In [77]:
services.map(lambda line: line.split()).take(3)

[['#Eventid', 'Timestamp', 'Customer', 'State', 'Amount,'],
 ['201', '10/13/2017', '100', 'NY', '131', '100'],
 ['202', '10/14/2017', '101', 'NY', '131', '200']]

In [78]:
clean=services.map(lambda line: line[1:] if line[0]=="#" else line)

In [79]:
clean=clean.map(lambda line:line.split())

In [80]:
clean.collect()

[['Eventid', 'Timestamp', 'Customer', 'State', 'Amount,'],
 ['201', '10/13/2017', '100', 'NY', '131', '100'],
 ['202', '10/14/2017', '101', 'NY', '131', '200'],
 ['203', '10/15/2017', '102', 'CA', '131', '350'],
 ['204', '10/16/2017', '103', 'NY', '131', '799'],
 ['205', '10/17/2017', '104', 'CA', '131', '320'],
 ['206', '10/18/2017', '105', 'CA', '131', '250']]

In [81]:
clean.map(lambda lst:(lst[3],lst[-1])).collect()

[('State', 'Amount,'),
 ('NY', '100'),
 ('NY', '200'),
 ('CA', '350'),
 ('NY', '799'),
 ('CA', '320'),
 ('CA', '250')]

In [82]:
pairs=clean.map(lambda lst:(lst[3],lst[-1]))

In [83]:
pairs.collect()

[('State', 'Amount,'),
 ('NY', '100'),
 ('NY', '200'),
 ('CA', '350'),
 ('NY', '799'),
 ('CA', '320'),
 ('CA', '250')]

In [84]:
#we can reduceByKey you need to have RDD arranged in such a way that first col is the key
rekey=pairs.reduceByKey(lambda amt1,amt2:float(amt1)+float(amt2))

In [85]:
rekey.collect()

[('State', 'Amount,'), ('NY', 1099.0), ('CA', 920.0)]

In [86]:
#get rid of (State,Amount)
rekey2=rekey.filter(lambda x: not x[0]=="State")

In [87]:
rekey2.collect()

[('NY', 1099.0), ('CA', 920.0)]

In [91]:
rekey3=rekey2.sortBy(lambda x:x[1],ascending=True)

In [92]:
rekey3.collect()

[('CA', 920.0), ('NY', 1099.0)]

## Additional Practice on PySpark using You Tube video lecture from Krish Naik

Additional practice following you tube tutorial:https://www.youtube.com/watch?v=7I4YZwaJgPs&list=PLZoTAELRMXVNjiiawhzZ0afHcPvC8jpcg&index=2


for understanding the difference between a Spark Context and a Spark Session please read this blog: https://www.ksolves.com/blog/big-data/spark/sparksession-vs-sparkcontext-what-are-the-differences#:~:text=In%20earlier%20versions%20of%20Spark,programming%20with%20DataFrame%20and%20Dataset.

##Playing with PySpark DataFrames

In [4]:
from pyspark.sql import SparkSession  #see that this is needed to work with DataFrames and this is way this is part of sql library

In [5]:
spark=SparkSession.builder.appName("Dataframe").getOrCreate()

In [6]:
spark

In [7]:
#read the dataframe
df_pyspark=spark.read.option("header","true").csv("test.csv")

In [8]:
df_pyspark.show()

+--------+----+----------+------+
|    name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [9]:
df_pyspark.printSchema()  #this is similar to pandas .info() for a dataframe

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [10]:
df_pyspark=spark.read.csv("test.csv",header=True,inferSchema=True)  #this is the best option to read the file
df_pyspark

DataFrame[name: string, age: int, Experience: int, Salary: int]

In [11]:
df_pyspark.show()

+--------+----+----------+------+
|    name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [12]:
df_pyspark.printSchema()  #this is to check the data type of the element of the database

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [13]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [14]:
df_pyspark.columns  #this is similar to df.columns in pandas

['name', 'age', 'Experience', 'Salary']

In [15]:
df_pyspark.head(3)  #this is similar to pandas df command .head()

[Row(name='Paris', age=54, Experience=10, Salary=20000),
 Row(name='Manu', age=49, Experience=8, Salary=50000),
 Row(name='Gabriele', age=19, Experience=9, Salary=15000)]

In [16]:
df_pyspark.show()

+--------+----+----------+------+
|    name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [17]:
#to pick up a specific columns
df_pyspark.select("name").show()

+--------+
|    name|
+--------+
|   Paris|
|    Manu|
|Gabriele|
|  Davide|
|  Manesh|
|    null|
|    null|
+--------+



In [18]:
df_pyspark.select(["name","Experience"]).show()  #this is retutning a dataframe

+--------+----------+
|    name|Experience|
+--------+----------+
|   Paris|        10|
|    Manu|         8|
|Gabriele|         9|
|  Davide|         4|
|  Manesh|      null|
|    null|        10|
|    null|      null|
+--------+----------+



In [19]:
df_pyspark.dtypes

[('name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [20]:
#similar to pandas we can use the describe method
df_pyspark.describe().show()

+-------+------+------------------+------------------+------------------+
|summary|  name|               age|        Experience|            Salary|
+-------+------+------------------+------------------+------------------+
|  count|     5|                 6|                 5|                 5|
|   mean|  null|              34.5|               8.2|           32600.0|
| stddev|  null|15.578831791889916|2.4899799195977463|14621.901381147392|
|    min|Davide|                15|                 4|             15000|
|    max| Paris|                54|                10|             50000|
+-------+------+------------------+------------------+------------------+



In [21]:
#adding columns we use the withColumn method which take the new column name and the new column to add
df_pyspark=df_pyspark.withColumn("Experience after 2 years",df_pyspark["Experience"]+2)

In [22]:
df_pyspark.show()

+--------+----+----------+------+------------------------+
|    name| age|Experience|Salary|Experience after 2 years|
+--------+----+----------+------+------------------------+
|   Paris|  54|        10| 20000|                      12|
|    Manu|  49|         8| 50000|                      10|
|Gabriele|  19|         9| 15000|                      11|
|  Davide|  15|         4|  null|                       6|
|  Manesh|null|      null| 40000|                    null|
|    null|  34|        10| 38000|                      12|
|    null|  36|      null|  null|                    null|
+--------+----+----------+------+------------------------+



In [23]:
#to drop a column with can do using the drop method like in pandas
df_pyspark=df_pyspark.drop("Experience after 2 years")

In [24]:
df_pyspark.show()

+--------+----+----------+------+
|    name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [25]:
#to rename the column
df_pyspark=df_pyspark.withColumnRenamed("name","Name")

In [26]:
df_pyspark.show()

+--------+----+----------+------+
|    Name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [27]:
from pyspark.sql import SparkSession

In [28]:
spark=SparkSession.builder.appName("Dataframe practice").getOrCreate()

In [29]:
df_pyspark=spark.read.csv("test.csv",header=True,inferSchema=True)

In [30]:
df_pyspark.show()

+--------+----+----------+------+
|    name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [31]:
df=df_pyspark.drop("age")

In [32]:
type(df)

pyspark.sql.dataframe.DataFrame

In [33]:
df.show()

+--------+----------+------+
|    name|Experience|Salary|
+--------+----------+------+
|   Paris|        10| 20000|
|    Manu|         8| 50000|
|Gabriele|         9| 15000|
|  Davide|         4|  null|
|  Manesh|      null| 40000|
|    null|        10| 38000|
|    null|      null|  null|
+--------+----------+------+



In [34]:
df_pyspark.na.drop().show()

+--------+---+----------+------+
|    name|age|Experience|Salary|
+--------+---+----------+------+
|   Paris| 54|        10| 20000|
|    Manu| 49|         8| 50000|
|Gabriele| 19|         9| 15000|
+--------+---+----------+------+



In [35]:
df_pyspark.na.drop(how="all").show()

+--------+----+----------+------+
|    name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [37]:
df_pyspark.na.drop(how="any",subset=["Experience"]).show()

+--------+---+----------+------+
|    name|age|Experience|Salary|
+--------+---+----------+------+
|   Paris| 54|        10| 20000|
|    Manu| 49|         8| 50000|
|Gabriele| 19|         9| 15000|
|  Davide| 15|         4|  null|
|    null| 34|        10| 38000|
+--------+---+----------+------+



In [39]:
df_pyspark.na.fill(value="missing value",subset=["age","Experience"]).show()

+--------+----+----------+------+
|    name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [40]:
df_pyspark.show()

+--------+----+----------+------+
|    name| age|Experience|Salary|
+--------+----+----------+------+
|   Paris|  54|        10| 20000|
|    Manu|  49|         8| 50000|
|Gabriele|  19|         9| 15000|
|  Davide|  15|         4|  null|
|  Manesh|null|      null| 40000|
|    null|  34|        10| 38000|
|    null|  36|      null|  null|
+--------+----+----------+------+



In [41]:
df=df_pyspark.na.drop()

In [42]:
df.show()

+--------+---+----------+------+
|    name|age|Experience|Salary|
+--------+---+----------+------+
|   Paris| 54|        10| 20000|
|    Manu| 49|         8| 50000|
|Gabriele| 19|         9| 15000|
+--------+---+----------+------+



In [43]:
#filter operations using pyspark
#find our salary of pepople <= 40000
df.filter("Salary<=40000").show()

+--------+---+----------+------+
|    name|age|Experience|Salary|
+--------+---+----------+------+
|   Paris| 54|        10| 20000|
|Gabriele| 19|         9| 15000|
+--------+---+----------+------+



In [44]:
df.filter("Salary<=40000").select(["name","age"]).show()

+--------+---+
|    name|age|
+--------+---+
|   Paris| 54|
|Gabriele| 19|
+--------+---+



In [47]:
df.filter(df["Salary"]<=40000).show()

+--------+---+----------+------+
|    name|age|Experience|Salary|
+--------+---+----------+------+
|   Paris| 54|        10| 20000|
|Gabriele| 19|         9| 15000|
+--------+---+----------+------+



In [48]:
#just some practice on Spark MLlib the Spark Machine Laerning Library
training=spark.read.csv("/content/test.csv",header=True,inferSchema=True)

In [49]:
training.show()

+--------+---+----------+------+
|    name|age|Experience|Salary|
+--------+---+----------+------+
|   Paris| 54|        10| 20000|
|    Manu| 49|         8| 50000|
|Gabriele| 19|         9| 15000|
|  Davide| 15|         4| 45000|
|  Manesh| 31|         7| 40000|
|    Chus| 34|        10| 38000|
|   Khris| 36|         8| 25000|
+--------+---+----------+------+



In [50]:
training.columns

['name', 'age', 'Experience', 'Salary']

In [51]:
training.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [52]:
from pyspark.ml.feature import VectorAssembler

In [54]:
featureassembler=VectorAssembler(inputCols=["age","Experience"],outputCol="Independent feature")

In [56]:
output=featureassembler.transform(training)

In [57]:
output.show()

+--------+---+----------+------+-------------------+
|    name|age|Experience|Salary|Independent feature|
+--------+---+----------+------+-------------------+
|   Paris| 54|        10| 20000|        [54.0,10.0]|
|    Manu| 49|         8| 50000|         [49.0,8.0]|
|Gabriele| 19|         9| 15000|         [19.0,9.0]|
|  Davide| 15|         4| 45000|         [15.0,4.0]|
|  Manesh| 31|         7| 40000|         [31.0,7.0]|
|    Chus| 34|        10| 38000|        [34.0,10.0]|
|   Khris| 36|         8| 25000|         [36.0,8.0]|
+--------+---+----------+------+-------------------+



In [58]:
finalized_data=output.select(["Independent feature","Salary"])
finalized_data.show()

+-------------------+------+
|Independent feature|Salary|
+-------------------+------+
|        [54.0,10.0]| 20000|
|         [49.0,8.0]| 50000|
|         [19.0,9.0]| 15000|
|         [15.0,4.0]| 45000|
|         [31.0,7.0]| 40000|
|        [34.0,10.0]| 38000|
|         [36.0,8.0]| 25000|
+-------------------+------+



In [59]:
from pyspark.ml.regression import LinearRegression
train_data,test_data=finalized_data.randomSplit([0.75,0.25])

In [60]:
train_data.show()

+-------------------+------+
|Independent feature|Salary|
+-------------------+------+
|         [19.0,9.0]| 15000|
|         [36.0,8.0]| 25000|
|         [49.0,8.0]| 50000|
|        [54.0,10.0]| 20000|
+-------------------+------+



In [61]:
test_data.show()

+-------------------+------+
|Independent feature|Salary|
+-------------------+------+
|         [15.0,4.0]| 45000|
|         [31.0,7.0]| 40000|
|        [34.0,10.0]| 38000|
+-------------------+------+



In [69]:
regressor=LinearRegression(featuresCol="Independent feature",labelCol="Salary")

In [70]:
regres=regressor.fit(train_data)

In [71]:
regres.coefficients

DenseVector([643.8127, -11989.9666])

In [72]:
regres.intercept

106981.60535117141

In [74]:
pre_results=regres.evaluate(test_data)

In [77]:
pre_results.predictions.show()

+-------------------+------+------------------+
|Independent feature|Salary|        prediction|
+-------------------+------+------------------+
|         [15.0,4.0]| 45000| 68678.92976588674|
|         [31.0,7.0]| 40000|43010.033444816225|
|        [34.0,10.0]| 38000| 8971.571906354395|
+-------------------+------+------------------+

