<a href="https://colab.research.google.com/github/leadbreak/algorithm_study/blob/main/pyspark_study.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tutorial 01


In [78]:
!pip install pyspark



In [79]:
import pyspark

In [80]:
import pandas as pd

df = pd.DataFrame(
    {
        'Name':['Qscar', "Keith", "Luca"],
     'Age':[29,39,28]
     }
)

df

Unnamed: 0,Name,Age
0,Qscar,29
1,Keith,39
2,Luca,28


In [81]:
df.to_csv("test1.csv", index=False)

In [82]:
from pyspark.sql import SparkSession

In [83]:
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [84]:
spark

In [85]:
df_pyspark = spark.read.csv('test1.csv')
df_pyspark

DataFrame[_c0: string, _c1: string]

In [86]:
df_pyspark.show()

+-----+---+
|  _c0|_c1|
+-----+---+
| Name|Age|
|Qscar| 29|
|Keith| 39|
| Luca| 28|
+-----+---+



In [87]:
spark.read.option('header', 'true').csv('test1.csv')

DataFrame[Name: string, Age: string]

In [88]:
spark.read.option('header', 'true').csv('test1.csv').show()

+-----+---+
| Name|Age|
+-----+---+
|Qscar| 29|
|Keith| 39|
| Luca| 28|
+-----+---+



In [89]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [90]:
df_pyspark.head(3)

[Row(_c0='Name', _c1='Age'),
 Row(_c0='Qscar', _c1='29'),
 Row(_c0='Keith', _c1='39')]

In [91]:
df_pyspark.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



# Tutorial 2 : PySpark DataFrames

In [92]:
import pandas as pd

df = pd.DataFrame(
    {
        'Name':['Qscar', "Keith", "Luca"],
        'Age':[29,39,28],
        'Experience':[1,11,2]
     }
)

df.to_csv("test2.csv", index=False)

In [93]:
from pyspark.sql import SparkSession

In [94]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [95]:
spark

In [96]:
## read the dataset
df_pyspark = spark.read.option('header', 'true').csv('test2.csv')
df_pyspark

DataFrame[Name: string, Age: string, Experience: string]

In [97]:
spark.read.option('header', 'true').csv('test2.csv').show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Qscar| 29|         1|
|Keith| 39|        11|
| Luca| 28|         2|
+-----+---+----------+



In [98]:
### Check the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)



In [99]:
df_pyspark.columns

['Name', 'Age', 'Experience']

In [100]:
df_pyspark.head(3)

[Row(Name='Qscar', Age='29', Experience='1'),
 Row(Name='Keith', Age='39', Experience='11'),
 Row(Name='Luca', Age='28', Experience='2')]

In [101]:
df_pyspark.select('Name').show()

+-----+
| Name|
+-----+
|Qscar|
|Keith|
| Luca|
+-----+



In [102]:
df_pyspark.select(['Name', 'Experience'] ).show()

+-----+----------+
| Name|Experience|
+-----+----------+
|Qscar|         1|
|Keith|        11|
| Luca|         2|
+-----+----------+



In [103]:
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'string'), ('Experience', 'string')]

In [104]:
df_pyspark.describe().show()

+-------+-----+-----------------+-----------------+
|summary| Name|              Age|       Experience|
+-------+-----+-----------------+-----------------+
|  count|    3|                3|                3|
|   mean| null|             32.0|4.666666666666667|
| stddev| null|6.082762530298219|5.507570547286102|
|    min|Keith|               28|                1|
|    max|Qscar|               39|                2|
+-------+-----+-----------------+-----------------+



In [105]:
### Adding columns in data frame
df_pyspark = df_pyspark.withColumn('Experience After 2 years',df_pyspark['Experience']+2)
df_pyspark.show()

+-----+---+----------+------------------------+
| Name|Age|Experience|Experience After 2 years|
+-----+---+----------+------------------------+
|Qscar| 29|         1|                     3.0|
|Keith| 39|        11|                    13.0|
| Luca| 28|         2|                     4.0|
+-----+---+----------+------------------------+



In [106]:
### Drop the columns
df_pyspark = df_pyspark.drop('Experience After 2 years')
df_pyspark.show()

