# PySpark basics and getting started

In [3]:
import pyspark

In [2]:
import pandas as pd
pd.read_csv("test1.csv", sep = ';')

Unnamed: 0,Name,Age
0,hinat,31
1,suki,30
2,momo,29


In [4]:
from pyspark.sql import SparkSession # create pyspark session

In [6]:
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [7]:
spark

In [11]:
df_pyspark = spark.read.csv('test1.csv', sep = ';')

In [12]:
df_pyspark

DataFrame[_c0: string, _c1: string]

In [13]:
df_pyspark.show()

+-----+---+
|  _c0|_c1|
+-----+---+
| Name|Age|
|hinat| 31|
| suki| 30|
| momo| 29|
+-----+---+



In [26]:
df_pyspark = spark.read.option('header','true').csv('test1.csv', sep = ';')

In [27]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [29]:
df_pyspark.head(3)

[Row(Name='hinat', Age='31'),
 Row(Name='suki', Age='30'),
 Row(Name='momo', Age='29')]

In [31]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)



In [32]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x000001F086109630>>

# PySpark Dataframes

In [33]:
from pyspark.sql import SparkSession

In [34]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [80]:
## read dataset
df_pyspark = spark.read.option('header', 'true').csv('test1.csv', sep = ';', inferSchema = True)

In [81]:
df_pyspark.show()

+-----+---+
| Name|Age|
+-----+---+
|hinat| 31|
| suki| 30|
| momo| 29|
+-----+---+



In [82]:
## check schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



In [83]:
spark.read.csv('test1.csv', header = True, inferSchema = True, sep = ';').show()
spark.read.csv('test1.csv', header = True, inferSchema = True, sep = ';').printSchema()

+-----+---+
| Name|Age|
+-----+---+
|hinat| 31|
| suki| 30|
| momo| 29|
+-----+---+

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



In [84]:
df_pyspark.columns

['Name', 'Age']

In [85]:
df_pyspark.head(3)

[Row(Name='hinat', Age=31), Row(Name='suki', Age=30), Row(Name='momo', Age=29)]

In [86]:
## selecting a column
df_pyspark.show()

+-----+---+
| Name|Age|
+-----+---+
|hinat| 31|
| suki| 30|
| momo| 29|
+-----+---+



In [87]:
df_pyspark.select('Name').show()

+-----+
| Name|
+-----+
|hinat|
| suki|
| momo|
+-----+



In [88]:
df_pyspark.select(['Name', 'Age']).show()

+-----+---+
| Name|Age|
+-----+---+
|hinat| 31|
| suki| 30|
| momo| 29|
+-----+---+



In [89]:
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int')]

In [90]:
df_pyspark.describe().show()

+-------+-----+----+
|summary| Name| Age|
+-------+-----+----+
|  count|    3|   3|
|   mean| null|30.0|
| stddev| null| 1.0|
|    min|hinat|  29|
|    max| suki|  31|
+-------+-----+----+



In [93]:
### adding columns in dataframe
df_pyspark = df_pyspark.withColumn('Exp', df_pyspark['Age']+10)

In [94]:
df_pyspark.show()

+-----+---+---+
| Name|Age|Exp|
+-----+---+---+
|hinat| 31| 41|
| suki| 30| 40|
| momo| 29| 39|
+-----+---+---+



In [95]:
### drop columns
df_pyspark = df_pyspark.drop('Exp')

In [96]:
df_pyspark.show()

+-----+---+
| Name|Age|
+-----+---+
|hinat| 31|
| suki| 30|
| momo| 29|
+-----+---+



In [97]:
### rename columns
df_pyspark.withColumnRenamed('Name', 'Fname').show()

+-----+---+
|Fname|Age|
+-----+---+
|hinat| 31|
| suki| 30|
| momo| 29|
+-----+---+



In [98]:
spark.stop 

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x000001F086109630>>

# Missing values

In [99]:
from pyspark.sql import SparkSession

In [100]:
spark = SparkSession.builder.appName('Missing value').getOrCreate()

In [103]:
df_pyspark = spark.read.csv('test2.csv', sep = ';', header = True, inferSchema = True)
df_pyspark.show()

