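## A notebook for practising PySpark basics

It covers:
- reading CSV files into DataFrames and inspecting their schemas;
- selecting, adding, dropping and renaming columns;
- handling missing values;
- filtering rows;
- grouping and aggregating data.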

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create the SparkSession for this notebook, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName('Dataframe')
    .getOrCreate()
)

In [4]:
# Display the active SparkSession object created in the previous cell
spark

In [5]:
# Read the dataset with default options: no header row and no type inference,
# so Spark invents its own column names for the file
df_pyspark = spark.read.csv(path='table.csv')

In [6]:
# Show the rows; the file's header line appears here as an ordinary data row
df_pyspark.show()

+------+---+---------+
|   _c0|_c1|      _c2|
+------+---+---------+
|  name|age|     city|
| Paula| 30|   Madrid|
|  Fran| 32|Barcelona|
|Marina| 25| Valencia|
+------+---+---------+



Spark has generated its own column names (`_c0`, `_c1`, `_c2`) and treated the file's real header row (`name`, `age`, `city`) as an ordinary data row.

Next, let's read the dataset again with the option **header=True**, so that Spark uses the first row as the column names.

In [7]:
# Re-read the CSV, this time treating its first line as the column names
df_pyspark = spark.read.option('header', True).csv('table.csv')

In [8]:
# Show the rows again; the columns are now named after the CSV header
df_pyspark.show()

+------+---+---------+
|  name|age|     city|
+------+---+---------+
| Paula| 30|   Madrid|
|  Fran| 32|Barcelona|
|Marina| 25| Valencia|
+------+---+---------+



In [9]:
# Confirm that spark.read.csv returned a PySpark DataFrame
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [10]:
# Head of dataset: the first three rows, collected as a list of Row objects
df_pyspark.head(n=3)

[Row(name='Paula', age='30', city='Madrid'),
 Row(name='Fran', age='32', city='Barcelona'),
 Row(name='Marina', age='25', city='Valencia')]

In [11]:
# Check the schema (data types).
# Without inferSchema, every column (including 'age') is reported as string
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- city: string (nullable = true)



By default, Spark reads every column as a string, including `age`. Let's fix this by reading the CSV file again, this time with the option **inferSchema=True**, so that Spark infers each column's type.

In [12]:
# Re-read the CSV with a header row and let Spark infer each column's type
df_pyspark = spark.read.options(header=True, inferSchema=True).csv('table.csv')

In [13]:
# Check the schema (data types) again; with inferSchema, 'age' is now an integer
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



Now `age` has been read as an integer.

In [14]:
# Columns: the column names as a plain Python list
df_pyspark.columns

['name', 'age', 'city']

In [15]:
# Select a column. On its own this only displays the resulting DataFrame's
# schema; no rows are shown until an action such as show() is called
df_pyspark.select(df_pyspark['name'])

DataFrame[name: string]

In [16]:
# Visualise the column: show() prints the selected rows
df_pyspark.select(df_pyspark['name']).show()

+------+
|  name|
+------+
| Paula|
|  Fran|
|Marina|
+------+



In [17]:
# Visualise multiple columns: select() also accepts a list of column names
df_pyspark.select(['name', 'city']).show()

+------+---------+
|  name|     city|
+------+---------+
| Paula|   Madrid|
|  Fran|Barcelona|
|Marina| Valencia|
+------+---------+



In [18]:
# Check the data types as a list of (column name, type name) pairs
df_pyspark.dtypes

[('name', 'string'), ('age', 'int'), ('city', 'string')]

In [19]:
# Describe function: per-column summary (count, mean, stddev, min, max).
# mean and stddev come out as null for the string columns
df_pyspark.describe().show()

+-------+-----+-----------------+---------+
|summary| name|              age|     city|
+-------+-----+-----------------+---------+
|  count|    3|                3|        3|
|   mean| null|             29.0|     null|
| stddev| null|3.605551275463989|     null|
|    min| Fran|               25|Barcelona|
|    max|Paula|               32| Valencia|
+-------+-----+-----------------+---------+



## Adding, dropping and renaming columns

In [20]:
# Add a column derived from 'age'. withColumn returns a new DataFrame,
# so df_pyspark itself keeps its original three columns
add_df_pyspark = df_pyspark.withColumn('age in 2 years', df_pyspark.age + 2)

In [21]:
# Show the DataFrame including the new 'age in 2 years' column
add_df_pyspark.show()

+------+---+---------+--------------+
|  name|age|     city|age in 2 years|
+------+---+---------+--------------+
| Paula| 30|   Madrid|            32|
|  Fran| 32|Barcelona|            34|
|Marina| 25| Valencia|            27|
+------+---+---------+--------------+



In [22]:
# Drop a column (producing yet another DataFrame) and display the result
drop_df_pyspark = add_df_pyspark.drop(add_df_pyspark['city'])
drop_df_pyspark.show()

+------+---+--------------+
|  name|age|age in 2 years|
+------+---+--------------+
| Paula| 30|            32|
|  Fran| 32|            34|
|Marina| 25|            27|
+------+---+--------------+



In [23]:
# Rename a column; the renamed DataFrame is only displayed, not stored
df_pyspark.withColumnRenamed(existing='name', new='New Name').show()

+--------+---+---------+
|New Name|age|     city|
+--------+---+---------+
|   Paula| 30|   Madrid|
|    Fran| 32|Barcelona|
|  Marina| 25| Valencia|
+--------+---+---------+



## Handling missing values

In [71]:
# Read the dataset that contains missing values (header row, inferred types)
missing = spark.read.options(header=True, inferSchema=True).csv('missing_values.csv')

In [72]:
# Visualise the dataset; missing entries are displayed as null
missing.show()

+-----+----+------+----------+
| name| age|  city|experience|
+-----+----+------+----------+
|Paula|  30|Madrid|         5|
| Toni|  35|  Roma|         1|
| Fran|  40|  null|      null|
| Erea|null|  null|      null|
| null|null|  null|      null|
+-----+----+------+----------+



## Dropping missing values

In [73]:
# Drop rows with at least 1 null value (how='any' is the default behaviour)
missing.dropna(how='any').show()

+-----+---+------+----------+
| name|age|  city|experience|
+-----+---+------+----------+
|Paula| 30|Madrid|         5|
| Toni| 35|  Roma|         1|
+-----+---+------+----------+



In [74]:
# Drop only the rows in which every single value is null
missing.dropna(how='all').show()

+-----+----+------+----------+
| name| age|  city|experience|
+-----+----+------+----------+
|Paula|  30|Madrid|         5|
| Toni|  35|  Roma|         1|
| Fran|  40|  null|      null|
| Erea|null|  null|      null|
+-----+----+------+----------+



In [75]:
# Keep only rows with at least `thresh` non-null values; rows with fewer
# non-null values are dropped. Print the original next to the result.
for title, frame in (('Original', missing),
                     ('After threshold dropping', missing.dropna(thresh=2))):
    print(title)
    frame.show()

Original
+-----+----+------+----------+
| name| age|  city|experience|
+-----+----+------+----------+
|Paula|  30|Madrid|         5|
| Toni|  35|  Roma|         1|
| Fran|  40|  null|      null|
| Erea|null|  null|      null|
| null|null|  null|      null|
+-----+----+------+----------+

After threshold dropping
+-----+---+------+----------+
| name|age|  city|experience|
+-----+---+------+----------+
|Paula| 30|Madrid|         5|
| Toni| 35|  Roma|         1|
| Fran| 40|  null|      null|
+-----+---+------+----------+



In [76]:
# Drop rows that have a null in any of the `subset` columns (only 'name' here);
# nulls in the other columns are ignored
print('Original')
missing.show()
print('After subset dropping')
missing.dropna(subset=['name']).show()

Original
+-----+----+------+----------+
| name| age|  city|experience|
+-----+----+------+----------+
|Paula|  30|Madrid|         5|
| Toni|  35|  Roma|         1|
| Fran|  40|  null|      null|
| Erea|null|  null|      null|
| null|null|  null|      null|
+-----+----+------+----------+

After subset dropping
+-----+----+------+----------+
| name| age|  city|experience|
+-----+----+------+----------+
|Paula|  30|Madrid|         5|
| Toni|  35|  Roma|         1|
| Fran|  40|  null|      null|
| Erea|null|  null|      null|
+-----+----+------+----------+



## Filling missing values

In [77]:
# Fill missing values of a specific column: nulls in 'city' get a placeholder
missing.fillna(value='Missing city', subset=['city']).show()

+-----+----+------------+----------+
| name| age|        city|experience|
+-----+----+------------+----------+
|Paula|  30|      Madrid|         5|
| Toni|  35|        Roma|         1|
| Fran|  40|Missing city|      null|
| Erea|null|Missing city|      null|
| null|null|Missing city|      null|
+-----+----+------------+----------+



In [78]:
# Prepare an Imputer that fills nulls in numeric columns with a per-column
# statistic (the mean here). Each input column gets a companion
# '<name>_imputed' output column; the originals are left untouched.
from pyspark.ml.feature import Imputer

impute_cols = ['age', 'experience']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=['{}_imputed'.format(c) for c in impute_cols],
    strategy='mean',
)

In [79]:
# Fit the imputer (learns each column's mean), then compare the original
# dataset with the one carrying the extra imputed columns
imputer_model = imputer.fit(missing)

print('Original dataset')
missing.show()
print('After adding imputed columns')
imputer_model.transform(missing).show()

Original dataset
+-----+----+------+----------+
| name| age|  city|experience|
+-----+----+------+----------+
|Paula|  30|Madrid|         5|
| Toni|  35|  Roma|         1|
| Fran|  40|  null|      null|
| Erea|null|  null|      null|
| null|null|  null|      null|
+-----+----+------+----------+

After adding imputed columns
+-----+----+------+----------+-----------+------------------+
| name| age|  city|experience|age_imputed|experience_imputed|
+-----+----+------+----------+-----------+------------------+
|Paula|  30|Madrid|         5|         30|                 5|
| Toni|  35|  Roma|         1|         35|                 1|
| Fran|  40|  null|      null|         40|                 3|
| Erea|null|  null|      null|         35|                 3|
| null|null|  null|      null|         35|                 3|
+-----+----+------+----------+-----------+------------------+



## Filter operations

In [91]:
# Load the employee table with a header row and inferred column types
employee = spark.read.options(header=True, inferSchema=True).csv('employee.csv')

In [92]:
# Show the full employee dataset used by the filter examples below
employee.show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
| Paula| 30|        10| 30000|
|  Fran| 32|         8| 25000|
|Sandra| 55|         4| 20000|
|Carlos| 24|         3| 20000|
|  Alma| 21|         1| 15000|
| Bruno| 40|         2| 10000|
+------+---+----------+------+



In [93]:
# Find salaries less than or equal to 20000.
# Use the column's real (lowercase) name. 'Salary' only matched because Spark
# resolves column names case-insensitively by default (spark.sql.caseSensitive).
employee.filter('salary <= 20000').show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
|Sandra| 55|         4| 20000|
|Carlos| 24|         3| 20000|
|  Alma| 21|         1| 15000|
| Bruno| 40|         2| 10000|
+------+---+----------+------+



In [94]:
# Find salaries less than or equal to 20000
# and show only name and salary columns.
# The SQL expression uses the column's real (lowercase) name, matching the schema.
employee.filter('salary <= 20000').select(['name', 'salary']).show()

+------+------+
|  name|salary|
+------+------+
|Sandra| 20000|
|Carlos| 20000|
|  Alma| 15000|
| Bruno| 10000|
+------+------+



In [95]:
# Find salaries less than or equal to 20000 and age less than 35.
# Conditions are Column objects combined with &; each comparison needs its own
# parentheses because & binds tighter than <= and < in Python.
employee.where(
    (employee.salary <= 20000) & (employee.age < 35)
).show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
|Carlos| 24|         3| 20000|
|  Alma| 21|         1| 15000|
+------+---+----------+------+



In [96]:
# Find employees over 30 years old or with more than 5 years of experience
# (| is the Column "or" operator; parenthesise each comparison)
print('Original dataset')
employee.show()
print('Filtered dataset')
employee.where(
    (employee.age > 30) | (employee.experience > 5)
).show()

Original dataset
+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
| Paula| 30|        10| 30000|
|  Fran| 32|         8| 25000|
|Sandra| 55|         4| 20000|
|Carlos| 24|         3| 20000|
|  Alma| 21|         1| 15000|
| Bruno| 40|         2| 10000|
+------+---+----------+------+

Filtered dataset
+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
| Paula| 30|        10| 30000|
|  Fran| 32|         8| 25000|
|Sandra| 55|         4| 20000|
| Bruno| 40|         2| 10000|
+------+---+----------+------+



In [97]:
# Find employees that are not named Paula: ~ negates a boolean Column
employee.where(~(employee['name'] == 'Paula')).show()

+------+---+----------+------+
|  name|age|experience|salary|
+------+---+----------+------+
|  Fran| 32|         8| 25000|
|Sandra| 55|         4| 20000|
|Carlos| 24|         3| 20000|
|  Alma| 21|         1| 15000|
| Bruno| 40|         2| 10000|
+------+---+----------+------+



## Aggregate and GroupBy Functions

In [101]:
# Read the company dataset (header row, inferred column types)
company = spark.read.options(header=True, inferSchema=True).csv('company.csv')

In [105]:
# Show the company dataset used by the aggregation examples below
company.show()

+------+---------------+------+
|  name|     department|salary|
+------+---------------+------+
| Paula|   Data Science| 10000|
|  Fran|       Big Data|  5000|
|Sandra|    Development|  4000|
|Carlos|       Big Data| 20000|
|  Alma|   Data Science| 15000|
| Bruno|Human Resources| 10000|
+------+---------------+------+



In [111]:
# Group dataset by department with sum of salaries.
# Name the aggregated column explicitly: a bare .sum() sums every numeric
# column, so its output would silently change if numeric columns were added.
company.groupBy('department').sum('salary').show()

+---------------+-----------+
|     department|sum(salary)|
+---------------+-----------+
|       Big Data|      25000|
|    Development|       4000|
|   Data Science|      25000|
|Human Resources|      10000|
+---------------+-----------+



In [118]:
# Group dataset by department with maximum salary.
# Name the aggregated column explicitly: a bare .max() covers every numeric
# column, so its output would silently change if numeric columns were added.
print('Original dataset')
company.show()
print('Grouped dataset')
company.groupBy('department').max('salary').show()

Original dataset
+------+---------------+------+
|  name|     department|salary|
+------+---------------+------+
| Paula|   Data Science| 10000|
|  Fran|       Big Data|  5000|
|Sandra|    Development|  4000|
|Carlos|       Big Data| 20000|
|  Alma|   Data Science| 15000|
| Bruno|Human Resources| 10000|
+------+---------------+------+

Grouped dataset
+---------------+-----------+
|     department|max(salary)|
+---------------+-----------+
|       Big Data|      20000|
|    Development|       4000|
|   Data Science|      15000|
|Human Resources|      10000|
+---------------+-----------+



In [121]:
# Number of employees (rows) in each department
company.groupby('department').count().show()

+---------------+-----+
|     department|count|
+---------------+-----+
|       Big Data|    2|
|    Development|    1|
|   Data Science|    2|
|Human Resources|    1|
+---------------+-----+



In [143]:
# Find sum of all salaries: an aggregation over an empty grouping (whole table)
company.groupBy().agg({'salary': 'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|      64000|
+-----------+



In [133]:
# Find number of distinct departments (a single-row aggregate over the table)
from pyspark.sql.functions import countDistinct

company.select(countDistinct('department')).show()

+-----------------+
|count(department)|
+-----------------+
|                4|
+-----------------+