+-----+---+----------+
| Name|Age|Experience|
+-----+---+----------+
|Qscar| 29|         1|
|Keith| 39|        11|
| Luca| 28|         2|
+-----+---+----------+



In [107]:
### Rename the columns
df_pyspark.withColumnRenamed('Name',"New Name").show()

+--------+---+----------+
|New Name|Age|Experience|
+--------+---+----------+
|   Qscar| 29|         1|
|   Keith| 39|        11|
|    Luca| 28|         2|
+--------+---+----------+



In [108]:
import pandas as pd
import numpy as np

df = pd.DataFrame(
    {
        'Name':['Qscar', "Keith", "Luca", "Helena", "Joy", np.nan],
        'Age':[29,np.nan,28, 25, np.nan, np.nan],
        'Experience':[1,11,2,np.nan,1, np.nan],
        'Salary':[30000,50000,20000,10000, np.nan, np.nan]
     }
)

df.to_csv("test3.csv", index=False)
df

Unnamed: 0,Name,Age,Experience,Salary
0,Qscar,29.0,1.0,30000.0
1,Keith,,11.0,50000.0
2,Luca,28.0,2.0,20000.0
3,Helena,25.0,,10000.0
4,Joy,,1.0,
5,,,,


In [109]:
sdf = spark.read.csv('test3.csv')
sdf.show()

+------+----+----------+-------+
|   _c0| _c1|       _c2|    _c3|
+------+----+----------+-------+
|  Name| Age|Experience| Salary|
| Qscar|29.0|       1.0|30000.0|
| Keith|null|      11.0|50000.0|
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      null|10000.0|
|   Joy|null|       1.0|   null|
|  null|null|      null|   null|
+------+----+----------+-------+



In [110]:
sdf = spark.read.csv('test3.csv',header=True, inferSchema=True)
sdf.show()

+------+----+----------+-------+
|  Name| Age|Experience| Salary|
+------+----+----------+-------+
| Qscar|29.0|       1.0|30000.0|
| Keith|null|      11.0|50000.0|
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      null|10000.0|
|   Joy|null|       1.0|   null|
|  null|null|      null|   null|
+------+----+----------+-------+



In [111]:
### drop the columns
sdf.drop('Name').show()

+----+----------+-------+
| Age|Experience| Salary|
+----+----------+-------+
|29.0|       1.0|30000.0|
|null|      11.0|50000.0|
|28.0|       2.0|20000.0|
|25.0|      null|10000.0|
|null|       1.0|   null|
|null|      null|   null|
+----+----------+-------+



In [112]:
### dropna()
sdf.na.drop(how='any').show()

+-----+----+----------+-------+
| Name| Age|Experience| Salary|
+-----+----+----------+-------+
|Qscar|29.0|       1.0|30000.0|
| Luca|28.0|       2.0|20000.0|
+-----+----+----------+-------+



In [113]:
sdf.show()

+------+----+----------+-------+
|  Name| Age|Experience| Salary|
+------+----+----------+-------+
| Qscar|29.0|       1.0|30000.0|
| Keith|null|      11.0|50000.0|
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      null|10000.0|
|   Joy|null|       1.0|   null|
|  null|null|      null|   null|
+------+----+----------+-------+



In [114]:
### parameter tuning
sdf.na.drop(how='all').show()

+------+----+----------+-------+
|  Name| Age|Experience| Salary|
+------+----+----------+-------+
| Qscar|29.0|       1.0|30000.0|
| Keith|null|      11.0|50000.0|
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      null|10000.0|
|   Joy|null|       1.0|   null|
+------+----+----------+-------+



In [115]:
sdf.na.drop(how='any', thresh=2).show()

+------+----+----------+-------+
|  Name| Age|Experience| Salary|
+------+----+----------+-------+
| Qscar|29.0|       1.0|30000.0|
| Keith|null|      11.0|50000.0|
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      null|10000.0|
|   Joy|null|       1.0|   null|
+------+----+----------+-------+



In [116]:
sdf.na.drop(how='any', thresh=2, subset=['Name','Experience', 'Salary']).show()