+-----+----+----+------+
| Name| Age| Exp|Salary|
+-----+----+----+------+
|hinat|  31|  10| 30000|
| suki|  30|   8| 25000|
| momo|  29|   4| 20000|
|  zum|  24|   3| 20000|
| maki|  21|   1| 15000|
| null|  23|   2| 18000|
|  avo|null|null| 40000|
| null|  34|  10| 38000|
| null|  36|null|  null|
+-----+----+----+------+



In [105]:
## drop the columns
df_pyspark.drop('Name').show()

+----+----+------+
| Age| Exp|Salary|
+----+----+------+
|  31|  10| 30000|
|  30|   8| 25000|
|  29|   4| 20000|
|  24|   3| 20000|
|  21|   1| 15000|
|  23|   2| 18000|
|null|null| 40000|
|  34|  10| 38000|
|  36|null|  null|
+----+----+------+



In [106]:
df_pyspark.show()

+-----+----+----+------+
| Name| Age| Exp|Salary|
+-----+----+----+------+
|hinat|  31|  10| 30000|
| suki|  30|   8| 25000|
| momo|  29|   4| 20000|
|  zum|  24|   3| 20000|
| maki|  21|   1| 15000|
| null|  23|   2| 18000|
|  avo|null|null| 40000|
| null|  34|  10| 38000|
| null|  36|null|  null|
+-----+----+----+------+



In [107]:
df_pyspark.na.drop().show()

+-----+---+---+------+
| Name|Age|Exp|Salary|
+-----+---+---+------+
|hinat| 31| 10| 30000|
| suki| 30|  8| 25000|
| momo| 29|  4| 20000|
|  zum| 24|  3| 20000|
| maki| 21|  1| 15000|
+-----+---+---+------+



In [109]:
## any == how
df_pyspark.na.drop(how = 'any').show()

+-----+---+---+------+
| Name|Age|Exp|Salary|
+-----+---+---+------+
|hinat| 31| 10| 30000|
| suki| 30|  8| 25000|
| momo| 29|  4| 20000|
|  zum| 24|  3| 20000|
| maki| 21|  1| 15000|
+-----+---+---+------+



In [110]:
## threshold of 2 
df_pyspark.na.drop(how = 'any', thresh = 2).show()

+-----+----+----+------+
| Name| Age| Exp|Salary|
+-----+----+----+------+
|hinat|  31|  10| 30000|
| suki|  30|   8| 25000|
| momo|  29|   4| 20000|
|  zum|  24|   3| 20000|
| maki|  21|   1| 15000|
| null|  23|   2| 18000|
|  avo|null|null| 40000|
| null|  34|  10| 38000|
+-----+----+----+------+



In [111]:
## subset - drops null values from 'Exp'
df_pyspark.na.drop(how = 'any', subset = ['Exp']).show()

+-----+---+---+------+
| Name|Age|Exp|Salary|
+-----+---+---+------+
|hinat| 31| 10| 30000|
| suki| 30|  8| 25000|
| momo| 29|  4| 20000|
|  zum| 24|  3| 20000|
| maki| 21|  1| 15000|
| null| 23|  2| 18000|
| null| 34| 10| 38000|
+-----+---+---+------+



In [152]:
### fill missing value
df_pyspark.na.fill({'Name': 'Missing value'}).show()

+-------------+----+----+------+
|         Name| Age| Exp|Salary|
+-------------+----+----+------+
|        hinat|  31|  10| 30000|
|         suki|  30|   8| 25000|
|         momo|  29|   4| 20000|
|          zum|  24|   3| 20000|
|         maki|  21|   1| 15000|
|Missing value|  23|   2| 18000|
|          avo|null|null| 40000|
|Missing value|  34|  10| 38000|
|Missing value|  36|null|  null|
+-------------+----+----+------+



In [142]:
df_pyspark.show()

+-----+----+----+------+
| Name| Age| Exp|Salary|
+-----+----+----+------+
|hinat|  31|  10| 30000|
| suki|  30|   8| 25000|
| momo|  29|   4| 20000|
|  zum|  24|   3| 20000|
| maki|  21|   1| 15000|
| null|  23|   2| 18000|
|  avo|null|null| 40000|
| null|  34|  10| 38000|
| null|  36|null|  null|
+-----+----+----+------+



In [143]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['Age', 'Exp', 'Salary'],
    outputCols = ["{}_imputed".format(c) for c in ['Age', 'Exp', 'Salary']]).setStrategy('mean')

In [144]:
# add imputation cols to df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+-----+----+----+------+-----------+-----------+--------------+
| Name| Age| Exp|Salary|Age_imputed|Exp_imputed|Salary_imputed|
+-----+----+----+------+-----------+-----------+--------------+
|hinat|  31|  10| 30000|         31|         10|         30000|
| suki|  30|   8| 25000|         30|          8|         25000|
| momo|  29|   4| 20000|         29|          4|         20000|
|  zum|  24|   3| 20000|         24|          3|         20000|
| maki|  21|   1| 15000|         21|          1|         15000|
| null|  23|   2| 18000|         23|          2|         18000|
|  avo|null|null| 40000|         28|          5|         40000|
| null|  34|  10| 38000|         34|         10|         38000|
| null|  36|null|  null|         36|          5|         25750|
+-----+----+----+------+-----------+-----------+--------------+



In [154]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x000001F086109630>>

# Filter

In [184]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Filter').getOrCreate()

In [185]:
# got tired of using df_pyspark
df = spark.read.csv('test1.csv', header = True, inferSchema = True, sep = ';')

In [186]:
df.show()

+-----+---+
| Name|Age|
+-----+---+
|hinat| 31|
| suki| 30|
| momo| 29|
+-----+---+



In [187]:
## cats age less than 30
df.filter("Age<=30").show()

+----+---+
|Name|Age|
+----+---+
|suki| 30|
|momo| 29|
+----+---+



In [188]:
df.filter("Age<=30").select('Name').show()

+----+
|Name|
+----+
|suki|
|momo|
+----+



In [189]:
df.filter(~(df['Age']<=30) & (df['Name'] == 'suki')).show()

+----+---+
|Name|Age|
+----+---+
+----+---+



In [190]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x000001F091747BE0>>

# GroupBy and Aggregate Functions

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Group').getOrCreate()

In [3]:
spark

In [5]:
df = spark.read.csv('test3.csv', header = True, inferSchema = True, sep = ';')

In [7]:
df.show()

+-----+-----------+------+
| Name|Departments|Salary|
+-----+-----------+------+
|hinat|         DS| 10000|
| suki|        IOT|  5000|
| momo|         BD|  4000|
|  zum|         BD|  4000|
| maki|         DS|  3000|
|bussy|         DS| 20000|
|  avo|        IOT| 10000|
|Sunny|         BD|  5000|
|Sunny|         DS| 10000|
|Krish|         BD|  2000|
+-----+-----------+------+



In [8]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [10]:
## Groupby
df.groupBy('Name').sum().show()

+-----+-----------+
| Name|sum(Salary)|
+-----+-----------+
| suki|       5000|
|Sunny|      15000|
|  zum|       4000|
|Krish|       2000|
|bussy|      20000|
| maki|       3000|
| momo|       4000|
|hinat|      10000|
|  avo|      10000|
+-----+-----------+



In [14]:
## Groupby departments
df.groupBy('Departments').sum().show()

+-----------+-----------+
|Departments|sum(Salary)|
+-----------+-----------+
|        IOT|      15000|
|         BD|      15000|
|         DS|      43000|
+-----------+-----------+



In [15]:
df.groupBy('Departments').mean().show()

+-----------+-----------+
|Departments|avg(Salary)|
+-----------+-----------+
|        IOT|     7500.0|
|         BD|     3750.0|
|         DS|    10750.0|
+-----------+-----------+



In [16]:
df.groupBy('Departments').count().show()

+-----------+-----+
|Departments|count|
+-----------+-----+
|        IOT|    2|
|         BD|    4|
|         DS|    4|
+-----------+-----+



In [17]:
df.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [19]:
df.groupBy('Name').min().show()

+-----+-----------+
| Name|min(Salary)|
+-----+-----------+
| suki|       5000|
|Sunny|       5000|
|  zum|       4000|
|Krish|       2000|
|bussy|      20000|
| maki|       3000|
| momo|       4000|
|hinat|      10000|
|  avo|      10000|
+-----+-----------+



In [20]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x000001A216C4C4C0>>