+------+----+----------+-------+
|  Name| Age|Experience| Salary|
+------+----+----------+-------+
| Qscar|29.0|       1.0|30000.0|
| Keith|null|      11.0|50000.0|
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      null|10000.0|
|   Joy|null|       1.0|   null|
+------+----+----------+-------+



In [117]:
### Filling the Missing Value 

sdf.na.fill('Missing!', ['Experience', 'Age']).show()

+------+----+----------+-------+
|  Name| Age|Experience| Salary|
+------+----+----------+-------+
| Qscar|29.0|       1.0|30000.0|
| Keith|null|      11.0|50000.0|
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      null|10000.0|
|   Joy|null|       1.0|   null|
|  null|null|      null|   null|
+------+----+----------+-------+



In [118]:
sdf.show()

+------+----+----------+-------+
|  Name| Age|Experience| Salary|
+------+----+----------+-------+
| Qscar|29.0|       1.0|30000.0|
| Keith|null|      11.0|50000.0|
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      null|10000.0|
|   Joy|null|       1.0|   null|
|  null|null|      null|   null|
+------+----+----------+-------+



In [119]:
 from pyspark.ml.feature import Imputer
 
 imputer = Imputer(
     inputCols=['Age', 'Experience', 'Salary'],
     outputCols=["{}_imputed".format(c) for c in ['Age', 'Experience', 'Salary']]
      ).setStrategy("mean")

In [120]:
# Add imputation cols to df

imputer.fit(sdf).transform(sdf).show()

+------+----+----------+-------+------------------+------------------+--------------+
|  Name| Age|Experience| Salary|       Age_imputed|Experience_imputed|Salary_imputed|
+------+----+----------+-------+------------------+------------------+--------------+
| Qscar|29.0|       1.0|30000.0|              29.0|               1.0|       30000.0|
| Keith|null|      11.0|50000.0|27.333333333333332|              11.0|       50000.0|
|  Luca|28.0|       2.0|20000.0|              28.0|               2.0|       20000.0|
|Helena|25.0|      null|10000.0|              25.0|              3.75|       10000.0|
|   Joy|null|       1.0|   null|27.333333333333332|               1.0|       27500.0|
|  null|null|      null|   null|27.333333333333332|              3.75|       27500.0|
+------+----+----------+-------+------------------+------------------+--------------+



In [121]:
 from pyspark.ml.feature import Imputer
 
 imputer = Imputer(
     inputCols=['Age', 'Experience', 'Salary'],
     outputCols=["{}".format(c) for c in ['Age', 'Experience', 'Salary']]
      ).setStrategy("mean")

sdf = imputer.fit(sdf).transform(sdf)
sdf.show()

+------+------------------+----------+-------+
|  Name|               Age|Experience| Salary|
+------+------------------+----------+-------+
| Qscar|              29.0|       1.0|30000.0|
| Keith|27.333333333333332|      11.0|50000.0|
|  Luca|              28.0|       2.0|20000.0|
|Helena|              25.0|      3.75|10000.0|
|   Joy|27.333333333333332|       1.0|27500.0|
|  null|27.333333333333332|      3.75|27500.0|
+------+------------------+----------+-------+



In [122]:
sdf = sdf.na.fill('NoNamed', ['Name'])
sdf.show()

+-------+------------------+----------+-------+
|   Name|               Age|Experience| Salary|
+-------+------------------+----------+-------+
|  Qscar|              29.0|       1.0|30000.0|
|  Keith|27.333333333333332|      11.0|50000.0|
|   Luca|              28.0|       2.0|20000.0|
| Helena|              25.0|      3.75|10000.0|
|    Joy|27.333333333333332|       1.0|27500.0|
|NoNamed|27.333333333333332|      3.75|27500.0|
+-------+------------------+----------+-------+



In [123]:
sdf.filter("Salary<=20000").show()

+------+----+----------+-------+
|  Name| Age|Experience| Salary|
+------+----+----------+-------+
|  Luca|28.0|       2.0|20000.0|
|Helena|25.0|      3.75|10000.0|
+------+----+----------+-------+



In [124]:
sdf.filter("Salary<=20000").select(['Name', "Age"]).show()

+------+----+
|  Name| Age|
+------+----+
|  Luca|28.0|
|Helena|25.0|
+------+----+



In [125]:
sdf.filter( ( sdf['Salary'] <= 20000 ) & ( sdf['Salary'] >= 15000 ) ).show()

+----+----+----------+-------+
|Name| Age|Experience| Salary|
+----+----+----------+-------+
|Luca|28.0|       2.0|20000.0|
+----+----+----------+-------+



In [126]:
sdf.filter( ( sdf['Salary'] > 20000 ) | ( sdf['Salary'] < 15000 ) ).show()

+-------+------------------+----------+-------+
|   Name|               Age|Experience| Salary|
+-------+------------------+----------+-------+
|  Qscar|              29.0|       1.0|30000.0|
|  Keith|27.333333333333332|      11.0|50000.0|
| Helena|              25.0|      3.75|10000.0|
|    Joy|27.333333333333332|       1.0|27500.0|
|NoNamed|27.333333333333332|      3.75|27500.0|
+-------+------------------+----------+-------+



## Groupby & Aggregation


In [127]:
import pandas as pd
import numpy as np

df = pd.DataFrame(
    {
        'Name':['Qscar','Qscar', "Keith",'Qscar', "Luca", "Helena","Helena","Helena", "Joy","Joy"],
        'Departments':["Data Science","Big Data","IoT","Data Science","Big Data","IoT","Data Science","Data Science","Big Data","Data Science"],
        'Salary':[10000,5000,4000,4000,3000,20000,10000,5000,10000,2000]
     }
)

df.to_csv("test3.csv", index=False)
df

Unnamed: 0,Name,Departments,Salary
0,Qscar,Data Science,10000
1,Qscar,Big Data,5000
2,Keith,IoT,4000
3,Qscar,Data Science,4000
4,Luca,Big Data,3000
5,Helena,IoT,20000
6,Helena,Data Science,10000
7,Helena,Data Science,5000
8,Joy,Big Data,10000
9,Joy,Data Science,2000


In [128]:
sdf = spark.read.csv('test3.csv',header=True, inferSchema=True)
sdf.show()

+------+------------+------+
|  Name| Departments|Salary|
+------+------------+------+
| Qscar|Data Science| 10000|
| Qscar|    Big Data|  5000|
| Keith|         IoT|  4000|
| Qscar|Data Science|  4000|
|  Luca|    Big Data|  3000|
|Helena|         IoT| 20000|
|Helena|Data Science| 10000|
|Helena|Data Science|  5000|
|   Joy|    Big Data| 10000|
|   Joy|Data Science|  2000|
+------+------------+------+



In [129]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Agg').getOrCreate()

In [130]:
spark

In [131]:
sdf = spark.read.csv("test3.csv", header=True, inferSchema=True)
sdf.show()

+------+------------+------+
|  Name| Departments|Salary|
+------+------------+------+
| Qscar|Data Science| 10000|
| Qscar|    Big Data|  5000|
| Keith|         IoT|  4000|
| Qscar|Data Science|  4000|
|  Luca|    Big Data|  3000|
|Helena|         IoT| 20000|
|Helena|Data Science| 10000|
|Helena|Data Science|  5000|
|   Joy|    Big Data| 10000|
|   Joy|Data Science|  2000|
+------+------------+------+



In [132]:
sdf.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [133]:
## Groupby
sdf.groupBy('Name')

<pyspark.sql.group.GroupedData at 0x7fa098efc6d0>

In [134]:
sdf.groupBy('Name').sum()

DataFrame[Name: string, sum(Salary): bigint]

In [135]:
### Grouped to find the maximum salary
sdf.groupBy('Name').sum().show()

+------+-----------+
|  Name|sum(Salary)|
+------+-----------+
| Keith|       4000|
| Qscar|      19000|
|  Luca|       3000|
|   Joy|      12000|
|Helena|      35000|
+------+-----------+



In [136]:
### Groupby Departments which maximum salary
sdf.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(Salary)|
+------------+-----------+
|         IoT|      24000|
|    Big Data|      18000|
|Data Science|      31000|
+------------+-----------+



In [137]:
sdf.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(Salary)|
+------------+-----------+
|         IoT|    12000.0|
|    Big Data|     6000.0|
|Data Science|     6200.0|
+------------+-----------+



In [138]:
sdf.groupby('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IoT|    2|
|    Big Data|    3|
|Data Science|    5|
+------------+-----+



In [139]:
sdf.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [140]:
sdf.agg({"Salary":"avg"}).show()

+-----------+
|avg(Salary)|
+-----------+
|     7300.0|
+-----------+



# Tutorial 3 : Pyspark ML

In [141]:
df = pd.DataFrame(
    {
        'Name':['Qscar',"Keith","Luca", "Helena","Joy"],
        'age':[29,39,27,25,25],
        'Salary':[10000,50000,8000,4000,3000],
        'experience':[2,12,2,1,1]
     }
)

df.to_csv("test3.csv", index=False)
df

Unnamed: 0,Name,age,Salary,experience
0,Qscar,29,10000,2
1,Keith,39,50000,12
2,Luca,27,8000,2
3,Helena,25,4000,1
4,Joy,25,3000,1


In [142]:
sdf = spark.read.csv("test3.csv", header=True, inferSchema=True)
sdf.show()

+------+---+------+----------+
|  Name|age|Salary|experience|
+------+---+------+----------+
| Qscar| 29| 10000|         2|
| Keith| 39| 50000|        12|
|  Luca| 27|  8000|         2|
|Helena| 25|  4000|         1|
|   Joy| 25|  3000|         1|
+------+---+------+----------+



In [143]:
training = sdf
training.show()

+------+---+------+----------+
|  Name|age|Salary|experience|
+------+---+------+----------+
| Qscar| 29| 10000|         2|
| Keith| 39| 50000|        12|
|  Luca| 27|  8000|         2|
|Helena| 25|  4000|         1|
|   Joy| 25|  3000|         1|
+------+---+------+----------+



In [144]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [145]:
training.columns

['Name', 'age', 'Salary', 'experience']

[Age,Experience] ----> new feature -----> independatn feature

In [146]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['age','experience'], outputCol='Independent Feature')
output = assembler.transform(training)
output.show()

+------+---+------+----------+-------------------+
|  Name|age|Salary|experience|Independent Feature|
+------+---+------+----------+-------------------+
| Qscar| 29| 10000|         2|         [29.0,2.0]|
| Keith| 39| 50000|        12|        [39.0,12.0]|
|  Luca| 27|  8000|         2|         [27.0,2.0]|
|Helena| 25|  4000|         1|         [25.0,1.0]|
|   Joy| 25|  3000|         1|         [25.0,1.0]|
+------+---+------+----------+-------------------+



In [147]:
output.columns

['Name', 'age', 'Salary', 'experience', 'Independent Feature']

In [148]:
finalized_data = output.select("Independent Feature", "Salary")
finalized_data.show()

+-------------------+------+
|Independent Feature|Salary|
+-------------------+------+
|         [29.0,2.0]| 10000|
|        [39.0,12.0]| 50000|
|         [27.0,2.0]|  8000|
|         [25.0,1.0]|  4000|
|         [25.0,1.0]|  3000|
+-------------------+------+



In [149]:
from pyspark.ml.regression import LinearRegression

train_data, test_data = finalized_data.randomSplit([0.75,0.25])
regressor = LinearRegression(featuresCol='Independent Feature', labelCol='Salary')
regressor = regressor.fit(train_data)

In [150]:
### Coefficients
regressor.coefficients

DenseVector([1000.0, 3000.0])

In [151]:
### Intercepts
regressor.intercept

-25000.000000001288

In [153]:
### Prediction
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show()

+-------------------+------+------------------+
|Independent Feature|Salary|        prediction|
+-------------------+------+------------------+
|         [25.0,1.0]|  4000|2999.9999999999927|
|        [39.0,12.0]| 50000|49999.999999998945|
+-------------------+------+------------------+



In [154]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(500.00000000053114, 500000.0000000073)